Home | All Posts

[28 Mar 2011] .NET Client for Kafka Multi-Request

I made some enhancements to the .NET client for Kafka that were recently accepted into the project mainline. Aside from some basic namespacing reorganization and some improved error handling, I added two major bits: Multi-produce and Multi-fetch batch operations.

The Multi-produce and Multi-fetch operations provide for a way to group up multiple ProducerRequest or FetchRequest objects and send them to Kafka in batch. The Multi-produce works like this:

List<ProducerRequest> producerRequests = new List<ProducerRequest>
    new ProducerRequest("topic-a", 0, new List<Message> { new Message(Encoding.UTF8.GetBytes("1: " + DateTime.UtcNow)) }),
    new ProducerRequest("topic-b", 0, new List<Message> { new Message(Encoding.UTF8.GetBytes("2: " + DateTime.UtcNow)) }),
    new ProducerRequest("topic-z", 0, new List<Message> { new Message(Encoding.UTF8.GetBytes("3: " + DateTime.UtcNow)) }),
    new ProducerRequest("zopic-z", 1, new List<Message> { new Message(Encoding.UTF8.GetBytes("4: " + DateTime.UtcNow)) })

MultiProducerRequest producerRequest = new MultiProducerRequest(producerRequests);
Producer producer = new Producer("localhost", 9092);

Note that the list of producerRequests pushes messages to three different topics and within one of those topic (topic-z), pushes to two different partitions. While the example above pushes a different message to each topic, it would not be hard to imagine a scenario where one might utilize this batching approach to send the same message to multiple topics and/or partitions.

The Multi-fetch works in a similar fashion:

Consumer consumer = new Consumer("localhost", 9092);
MultiFetchRequest request = new MultiFetchRequest(new List<FetchRequest>
    new FetchRequest("topic-a", 0, 0),
    new FetchRequest("topic-b", 0, 0),
    new FetchRequest("tobic-z", 0, 0),
    new FetchRequest("tobic-z", 1, 0)

List<List<Message>> messages = consumer.Consume(request);

The important bit to note in this example is that the Consume method returns a List of a List of Message objects. I’ll refer to that “inner” list as the “message set”. In the example above, there would be four message sets (one for each request), where the index of the request sent to Kafka in the batch, matches to the position in the List of message sets.

Home | All Posts