Committing custom offsets to Kafka

Recently I have been experimenting with integrating the RPA tool Blue Prism with Confluent Kafka. A Blue Prism process consumed messages from a Kafka topic through the Kafka REST Proxy. To do this I used the built-in Utility – HTTP business object and its HTTP Request action.

Reading messages through the Kafka REST Proxy consists of 5 steps:

1. Creating a consumer instance
Send an HTTP POST with the body

{
  "name": "instance1",
  "format": "avro",
  "auto.offset.reset": "earliest",
  "auto.commit.enable": false
}

to the URL

https://kafka-rest-url:8082/consumers/BLPRSM

Here BLPRSM is the consumer group name. auto.commit.enable is set to false because we don’t want offsets to be committed automatically; we commit them ourselves in step 4.

2. Subscribing to a topic
Send an HTTP POST with the body

{
  "topics": [
    "my-topic"
  ]
}

to the URL

https://kafka-rest-url:8082/consumers/BLPRSM/instances/instance1/subscription

3. Polling for messages
HTTP GET requests are sent in a loop to fetch messages from the URL

https://kafka-rest-url:8082/consumers/BLPRSM/instances/instance1/records?timeout=5000

4. Committing offsets
Once we are sure that Blue Prism has processed all Kafka messages successfully, we can mark them as consumed by committing their offsets. An HTTP POST with no body is sent to the URL

https://kafka-rest-url:8082/consumers/BLPRSM/instances/instance1/offsets

5. Deleting consumer instance
The consumer instance should be deleted to clean up. An HTTP DELETE is sent to the URL

https://kafka-rest-url:8082/consumers/BLPRSM/instances/instance1
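If you want to reproduce the five steps outside Blue Prism, here is a minimal Python sketch using only the standard library. It only builds the requests; each one can be sent with urllib.request.urlopen. The host, group and instance names come from the examples above, while the application/vnd.kafka.v2+json and application/vnd.kafka.avro.v2+json content types assume the proxy exposes its v2 API. The function names are hypothetical.

```python
import json
import urllib.request
from typing import Optional

# Host, group and instance names come from the post; the content types
# assume the proxy's v2 API.
BASE_URL = "https://kafka-rest-url:8082"
GROUP = "BLPRSM"
INSTANCE = "instance1"
V2_JSON = "application/vnd.kafka.v2+json"
AVRO_V2_JSON = "application/vnd.kafka.avro.v2+json"
INSTANCE_URL = f"{BASE_URL}/consumers/{GROUP}/instances/{INSTANCE}"


def _json_request(url: str, method: str, body: Optional[dict] = None) -> urllib.request.Request:
    """Build a request with the v2 JSON content type; body=None means no body."""
    data = json.dumps(body).encode("utf-8") if body is not None else None
    return urllib.request.Request(url, data=data, headers={"Content-Type": V2_JSON}, method=method)


def create_consumer() -> urllib.request.Request:
    # Step 1: auto commit is disabled so offsets only move in step 4.
    return _json_request(f"{BASE_URL}/consumers/{GROUP}", "POST", {
        "name": INSTANCE,
        "format": "avro",
        "auto.offset.reset": "earliest",
        "auto.commit.enable": False,
    })


def subscribe(topics: list) -> urllib.request.Request:
    # Step 2: subscribe the instance to the given topics.
    return _json_request(f"{INSTANCE_URL}/subscription", "POST", {"topics": topics})


def poll_records(timeout_ms: int = 5000) -> urllib.request.Request:
    # Step 3: fetch records; Accept has to match the consumer's "avro" format.
    return urllib.request.Request(
        f"{INSTANCE_URL}/records?timeout={timeout_ms}",
        headers={"Accept": AVRO_V2_JSON},
        method="GET",
    )


def commit_all() -> urllib.request.Request:
    # Step 4: a POST with no body commits everything fetched so far.
    return _json_request(f"{INSTANCE_URL}/offsets", "POST")


def delete_consumer() -> urllib.request.Request:
    # Step 5: remove the consumer instance from the proxy.
    return _json_request(INSTANCE_URL, "DELETE")
```

For example, urllib.request.urlopen(subscribe(["my-topic"])) would send step 2. Response parsing and error handling are left out to keep the sketch short.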


The loop from step 3 ran as long as there were new messages in Kafka. Everything worked fine until I ran performance tests. It turned out that when I produced a large number of Kafka messages (more than a thousand), Blue Prism took a long time to consume them, and the offset commit in step 4 failed because the Kafka consumer session had expired.

I decided to limit the number of iterations of the loop from step 3 and to use the max_bytes query parameter:

https://kafka-rest-url:8082/consumers/BLPRSM/instances/instance1/records?timeout=5000&max_bytes=30000

The Kafka REST Proxy documentation describes it as follows:

max_bytes – The maximum number of bytes of unencoded keys and values that should be included in the response. This provides approximate control over the size of responses and the amount of memory required to store the decoded response.
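The bounded polling loop can be sketched in Python like this. fetch is a hypothetical callable that sends the GET request and returns the decoded JSON array of records. MAX_ITERATIONS = 20 is an illustrative value, and the 500-message cap mirrors the batch size I ended up with.

```python
from typing import Callable, List

MAX_BYTES = 30000    # value used in the post
MAX_ITERATIONS = 20  # hypothetical cap on GET calls per process run
RECORDS_URL = (
    "https://kafka-rest-url:8082/consumers/BLPRSM/instances/instance1/records"
    f"?timeout=5000&max_bytes={MAX_BYTES}"
)


def poll_batch(
    fetch: Callable[[str], List[dict]],
    max_iterations: int = MAX_ITERATIONS,
    max_messages: int = 500,
) -> List[dict]:
    """Collect records until the topic is drained or a cap is reached.

    `fetch` is a hypothetical callable that sends the HTTP GET to the given
    URL and returns the decoded JSON array of records.
    """
    batch: List[dict] = []
    for _ in range(max_iterations):
        records = fetch(RECORDS_URL)
        if not records:  # no new messages on the topic
            break
        batch.extend(records)
        if len(batch) >= max_messages:  # soft cap: the last response may overshoot
            break
    return batch
```

Because max_bytes only gives approximate control over the response size, the message cap is soft: the last response can push the batch past 500, so whatever you commit afterwards should be based on the records that were actually processed.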

This limited the number of Kafka messages consumed in a single Blue Prism process run to about 500. But then a new problem appeared with committing the offsets. They were supposed to advance by, for example, 500 to mark 500 messages as consumed, but they advanced by 600 instead. I lost 100 unconsumed messages! It turns out this is a known Confluent Kafka bug which has not been fixed yet.

Luckily, we can specify custom offsets in the body of the commit offsets POST request:

{
  "offsets": [
    {
      "topic": "my-topic",
      "partition": 0,
      "offset": 555
    },
    {
      "topic": "my-topic",
      "partition": 1,
      "offset": 550
    },
    {
      "topic": "my-topic",
      "partition": 2,
      "offset": 565
    }
  ]
}

And it works like a charm! I had to loop over the Kafka messages in a single batch to find the offset for each partition, which is not so simple when you have to build it from Blue Prism blocks instead of writing it in a programming language.
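In a programming language the per-partition loop is short. Below is a minimal Python sketch that turns a batch of processed records into a commit body like the one above. It assumes each record carries topic, partition and offset fields as returned by the records endpoint. It also assumes the proxy treats the posted offset as the last processed record and commits offset + 1 itself. If your proxy version stores the value as-is (Kafka's "next offset to read" convention), set the hypothetical OFFSET_DELTA constant to 1.

```python
import json
from typing import Dict, List, Tuple

# Assumption: the proxy treats the posted offset as the last processed record
# and commits offset + 1 itself. If your proxy version stores the value as-is
# (Kafka's "next offset to read" convention), set this to 1.
OFFSET_DELTA = 0


def offsets_to_commit(processed: List[dict]) -> dict:
    """Build the commit-offsets POST body from successfully processed records."""
    last: Dict[Tuple[str, int], int] = {}
    for rec in processed:
        key = (rec["topic"], rec["partition"])
        # Keep the highest offset seen for each topic/partition pair.
        last[key] = max(last.get(key, rec["offset"]), rec["offset"])
    return {
        "offsets": [
            {"topic": topic, "partition": partition, "offset": offset + OFFSET_DELTA}
            for (topic, partition), offset in sorted(last.items())
        ]
    }


batch = [
    {"topic": "my-topic", "partition": 0, "offset": 554},
    {"topic": "my-topic", "partition": 1, "offset": 550},
    {"topic": "my-topic", "partition": 0, "offset": 555},
    {"topic": "my-topic", "partition": 2, "offset": 565},
]
# `body` would be POSTed to
# https://kafka-rest-url:8082/consumers/BLPRSM/instances/instance1/offsets
body = json.dumps(offsets_to_commit(batch))
```

With this sample batch the function reproduces the body shown above. Partition 0 gets 555, the higher of its two offsets.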
