Producer and Consumer
This tutorial will build a microservice that produces a list of users to a Kafka topic and consumes the list of users with another get endpoint.
To scaffold a new project, we put the, openapi.yaml and config.json to the model-config repo in the following folder.
The command line is in the, and the specification has two endpoints.
The generated project is located in the light-example-4j.
You can build and run the generated project from the command line with:
mvn clean install exec:exec
First, let’s implement the producer-handler. It accepts a list of users and sends them to a Kafka topic specified by the path parameter.
In the pom.xml, we add the following dependency.
</dependency> and
We need to create a startup hook to initialize the Kafka producer during the server startup and a shutdown hook to close the Kafka producer during the shutdown.
package com.networknt.kafka;
import com.networknt.service.SingletonServiceFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.networknt.kafka.producer.LightProducer;
import com.networknt.kafka.producer.TransactionalProducer;
import com.networknt.server.StartupHookProvider;
public class ProducerStartupHook implements StartupHookProvider {
private static Logger logger = LoggerFactory.getLogger(ProducerStartupHook.class);
public void onStartup() {
logger.debug("ProducerStartupHook start");
TransactionalProducer producer = (TransactionalProducer) SingletonServiceFactory.getBean(LightProducer.class);;
new Thread(producer).start();
logger.debug("ProducerStartupHook complete");
package com.networknt.kafka;
import com.networknt.kafka.producer.LightProducer;
import com.networknt.server.ShutdownHookProvider;
import com.networknt.service.SingletonServiceFactory;
public class ProducerShutdownHook implements ShutdownHookProvider {
public void onShutdown() {
LightProducer producer = SingletonServiceFactory.getBean(LightProducer.class);
try { if(producer != null) producer.close(); } catch(Exception e) {e.printStackTrace();}
We need to add the producer startup and shutdown hooks to the service.yml and pick a producer implementation optimized for throughput.
# Singleton service factory configuration/IoC injection
# StartupHookProvider implementations, there are one to many and they are called in the same sequence defined.
- com.networknt.server.StartupHookProvider:
- com.networknt.kafka.ProducerStartupHook
# ShutdownHookProvider implementations, there are one to many and they are called in the same sequence defined.
- com.networknt.server.ShutdownHookProvider:
- com.networknt.kafka.ProducerShutdownHook
- com.networknt.kafka.producer.LightProducer:
- com.networknt.kafka.producer.TransactionalProducer
This is the configuration file for the light-kafka producer.
acks: all
retries: 3
batchSize: 16384
lingerMs: 1
bufferMemory: 33554432
keySerializer: org.apache.kafka.common.serialization.ByteArraySerializer
valueSerializer: org.apache.kafka.common.serialization.ByteArraySerializer
keyDeSerializer: org.apache.kafka.common.serialization.ByteArraySerializer
valueDeSerializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
sessionTimeout: 30000
autoOffsetreset: earliest
enableAutoCommit: false
bootstrapServers: localhost:9092
# every server instance will have a unique transactionId
transactionId: T1000
transactionTimeoutMs: 900000
transactionalIdExpirationMs: 2073600000
Update the generated handler to get the topic and a list of users to send to Kafka.
package com.networknt.kafka.handler;
import com.networknt.body.BodyHandler;
import com.networknt.config.JsonMapper;
import com.networknt.handler.LightHttpHandler;
import com.networknt.kafka.producer.LightProducer;
import com.networknt.service.SingletonServiceFactory;
import io.undertow.server.HttpServerExchange;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
public class ProducerTopicPostHandler implements LightHttpHandler {
private static Logger logger = LoggerFactory.getLogger(ProducerTopicPostHandler.class);
private static String STATUS_ACCEPTED = "SUC10202";
public void handleRequest(HttpServerExchange exchange) throws Exception {
logger.debug("ProducerTopicPostHandler start");
String topic = exchange.getQueryParameters().get("topic").getFirst();"topic: " + topic);
List<Map<String, Object>> list = (List)exchange.getAttachment(BodyHandler.REQUEST_BODY);
LightProducer producer = SingletonServiceFactory.getBean(LightProducer.class);
BlockingQueue<ProducerRecord<byte[], byte[]>> txQueue = producer.getTxQueue();
for (int i = 0; i < list.size(); i++) {
Map<String, Object> userMap = list.get(i);
String userId = (String)userMap.get("userId");
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, userId.getBytes(StandardCharsets.UTF_8), JsonMapper.toJson(userMap).getBytes(StandardCharsets.UTF_8));
try {
} catch (InterruptedException e) {
logger.error("Exception:", e);
logger.debug("Send message with userId = " + userId + " to topic: " + topic);
setExchangeStatus(exchange, STATUS_ACCEPTED);
Test Producer
We need to start the confluent locally and create a test topic.
confluent local start
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic test --config
To consume the message from test topic on the console.
kafka-console-consumer --bootstrap-server localhost:9092 --topic test --property print.key=true --property key.separator="-" --from-beginning
To start the server.
mvn clean install exec:exec
Send a request from curl and make sure application/json content-type is used.
curl -k --location --request POST 'https://localhost:8443/producer/test' \
--header 'Content-Type: application/json' \
--data-raw '[{"userId":"stevehu","firstName":"Steve","lastName":"Hu","country":"CA"},{"userId":"joedoe","firstName":"Joe","lastName":"Doe","country":"US"},{"userId":"johnwalter","firstName":"John","lastName":"Walter","country":"CA"}]'
From the consumer console, you should see the following lines.
In the above producer test, we use the Kafka console consumer to verify that we have messages pushed to the Kafka topic from our producer-handler. In this step, we are going to implement a consumer handler.
Add the following dependency
</dependency> and
We need to create a startup hook to initialize the Kafka consumer during the server startup and a shutdown hook to close the Kafka consumer during the shutdown.
package com.networknt.kafka;
import com.networknt.kafka.consumer.LightConsumer;
import com.networknt.server.StartupHookProvider;
import com.networknt.service.SingletonServiceFactory;
public class ConsumerStartupHook implements StartupHookProvider {
public static UserConsumer consumer;
public void onStartup() {
consumer = (UserConsumer) SingletonServiceFactory.getBean(LightConsumer.class);;
package com.networknt.kafka;
import com.networknt.kafka.consumer.LightConsumer;
import com.networknt.server.ShutdownHookProvider;
import com.networknt.service.SingletonServiceFactory;
public class ConsumerShutdownHook implements ShutdownHookProvider {
public void onShutdown() {
LightConsumer consumer = SingletonServiceFactory.getBean(LightConsumer.class);
try { if(consumer != null) consumer.close(); } catch(Exception e) {e.printStackTrace();}
This is the consumer implementation that extends from the AbstractConsumer in the light-kafka.
package com.networknt.kafka;
import com.networknt.config.JsonMapper;
import com.networknt.kafka.consumer.AbstractConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.*;
public class UserConsumer extends AbstractConsumer {
public void subscribe(String topic) {
public List<Map<String, Object>> poll() {
List<Map<String, Object>> list = new ArrayList<>();
ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
for(ConsumerRecord<byte[], byte[]> record: records) {
String value = new String(record.value(), StandardCharsets.UTF_8);
Map<String, Object> userMap = JsonMapper.string2Map(value);
userMap.put("partition", record.partition());
userMap.put("offset", record.offset());
return list;
We need to add the consumer startup and shutdown hooks and the KeyValueConsumer.
# Singleton service factory configuration/IoC injection
# StartupHookProvider implementations, there are one to many and they are called in the same sequence defined.
- com.networknt.server.StartupHookProvider:
- com.networknt.kafka.ProducerStartupHook
- com.networknt.kafka.ConsumerStartupHook
# ShutdownHookProvider implementations, there are one to many and they are called in the same sequence defined.
- com.networknt.server.ShutdownHookProvider:
- com.networknt.kafka.ProducerShutdownHook
- com.networknt.kafka.ConsumerShutdownHook
- com.networknt.kafka.producer.LightProducer:
- com.networknt.kafka.producer.TransactionalProducer
- com.networknt.kafka.consumer.LightConsumer:
- com.networknt.kafka.UserConsumer
bootstrapServers: localhost:9092
isolationLevel: read_committed
enableAutoCommit: false
autoOffsetReset: earliest
autoCommitIntervalMs: 1000
keyDeserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
valueDeserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
# every consumer must have a groupId and it is retrieved from environment of server.yml
groupId: group1
This is the handler implementation. The topic is the path parameter and the response is a list of key/value pairs.
package com.networknt.kafka.handler;
import com.networknt.config.JsonMapper;
import com.networknt.handler.LightHttpHandler;
import com.networknt.httpstring.ContentType;
import com.networknt.kafka.ConsumerStartupHook;
import io.undertow.server.HttpServerExchange;
import io.undertow.util.Headers;
import io.undertow.util.HttpString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class ConsumerTopicGetHandler implements LightHttpHandler {
private static Logger logger = LoggerFactory.getLogger(ConsumerTopicGetHandler.class);
public void handleRequest(HttpServerExchange exchange) throws Exception {
logger.debug("ConsumerTopicGetHandler start");
String topic = exchange.getQueryParameters().get("topic").getFirst();"topic: " + topic);
List<Map<String, Object>> list = ConsumerStartupHook.consumer.poll();
if(list != null) {
exchange.getResponseHeaders().add(Headers.CONTENT_TYPE, "application/json");
Test Consumer
After you have tested the producer, there should be some records in the Kafka test topic. Run the following command to retrieve the records from Kafka topic.
curl -k 'https://localhost:8443/consumer/test'
The result:
This example only touches the surface of the light-kafka capabilities. You can leverage many configuration combinations in the producer and consumer to build your handlers. However, the producer and consumer example is a good starting point if you are new to Kafka.
The final project can be found in the light-example-4j repository at producer-consumer