Ksqldb Consumer Tutorial

The ksqlDB server is part of the Confluent Platform. It provides a stream processor that hides the details of Kafka Streams. Users can create tables and streams on the ksqlDB server and subscribe to them from the sidecar. Whenever a table or stream changes, the sidecar issues a REST call to the backend API to notify it that new data has been received.

Kafka Topic

We will use the Confluent local services for this tutorial. After starting the Confluent Platform locally, we need to create a topic called test with the following schemas for the key and value.

key schema

{
  "$id": "http://example.com/myURI.schema.json",
  "$schema": "http://json-schema.org/draft-07/schema#",
  "additionalProperties": false,
  "description": "Sample schema to help you get started.",
  "title": "key_test",
  "type": "string"
}

value schema

{
  "$id": "http://example.com/myURI.schema.json",
  "$schema": "http://json-schema.org/draft-07/schema#",
  "additionalProperties": false,
  "description": "Sample schema to help you get started.",
  "properties": {
    "country": {
      "enum": [
        "CA",
        "US"
      ],
      "type": "string"
    },
    "firstName": {
      "description": "First Name",
      "type": "string"
    },
    "lastName": {
      "description": "Last Name",
      "type": "string"
    },
    "userId": {
      "description": "User Id",
      "type": "string"
    }
  },
  "title": "value_test",
  "type": "object"
}
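
The schemas can also be registered from a script instead of through Control Center. The sketch below is a minimal example, not part of the original tutorial. It assumes Schema Registry listens on http://localhost:8081 (the usual default for a local Confluent install) and that the default subject naming is in use, so the subjects would be test-key and test-value. The helper name build_register_request is hypothetical.

```python
import json
import urllib.request

# Assumption: default Schema Registry address for a local Confluent Platform.
SCHEMA_REGISTRY_URL = "http://localhost:8081"

# The key schema from this tutorial, as a Python dict.
KEY_SCHEMA = {
    "$id": "http://example.com/myURI.schema.json",
    "$schema": "http://json-schema.org/draft-07/schema#",
    "additionalProperties": False,
    "description": "Sample schema to help you get started.",
    "title": "key_test",
    "type": "string",
}


def build_register_request(subject: str, schema: dict) -> urllib.request.Request:
    """Build (but do not send) a request that registers a JSON schema."""
    body = {
        "schemaType": "JSON",          # JSON Schema rather than the Avro default
        "schema": json.dumps(schema),  # the schema travels as an escaped string
    }
    return urllib.request.Request(
        url=f"{SCHEMA_REGISTRY_URL}/subjects/{subject}/versions",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
        method="POST",
    )


request = build_register_request("test-key", KEY_SCHEMA)
```

Sending the request is a single call to urllib.request.urlopen(request). The value schema would be registered the same way under the test-value subject.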

With these schemas defined, the Kafka sidecar producer validates the data we produce to the test topic.
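
To make the effect of the value schema concrete, here is a small hand-written check that covers only the keywords this schema uses (type, enum, additionalProperties). It is an illustration under that assumption, not the sidecar's actual validator, and validate_value is a hypothetical name.

```python
# The parts of the value schema that affect validation.
VALUE_SCHEMA = {
    "additionalProperties": False,
    "properties": {
        "country": {"enum": ["CA", "US"], "type": "string"},
        "firstName": {"type": "string"},
        "lastName": {"type": "string"},
        "userId": {"type": "string"},
    },
    "type": "object",
}


def validate_value(value, schema=VALUE_SCHEMA):
    """Return a list of problems; an empty list means the value is accepted."""
    if not isinstance(value, dict):
        return ["value must be a JSON object"]
    errors = []
    for name, field in value.items():
        rules = schema["properties"].get(name)
        if rules is None:
            # "additionalProperties": false rejects unknown fields.
            errors.append(f"unexpected property: {name}")
            continue
        if not isinstance(field, str):
            errors.append(f"{name} must be a string")
        elif "enum" in rules and field not in rules["enum"]:
            errors.append(f"{name} must be one of {rules['enum']}")
    return errors


ok = validate_value({"userId": "1111", "firstName": "test1"})
bad_country = validate_value({"userId": "1111", "country": "UK"})
bad_field = validate_value({"age": "3"})
```

Because the schema has no required list, a record that carries only userId and firstName is accepted, while an unknown country code or an undeclared property is rejected.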

KsqlDB Stream

From the Control Center, we are going to create a stream based on the Kafka topic test.

CREATE STREAM TEST_STREAM (
    userId VARCHAR KEY,
    firstName VARCHAR,
    lastName VARCHAR,
    country VARCHAR
) WITH (
    kafka_topic = 'test',
    value_format = 'JSON_SR'
);
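
The same statement can also be submitted outside the Control Center through the ksqlDB REST API, which accepts statements as a POST to /ksql. The following is a minimal sketch, assuming the ksqlDB server is reachable at http://localhost:8088 (a common local default). The helper name build_ksql_request is hypothetical.

```python
import json
import urllib.request

# Assumption: default listener of a locally running ksqlDB server.
KSQLDB_URL = "http://localhost:8088"

CREATE_STREAM = """
CREATE STREAM TEST_STREAM (
    userId VARCHAR KEY,
    firstName VARCHAR,
    lastName VARCHAR,
    country VARCHAR
) WITH (
    kafka_topic = 'test',
    value_format = 'JSON_SR'
);
"""


def build_ksql_request(statement: str) -> urllib.request.Request:
    """Build (but do not send) a request that runs one ksqlDB statement."""
    body = {"ksql": statement.strip(), "streamsProperties": {}}
    return urllib.request.Request(
        url=f"{KSQLDB_URL}/ksql",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/vnd.ksql.v1+json"},
        method="POST",
    )


request = build_ksql_request(CREATE_STREAM)
```

Calling urllib.request.urlopen(request) would send the statement. The server replies with a JSON description of the command status.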

Note that we are using JSON_SR as the value format here. ksqlDB has two JSON formats, JSON and JSON_SR. Both support serializing and deserializing JSON data. JSON_SR integrates with the Schema Registry, registering and retrieving JSON schemas, while JSON does not. The two formats are not byte-compatible: data produced with one cannot be read with the other.
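
The incompatibility comes down to framing. To our understanding, the Schema Registry serializers prefix each payload with a zero "magic" byte and a 4-byte big-endian schema ID, whereas the plain JSON format writes only the JSON text. The sketch below illustrates that difference. The schema ID 7 is invented for the example, and the functions are illustrations rather than the ksqlDB serializers themselves.

```python
import json
import struct

MAGIC_BYTE = 0  # first byte of Schema Registry framed messages


def encode_json(value: dict) -> bytes:
    """Plain JSON format: just the UTF-8 JSON text."""
    return json.dumps(value).encode("utf-8")


def encode_json_sr(value: dict, schema_id: int) -> bytes:
    """JSON_SR-style framing: magic byte + 4-byte schema ID + JSON text."""
    header = struct.pack(">bI", MAGIC_BYTE, schema_id)
    return header + encode_json(value)


def decode_json_sr(data: bytes):
    """Return (schema_id, value), or raise ValueError if the frame is missing."""
    magic, schema_id = struct.unpack(">bI", data[:5])
    if magic != MAGIC_BYTE:
        raise ValueError("not Schema Registry framed data")
    return schema_id, json.loads(data[5:].decode("utf-8"))


def parses_as_plain_json(data: bytes) -> bool:
    """True if a plain JSON reader could consume these bytes."""
    try:
        json.loads(data.decode("utf-8"))
        return True
    except ValueError:
        return False


record = {"userId": "1111", "firstName": "test1"}
framed = encode_json_sr(record, schema_id=7)  # 7 is a made-up schema ID
plain = encode_json(record)
```

Here parses_as_plain_json(framed) is False because of the 5-byte header, and decode_json_sr(plain) raises ValueError because the header is absent, which is why a stream declared with one format cannot read a topic written with the other.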

Verify KsqlDB Stream

Now let’s add some event messages to the topic (test) we created above:

  • Start kafka-sidecar locally by following the kafka-sidecar instructions.

  • Produce some event messages to the “test” topic:

curl --location --request POST 'https://localhost:8443/producers/test' \
--header 'Content-Type: application/json' \
--data-raw '{
    "records": [
        {
            "key": "1",
            "value": {
                "userId": "1111",
                "firstName": "test1"
            }
        },
        {
            "key": "2",
            "value": {
                "userId": "2222",
                "firstName": "test2"
            }
        },
        {
            "key": "3",
            "value": {
                "userId": "3333",
                "firstName": "test3"
            }
        }
    ]
}'
  • In the Control Center navigation bar, click ksqlDB. In the Streams tab, we can see the “TEST_STREAM” we created above.

  • Click the stream link (TEST_STREAM), and then click the “Query stream” button.

  • We can see the query running and the results displayed as below:

Stream result
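
The same check can be scripted instead of clicking through the Control Center. ksqlDB exposes a /query endpoint that runs a push query over HTTP. The sketch below is a minimal example under a few assumptions: the ksqlDB server is at http://localhost:8088, and auto.offset.reset is set to earliest so that the query also returns records produced before it started. The helper name build_query_request is hypothetical.

```python
import json
import urllib.request

# Assumption: default listener of a locally running ksqlDB server.
KSQLDB_URL = "http://localhost:8088"


def build_query_request(stream: str, limit: int) -> urllib.request.Request:
    """Build (but do not send) a push query that stops after `limit` rows."""
    body = {
        "ksql": f"SELECT * FROM {stream} EMIT CHANGES LIMIT {limit};",
        # Read the topic from the beginning rather than only new records.
        "streamsProperties": {"ksql.streams.auto.offset.reset": "earliest"},
    }
    return urllib.request.Request(
        url=f"{KSQLDB_URL}/query",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/vnd.ksql.v1+json"},
        method="POST",
    )


request = build_query_request("TEST_STREAM", limit=3)
```

The LIMIT clause lets the query finish once three rows have arrived, so reading the response from urllib.request.urlopen(request) does not block indefinitely after the three test records have been produced.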

“Ksqldb Consumer Tutorial” was last updated: September 9, 2021: update ksqldb tutorial document (#292) (636c71d)