Streaming-ETL

Aggregating Data

In this example, we’ll see how to take an inbound stream of order data and write a new Kafka topic containing the total value and number of orders placed within a tumbling five-minute window.

Directions

1. Register the existing purchases topic for use as a KSQL stream called purchases:

 CREATE STREAM purchases 
 (order_id INT, customer_name VARCHAR, date_of_birth VARCHAR, 
 product VARCHAR, order_total_usd DOUBLE, town VARCHAR, country VARCHAR) 
 WITH (KAFKA_TOPIC='purchases', VALUE_FORMAT='JSON');
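
Optionally, confirm that the stream was registered and that KSQL picked up the expected schema. SHOW STREAMS and DESCRIBE are standard KSQL statements, though the exact output format varies by version:

 SHOW STREAMS;
 DESCRIBE purchases;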

2. Inspect the first few messages as they arrive:

 SELECT * FROM PURCHASES LIMIT 5;
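
If no rows appear, it may be because KSQL queries read only newly arriving messages by default. To include messages already in the topic, reset the consumer offset before running the query:

 -- Read the topic from the beginning rather than only new messages
 SET 'auto.offset.reset' = 'earliest';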

3. Count the orders and sum the order values (ORDER_TOTAL_USD) by country, within a five-minute tumbling window:

 SELECT COUNTRY, 
        COUNT(*) AS ORDER_COUNT, 
        SUM(ORDER_TOTAL_USD) AS ORDER_TOTAL_USD 
   FROM PURCHASES 
         WINDOW TUMBLING (SIZE 5 MINUTES) 
   GROUP BY COUNTRY;

Note that as each new event arrives, it triggers an update of the aggregate, which is re-emitted:

 United States | 124 | 591.3700000000001
 Germany | 15 | 81.15
 United States | 127 | 609.0200000000001
 United Kingdom | 11 | 52.870000000000005
 United States | 130 | 616.8600000000001
 United States | 134 | 638.9900000000002
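
A tumbling window produces discrete, non-overlapping five-minute buckets. If you want overlapping windows instead, KSQL also supports hopping windows; as a sketch, the variant below uses an assumed one-minute advance interval, so each event is counted in five overlapping windows:

 SELECT COUNTRY, 
        COUNT(*) AS ORDER_COUNT, 
        SUM(ORDER_TOTAL_USD) AS ORDER_TOTAL_USD 
   FROM PURCHASES 
         WINDOW HOPPING (SIZE 5 MINUTES, ADVANCE BY 1 MINUTE) 
   GROUP BY COUNTRY;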

4. The message key includes the window timestamp, as we can see if we persist the results to a KSQL table:

 CREATE TABLE ORDERS_BY_COUNTRY_BY_5_MINS AS 
 SELECT COUNTRY, 
        COUNT(*) AS ORDER_COUNT, 
        SUM(ORDER_TOTAL_USD) AS ORDER_TOTAL_USD 
   FROM PURCHASES 
         WINDOW TUMBLING (SIZE 5 MINUTES) 
   GROUP BY COUNTRY;
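
The CREATE TABLE ... AS SELECT statement above also writes the aggregate out to a backing Kafka topic, named after the table by default. To check the backing topic and the schema of the new table, you can describe it; DESCRIBE EXTENDED is a standard KSQL statement, though its output varies by version:

 DESCRIBE EXTENDED ORDERS_BY_COUNTRY_BY_5_MINS;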

 SELECT ROWKEY, COUNTRY, ORDER_COUNT, ORDER_TOTAL_USD FROM ORDERS_BY_COUNTRY_BY_5_MINS;

 United States : Window{start=1542800400000 end=-} | United States | 193 | 960.8500000000001
 Germany : Window{start=1542800400000 end=-} | Germany | 24 | 120.51000000000002
 United Kingdom : Window{start=1542800400000 end=-} | United Kingdom | 16 | 64.64
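
The window start in the key is an epoch timestamp in milliseconds (1542800400000 above). As a sketch of formatting timestamps for humans, KSQL's TIMESTAMPTOSTRING function renders epoch-millisecond values as strings; here it is applied to ROWTIME (the timestamp of the latest update to the row), since direct access to the window boundaries depends on your KSQL version:

 SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS LAST_UPDATE_TS, 
        COUNTRY, ORDER_COUNT, ORDER_TOTAL_USD 
   FROM ORDERS_BY_COUNTRY_BY_5_MINS;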