Streaming ETL

Data Filtering

KSQL streaming queries run continuously. To persist a query's output to a Kafka topic, use the KSQL CREATE STREAM AS SELECT syntax: KSQL takes a real-time feed of events from one Kafka topic, transforms them, and writes the results continually to another.

This example shows how to filter streaming data from an inbound topic so that only records originating from a particular geography are kept.

Environment 4.1 or higher

Directions

In this example, a source event stream named purchases is used.

{
  "order_id": 1,
  "customer_name": "Maryanna Andryszczak",
  "date_of_birth": "1922-06-06T02:21:59Z",
  "product": "Nut - Walnut, Pieces",
  "order_total_usd": "1.65",
  "town": "Portland",
  "country": "United States"
}

1. In KSQL, register the purchases stream:

ksql> CREATE STREAM purchases \
      (order_id INT, customer_name VARCHAR, date_of_birth VARCHAR, \
       product VARCHAR, order_total_usd VARCHAR, town VARCHAR, country VARCHAR) \
       WITH (KAFKA_TOPIC='purchases', VALUE_FORMAT='JSON');

 Message
----------------
 Stream created
----------------

2. Inspect the first few messages as they arrive:

SELECT * FROM PURCHASES LIMIT 5;
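
By default, a KSQL query reads only messages that arrive after it starts, so the query above can appear to hang if the purchases topic is idle. A minimal sketch, assuming you want this CLI session to read the topic from its earliest retained offset (the `--` lines are annotations and can be omitted):

```sql
-- Read topics from the beginning for queries started in this session.
SET 'auto.offset.reset'='earliest';

SELECT * FROM PURCHASES LIMIT 5;
```

The setting applies to the current session only; queries started before it was set are not affected.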

3. Filter to show just those where the country is Germany:

SELECT ORDER_ID, PRODUCT, TOWN, COUNTRY FROM PURCHASES WHERE COUNTRY='Germany';
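
The same WHERE clause can be inverted when the goal is to drop a geography rather than keep it. The sketch below is an illustrative variation, not part of the original walkthrough; it reuses the columns registered in step 1 and assumes the country values are spelled as in the sample event, since string comparison is case-sensitive.

```sql
-- Illustrative variation: show every order except those from Germany.
SELECT ORDER_ID, PRODUCT, TOWN, COUNTRY FROM PURCHASES WHERE COUNTRY<>'Germany';
```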

4. Create a new KSQL stream containing just German orders:

CREATE STREAM PURCHASES_GERMANY AS SELECT * FROM PURCHASES WHERE COUNTRY='Germany';

5. The new stream, PURCHASES_GERMANY, populates a Kafka topic of the same name, as seen here:

ksql> LIST TOPICS;

 Kafka Topic        | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
------------------------------------------------------------------------------------------------
 _confluent-metrics | false      | 12         | 1                  | 0         | 0
 PURCHASES_GERMANY  | true       | 4          | 1                  | 0         | 0
 purchases          | true       | 1          | 1                  | 1         | 1
------------------------------------------------------------------------------------------------
ksql>
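
CREATE STREAM AS SELECT starts a persistent query that keeps running on the KSQL server after the statement returns. As a sketch of how to confirm this, the following statements list the registered streams and the running queries. Output is omitted here because it depends on your environment.

```sql
-- List registered streams; the stream created in step 4 should appear.
SHOW STREAMS;

-- List persistent queries, including the one feeding the new stream.
SHOW QUERIES;
```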
