ksqlDB Queries
Create a Stream
Create the orders and orderSizes streams:
ksql -f kafka-ksqldb/ksql/create-orders.ksql http://ksqldb:8088
The previous command executed these statements:
CREATE STREAM orders (orderId INT KEY, orderUnits INT, totalAmount DOUBLE)
WITH (kafka_topic='ksqldb.orders', key_format='json', value_format='json', partitions=10, replicas=3);
CREATE STREAM orderSizes
  WITH (kafka_topic='ksqldb.order_sizes', key_format='json', value_format='json', partitions=10, replicas=3)
  AS SELECT
    orderId,
    orderUnits,
    totalAmount,
    CASE
      WHEN orderUnits < 2 THEN 'small'
      WHEN orderUnits < 4 THEN 'medium'
      ELSE 'large'
    END AS orderSize,
    taxes(totalAmount) AS tax
  FROM orders EMIT CHANGES;
List the streams:
ksql -e "SHOW STREAMS;" http://ksqldb:8088
Insert
Like any other SQL interpreter, ksqlDB uses the INSERT INTO statement
to add rows, in this case to the orders stream:
INSERT INTO orders (orderId, orderUnits, totalAmount) VALUES (1000000, 2, 100.0);
INSERT INTO orders (orderId, orderUnits, totalAmount) VALUES (1000001, 4, 200.0);
INSERT INTO orders (orderId, orderUnits, totalAmount) VALUES (1000002, 6, 300.0);
INSERT INTO orders (orderId, orderUnits, totalAmount) VALUES (1000003, 3, 150.0);
INSERT INTO orders (orderId, orderUnits, totalAmount) VALUES (1000004, 1, 50.0);
Insert orders:
ksql -f kafka-ksqldb/ksql/insert-orders.ksql http://ksqldb:8088
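To see which orderSize each inserted order should receive, the CASE expression from the orderSizes stream can be mirrored outside ksqlDB. The following is a minimal Python sketch, not part of the project: the function name order_size and the sizes dictionary are hypothetical, and the thresholds are copied from the CREATE STREAM statement. The tax column is left out because the taxes function is not shown here.

```python
def order_size(order_units: int) -> str:
    """Mirror of the CASE expression used in the orderSizes stream."""
    if order_units < 2:
        return "small"
    if order_units < 4:
        return "medium"
    return "large"


# (orderId, orderUnits) pairs copied from the INSERT statements
orders = [
    (1000000, 2),
    (1000001, 4),
    (1000002, 6),
    (1000003, 3),
    (1000004, 1),
]

# Map each orderId to the size bucket the stream is expected to emit
sizes = {order_id: order_size(units) for order_id, units in orders}

for order_id, size in sizes.items():
    print(order_id, size)
```

Under these assumptions, orders 1000000 and 1000003 fall into 'medium', 1000001 and 1000002 into 'large', and 1000004 into 'small'.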
Show
Print the records in the ksqldb.order_sizes topic from the beginning:
ksql -e "PRINT 'ksqldb.order_sizes' FROM BEGINNING;" http://ksqldb:8088
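Drop
Drop orderSizes first, because ksqlDB refuses to drop orders while another stream still reads from it: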
ksql -e "DROP STREAM ORDERSIZES;" http://ksqldb:8088
ksql -e "DROP STREAM ORDERS;" http://ksqldb:8088