Directions
1. Create a stream (inventory_stream) from the inventory topic to stream the inventory change events in real time.
CREATE STREAM inventory_stream (cid STRING, item STRING, qty INTEGER, price DOUBLE, balance INTEGER)
  WITH (VALUE_FORMAT='json', KAFKA_TOPIC='inventory');
2. Create a table from a SELECT query on inventory_stream. The table holds the most up-to-date total quantity for each item, and it is updated as new events arrive on the source inventory topic.
SET 'auto.offset.reset' = 'earliest';
CREATE TABLE inventory_stream_table AS SELECT item, SUM(qty) AS item_qty FROM inventory_stream GROUP BY item;
Sample output:
ksql> SELECT item, item_qty from inventory_stream_table;
watch | 80
tablet | 30
laptop | 20
phone | 35
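To make the stream-to-table relationship concrete, here is a minimal Python sketch of what the GROUP BY item / SUM(qty) aggregation computes as events arrive. It is only a model of the semantics, not how KSQL implements the table. The function name apply_events and the sample events are hypothetical and chosen for illustration; only the field names (cid, item, qty, price, balance) come from the stream definition above.

```python
from collections import defaultdict


def apply_events(events):
    """Fold inventory events into a per-item running total of qty.

    Mirrors SELECT item, SUM(qty) ... GROUP BY item: every event adds
    its qty to the row keyed by its item.
    """
    table = defaultdict(int)
    for event in events:
        table[event["item"]] += event["qty"]
    return dict(table)


# Hypothetical events (illustrative values, not taken from a real topic).
events = [
    {"cid": "c1", "item": "watch", "qty": 50, "price": 20.0, "balance": 50},
    {"cid": "c2", "item": "phone", "qty": 35, "price": 300.0, "balance": 35},
    {"cid": "c3", "item": "watch", "qty": 30, "price": 20.0, "balance": 80},
]

inventory_table = apply_events(events)
print(inventory_table)  # {'watch': 80, 'phone': 35}
```

Each new event changes only the row for its own item, which is why the table always reflects the latest total per item without being rebuilt from scratch.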