Data Wrangling

Using Event Time from IoT Sensor Readings

Sensor data typically includes a timestamp of when the value was read. You can use KSQL to create a stream that uses this reading timestamp as the message timestamp, instead of the default ROWTIME value. By default, ROWTIME is the timestamp that Apache Kafka® stores with each record, which is set either by the producing application or by the broker when the message is written. Using the timestamp from the sensor reading makes windowing and aggregate operations more intuitive, because those operations are then based on when the data was sampled rather than when it arrived in Kafka.
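To make the difference concrete, here is a small Python sketch (a local illustration, not KSQL). It assumes one-hour tumbling windows aligned to the epoch and a hypothetical reading that is sampled just before the hour but reaches Kafka a few seconds after it. Bucketed by event time, the reading lands in the hour in which it was measured; bucketed by arrival time, it lands in the next hour.

```python
# Illustrative sketch with hypothetical timestamps: assign one reading to a
# one-hour tumbling window by event time vs. by arrival (processing) time.
from datetime import datetime, timezone

HOUR_MS = 60 * 60 * 1000


def window_start_ms(timestamp_ms: int, size_ms: int = HOUR_MS) -> int:
    """Start of the epoch-aligned tumbling window that contains timestamp_ms."""
    return timestamp_ms - (timestamp_ms % size_ms)


def to_utc(timestamp_ms: int) -> str:
    """Format epoch milliseconds as a UTC 'YYYY-MM-DD HH:MM:SS' string."""
    return datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")


# Hypothetical reading sampled at 21:59:59.500 UTC that only reaches Kafka
# five seconds later, at 22:00:04.500 UTC.
sampled_at = int(datetime(2018, 12, 28, 21, 59, 59, 500000, tzinfo=timezone.utc).timestamp() * 1000)
arrived_at = sampled_at + 5_000

event_window = to_utc(window_start_ms(sampled_at))
processing_window = to_utc(window_start_ms(arrived_at))
print("event-time window:     ", event_window)
print("processing-time window:", processing_window)
```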

Directions

1. Below is a sample of the raw sensor data being produced to a Kafka topic. The first field of each comma-delimited record is the epoch timestamp of the sensor reading, in milliseconds.

1546033893602,0.013611,0.001587,-0.003052,0,-1,-1,21
1546033893735,-0.004700,-0.001587,-0.010132,0,0,0,21
1546033893864,-0.003723,-0.001099,-0.001587,0,0,0,21
1546033893995,-0.001526,-0.000610,0.002075,0,0,-1,21
1546033894122,-0.001526,-0.003296,0.006470,0,0,0,21
1546033894255,0.000427,-0.005981,-0.016479,-1,0,0,21
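To check how these rows map onto the columns declared in the next step, the Python sketch below (a local illustration; `SensorReading`, `parse_reading`, and `sampled_at` are hypothetical names) splits each line on commas, converts the fields to the declared types, and decodes the first field as UTC epoch milliseconds.

```python
# Sketch: parse the comma-delimited sample rows into the column types that the
# CREATE STREAM statement declares, and decode ts as epoch milliseconds (UTC).
from datetime import datetime, timedelta, timezone
from typing import NamedTuple

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


class SensorReading(NamedTuple):
    ts: int  # epoch milliseconds when the sensor was read
    accel_x: float
    accel_y: float
    accel_z: float
    gyro_x: int
    gyro_y: int
    gyro_z: int
    temp_c: int


def parse_reading(line: str) -> SensorReading:
    fields = line.strip().split(",")
    if len(fields) != 8:
        raise ValueError(f"expected 8 fields, got {len(fields)}: {line!r}")
    ts, ax, ay, az, gx, gy, gz, temp = fields
    return SensorReading(int(ts), float(ax), float(ay), float(az),
                         int(gx), int(gy), int(gz), int(temp))


def sampled_at(ts_ms: int) -> datetime:
    # Integer millisecond arithmetic avoids float rounding in the conversion.
    return EPOCH + timedelta(milliseconds=ts_ms)


SAMPLE = """\
1546033893602,0.013611,0.001587,-0.003052,0,-1,-1,21
1546033893735,-0.004700,-0.001587,-0.010132,0,0,0,21
1546033893864,-0.003723,-0.001099,-0.001587,0,0,0,21
1546033893995,-0.001526,-0.000610,0.002075,0,0,-1,21
1546033894122,-0.001526,-0.003296,0.006470,0,0,0,21
1546033894255,0.000427,-0.005981,-0.016479,-1,0,0,21
"""

readings = [parse_reading(line) for line in SAMPLE.splitlines() if line.strip()]
for r in readings:
    print(sampled_at(r.ts).isoformat(timespec="milliseconds"), r.accel_x)
```

The first reading decodes to 2018-12-28 21:51:33.602 UTC, which is the value that becomes ROWTIME once the stream uses `ts` as its timestamp.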

2. Create a stream that uses the sensor reading time as the message timestamp:

CREATE STREAM sensor_stream ( \
    ts BIGINT, \
    accel_x DOUBLE, \
    accel_y DOUBLE, \
    accel_z DOUBLE,  \
    gyro_x INTEGER, \
    gyro_y INTEGER, \
    gyro_z INTEGER, \
    temp_c INTEGER \
    ) WITH (KAFKA_TOPIC='sensor_delimited', VALUE_FORMAT='DELIMITED', TIMESTAMP='ts');

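Notice that the timestamp column (ts) is declared as BIGINT (epoch milliseconds) and that TIMESTAMP='ts' is set in the WITH clause. This tells KSQL to use the value of ts as each row's ROWTIME instead of the timestamp stored in the Kafka record.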
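The effect of the TIMESTAMP property can be modeled in a few lines of Python. This is a hypothetical model of the behavior described above, not KSQL's actual implementation: the function `rowtime` and its parameters are illustrative names, and the rule of rejecting non-integer or negative values is an assumption of this sketch.

```python
# Hypothetical model (not KSQL's real implementation) of how ROWTIME is chosen:
# a configured timestamp column wins; otherwise the Kafka record timestamp is used.
from typing import Mapping, Optional


def rowtime(value: Mapping[str, object], record_timestamp_ms: int,
            timestamp_column: Optional[str] = None) -> int:
    if timestamp_column is None:
        # Default: the producer-set or broker-set timestamp carried by the record.
        return record_timestamp_ms
    ts = value[timestamp_column]
    # Assumption of this sketch: the column must hold non-negative epoch millis.
    if not isinstance(ts, int) or ts < 0:
        raise ValueError(f"{timestamp_column} must be a non-negative epoch-millisecond BIGINT")
    return ts


row = {"ts": 1546033893602, "accel_x": 0.013611}
written_at = 1546033895000  # hypothetical time at which the record reached the broker

default_rowtime = rowtime(row, written_at)                         # no TIMESTAMP property
event_rowtime = rowtime(row, written_at, timestamp_column="ts")    # TIMESTAMP='ts'
print(default_rowtime, event_rowtime)
```

With the column configured, ROWTIME and ts carry the same value, which is what the validation query in the next step checks.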

From here, we can run a quick validation query in the Confluent Control Center UI and confirm that ROWTIME and TS hold the same values.
[Figure: Confluent Control Center UI]

3. We now have raw data and a stream we can query by the sensor reading timestamp, so let’s use the other side of the stream-table duality to look at the data further. The sensor includes motion data, so we can build a table of the maximum x-axis acceleration, using session windows with a 60-minute inactivity gap.

ksql> CREATE TABLE sensor_table_accel_x_hourly AS \
   SELECT TIMESTAMPTOSTRING(windowstart(), 'yyyy-MM-dd HH:mm:ss') AS window_start_ts,\
          TIMESTAMPTOSTRING(windowend(), 'yyyy-MM-dd HH:mm:ss') AS window_end_ts,\
          max(accel_x) AS max_x_accel\
     FROM sensor_stream \
          WINDOW SESSION (60 MINUTES) \
 GROUP BY accel_x;
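Session windows behave differently from fixed hourly buckets. The Python sketch below (a local illustration; `session_max` is a hypothetical helper) models them for the events of a single grouping key: an event joins the current session if it arrives at most `gap_ms` after that session's last event, and each session's window runs from its first to its last event timestamp. KSQL keeps separate sessions per GROUP BY key; this sketch does not model that, nor late-arriving data.

```python
# Sketch of session-window semantics for ONE grouping key: events at most
# gap_ms apart share a session; a session spans its first to last event.
from typing import List, Tuple

SessionResult = Tuple[int, int, float]  # (window_start_ms, window_end_ms, max_value)
GAP_MS = 60 * 60 * 1000  # 60-minute inactivity gap, as in WINDOW SESSION (60 MINUTES)


def session_max(events: List[Tuple[int, float]], gap_ms: int) -> List[SessionResult]:
    sessions: List[SessionResult] = []
    for ts, value in sorted(events):  # process in event-time order
        if sessions and ts - sessions[-1][1] <= gap_ms:
            start, _end, best = sessions[-1]
            sessions[-1] = (start, ts, max(best, value))  # extend the open session
        else:
            sessions.append((ts, ts, value))  # gap exceeded: start a new session
    return sessions


events = [
    (1546033893602, 0.013611), (1546033893735, -0.004700), (1546033893864, -0.003723),
    (1546033893995, -0.001526), (1546033894122, -0.001526), (1546033894255, 0.000427),
    (1546041094255, -0.002),  # hypothetical reading two hours after the last sample
]
result = session_max(events, GAP_MS)
for start, end, peak in result:
    print(start, end, peak)
```

Because a session window ends at its last event rather than at a fixed boundary, a window can be much shorter than the 60-minute gap, as in the result shown in the next step.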

From there, we can query the table to see the maximum x-axis acceleration, in g-forces, within a session window:

ksql> SELECT window_start_ts, window_end_ts, max_x_accel \
FROM SENSOR_TABLE_ACCEL_X_HOURLY \
LIMIT 1;
2018-12-28 22:37:34 | 2018-12-28 22:41:06 | -0.004395
