ksqlDB Queries

Create a Stream

Create the orders stream and the derived orderSizes stream by running the script:

ksql -f kafka-ksqldb/ksql/create-orders.ksql http://ksqldb:8088

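The script contains the following statements (taxes is not a built-in ksqlDB function, so it is assumed to be available as a user-defined function on the server):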

CREATE STREAM orders (orderId INT KEY, orderUnits INT, totalAmount DOUBLE)
  WITH (kafka_topic='ksqldb.orders', key_format='json', value_format='json', partitions=10, replicas=3);

CREATE STREAM orderSizes
  WITH (kafka_topic='ksqldb.order_sizes', key_format='json', value_format='json', partitions=10, replicas=3)
  AS SELECT
    orderId,
    orderUnits,
    totalAmount,
    CASE
      WHEN orderUnits < 2 THEN 'small'
      WHEN orderUnits < 4 THEN 'medium'
      ELSE 'large'
    END AS orderSize,
    taxes(totalAmount) AS tax
  FROM orders EMIT CHANGES;
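
The CASE expression is evaluated top to bottom and the first matching branch wins, so an order with 2 units is 'medium', not 'small'. As a rough illustration only, the same bucketing can be sketched as a shell function; order_size is a hypothetical helper that is not part of the repository, and it only mirrors the thresholds shown in the statement above.

```shell
#!/bin/sh
# Hypothetical helper mirroring the CASE expression in the orderSizes stream.
# Branches are checked in order; the first one that matches decides the size.
order_size() {
  units=$1
  if [ "$units" -lt 2 ]; then
    echo small
  elif [ "$units" -lt 4 ]; then
    echo medium
  else
    echo large
  fi
}

# Classify a few example unit counts.
for units in 1 2 3 4 6; do
  printf '%s -> %s\n' "$units" "$(order_size "$units")"
done
```

Running the sketch prints one line per unit count, which makes it easy to predict the orderSize value that ksqlDB should produce for a given order.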

List the streams:

ksql -e "SHOW STREAMS;" http://ksqldb:8088

Insert

Like most SQL dialects, ksqlDB uses INSERT INTO ... VALUES to add rows. Here each statement appends one record to the orders stream:

INSERT INTO orders (orderId, orderUnits, totalAmount) VALUES (1000000, 2, 100.0);
INSERT INTO orders (orderId, orderUnits, totalAmount) VALUES (1000001, 4, 200.0);
INSERT INTO orders (orderId, orderUnits, totalAmount) VALUES (1000002, 6, 300.0);
INSERT INTO orders (orderId, orderUnits, totalAmount) VALUES (1000003, 3, 150.0);
INSERT INTO orders (orderId, orderUnits, totalAmount) VALUES (1000004, 1, 50.0);

Insert the orders by running the script:

ksql -f kafka-ksqldb/ksql/insert-orders.ksql http://ksqldb:8088
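
If more test records are needed, statements in the same shape could be generated rather than typed by hand. The sketch below is only an illustration: gen_inserts is a hypothetical helper, and the rule of 50.0 per unit is an assumption inferred from the five sample rows, not something the repository states.

```shell
#!/bin/sh
# Hypothetical generator for INSERT statements shaped like the samples above.
# Usage: gen_inserts FIRST_ORDER_ID UNITS [UNITS...]
# Assumption: totalAmount is 50.0 per unit, as in the sample data.
gen_inserts() {
  order_id=$1
  shift
  for units in "$@"; do
    amount=$((units * 50))
    printf 'INSERT INTO orders (orderId, orderUnits, totalAmount) VALUES (%d, %d, %d.0);\n' \
      "$order_id" "$units" "$amount"
    order_id=$((order_id + 1))
  done
}

# Reproduce the five sample statements on standard output.
gen_inserts 1000000 2 4 6 3 1
```

The output can be redirected to a file and passed to ksql -f in the same way as the script above.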

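Show

Print the records of the derived topic from the beginning (PRINT keeps running until it is interrupted with Ctrl+C):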

ksql -e "PRINT 'ksqldb.order_sizes' FROM BEGINNING;" http://ksqldb:8088

Drop

Drop the derived stream first, because orderSizes reads from orders:

ksql -e "DROP STREAM ORDERSIZES;" http://ksqldb:8088
ksql -e "DROP STREAM ORDERS;" http://ksqldb:8088