Recently I have been playing around with integrating Blue Prism RPA with Confluent Kafka. A Blue Prism process was consuming messages from a Kafka topic using the Kafka REST Proxy. To achieve it I used the built-in Utility - HTTP business object with the HTTP Request action.

Reading messages from the Kafka REST Proxy consists of 5 steps:
1. Creating consumer instance
HTTP POST with body
{
  "name": "instance1",
  "format": "avro",
  "auto.offset.reset": "earliest",
  "auto.commit.enable": false
}
to URL
https://kafka-rest-url:8082/consumers/BLPRSM
auto.commit.enable is set to false as we don't want to commit offsets immediately.
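In plain code this step would look roughly like the minimal Python sketch below (the requests library is my choice for illustration; the host name is the placeholder from the URL above, and Blue Prism does the equivalent through Utility - HTTP):

import requests

BASE = "https://kafka-rest-url:8082"
HEADERS = {"Content-Type": "application/vnd.kafka.v2+json"}

# Create a consumer instance called "instance1" in the consumer group "BLPRSM".
resp = requests.post(
    f"{BASE}/consumers/BLPRSM",
    json={
        "name": "instance1",
        "format": "avro",
        "auto.offset.reset": "earliest",
        "auto.commit.enable": False,
    },
    headers=HEADERS,
)
resp.raise_for_status()
print(resp.json())  # contains the "instance_id" and "base_uri" of the new instance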
2. Subscribing to topic
HTTP POST with body
{
  "topics": [
    "my-topic"
  ]
}
to URL
https://kafka-rest-url:8082/consumers/BLPRSM/instances/instance1/subscription
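The same subscription call as a Python sketch (same assumptions as before):

import requests

BASE = "https://kafka-rest-url:8082"
HEADERS = {"Content-Type": "application/vnd.kafka.v2+json"}

# Subscribe the consumer instance to the topic; the proxy answers 204 No Content.
resp = requests.post(
    f"{BASE}/consumers/BLPRSM/instances/instance1/subscription",
    json={"topics": ["my-topic"]},
    headers=HEADERS,
)
resp.raise_for_status()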
3. Polling messages
HTTP GET is called in a loop to fetch messages
https://kafka-rest-url:8082/consumers/BLPRSM/instances/instance1/records?timeout=5000
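A rough Python equivalent of the polling loop (the Accept header for an Avro consumer is my assumption based on the REST Proxy docs):

import requests

BASE = "https://kafka-rest-url:8082"
# An Avro consumer returns records under the Avro-specific content type.
ACCEPT = {"Accept": "application/vnd.kafka.avro.v2+json"}

while True:
    resp = requests.get(
        f"{BASE}/consumers/BLPRSM/instances/instance1/records",
        params={"timeout": 5000},
        headers=ACCEPT,
    )
    resp.raise_for_status()
    records = resp.json()  # list of {"topic", "key", "value", "partition", "offset"}
    if not records:
        break  # nothing new within the 5-second timeout, so leave the loop
    for record in records:
        ...  # hand each message over to the Blue Prism process logic here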
4. Committing offsets
Once we are sure that Blue Prism processed all Kafka messages successfully, we are ready to mark them as consumed (commit offsets). An HTTP POST with no body is sent to URL
https://kafka-rest-url:8082/consumers/BLPRSM/instances/instance1/offsets
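As a Python sketch (an empty POST body tells the proxy to commit everything fetched so far):

import requests

BASE = "https://kafka-rest-url:8082"
HEADERS = {"Content-Type": "application/vnd.kafka.v2+json"}

# No body: the proxy commits the offsets of all records fetched so far.
resp = requests.post(
    f"{BASE}/consumers/BLPRSM/instances/instance1/offsets",
    headers=HEADERS,
)
resp.raise_for_status()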
5. Deleting consumer instance
The Kafka consumer instance should be deleted for cleanup. An HTTP DELETE is sent to URL
https://kafka-rest-url:8082/consumers/BLPRSM/instances/instance1
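And the cleanup call in Python form:

import requests

BASE = "https://kafka-rest-url:8082"

# Destroy the consumer instance so the group does not keep a stale member around.
resp = requests.delete(f"{BASE}/consumers/BLPRSM/instances/instance1")
resp.raise_for_status()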
The loop from step 3 was executed as long as there were new messages on Kafka. Everything was working fine until I ran performance tests. It turned out that if I produced a huge number of Kafka messages (more than a thousand), it took Blue Prism a lot of time to consume them, and committing offsets in step 4 failed because the Kafka consumer session had expired.
I decided to limit the number of iterations in the loop from step 3 and to use the max_bytes query parameter:
https://kafka-rest-url:8082/consumers/BLPRSM/instances/instance1/records?timeout=5000&max_bytes=30000
We can find out from the Kafka docs that:
max_bytes – The maximum number of bytes of unencoded keys and values that should be included in the response. This provides approximate control over the size of responses and the amount of memory required to store the decoded response.
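In the polling sketch this is just one more query parameter (30000 is the value from my URL above; how many messages that corresponds to depends on your message sizes):

import requests

BASE = "https://kafka-rest-url:8082"
ACCEPT = {"Accept": "application/vnd.kafka.avro.v2+json"}

# Cap the amount of key/value bytes returned per poll; with my message sizes
# max_bytes=30000 worked out to roughly 500 messages per Blue Prism run.
resp = requests.get(
    f"{BASE}/consumers/BLPRSM/instances/instance1/records",
    params={"timeout": 5000, "max_bytes": 30000},
    headers=ACCEPT,
)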
This limited the number of Kafka messages consumed in a single Blue Prism process run to ~500. But then there was a problem with committing Kafka offsets. Offsets were supposed to move by e.g. 500 to mark 500 messages as consumed, but they moved by 600 instead. I lost 100 unconsumed messages! It turns out that this is a known Confluent Kafka bug which is not yet fixed.
Luckily, we may specify custom offsets in the commit offsets POST body:
{
  "offsets": [
    {
      "topic": "my-topic",
      "partition": 0,
      "offset": 555
    },
    {
      "topic": "my-topic",
      "partition": 1,
      "offset": 550
    },
    {
      "topic": "my-topic",
      "partition": 2,
      "offset": 565
    }
  ]
}
And it works like a charm! I had to loop over the Kafka messages from a single batch to find the highest offset for each partition, which is not so simple when you cannot use a programming language and have to build it out of Blue Prism blocks instead. In plain code it would look like the sketch below.
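A minimal Python sketch of that per-partition bookkeeping, assuming records is the list returned by the records endpoint (commit_consumed is just an illustrative helper name; the offset values follow the JSON body above, i.e. the offset of the last message processed in each partition):

import requests

BASE = "https://kafka-rest-url:8082"
HEADERS = {"Content-Type": "application/vnd.kafka.v2+json"}

def commit_consumed(records):
    # Find the highest offset we processed in each (topic, partition) pair.
    highest = {}
    for record in records:
        key = (record["topic"], record["partition"])
        highest[key] = max(highest.get(key, -1), record["offset"])

    # Build the same "offsets" body as in the JSON example above.
    offsets = [
        {"topic": topic, "partition": partition, "offset": offset}
        for (topic, partition), offset in highest.items()
    ]
    resp = requests.post(
        f"{BASE}/consumers/BLPRSM/instances/instance1/offsets",
        json={"offsets": offsets},
        headers=HEADERS,
    )
    resp.raise_for_status()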