Producer and Consumer

This tutorial builds a microservice with two endpoints: a post endpoint that publishes a list of users to a Kafka topic, and a get endpoint that consumes the users back from the topic.

Codegen

To scaffold a new project, we put the README.md, openapi.yaml, and config.json into the following folder of the model-config repository.

https://github.com/networknt/model-config/tree/master/kafka/producer-consumer

The codegen command line is in the README.md, and the specification defines two endpoints (sketched below).
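
That command takes roughly the following form, assuming light-codegen has been cloned and built alongside the model-config repo; the jar path and output directory here are illustrative, so check the README.md for the exact invocation.

java -jar light-codegen/codegen-cli/target/codegen-cli.jar -f openapi -o /tmp/producer-consumer \
  -m model-config/kafka/producer-consumer/openapi.yaml -c model-config/kafka/producer-consumer/config.json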

The generated project is located in the light-example-4j repository.

https://github.com/networknt/light-example-4j/tree/master/kafka/producer-consumer
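
The generated project exposes the two endpoints from the specification. An abbreviated sketch of the openapi.yaml (not the full specification) looks like this:

paths:
  /producer/{topic}:
    post:
      summary: publish a list of users to the topic given in the path
      requestBody:
        content:
          application/json:
            schema:
              type: array
              items:
                type: object
  /consumer/{topic}:
    get:
      summary: consume a batch of users from the topic given in the path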

You can build and run the generated project from the command line with:

mvn clean install exec:exec

Producer

First, let’s implement the producer handler. It accepts a list of users and sends them to the Kafka topic specified by the path parameter.

pom.xml

In the pom.xml, we add the following dependency.

        <dependency>
            <groupId>com.networknt</groupId>
            <artifactId>kafka-producer</artifactId>
            <version>${version.light-4j}</version>
        </dependency>
ProducerStartupHook.java and ProducerShutdownHook.java

We need to create a startup hook to initialize the Kafka producer during server startup, and a shutdown hook to close it during server shutdown.

ProducerStartupHook.java

package com.networknt.kafka;

import com.networknt.service.SingletonServiceFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.networknt.kafka.producer.LightProducer;
import com.networknt.kafka.producer.TransactionalProducer;
import com.networknt.server.StartupHookProvider;

public class ProducerStartupHook implements StartupHookProvider {
    private static Logger logger = LoggerFactory.getLogger(ProducerStartupHook.class);
    @Override
    public void onStartup() {
        logger.debug("ProducerStartupHook start");
        // the LightProducer singleton is bound to TransactionalProducer in service.yml
        TransactionalProducer producer = (TransactionalProducer) SingletonServiceFactory.getBean(LightProducer.class);
        producer.open();
        // run the producer on its own thread so it can publish queued records in the background
        new Thread(producer).start();
        logger.debug("ProducerStartupHook complete");
    }
}

ProducerShutdownHook.java

package com.networknt.kafka;

import com.networknt.kafka.producer.LightProducer;
import com.networknt.server.ShutdownHookProvider;
import com.networknt.service.SingletonServiceFactory;

public class ProducerShutdownHook implements ShutdownHookProvider {
    @Override
    public void onShutdown() {
        LightProducer producer = SingletonServiceFactory.getBean(LightProducer.class);
        try { if(producer != null) producer.close(); } catch(Exception e) {e.printStackTrace();}
    }
}

service.yml

We need to add the producer startup and shutdown hooks to the service.yml and pick a producer implementation optimized for throughput.

service.yml

# Singleton service factory configuration/IoC injection
singletons:
# StartupHookProvider implementations; there can be one or more, and they are called in the sequence defined.
- com.networknt.server.StartupHookProvider:
  - com.networknt.kafka.ProducerStartupHook
# ShutdownHookProvider implementations; there can be one or more, and they are called in the sequence defined.
- com.networknt.server.ShutdownHookProvider:
  - com.networknt.kafka.ProducerShutdownHook
- com.networknt.kafka.producer.LightProducer:
  - com.networknt.kafka.producer.TransactionalProducer
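
With this wiring, the LightProducer interface resolves to the single TransactionalProducer instance, so SingletonServiceFactory.getBean(LightProducer.class) in the startup hook and in the handler below both return the same object.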

kafka-producer.yml

This is the configuration file for the light-kafka producer.

---
acks: all
retries: 3
batchSize: 16384
lingerMs: 1
bufferMemory: 33554432
keySerializer: org.apache.kafka.common.serialization.ByteArraySerializer
valueSerializer: org.apache.kafka.common.serialization.ByteArraySerializer
keyDeSerializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
valueDeSerializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
sessionTimeout: 30000
autoOffsetReset: earliest
enableAutoCommit: false
bootstrapServers: localhost:9092
# every server instance will have a unique transactionId
transactionId: T1000
transactionTimeoutMs: 900000
transactionalIdExpirationMs: 2073600000
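
These settings map onto the standard Kafka producer properties. A rough plain-client equivalent, shown only to make the mapping concrete (this is not the light-kafka implementation), would be:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

public class ProducerConfigSketch {
    public static KafkaProducer<byte[], byte[]> create() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "T1000");
        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
        // with transactional.id set, initTransactions() must be called once before use
        producer.initTransactions();
        return producer;
    }
}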

ProducerTopicPostHandler.java

Update the generated handler to read the topic from the path parameter and the list of users from the request body, then send them to Kafka.

package com.networknt.kafka.handler;

import com.networknt.body.BodyHandler;
import com.networknt.config.JsonMapper;
import com.networknt.handler.LightHttpHandler;
import com.networknt.kafka.producer.LightProducer;
import com.networknt.service.SingletonServiceFactory;
import io.undertow.server.HttpServerExchange;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;

public class ProducerTopicPostHandler implements LightHttpHandler {
    private static Logger logger = LoggerFactory.getLogger(ProducerTopicPostHandler.class);
    private static String STATUS_ACCEPTED = "SUC10202";
    @Override
    public void handleRequest(HttpServerExchange exchange) throws Exception {
        logger.debug("ProducerTopicPostHandler start");
        String topic = exchange.getQueryParameters().get("topic").getFirst();
        logger.info("topic: " + topic);
        List<Map<String, Object>> list = (List)exchange.getAttachment(BodyHandler.REQUEST_BODY);

        LightProducer producer = SingletonServiceFactory.getBean(LightProducer.class);
        BlockingQueue<ProducerRecord<byte[], byte[]>> txQueue = producer.getTxQueue();

        for (int i = 0; i < list.size(); i++) {
            Map<String, Object> userMap = list.get(i);
            String userId = (String)userMap.get("userId");
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, userId.getBytes(StandardCharsets.UTF_8), JsonMapper.toJson(userMap).getBytes(StandardCharsets.UTF_8));
            try {
                txQueue.put(record);
            } catch (InterruptedException e) {
                logger.error("Exception:", e);
            }
            logger.debug("Send message with userId = " + userId + " to topic: " + topic);
        }
        setExchangeStatus(exchange, STATUS_ACCEPTED);
    }
}
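
Notice that the handler never talks to Kafka directly; it only puts records on the producer's blocking queue and returns an accepted status. The TransactionalProducer thread started in ProducerStartupHook drains this queue and publishes the records in the background, which is why kafka-producer.yml above carries a transactionId.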

Test Producer

We need to start Confluent Platform locally and create a test topic. The retention.ms=-1 setting keeps records in the topic indefinitely, so they remain available for the consumer test later.

confluent local start

kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic test --config retention.ms=-1

To consume the messages from the test topic on the console:

kafka-console-consumer --bootstrap-server localhost:9092 --topic test --property print.key=true --property key.separator="-" --from-beginning

To start the server:

mvn clean install exec:exec

Send a request with curl, making sure the application/json content type is used.

curl -k --location --request POST 'https://localhost:8443/producer/test' \
--header 'Content-Type: application/json' \
--data-raw '[{"userId":"stevehu","firstName":"Steve","lastName":"Hu","country":"CA"},{"userId":"joedoe","firstName":"Joe","lastName":"Doe","country":"US"},{"userId":"johnwalter","firstName":"John","lastName":"Walter","country":"CA"}]'

From the consumer console, you should see the following lines.

stevehu-{"userId":"stevehu","firstName":"Steve","lastName":"Hu","country":"CA"}
joedoe-{"userId":"joedoe","firstName":"Joe","lastName":"Doe","country":"US"}
johnwalter-{"userId":"johnwalter","firstName":"John","lastName":"Walter","country":"CA"}

Consumer

In the producer test above, we used the Kafka console consumer to verify that messages were pushed to the Kafka topic by our producer handler. In this step, we implement a consumer handler that retrieves those messages through a get endpoint.

pom.xml

Add the following dependency to the pom.xml.

        <dependency>
            <groupId>com.networknt</groupId>
            <artifactId>kafka-consumer</artifactId>
            <version>${version.light-4j}</version>
        </dependency>

ConsumerStartupHook.java and ConsumerShutdownHook.java

We need to create a startup hook to initialize the Kafka consumer during server startup, and a shutdown hook to close it during server shutdown.

ConsumerStartupHook.java

package com.networknt.kafka;

import com.networknt.kafka.consumer.LightConsumer;
import com.networknt.server.StartupHookProvider;
import com.networknt.service.SingletonServiceFactory;

public class ConsumerStartupHook implements StartupHookProvider {
    // exposed as a static field so that the request handler below can access the consumer
    public static UserConsumer consumer;
    @Override
    public void onStartup() {
        consumer = (UserConsumer) SingletonServiceFactory.getBean(LightConsumer.class);
        consumer.open();
    }
}

ConsumerShutdownHook.java

package com.networknt.kafka;

import com.networknt.kafka.consumer.LightConsumer;
import com.networknt.server.ShutdownHookProvider;
import com.networknt.service.SingletonServiceFactory;

public class ConsumerShutdownHook implements ShutdownHookProvider {
    @Override
    public void onShutdown() {
        LightConsumer consumer = SingletonServiceFactory.getBean(LightConsumer.class);
        try { if(consumer != null) consumer.close(); } catch(Exception e) {e.printStackTrace();}
    }
}

UserConsumer.java

This is the consumer implementation that extends the AbstractConsumer from light-kafka. It subscribes to a topic and polls one batch of records, converting each record's value into a map and attaching the partition and offset.

package com.networknt.kafka;

import com.networknt.config.JsonMapper;
import com.networknt.kafka.consumer.AbstractConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.*;

public class UserConsumer extends AbstractConsumer {
    public void subscribe(String topic) {
        consumer.subscribe(Arrays.asList(topic));
    }

    public List<Map<String, Object>> poll() {
        List<Map<String, Object>> list = new ArrayList<>();
        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
        for(ConsumerRecord<byte[], byte[]> record: records) {
            String value = new String(record.value(), StandardCharsets.UTF_8);
            Map<String, Object> userMap = JsonMapper.string2Map(value);
            userMap.put("partition", record.partition());
            userMap.put("offset", record.offset());
            list.add(userMap);
        }
        return list;
    }
}
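
A minimal sketch of how this class is driven, as a hypothetical standalone demo (UserConsumerDemo is not part of the project; it assumes open() initializes the underlying KafkaConsumer from kafka-consumer.yml, as the hooks suggest):

package com.networknt.kafka;

import java.util.List;
import java.util.Map;

public class UserConsumerDemo {
    public static void main(String[] args) {
        UserConsumer consumer = new UserConsumer();
        consumer.open();            // assumption: creates the KafkaConsumer from kafka-consumer.yml
        consumer.subscribe("test");
        List<Map<String, Object>> users = consumer.poll();  // a single 100ms poll; may return an empty list
        System.out.println(users);
        consumer.close();
    }
}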

service.yml

We need to add the consumer startup and shutdown hooks and register the UserConsumer as the LightConsumer implementation.

# Singleton service factory configuration/IoC injection
singletons:
# StartupHookProvider implementations; there can be one or more, and they are called in the sequence defined.
- com.networknt.server.StartupHookProvider:
  - com.networknt.kafka.ProducerStartupHook
  - com.networknt.kafka.ConsumerStartupHook
# ShutdownHookProvider implementations; there can be one or more, and they are called in the sequence defined.
- com.networknt.server.ShutdownHookProvider:
  - com.networknt.kafka.ProducerShutdownHook
  - com.networknt.kafka.ConsumerShutdownHook
- com.networknt.kafka.producer.LightProducer:
  - com.networknt.kafka.producer.TransactionalProducer
- com.networknt.kafka.consumer.LightConsumer:
  - com.networknt.kafka.UserConsumer

kafka-consumer.yml

This is the configuration file for the light-kafka consumer. Note that isolationLevel is set to read_committed, so the consumer only sees records from committed transactions, matching the transactional producer configured above.

bootstrapServers: localhost:9092
isolationLevel: read_committed
enableAutoCommit: false
autoOffsetReset: earliest
autoCommitIntervalMs: 1000
keyDeserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
valueDeserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
# every consumer must have a groupId, and it is retrieved from the environment defined in server.yml
groupId: group1

ConsumerTopicGetHandler.java

This is the handler implementation. The topic comes from the path parameter, and the response is the list of user maps polled from that topic.

package com.networknt.kafka.handler;

import com.networknt.config.JsonMapper;
import com.networknt.handler.LightHttpHandler;
import com.networknt.kafka.ConsumerStartupHook;
import io.undertow.server.HttpServerExchange;
import io.undertow.util.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;

public class ConsumerTopicGetHandler implements LightHttpHandler {
    private static Logger logger = LoggerFactory.getLogger(ConsumerTopicGetHandler.class);

    @Override
    public void handleRequest(HttpServerExchange exchange) throws Exception {
        logger.debug("ConsumerTopicGetHandler start");
        String topic = exchange.getQueryParameters().get("topic").getFirst();
        logger.info("topic: " + topic);
        ConsumerStartupHook.consumer.subscribe(topic);
        List<Map<String, Object>> list = ConsumerStartupHook.consumer.poll();
        if(list != null) {
            exchange.getResponseHeaders().add(Headers.CONTENT_TYPE, "application/json");
            exchange.getResponseSender().send(JsonMapper.toJson(list));
        }
        exchange.endExchange();
    }
}
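
Note that subscribing inside the request handler keeps the example simple. In a long-running service you would normally subscribe once at startup and poll continuously, rather than re-subscribing on every request.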

Test Consumer

After you have tested the producer, there should be some records in the Kafka test topic. Run the following command to retrieve them. Because each request performs only a single short poll, you may need to repeat the call before all records show up.

curl -k 'https://localhost:8443/consumer/test'

The result:

[{"userId":"stevehu","firstName":"Steve","lastName":"Hu","country":"CA","partition":2,"offset":0},{"userId":"joedoe","firstName":"Joe","lastName":"Doe","country":"US","partition":2,"offset":1},{"userId":"johnwalter","firstName":"John","lastName":"Walter","country":"CA","partition":2,"offset":2}]

Summary

This example only scratches the surface of light-kafka's capabilities; the producer and consumer offer many configuration combinations for building your handlers. Still, it is a good starting point if you are new to Kafka.

The final project can be found in the light-example-4j repository at producer-consumer.
