ksqlDB Streams Example

Test runner:

ksqldb-cli ksql-test-runner -e extensions/ \
                            -s statements/create-orders.ksql \
                            -i tests/orders-input.json \
                            -o tests/orders-output.json | grep '>>>'

Execute statement files, create orders stream:

ksqldb-cli ksql -f statements/create-orders.ksql http://ksqldb:8088

Insert orders:

ksqldb-cli ksql -f statements/insert-orders.ksql http://ksqldb:8088

List of streams:

http :8088/ksql ksql="list streams;" | jq '.[].streams[] | [{name: .name, topic: .topic}]'

Show content:

ksqldb-cli ksql -e "PRINT 'kafka-ksqldb.order_sizes' FROM BEGINNING;" http://ksqldb:8088

Deleting all orders:

ksqldb-cli ksql -e "DROP STREAM ORDERSIZES DELETE TOPIC; DROP STREAM ORDERS DELETE TOPIC;" http://ksqldb:8088

Statements

statements/create-orders.ksql

CREATE STREAM orders (orderId INT KEY, orderUnits INT, totalAmount DOUBLE)
  WITH (kafka_topic='kafka-ksqldb.orders', key_format='json', value_format='json', partitions=10, replicas=3);

CREATE STREAM orderSizes 
  WITH (kafka_topic='kafka-ksqldb.order_sizes', key_format='json', value_format='json', partitions=10, replicas=3)
  AS SELECT
    orderId,
    orderUnits,
    totalAmount,
    CASE WHEN orderUnits < 2 THEN 'small' WHEN orderUnits < 4 THEN 'medium' ELSE 'large' END AS orderSize,
    taxes(totalAmount) AS tax
  FROM orders EMIT CHANGES;

statements/insert-orders.ksql

INSERT INTO orders (orderId, orderUnits, totalAmount) VALUES (1000000, 2, 100.0);
INSERT INTO orders (orderId, orderUnits, totalAmount) VALUES (1000001, 4, 200.0);
INSERT INTO orders (orderId, orderUnits, totalAmount) VALUES (1000002, 6, 300.0);
INSERT INTO orders (orderId, orderUnits, totalAmount) VALUES (1000003, 3, 150.0);
INSERT INTO orders (orderId, orderUnits, totalAmount) VALUES (1000004, 1, 50.0);

Tests

tests/orders-input.json

{
  "inputs": [
    {"topic": "kafka-ksqldb.orders", "timestamp": 0, "key": 1000000, "value": {"orderUnits": 2, "totalAmount": 100.0}},
    {"topic": "kafka-ksqldb.orders", "timestamp": 0, "key": 1000001, "value": {"orderUnits": 4, "totalAmount": 200.0}},
    {"topic": "kafka-ksqldb.orders", "timestamp": 0, "key": 1000002, "value": {"orderUnits": 6, "totalAmount": 300.0}},
    {"topic": "kafka-ksqldb.orders", "timestamp": 0, "key": 1000003, "value": {"orderUnits": 3, "totalAmount": 150.0}},
    {"topic": "kafka-ksqldb.orders", "timestamp": 0, "key": 1000004, "value": {"orderUnits": 1, "totalAmount": 50.0}}
  ]
}

tests/orders-output.json

{
  "outputs": [
    {"topic": "kafka-ksqldb.order_sizes", "timestamp": 0, "key": 1000000, "value": {"ORDERUNITS": 2, "TOTALAMOUNT": 100.0, "ORDERSIZE": "medium", "TAX": 12.0}},
    {"topic": "kafka-ksqldb.order_sizes", "timestamp": 0, "key": 1000001, "value": {"ORDERUNITS": 4, "TOTALAMOUNT": 200.0, "ORDERSIZE": "large" , "TAX": 24.0}},
    {"topic": "kafka-ksqldb.order_sizes", "timestamp": 0, "key": 1000002, "value": {"ORDERUNITS": 6, "TOTALAMOUNT": 300.0, "ORDERSIZE": "large" , "TAX": 36.0}},
    {"topic": "kafka-ksqldb.order_sizes", "timestamp": 0, "key": 1000003, "value": {"ORDERUNITS": 3, "TOTALAMOUNT": 150.0, "ORDERSIZE": "medium", "TAX": 18.0}},
    {"topic": "kafka-ksqldb.order_sizes", "timestamp": 0, "key": 1000004, "value": {"ORDERUNITS": 1, "TOTALAMOUNT": 50.0, "ORDERSIZE": "small" , "TAX": 6.0}}
  ]
}