How to Build a UDF and/or UDAF in KSQL 5.0

Kai Waehner

KSQL is the streaming SQL engine that enables real-time data processing against Apache Kafka®. KSQL makes it easy to read, write and process streaming data in real time, at scale, using SQL-like semantics. KSQL already has plenty of available functions like SUBSTRING, STRINGTOTIMESTAMP or COUNT. Even so, many users need additional functions to process their data streams.

KSQL now has an official API for building your own functions. As of the release of Confluent Platform 5.0, KSQL supports creating user-defined scalar functions (UDF) and user-defined aggregate functions (UDAF). All it takes to write your own is just one Java class. The UD(A)F can embed any additional dependencies (like additional JARs or other binaries) if needed.

In this blog post, we’ll discuss the main need and motivation for UD(A)Fs in KSQL, show an advanced example with a deep-learning UDF, demonstrate how easy it is to build even very powerful functions and identify some issues that you might face during development and testing.

Motivation for KSQL UDFs/UDAFs

Let’s first discuss what UD(A)Fs are and why you might need to build your own.

Built-in KSQL functions to process streaming data

Functions are one of the key components in KSQL. Functions in KSQL are like functions, operations or methods in other SQL engines or programming languages, such as Java, Python or Go. They accept parameters, perform an action—such as a complex calculation—and return the result of that action as a value. They are used within a KSQL query to filter, transform or aggregate data.

Many functions are already built into KSQL, and more are added with every release. They fall into two categories: stateless KSQL scalar functions and stateful KSQL aggregate functions.

But what do you do if a function you need is missing from the KSQL syntax? Pretty simple: You build your own with the official KSQL API. Let’s see how easy it is to write your own stateless UDF or stateful UDAF.

UD(A)Fs in KSQL

In contrast to built-in functions, UD(A)Fs are functions provided by the user of a program or environment. A UD(A)F provides a mechanism for extending the functionality of the KSQL engine by adding a function that can be evaluated in a standard query language statement. Using a UD(A)F in KSQL looks exactly like using built-in functions in KSQL. Both are registered to the KSQL engine before startup. They expect input parameters and return output values.

Here is an example of the built-in function STRINGTOTIMESTAMP, which converts a string value in a given format into the BIGINT value representing the timestamp.
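A statement using it might look like the following sketch (the stream and column names are made up for illustration; the format pattern matches the example value discussed next):

```sql
-- 'events' and 'eventtime' are hypothetical names for a stream and its VARCHAR column.
SELECT STRINGTOTIMESTAMP(eventtime, 'yyyy-MM-dd HH:mm:ss.SSS') FROM events;
```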

This function takes the timestamp 2017-12-18 11:12:13.111 (data type: String) and converts it to 1513591933111 (data type: BigInt).

Now let’s take a look at the steps to build your own UDF to use it in your KSQL statements the same way as the STRINGTOTIMESTAMP function does above.

Deep-learning UDFs for anomaly detection

Building KSQL UDFs is easy, no matter if they are simple or powerful. With that in mind, let’s take a look at the steps to develop, test and deploy a new KSQL UDF.

Steps to create, deploy and test a KSQL UDF

You can build very simple KSQL functions like a specific scalar function for multiplication (see the Confluent documentation to find out how), a really powerful function with many computations or aggregations, or anything in between these two extremes. Creating a stateful UDAF works the same way, of course.

The example below shows you how to build a more advanced UDF, which embeds an analytic model (i.e., an external dependency). This sounds complex, but in reality the implementation is simple. We write one Java class, add the dependency, write our business logic and build an uber JAR which includes the UDF and its dependency. This UDF can be deployed to the KSQL server(s) and then used in KSQL statements by end users without any coding in a programming language.
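An uber JAR build is typically configured with the Maven Shade plugin; a minimal pom.xml fragment might look like this sketch (plugin version and filters omitted; see the pom.xml of the GitHub project for the complete, working build):

```xml
<!-- Sketch only: bundles the UDF class and its dependencies into one uber JAR. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
    </execution>
  </executions>
</plugin>
```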

Java source code of the deep-learning UDF

Developing a UDF is quite easy, too. Just implement the function in a Java method within a Java class and add the corresponding Java annotations.

The full source code for the anomaly detection KSQL UDF is available in my GitHub project.
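Its structure can be sketched roughly as follows. Note that this is a self-contained sketch, not the real listing: the KSQL annotations are shown as comments so the class compiles without the ksql-udf dependency, and the simple mean-based score is a stand-in for the embedded H2O autoencoder model:

```java
import java.util.Arrays;

// Sketch only: in the real UDF these annotations come from the ksql-udf
// artifact (io.confluent.ksql.function.udf.UdfDescription and
// io.confluent.ksql.function.udf.Udf). They are commented out here so the
// class compiles without the KSQL dependency.
// @UdfDescription(name = "anomaly", description = "anomaly detection for sensor events")
public class AnomalyUdfSketch {

    // @Udf(description = "apply the analytic model to a comma-separated list of sensor values")
    public double anomaly(final String sensorValues) {
        // Parse the comma-separated sensor event into doubles.
        final double[] values = Arrays.stream(sensorValues.split(","))
                .mapToDouble(Double::parseDouble)
                .toArray();

        // Stand-in for the embedded autoencoder: score the event by its mean
        // squared deviation from the event's own mean. The real UDF invokes
        // the generated model's prediction method here instead.
        final double mean = Arrays.stream(values).average().orElse(0.0);
        return Arrays.stream(values)
                .map(v -> (v - mean) * (v - mean))
                .average()
                .orElse(0.0);
    }
}
```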

As you can see, you can really focus on implementing the business logic. In my example, I consume new events and do stateless processing by applying an analytic model. The prediction is returned immediately. Under the hood, KSQL receives events from a Kafka topic and sends the output to another Kafka topic.

The wrapper code for the UDF is minimal. You only need to add two annotations and the corresponding import statements to your POJO (plain old Java object):

  • Import io.confluent.ksql.function.udf.UdfDescription and io.confluent.ksql.function.udf.Udf
  • @UdfDescription: class annotation to describe the name and functionality of the UDF
  • @Udf: method annotation to describe the functionality of a specific method (a UDF class can contain more than one such method to handle different signatures)

And that’s it. The annotations in this class make sure the KSQL server interprets this class correctly and loads the UDF into the runtime engine during startup.

Deployment to KSQL server(s)

The deployment of the UDF is pretty straightforward, no matter if you’re using a local installation on your laptop or a cluster of remote KSQL servers:

  • Build an uber JAR that includes the KSQL UDF and any dependencies. Typically, you use a build tool like Apache Maven™ or Gradle to do this. See the pom.xml of my GitHub project for an example using Maven.
  • Copy the uber JAR to the ext/ directory that is part of the KSQL distribution. The ext/ directory can be configured via the property ksql.extension.dir in the KSQL configuration file ksql-server.properties. If the directory does not exist yet, simply create it.
  • Start/restart your KSQL server instance(s) to pick up new UD(A)Fs.
  • Try out the new UDF in the KSQL command line interface, via the KSQL REST API or in Confluent Control Center’s KSQL user interface, which has nice features like autocompletion.
  • If the UDF does not work, check for errors in the KSQL server log (ksql.log).
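For example, pointing KSQL at the extension directory in ksql-server.properties might look like this (the path is an assumption; prefer an absolute path that exists on your server):

```
ksql.extension.dir=/opt/ksql/ext
```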

Working demo: MQTT sensor analytics with Apache Kafka and KSQL

Let’s now think about a practical example for a powerful UDF: continuous processing of millions of events from connected devices, specifically in the case of car sensors.

This use case leverages a UDF that does real-time predictions using an autoencoder neural network to detect anomalies. You apply the analytic model by executing a KSQL statement in which “anomaly” is the UDF.
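Such a statement could look like the following sketch (the stream and column names are assumptions; the real names are in the GitHub project):

```sql
-- 'mqttsensorstream' and 'sensorinput' are hypothetical names.
SELECT eventid, anomaly(sensorinput) FROM mqttsensorstream;
```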

You can also create a new KSQL stream to deploy the statement for continuous processing. The following example filters values over a specific threshold of five to send alerts if an anomaly occurs.
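A sketch of such a stream definition, using the threshold of five mentioned above (again, column and source-stream names are assumptions):

```sql
CREATE STREAM AnomalyDetectionWithFilter AS
  SELECT eventid, anomaly(sensorinput) AS anomaly_score
  FROM mqttsensorstream
  WHERE anomaly(sensorinput) > 5;
```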

This KSQL stream, AnomalyDetectionWithFilter, scales like any other KSQL or Kafka Streams application. It is built for S, M, L, XL and XXL scenarios. If you need higher scalability or better throughput, simply start additional KSQL server instances.

If you want to try out this machine-learning UDF, just go to my GitHub project. It includes the source code and a step-by-step guide to test it using the Confluent CLI and Confluent MQTT Proxy. The project also includes an MQTT sensor generator.

The story around building and deploying analytic models with KSQL is a very interesting and compelling use case worth discussing in further detail, but I’ll save that discussion for a future blog post that covers model training, deployment and inference in greater depth.

Potential issues during development and testing

During development and testing, I faced some challenges when creating my first UDF with the new KSQL UDF API. If your UDF does not work, check for the following:

  • Compile errors? Did you include the KSQL JAR in your project to use its classes and annotations? Either add ksql-udf.jar, which you get from a KSQL project build, or add it via a Maven dependency (groupId: io.confluent.ksql; artifactId: ksql-udf).
  • Classpath errors at runtime? Are dependencies missing in the generated uber JAR? Extract the JAR and see if all needed dependencies are included (e.g., the analytic model in my example) and in the expected directory. Maybe your build process is not working well yet.
  • UDF not found at runtime? Do you see the error, “Can’t find any functions with the name XYZ”? Did you restart the KSQL server after you added the uber JAR to the ext/ directory? UDFs are only loaded during the startup of the KSQL engine. Or, perhaps the path is wrong: did you configure the right path in the property ksql.extension.dir in ksql-server.properties? Start with an absolute path first; if that works, you can try a relative path.
  • Still experiencing problems? Check the logs—not just the KSQL log (ksql.log) but also other Kafka-related log files.

Please let us know if you face any other issues during development and testing of KSQL UDFs. All feedback is appreciated. While this blog post focused on building a stateless UDF, the development process is the same for a stateful UDAF.

What’s next for KSQL UD(A)Fs?

The foundation is here. You’ve learned how easy it is to create and deploy a new UD(A)F for KSQL: it takes only a few steps to implement, register and execute one. Once a developer builds it, every end user of your KSQL instances can leverage it, whether they’re a data engineer or a data scientist.

The example of building a deep-learning UDF shows how you can even include other dependencies, such as an analytic model, to build very powerful functions. The end user just has to call the function in the KSQL statement—no need to understand what is going on under the hood.

The next step in the KSQL UD(A)F roadmap is to support building UD(A)Fs with programming languages other than Java. This way, even a data scientist who only understands Python or JavaScript can write and deploy their own functions. In addition to this, there are many new and exciting features to come, so be sure to keep an eye out for the next KSQL releases.



Comments

  1. Does a KSQL UDF support accepting one or more input values and generating multiple output values? For example, parsing an IP address into latitude and longitude. If yes, how do we define the method signature? Thanks!

    1. Hi Casel,

      yes, you can already do this. Even the “hello world example” from KSQL docs (https://docs.confluent.io/current/ksql/docs/udf.html) uses two input parameters:

      @Udf(description = "multiply two non-nullable INTs.")
      public long multiply(final int v1, final int v2) {
          return v1 * v2;
      }

      For output, it is not that comfortable today, but you can use a single Array column (Java List). We will improve this in the future, but it is already doable this way.

      Hope this helps.

      Kai Waehner

    1. Not sure if I understand the question, Michael.

      Under the hood, the UDF is Java code. So you can do anything there.

      It also depends on the model used. H2O can generate models in Java, which means the model is part of the Java byte code. The TensorFlow Java API also loads the model when you instantiate the Java class.

      If your question is more about changing the model at runtime, this is harder with KSQL today as you need to restart the KSQL engine if you make changes to the UDF or want to load a new version of a UDF.
      With Kafka Streams, this is pretty easy to solve today. With KSQL, this is not supported out-of-the-box, but might work with a workaround.

  2. I have Python code which does online k-means clustering. Earlier, I didn’t know we could use UDFs to produce output for an input on live data. I had previously followed your tutorial on how to use TensorFlow in Python with KSQL; however, it is just meant for training.
    Now, if I have to deploy this UDF in Python, is it possible?

    1. Hi Merlin,

      thanks for your feedback.

      Today, KSQL UDFs have to be implemented in Java or another JVM language. There is the possibility to use something like Jython or Py4J to write Python code, but this is not an ideal workaround, of course. Especially for simpler UDFs, it is probably easier to learn some Java syntax or to hand the task over to a Java developer.

      Confluent plans to provide other KSQL interfaces in the future to be able to write UDFs natively with just Python code. But this is probably not coming in 2019.
