20 December 2016

This post is written using Apache Kafka 0.8.2

Creating an Apache Kafka client is a pretty straightforward and prescriptive endeavor. What is not straightforward, or even expected, is the behavior of the Iterator that is used to poll an Apache Kafka topic/partition for messages. More on this in a moment. First, let’s look at the typical setup to consume data from an Apache Kafka stream (for the sake of keeping this post brief, I am going to skip the details around creating and configuring a ConsumerConnector):

final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
    consumerConnector.createMessageStreams(Collections.singletonMap("topic", 1));
final List<KafkaStream<byte[], byte[]>> streams = consumerMap.get("topic");
final ConsumerIterator<byte[], byte[]> iterator = streams.get(0).iterator();

With the ConsumerIterator in hand, the next step is to poll the Iterator for incoming messages:

while(iterator.hasNext()) {
    MessageAndMetadata<byte[], byte[]> message = iterator.next();
    ...
}

This all seems pretty simple. Now, back to the issue with this code: the expectation is that this would check the Iterator for a message and, if one is not present, loop immediately and check again (standard Iterator behavior). However, this is not the case. The behavior of the ConsumerIterator is actually controlled by the consumer.timeout.ms configuration setting. This setting controls whether or not the Iterator will “throw a timeout exception to the consumer if no message is available for consumption after the specified interval”. By default, this value is set to -1, which means that the call to hasNext() will block indefinitely until a message is available on the topic/partition assigned to the consumer. The Java documentation for the Iterator interface does not specify whether or not the hasNext() method is allowed to block indefinitely, so it’s hard to say that the ConsumerIterator is violating the contract. However, this is certainly not the behavior anyone used to the Iterator pattern in Java would expect, as collections typically don’t block until data is available in the data structure. If the consumer.timeout.ms configuration setting is set to a positive value, the consumption code needs to be modified to handle a ConsumerTimeoutException:

while(active) {
    try {
        if(iterator.hasNext()) {
            MessageAndMetadata<byte[], byte[]> message = iterator.next();
            ...
        }
    } catch(ConsumerTimeoutException e) {
        // Do nothing -- this means no data is available on the topic/partition
    }
}
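For completeness, here is a sketch of how the consumer.timeout.ms setting would be supplied alongside the other consumer properties. The connection values (ZooKeeper address, group id, timeout) are placeholders for illustration; substitute your own environment's values:

```java
import java.util.Properties;

public class ConsumerConfigExample {

    public static Properties buildConsumerProps() {
        Properties props = new Properties();
        // Placeholder connection values -- replace with your own environment.
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "example-group");
        // A positive value makes hasNext() throw a ConsumerTimeoutException
        // after this many milliseconds with no message available;
        // -1 (the default) makes hasNext() block indefinitely.
        props.put("consumer.timeout.ms", "1000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildConsumerProps().getProperty("consumer.timeout.ms"));
    }
}
```

The Properties object would then be passed to a ConsumerConfig when creating the ConsumerConnector, as outlined at the top of this post.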

Now, the call to hasNext() will behave more like an Iterator retrieved from a collection, which is to say it will not block indefinitely. It is recommended that you do some testing to determine an acceptable timeout value: too small a timeout means the loop spins frequently, increasing CPU utilization. It is also worth noting that the Kafka documentation does not directly link the configuration setting and the ConsumerIterator, and this issue would most likely go unnoticed in scenarios where data is consistently available to the client. In any case, this issue highlights the need to take a deeper look at any API or library you include in your application in order to ensure that you understand exactly how it works and what performance impacts it may have on the execution of your code.
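The timeout-versus-blocking tradeoff described above is not unique to Kafka; the same pattern appears in the standard library's BlockingQueue, whose poll(timeout, unit) method returns null after the interval instead of blocking forever. This self-contained sketch (a stand-in for the Kafka iterator, not Kafka code) makes the shape of the loop concrete:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PollLoopSketch {

    // Polls the queue a fixed number of times with a 100 ms timeout per
    // attempt, counting messages consumed versus polls that timed out
    // (the analogue of catching ConsumerTimeoutException).
    public static int[] runPolls(BlockingQueue<String> queue, int attempts)
            throws InterruptedException {
        int consumed = 0;
        int emptyPolls = 0;
        for (int i = 0; i < attempts; i++) {
            String message = queue.poll(100, TimeUnit.MILLISECONDS);
            if (message != null) {
                consumed++;
            } else {
                emptyPolls++;
            }
        }
        return new int[] { consumed, emptyPolls };
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.put("message-1");
        int[] result = runPolls(queue, 3);
        System.out.println("consumed=" + result[0] + " emptyPolls=" + result[1]);
    }
}
```

With one message queued and three poll attempts, one poll succeeds and two time out. A smaller per-poll timeout means the empty polls return sooner, which is exactly the CPU-utilization concern raised above.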
