Streaming-ETL

Aggregating Data

In this example, we’ll take an inbound stream of order data and write a new Kafka topic containing the number of orders and their total value, per country, for each five-minute tumbling window.

Directions

1. Register the existing purchases topic for use as a KSQL stream called purchases:

 CREATE STREAM purchases \
 (order_id INT, customer_name VARCHAR, date_of_birth VARCHAR, \
 product VARCHAR, order_total_usd DOUBLE, town VARCHAR, country VARCHAR) \
 WITH (KAFKA_TOPIC='purchases', VALUE_FORMAT='JSON');
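
For orientation, a message on the purchases topic that fits this schema might look like the one built below. This is a minimal Python sketch: every field value is invented for illustration and does not come from the real topic, and the lowercase field names simply mirror the column names declared above.

```python
import json

# Column names taken from the CREATE STREAM statement above.
COLUMNS = ["order_id", "customer_name", "date_of_birth", "product",
           "order_total_usd", "town", "country"]

# Hypothetical record: every value here is made up for illustration.
sample_purchase = {
    "order_id": 1,
    "customer_name": "Example Customer",
    "date_of_birth": "1970-01-01",
    "product": "Example Product",
    "order_total_usd": 4.99,
    "town": "Example Town",
    "country": "Germany",
}

# Serialize the record as a JSON string, the value format the stream expects.
message_value = json.dumps(sample_purchase)
print(message_value)
```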

2. Inspect the first few messages as they arrive:

 SELECT * FROM PURCHASES LIMIT 5;

3. Count the orders and sum their values (ORDER_TOTAL_USD) by country over five-minute tumbling windows:

 SELECT COUNTRY, \
        COUNT(*) AS ORDER_COUNT, \
        SUM(ORDER_TOTAL_USD) AS ORDER_TOTAL_USD \
   FROM PURCHASES \
         WINDOW TUMBLING (SIZE 5 MINUTES) \
   GROUP BY COUNTRY;

Note that each new event updates the aggregate for its country and window, and the updated row is re-emitted:

 United States | 124 | 591.3700000000001
 Germany | 15 | 81.15
 United States | 127 | 609.0200000000001
 United Kingdom | 11 | 52.870000000000005
 United States | 130 | 616.8600000000001
 United States | 134 | 638.9900000000002
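
The behaviour described above can be modelled in a few lines of plain Python. This is only a sketch of the idea, not how KSQL is implemented: the function names and the sample events are hypothetical, and the window start is assumed to be the event timestamp rounded down to a multiple of five minutes.

```python
from collections import defaultdict

WINDOW_MS = 5 * 60 * 1000  # five-minute tumbling window, in milliseconds


def window_start(timestamp_ms):
    """Round an event timestamp down to the start of its five-minute window."""
    return timestamp_ms - (timestamp_ms % WINDOW_MS)


def aggregate(events):
    """Yield (country, window_start, order_count, order_total) after every event."""
    counts = defaultdict(int)
    totals = defaultdict(float)
    for timestamp_ms, country, order_total_usd in events:
        key = (country, window_start(timestamp_ms))
        counts[key] += 1
        totals[key] += order_total_usd
        # Re-emit the updated aggregate for this country and window.
        yield country, key[1], counts[key], totals[key]


# Hypothetical events: (timestamp in ms, country, order_total_usd).
events = [
    (1542800400000, "Germany", 5.0),
    (1542800460000, "Germany", 2.5),
    (1542800520000, "United Kingdom", 4.0),
    (1542800700000, "Germany", 1.5),  # five minutes later: a new window
]

updates = list(aggregate(events))
for update in updates:
    print(update)
```

The second Germany event produces a second row for the same window with a count of 2, while the last event falls into the next window and starts again from a count of 1.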

4. The message key includes the start timestamp of the window, as we can see by persisting the results to a KSQL table (backed by a new Kafka topic) and querying its ROWKEY:

 CREATE TABLE ORDERS_BY_COUNTRY_BY_5_MINS AS \
 SELECT COUNTRY, \
        COUNT(*) AS ORDER_COUNT, \
        SUM(ORDER_TOTAL_USD) AS ORDER_TOTAL_USD \
   FROM PURCHASES \
         WINDOW TUMBLING (SIZE 5 MINUTES) \
   GROUP BY COUNTRY;

 SELECT ROWKEY, COUNTRY, ORDER_COUNT, ORDER_TOTAL_USD FROM ORDERS_BY_COUNTRY_BY_5_MINS;

 United States : Window{start=1542800400000 end=-} | United States | 193 | 960.8500000000001
 Germany : Window{start=1542800400000 end=-} | Germany | 24 | 120.51000000000002
 United Kingdom : Window{start=1542800400000 end=-} | United Kingdom | 16 | 64.64
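
The window start in the key is a timestamp in milliseconds since the Unix epoch. The following Python sketch decodes the value shown above; the window end is not shown in the output, so here it is simply assumed to be the start plus the five-minute window size.

```python
from datetime import datetime, timezone

WINDOW_MS = 5 * 60 * 1000  # window size used in the query above

window_start_ms = 1542800400000  # value shown in the ROWKEY output above

# Convert epoch milliseconds to a UTC datetime.
start = datetime.fromtimestamp(window_start_ms / 1000, tz=timezone.utc)
# Assumption: the window ends one window size after it starts.
end = datetime.fromtimestamp((window_start_ms + WINDOW_MS) / 1000, tz=timezone.utc)

print(start.isoformat())  # 2018-11-21T11:40:00+00:00
print(end.isoformat())    # 2018-11-21T11:45:00+00:00

# The start falls exactly on a five-minute boundary.
aligned = window_start_ms % WINDOW_MS == 0
print(aligned)  # True
```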