Deep Dive into Joining in Kafka Streams
Joining is one of the most useful capabilities we have when working with Kafka Streams, but it takes some effort to wrap your head around.
I’ll start off by saying: streaming applications are not relational databases. As cool as KSQL is, don’t get fooled into thinking this is exactly like joining two tables in Postgres. It’s similar, but now time is a factor.
To start off, we’re going to review a few non-obvious concepts of Kafka & Kafka Streams.
The stream table duality
This is the realization that a stream and a table are two sides of the same coin. When you have a table, underneath it is the changelog—a stream. Even a normal database like Postgres is implemented with a Write-Ahead Log.
Similarly, if you have a stream and want a snapshot of its current state at any given time, that snapshot would be a table.
Within the context of Kafka Streams, this means we can go back and forth between a KStream and a KTable, because a KStream can represent the changelog of a KTable, and a KTable can be a snapshot of the current state of the stream.
Kafka exploits this internally, and we can too when we build our streaming applications.
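To make the duality concrete, here is a minimal plain-Kotlin sketch. It does not use the Kafka Streams API; the function names toTable and toChangelog are made up for illustration, and a null value is treated as a delete, the way a tombstone works on a KTable.

```kotlin
// A changelog is an ordered list of (key, value) updates.
// A null value is treated as a delete (a tombstone).
fun toTable(changelog: List<Pair<String, String?>>): Map<String, String> {
    val table = linkedMapOf<String, String>()
    for ((key, value) in changelog) {
        if (value == null) table.remove(key) else table[key] = value
    }
    return table
}

// Going the other way: emit one update per row of the table.
fun toChangelog(table: Map<String, String>): List<Pair<String, String?>> =
    table.map { (key, value) -> key to value }

fun main() {
    val changelog = listOf("alice" to "1", "bob" to "5", "alice" to "2", "bob" to null)
    val table = toTable(changelog)
    println(table)              // {alice=2}
    println(toChangelog(table)) // [(alice, 2)]
}
```

Replaying the stream gives you the table, and dumping the table gives you a (compacted) stream.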
Keys & Partitions Matter
If things are set up properly, you can spend a lot of time developing a topology without thinking about the message keys. If things are set up properly…
Until recently (like, very recently) the only way to join streams was based off of message keys. There was even the further requirement that the two input topics have the same keys & the same number of partitions.
It’s understandable why you could only join by key. How a join is implemented under the hood is pretty complex, and going on anything other than message key is far more expensive.
Given that we have to join by keys, why would the data have to be co-partitioned?
Well, a KTable is partitioned, meaning it only handles a subset of partitions. (If you want a KTable that handles all of the partitions in a topic, you can use a GlobalKTable.) So if a KTable is only getting a subset of partitions, and the destination partition of a message is determined by its key + the number of partitions, then topics have to be equivalently keyed and partitioned in order to join.
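Here is a small sketch of why the partition count matters. It is a simplification: Kafka’s default partitioner hashes the serialized key with murmur2, while this stand-in uses String.hashCode() to stay dependency-free. The helper name partitionFor is hypothetical.

```kotlin
// Stand-in for the default partitioner: hash the key, then take it modulo
// the partition count. (Kafka really uses murmur2 on the serialized key.)
fun partitionFor(key: String, numPartitions: Int): Int =
    Math.floorMod(key.hashCode(), numPartitions)

fun main() {
    val key = "d"
    println(partitionFor(key, 4)) // 0: left-topic with 4 partitions
    println(partitionFor(key, 4)) // 0: right-topic with 4 partitions, same partition
    println(partitionFor(key, 6)) // 4: right-topic with 6 partitions, a different one
}
```

With matching partition counts the same key always lands in the same partition number on both topics, so the task that owns that partition sees both sides. With different counts, the matching records can end up in front of different tasks.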
Windowing
This is where the idea of time really comes into play, and we start to see how different these operations are from a traditional database join.
Windowing comes off as very simple, but there are four different types of windows we can use in a Kafka Streams app.
Name | Description | Example |
---|---|---|
Tumbling time window | Fixed-size, non-overlapping windows | Pager Duty. Did a PS ticket occur during your window? |
Hopping time window | Fixed-size, overlapping windows | The S&P 500 is often calculated as a rolling average to smooth out variability, that would be done using a hopping time window. |
Sliding time window | Fixed-size, overlapping windows that work on differences between record timestamp | These are only used for joins in Kafka Streams. It slides continuously along the time axis. If two records are within X seconds of each other, they can be joined. |
Session window | Dynamically-sized, non-overlapping, data-driven windows | Mostly used for user behaviour analysis. |
Windowing will come back up for the KStream <> KStream join, sliding time windowing in particular.
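As a rough illustration of the first three window types, here is a plain-Kotlin sketch of how a timestamp maps to windows. It is a model rather than the Kafka Streams implementation: the function names are made up, timestamps are assumed to be non-negative, and the sliding check assumes the allowed difference is inclusive. Session windows are left out because they depend on gaps in the data rather than on arithmetic.

```kotlin
import kotlin.math.abs

// Tumbling: each timestamp belongs to exactly one window, [start, start + size).
fun tumblingWindowStart(ts: Long, size: Long): Long = ts - (ts % size)

// Hopping: a new window starts every `advance` units, so one timestamp
// can fall into several overlapping windows.
fun hoppingWindowStarts(ts: Long, size: Long, advance: Long): List<Long> {
    val latestStart = ts - (ts % advance)
    return generateSequence(latestStart) { it - advance }
        .takeWhile { it >= 0 && it + size > ts }
        .toList()
        .reversed()
}

// Sliding (join) window: two records match if their timestamps
// differ by at most maxDiff.
fun withinSlidingWindow(tsA: Long, tsB: Long, maxDiff: Long): Boolean =
    abs(tsA - tsB) <= maxDiff

fun main() {
    println(tumblingWindowStart(7, 5))    // 5
    println(hoppingWindowStarts(7, 5, 2)) // [4, 6]
    println(withinSlidingWindow(1, 6, 5)) // true
}
```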
Inner, Left, and Outer Joins
This is one of those topics that does actually remain the same between databases and streaming, woohoo! To review:
- An inner join only includes records when there is a corresponding record on both sides.
- A left join includes all records in the left table (or stream), regardless of whether they have a match on the right.
- An outer join includes all records that appear in either table, irrespective of whether they exist in the other one.
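The three definitions above can be sketched with two plain Kotlin maps standing in for the two sides. This is only a model of the semantics, not Kafka Streams code; the keys and values are invented, and a missing side is rendered as the string null.

```kotlin
// Inner: only keys present on both sides.
fun innerJoin(left: Map<String, String>, right: Map<String, String>): Map<String, String> =
    left.keys.intersect(right.keys).associateWith { "${left[it]} - ${right[it]}" }

// Left: every key on the left, matched or not.
fun leftJoin(left: Map<String, String>, right: Map<String, String>): Map<String, String> =
    left.keys.associateWith { "${left[it]} - ${right[it]}" }

// Outer: every key that appears on either side.
fun outerJoin(left: Map<String, String>, right: Map<String, String>): Map<String, String> =
    (left.keys + right.keys).associateWith { "${left[it]} - ${right[it]}" }

fun main() {
    val left = mapOf("k1" to "a", "k2" to "b")
    val right = mapOf("k2" to "B", "k3" to "C")
    println(innerJoin(left, right)) // {k2=b - B}
    println(leftJoin(left, right))  // {k1=a - null, k2=b - B}
    println(outerJoin(left, right)) // {k1=a - null, k2=b - B, k3=null - C}
}
```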
Now, in order to understand the specific behaviour of the different types of joins in Kafka Streams, we’re going to run an experiment.
Experiment
This is somewhat inspired by the Kafka Streams Join Semantics Confluence page. I did this because we had a pretty particular situation at work that wasn’t quickly resolved by just looking at that page.
The goal of this experiment was to understand a bit better how the different types of joins act in different situations. We’re only going to deal with inner joins due to the number of permutations, but other types of joins can be useful in Kafka Streams, particularly the left join.
Problem & Setup
Imagine we have two topics, left-topic and right-topic. The main scenario we’re going to use for testing I’m calling “Many Left Events”.
ts | left-topic | right-topic |
---|---|---|
1 | a | |
2 | b | |
3 | | A |
4 | c | |
5 | d | |
6 | | B |
I chose this situation for the following reasons:
- The left-topic has multiple events occurring before the right-topic has its first event. This is the subtlety that led to this rabbit hole in the first place. As you’ll see in the results, it very much affects the behaviour. It’s also not addressed in the documentation.
- While the left-topic does have a lot of events, I wanted to include two events in right-topic in case there was some kind of multiplicity that occurs. Sneak peek: there is, in a KStream <> KStream join.
Using Strings
A key choice in this experiment is using strings. Integers would’ve been fine too — I just wanted to avoid anything related to Avro to avoid an unrelated area of complexity. The goal here is to analyze joining in its simplest form.
The Value Joiner
In this experiment, we’re going to use the exact same ValueJoiner as the one they used on the Confluence page linked in the intro.
In practice, it looks something like this.
// Parameters are nullable: in a left or outer join, one side can be missing.
val valueJoiner: (String?, String?) -> String = { leftValue, rightValue ->
    "$leftValue - $rightValue"
}
valueJoiner("a", "b") // "a - b"
Results
We’ll start with the KStream <> KStream join. The main thing you need to remember about a KStream <> KStream join is that it’s windowed. For the sake of this experiment, I’m making the window sufficiently large to contain all of these events.
KStream <> KStream Join
ts | left-topic | right-topic | Result |
---|---|---|---|
1 | a | | |
2 | b | | |
3 | | A | a - A, b - A |
4 | c | | c - A |
5 | d | | d - A |
6 | | B | a - B, b - B, c - B, d - B |
There are two key things to remember about a KStream <> KStream join:
- It’s windowed. Only events that fall within the same window will ever be joined. For a more thorough overview of the different types of windows, go here.
- Within the window, it acts as a cartesian product.
Quick Review of Cartesian Product
A cartesian product is a set operation. It’s most clear through an example. A standard deck of cards is the cross product between two sets:
val values = listOf("Ace", "King", "Queen", "Jack", "10", "9", "8", "7", "6", "5", "4", "3", "2")
val suits = listOf("♠", "♥", "♦", "♣")
val deck = mutableListOf<String>()
// this is a cartesian product: every suit is paired with every value
suits.forEach { suit ->
    values.forEach { value ->
        deck.add("$value of $suit")
    }
}
// [Ace of ♠, King of ♠, Queen of ♠, Jack of ♠, 10 of ♠, 9 of ♠, 8 of ♠, 7 of ♠, 6 of ♠, 5 of ♠, 4 of ♠, 3 of ♠, 2 of ♠, Ace of ♥, King of ♥, Queen of ♥, Jack of ♥, 10 of ♥, 9 of ♥, 8 of ♥, 7 of ♥, 6 of ♥, 5 of ♥, 4 of ♥, 3 of ♥, 2 of ♥, Ace of ♦, King of ♦, Queen of ♦, Jack of ♦, 10 of ♦, 9 of ♦, 8 of ♦, 7 of ♦, 6 of ♦, 5 of ♦, 4 of ♦, 3 of ♦, 2 of ♦, Ace of ♣, King of ♣, Queen of ♣, Jack of ♣, 10 of ♣, 9 of ♣, 8 of ♣, 7 of ♣, 6 of ♣, 5 of ♣, 4 of ♣, 3 of ♣, 2 of ♣]
The fact that a KStream <> KStream join is a cartesian product gives some insight as to why it must be windowed: for each new record, it creates an event for every matching record from the other topic. This means it has to hold on to all the events that fall within that window. Given an infinite window, that would mean every message ever published to the two input topics.
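To see where the results in the table come from, here is a plain-Kotlin model of the windowed join. It is a sketch under assumptions, not the Kafka Streams implementation: all events are assumed to share one message key, the Event and Side types and the streamStreamJoin function are invented for this example, and the window is modelled as a maximum timestamp difference.

```kotlin
enum class Side { LEFT, RIGHT }
data class Event(val ts: Long, val side: Side, val value: String)

// Each new event is joined with every buffered event from the other side
// whose timestamp is within maxDiff. All events are assumed to share one key.
fun streamStreamJoin(events: List<Event>, maxDiff: Long): List<String> {
    val leftBuffer = mutableListOf<Event>()
    val rightBuffer = mutableListOf<Event>()
    val results = mutableListOf<String>()
    for (event in events.sortedBy { it.ts }) {
        when (event.side) {
            Side.LEFT -> {
                rightBuffer.filter { event.ts - it.ts <= maxDiff }
                    .forEach { results += "${event.value} - ${it.value}" }
                leftBuffer += event
            }
            Side.RIGHT -> {
                leftBuffer.filter { event.ts - it.ts <= maxDiff }
                    .forEach { results += "${it.value} - ${event.value}" }
                rightBuffer += event
            }
        }
    }
    return results
}

fun main() {
    val events = listOf(
        Event(1, Side.LEFT, "a"), Event(2, Side.LEFT, "b"), Event(3, Side.RIGHT, "A"),
        Event(4, Side.LEFT, "c"), Event(5, Side.LEFT, "d"), Event(6, Side.RIGHT, "B"),
    )
    // Window large enough to contain everything, as in the experiment.
    println(streamStreamJoin(events, maxDiff = 10))
    // [a - A, b - A, c - A, d - A, a - B, b - B, c - B, d - B]
}
```

Shrinking maxDiff to 1 in this model leaves only b - A, c - A, and d - B, which shows how the window bounds both the output and the amount of buffered state.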
KTable <> KTable Join
Now let’s move on to the other symmetric join type, a KTable <> KTable join. KTable <> KTable joins are designed to be consistent with joins in relational databases.
KTable <> KTable
ts | left-topic | right-topic | Result |
---|---|---|---|
1 | a | | |
2 | b | | |
3 | | A | b - A |
4 | c | | c - A |
5 | d | | d - A |
6 | | B | d - B |
Key things to remember about a KTable <> KTable join:
- It’s not windowed.
- The result is a KTable, but recall the KStream and KTable duality. This means you can just do .toStream() on the result, and you’re effectively looking at the changelog stream of the join table.
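The KTable <> KTable results can be reproduced with a very small model: each side remembers only its latest value, and any update on either side emits a joined record once both sides have one. This is a plain-Kotlin sketch, not the DSL; it assumes every event shares a single message key, and tableTableJoin is a made-up name.

```kotlin
// Events are (topic, value) pairs in timestamp order; one shared key is assumed.
fun tableTableJoin(events: List<Pair<String, String>>): List<String> {
    var left: String? = null  // latest value in the left table
    var right: String? = null // latest value in the right table
    val changelog = mutableListOf<String>()
    for ((topic, value) in events) {
        if (topic == "left-topic") left = value else right = value
        // Inner join: emit only once both sides hold a value.
        if (left != null && right != null) changelog += "$left - $right"
    }
    return changelog
}

fun main() {
    val events = listOf(
        "left-topic" to "a", "left-topic" to "b", "right-topic" to "A",
        "left-topic" to "c", "left-topic" to "d", "right-topic" to "B",
    )
    println(tableTableJoin(events)) // [b - A, c - A, d - A, d - B]
}
```

Note that a never appears in the output: by the time A arrives, b has already overwritten it in the left table.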
KStream <> KTable
In a KStream <> KTable join, the result is a KStream. You can think of this almost as if the KStream is data coming from a Kafka topic, and the KTable data just comes from a relational database and enriches the KStream messages. That’s not far from what a KTable is: a materialized table from a topic that lives in an application, not a table in an actual database.
Because of this, the KStream is providing the actual events. The KTable is just there to enrich those events.
KStream <> KTable
ts | left-topic | right-topic | Result |
---|---|---|---|
1 | a | | |
2 | b | | |
3 | | A | |
4 | c | | c - A |
5 | d | | d - A |
6 | | B | |
That’s probably far fewer resulting events than you imagined. There are two key facts that we can conclude about a KStream <> KTable join.
- If a message isn’t present in the KTable for a corresponding KStream event, no resulting event occurs.
- Changes to the KTable do not fire events in and of themselves.
Now, if it was critical that an event in the KStream caused an event to fire even when a message with that key wasn’t present in the KTable, we could use a KStream.leftJoin(KTable).
If we wanted an event to fire when A arrives, but only for the most recent event in the left-topic at that point (that would be b), we could use a KTable.join(KTable).toStream().
Recall that in the changelog of a KTable <> KTable join we get events fired for changes to either KTable, joined with the current state of the other.
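The inner and left variants of the KStream <> KTable join can be compared side by side with a small plain-Kotlin model. As with the other sketches, this is not Kafka Streams code: one shared message key is assumed, and streamTableJoin and its leftJoin flag are invented for illustration.

```kotlin
// Events are (topic, value) pairs in timestamp order; one shared key is assumed.
// left-topic is the stream side, right-topic feeds the table side.
fun streamTableJoin(events: List<Pair<String, String>>, leftJoin: Boolean = false): List<String> {
    var tableValue: String? = null // latest value in the table for our key
    val results = mutableListOf<String>()
    for ((topic, value) in events) {
        if (topic == "right-topic") {
            tableValue = value // table updates are stored but never emit
        } else if (tableValue != null || leftJoin) {
            results += "$value - $tableValue"
        }
    }
    return results
}

fun main() {
    val events = listOf(
        "left-topic" to "a", "left-topic" to "b", "right-topic" to "A",
        "left-topic" to "c", "left-topic" to "d", "right-topic" to "B",
    )
    println(streamTableJoin(events))                  // [c - A, d - A]
    println(streamTableJoin(events, leftJoin = true)) // [a - null, b - null, c - A, d - A]
}
```

The left join recovers a and b, but joined with null, since the table had nothing for them at the time they arrived.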
What about KStream <> GlobalKTable?
Our situation really isn’t designed to test this well. We would just get the same results we got in KStream <> KTable.
This type of join shines in a different set of circumstances, namely, when data isn’t co-partitioned or when you want to get away from the joining requirements placed on message keys.
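As a rough picture of what getting away from message keys looks like, here is a plain-Kotlin model in which the lookup key is derived from the stream record itself. In the DSL, the GlobalKTable join takes a key mapper for this purpose; everything below (the Order type, the customers map, the globalTableJoin function) is hypothetical and only models the idea.

```kotlin
// Hypothetical stream record: keyed by orderId, but it carries a customerId.
data class Order(val orderId: String, val customerId: String)

// The "global table" is fully available to every instance, so any field of the
// stream record can be used as the lookup key. Inner join: misses are dropped.
fun globalTableJoin(orders: List<Order>, customers: Map<String, String>): List<String> =
    orders.mapNotNull { order ->
        customers[order.customerId]?.let { name -> "${order.orderId} - $name" }
    }

fun main() {
    val customers = mapOf("c1" to "Ada", "c2" to "Grace")
    val orders = listOf(Order("o1", "c2"), Order("o2", "c9"), Order("o3", "c1"))
    println(globalTableJoin(orders, customers)) // [o1 - Grace, o3 - Ada]
}
```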
So what type of join should I use?
If you’re stuck on this problem (like I was), I would encourage you to do the following
Define the common scenarios for your use case
Our scenario was special because it had two messages published to left-topic before any arrived in right-topic, and that caused some behaviour we had to work around, particularly if we wanted to use the KStream <> KTable join.
Define the desired behaviour
I didn’t say so at the outset of the experiment, but the behaviour I was really looking for was this:
ts | left-topic | right-topic | Result |
---|---|---|---|
1 | a | | |
2 | b | | |
3 | | A | a - A, b - A |
4 | c | | c - A |
5 | d | | d - A |
6 | | B | |
Now that I’ve run all the experiments and analyzed the behaviour of the join types, I see a few problems with this.
I wanted events a - A and b - A to fire when A arrived, but nothing to fire when B arrived. That’s a contradiction: a join that fires for earlier left-topic events when A shows up will also fire when B shows up. So in my use case, we had to change the design a little bit to fit into what’s possible with a join in Kafka Streams.
Conclusion
Joining streams is harder than joining tables, because they likely weren’t produced with joining in mind, and time is involved.
Some of the key concepts are
- The stream table duality
- Windowing
- Inner, left, & outer joins
And if you get stuck in a situation where you’re not understanding the behaviour of a given join, or you’re not sure which one to use, I suggest you run an experiment similar to the one we did in this article.
P.S. I learned in writing this that the joining of two rivers is called a confluence. Sound familiar?