ksqlDB Streams Example
- statements location: kafka-ksqldb/statements
- test location: kafka-ksqldb/tests
Test runner:
ksqldb-cli ksql-test-runner -e extensions/ \
-s statements/create-orders.ksql \
-i tests/orders-input.json \
-o tests/orders-output.json | grep '>>>'
Execute the statement file to create the orders and orderSizes streams:
ksqldb-cli ksql -f statements/create-orders.ksql http://ksqldb:8088
Insert orders:
ksqldb-cli ksql -f statements/insert-orders.ksql http://ksqldb:8088
List of streams:
http :8088/ksql ksql="list streams;" | jq '.[].streams[] | [{name: .name, topic: .topic}]'
Show content:
ksqldb-cli ksql -e "PRINT 'kafka-ksqldb.order_sizes' FROM BEGINNING;" http://ksqldb:8088
Delete all orders (drops both streams and deletes their Kafka topics):
ksqldb-cli ksql -e "DROP STREAM ORDERSIZES DELETE TOPIC; DROP STREAM ORDERS DELETE TOPIC;" http://ksqldb:8088
Statements
statements/create-orders.ksql
-- Source stream of raw orders, keyed by orderId and backed by the
-- 'kafka-ksqldb.orders' topic (JSON key and value, 10 partitions, 3 replicas).
-- NOTE(review): totalAmount is a DOUBLE; if it represents money, DECIMAL would
-- avoid rounding error, but the taxes() function applied to it downstream
-- would have to accept that type -- confirm before changing.
CREATE STREAM orders (
    orderId     INT KEY,
    orderUnits  INT,
    totalAmount DOUBLE
) WITH (
    kafka_topic  = 'kafka-ksqldb.orders',
    key_format   = 'json',
    value_format = 'json',
    partitions   = 10,
    replicas     = 3
);
-- Derived stream: every order from the orders stream plus a size label and a
-- tax amount, written to the 'kafka-ksqldb.order_sizes' topic.
--
-- orderSize buckets by unit count: fewer than 2 is 'small', 2 or 3 is
-- 'medium', 4 or more is 'large'.
-- NOTE(review): a NULL orderUnits presumably falls through to ELSE and is
-- labelled 'large' -- confirm that is intended.
--
-- taxes() is not defined in these statements; presumably it is a custom UDF
-- loaded from the extensions/ directory passed to the test runner. The
-- expected test output shows it returning 12% of totalAmount.
CREATE STREAM orderSizes WITH (
    kafka_topic  = 'kafka-ksqldb.order_sizes',
    key_format   = 'json',
    value_format = 'json',
    partitions   = 10,
    replicas     = 3
) AS
SELECT
    orderId,
    orderUnits,
    totalAmount,
    CASE
        WHEN orderUnits < 2 THEN 'small'
        WHEN orderUnits < 4 THEN 'medium'
        ELSE 'large'
    END AS orderSize,
    taxes(totalAmount) AS tax
FROM orders
EMIT CHANGES;
statements/insert-orders.ksql
-- Sample orders covering every size bucket of the orderSizes stream
-- (fewer than 2 units: small, 2 or 3 units: medium, 4 or more: large).
-- The same five rows are used as input in tests/orders-input.json.

-- 2 units: medium
INSERT INTO orders (orderId, orderUnits, totalAmount)
VALUES (1000000, 2, 100.0);

-- 4 units: large
INSERT INTO orders (orderId, orderUnits, totalAmount)
VALUES (1000001, 4, 200.0);

-- 6 units: large
INSERT INTO orders (orderId, orderUnits, totalAmount)
VALUES (1000002, 6, 300.0);

-- 3 units: medium
INSERT INTO orders (orderId, orderUnits, totalAmount)
VALUES (1000003, 3, 150.0);

-- 1 unit: small
INSERT INTO orders (orderId, orderUnits, totalAmount)
VALUES (1000004, 1, 50.0);
Tests
tests/orders-input.json
{
"inputs": [
{"topic": "kafka-ksqldb.orders", "timestamp": 0, "key": 1000000, "value": {"orderUnits": 2, "totalAmount": 100.0}},
{"topic": "kafka-ksqldb.orders", "timestamp": 0, "key": 1000001, "value": {"orderUnits": 4, "totalAmount": 200.0}},
{"topic": "kafka-ksqldb.orders", "timestamp": 0, "key": 1000002, "value": {"orderUnits": 6, "totalAmount": 300.0}},
{"topic": "kafka-ksqldb.orders", "timestamp": 0, "key": 1000003, "value": {"orderUnits": 3, "totalAmount": 150.0}},
{"topic": "kafka-ksqldb.orders", "timestamp": 0, "key": 1000004, "value": {"orderUnits": 1, "totalAmount": 50.0}}
]
}
tests/orders-output.json
{
"outputs": [
{"topic": "kafka-ksqldb.order_sizes", "timestamp": 0, "key": 1000000, "value": {"ORDERUNITS": 2, "TOTALAMOUNT": 100.0, "ORDERSIZE": "medium", "TAX": 12.0}},
{"topic": "kafka-ksqldb.order_sizes", "timestamp": 0, "key": 1000001, "value": {"ORDERUNITS": 4, "TOTALAMOUNT": 200.0, "ORDERSIZE": "large" , "TAX": 24.0}},
{"topic": "kafka-ksqldb.order_sizes", "timestamp": 0, "key": 1000002, "value": {"ORDERUNITS": 6, "TOTALAMOUNT": 300.0, "ORDERSIZE": "large" , "TAX": 36.0}},
{"topic": "kafka-ksqldb.order_sizes", "timestamp": 0, "key": 1000003, "value": {"ORDERUNITS": 3, "TOTALAMOUNT": 150.0, "ORDERSIZE": "medium", "TAX": 18.0}},
{"topic": "kafka-ksqldb.order_sizes", "timestamp": 0, "key": 1000004, "value": {"ORDERUNITS": 1, "TOTALAMOUNT": 50.0, "ORDERSIZE": "small" , "TAX": 6.0}}
]
}