
User Query with ksqlDB

In this tutorial, we will re-implement the stream-query tutorial with ksqlDB instead of Kafka Streams.

Codegen

The README.md and config.json used to scaffold the project in light-example-4j can be found at the following URL.

https://github.com/networknt/model-config/tree/master/kafka/ksqldb-query

The generated project can be found at

https://github.com/networknt/light-example-4j/tree/master/kafka/ksqldb-query

Start Confluent

Download and install Confluent Platform 6.0.1 locally from the tar.gz distribution and start the services.

confluent local services start

The ksqlDB server will be started along with other services.

Producer

Please refer to the producer-consumer tutorial to create the test topic and produce some messages to it.
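The table definition in the next section maps the JSON value fields of each record to columns, with the record key used as the primary key. Assuming the same message shape as the producer-consumer tutorial, a record in the test topic would look roughly like this (key stevehu, JSON value):

```json
{
  "firstName": "Steve",
  "lastName": "Hu",
  "country": "CA"
}
```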

Web Console

Access the web console from http://localhost:9021/ and go to the ksqlDB Editor. Issue the following command to create a table.

CREATE TABLE USER_TABLE (userId VARCHAR PRIMARY KEY, firstName VARCHAR, lastName VARCHAR, country VARCHAR)
    WITH (kafka_topic='test', value_format='json');

Once the table is created, we can issue the following command to query the table.

SELECT * FROM USER_TABLE WHERE USERID = 'stevehu' EMIT CHANGES;

As we ran the producer before starting this query, the messages are already in the test topic. To read them from the beginning instead of only new arrivals, click the Add query properties link and add the following property.

auto.offset.reset = Earliest

Click the Run query button, and you will see the result like the following.

[{"USERID":"stevehu","FIRSTNAME":"Steve","LASTNAME":"Hu","COUNTRY":"CA"}]

pom.xml

In order to access ksqlDB from our service, we need to add the Java client dependency, along with the repositories that host it.

        <version.ksqldb>0.14.0</version.ksqldb>

        <dependency>
            <groupId>io.confluent.ksql</groupId>
            <artifactId>ksqldb-api-client</artifactId>
            <version>${version.ksqldb}</version>
        </dependency>

    <repositories>
        <repository>
            <id>ksqlDB</id>
            <name>ksqlDB</name>
            <url>https://ksqldb-maven.s3.amazonaws.com/maven/</url>
        </repository>
        <repository>
            <id>confluent</id>
            <name>Confluent</name>
            <url>https://jenkins-confluent-packages-beta-maven.s3.amazonaws.com/6.1.0-beta201006024150/1/maven/</url>
        </repository>
    </repositories>
    <pluginRepositories>
        <pluginRepository>
            <id>ksqlDB</id>
            <url>https://ksqldb-maven.s3.amazonaws.com/maven/</url>
        </pluginRepository>
        <pluginRepository>
            <id>confluent</id>
            <url>https://jenkins-confluent-packages-beta-maven.s3.amazonaws.com/6.1.0-beta201006024150/1/maven/</url>
        </pluginRepository>
    </pluginRepositories>

QueryUserIdGetHandler

The following is the handler.

package com.networknt.kafka.handler;

import com.networknt.handler.LightHttpHandler;
import io.confluent.ksql.api.client.*;
import io.undertow.server.HttpServerExchange;
import io.undertow.util.Headers;
import io.undertow.util.HttpString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;

/**
For more information on how to write business handlers, please check the link below.
https://doc.networknt.com/development/business-handler/rest/
*/
public class QueryUserIdGetHandler implements LightHttpHandler {
    private static final Logger logger = LoggerFactory.getLogger(QueryUserIdGetHandler.class);
    private static final String QUERY = "SELECT * from USER_TABLE WHERE USERID = '%s' EMIT CHANGES;";
    private static final String OBJECT_NOT_FOUND = "ERR11637";
    private static String KSQLDB_SERVER_HOST = "localhost";
    private static int KSQLDB_SERVER_HOST_PORT = 8088;
    public Client client = null;

    public QueryUserIdGetHandler() {
        ClientOptions options = ClientOptions.create()
                .setHost(KSQLDB_SERVER_HOST)
                .setPort(KSQLDB_SERVER_HOST_PORT);
        client = Client.create(options);
    }

    @Override
    public void handleRequest(HttpServerExchange exchange) throws Exception {
        String userId = exchange.getQueryParameters().get("userId").getFirst();
        if(logger.isTraceEnabled()) logger.trace("userId = " + userId);
        String query = String.format(QUERY, userId);
        if(logger.isTraceEnabled()) logger.trace("query = " + query);
        // Read the test topic from the beginning so previously produced records are visible.
        Map<String, Object> properties = new HashMap<>();
        properties.put("auto.offset.reset", "earliest");
        // This is a push query: poll() blocks until the first matching row arrives.
        StreamedQueryResult result = client.streamQuery(query, properties).get();
        Row row = result.poll();
        if(row != null) {
            exchange.getResponseHeaders().add(Headers.CONTENT_TYPE, "application/json");
            KsqlArray values = row.values();
            exchange.getResponseSender().send(values.toJsonString());
        } else {
            setExchangeStatus(exchange, OBJECT_NOT_FOUND, "user", userId);
        }
    }
}

Test

Issue the following command from the terminal to test it.

curl -k https://localhost:8443/query/stevehu

And the result:

["stevehu","Steve","Hu","CA"]

Summary

As you can see, the result is a list of column values rather than a JSON object. This is by design: the ksqlDB client mimics a database query result. From the returned Row object we also have access to each column's name and data type, so it is easy to convert the result to a JSON object if we want.
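As a sketch of that conversion, the hypothetical helper below zips column names with row values into a JSON object string. In the real handler the names would come from the row's column metadata and the values from Row.values(); a production service would use a JSON library rather than manual string building, and would need to escape string values properly.

```java
import java.util.List;

public class RowToJson {
    // Hypothetical helper: combines ksqlDB column names with the row's
    // values into a JSON object string. Strings are quoted; other values
    // (numbers, booleans) are emitted as-is. No escaping is done here.
    public static String toJsonObject(List<String> columns, List<Object> values) {
        StringBuilder sb = new StringBuilder("{");
        for (int i = 0; i < columns.size(); i++) {
            if (i > 0) sb.append(",");
            sb.append("\"").append(columns.get(i)).append("\":");
            Object v = values.get(i);
            if (v instanceof String) {
                sb.append("\"").append(v).append("\"");
            } else {
                sb.append(v);
            }
        }
        return sb.append("}").toString();
    }

    public static void main(String[] args) {
        String json = toJsonObject(
                List.of("USERID", "FIRSTNAME", "LASTNAME", "COUNTRY"),
                List.of("stevehu", "Steve", "Hu", "CA"));
        // prints {"USERID":"stevehu","FIRSTNAME":"Steve","LASTNAME":"Hu","COUNTRY":"CA"}
        System.out.println(json);
    }
}
```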

Compared with the Kafka Streams stream-query implementation, this service is significantly slower. It will get slower still as more user objects are sent to the test topic, because we set auto.offset.reset to earliest for each query, so every request replays the stream from the beginning.

In essence, we delegate the stream processing to the ksqlDB server with SQL-like statements, and then call ksqlDB through its Java client from our service built with light-rest-4j. In the stream-query tutorial, by contrast, we combine the stream processing and the query against the local state store in the same service, which gives the best performance and avoids the ksqlDB license cost.
