Yesterday I was struggling with auto.offset.reset configuration property for communication with Confluent Kafka Rest Proxy. I was using curl command to create Kafka consumer
curl -X POST -d '{"name": "instance1","format": "avro", "auto.offset.reset": "earliest", "auto.commit.enable": false}' -H "Content-Type: application/vnd.kafka.json.v2+json" https://myhost:8082/consumers/TEST -E certificates/Test.keystore.cert.pem --key certificates/Test.keystore.key.pem -k
and was receiving successful response
{"instance_id":"instance1","base_uri":"https://myhost:8082/consumers/TEST/instances/instance1"}
Then I tried to rewrite curl logic to Python code using requests module.
payload = '{"name": "instance1","format": "avro", "auto.offset.reset": "earliest", "auto.commit.enable": false}'
resp = requests.post('https://myhost:8082/consumers/TEST',
payload,
cert=('certificates/Test.keystore.cert.pem',
'certificates/Test.keystore.key.pem'),
verify=False)
app.logger.debug('Create consumer response status code[%s], body[%s], headers[%s]', resp.status_code, resp.content, resp.headers)
But I started to receive error
Invalid consumer configuration: Wrong value earliest of auto.offset.reset in ConsumerConfi
g; Valid values are smallest and largest
Create consumer response status code[422], body[b'{"error_code":42204,"message":"Invalid consumer configuration: Wrong value earliest of auto.offset.reset in ConsumerConfi g; Valid values are smallest and largest"}'], headers[{'Date': 'Wed, 01 Apr 2020 12:43:54 GMT', 'Content-Type': 'application/vnd.kafka.v1+json', 'Content-Length': '163', 'Server': 'Jetty(9.4.18.v20190429)'}]
I was confused. The same code but different results. I did a some googling but couldn’t find the reason. I came across Stackoverflow post which indicated that “If you use Kafka version older than 0.9, you have to replace earliest
, latest
with smallest
,largest
“. Still strange. Which version of Kafka do I use? Can I use two versions at the same time and some Kafka Rest Proxy loadbalancer forwards HTTP requests to different versions? Or maybe some compatibility mode is turned on Kafka? It turned out that I forgot to add Content-Type header in my Python code. I made a fix
headers = {'content-type': 'application/vnd.kafka.json.v2+json'}
payload = '{"name": "instance1","format": "avro", "auto.offset.reset": "earliest", "auto.commit.enable": false}'
resp = requests.post('https://myhost:8082/consumers/TEST',
payload,
cert=('certificates/Test.keystore.cert.pem',
'certificates/Test.keystore.key.pem'
),
verify=False,
headers=headers)
app.logger.debug('Create consumer response status code[%s], body[%s], headers[%s]', resp.status_code, resp.content, resp.headers)
and started to receive 200 response. Apparently, if not specified then Content-Type application/vnd.kafka.v1+json is used. I changed Content-Type header for curl method to application/vnd.kafka.v1+json
curl -X POST -d '{"name": "instance1","format": "avro", "auto.offset.reset": "earliest", "auto.commit.enable": false}' -H "Content-Type: application/vnd.kafka.json.v1+json" https://myhost:8082/consumers/TEST -E certificates/Test.keystore.cert.pem --key certificates/Test.keystore.key.pem -k
and got error
{"error_code":42204,"message":"Invalid consumer configuration: Wrong value earliest of auto.offset.reset in ConsumerConfig; Valid values are smallest and largest"}
On the other hand mixing application/vnd.kafka.json.v2+json with smallest or largest value for auto.offset.reset
curl -X POST -d '{"name": "instance1","format": "avro", "auto.offset.reset": "smallest", "auto.commit.enable": false}' -H "Content-Type: application/vnd.kafka.json.v2+json" https://myhost:8082/consumers/TEST -E certificates/Test.keystore.cert.pem --key certificates/Test.keystore.key.pem -k
ends with error
{"error_code":42204,"message":"Invalid consumer configuration: Invalid value smallest for configuration auto.offset.reset: String must be one of: latest, earliest, none"}
Summary
Allowed values for auto.offset.reset property depend on REST Proxy API version.
API version | Content-Type | Allowed values |
v1 | application/vnd.kafka.json.v1+json | smallest, largest |
v2 | application/vnd.kafka.json.v2+json | latest, earliest, none |