
Real-Time Data Replication with ksqlDB

Data can originate in a number of different sources—transactional databases, mobile applications, external integrations, one-time scripts, etc.—but eventually it has to be synchronized to a central data warehouse for analysis and decision making. At Bolt, one of the leading ride-hailing apps in Europe and Africa, we use ksqlDB in our data warehouse stream processing pipeline to power every decision we make.

[Figure: ksqlDB data warehouse stream processing pipeline]

Having a central data warehouse allows analysts, scientists, developers, operation specialists, and other decision-makers to easily get the information they need by querying a single source of data. Warehouse users can compare numbers across different lines of business and time periods, calculate aggregates, and make predictions on top of that data.

To move data between systems and increase availability, we need data replication: the process of copying data from one server to another. Ideally, a data replication pipeline should satisfy the following requirements:

  • The data has to be 100% consistent with its source. No data loss or corruption along the way is acceptable.
  • Replication has to be resilient to different kinds of failures, including the source system being unavailable and the replication machine failing.
  • Replication has to be scalable. Bolt is growing fast, and so are the volumes of data we process. One of the main requirements for our replication pipeline was the ability to accommodate 10x growth in data volume.
  • There should be a way to manipulate the data. Some fields might need to be excluded from replication completely, some need to be obfuscated, and some need to be added. Bolt must comply with European privacy regulations like GDPR at the ever-increasing scale of our data. This, for example, includes protecting users’ personally identifiable information.
  • It should support data denormalization, such as substituting IDs with real values or joining separate data sources into a single one.
  • It should be easy to maintain and integrate with existing infrastructure, which in our case consisted of Apache Kafka® as the main component.
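
As an illustration of the obfuscation requirement above, here is a minimal sketch of a salted one-way hash that could replace a sensitive field before it reaches the warehouse. The class name PiiObfuscator, the method name obfuscate, and the salt handling are all hypothetical and are not taken from Bolt's pipeline; whether hashing alone is adequate for a given privacy regulation is outside the scope of this sketch.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper for illustration; not part of ksqlDB or of Bolt's pipeline.
final class PiiObfuscator {

    // Returns the lowercase hex SHA-256 digest of salt + value, or null for null input.
    public static String obfuscate(String value, String salt) {
        if (value == null) {
            return null;
        }
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest((salt + value).getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(hash.length * 2);
            for (byte b : hash) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 is mandatory on every Java platform, so this is not expected.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // The same input and salt always give the same token, so joins on the
        // obfuscated column still work downstream.
        String first = obfuscate("rider@example.com", "demo-salt");
        String second = obfuscate("rider@example.com", "demo-salt");
        System.out.println(first.equals(second));
    }
}
```

Because the output is deterministic, the obfuscated value can still serve as a join or grouping key, while the original value is not stored in the destination topic.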

We initially considered several different solutions and frameworks: ksqlDB, Apache Flink®, Apache Spark™, Apache Storm, Kafka Streams applications, all the way up to writing a replication service ourselves. ksqlDB had the honor of being a first-to-try candidate from that list because it is native to the Kafka ecosystem.

And it turned out to be a perfect fit for our use case. Let’s imagine we have a Kafka consumer reading a set of topics and ingesting data into a warehouse. In most cases, you do not want it to read source topics containing raw data, so you need the data to be available in another topic. This is exactly what ksqlDB does for you. Here, you can see how easy it is to replicate data from source_topic to destination_topic, which, in our case, is later ingested into the data warehouse:

  1. Create a stream reading data from source_topic containing raw data:
    CREATE STREAM source_topic_stream (
        ROWKEY BIGINT KEY,
        created_at VARCHAR,
        data VARCHAR,
        user_id BIGINT
    ) WITH (
        kafka_topic = 'source_topic',
        value_format = 'json'
    );
    
  2. Create a stream populating destination_topic with selected data from a source stream:
    CREATE STREAM destination_topic_stream WITH (
        kafka_topic = 'destination_topic',
        value_format = 'json'
    ) AS
    SELECT rowkey, user_id, created_at
    FROM source_topic_stream
    EMIT CHANGES;
    
  3. Make your consumer read data from destination_topic.

You can repeat this process for every topic whose data you want to propagate to a warehouse or any other system. At Bolt, we’ve found ksqlDB to be a particularly good fit for replicating all our data to a warehouse, for a number of reasons.

ksqlDB is natively integrated into the Kafka ecosystem

ksqlDB is essentially built on top of the Kafka consumer and producer protocols, providing all guarantees that native clients have. It is also extremely easy to integrate with Kafka.

ksqlDB is easy to deploy and manage

ksqlDB is deployed as a separate cluster of machines completely decoupled from the main Kafka cluster. It supports two deployment modes (interactive and headless), both of which are quite easy to deploy, maintain, and scale.

ksqlDB is linearly scalable

ksqlDB runs as its own cluster and is built on top of native Kafka consumers and producers, meaning it can easily scale out horizontally. You can add or remove nodes from a ksqlDB cluster without affecting the main Kafka brokers and without needing to reconfigure anything.

ksqlDB natively integrates with Confluent Schema Registry, allowing for strict data type checks

Using the Apache Avro™ data format for your Kafka messages automatically creates “documentation” for them. Every topic has its schema defined and available for every other process to use. In the context of ksqlDB, this also saves users from having to specify types for every field in a stream.

Instead of:

CREATE STREAM source_topic_stream (
    ROWKEY BIGINT KEY,
    created_at VARCHAR,
    data VARCHAR,
    user_id BIGINT
) WITH (
    kafka_topic = 'source_topic_name',
    value_format = 'json'
);

…users can simply write:

CREATE STREAM source_topic_stream WITH (
    kafka_topic = 'source_topic_name',
    value_format = 'avro',
    key = 'id'
);

The ksqlDB engine will contact Schema Registry and resolve the given topic’s schema automatically.

ksqlDB’s query language is declarative

You can explicitly specify fields you want to extract from each stream. This makes it easy to support and update streams:

CREATE STREAM source_topic_stream WITH (
    kafka_topic = 'source_topic_name',
    value_format = 'avro',
    key = 'id'
);

CREATE STREAM destination_topic_stream WITH (
    kafka_topic = 'destination_topic_name',
    value_format = 'avro'
) AS
SELECT id, created_at, updated_at, data
FROM source_topic_stream
EMIT CHANGES;

ksqlDB is extensible with custom user-defined functions (UDFs)

If you need any functionality not currently provided by the ksqlDB runtime, you can write a plain Java class and plug it into the ksqlDB engine.

import java.math.BigDecimal;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;

@UdfDescription(name = "get_city_for_coordinates", description = "Given latitude and longitude, returns name of a city those coordinates are located in, or N/A in case no matches found")
public class GetCityForCoordinates {

    @Udf(description = "Get city name from given latitude and longitude")
    public String getCityForCoordinates(
            @UdfParameter(description = "Latitude") final BigDecimal lat,
            @UdfParameter(description = "Longitude") final BigDecimal lng
    ) {
        // Some implementation logic here.
        // Placeholder return so that the class compiles; replace with a real lookup.
        return "N/A";
    }

}

You can then call it in a query:

SELECT id, GET_CITY_FOR_COORDINATES(lat, lng)
FROM source_stream ...;
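
The implementation logic of such a function is left open above. As one possible shape for it, here is a minimal sketch that matches coordinates against a small in-memory list of bounding boxes and falls back to N/A, as the UDF description states. The CityLookup class, the cityFor method, and the rough bounding boxes are hypothetical and purely illustrative; the ksqlDB annotations are omitted so the sketch runs without the ksqlDB UDF library.

```java
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.List;

// Hypothetical lookup logic; the city list and bounding boxes are rough and illustrative.
final class CityLookup {

    // A city approximated by a latitude/longitude bounding box.
    private static final class CityBox {
        final String name;
        final BigDecimal minLat, maxLat, minLng, maxLng;

        CityBox(String name, String minLat, String maxLat, String minLng, String maxLng) {
            this.name = name;
            this.minLat = new BigDecimal(minLat);
            this.maxLat = new BigDecimal(maxLat);
            this.minLng = new BigDecimal(minLng);
            this.maxLng = new BigDecimal(maxLng);
        }

        boolean contains(BigDecimal lat, BigDecimal lng) {
            return lat.compareTo(minLat) >= 0 && lat.compareTo(maxLat) <= 0
                    && lng.compareTo(minLng) >= 0 && lng.compareTo(maxLng) <= 0;
        }
    }

    private static final List<CityBox> CITIES = Arrays.asList(
            new CityBox("Tallinn", "59.35", "59.50", "24.55", "24.93"),
            new CityBox("Riga", "56.86", "57.09", "23.93", "24.32"));

    // Returns the first city whose box contains the point, or "N/A" if none does.
    public static String cityFor(BigDecimal lat, BigDecimal lng) {
        if (lat == null || lng == null) {
            return "N/A";
        }
        for (CityBox city : CITIES) {
            if (city.contains(lat, lng)) {
                return city.name;
            }
        }
        return "N/A";
    }

    public static void main(String[] args) {
        System.out.println(cityFor(new BigDecimal("59.437"), new BigDecimal("24.7536")));
        System.out.println(cityFor(new BigDecimal("0"), new BigDecimal("0")));
    }
}
```

A production version would more likely query a geospatial index or service; the linear scan here only shows where such logic would plug into the annotated method.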

The ksqlDB DSL is perfect for doing data manipulations

You can easily filter your data:

CREATE STREAM source_topic_stream WITH (
    kafka_topic = 'source_topic_name',
    value_format = 'avro',
    key = 'id'
);

CREATE STREAM destination_topic_stream WITH (
    kafka_topic = 'destination_topic_name',
    value_format = 'avro'
) AS
SELECT id, created_at, data, amount
FROM source_topic_stream
WHERE amount <= 100
EMIT CHANGES;

In addition, you can replicate a single data source to several destinations based on distribution criteria:

CREATE STREAM customers_stream WITH (
    kafka_topic = 'customers',
    value_format = 'avro',
    key = 'id'
);

CREATE STREAM customers_male_stream WITH (
    kafka_topic = 'customers_male',
    value_format = 'avro'
) AS
SELECT id, created_at, age, name, customer_account_id
FROM customers_stream
WHERE gender = 'M'
EMIT CHANGES;

CREATE STREAM customers_female_stream WITH (
    kafka_topic = 'customers_female',
    value_format = 'avro'
) AS
SELECT id, created_at, age, name, customer_account_id
FROM customers_stream
WHERE gender = 'F'
EMIT CHANGES;

Or, you can merge several source data streams into a single destination stream:

CREATE STREAM purchases_online_stream WITH (
    kafka_topic = 'purchases_online',
    value_format = 'avro',
    key = 'id'
);

CREATE STREAM purchases_offline_stream WITH (
    kafka_topic = 'purchases_offline',
    value_format = 'avro',
    key = 'id'
);

CREATE STREAM purchases_total_stream WITH (
    kafka_topic = 'purchases_total',
    value_format = 'avro'
) AS
SELECT id, created_at, item_id, item_price, customer_id, 'online' as "purchase_channel"
FROM purchases_online_stream
EMIT CHANGES;

INSERT INTO purchases_total_stream
SELECT id, created_at, item_id, item_price, customer_id, 'offline' as "purchase_channel"
FROM purchases_offline_stream;

In this example, we have also added an artificial column that specifies which purchase channel was used (online or offline), making it possible to trace each record back to its source.

It is very easy to add, modify, and remove streams in ksqlDB

If a new topic needs to be replicated, there is no need to write a new Java-based Kafka Streams application, compile it, and deploy it. ksqlDB does all this under the hood automatically. All you need to do is provide a new ksqlDB statement to the engine.
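
One way to provide a statement to a server running in interactive mode is over its REST interface. The sketch below builds such a request with the JDK's built-in HTTP client types. It assumes the documented /ksql endpoint, the application/vnd.ksql.v1+json content type, and a server at http://localhost:8088; the class and method names are hypothetical, and the hand-written JSON escaping covers only the characters most likely to appear in a statement.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper that prepares (but does not send) a ksqlDB statement request.
final class KsqlStatementRequest {

    // Escapes the characters that would most commonly break a JSON string literal.
    public static String jsonEscape(String s) {
        StringBuilder out = new StringBuilder();
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"': out.append("\\\""); break;
                case '\\': out.append("\\\\"); break;
                case '\n': out.append("\\n"); break;
                case '\r': out.append("\\r"); break;
                case '\t': out.append("\\t"); break;
                default: out.append(c);
            }
        }
        return out.toString();
    }

    // Wraps the statement in the JSON body expected by the /ksql endpoint.
    public static String buildBody(String statement) {
        return "{\"ksql\": \"" + jsonEscape(statement) + "\", \"streamsProperties\": {}}";
    }

    // Builds a POST request against <serverUrl>/ksql; serverUrl is an assumption.
    public static HttpRequest buildRequest(String serverUrl, String statement) {
        return HttpRequest.newBuilder(URI.create(serverUrl + "/ksql"))
                .header("Content-Type", "application/vnd.ksql.v1+json; charset=utf-8")
                .POST(HttpRequest.BodyPublishers.ofString(buildBody(statement)))
                .build();
    }

    public static void main(String[] args) {
        String statement = "CREATE STREAM source_topic_stream WITH ("
                + "kafka_topic = 'source_topic_name', value_format = 'avro', key = 'id');";
        HttpRequest request = buildRequest("http://localhost:8088", statement);
        System.out.println(request.method() + " " + request.uri());
        System.out.println(buildBody(statement));
    }
}
```

Sending the request would be a matter of passing it to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()). In headless mode, statements are instead supplied in a queries file at startup, so this approach would not apply there.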

It’s quite the ride!

ksqlDB’s features allow us to do pretty much any data manipulation one can think of. Half a year ago, we migrated all replication dataflows to ksqlDB, and we have never regretted it. At the time of this writing, we are replicating over 500 topics to our warehouse, and this number is growing every day. All our use cases are essentially stateless at the moment, but we are also considering ksqlDB for different types of stateful stream processing scenarios. In my next blog post, I will describe how we have adopted the Confluent Platform for our broader data replication pipeline.

Get started with ksqlDB

Ready to check ksqlDB out? Head over to ksqldb.io to get started, where you can follow the quick start, read the docs, and learn more!

Ruslan Gibaiev is an experienced software engineer and leader who has worked for both big corporations and hyper-growth startups across fintech, AR/VR, internet search, and micromobility industries. His current interest is in stream processing and all things data. Ruslan likes solving challenging problems and sharing his experience by talking at conferences and meetups.
