Monday, August 3, 2020

Using Kafka Streams Interactive Queries to Peek Inside of your KTables

Recently we found a bug in one of our Kafka Streams applications, and as I was looking into it, I found that we had a Stream -> Table left join that was failing. This didn't make sense: every indication was that the data, with the correct key, should have been in the KTable at the time the join was attempted.

So, I set out to verify that. It was easy to see what was in the stream, but I was struggling to figure out how to see what was in the table. Using kafkacat, I could see that the data was in the underlying topic, but I needed to see what the KTable itself contained at runtime, at the moment of the join.

That's when I turned to the helpful geniuses on the Confluent Community Slack group. There, someone suggested that I use an interactive query.

Now, to some, this might be a no-brainer, but I am still somewhat new to Kafka and had never used interactive queries. But there's a first time for everything, so I dug into it.

I guess I shouldn’t have been surprised by how easy it was, but Kafka Streams never ceases to amaze me. The following bit of code is all it took to give me a view inside my KTable:

Let's walk through this code.

(Line 3) First off I needed to get a hold of the KafkaStreams instance, in order to access the state store.

Since the bit of topology that I’m working on is in a different Java class from the one where the stream is created and launched, I have to make a call to get it.

(Line 4) To access the state store, I needed its name, so I called queryableStoreName() on the KTable.

(Line 5) Now I can get a hold of the state store itself, in the form of a ReadOnlyKeyValueStore, using the KafkaStreams store() method.

(Line 6) To see all of the records in the store, I used a KeyValueIterator that is returned from the store.all() method.

(Lines 7-10) For each record, I print the key and value, and then, on line 11, I close the iterator to release its resources.

I bundled that all up in a handy method called queryKTableStore().
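Put together, the steps above might look roughly like the sketch below. It is a reconstruction from the walkthrough, not the original snippet, so the line numbers mentioned above will not match it. The class name KTableInspector and the generic signature are assumptions made for illustration, and it assumes Kafka Streams 2.5 or later, where store() accepts a StoreQueryParameters argument.

```java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

// Hypothetical helper class; only the method name comes from the post.
public final class KTableInspector {

    private KTableInspector() {}

    // Prints every record currently held in the KTable's local state store.
    public static <K, V> void queryKTableStore(KafkaStreams streams, KTable<K, V> table) {
        // queryableStoreName() returns null when the table is not queryable
        String storeName = table.queryableStoreName();
        if (storeName == null) {
            System.out.println("KTable has no queryable store");
            return;
        }

        // Look up the read-only view of the store by name and type
        ReadOnlyKeyValueStore<K, V> store = streams.store(
                StoreQueryParameters.fromNameAndType(
                        storeName, QueryableStoreTypes.<K, V>keyValueStore()));

        // try-with-resources closes the iterator when the loop finishes
        try (KeyValueIterator<K, V> records = store.all()) {
            while (records.hasNext()) {
                KeyValue<K, V> record = records.next();
                System.out.println("key: " + record.key + " Widget: " + record.value);
            }
        }
    }
}
```

The KafkaStreams instance has to be started before the store can be queried, which is why a method like this is only useful from code that runs while the application is processing records.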

Now I was able to add a peek() statement, calling this method, to my topology, right before the leftJoin that was failing.
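As a rough illustration of where such a peek() call sits, here is a minimal sketch. The class JoinDebugging, the method name, and the dumpTable callback are hypothetical; the callback stands in for whatever invokes the query method, since the KafkaStreams instance does not exist yet while the topology is being built.

```java
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.ValueJoiner;

// Hypothetical wiring helper, for illustration only.
public final class JoinDebugging {

    private JoinDebugging() {}

    // Dumps the table contents for every stream record, right before the left join.
    public static <K, V, T, R> KStream<K, R> leftJoinWithTableDump(
            KStream<K, V> stream,
            KTable<K, T> table,
            ValueJoiner<V, T, R> joiner,
            Runnable dumpTable) {
        return stream
                // peek() passes each record through unchanged, so it is safe for debugging
                .peek((key, value) -> dumpTable.run())
                .leftJoin(table, joiner);
    }
}
```

Because the callback runs once per stream record, this kind of debugging is best kept to local runs with small amounts of data.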

That gave me output like this:

key: 10001 Widget: {id:10001, name: Winding Widget, price: 299.95}
key: 10002 Widget: {id:10002, name: Whining Widget, price: 199.95}
key: 10003 Widget: {id:10003, name: Wonkey Widget, price: 499.95}

And of course, the key I was trying to join on, 10004, was not in the store, which means that it was not in the KTable. I added another peek() call after the failed join attempt, and now the output was more like this:

key: 10001 Widget: {id:10001, name: Winding Widget, price: 299.95}
key: 10002 Widget: {id:10002, name: Whining Widget, price: 199.95}
key: 10003 Widget: {id:10003, name: Wonkey Widget, price: 499.95}
key: 10004 Widget: {id:10004, name: Wonder Widget, price: 999.95}

Now it's there! Mystery solved! I have a timing problem on my hands... which is another mystery, but one for a different post. For now, I just wanted to point out this simple and powerful feature of Kafka Streams.
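To see why timing matters here, it helps to remember that a stream-table left join looks the key up in the table as it stands when the stream record is processed. The toy model below uses a plain HashMap in place of the KTable and does not touch Kafka at all; the class name, method, and order values are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class JoinTimingDemo {

    // Toy left join: look the key up in the "table" as it is right now.
    // A missing key yields null on the table side instead of dropping the record.
    static String leftJoin(Map<Integer, String> table, int key, String streamValue) {
        return streamValue + " joined with " + table.get(key);
    }

    public static void main(String[] args) {
        Map<Integer, String> table = new HashMap<>();
        table.put(10001, "Winding Widget");
        table.put(10002, "Whining Widget");
        table.put(10003, "Wonkey Widget");

        // The stream record for 10004 is processed before the table update arrives
        System.out.println(leftJoin(table, 10004, "order-A"));
        // prints: order-A joined with null

        // The table update lands afterwards, so a later lookup succeeds
        table.put(10004, "Wonder Widget");
        System.out.println(leftJoin(table, 10004, "order-B"));
        // prints: order-B joined with Wonder Widget
    }
}
```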

Before leaving, I also wanted to point out that the ReadOnlyKeyValueStore only exposes the state held by the local application instance. In my case, running locally, I only had one instance, but in a distributed environment each instance holds only the partitions assigned to it, so the key you are looking for may live on a different instance.

Also, ReadOnlyKeyValueStore has another method for accessing data by key, if you already know the key: store.get(key) returns the value for that key, or null if there is none. Of course, there is more you can do, and you can learn more about it in the Developer's Guide.
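For the keyed lookup, a minimal sketch could look like this. The KeyLookup class and the Optional wrapper are assumptions added for illustration; only store.get(key) comes from the Kafka Streams API.

```java
import java.util.Optional;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

// Hypothetical helper, for illustration only.
public final class KeyLookup {

    private KeyLookup() {}

    // Wraps store.get(key), which returns null when the key is absent from the local store.
    public static <K, V> Optional<V> lookup(ReadOnlyKeyValueStore<K, V> store, K key) {
        return Optional.ofNullable(store.get(key));
    }
}
```

An empty Optional here only means the key is not in this instance's store; with several instances it may still exist elsewhere.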