Streaming ETL

Data Masking

KSQL streaming queries run continuously. To persist the output of a streaming query to a Kafka topic, use the KSQL CREATE STREAM AS SELECT syntax: KSQL reads a real-time feed of events from one Kafka topic, transforms them, and continuously writes the results to another topic.

This example shows how to mask streaming data from an inbound topic that contains personally identifiable information (PII) and persist the output to a Kafka topic.

Directions

This example uses a source event stream named purchases. A sample event looks like this:

{
  "order_id": 1,
  "customer_name": "Maryanna Andryszczak",
  "date_of_birth": "1922-06-06T02:21:59Z",
  "product": "Nut - Walnut, Pieces",
  "order_total_usd": "1.65",
  "town": "Portland",
  "country": "United States"
}

1. In KSQL, register the purchases stream:

ksql> CREATE STREAM purchases 
      (order_id INT, customer_name VARCHAR, date_of_birth VARCHAR, 
       product VARCHAR, order_total_usd VARCHAR, town VARCHAR, country VARCHAR) 
       WITH (KAFKA_TOPIC='purchases', VALUE_FORMAT='JSON');

 Message
----------------
 Stream created
----------------

2. Create a derived stream, backed by a new Kafka topic, that excludes all PII fields:

ksql> CREATE STREAM PURCHASES_PII_MASKED AS 
      SELECT ORDER_ID, PRODUCT, ORDER_TOTAL_USD, TOWN, COUNTRY 
      FROM PURCHASES;

 Message
----------------------------
 Stream created and running
----------------------------

3. Describe the new stream and print its Kafka topic to confirm that neither contains any PII:

ksql> DESCRIBE PURCHASES_PII_MASKED;

 Field           | Type
---------------------------------------------
 ROWTIME         | BIGINT           (system)
 ROWKEY          | VARCHAR(STRING)  (system)
 ORDER_ID        | INTEGER
 PRODUCT         | VARCHAR(STRING)
 ORDER_TOTAL_USD | VARCHAR(STRING)
 TOWN            | VARCHAR(STRING)
 COUNTRY         | VARCHAR(STRING)
---------------------------------------------

ksql> PRINT 'PURCHASES_PII_MASKED';
Format:JSON
{"ROWTIME":1525960235832,"ROWKEY":"null","ORDER_ID":1,"COUNTRY":"United States","TOWN":"Portland","PRODUCT":"Nut - Walnut, Pieces","ORDER_TOTAL_USD":"1.65"}
{"ROWTIME":1525960258302,"ROWKEY":"null","ORDER_ID":3,"COUNTRY":"United States","TOWN":"Honolulu","PRODUCT":"Veal - Chops, Split, Frenched","ORDER_TOTAL_USD":"1.59"}
[...]
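
To make the effect of the projection in step 2 concrete, here is a minimal Python sketch of the same idea applied to the sample event: keep every field except the PII ones. It is an illustration only, not how KSQL is implemented. The helper name drop_pii and the PII_FIELDS set are hypothetical, and the sketch keeps the original lower-case field names, whereas the KSQL output above shows upper-case column names.

```python
import json

# Fields treated as PII in this example (the two columns omitted in step 2).
PII_FIELDS = {"customer_name", "date_of_birth"}


def drop_pii(event: dict) -> dict:
    """Return a copy of the event without the PII fields (hypothetical helper)."""
    return {key: value for key, value in event.items() if key not in PII_FIELDS}


# The sample purchases event from the start of this recipe.
event = json.loads("""
{
  "order_id": 1,
  "customer_name": "Maryanna Andryszczak",
  "date_of_birth": "1922-06-06T02:21:59Z",
  "product": "Nut - Walnut, Pieces",
  "order_total_usd": "1.65",
  "town": "Portland",
  "country": "United States"
}
""")

masked_event = drop_pii(event)
print(json.dumps(masked_event))
```

The remaining fields (order_id, product, order_total_usd, town, country) correspond to the columns selected in the CREATE STREAM ... AS SELECT statement.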

4. KSQL also provides a variety of MASK functions. Here, the customer name and date of birth are retained, but in obfuscated form:

ksql> CREATE STREAM MASKED_PURCHASES AS 
      SELECT MASK(CUSTOMER_NAME) AS CUSTOMER_NAME, 
             MASK_RIGHT(DATE_OF_BIRTH, 12) AS DATE_OF_BIRTH, 
             ORDER_ID, PRODUCT, ORDER_TOTAL_USD, TOWN, COUNTRY 
      FROM PURCHASES;

ksql> SELECT CUSTOMER_NAME, DATE_OF_BIRTH, PRODUCT, ORDER_TOTAL_USD FROM MASKED_PURCHASES LIMIT 1;
Xxxxxx-Xxxxxx | 1908-03-nnXnn-nn-nnX | Langers - Mango Nectar | 5.80
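
The masked output above can be easier to read with the substitution rule spelled out. The following Python sketch emulates MASK and MASK_RIGHT under the assumption that the default mask characters are in effect: upper-case letters become X, lower-case letters become x, digits become n, and everything else becomes -. This assumption is consistent with the output shown above. The Python functions are illustrative stand-ins, not the KSQL implementation.

```python
def mask(text: str, upper: str = "X", lower: str = "x",
         digit: str = "n", other: str = "-") -> str:
    """Replace every character by a mask character chosen by its class."""
    masked = []
    for ch in text:
        if ch.isupper():
            masked.append(upper)
        elif ch.islower():
            masked.append(lower)
        elif ch.isdigit():
            masked.append(digit)
        else:
            masked.append(other)
    return "".join(masked)


def mask_right(text: str, num_chars: int) -> str:
    """Mask only the last num_chars characters and keep the rest as-is."""
    split = max(len(text) - num_chars, 0)
    return text[:split] + mask(text[split:])


# Values from the sample purchases event at the start of this recipe.
masked_name = mask("Maryanna Andryszczak")
masked_dob = mask_right("1922-06-06T02:21:59Z", 12)

print(masked_name)
print(masked_dob)  # 1922-06-nnXnn-nn-nnX
```

With a count of 12, MASK_RIGHT leaves the year and month of the timestamp readable and obfuscates the day and time, which is why the query result above still starts with 1908-03-.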