KSQL, the SQL engine for streaming data, is a very powerful tool that helps a great deal in streaming analytics use cases. It comes with a set of functions that can be used to transform, filter or aggregate data, and the good thing is that you can easily extend it by implementing and adding your own UDF (User Defined Function) or UDAF (User Defined Aggregate Function). Let’s see how we can do so and add a simple function to KSQL.
The UDF I want to implement here is DATEADD. If you’re familiar with SQL, you have almost certainly used it: it takes a date, adds (or subtracts) a given number of units to a specific part of it, such as the month or the day, and spits out a new datetime.
To implement a User Defined Function (UDF or UDAF), you need to code your function in Java and then deploy the jar file to your KSQL server. You can read about the full process here; below are a couple of things I believe you should pay attention to:
- Make sure you set the @UdfDescription and @Udf annotations in your Java code properly
- Change the versions in pom.xml according to your environment. For example:
<confluent.version>5.1.0</confluent.version>
- Pay attention to the data types you can use in your Java code. You can use only the following types as parameters or return values of your function:
Java Type | KSQL Type |
int | INTEGER |
Integer | INTEGER |
boolean | BOOLEAN |
Boolean | BOOLEAN |
long | BIGINT |
Long | BIGINT |
double | DOUBLE |
Double | DOUBLE |
String | VARCHAR |
List | ARRAY |
Map | MAP |
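The primitive/boxed split in this table matters for nulls: a primitive long or int cannot hold null, while Long and Integer can. As a minimal sketch, assuming you want a function that returns null rather than failing when an input is missing, here is a hypothetical method (addDays is not part of KSQL) that sticks to mapped types. The @Udf annotation is left out so it compiles with the JDK alone, and whether your KSQL version actually passes null to boxed parameters is worth checking.

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical helper that only uses types from the table:
 * Long -> BIGINT, Integer -> INTEGER.
 * In a real UDF class this method would carry the @Udf annotation.
 */
final class BoxedTypesSketch {

    // Boxed parameters can be null, so we can return null instead of
    // hitting a NullPointerException when unboxing.
    // Adds exact 24-hour days in epoch milliseconds (no time zone involved).
    static Long addDays(final Long epochMillis, final Integer days) {
        if (epochMillis == null || days == null) {
            return null;
        }
        return epochMillis + TimeUnit.DAYS.toMillis(days);
    }

    public static void main(String[] args) {
        System.out.println(addDays(0L, 1));   // 86400000
        System.out.println(addDays(null, 1)); // null
    }
}
```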
The Code
As I said above, we need to implement our UDF in Java. Let me start by saying that I’m not a Java developer: I can code in it, with lots of help from Google, but I’m certainly not the best at optimising code or applying best practices. So please be gentle:
package com.thebipalace.ksql.udfdateadd;

import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import java.util.Calendar;
import java.util.Date;

@UdfDescription(name = "DATEADD", description = "Get previous or future period for a given date")
public class DateAdd {

    @Udf(description = "Get previous or future period for a given date")
    public long dateAdd(final long date, final String period, final int amount) {
        if (period == null || period.isEmpty()) {
            throw new IllegalArgumentException("period must be one of Y, M, D, H, N, S");
        }

        // Calendar uses the JVM's default time zone for month/day arithmetic
        Calendar cal = Calendar.getInstance();
        cal.setTime(new Date(date));

        // Only the first character of period is used, case-insensitively
        switch (Character.toUpperCase(period.charAt(0))) {
            case 'Y': cal.add(Calendar.YEAR, amount); break;
            case 'M': cal.add(Calendar.MONTH, amount); break;
            case 'D': cal.add(Calendar.DAY_OF_MONTH, amount); break;
            case 'H': cal.add(Calendar.HOUR_OF_DAY, amount); break;
            case 'N': cal.add(Calendar.MINUTE, amount); break; // N = minute, since M is month
            case 'S': cal.add(Calendar.SECOND, amount); break;
            default:
                // Fail loudly instead of silently returning the unchanged date
                throw new IllegalArgumentException("Unknown period: " + period);
        }
        return cal.getTimeInMillis();
    }
}
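The Calendar in the code above works in the JVM's default time zone, so month and day arithmetic can give different results on servers configured differently. As a minimal sketch, assuming UTC is the zone you want, the same logic can be written with java.time and an explicit zone. DateAddUtc is a hypothetical name, and the KSQL annotations are omitted so it compiles with the JDK alone; in a real UDF you would add them back as in the class above.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;

/** Hypothetical java.time variant of DATEADD that always computes in UTC. */
final class DateAddUtc {

    static long dateAdd(final long date, final String period, final int amount) {
        if (period == null || period.isEmpty()) {
            throw new IllegalArgumentException("period must be one of Y, M, D, H, N, S");
        }
        ZonedDateTime t = Instant.ofEpochMilli(date).atZone(ZoneOffset.UTC);
        switch (Character.toUpperCase(period.charAt(0))) {
            case 'Y': t = t.plusYears(amount); break;
            case 'M': t = t.plusMonths(amount); break;
            case 'D': t = t.plusDays(amount); break;
            case 'H': t = t.plusHours(amount); break;
            case 'N': t = t.plusMinutes(amount); break; // N = minute, since M is month
            case 'S': t = t.plusSeconds(amount); break;
            default:
                throw new IllegalArgumentException("Unknown period: " + period);
        }
        return t.toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        long jan31 = 1548892800000L; // 2019-01-31T00:00:00Z
        System.out.println(Instant.ofEpochMilli(dateAdd(jan31, "M", 1))); // 2019-02-28T00:00:00Z
    }
}
```

Note that adding a month to 31 January lands on 28 February in 2019: plusMonths clamps to the last valid day of the month, and Calendar.add does the same for this case.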
It’s a very simple function that takes 3 parameters:
- date: of type long, representing the number of milliseconds elapsed since 1 January 1970 (UTC), because KSQL UDFs don’t accept Date data types. You can use KSQL’s TIMESTAMPTOSTRING to convert long (BIGINT) values representing dates into a readable format.
- period: of type String, the part of the date you want to add to or subtract from. As you can see in the code, the options are Y (year), M (month), D (day), H (hour), N (minute) and S (second); only the first character is checked.
- amount: of type int, the number of periods to move the date forward (positive) or back (negative), e.g. 1 month or -23 days.
The function returns a new long value: the date/time that results from applying amount periods to date.
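Since date is just a count of milliseconds, it helps to be able to produce and read such values when testing the function outside KSQL. A minimal sketch, assuming UTC and the same 'yyyy-MM-dd HH:mm:ss' pattern used with TIMESTAMPTOSTRING in the usage example; the class and method names are hypothetical.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

/** Hypothetical helpers for the epoch-millisecond values DATEADD works with (UTC assumed). */
final class EpochMillis {

    // SimpleDateFormat is not thread-safe, so a fresh instance is created per call.
    private static SimpleDateFormat utcFormat() {
        SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        f.setTimeZone(TimeZone.getTimeZone("UTC"));
        return f;
    }

    // "2019-01-31 00:00:00" (UTC) -> milliseconds since 1970-01-01T00:00:00Z
    static long toMillis(final String text) {
        try {
            return utcFormat().parse(text).getTime();
        } catch (ParseException e) {
            throw new IllegalArgumentException("Not a yyyy-MM-dd HH:mm:ss date: " + text, e);
        }
    }

    // milliseconds since the epoch -> "yyyy-MM-dd HH:mm:ss" (UTC)
    static String toText(final long millis) {
        return utcFormat().format(new Date(millis));
    }

    public static void main(String[] args) {
        long millis = toMillis("2019-01-31 00:00:00");
        System.out.println(millis);         // 1548892800000
        System.out.println(toText(millis)); // 2019-01-31 00:00:00
    }
}
```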
Deployment
To start using the UDF in KSQL, you need to deploy it to your KSQL cluster. The steps are listed in the link I mentioned above; this is basically what you need to do:
- Compile your code by running the following command in the root directory of your Java project:
mvn clean package
- Copy the jar file whose name ends in “with-dependencies” to the server where KSQL is running, into “<path-to-confluent>/etc/ksql/ext”. Make sure ksql.extension.dir in ksql-server.properties points to this location, for example:
ksql.extension.dir=/home/centos/kafka/confluent-5.0.0/etc/ksql/ext/
- Then restart the KSQL server:
<path-to-confluent>/bin/confluent stop ksql-server
<path-to-confluent>/bin/confluent start ksql-server
Then fire up the KSQL CLI:
LOG_DIR=./ksql_logs <path-to-confluent>/bin/ksql
And list the functions. DATEADD should be there:
LIST FUNCTIONS;
And there you go. Your new UDF is ready to be used.
Usage
Just use it like any other function in your KSQL queries. Here’s an example:
SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss'), TIMESTAMPTOSTRING(DATEADD(ROWTIME, 'M', 1), 'yyyy-MM-dd HH:mm:ss') from orders_raw;
This function can be useful for period-over-period comparisons. For example, if you’re running a marketing campaign, you may want to compare this year’s website hits with last year’s, when you ran the same campaign.
Or in sales: a month-to-month comparison of how well your sales are going, in real time.
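The idea behind using DATEADD for such comparisons is to shift last year’s events forward by one year (as DATEADD(ROWTIME, 'Y', 1) would) so both years land on the same calendar key. In KSQL itself this would likely be a windowed aggregation; the sketch below shows only the idea in plain Java, assuming UTC, monthly buckets and in-memory lists of event timestamps. YearOverYear and its methods are hypothetical.

```java
import java.time.Instant;
import java.time.YearMonth;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical year-over-year comparison of event counts per month (UTC assumed). */
final class YearOverYear {

    // Same shift DATEADD(ts, 'Y', amount) performs, computed in UTC.
    static long shiftYears(final long millis, final int amount) {
        return Instant.ofEpochMilli(millis).atZone(ZoneOffset.UTC)
                .plusYears(amount).toInstant().toEpochMilli();
    }

    static YearMonth monthOf(final long millis) {
        return YearMonth.from(Instant.ofEpochMilli(millis).atZone(ZoneOffset.UTC));
    }

    // Index 0 = this year's count, index 1 = last year's count for the same month.
    // Last year's events are shifted forward one year so both share a key.
    static Map<YearMonth, long[]> compare(final List<Long> thisYear, final List<Long> lastYear) {
        Map<YearMonth, long[]> counts = new TreeMap<>();
        for (long t : thisYear) {
            counts.computeIfAbsent(monthOf(t), k -> new long[2])[0]++;
        }
        for (long t : lastYear) {
            counts.computeIfAbsent(monthOf(shiftYears(t, 1)), k -> new long[2])[1]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // 2019-01-31T00:00:00Z this year, 2018-01-31T00:00:00Z last year
        Map<YearMonth, long[]> result =
                compare(Arrays.asList(1548892800000L), Arrays.asList(1517356800000L));
        result.forEach((month, c) ->
                System.out.println(month + " this year=" + c[0] + " last year=" + c[1]));
        // 2019-01 this year=1 last year=1
    }
}
```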
I hope this is useful for some of you out there. As always, feel free to reach out if you have any questions, comments or feedback.