Streaming-ETL

Sysmon Event Processing

KSQL can process and enrich security events in real time, giving security analysts extra context during investigations. Sysmon is a free tool for Windows endpoints that collects valuable security events, many of which are directly related to one another through a shared property called ProcessGuid. KSQL queries can join these related events in real time.

In this recipe, we show how to join two specific Sysmon events, ProcessCreate (event ID 1) and NetworkConnect (event ID 3), to speed up analysis and add context to the data collected while hunting for specific adversarial techniques.

Directions

1. Configure KSQL to process all data that exists in a topic (and not just from the current point onwards, which is the default):

ksql> SET 'auto.offset.reset' = 'earliest';

2. Register the Kafka topic that contains Sysmon event logs as a stream and name it WINLOGBEAT_STREAM. In this case, the available Kafka topic was named winlogbeat. Remember that we are performing a join between Sysmon events 1 and 3, and the data is in JSON format. Therefore, we only need to specify the column names of the two Sysmon events (1 and 3) and define nested fields via the STRUCT data type.

ksql> CREATE STREAM WINLOGBEAT_STREAM \
            (source_name VARCHAR, \
            type VARCHAR, \
            task VARCHAR, \
            log_name VARCHAR, \
            computer_name VARCHAR, \
            event_data STRUCT< \
                UtcTime VARCHAR, \
                ProcessGuid VARCHAR, \
                ProcessId INTEGER, \
                Image VARCHAR, \
                FileVersion VARCHAR, \
                Description VARCHAR, \
                Product VARCHAR, \
                Company VARCHAR, \
                CommandLine VARCHAR, \
                CurrentDirectory VARCHAR, \
                User VARCHAR, \
                LogonGuid VARCHAR, \
                LogonId VARCHAR, \
                TerminalSessionId INTEGER, \
                IntegrityLevel VARCHAR, \
                Hashes VARCHAR, \
                ParentProcessGuid VARCHAR, \
                ParentProcessId INTEGER, \
                ParentImage VARCHAR, \
                ParentCommandLine VARCHAR, \
                Protocol VARCHAR, \
                Initiated VARCHAR, \
                SourceIsIpv6 VARCHAR, \
                SourceIp VARCHAR, \
                SourceHostname VARCHAR, \
                SourcePort INTEGER, \
                SourcePortName VARCHAR, \
                DestinationIsIpv6 VARCHAR, \
                DestinationIp VARCHAR, \
                DestinationHostname VARCHAR, \
                DestinationPort INTEGER, \
                DestinationPortName VARCHAR>, \
            event_id INTEGER) \
WITH (KAFKA_TOPIC='winlogbeat', VALUE_FORMAT='JSON');
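
Before sampling the data, it can be worth confirming that the stream was registered with the expected columns. A minimal check, assuming the CREATE STREAM statement above succeeded, is the DESCRIBE statement, which lists each column and its type (the nested event_data field should appear as a STRUCT):

-- Lists the columns and types registered for the stream declared above
ksql> DESCRIBE WINLOGBEAT_STREAM;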

Optionally, query a sample of the data:

ksql> SELECT SOURCE_NAME, COMPUTER_NAME, EVENT_DATA->PARENTIMAGE, EVENT_DATA->PRODUCT \
        FROM WINLOGBEAT_STREAM \
        WHERE EVENT_ID = 1 OR EVENT_ID = 3 \
        LIMIT 2;
Microsoft-Windows-Sysmon | MSEDGEWIN10 | C:\Windows\System32\svchost.exe | Microsoft® Windows® Operating System
Microsoft-Windows-Sysmon | MSEDGEWIN10 | C:\Windows\System32\wuauclt.exe | Microsoft Malware Protection

3. Rekey the WINLOGBEAT_STREAM stream so that the key is correct for subsequent join operations. The new key is the ProcessGuid value (aliased as process_guid), since that is the property common to Sysmon events 1 and 3 and its value uniquely identifies a process. The rekey is done using the PARTITION BY clause.

Note that this operation also flattens the nested event_data field.

ksql> CREATE STREAM WINLOGBEAT_STREAM_REKEY \
            WITH (VALUE_FORMAT='JSON', \
                  PARTITIONS=1, \
                  TIMESTAMP='event_date_creation') \
            AS SELECT \
            STRINGTOTIMESTAMP(event_data->UtcTime, 'yyyy-MM-dd HH:mm:ss.SSS') AS event_date_creation, \
            event_data->ProcessGuid AS process_guid, \
            event_data->ProcessId AS process_id, \
            event_data->Image AS process_path, \
            event_data->FileVersion AS file_version, \
            event_data->Description AS file_description, \
            event_data->Company AS file_company, \
            event_data->CommandLine AS process_command_line, \
            event_data->CurrentDirectory AS process_current_directory, \
            event_data->User AS user_account, \
            event_data->LogonGuid AS user_logon_guid, \
            event_data->LogonId AS user_logon_id, \
            event_data->TerminalSessionId AS user_session_id, \
            event_data->IntegrityLevel AS process_integrity_level, \
            event_data->Hashes AS hashes, \
            event_data->ParentProcessGuid AS parent_process_guid, \
            event_data->ParentProcessId AS parent_process_id, \
            event_data->ParentImage AS parent_process_path, \
            event_data->ParentCommandLine AS parent_process_command_line, \
            event_data->Protocol AS network_protocol, \
            event_data->Initiated AS network_connection_initiated, \
            event_data->SourceIsIpv6 AS src_is_ipv6, \
            event_data->SourceIp AS src_ip_addr, \
            event_data->SourceHostname AS src_host_name, \
            event_data->SourcePort AS src_port, \
            event_data->SourcePortName AS src_port_name, \
            event_data->DestinationIsIpv6 AS dst_is_ipv6, \
            event_data->DestinationIp AS dst_ip_addr, \
            event_data->DestinationHostname AS dst_host_name, \
            event_data->DestinationPort AS dst_port, \
            event_data->DestinationPortName AS dst_port_name, \
            event_id, \
            source_name, \
            log_name \
    FROM WINLOGBEAT_STREAM \
    WHERE source_name='Microsoft-Windows-Sysmon' \
    PARTITION BY process_guid;
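
One way to confirm that the rekey worked is to compare the message key with the process_guid column. The following is a sketch, assuming a KSQL version that exposes the message key as the ROWKEY system column; the two values should match on every row:

-- ROWKEY is the Kafka message key as seen by KSQL (assumed to be available in this version);
-- after PARTITION BY it is expected to equal PROCESS_GUID
ksql> SELECT ROWKEY, PROCESS_GUID, EVENT_ID \
        FROM WINLOGBEAT_STREAM_REKEY \
        LIMIT 3;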

4. Using the rekeyed stream, create a stream with just the Sysmon ProcessCreate events (event_id=1):

ksql> CREATE STREAM SYSMON_PROCESS_CREATE \
            WITH (VALUE_FORMAT='JSON', \
                  PARTITIONS=1, \
                  TIMESTAMP='event_date_creation') \
            AS SELECT event_date_creation, \
                        process_guid, \
                        process_id, \
                        process_path, \
                        file_version, \
                        file_description, \
                        file_company, \
                        process_command_line, \
                        process_current_directory, \
                        user_account, \
                        user_logon_guid, \
                        user_logon_id, \
                        user_session_id, \
                        process_integrity_level, \
                        hashes, \
                        parent_process_guid, \
                        parent_process_id, \
                        parent_process_path, \
                        parent_process_command_line, \
                        event_id, \
                        source_name, \
                        log_name \
            FROM WINLOGBEAT_STREAM_REKEY \
            WHERE event_id=1;

Optionally, query a sample of this data. Note that it contains only ProcessCreate events.

ksql> SELECT PROCESS_PATH, FILE_DESCRIPTION, USER_ACCOUNT  \
        FROM SYSMON_PROCESS_CREATE \
        LIMIT 2;
C:\Windows\System32\wuauclt.exe | Windows Update | NT AUTHORITY\SYSTEM
C:\Windows\SoftwareDistribution\Download\Install\AM_Engine_Patch_1.1.15500.2.exe | AntiMalware Definition Update | NT AUTHORITY\SYSTEM

5. From this derived Sysmon ProcessCreate stream (and its underlying topic), declare a KSQL table. We define the Sysmon ProcessCreate events as a table because, for each key (process_guid), we want to know its current values (process_path, process_command_line, hashes, etc.) when we subsequently join them with NetworkConnect events that have the same process_guid value.

ksql> CREATE TABLE SYSMON_PROCESS_CREATE_TABLE \
            (event_date_creation BIGINT, \
            process_guid VARCHAR, \
            process_id INTEGER, \
            process_path VARCHAR, \
            file_version VARCHAR, \
            file_description VARCHAR, \
            file_company VARCHAR, \
            process_command_line VARCHAR, \
            process_current_directory VARCHAR, \
            user_account VARCHAR, \
            user_logon_guid VARCHAR, \
            user_logon_id VARCHAR, \
            user_session_id INTEGER, \
            process_integrity_level VARCHAR, \
            hashes VARCHAR, \
            parent_process_guid VARCHAR, \
            parent_process_id INTEGER, \
            parent_process_path VARCHAR, \
            parent_process_command_line VARCHAR, \
            event_id INTEGER, \
            source_name VARCHAR, \
            log_name VARCHAR) \
        WITH (KAFKA_TOPIC='SYSMON_PROCESS_CREATE', \
                VALUE_FORMAT='JSON', \
                KEY='process_guid');

6. Using the previously rekeyed stream, create a stream with just the Sysmon NetworkConnect events (event_id=3):

ksql> CREATE STREAM SYSMON_NETWORK_CONNECT \
            WITH (VALUE_FORMAT='JSON', \
                  PARTITIONS=1, \
                  TIMESTAMP='event_date_creation') \
            AS SELECT event_date_creation, \
                        process_guid, \
                        process_id, \
                        process_path, \
                        user_account, \
                        network_protocol, \
                        network_connection_initiated, \
                        src_is_ipv6, \
                        src_ip_addr, \
                        src_host_name, \
                        src_port, \
                        src_port_name, \
                        dst_is_ipv6, \
                        dst_ip_addr, \
                        dst_host_name, \
                        dst_port, \
                        dst_port_name, \
                        event_id, \
                        source_name, \
                        log_name \
            FROM WINLOGBEAT_STREAM_REKEY \
            WHERE event_id=3;

Optionally, query a sample of this data. Note that it contains only NetworkConnect events.

ksql> SELECT PROCESS_PATH, SRC_HOST_NAME, DST_IP_ADDR \
        FROM SYSMON_NETWORK_CONNECT \
        LIMIT 2;
C:\Windows\System32\svchost.exe | MSEDGEWIN10.moffatt.me | 40.77.229.141
C:\Windows\System32\svchost.exe | MSEDGEWIN10.moffatt.me | 104.103.114.93

7. Join the NetworkConnect event stream with the ProcessCreate table, looking up the process details for each network event by process_guid. The enriched stream is persisted to a new Kafka topic called SYSMON_JOIN.

ksql> CREATE STREAM SYSMON_JOIN WITH (PARTITIONS=1) AS \
        SELECT N.EVENT_DATE_CREATION, \
                N.PROCESS_GUID, \
                N.PROCESS_ID, \
                N.PROCESS_PATH, \
                N.USER_ACCOUNT, \
                N.NETWORK_PROTOCOL, \
                N.NETWORK_CONNECTION_INITIATED, \
                N.SRC_IS_IPV6, \
                N.SRC_IP_ADDR, \
                N.SRC_HOST_NAME, \
                N.SRC_PORT, \
                N.SRC_PORT_NAME, \
                N.DST_IS_IPV6, \
                N.DST_IP_ADDR, \
                N.DST_HOST_NAME, \
                N.DST_PORT, \
                N.DST_PORT_NAME, \
                N.SOURCE_NAME, \
                N.LOG_NAME, \
                P.PROCESS_COMMAND_LINE, \
                P.HASHES, \
                P.PARENT_PROCESS_PATH, \
                P.PARENT_PROCESS_COMMAND_LINE, \
                P.USER_LOGON_GUID, \
                P.USER_LOGON_ID, \
                P.USER_SESSION_ID, \
                P.PROCESS_CURRENT_DIRECTORY, \
                P.PROCESS_INTEGRITY_LEVEL, \
                P.PARENT_PROCESS_GUID, \
                P.PARENT_PROCESS_ID \
        FROM SYSMON_NETWORK_CONNECT N \
                INNER JOIN SYSMON_PROCESS_CREATE_TABLE P \
                ON N.PROCESS_GUID = P.PROCESS_GUID;

Optionally, query a sample of the joined data. Note that for each network event you have information about the process responsible:

ksql> SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss'), N_SRC_HOST_NAME, N_DST_IP_ADDR, \
             P_PROCESS_COMMAND_LINE, P_PARENT_PROCESS_COMMAND_LINE, N_USER_ACCOUNT \
        FROM SYSMON_JOIN;
2019-02-13 10:58:30 | MSEDGEWIN10.moffatt.me | 40.83.74.46 | c:\windows\system32\svchost.exe -k netsvcs -p -s PushToInstall | C:\Windows\system32\services.exe | NT AUTHORITY\SYSTEM
2019-02-13 10:59:54 | MSEDGEWIN10.moffatt.me | 2.21.186.132 | c:\windows\system32\svchost.exe -k unistacksvcgroup -s WpnUserService | C:\Windows\system32\services.exe | MSEDGEWIN10\IEUser
2019-02-13 11:00:10 | MSEDGEWIN10.moffatt.me | 204.79.197.200 | "C:\Windows\system32\backgroundTaskHost.exe" -ServerName:CortanaUI.AppXy7vb4pc2dr3kc93kfc509b1d0arkfb2x.mca | C:\Windows\system32\svchost.exe
2019-02-13 11:00:10 | MSEDGEWIN10.moffatt.me | 204.79.197.200 | "C:\Windows\SystemApps\Microsoft.Windows.Cortana_cw5n1h2txyewy\SearchUI.exe" -ServerName:CortanaUI.AppXa50dqqa5gqv4a428c9y1jjw7m3btvepj.mca |
2019-02-13 11:00:27 | MSEDGEWIN10.moffatt.me | 13.107.3.128 | "C:\Program Files\WindowsApps\Microsoft.MicrosoftOfficeHub_17.10314.31700.1000_x64__8wekyb3d8bbwe\Office16\OfficeHubTaskHost.exe" -ServerName:M
2019-02-13 11:00:42 | MSEDGEWIN10.moffatt.me | 2.21.186.81 | "C:\Users\IEUser\AppData\Local\Microsoft\OneDrive\OneDrive.exe" /background | C:\Windows\Explorer.EXE | MSEDGEWIN10\IEUser
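
The INNER JOIN above drops any NetworkConnect event whose process_guid has no matching row in the ProcessCreate table, for example when the process started before Sysmon collection began. If those events matter for a hunt, a LEFT JOIN keeps them and leaves the process columns null. The following is a sketch only: the stream name SYSMON_JOIN_LEFT and the reduced column list are illustrative choices, not part of the original recipe.

-- SYSMON_JOIN_LEFT is a hypothetical name; explicit AS aliases keep the output column names unprefixed
ksql> CREATE STREAM SYSMON_JOIN_LEFT WITH (PARTITIONS=1) AS \
        SELECT N.PROCESS_GUID AS PROCESS_GUID, \
                N.PROCESS_PATH AS PROCESS_PATH, \
                N.DST_IP_ADDR AS DST_IP_ADDR, \
                N.DST_PORT AS DST_PORT, \
                P.PROCESS_COMMAND_LINE AS PROCESS_COMMAND_LINE, \
                P.PARENT_PROCESS_PATH AS PARENT_PROCESS_PATH \
        FROM SYSMON_NETWORK_CONNECT N \
                LEFT JOIN SYSMON_PROCESS_CREATE_TABLE P \
                ON N.PROCESS_GUID = P.PROCESS_GUID;

Rows where PROCESS_COMMAND_LINE is null then mark network connections for which no ProcessCreate event was available at join time.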