Ksqldb Active Consumer Tutorial

Summary

Pull queries are a relatively new but integral feature of ksqlDB. In contrast to push queries, which perpetually stream incremental query results to clients, pull queries follow a traditional request/response model: a pull query retrieves a finite result from the ksqlDB server and terminates on completion, much as queries do in traditional databases. The ksqlDB server is part of the Confluent Platform and provides a stream processor that hides the details of Kafka Streams. Users can create tables and streams on the ksqlDB server and subscribe to them from the sidecar. When a table or stream changes, the sidecar issues a REST call to the backend API to notify it that new data has been received.

There are two types of queries in ksqlDB:

  • pull
  • push

Please refer to the ksqlDB documentation for details.

The “/ksqldb/active” endpoint supports both query types, but we suggest using only pull queries with this endpoint. They are more stable and perform better.

  • Pull queries are expressed using a strict subset of ANSI SQL.
  • You can issue a pull query against any table that was created by a CREATE TABLE AS SELECT statement.
  • Currently, we do not support pull queries against tables created by using a CREATE TABLE statement.
  • Pull queries do not support JOIN, PARTITION BY, GROUP BY and WINDOW clauses (but can query materialized tables that contain those clauses).

If you query a stream, or a table that was created with a plain CREATE TABLE statement, set the query type to “push”.
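The rules above can be condensed into a small helper. This is only a minimal sketch and not part of light-kafka: the function name `choose_query_type` and the `created_by` labels are hypothetical, and the caller is assumed to know how the queried source was created.

```python
def choose_query_type(created_by: str) -> str:
    """Return the queryType value to send to /ksqldb/active.

    created_by is a hypothetical label supplied by the caller:
      "CTAS"         - table created with CREATE TABLE AS SELECT
      "CREATE_TABLE" - table created with a plain CREATE TABLE
      "STREAM"       - a stream
    """
    if created_by == "CTAS":
        # Materialized tables can be served by a pull query.
        return "pull"
    if created_by in ("CREATE_TABLE", "STREAM"):
        # Everything else has to fall back to a push query.
        return "push"
    raise ValueError(f"unknown source kind: {created_by!r}")
```

For example, `choose_query_type("CTAS")` yields `"pull"`, while a table created with a plain CREATE TABLE yields `"push"`.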

A new request object has been added to light-kafka:

https://github.com/networknt/light-kafka/blob/master/kafka-entity/src/main/java/com/networknt/kafka/entity/KsqlDbPullQueryRequest.java

Sample request payload:

{
    "offset": "earliest",
    "deserializationError": false,
    "queryType": "pull",
    "tableScanEnable": true,
    "query": "select * from QUERYUSER;"
}

Field details:

  • offset: optional field, used only for push queries. Available values: earliest/latest

  • queryType: optional field, indicates the query type. Available values: pull/push

  • deserializationError: optional field, indicates whether to fail if corrupt messages are read. Available values: true/false

  • tableScanEnable: optional field, indicates whether a full table scan is allowed. Available values: true/false

  • query: required field, the ksqlDB query string
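As an illustration of the field rules above, the following sketch assembles a request payload and rejects values outside the documented sets. The helper name `build_ksqldb_request` and its keyword arguments are hypothetical; only the JSON field names and allowed values come from the list above.

```python
import json

ALLOWED_QUERY_TYPES = {"pull", "push"}
ALLOWED_OFFSETS = {"earliest", "latest"}


def build_ksqldb_request(query, query_type=None, offset=None,
                         deserialization_error=None, table_scan_enable=None):
    """Build the JSON body for /ksqldb/active, omitting unset optional fields."""
    if not query or not query.strip():
        raise ValueError("query is required")
    payload = {"query": query}
    if query_type is not None:
        if query_type not in ALLOWED_QUERY_TYPES:
            raise ValueError(f"queryType must be one of {sorted(ALLOWED_QUERY_TYPES)}")
        payload["queryType"] = query_type
    if offset is not None:
        if offset not in ALLOWED_OFFSETS:
            raise ValueError(f"offset must be one of {sorted(ALLOWED_OFFSETS)}")
        payload["offset"] = offset
    if deserialization_error is not None:
        payload["deserializationError"] = bool(deserialization_error)
    if table_scan_enable is not None:
        payload["tableScanEnable"] = bool(table_scan_enable)
    return payload


print(json.dumps(
    build_ksqldb_request("select * from QUERYUSER;",
                         query_type="pull", table_scan_enable=True),
    indent=4))
```

Optional fields that are not passed are left out of the payload, so the server-side defaults (whatever they are) apply.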


In the light-4j kafka-sidecar, a startup hook (KsqldbActiveConsumerStartupHook) initializes the ksqlDB active consumer.

KsqldbActiveConsumerStartupHook initializes a Kafka API client based on the kafka-ksqldb.yml config.

For a local connection, only the host and port are needed:

ksqldbHost: ${kafka-ksqldb.ksqldbHost:localhost}
# ksqlDB port
ksqldbPort: ${kafka-ksqldb.ksqldbPort:8088}

For an enterprise Kafka ksqlDB server, we need to use a TLS connection and basic authentication:

useTls: ${kafka-ksqldb.useTls:false}
trustStore: ${kafka-ksqldb.trustStore:/truststore/kafka.server.truststore.jks}
trustStorePassword: ${kafka-ksqldb.trustStorePassword:changeme}
basicAuthCredentialsUser: ${kafka-ksqldb.basicAuthCredentialsUser:userId}
basicAuthCredentialsPassword: ${kafka-ksqldb.basicAuthCredentialsPassword:changeme}
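Each value in the snippets above has the form `${key:default}`: the part before the first colon is a key that can be overridden externally, and the part after it is the fallback. The sketch below only illustrates how such a placeholder reads; it is not the light-4j config loader, and the `resolve` function and `overrides` dictionary are hypothetical.

```python
import re

# Matches "${key}" or "${key:default}"; the key itself contains no ":" or "}".
_PLACEHOLDER = re.compile(r"^\$\{([^:}]+)(?::(.*))?\}$")


def resolve(value: str, overrides: dict) -> str:
    """Return the override for a placeholder's key, else its default."""
    match = _PLACEHOLDER.match(value)
    if match is None:
        return value  # plain literal, nothing to resolve
    key, default = match.group(1), match.group(2)
    if key in overrides:
        return overrides[key]
    if default is None:
        raise KeyError(f"no value or default for {key}")
    return default


print(resolve("${kafka-ksqldb.ksqldbHost:localhost}", {}))
print(resolve("${kafka-ksqldb.useTls:false}", {"kafka-ksqldb.useTls": "true"}))
```

With no overrides the first call falls back to `localhost`; in the second call the hypothetical override replaces the default `false`.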

A new API endpoint lets users run a ksqlDB query through the active consumer and receive the query result as the API response:

  '/ksqldb/active':
    post:
      operationId: KsqlDBPullQueryActive
      summary: KsqlDBPullQuery APIs by active consumer
      requestBody:
        description: "process a ksqlDB query"
        required: true
        content:
          application/json:
            schema:
              "$ref": "#/components/schemas/KsqlDbPullQueryRequest"
      responses:
        '200':
          description: Successful response
          content:
            application/json:
              schema:
                type: object

Verify the kafka-sidecar ksqlDB query locally:

  • Create a topic named test and set the JSON schema for its value:

{
  "$id": "http://example.com/myURI.schema.json",
  "$schema": "http://json-schema.org/draft-07/schema#",
  "additionalProperties": false,
  "description": "Sample schema to help you get started.",
  "properties": {
    "country": {
      "enum": [
        "CA",
        "US"
      ],
      "type": "string"
    },
    "firstName": {
      "description": "First Name",
      "type": "string"
    },
    "lastName": {
      "description": "Last Name",
      "type": "string"
    },
    "userId": {
      "description": "User Id",
      "type": "string"
    }
  },
  "title": "value_test",
  "type": "object"
}

Then use the kafka-sidecar “/producers/test” endpoint (POST) to publish some messages to the topic.

Sample request body:

{
    "records": [
        {
            "key": "1",
            "value": {
                "userId": "1111",
                "firstName": "test1"
            }
        },
        {
            "key": "2",
            "value": {
                "userId": "2222",
                "firstName": "test2"
            }
        },
        {
            "key": "3",
            "value": {
                "userId": "3333",
                "firstName": "test3"
            }
        }
    ]
}
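
A request body like the sample can also be generated and checked against the value schema before it is posted. The sketch below is a hand-written check that covers only the constraints visible in the schema (no additional properties, string fields, the country enum); it is not a full JSON Schema validator, and `check_value` and `build_records` are hypothetical helper names.

```python
import json

# Field names and the country enum are taken from the value schema.
ALLOWED_FIELDS = {"country", "firstName", "lastName", "userId"}
ALLOWED_COUNTRIES = {"CA", "US"}


def check_value(value: dict) -> None:
    """Raise ValueError if a record value breaks the schema constraints."""
    extra = set(value) - ALLOWED_FIELDS
    if extra:
        raise ValueError(f"unexpected fields: {sorted(extra)}")
    for name, field_value in value.items():
        if not isinstance(field_value, str):
            raise ValueError(f"{name} must be a string")
    if "country" in value and value["country"] not in ALLOWED_COUNTRIES:
        raise ValueError(f"country must be one of {sorted(ALLOWED_COUNTRIES)}")


def build_records(users: dict) -> dict:
    """Turn {key: value} pairs into the producer request body."""
    records = []
    for key, value in users.items():
        check_value(value)
        records.append({"key": key, "value": value})
    return {"records": records}


body = build_records({
    "1": {"userId": "1111", "firstName": "test1"},
    "2": {"userId": "2222", "firstName": "test2"},
    "3": {"userId": "3333", "firstName": "test3"},
})
print(json.dumps(body, indent=4))
```

Checking on the client side surfaces schema violations before the sidecar or the schema registry rejects the message.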

  • Create a KTable based on the topic created above:

CREATE TABLE USERS
    (ID STRING PRIMARY KEY, USERID STRING, FIRSTNAME STRING, LASTNAME STRING, COUNTRY STRING)
    WITH (KAFKA_TOPIC='test', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON_SR');

  • Create a queryable KTable based on the KTable above:

CREATE TABLE QUERYUSER AS SELECT * FROM USERS;

  • Start the kafka sidecar and verify with a curl command:

curl --location --request POST 'http://localhost:8084/ksqldb/active' \
--header 'Content-Type: application/json' \
--data-raw '
{
    "offset": "earliest",
    "deserializationError": false,
    "queryType": "pull",
    "tableScanEnable": true,
    "query": "select * from QUERYUSER where id = '\''1'\'';"
}
'

Response:

[
    {
        "USERID": "4444",
        "FIRSTNAME": "test1",
        "ID": "1"
    }
]
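
The same verification can be scripted. The following is a minimal sketch using only the Python standard library; it assumes the sidecar listens on http://localhost:8084 as in the curl command, and the function names `build_active_query_request` and `run_query` are hypothetical.

```python
import json
import urllib.request


def build_active_query_request(query, base_url="http://localhost:8084"):
    """Build a POST request for the /ksqldb/active endpoint (nothing is sent yet)."""
    payload = {
        "offset": "earliest",
        "deserializationError": False,
        "queryType": "pull",
        "tableScanEnable": True,
        "query": query,
    }
    return urllib.request.Request(
        url=f"{base_url}/ksqldb/active",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def run_query(query):
    """Send the request and decode the JSON response; needs a running sidecar."""
    request = build_active_query_request(query)
    with urllib.request.urlopen(request, timeout=10) as response:
        return json.loads(response.read().decode("utf-8"))


# Usage, with the sidecar running locally:
#   rows = run_query("select * from QUERYUSER where id = '1';")
#   print(rows)
```

Keeping request construction separate from sending makes the payload easy to inspect without a running sidecar.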
“Ksqldb Active Consumer Tutorial” was last updated: November 3, 2021: add h2 doc for enable h2 console in light4j API (#306) (0429537)