Setting auto.offset.reset for Kafka Rest Proxy

Yesterday I was struggling with auto.offset.reset configuration property for communication with Confluent Kafka Rest Proxy. I was using curl command to create Kafka consumer

curl -X POST -d '{"name": "instance1","format": "avro", "auto.offset.reset": "earliest", "auto.commit.enable": false}' -H "Content-Type: application/vnd.kafka.json.v2+json" https://myhost:8082/consumers/TEST -E certificates/Test.keystore.cert.pem --key certificates/Test.keystore.key.pem -k

and was receiving successful response

{"instance_id":"instance1","base_uri":"https://myhost:8082/consumers/TEST/instances/instance1"}

Then I tried to rewrite curl logic to Python code using requests module.

    payload = '{"name": "instance1","format": "avro", "auto.offset.reset": "earliest", "auto.commit.enable": false}'
    resp = requests.post('https://myhost:8082/consumers/TEST',
                         payload,
                         cert=('certificates/Test.keystore.cert.pem',
                               'certificates/Test.keystore.key.pem'),
                         verify=False)
    app.logger.debug('Create consumer response status code[%s], body[%s], headers[%s]', resp.status_code, resp.content, resp.headers)

But I started to receive error

Invalid consumer configuration: Wrong value earliest of auto.offset.reset in ConsumerConfi
g; Valid values are smallest and largest
Create consumer response status code[422], body[b'{"error_code":42204,"message":"Invalid consumer configuration: Wrong value earliest of auto.offset.reset in ConsumerConfi
g; Valid values are smallest and largest"}'], headers[{'Date': 'Wed, 01 Apr 2020 12:43:54 GMT', 'Content-Type': 'application/vnd.kafka.v1+json', 'Content-Length': '163', 'Server': 'Jetty(9.4.18.v20190429)'}]

I was confused. The same code but different results. I did a some googling but couldn’t find the reason. I came across Stackoverflow post which indicated that “If you use Kafka version older than 0.9, you have to replace earliestlatest with smallest,largest“. Still strange. Which version of Kafka do I use? Can I use two versions at the same time and some Kafka Rest Proxy loadbalancer forwards HTTP requests to different versions? Or maybe some compatibility mode is turned on Kafka? It turned out that I forgot to add Content-Type header in my Python code. I made a fix

    headers = {'content-type': 'application/vnd.kafka.json.v2+json'}
    payload = '{"name": "instance1","format": "avro", "auto.offset.reset": "earliest", "auto.commit.enable": false}'
    resp = requests.post('https://myhost:8082/consumers/TEST',
                         payload,
                         cert=('certificates/Test.keystore.cert.pem',
                               'certificates/Test.keystore.key.pem'
                               ),
                         verify=False,
                         headers=headers)
    app.logger.debug('Create consumer response status code[%s], body[%s], headers[%s]', resp.status_code, resp.content, resp.headers)

and started to receive 200 response. Apparently, if not specified then Content-Type application/vnd.kafka.v1+json is used. I changed Content-Type header for curl method to application/vnd.kafka.v1+json

curl -X POST -d '{"name": "instance1","format": "avro", "auto.offset.reset": "earliest", "auto.commit.enable": false}' -H "Content-Type: application/vnd.kafka.json.v1+json" https://myhost:8082/consumers/TEST -E certificates/Test.keystore.cert.pem --key certificates/Test.keystore.key.pem -k

and got error

{"error_code":42204,"message":"Invalid consumer configuration: Wrong value earliest of auto.offset.reset in ConsumerConfig; Valid values are smallest and largest"}

On the other hand mixing application/vnd.kafka.json.v2+json with smallest or largest value for auto.offset.reset

curl -X POST -d '{"name": "instance1","format": "avro", "auto.offset.reset": "smallest", "auto.commit.enable": false}' -H "Content-Type: application/vnd.kafka.json.v2+json" https://myhost:8082/consumers/TEST -E certificates/Test.keystore.cert.pem --key certificates/Test.keystore.key.pem -k

ends with error

{"error_code":42204,"message":"Invalid consumer configuration: Invalid value smallest for configuration auto.offset.reset: String must be one of: latest, earliest, none"}

Summary

Allowed values for auto.offset.reset property depend on REST Proxy API version.

API versionContent-TypeAllowed values
v1application/vnd.kafka.json.v1+jsonsmallest, largest
v2application/vnd.kafka.json.v2+json latest, earliest, none

Leave a Comment