All around the world, companies are asking the same question: What is happening right now? We are inundated with pieces of data that have a fragment of the answer. But by the time we have assembled them into one clear view, the answer often no longer matters. It is too late.
Stateful stream processing is the way to beat the clock. It’s a programming paradigm that can materialize views of data in real time. What does that mean? It means you ask questions whose answers are incrementally updated as new information arrives. The effect is that your queries will always be fast.
ksqlDB, the event streaming database, makes it easy to build real-time materialized views with Apache Kafka®. But how does it work? Part 1 of this series looked at how stateless operations work. Now we will take a look at stateful ones. If you like, you can follow along by executing the example code yourself. ksqlDB’s quickstart makes it easy to get up and running.
The goal of a materialized view is simple: Make a pre-aggregated, read-optimized version of your data so that queries do less work when they run. Imagine a toll-booth worker who collects fees from cars as they drive by. When the worker wants to know how much money is in the register, there are two different ways to find out. The worker can, of course, count every bill each time. But another way is to maintain a running total by remembering the current amount and adding each new driver’s fee to it.
When does this read-optimized version of your data get built? In a traditional database, you have to trigger it to happen. And when you do, the triggered updates can be slow because every change since the last trigger needs to be integrated. The materialized views might even need to be rebuilt from scratch, which can take a lot of time.
In stream processing, maintenance of the view is automatic and incremental. The view updates as soon as new events arrive and is adjusted in the smallest possible manner based on the delta rather than recomputed from scratch. That is why we say stream processing gives you real-time materialized views. It would be like the toll-booth worker adding to the running sum immediately after each driver’s fee is collected.
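The toll-booth analogy can be sketched in a few lines of Python. This is only an illustration of incremental maintenance versus recomputation; the `Register` class and its method names are made up for this example and have nothing to do with ksqlDB's API.

```python
class Register:
    """A toll-booth register that maintains a running total."""

    def __init__(self):
        self.fees = []   # every fee ever collected (the raw data)
        self.total = 0   # the incrementally maintained view

    def collect(self, fee):
        self.fees.append(fee)
        self.total += fee  # integrate only the delta

    def recount(self):
        # The slow alternative: count every bill each time.
        return sum(self.fees)


register = Register()
for fee in [5, 5, 10, 5]:
    register.collect(fee)

print(register.total)      # running total, read in constant time
print(register.recount())  # same answer, recomputed from scratch
```

Both calls return the same amount, but reading `total` does no work at query time, while `recount` gets slower as more fees accumulate.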
If you had a stream of sensor data:
CREATE STREAM readings (
    sensor VARCHAR KEY,
    area VARCHAR,
    reading INT
) WITH (
    kafka_topic = 'readings',
    partitions = 2,
    value_format = 'json'
);
Whose events looked like:
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-1', 'wheel', 45);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-2', 'motor', 41);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-1', 'wheel', 92);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-2', 'engine', 13);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-2', 'engine', 90);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-4', 'motor', 95);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-3', 'engine', 67);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-3', 'wheel', 52);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-4', 'engine', 55);
INSERT INTO readings (sensor, area, reading) VALUES ('sensor-3', 'engine', 37);
You might want to frequently check the current average of each sensor. You can do that by materializing a view of the stream:
-- process from the beginning of each stream
SET 'auto.offset.reset' = 'earliest';

CREATE TABLE avg_readings AS
    SELECT sensor,
           AVG(reading) AS avg
    FROM readings
    GROUP BY sensor
    EMIT CHANGES;
What happens when you run this statement on ksqlDB? Its server (we’re just looking at a single node in this post; in a future one we’ll look at how this works when ksqlDB is clustered) creates a new persistent query that runs forever, processing data as it arrives. When each row is read from the readings stream, the persistent query does two things. First, it incrementally updates the materialized view to integrate the incoming row. Second, it emits a row to a changelog topic. The changelog is an audit trail of all updates made to the materialized view, which we’ll see is handy both functionally and architecturally. Here is what that process looks like:
Pause the animation at any point and note the relationship between the materialized view (yellow box) and the changelog, hovering over the rows in the changelog to see their contents. The current values in the materialized view are the latest values per key in the changelog. For example, notice how the first and third events in partition 0 of the changelog are for key sensor-1. The third event is a refinement of the first event: the average changed from 45 to 68.5. That refinement causes the average for sensor-1 to be updated incrementally by factoring in only the new data.
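To make the two steps concrete, here is a minimal Python sketch of what the persistent query does for each row: update the view, then append to a changelog. It uses the ten readings inserted earlier. Two simplifications are assumptions on my part: partitions are ignored (everything lives in one list), and the running average is kept as a sum and a count per key, which may differ from how ksqlDB represents `AVG` internally.

```python
from collections import defaultdict

# The ten readings from the INSERT statements: (sensor, area, reading).
READINGS = [
    ("sensor-1", "wheel", 45), ("sensor-2", "motor", 41),
    ("sensor-1", "wheel", 92), ("sensor-2", "engine", 13),
    ("sensor-2", "engine", 90), ("sensor-4", "motor", 95),
    ("sensor-3", "engine", 67), ("sensor-3", "wheel", 52),
    ("sensor-4", "engine", 55), ("sensor-3", "engine", 37),
]

state = defaultdict(lambda: [0, 0])  # sensor -> [sum, count] (assumed layout)
view = {}        # materialized view: sensor -> current average
changelog = []   # every update made to the view, in arrival order

for sensor, _area, reading in READINGS:
    acc = state[sensor]
    acc[0] += reading                          # step 1: fold in only the new row
    acc[1] += 1
    view[sensor] = acc[0] / acc[1]
    changelog.append((sensor, view[sensor]))   # step 2: emit the new value

sensor_1_updates = [avg for key, avg in changelog if key == "sensor-1"]
print(sensor_1_updates)  # [45.0, 68.5]
print(view["sensor-1"])  # 68.5
```

The latest changelog entry for each key always equals the value in the view, which is the relationship described above.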
A materialized view is only as good as the queries it serves, and ksqlDB gives you two ways to query it: push and pull queries. Both are issued by client programs to bring materialized view data into applications. Pull queries retrieve results at a point in time (namely “now”). If you run a query such as SELECT * FROM avg_readings WHERE sensor='sensor-1';, the result will be whatever is in the materialized view when it executes. You can explore what that pull query would return by sliding around the progress bar of the animation and inspecting the table below it. By contrast, push queries stream a subscription of query result changes to the client as they occur. If you run SELECT * FROM avg_readings WHERE sensor='sensor-1' EMIT CHANGES;, each of the rows in the changelog with key sensor-1 will be continuously streamed to your application (45 and 68.5, respectively, in this example).
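The difference between the two query styles can be sketched in Python. This is a conceptual model only: `pull` and `push` are hypothetical helper names, the changelog is a short in-memory list, and a real push query stays open indefinitely rather than ending when the list runs out.

```python
# A small changelog of (key, new average) updates, in the order they occurred.
CHANGELOG = [("sensor-1", 45.0), ("sensor-2", 41.0), ("sensor-1", 68.5)]

# The materialized view holds the latest value per key.
VIEW = dict(CHANGELOG)


def pull(view, key):
    """Point-in-time lookup: whatever the view holds right now."""
    return view.get(key)


def push(changelog, key):
    """Subscription: yield each change for the key as it occurs."""
    for changed_key, value in changelog:
        if changed_key == key:
            yield value


print(pull(VIEW, "sensor-1"))             # 68.5
print(list(push(CHANGELOG, "sensor-1")))  # [45.0, 68.5]
```

A pull query returns one answer and finishes; a push query delivers every intermediate answer along the way.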
Beyond the programming abstraction, what is actually going on under the hood? When ksqlDB begins executing the persistent query, it leverages RocksDB to store the materialized view locally on its disk. RocksDB is an embedded key/value store that runs in process in each ksqlDB server—you do not need to start, manage, or interact with it. RocksDB is used to store the materialized view because it takes care of all the details of storing and indexing an associative data structure on disk with high performance.
ksqlDB server creates one RocksDB instance per partition of its immediate input streams. This per-partition isolation is an architectural advantage when ksqlDB runs as a cluster, but it does have one important implication—all rows that you want to be aggregated together must reside on the same partition of the incoming stream. What happens if that isn’t the case?
There are many clauses that a materialized view statement can be created with, but perhaps the most common is GROUP BY. In a relational database, GROUP BY buckets rows according to some criteria before an aggregation executes. If it is a distributed database, data may need to be moved between nodes so that the node executing the operation has all the data it needs locally. As in relational databases, so in ksqlDB. ksqlDB repartitions your streams to ensure that all rows that have the same key reside on the same partition. This happens invisibly through a second, automatic stage of computation.
In distributed systems, the process of reorganizing data locality is known as shuffling. Kafka Streams, ksqlDB’s underlying execution engine, uses Kafka topics to shuffle intermediate data. These implementation-level topics are usually named *-repartition and are created, managed, and purged on your behalf. Repartition topics for materialized views have the same number of partitions as their source topics. When records are shuffled across partitions, the overall order of data from each original partition is no longer guaranteed. This is important to consider when you initially load data into Kafka. In general, it is always wise to avoid a shuffle in any system if you can, since there is inherent I/O involved.
If your data is already partitioned according to the GROUP BY criteria, the repartitioning is skipped. This is one of the huge advantages of ksqlDB’s strong type system on top of Kafka. Optimizations can be inferred from the schema of your data, and unnecessary I/O can be transparently omitted. You don’t need to remember to do these things; they simply happen for you.
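Here is a rough Python sketch of the shuffle, assuming a stream keyed by sensor that is being grouped by area. The hash function is a stand-in: it uses `zlib.crc32` purely so the example is deterministic, and it is not Kafka's partitioner. The helper names `partition_for`, `shuffle`, and `needs_shuffle` are hypothetical.

```python
import zlib

# (sensor, area, reading) rows; the source stream is keyed by sensor.
ROWS = [
    ("sensor-1", "wheel", 45), ("sensor-2", "motor", 41),
    ("sensor-1", "wheel", 92), ("sensor-2", "engine", 13),
    ("sensor-4", "motor", 95), ("sensor-3", "engine", 67),
]
SENSOR, AREA = 0, 1  # column indexes


def partition_for(key, num_partitions):
    # Stand-in hash for illustration; not Kafka's actual partitioner.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def shuffle(partitions, key_index):
    """Re-place every row by a new key, keeping the same partition count."""
    out = [[] for _ in partitions]
    for partition in partitions:
        for row in partition:
            out[partition_for(row[key_index], len(out))].append(row)
    return out


def needs_shuffle(current_key, group_key):
    # Already partitioned by the grouping column: nothing to move.
    return current_key != group_key


source = shuffle([ROWS, []], SENSOR)  # two partitions, placed by sensor
by_area = shuffle(source, AREA) if needs_shuffle(SENSOR, AREA) else source

for index, partition in enumerate(by_area):
    print(index, sorted({row[AREA] for row in partition}))
```

After the shuffle, every row for a given area sits on a single partition, so one task can aggregate it without looking elsewhere. Grouping by sensor would skip the shuffle entirely because the rows are already placed by that column.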
The architecture described so far supports a myriad of materializations, but what happens when a hardware fault causes you to permanently lose the ksqlDB server node? RocksDB is an embedded key/value store. It has no replication support to create secondary copies over a network. In other words, RocksDB is treated as a transient resource. When you lose ksqlDB’s server, you also lose RocksDB. Is that a problem?
It turns out that it isn’t. Remember that every time a materialized view updates, the persistent query maintaining it also writes out a row to a changelog topic. Each row contains the value that the materialized view was updated to. When a fresh ksqlDB server comes online and is assigned a stateful task (like a SUM() aggregation query), it checks to see whether it has any relevant data in RocksDB for that materialized view. If it doesn’t, it replays the changelog data directly into its RocksDB store. When it reaches the end, its local materialized view is up to date, and it can begin serving queries.
The process is the same even if the server boots up and has some prior RocksDB data. When ksqlDB is run as a cluster, another server may have taken over in its place. A ksqlDB server coming online with stale data in RocksDB can simply replay the part of the changelog that is new, allowing it to rapidly recover to the current state.
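Recovery by replay can be sketched as follows. A plain dictionary stands in for the RocksDB store, and the `from_offset` bookkeeping is a simplification I have assumed for illustration; it is not a description of how ksqlDB tracks its position.

```python
def replay(changelog, store=None, from_offset=0):
    """Apply changelog records from from_offset onward.

    Returns the updated store and the offset to resume from next time.
    """
    store = {} if store is None else store
    for key, value in changelog[from_offset:]:
        store[key] = value  # each record carries the full new value
    return store, len(changelog)


CHANGELOG = [
    ("sensor-1", 45.0), ("sensor-2", 41.0), ("sensor-1", 68.5),
    ("sensor-2", 27.0), ("sensor-2", 48.0),
]

# A fresh server has no local state and replays everything.
fresh_store, fresh_offset = replay(CHANGELOG)

# A returning server has stale state covering the first two records
# and only replays what it missed.
stale_store = {"sensor-1": 45.0, "sensor-2": 41.0}
recovered_store, recovered_offset = replay(CHANGELOG, stale_store, from_offset=2)

print(fresh_store)
print(recovered_store)
```

Both servers end up with the same view. Because every changelog record contains the value the view was updated to, replaying is a matter of overwriting, not recomputing the aggregation.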
People often ask where exactly a materialized view is stored. It is, in fact, stored in two places, each of which is optimized for a different usage pattern. It is stored once in RocksDB on ksqlDB’s server in its materialized form for fast access. It is also stored once in Kafka’s brokers in the changelog in incremental update form for durable storage and recovery.
This design can recover from faults, but what happens when the changelog topic grows very large? If this were all there was to it, it would take a long time for a new server to come back online since it would need to load all the changes into RocksDB. The changelog topic, however, is configured for compaction. Compaction is a process that runs in the background on the Kafka broker that periodically deletes all but the latest record per key per topic partition. This means that older updates for each key are periodically deleted, and the changelog shrinks to only the most relevant values. It is like a real-estate agent taking bids for houses: the agent discards all but the highest bid on each home. For the purposes of selling the property, only the current highest bid matters. Lower bids can be discarded.
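The effect of compaction on a single changelog partition can be modeled in a few lines of Python. This is a simplified picture of the outcome only: the `compact` function is hypothetical, and it runs in one pass over an in-memory list, whereas the broker does this work periodically in the background.

```python
def compact(partition_log):
    """Keep only the latest record per key, preserving the survivors' order."""
    last_offset = {key: offset for offset, (key, _value) in enumerate(partition_log)}
    return [
        record
        for offset, record in enumerate(partition_log)
        if last_offset[record[0]] == offset
    ]


LOG = [
    ("sensor-1", 45.0), ("sensor-2", 41.0), ("sensor-1", 68.5),
    ("sensor-2", 27.0), ("sensor-2", 48.0),
]

compacted = compact(LOG)
print(compacted)  # [('sensor-1', 68.5), ('sensor-2', 48.0)]
```

Replaying the compacted log produces the same view as replaying the full one, with fewer records to load.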
In practice, reloading a materialized view into ksqlDB tends to look less like the above animation, with many updates per key, and more like the below animation, with only one or a few updates per key.
This approach is powerful because RocksDB is highly efficient for bulk writes. ksqlDB continuously streams log data from Kafka over the network and inserts it into RocksDB at high speed.
Many materialized views compound data over time, aggregating data into one value that reflects history. Sometimes, though, you might want to create a materialized view that is just the last value for each key. The solution to this problem is straightforward.
All you do is wrap the column whose value you want to retain with the LATEST_BY_OFFSET aggregation. To understand what LATEST_BY_OFFSET is doing, it helps to understand the interface that aggregations have to implement. Aggregation functions have two key methods: one that initializes their state, and another that updates the state based on the arrival of a new row. For example, the SUM aggregation initializes its total to zero and then adds the incoming value to its running total. LATEST_BY_OFFSET is a clever function that initializes its state for each key to null. Each time a new value arrives for the key, its old value is thrown out and replaced entirely by the new value.
This lets you build a materialized view that always reflects the last thing that happened, which is useful for building a recency cache.
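The two-method interface described above can be sketched in Python. The class and method names (`Sum`, `LatestByOffset`, `initialize`, `update`, `aggregate`) are hypothetical and chosen for this illustration; they are not ksqlDB's actual aggregation API.

```python
class Sum:
    def initialize(self):
        return 0                  # start the total at zero

    def update(self, state, value):
        return state + value      # add the incoming value to the running total


class LatestByOffset:
    def initialize(self):
        return None               # no value seen yet for this key

    def update(self, state, value):
        return value              # discard the old value, keep the new one


def aggregate(aggregation, rows):
    """Fold (key, value) rows into a per-key view, in arrival order."""
    view = {}
    for key, value in rows:
        if key not in view:
            view[key] = aggregation.initialize()
        view[key] = aggregation.update(view[key], value)
    return view


ROWS = [("sensor-1", 45), ("sensor-2", 41), ("sensor-1", 92)]

print(aggregate(Sum(), ROWS))             # {'sensor-1': 137, 'sensor-2': 41}
print(aggregate(LatestByOffset(), ROWS))  # {'sensor-1': 92, 'sensor-2': 41}
```

The same driver loop serves both aggregations. Only the update rule differs, which is why keeping the last value per key fits naturally into the aggregation model.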
As its name suggests, “latest” is defined in terms of offsets, not time. In a future release, ksqlDB will support the same operation but with order defined in terms of timestamps, which can handle out-of-order data.
Real-time materialized views are a powerful construct for figuring out what is happening right now. Because they update incrementally, they stay fast, and they also have a strong fault tolerance story.
In the next posts in this series, we’ll look at how fault tolerance, scaling, joins and time work. Until then, there’s no substitute for trying ksqlDB yourself.
Michael Drogalis is Confluent’s stream processing product lead, where he works on the direction and strategy behind all things compute related. Before joining Confluent, Michael served as the CEO of Distributed Masonry, a software startup that built a streaming-native data warehouse. He is also the author of several popular open source projects, most notably the Onyx Platform.