Anomaly Detection

Detecting Unusual Credit Card Activity

In this recipe, we join each credit card transaction with the customer’s average credit card spend and then aggregate the transactions for each customer over a two-hour window. If a customer’s total spend within a window exceeds that customer’s average credit card spend, the account is flagged as a possible case of credit card theft.

You can try this out for yourself with the code here.

Directions

1. Register the existing transactions topic as a KSQL stream:

CREATE STREAM TRANSACTIONS_RAW (ACCOUNT_ID VARCHAR, 
                                TIMESTAMP VARCHAR, 
                                CARD_TYPE VARCHAR, 
                                AMOUNT DOUBLE, 
                                IP_ADDRESS VARCHAR, 
                                TRANSACTION_ID VARCHAR) 
                           WITH (KAFKA_TOPIC='transactions',
                                 VALUE_FORMAT='JSON');
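
Before building on this stream, it can help to check what the topic contains. The statements below are a minimal sketch, assuming the transactions topic already holds messages that should be processed. By default KSQL queries read only messages that arrive after the query starts, so the offset setting is changed for the current session.

```sql
-- Assumption: existing messages in the topics should be processed too.
-- Applies to queries started later in this KSQL session.
SET 'auto.offset.reset' = 'earliest';

-- Print the raw messages in the topic from the start; press Ctrl-C to stop.
PRINT 'transactions' FROM BEGINNING;
```

If only newly arriving transactions matter, the SET statement can be skipped.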

2. Repartition the stream on account_id, and use Avro for the target stream (this is optional):

CREATE STREAM TRANSACTIONS_SOURCE 
    WITH (VALUE_FORMAT='AVRO') AS 
          SELECT * 
            FROM TRANSACTIONS_RAW 
    PARTITION BY ACCOUNT_ID;
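
To confirm that the repartitioning took effect, the new stream can be inspected. This is a sketch that assumes the statement in step 2 ran without errors; the exact layout of the output depends on the KSQL version.

```sql
-- Shows the stream's columns, key field, value format, and backing Kafka topic.
-- The key field is expected to be ACCOUNT_ID and the value format AVRO.
DESCRIBE EXTENDED TRANSACTIONS_SOURCE;
```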

3. Register the existing stream of customer data from Oracle in the topic customers as a KSQL stream:

CREATE STREAM CUST_RAW_STREAM (ID BIGINT, 
                               FIRST_NAME VARCHAR, 
                               LAST_NAME VARCHAR, 
                               EMAIL VARCHAR, 
                               AVG_CREDIT_SPEND DOUBLE) 
              WITH (KAFKA_TOPIC='customers', 
                    VALUE_FORMAT='JSON');

4. Repartition the customer data stream by ID (the customer’s account ID) to prepare for the join, and use Avro for the target stream (Avro is optional, but step 5 as written relies on it to infer the table’s columns):

CREATE STREAM CUSTOMER_REKEYED 
    WITH (VALUE_FORMAT='AVRO') AS 
            SELECT * 
              FROM CUST_RAW_STREAM 
      PARTITION BY ID;

5. Register the partitioned customer data topic as a KSQL table used for the join with the incoming stream of transactions:

CREATE TABLE CUSTOMER 
WITH (KAFKA_TOPIC='CUSTOMER_REKEYED', 
      VALUE_FORMAT='AVRO', 
      KEY='ID');

6. Join the transactions to customer information:

CREATE STREAM TRANSACTIONS_ENRICHED AS 
SELECT T.ACCOUNT_ID, T.CARD_TYPE, T.AMOUNT, 
       C.FIRST_NAME + ' ' + C.LAST_NAME AS FULL_NAME, 
       C.AVG_CREDIT_SPEND 
FROM   TRANSACTIONS_SOURCE T 
       INNER JOIN CUSTOMER C 
       ON T.ACCOUNT_ID = C.ID;
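
Before aggregating, it is worth checking that the join produces rows and what the resulting columns are called. The following is a sketch, assuming the same KSQL version as the rest of this recipe, in which a SELECT on a stream runs continuously until its LIMIT is reached.

```sql
-- List the columns of the enriched stream as KSQL named them.
DESCRIBE TRANSACTIONS_ENRICHED;

-- Sample a few joined rows; the query ends after five rows.
SELECT * FROM TRANSACTIONS_ENRICHED LIMIT 5;
```

If no rows appear, a likely cause is that the transactions and the customer table are keyed on values that do not match.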

7. Aggregate the stream of transactions for each account ID using a two-hour tumbling window, and filter for accounts in which the total spend in a two-hour period is greater than the customer’s average:

CREATE TABLE POSSIBLE_STOLEN_CARD AS 
SELECT   TIMESTAMPTOSTRING(WindowStart(), 'yyyy-MM-dd HH:mm:ss Z') AS WINDOW_START, 
         T.ACCOUNT_ID, T.CARD_TYPE, SUM(T.AMOUNT) AS TOTAL_CREDIT_SPEND, 
         T.FULL_NAME, MAX(T.AVG_CREDIT_SPEND) AS AVG_CREDIT_SPEND 
FROM     TRANSACTIONS_ENRICHED T 
         WINDOW TUMBLING (SIZE 2 HOURS) 
GROUP BY T.ACCOUNT_ID, T.CARD_TYPE, T.FULL_NAME 
HAVING   SUM(T.AMOUNT) > MAX(T.AVG_CREDIT_SPEND);
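
Tumbling windows are fixed and do not overlap, so spending that straddles a window boundary is split across two windows and may go unflagged. One possible variation, shown here only as a sketch, uses overlapping hopping windows instead. The table name POSSIBLE_STOLEN_CARD_HOPPING and the 30-minute advance are hypothetical choices for illustration and are not part of the original recipe.

```sql
-- Hypothetical variant of step 7: two-hour windows that start every 30 minutes.
CREATE TABLE POSSIBLE_STOLEN_CARD_HOPPING AS
SELECT   TIMESTAMPTOSTRING(WindowStart(), 'yyyy-MM-dd HH:mm:ss Z') AS WINDOW_START,
         T.ACCOUNT_ID, T.CARD_TYPE, SUM(T.AMOUNT) AS TOTAL_CREDIT_SPEND,
         T.FULL_NAME, MAX(T.AVG_CREDIT_SPEND) AS AVG_CREDIT_SPEND
FROM     TRANSACTIONS_ENRICHED T
         WINDOW HOPPING (SIZE 2 HOURS, ADVANCE BY 30 MINUTES)
GROUP BY T.ACCOUNT_ID, T.CARD_TYPE, T.FULL_NAME
HAVING   SUM(T.AMOUNT) > MAX(T.AVG_CREDIT_SPEND);
```

With hopping windows each transaction falls into several windows, so the same burst of spending can be reported more than once.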

8. Examine the output:

ksql> SELECT WINDOW_START, ACCOUNT_ID, CARD_TYPE, 
      TOTAL_CREDIT_SPEND, FULL_NAME, AVG_CREDIT_SPEND 
      FROM POSSIBLE_STOLEN_CARD;
2019-01-11 16:00:00 +0000 | 100019 | jcb | 90.69 | Horatius Keefe | 60.58
2019-01-11 16:00:00 +0000 | 100012 | mastercard | 84.04 | Juditha Shwalbe | 53.94
2019-01-11 16:00:00 +0000 | 100016 | maestro | 76.01 | Milo Drewes | 68.33
2019-01-11 16:00:00 +0000 | 100035 | visa-electron | 69.61 | Roxine Furminger | 59.68
…
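
To follow a single account, the same query can be narrowed with a WHERE clause. This sketch uses account 100019 from the sample output above and compares it as a string, since ACCOUNT_ID was declared as VARCHAR in step 1.

```sql
-- Show only flagged windows for one account; press Ctrl-C to stop the query.
SELECT WINDOW_START, ACCOUNT_ID, CARD_TYPE,
       TOTAL_CREDIT_SPEND, FULL_NAME, AVG_CREDIT_SPEND
FROM   POSSIBLE_STOLEN_CARD
WHERE  ACCOUNT_ID = '100019';
```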