Kafka Streams

Kafka-sidecar supports Kafka Streams applications with drop-in deployment: you just need to drop your streams application jar file into the folder mapped to /streamsjar in the Docker volume.
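For reference, the drop-in mechanism relies on a Docker volume that maps a host folder to /streamsjar inside the container. The fragment below is a hypothetical sketch of such a mapping; the actual docker-compose-streams.yml in the kafka-sidecar repository is authoritative.

```yaml
# hypothetical docker-compose fragment; check docker-compose-streams.yml for the real mapping
services:
  kafka-sidecar:
    volumes:
      - ./streamsjar:/streamsjar
```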

Start Kafka

Before running any demo applications, we need to get the Confluent Platform started. If you have Docker installed, you can start it from the kafka-sidecar repository.

cd ~/networknt
git clone https://github.com/networknt/kafka-sidecar.git
cd kafka-sidecar
docker-compose down
docker-compose up

Test Topics

Once the Confluent Platform is up and running, create two topics from the control center or with the following commands.

cd ~/networknt/kafka-sidecar

docker-compose exec broker kafka-topics --create --if-not-exists --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic streams-plaintext-input --config retention.ms=-1
docker-compose exec broker kafka-topics --create --if-not-exists --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic streams-wordcount-output --config retention.ms=-1

Create Jar

The first step is creating a Kafka streams application, testing it, and building a jar file. We will soon create a light-codegen generator to help users scaffold a new project. For now, you can choose an example application from light-example-4j/kafka-sidecar for DSL or Processor API, depending on your business use cases.

Kafka Streams with DSL

If you want to start a Kafka Streams application with DSL, you can check out the light-example-4j and switch to the master branch.

cd ~/networknt
git clone https://github.com/networknt/light-example-4j.git
cd light-example-4j
git checkout master
cd kafka-sidecar/kafka-streams-dsl
mvn clean install -Prelease
java -jar target/word-count-1.0.0.jar

Please be aware that we are using -Prelease to build the fat jar so that all the dependencies are packaged into the jar. The server should now be started with HTTP/2 and HTTPS on port 8443. You can also start the server within the IDE to debug your Kafka Streams application.

Test DSL Streams Application

First, let’s start a console consumer to consume the messages from the streams-wordcount-output topic.

cd ~/networknt/kafka-sidecar
docker-compose exec broker kafka-console-consumer --topic streams-wordcount-output --from-beginning --bootstrap-server localhost:9092 --property print.key=true --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

Second, let’s start a console producer and input some words.

cd ~/networknt/kafka-sidecar
docker-compose exec broker kafka-console-producer --broker-list localhost:9092 --topic streams-plaintext-input

After entering some words like those below, we should see the results in the consumer console.

Producer Console

>hello
>world
>hello
>world
>hello
>world
>world
>world
>world
>world
>world
>world
>

Consumer Console

hello	2
world	2
hello	3
world	3
world	5
world	9

The above example is a direct conversion of the word count example from the Confluent GitHub repository.
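The consumer console prints intermediate updates of the running counts (record caching batches them into commits), but the latest value emitted per word matches a plain aggregation over all input lines. The following is a rough plain-Python sketch of what the topology computes, not the actual Kafka Streams code:

```python
from collections import Counter

def word_count(lines):
    """Running word count, analogous to what the streams topology maintains.

    Each input line is lower-cased and split on whitespace; the returned
    Counter holds the final count per word, i.e. the latest value that would
    appear on the streams-wordcount-output changelog topic.
    """
    counts = Counter()
    for line in lines:
        for word in line.lower().split():
            counts[word] += 1
    return counts

# The twelve lines typed into the producer console above:
lines = ["hello", "world"] * 3 + ["world"] * 6
counts = word_count(lines)
print(counts)  # world -> 9, hello -> 3, matching the last updates per key
```

This also explains why the consumer console does not show every single increment: only the latest count per key at each cache flush is forwarded downstream.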

In the next step, we will drop the jar file into the Kafka-sidecar and let the sidecar host the Kafka Streams application.

Kafka Sidecar

In the kafka-sidecar repository, we have a folder under config named streams. Within this folder, we have config files and values.yml to support the above streams application.

Here is the values.yml file that we have modified for the configuration of the streams application. We use docker-compose service names for the bootstrap servers and the registry URL.

# streams configuration
kafka-streams.application.id: word-count-docker
kafka-streams.bootstrap.servers: broker:29092
kafka-streams.schema.registry.url: http://schema-registry:8081

For startup hooks and shutdown hooks, we add the classes from the example application.

# Service Startup and Shutdown Hooks
service.com.networknt.server.StartupHookProvider:
  # - com.networknt.mesh.kafka.ProducerStartupHook
  - com.networknt.kafka.WordCountStartupHook
  # - com.networknt.mesh.kafka.ActiveConsumerStartupHook
  # - com.networknt.mesh.kafka.KsqldbReactiveConsumerStartupHook
  # - com.networknt.mesh.kafka.KsqldbActiveConsumerStartupHook
  # - com.networknt.mesh.kafka.ReactiveConsumerStartupHook
  # - com.networknt.mesh.kafka.AdminClientStartupHook
service.com.networknt.server.ShutdownHookProvider:
  # - com.networknt.mesh.kafka.ProducerShutdownHook
  - com.networknt.kafka.WordCountShutdownHook
  # - com.networknt.mesh.kafka.ActiveConsumerShutdownHook
  # - com.networknt.mesh.kafka.KsqldbReactiveConsumerShutdownHook
  # - com.networknt.mesh.kafka.KsqldbActiveConsumerShutdownHook
  # - com.networknt.mesh.kafka.ReactiveConsumerShutdownHook
  # - com.networknt.mesh.kafka.AdminClientShutdownHook

To make the application run within the Kafka-sidecar, we must copy the streams application jar file to the streamsjar folder in the kafka-sidecar repository. Note: we don’t need to copy the fat jar, as the Kafka-sidecar already contains all the dependencies needed to run a Kafka Streams application.

Since we built the fat jar earlier with mvn clean install -Prelease, we just need to copy the original (thin) jar file.

cd ~/networknt/kafka-sidecar
cp ~/networknt/light-example-4j/kafka-sidecar/kafka-streams-dsl/target/original-word-count-1.0.0.jar ./streamsjar

Note that the original jar is used because the Kafka-sidecar provides all the dependencies. Now we can start the kafka-sidecar with the following docker-compose command.

cd ~/networknt/kafka-sidecar
docker-compose -f docker-compose-streams.yml up

Once the server is up, we can test with the existing console producer and consumer. Type some sentences on the producer console; you should see the word count changing on the consumer console.

Next

We can enable the producer on the sidecar so that we can produce to the input topic with the producer API instead of the console producer.
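As a hypothetical sketch of that first step, the request body for the sidecar's producer API can be built as below. The endpoint path (/producers/{topic}) and record schema follow the Confluent-REST-proxy-style convention; both the path and the schema, as well as the sidecar address, are assumptions here and should be verified against the sidecar's OpenAPI specification.

```python
import json

SIDECAR_URL = "https://localhost:8443"     # assumed sidecar address
TOPIC = "streams-plaintext-input"          # the input topic created earlier

def build_records(lines):
    """Build a producer request body: one record per input line."""
    return {"records": [{"value": line} for line in lines]}

payload = json.dumps(build_records(["hello", "world"]))
print(payload)
# POST it with any HTTP client, for example:
#   curl -k -X POST https://localhost:8443/producers/streams-plaintext-input \
#        -H 'Content-Type: application/json' -d "$payload"
```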

We can add a backend consumer to consume the messages from the output topic and print them from the docker-compose console, replacing the console consumer.

We can create another Kafka-streams application and drop it into the same streamsjar folder to start two streams applications on the same sidecar instance.

“Kafka Streams” was last updated: January 23, 2024: fixes #387 update kafka-streams.md for the kafka-sidecar to the latest (dd0c19a)