Anomaly Detection

Processing Syslog Data: Pattern Detection and Alerting

KSQL can enrich and filter syslog data to reveal particular conditions or events. syslog is a standard technology that applications can use to send log messages to local files or remote servers. With KSQL, you can filter and react to events in real time rather than performing historical analysis of syslog data from cold storage.

Because data flows in from numerous servers and network devices, the volume of messages can be large. In this example, KSQL is used to detect and alert for patterns in the data.

Environment 4.1 or higher

Directions

In this example, the source event stream is named syslog.

1. Register the existing syslog topic for use as a KSQL Stream called syslog:

 CREATE STREAM syslog \
 (TYPE VARCHAR, HOST VARCHAR, MESSAGE VARCHAR, SEVERITY INT, TAG VARCHAR, FACILITY INT, REMOTEADDRESS VARCHAR, DATE BIGINT) \
 WITH (KAFKA_TOPIC='syslog', VALUE_FORMAT='JSON');

2. Inspect the first few messages as they arrive:

 ksql> SELECT HOST, TAG, MESSAGE FROM SYSLOG LIMIT 20;

 rpi-02 | CRON | pam_unix(cron:session): session opened for user smmsp by (uid=0)
 rpi-02 | /USR/SBIN/CRON | (smmsp) CMD (test -x /etc/init.d/sendmail && /usr/share/sendmail/sendmail cron-msp)
 [...]

3. It’s easy to filter out noise:

 ksql> SELECT HOST, TAG, MESSAGE FROM SYSLOG \
         WHERE TAG !='CRON' \
         AND TAG !='/USR/SBIN/CRON' \
         LIMIT 20;

 rpi-02 | minissdpd | device not found for removing : uuid:RKU-42XXX-1GU4A6067130::upnp:rootdevice
 rpi-02 | minissdpd | device not found for removing : uuid:RKU-42XXX-1GU4A6067130
 [...]

4. It’s also easy to filter for just specific types of message; in this example, include only SSH connections:

 ksql> SELECT HOST, TAG, MESSAGE FROM SYSLOG \
         WHERE TAG ='sshd' \
         LIMIT 20;

 rpi-03 | sshd | Invalid user xbmc from 186.249.209.22
 rpi-03 | sshd | input_userauth_request: invalid user xbmc [preauth]

5. Create a Kafka topic of just SSH connections, populated in real time from the source syslog topic:

 CREATE STREAM SYSLOG_SSHD AS \
 SELECT * FROM SYSLOG \
 WHERE TAG ='sshd';

6. Create a Kafka topic of SSH brute-force attempts, daisy-chained from the first:

 CREATE STREAM SYSLOG_SSHD_BRUTEFORCE_ATTACK AS \
 SELECT HOST, TAG, MESSAGE FROM SYSLOG_SSHD \
 WHERE MESSAGE LIKE 'Invalid user%';

7. Observe that there are now two new topics created, each of which contains live feeds of derived syslog data based on the predicate specified:

 ksql> LIST TOPICS;

 Kafka Topic                   | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
 -----------------------------------------------------------------------------------------------------------
 syslog                        | true       | 1          | 1                  | 2         | 2
 SYSLOG_SSHD                   | true       | 4          | 1                  | 0         | 0
 SYSLOG_SSHD_BRUTEFORCE_ATTACK | true       | 4          | 1                  | 0         | 0
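The stream above still emits one row per failed login; to turn it into an alert, add a windowed aggregation. This is an added example and not part of the original recipe: the table name SYSLOG_SSHD_BRUTEFORCE_ALERT, the 5-minute window and the threshold of 10 are assumed values, and it assumes the KSQL version in use supports WINDOW TUMBLING and HAVING.

```sql
-- Alert table: hosts that receive more than 10 "Invalid user" attempts
-- within a 5-minute tumbling window (window size and threshold are assumed values).
-- Reads the derived stream from step 6 so the filtering is not repeated here.
CREATE TABLE SYSLOG_SSHD_BRUTEFORCE_ALERT AS \
  SELECT HOST, COUNT(*) AS INVALID_USER_ATTEMPTS \
  FROM SYSLOG_SSHD_BRUTEFORCE_ATTACK \
  WINDOW TUMBLING (SIZE 5 MINUTES) \
  GROUP BY HOST \
  HAVING COUNT(*) > 10;

-- Watch alerts as they are raised; each row is a (HOST, window) whose count
-- has crossed the threshold, and the count keeps updating while attempts continue.
SELECT HOST, INVALID_USER_ATTEMPTS FROM SYSLOG_SSHD_BRUTEFORCE_ALERT;
```

The backing topic SYSLOG_SSHD_BRUTEFORCE_ALERT can be consumed by a notification service; because the table is keyed by HOST and window, a consumer will see repeated updates for the same window as the count grows and should de-duplicate on that key if it only wants one alert per host per window.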
