
Building a Microservices Ecosystem with Kafka Streams and KSQL

Ben Stopford

Today, we invariably operate in ecosystems: groups of applications and services which together work towards some higher level business goal. When we make these systems event-driven they come with a number of advantages. The first is the idea that we can rethink our services not simply as a mesh of remote requests and responses—where services call each other for information or tell each other what to do—but as a cascade of notifications, decoupling each event source from its consequences.

The second comes from the realisation that these events are themselves facts: a narrative that not only describes the evolution of your business over time, it also represents a dataset in its own right—your orders, your payments, your customers, or whatever they may be. We can store these facts in the very infrastructure we use to broadcast them, linking applications and services together with a central data-plane that holds shared datasets and keeps services in sync.

These insights alone can do much to improve speed and agility, but the real benefits of streaming platforms come when we embrace not only the messaging backbone but the stream processing API itself. Such streaming services do not hesitate, they do not stop. They focus on the now—reshaping, redirecting, and reforming it; branching substreams, recasting tables, rekeying to redistribute and joining streams back together again. So this is a model that embraces parallelism not through brute force, but instead by sensing the natural flow of the system and morphing it to its whim.

Life is a series of natural and spontaneous changes. Don’t resist them – that only creates sorrow. Let reality be reality. Let things flow naturally forward.

Lao-Tzu

So we can use the Kafka Streams API to piece together complex business systems as a collection of asynchronously executing, event-driven services. The differentiator here is the API itself, which is far richer than, say, the Kafka Producer or Consumer. It makes code more readable, provides reusable implementations of common patterns like joins, aggregates, and filters, and wraps the whole ecosystem with transparent correctness guarantees.

Systems built in this way, in the real world, come in a variety of guises. They can be fine grained and fast executing, completing in the context of an HTTP request, or complex and long-running, manipulating the stream of events that map a whole company’s business flow. This post focusses on the former, building up a real-world example of a simple order management system that executes within the context of an HTTP request, and is entirely built with Kafka Streams. Each service is a small function, with well-defined inputs and outputs. As we build this ecosystem up, we will encounter problems such as blending streams and tables, reading our own writes, and managing consistency in a distributed and autonomous environment. Here is a picture of where we will end up (just to whet your appetite):

O’Reilly Book: Designing Event Driven Systems
Explore all these concepts in detail with the free O’Reilly book “Designing Event-Driven Systems: Concepts and Patterns for Streaming Services with Apache Kafka”.

Your System has State: So Let’s Deal with It

Building stateless services is widely considered a good idea. They can be scaled out, cookie-cutter-style, freed from the burdensome weight of loading data on startup. Webservers are a good example of this: to increase the capacity for generating dynamic content a web tier can be scaled horizontally, simply by adding new servers. So why would we want anything else? The rub is that most applications need state of some form, and this needs to live somewhere, so the system ends up bottlenecking on the data layer—often a database—sat at the other end of a network connection.

In distributed architectures like microservices, this problem is often more pronounced as data is spread throughout the entire estate. Each service becomes dependent on the worst case performance and liveness of all the services it connects to. Caching provides a respite from this, but caching has issues of its own: invalidation, consistency, not knowing what data isn’t cached, etc.

Streaming platforms come at this problem from a slightly different angle. Services can be stateless or stateful as they choose, but it’s the ability of the platform to manage statefulness—which means loading data into services—that really differentiates the approach. So why would you want to push data into your services? The answer is that it makes it easier to perform more data intensive operations efficiently (the approach was invented to solve ultra-high-throughput streaming problems after all).

To exemplify this point, imagine we have a user interface that allows users to browse Order, Payment, and Customer information in a scrollable grid. As the user scrolls through the items displayed, the response time for each row needs to be snappy.

In a traditional, stateless model each row on the screen would require a call to all three services. This would be sluggish in practice, so caching would likely be added, along with some hand-crafted polling mechanism to keep the cache up to date.

But in the streaming approach, we can materialise the data locally via the API. We define a query for the data in our grid: “select * from orders, payments, customers where…” and the API executes it, stores it locally, keeps it up to date, and ensures it’s highly available should your service fail (this approach is discussed in more detail here).
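
Sketched with the Kafka Streams DSL, that materialised view might look something like the snippet below. This is illustrative only: the Order, Customer, and CustomerOrder classes, the getCustomerId() accessor, the “orders-view” store name, and default serdes are all assumptions of this sketch, and payments would be joined in just the same way.

//Sketch: enrich orders with their customers and materialise the result as a local, queryable view
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Order> orders = builder.stream("orders");
GlobalKTable<String, Customer> customers = builder.globalTable("customers");

orders.join(customers,
        (orderId, order) -> order.getCustomerId(),   //how to find the customer for each order
        CustomerOrder::new)                          //combine the pair into a single row
      .groupByKey()
      .reduce((oldRow, newRow) -> newRow,            //keep only the latest version of each row
              Materialized.as("orders-view"));       //local, fault-tolerant, queryable state store

Each service instance then serves grid rows straight from its own copy of “orders-view”, with no remote calls on the request path.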

To combat the challenges of being stateful, Kafka ships with a range of features to make the storage, movement, and retention of state practical: notably standby replicas and disk checkpoints to mitigate the need for complete rebuilds, and compacted topics to reduce the size of datasets that need to be moved.

So instead of pushing the data problem down a layer, stream processors are proudly stateful. They make data available wherever it is needed, throughout the ecosystem. This increases performance. It also increases autonomy. No remote calls needed!

Of course, being stateful is always optional, and you’ll find that many services you build don’t require state. In the ecosystem we develop as part of this post, two services are stateless and two are stateful. The important point is that whether your services are stateless or stateful, a streaming platform provisions for both.

Coordination-Free by Design

One important implication of pushing data into many different services is we can’t manage consistency in the same way. We will have many copies of the same data embedded in different services which, if they were writable, could lead to collisions and inconsistency.

As this notion of ‘eventual consistency’ is often undesirable in business applications, the solution is to isolate consistency concerns (i.e. write operations) via the single writer principle. For example, the Orders Service would own how an Order evolves in time. Each downstream service then subscribes to the strongly ordered stream of events produced by this service, which they observe from their own temporal viewpoint.

This adds an important degree of ‘slack’ into the system, decoupling services from one another in time and making it easier for them to scale and evolve independently. We will walk through an example of how this works in practice later in the post (the Inventory Service), but first we need to look at the mechanics and tooling used to sew these ecosystems together.

Using Kafka Streams & KSQL to Build a Simple Email Service

Kafka Streams is the core API for stream processing on the JVM: Java, Scala, Clojure etc. It is based on a DSL (Domain Specific Language) that provides a declaratively-styled interface where streams can be joined, filtered, grouped or aggregated using the DSL itself. It also provides functionally-styled mechanisms — map, flatMap, transform, peek, etc.—for adding bespoke processing of messages one at a time. Importantly you can blend these two approaches together in the services you build, with the declarative interface providing a high level abstraction for SQL-like operations and the more functional methods adding the freedom to branch out into any arbitrary code you may wish to write.

But what if you’re not running on the JVM? In this case, you’d use KSQL. KSQL provides a simple, interactive SQL interface for stream processing and can be run standalone, for example via the Sidecar Pattern. KSQL utilises the Kafka Streams API under the hood, meaning we can use it to do the same kind of declarative slicing and dicing we might do in JVM code using the Streams API. Then a native Kafka client, in whatever language our service is built in, can process the manipulated streams one message at a time. Whichever approach we take, these tools let us model business operations in an asynchronous, non-blocking, and coordination-free manner.

Let’s consider something concrete. Imagine we have a service that sends emails to platinum-level clients (this fits into the ecosystem at the top of the above system diagram). We can break this problem into two parts: firstly we prepare by joining a stream of Orders to a table of Customers and filtering for the ‘Platinum’ clients. Secondly we need code to construct and send the email itself. We would do the former in the DSL and the latter with a per-message function:

//Filter input stream then send Email
orders.join(customers, Tuple::new)                              //join customers and orders
  .filter((k, tuple) -> tuple.customer.level().equals(PLATINUM) //filter platinum customers
      && tuple.order.state().equals(CONFIRMED))                 //only consider confirmed orders
  .peek((k, tuple) -> emailer.sendMail(tuple));                 //send email for each cust/order tuple

A more fully-fledged Email service can be found in the microservice code examples. We also extend this Transform/Process pattern later in the Inventory Service example.

An equivalent operation can be performed, off the JVM, using KSQL. The pattern is the same: the event stream is dissected with a declarative statement, then processed one record at a time. Here we implement the emailer in Node.js with KSQL running via the Sidecar Pattern:  

//Execute query in KSQL sidecar to filter stream
ksql> CREATE STREAM orders (ORDERID string, ORDERTIME bigint...) WITH (kafka_topic='orders', value_format='JSON');
ksql> CREATE STREAM platinum_emails AS SELECT * FROM orders, customers WHERE client_level = 'PLATINUM' AND state = 'CONFIRMED';
 
//In Node.js service send Email
var nodemailer = require('nodemailer');
…
var kafka = require('kafka-node'),
    Consumer = kafka.Consumer,
    client = new kafka.Client(),
    consumer = new Consumer(client, [ { topic: 'platinum_emails', partition: 0 } ]);

consumer.on('message', function (orderConsumerTuple) {
  sendMail(orderConsumerTuple);
});

The Tools of the Trade: Windows, Tables & State Stores

Before we develop more complex services let’s take a look at the stateful elements of the Kafka Streams API. Kafka Streams needs its own local storage for a few different reasons. The most obvious is for buffering, as unlike in a traditional database—where the data is laid out historically and can be ordered—in a streaming context events occur in real time, so there is less control over when, and in what order, they may turn up.

Let’s use a few variants on the email example. Imagine you want to send an email that confirms payment of a new order. We know that an Order and its corresponding Payment will turn up at around the same time, but we don’t know for sure which will come first or exactly how far apart they may be. We can put an upper limit on this though—let’s say an hour to be safe.

To avoid doing all of this buffering in memory, Kafka Streams implements disk-backed State Stores to overflow the buffered streams to disk (think of this as a disk-resident hashtable). So each stream is buffered in this State Store, keyed by its message key. Thus, regardless of which event turns up later, the corresponding event can be quickly retrieved.
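
In the DSL that buffering falls out of a windowed join. A minimal sketch (Order, Payment, and EmailTuple are assumed classes, default serdes are assumed to be configured, and emailer is the same notional collaborator as in the earlier example):

//Sketch: pair each order with its payment, whichever arrives first, within a one-hour window
KStream<String, Order> orders = builder.stream("orders");
KStream<String, Payment> payments = builder.stream("payments");

orders.join(payments,
        EmailTuple::new,                        //combine the order with its payment
        JoinWindows.of(Duration.ofHours(1)))    //each side is buffered in a windowed state store for up to an hour
      .peek((orderId, tuple) -> emailer.sendPaymentConfirmation(tuple));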

Kafka Streams takes this same concept a step further to manage whole tables. Tables are a local manifestation of a complete topic—usually compacted—held in a state store by key. (You can also think of them as a stream with infinite retention.) In a services context such tables are often used for enrichment. Say we decide to include Customer information in our Email logic. We can’t easily use a stream-stream join as there is no specific correlation between a user creating an Order and a user updating their Customer Information—that’s to say that there is no logical upper limit on how far apart these events may be. So this style of operation requires a table: the whole stream of Customers, from offset 0, is replayed into the State Store inside the Kafka Streams API.

The nice thing about using a KTable is it behaves like a table in a database. So when we join a stream of Orders to a KTable of Customers, there is no need to worry about retention periods, windows or any other such complexity. If the customer record exists, the join will just work.

There are actually two types of table in Kafka Streams: KTables and Global KTables. With just one instance of a service running, these effectively behave the same. However if we scaled our service out—so it had four instances running in parallel—we’d see slightly different behaviours. This is because Global KTables are cloned: each service instance gets a complete copy of the entire table. Regular KTables are sharded: the dataset is spread over all service instances. So in short, Global KTables are easier to use, but they have scalability limits as they are cloned across machines, so use them for lookup tables (typically up to several gigabytes) that will fit easily on a machine’s local disk. Use KTables, and scale your services out, when the dataset is larger.
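
In code the difference is simply which declaration you choose (shown here as alternatives for the same topic; a real topology would register one or the other):

//Sharded: each service instance holds only the partitions of 'customers' assigned to it
KTable<String, Customer> customerShards = builder.table("customers");

//Cloned: every service instance holds a complete copy of the 'customers' topic
GlobalKTable<String, Customer> customerClones = builder.globalTable("customers");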

The final use of the State Store is to save information, just like we might write data to a regular database. This means we can save any information we wish and read it back again later, say after a restart. So we might expose an Admin interface to our Email Service which provides stats on emails that have been sent. We could store these stats in a state store and they’ll be saved locally, as well as being backed up to Kafka, inheriting all its durability guarantees.
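
A sketch of that idea, assuming a sentEmails stream, a customerLevel() accessor, and an “email-stats” store name (none of which are part of the actual example code), with streams being the running KafkaStreams instance:

//Count the emails sent per customer level; the store is backed up to Kafka via its changelog topic
sentEmails.groupBy((orderId, email) -> email.customerLevel())
          .count(Materialized.as("email-stats"));

//Later, behind the Admin interface, read the numbers straight back from the local store
ReadOnlyKeyValueStore<String, Long> stats =
    streams.store("email-stats", QueryableStoreTypes.keyValueStore());
Long platinumEmails = stats.get("PLATINUM");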

Managing Streams, State and Consistency in the Inventory Service

Now that we have a basic understanding of the tools used in stream processing, let’s look at how we can sew them together to solve a more complex and realistic use case, one which requires that reads and writes are performed consistently.

In the system design diagram, there is an Inventory Service. When a user makes a purchase—let’s say it’s an iPad—the Inventory Service makes sure there are enough iPads in stock for the order to be fulfilled. To do this a few things need to happen. The service needs to check how many iPads there are in the warehouse. It then needs to reserve one of those iPads until such time as the user completes their payment and the order is processed and shipped. This requires three actions be performed as a single, atomic transaction inside each service instance:

  1. Validate whether there is enough stock available (items in warehouse minus items reserved).
  2. Update the table of “reserved items” to reserve the iPad so no one else can take it.
  3. Send out a message that validates the order.

You can find the code for this service here.
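
The heart of the service can be sketched as a Transformer wired to a “reserved-stock” state store. This is a simplified sketch rather than the actual implementation: the Order and OrderValidation classes, the warehouseStock() lookup (the real service joins orders to a table of warehouse inventory), and the store and topic names are all assumptions here.

//Register the store that holds the running count of reserved items per product
builder.addStateStore(Stores.keyValueStoreBuilder(
    Stores.persistentKeyValueStore("reserved-stock"), Serdes.String(), Serdes.Long()));

orders
  .selectKey((orderId, order) -> order.getProduct())     //repartition so one thread owns each product
  .transform(() -> new Transformer<String, Order, KeyValue<String, OrderValidation>>() {
      private KeyValueStore<String, Long> reserved;

      @Override
      @SuppressWarnings("unchecked")
      public void init(ProcessorContext context) {
          reserved = (KeyValueStore<String, Long>) context.getStateStore("reserved-stock");
      }

      @Override
      public KeyValue<String, OrderValidation> transform(String product, Order order) {
          long alreadyReserved = Optional.ofNullable(reserved.get(product)).orElse(0L);
          if (warehouseStock(product) - alreadyReserved >= order.getQuantity()) {     //1. validate
              reserved.put(product, alreadyReserved + order.getQuantity());           //2. reserve
              return KeyValue.pair(order.getId(), OrderValidation.pass(order));       //3. emit PASS
          }
          return KeyValue.pair(order.getId(), OrderValidation.fail(order));
      }

      @Override
      public void close() {}
  }, "reserved-stock")
  .to("order-validations");

Because the stream is re-keyed by product, all reservations for a given product are applied by a single thread against a single shard of the store, which is what makes the check-then-reserve step safe without remote locks.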

What is neat about this approach is its ability to scale. There is no remote locking, there are no remote reads, and we can scale out the inventory service linearly. What’s more, all operations are wrapped by Kafka’s transactional guarantees, meaning they either commit atomically or not at all, regardless of failures.
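
In Kafka Streams those transactional guarantees are enabled with a single configuration setting (the application id and broker address below are placeholders):

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "inventory-service");   //placeholder id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");       //placeholder brokers
//each read-process-write cycle, including state store updates, commits atomically or not at all
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
new KafkaStreams(builder.build(), props).start();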

Bridging the Synchronous and Asynchronous Worlds

Finally, we can put all these ideas together in a more comprehensive ecosystem that validates and processes orders in response to an HTTP request, mapping the synchronous world of a standard REST interface to the asynchronous world of events, and back again.

Code Examples
Download and play with the full code example for this microservices ecosystem from GitHub.

 

Looking at the Orders Service first, a REST interface provides methods to POST and GET Orders. Posting an Order creates an event in Kafka. This is picked up by three different validation engines (Fraud Check, Inventory Check, Order Details Check), which validate the order in parallel, emitting a PASS or FAIL based on whether each validation succeeds. The result of each validation is pushed through a separate topic, Order Validations, so that we retain the ‘single writer’ status of the Orders Service on the Orders topic. The results of the various validation checks are aggregated back in the Orders Service (Validation Aggregator), which then moves the order to a Validated or Failed state, based on the combined result.
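
A condensed sketch of that aggregation is shown below. The OrderValidation class, the topic names, the NUMBER_OF_CHECKS constant, and the orderStateSerde are assumptions of this sketch; the Validation Aggregator in the example code uses session windows and also handles failures and re-emits the full Order.

//Count the PASS results per order within a short window; once every check has passed, mark the order Validated
KStream<String, OrderValidation> validations = builder.stream("order-validations");

validations
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
    .aggregate(() -> 0L,
        (orderId, validation, passes) -> validation.isPass() ? passes + 1 : passes,
        Materialized.with(Serdes.String(), Serdes.Long()))
    .toStream()
    .filter((windowedOrderId, passes) -> passes >= NUMBER_OF_CHECKS)
    .map((windowedOrderId, passes) ->
        KeyValue.pair(windowedOrderId.key(), OrderState.VALIDATED))
    .to("orders", Produced.with(Serdes.String(), orderStateSerde));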

To allow users to GET any order, the Orders Service creates a queryable materialized view (‘Orders View’ in the figure), using a state store in each instance of the service, so any Order can be requested historically. Note also that the Orders Service is partitioned over three nodes, so GET requests must be routed to the correct node to get a certain key. This is handled automatically using the Interactive Queries functionality in Kafka Streams. (We could also implement this view with an external database, via Kafka Connect.)
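
The routing step looks roughly like this (a sketch: “orders-view” is an assumed store name, thisServiceInstance is the local HostInfo, and proxyGet() stands in for ordinary HTTP proxying code):

//Which instance owns the partition holding this order key?
StreamsMetadata node = streams.metadataForKey("orders-view", orderId, Serdes.String().serializer());

if (node.hostInfo().equals(thisServiceInstance)) {
    //local: serve the GET straight from the materialized view
    ReadOnlyKeyValueStore<String, Order> view =
        streams.store("orders-view", QueryableStoreTypes.keyValueStore());
    return view.get(orderId);
} else {
    //remote: proxy the request to the instance that owns the key
    return proxyGet(node.host(), node.port(), orderId);
}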

The linked code example also includes a blocking HTTP GET so that clients can read their own writes. In this way we bridge the synchronous, blocking paradigm of a Restful interface with the asynchronous, non-blocking processing performed server-side:

# Submit an order. Immediately retrieving it will block until validation completes.
$ curl -X POST ... --data {"id":"1"...} http://server:8081/orders/
$ curl -X GET http://server:8081/orders/validated/1?timeout=500
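
A deliberately simplified, polling version of that blocking GET is sketched below; the real example registers listeners rather than polling, and the “orders-view” store name, the Order class, and the timeout handling are assumptions of this sketch.

//Block until the order reaches the VALIDATED state, or the timeout expires
Order getValidatedOrder(String id, long timeoutMs) throws InterruptedException {
    ReadOnlyKeyValueStore<String, Order> view =
        streams.store("orders-view", QueryableStoreTypes.keyValueStore());
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (System.currentTimeMillis() < deadline) {
        Order order = view.get(id);                         //read from the local materialized view
        if (order != null && order.getState() == OrderState.VALIDATED) {
            return order;                                   //the caller reads its own, now validated, write
        }
        Thread.sleep(10);
    }
    return null;                                            //the caller maps this to an HTTP timeout
}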

Finally, there are two validation services other than the Inventory Service we discussed above. The Fraud Service tracks the total value of orders for each customer in a one-hour window, alerting if they go over the configured limit. This is implemented entirely using the Kafka Streams DSL, although it could also be implemented in custom code via a Transformer. The Order Details Service validates the basic elements of the order itself. This is implemented with a producer/consumer pair, but could equally be implemented using Kafka Streams.
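
At its heart that fraud check is a windowed aggregation. A rough sketch (Order, getCustomerId(), getValue(), and FRAUD_LIMIT are assumptions here; the real Fraud Service emits a PASS or FAIL per order to the Order Validations topic rather than logging):

//Sum order values per customer over a one-hour window and flag customers that exceed the limit
orders
    .groupBy((orderId, order) -> order.getCustomerId())       //re-key the stream by customer
    .windowedBy(TimeWindows.of(Duration.ofHours(1)))
    .aggregate(() -> 0D,
        (customerId, order, total) -> total + order.getValue(),
        Materialized.with(Serdes.String(), Serdes.Double()))
    .toStream()
    .filter((windowedCustomerId, total) -> total > FRAUD_LIMIT)
    .peek((windowedCustomerId, total) ->
        System.out.printf("Possible fraud: customer %s ordered %.2f in the last hour%n",
                          windowedCustomerId.key(), total));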

From the Micro to the Macro and Beyond

This example is about as fine-grained as streaming services get, but it is useful for demonstrating how a synchronous request-response paradigm can be bridged into an asynchronous, non-blocking one, and back again, via event collaboration. (We discussed the merits of event collaboration in an earlier post.)

But any distributed system comes with a baseline cost, both in complexity and in latency, so to really see the benefits of this type of system we need to grow and evolve the ecosystem further, adding services that do repricing, inventory management, fulfilment, shipping, billing, etc., as well as increasing the functionality of the services we have. In these larger ecosystems the pluggability and extensibility that come with an event-driven, brokered model increasingly pay dividends.

So the beauty of implementing services on a Streaming Platform lies in its ability to model both the micro and the macro with a single, ubiquitous workflow. Here, fine grained use cases merge into larger architectures that span departments, companies, and geographies.

Summing Up

When we build services using a Streaming Platform, some will be stateless: simple functions that take an input, perform a business operation and produce an output. Some will be stateful, but read only, as when views need to be created so we can serve remote queries. Others will need to both read and write state, either entirely inside the Kafka ecosystem (and hence wrapped in Kafka’s transactional guarantees), or by calling out to other services or databases. Having all approaches available makes the Kafka Streams API a powerful tool for building event-driven services.

But there are of course drawbacks to this approach. Whilst standby replicas, checkpoints, and compacted topics all mitigate the risks of pushing data to code, there is always a worst-case scenario where service-resident datasets must be rebuilt, and this should be considered as part of any system design. There is also a mindset shift that comes with the streaming model, one that is inherently asynchronous and adopts a more functional style, when compared to the more procedural style of service interfaces. But this is—in the opinion of this author—an investment worth making.

So hopefully the example described in this post helps to tackle some of these issues, particularly the bridge between synchronous and asynchronous worlds. We looked at how a small ecosystem can be built through the propagation of business events that describe the order management workflow. We saw how such services can be built on the JVM with Kafka’s Streams API, as well as off the JVM via KSQL. We also looked more closely at how to tackle trickier issues like consistency with writable state stores and change logs. Finally, we saw how simple functions, which are side-effect-free, can be composed into service ecosystems that operate as one. In the next post in this series, Bobby Calderwood will be taking this idea a step further as he makes a case for a more functional approach to microservices through some of the work done at Capital One.

Posts in this Series:

Part 1: The Data Dichotomy: Rethinking the Way We Treat Data and Services
Part 2: Build Services on a Backbone of Events
Part 3: Using Apache Kafka as a Scalable, Event-Driven Backbone for Service Architectures
Part 4: Chain Services with Exactly Once Guarantees
Part 5: Messaging as the Single Source of Truth
Part 6: Leveraging the Power of a Database Unbundled





Comments

  1. Good stuff! One question about the HTTP load balancer. If we have a load balancer cluster with several nodes, an LB node forwards the request to a Kafka topic and listens on another topic. How can we guarantee that the same LB node will receive the message from the Kafka topic? Because the LB node that got the HTTP request should return the HTTP response back to the client.

    1. Hi Aleksandr

      Thanks for the question. It’s a good one. The answer is you can’t, but this is handled by the code using the interactive queries feature that ships with Kafka.

      So your GET might be sent to node A, but the data you need is in a materialised view on node C. Kafka includes a discovery feature which allows the REST interface on node A to know that it needs to proxy the request to node C.

      The crucial method is getKeyLocationOrBlock(), which is used to work out whether a request is local or remote: https://github.com/confluentinc/kafka-streams-examples/blob/3.3.0-post/src/main/java/io/confluent/examples/streams/microservices/OrdersService.java#L211

      Hope that helps

      B

      1. OK. But if I have one HTTP request to LoadBalancer node1 -> LB node1 sends to Kafka -> some service handles it and writes the result to a Kafka topic -> LB node1 consumes the result and sends it back as an HTTP response.

        I guess Kafka Streams is not responsible for this. I think LB node1 should send to the Kafka queue the partition that it reads from, then the microservice can send the result to this particular partition.

        1. Yes – so what you say here is what would happen.
          PUT is routed to OrderServiceA: Order(k1,V1-CREATED) -> OrdersTopic(P1) [assume key K1 maps to partition P1]
          Order is validated -> Order(k1,V2-VALIDATED), which is sent to partition P1 and hence back to OrderServiceA
          GET is, say, routed to OrderServiceB. It is then rerouted to OrderServiceA as that is the location of P1

          1. Thank you for the great post. Very helpful!
            It might be a good idea to clarify the point raised by Aleksandr.
            We’re working on a very similar architecture. In our case the UI connects via websocket to the service (the LB randomly assigns the service). So a websocket is opened to a service.
            Assume this is the Ordering service with many instances, like in your example. When a client asks for a new order, assume instance O1 of the Ordering service gets the request. It then sends an OrderRequested event (fact) with Key1 to the Order topic.
            The Inventory service takes the OrderRequested event, makes all needed validations and writes an OrderValidated event to the Order_Validated topic.
            Now the important part of the flow: instance O2 (not O1!) reads the OrderRequested event (since it is assigned the partition where Key1 lives). But O2 is not connected via websocket to the client. Instance O1 is.
            And in this case neither Kafka nor KStreams can figure out where the websocket is connected (the LB randomly assigned the client to O1).
            I think the only way to handle it is to keep a global cache which maps all connections (clientID – serverID – OrderKey); every server instance could then use this table to figure out where a websocket is attached, and redirect the output to the correct instance.
            Of course there are disconnect/connect, failure, etc. scenarios – but the idea is the same.
            Will be interesting to hear your opinion.
            Thank you,
            Vladi

          2. Yes, the websocket case is harder as we conflate both concerns for ordering and for routing into the key, so you could change the key to alter the routing, but it would break the ordering guarantees and vice versa. On a related note, this isn’t the case when you simulate request response (as responses typically don’t need to be ordered so the response key can be used for routing), but even in the pure request response case there are failure scenarios when a lookup table is valuable.

            So yes, the lookup table option is the best way to go, as you describe.

  2. Awesome article! Another great installment in the series!

    Reading through this series and going through the sample code, I can’t help but think about how one would go about generalizing (read: removing “plumbing” code for) the lookup of data in the distributed materialized views (such as the Orders View in this post).
    I was thinking that to this end it would be helpful if a piece of code could continuously read the metadata of a Kafka Streams application without actually participating in the processing of the stream. Such a capability would allow, for example, the creation of a sidecar that could look up the materialized data no matter which node it’s present on.
    My question is whether such a capability (essentially reading the stream of metadata of a Kafka Streams application) exists?

    Thanks

    1. Hi George

      Certainly you can create a consumer that reads the stream, or even just reads the headers, but I may not fully follow what you are suggesting. What metadata did you have in mind?

      B

      1. Hi Ben,

        What I had in mind was being able to somehow read the data that is available via the “StreamsMetadata” interface, but 1) without having to actually participate in stream processing and 2) being able to read that data for all consumers of a stream.

        Thanks,
        George

  3. Hi Ben,
    thanks for the great article and example code, it is very enlightening!
    One question though: Your inventory service has the “orders” topic as its input. That means when you scale it out, each instance will handle a partition of order IDs (which, in the end, is more or less arbitrary).
    But orders from different partitions might refer to the same products; how do you maintain consistency and avoid lost updates on the “reserved stocks” store when two simultaneous orders place reservations on the same product? Is this logic somehow included in the join between Order and Product?
    Reading Martin Kleppmann’s “Designing Data Intensive Applications”, I was under the impression that in such a case consistency can only be achieved with a total ordering guarantee, by means of a separate “Product Claim” topic which is partitioned by product ID.

    1. That’s a great question Till, thanks for asking it.

      To ensure consistency we need to repartition the orders topic so that it is partitioned by the productId, not the orderId. That ensures that orders for the same product are always sent to the same node. Then we know the validation / reservation of the stock will be performed in order by a single thread.

      This happens in the selectKey step here: https://github.com/confluentinc/kafka-streams-examples/blob/1dcebedcc4a2e142160bb9a533db204390bb416f/src/main/java/io/confluent/examples/streams/microservices/InventoryService.java#L76

      Hope that helps, and again, great question.

      B

  4. Awesome post to start with!

    Would be really interesting to hear your thoughts around this: what if the requesting service that sends the POST command to the Orders Service expects a response (validated or not) straight away, and also has an aggressive timeout on the request? We have that situation in our system with external APIs in request/response style, with requirements on how long a request can take before they cut it off (and retry). The approach presented here relies on a number of services “eventually” saying yay/nay, but the async approach gives no guarantees about by whom and when the order will be evaluated.

    Regarding this, how does the ValidationsAggregatorService know how many validations it should expect, which are mandatory and which aren’t, etc., without introducing a tight coupling between this service and other services? The “numberOfRules” is hardcoded to 3 in the example code, which isn’t ideal, so I’m curious how you think such a solution can evolve over time (as services are added, removed and so on).

    1. Thanks for the comment Andreas.

      It would be easy enough to code up a blocking POST, in the same way that the GET blocks if the order isn’t there, or if the validated order isn’t there (there are methods for each one). So what I think you are looking for in the current code is: POST, get the id returned, then immediately GET /{id}/validated, which will block until the order is valid or the passed timeout elapses. You are of course limited to the latency of the slowest service (it’ll be the fraud service in this case) and should expect the latency to be higher than in a monolithic system. The idea of this code is really to express a pattern for how you can do this type of thing (and FWIW there are real systems out there, in production, using this pattern with KStreams, but they have more components).

      Regarding the number of rules, I wouldn’t do this in a real system (there are a few gaps in this code sample where pieces need to be filled in). My next step would be to give each rule a name, soft code the required rules in a global KTable, then refer to that from the validations aggregator. I’ll get to this at some point in the future (but PRs welcome :-))

      Great questions though. Let me know if you have more.

      1. Great, Ben, I was looking over it. Do you know if the other diagrams presented by Confluent are created with this app as well? I guess they have the same look and feel. Thanks.

  5. Very nice article on Kafka, KSQL & microservices. As per the microservice architecture, each service should scale horizontally. In Kafka, is it possible to separate the Kafka connector and the Kafka broker as independent services, so that in case of a bottleneck at the data-in side we can scale the Kafka connector alone?

  6. Enjoying your series very much!
    A question regarding data affinity:
    Let’s assume that I have an event sourced account entity where the events are stored in Kafka, or even just a compacted topic with the latest id -> account state.
    When implementing a command such as withdraw, I need to make sure that the account has enough funds in its balance before approving the command.
    If I want to use Kafka as a source of truth and use Kafka Streams to build a materialized view used for my business processing, how can I guarantee that two parallel withdraw commands are processed:
    1. Sequentially?
    2. Against the latest state of the account?
    In the examples there is always an inherent race condition between the REST API and Kafka Streams, since the services aren’t “reading their own writes”; they rely on Kafka Streams to consume and process the events and update the state stores eventually, instead of just updating the state in-memory as part of processing the command.

    Hope my questions make sense..
    Thanks!

    1. Hi Erik

      I’m glad you have been enjoying the series, and thanks for the question.

      You can guarantee that items for a certain partition will be processed in order; this guarantee is provided at the Kafka level but also in Streams.

      But if I’m understanding you correctly, you don’t need to make the writes eventually consistent, so the latter part isn’t quite correct. Kafka Streams consumes events, updates state stores and emits events as a single atomic unit (within the Exactly Once guarantee). So for example the stock service validates that there is enough stock in the warehouse, then reserves an item by writing to a local state store, then emits a validation event. These three operations are performed on a single thread and are transactional thanks to Kafka’s EoS feature.

      The limitation of this approach is we must apply this as a single writer, so say we want to validate a payment by checking the user has sufficient balance, then update that balance. We need to route that command to the Account Service and both validate and update the balance within a single state store, or alternatively an event sourced topic written to by a single Streams thread (per account). This is equivalent to the stock update example above.

      What we can’t do is any distributed transaction that spans different services.

      Does that answer your question? If I’m misunderstanding do clarify.
      B

  7. 1- In using Kafka as pub-sub or Streams, what is the technical difference? For example, if I am building an event-driven system, should I consider the pub-sub model or event streaming?
    2- How do we determine the topics? I am thinking it is the noun rather than the event itself. For example, “lending-applications” could be the topic and “application created” an event, correct?
    3- Could we have multiple events in the same topic? For example, could I have a topic called “lending-applications” and write events like ‘application created’, ‘application approved’ etc.? A consumer is interested in the event rather than just the broad topic, so how do we write consumers?
    3a- In the main diagram above, I see that you have “Order Created” and “Order Validated” events in the “Orders” topic, so I am assuming I am correct in my thinking.
    3b- One more question: why do you have the “OV TOPIC” separate from “Orders”? Couldn’t we use the same “Orders” topic to write the validation results?
    4- Consumer groups and consuming messages? For example, I might have a “lending-applications” topic and there could be multiple applications / services interested in that topic; each interested service could be running multiple instances. So in your example diagram above the “Order Created” event is consumed by three different services; so each of them is a consumer group of its own, correct?
    5- Could we have multiple message types / schemas in one topic?
    Schema Registry – do we have any support for .NET / Avro and NuGet libraries from Confluent around this?
    6- If I have “one topic” and “multiple events” like “lending-applications” with events like ‘application created’ and ‘application approved’, do they need to share the schema?
    8- Could you please advise me on any reference implementation (.NET / Java) for streams and stream processing? And the “pub-sub” model?
    9- In a multiple-consumer scenario, how do we go about “message commit”?

    Thank you once again for your time to reply

    1. Hi BMK – I took the liberty of consolidating your questions into one comment, I hope that’s ok. Here are some answers:

      1 – streams provides a far broader api and has features like joins and tables. The pub sub api is just subscribe and poll to get events. It doesn’t give you any richness for operating on them.
      2 – Rule of thumb is topic per business entity, but try to avoid splitting things (i.e. you wouldn’t split a user into user and address)
      3. you can have multiple events in the same topic. Do this when the relative ordering between event types is important.
      3a those are the same event, they just have a field that can be either created or validated. There is just one schema: Orders.
      3b. Good spot. Yes we could. Actually the code is a little simpler if you do it the way you suggest. The reason to use OV is a little subtler. It’s to do with the single writer principle. I don’t want services other than the Orders service writing to the orders topic. This is to do with consistency and schema migration. But if all your services are one deployable it doesn’t really matter.
      4. yes
      5 Yes. You need to union the avro schemas together when you add them to the schema registry. I’d be careful with this though. Do it where it makes sense, but in cases like the example here I don’t as it keeps things simpler (separate schemas just add complexity). Actually we have a post coming out very shortly on this subject so watch this space.
      6. There is a PR for this: https://confluent.slack.com/archives/C0N3N4YEP/p1513681981000283 but it’s not merged yet. I’m trying to find out what’s going on with it. It looks like people use the Microsoft avro serdes currently (some info here https://github.com/confluentinc/confluent-kafka-dotnet/issues/67)
      7. I would always use one schema for “lending applications” then have a state field that determines whether they are created or approved.
      8. There is no streams implementation in .NET just pub sub. The PubSub api to use is this one: https://github.com/confluentinc/confluent-kafka-dotnet and there are examples here: https://docs.confluent.io/current/clients/consumer.html#python-go-and-net-clients
      9. Not sure I follow this question but each consumer group tracks its own offsets.

      -B

      1. Thanks Ben for your answers much appreciated.

        Is there a recommendation when to choose:

        1. Pub/Sub
        2. Events Streaming

        Thank you
        BMK

  8. Could I ask a question?

    If the PRODUCT and ORDERS topics have different numbers of partitions,
    e.g.
    PRODUCT has 10 partitions (one hundred million records)
    ORDERS has 5 partitions

    and I want a leftJoin… do I make a temp topic (with 5 partitions)?
    For example:
    kafka-topics --create --partitions 5 --topic temp
    ORDERS.through(temp).leftJoin(PRODUCT.through(temp))…

    Reference link:
    https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/main/java/io/confluent/examples/streams/microservices/ValidationsAggregatorService.java#L93

    1. If you want to join product and orders you should have the same number of partitions in each topic so that matching partitions end up in the same stream processor.

  9. Great article. However, I have a few questions regarding the Streams API.
    1. When a new event occurs, materialized views are updated. Will they keep on updating? It becomes problematic for large organizations with millions of transactions every month. What’s the size limit for views? If there is no limit, what happens if the view needs to store 1 TB or 2 TB of view data (which happens over the years)?

    2. What happens if we lose the view data in a state store in the worst-case scenario (despite having clusters)? How do we get data from a microservice’s DB into the state store, especially since we cannot regenerate events?

    3. Is it the right approach to depend on Kafka state stores for data storage and reads?

    4. Can we use Oracle or MySQL with the Streams API, instead of a state store, to store materialized views?

    1. Hi Pavan

      Good questions, thanks for raising them. Here are some thoughts:
      The size limit is really dictated by your worst case (re)load time. So, say you lost the disk on one of the view nodes, how long would it take to regenerate that part of the view? KStreams actually keeps a backup of the state store (called a standby replica https://kafka.apache.org/10/documentation/streams/developer-guide/config-streams.html#num-standby-replicas) to mitigate this, but that pushes the worst case to ‘2 nodes lost’, and the longer the load time the more operational care you need to take. Bulk writes to RocksDB (which is used in a KStreams state store) max out at about ~100MB/s, so 100GB of storage per node is going to take at least 15 mins to load. You can of course add more nodes and scale out, but for heavy storage use cases it’s better to use an independent database. Kafka’s Connect API provides a lot of different connectors (https://de.confluent.io/product/connectors/). MySQL/Oracle is supported through the JDBC connector. In theory you could back a state store with MySQL, but in practice it probably wouldn’t work that well.
      In 2. you ask ‘how do we get data from microservices DB to state store? especially since we can not regenerate events.’ To get data out of a database you can use a CDC connector with Kafka’s Connect API (see page linked above). But you probably don’t want to pull data out of MySQL just to put it in a state store. The reason to pull events out of a database is typically to get data into Kafka, where it can be stored and made available to many different services that either do event driven processing or create their own materialized views.

      1. Thanks Ben for the response. I am not sure if I can ask this here:
        “What’s the best approach to implement materialized views with Kafka Streams (for event sourcing with CQRS)? In my use case, I have 5 microservices sending events through Kafka topics. I want to have one microservice receiving those and creating/updating materialized views and storing them in a database like MySQL (I want the views to be highly available). Whenever I send a request from the UI, the microservice looks in the materialized views for data. Is this the right approach? I am thinking of using Kafka Streams for event sourcing and CQRS.”

        1. Hi Pavan

          It might be better to discuss this on the microservices channel of Confluent slack (https://launchpass.com/confluentcommunity). It’s a bit hard to give specific advice, but if you are starting a new project then it’s often best to keep your design simple. MySQL is a decent enough database and there is a JDBC/MySQL sink connector which can make creating the view easier. https://de.confluent.io/product/connectors/
          The nice thing about the CQRS pattern is that, once you have one view, say in MySQL, you can easily create a second instance. The complexity is that they will have slightly different datasets in them (as they run async to one another), so queries routed to different instances may return slightly different results at one point in time. For many use cases this will be fine, but it depends.
