[Bug]: Running Kafka sample example fails
What happened?
Running the Kafka sample example from the link below fails during execution:
https://github.com/treeverse/lakeFS-samples/tree/main/01_standalone_examples/kafka
Steps to Reproduce:
- docker compose --profile local-lakefs up
- Open the Jupyter UI at http://localhost:8890/ in your web browser.
- Open the "Kafka Streaming Demo" notebook and start running the cells.

When we try to create test_connector in Kafka Connect, it fails with the following logs:
[2025-08-15 21:10:47,451] ERROR [Worker clientId=connect-1, groupId=connect-group] Failed to start connector 'test_connector' (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
org.apache.kafka.connect.errors.ConnectException: Failed to start connector: test_connector
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$startConnector$25(DistributedHerder.java:1461)
at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:334)
at org.apache.kafka.connect.runtime.WorkerConnector.doRun(WorkerConnector.java:140)
at org.apache.kafka.connect.runtime.WorkerConnector.run(WorkerConnector.java:117)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to transition connector test_connector to state STARTED
... 8 more
Caused by: org.apache.kafka.common.config.ConfigException: timezone configuration must be set when using rotate.schedule.interval.ms
at io.confluent.connect.s3.S3SinkConnectorConfig.validateTimezone(S3SinkConnectorConfig.java:693)
at io.confluent.connect.s3.S3SinkConnectorConfig.<init>(S3SinkConnectorConfig.java:685)
at io.confluent.connect.s3.S3SinkConnectorConfig.<init>(S3SinkConnectorConfig.java:671)
at io.confluent.connect.s3.S3SinkConnector.start(S3SinkConnector.java:59)
at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:184)
at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:209)
at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:348)
at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:331)
... 7 more
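The ConfigException above is the first failure: the Confluent S3 sink refuses to start when rotate.schedule.interval.ms is set without a timezone. A minimal sketch of registering the connector with timezone added via the Connect REST API follows; the key names are real S3 sink settings, but the values (and everything beyond the error-derived timezone fix) are assumptions standing in for the notebook's actual config:

import json
import requests

# Hypothetical connector config: "timezone" is the setting flagged by
# S3SinkConnectorConfig.validateTimezone; the other keys are real Confluent
# S3 sink options, but their values are assumptions, not the sample's exact config.
connector_config = {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.bytearray.ByteArrayFormat",
    "topics": "quickstart",
    "s3.bucket.name": "kafka-stream-demo",  # the lakeFS repo acts as the bucket
    "store.url": "http://lakefs:8000",      # lakeFS S3 gateway endpoint
    "s3.region": "us-east-1",
    "flush.size": "3",
    "rotate.schedule.interval.ms": "60000",
    "timezone": "UTC",  # required whenever rotate.schedule.interval.ms is set
}

# PUT /connectors/{name}/config creates the connector or updates its config.
resp = requests.put(
    "http://connect:8083/connectors/test_connector/config",
    json=connector_config,
)
resp.raise_for_status()
print(json.dumps(resp.json(), indent=2))

Once timezone is supplied the connector itself starts, but the task then fails on a second, unrelated error, as the status check below shows: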
! curl -s http://connect:8083/connectors/test_connector/status
{"name":"test_connector","connector":{"state":"RUNNING","worker_id":"connect:8083"},"tasks":[{"id":0,"state":"FAILED","worker_id":"connect:8083","trace":"org.apache.kafka.connect.errors.ConnectException: com.amazonaws.services.s3.model.AmazonS3Exception: This operation is not supported in LakeFS (Service: Amazon S3; Status Code: 405; Error Code: ERRLakeFSNotSupported; Request ID: 38a50271-eef1-4a54-a3d2-cbdffb69b50d; S3 Extended Request ID: 244B7112FEE1340C; Proxy: null), S3 Extended Request ID: 244B7112FEE1340C\n\tat io.confluent.connect.s3.S3SinkTask.start(S3SinkTask.java:138)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:312)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:186)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: com.amazonaws.services.s3.model.AmazonS3Exception: This operation is not supported in LakeFS (Service: Amazon S3; Status Code: 405; Error Code: ERRLakeFSNotSupported; Request ID: 38a50271-eef1-4a54-a3d2-cbdffb69b50d; S3 Extended Request ID: 244B7112FEE1340C; Proxy: null), S3 Extended Request ID: 244B7112FEE1340C\n\tat com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819)\n\tat com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1403)\n\tat com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1372)\n\tat com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)\n\tat com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)\n\tat com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)\n\tat com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)\n\tat com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)\n\tat com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)\n\tat com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)\n\tat com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)\n\tat com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5445)\n\tat com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5392)\n\tat com.amazonaws.services.s3.AmazonS3Client.getAcl(AmazonS3Client.java:4050)\n\tat com.amazonaws.services.s3.AmazonS3Client.getBucketAcl(AmazonS3Client.java:1273)\n\tat com.amazonaws.services.s3.AmazonS3Client.getBucketAcl(AmazonS3Client.java:1263)\n\tat com.amazonaws.services.s3.AmazonS3Client.doesBucketExistV2(AmazonS3Client.java:1401)\n\tat io.confluent.connect.s3.storage.S3Storage.bucketExists(S3Storage.java:197)\n\tat io.confluent.connect.s3.S3SinkTask.start(S3SinkTask.java:112)\n\t... 8 more\n"}],"type":"sink"}
Produce some streaming data
for e in range(100):
    producer.send('quickstart', bytes(f"message-{e}", 'utf-8'))
    sleep(3)
producer.flush()
Read streaming data written to the lakeFS repo
dataPath = f"s3a://{repo_name}/{streaming_branch}/ingest/quickstart/partition=0/"
print(f"Reading ingested data from {dataPath}")
df = spark.read.csv(dataPath).withColumnRenamed("_c0","data")
df.show()
Reading ingested data from s3a://kafka-stream-demo/streaming_20250815T203546/ingest/quickstart/partition=0/
---------------------------------------------------------------------------
AnalysisException Traceback (most recent call last)
Cell In[32], line 3
1 dataPath = f"[s3a://{repo_name}/{streaming_branch}/ingest/quickstart/partition=0/](s3a://{repo_name}/%7Bstreaming_branch%7D/ingest/quickstart/partition=0/)"
2 print(f"Reading ingested data from {dataPath}")
----> 3 df = spark.read.csv(dataPath).withColumnRenamed("_c0","data")
4 df.show()
File /usr/local/spark/python/pyspark/sql/readwriter.py:535, in DataFrameReader.csv(self, path, schema, sep, encoding, quote, escape, comment, header, inferSchema, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf, dateFormat, timestampFormat, maxColumns, maxCharsPerColumn, maxMalformedLogPerPartition, mode, columnNameOfCorruptRecord, multiLine, charToEscapeQuoteEscaping, samplingRatio, enforceSchema, emptyValue, locale, lineSep, pathGlobFilter, recursiveFileLookup, modifiedBefore, modifiedAfter, unescapedQuoteHandling)
533 if type(path) == list:
534 assert self._spark._sc._jvm is not None
--> 535 return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
536 elif isinstance(path, RDD):
538 def func(iterator):
File /usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
1315 command = proto.CALL_COMMAND_NAME +\
1316 self.command_header +\
1317 args_command +\
1318 proto.END_COMMAND_PART
1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
1322 answer, self.gateway_client, self.target_id, self.name)
1324 for temp_arg in temp_args:
1325 temp_arg._detach()
File /usr/local/spark/python/pyspark/sql/utils.py:196, in capture_sql_exception.<locals>.deco(*a, **kw)
192 converted = convert_exception(e.java_exception)
193 if not isinstance(converted, UnknownException):
194 # Hide where the exception came from that shows a non-Pythonic
195 # JVM exception message.
--> 196 raise converted from None
197 else:
198 raise
AnalysisException: Path does not exist: s3a://kafka-stream-demo/streaming_20250815T203546/ingest/quickstart/partition=0
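The AnalysisException is a downstream symptom rather than a separate bug: the sink task failed at startup, so nothing was ever written under the branch and the partition path does not exist. One way to confirm is to list the prefix through the lakeFS S3 gateway before reading it with Spark; a sketch assuming the sample's default endpoint and access key (the secret key is a placeholder to substitute):

import boto3

# Sketch: list what the sink actually wrote, through the lakeFS S3 gateway.
# Endpoint and access key are the sample's defaults; replace the secret key.
s3 = boto3.client(
    "s3",
    endpoint_url="http://lakefs:8000",
    aws_access_key_id="AKIAIOSFOLKFSSAMPLES",
    aws_secret_access_key="<lakefs-secret-key>",
)

resp = s3.list_objects_v2(
    Bucket="kafka-stream-demo",                             # repo name
    Prefix="streaming_20250815T203546/ingest/quickstart/",  # branch/topic path from the error
)
for obj in resp.get("Contents", []):
    print(obj["Key"], obj["Size"])
# An empty listing means the connector never delivered any records.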
Expected behavior
Creation of test_connector should just work.
lakeFS version
No response
How lakeFS is installed
Installed via docker-compose as mentioned above
Affected clients
No response
Relevant log output
Shared above
Contact details
No response
I tested this Kafka sample with the latest lakeFS, v1.65.2. It used to work fine with a much older version. Kafka Connect is making a getAcl call to lakeFS and failing. Here are the logs:
{
"name": "test_connector",
"connector": {
"state": "RUNNING",
"worker_id": "connect:8083"
},
"tasks": [
{
"id": 0,
"state": "FAILED",
"worker_id": "connect:8083",
"trace": "org.apache.kafka.connect.errors.ConnectException: com.amazonaws.services.s3.model.AmazonS3Exception: This operation is not supported in LakeFS (Service: Amazon S3; Status Code: 405; Error Code: ERRLakeFSNotSupported; Request ID: 781cd3ce-a05d-4f2e-af75-2f01a5294a38; S3 Extended Request ID: B04FD60EDB27F4FA; Proxy: null), S3 Extended Request ID: B04FD60EDB27F4FA
at io.confluent.connect.s3.S3SinkTask.start(S3SinkTask.java:138)
at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:312)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:186)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: This operation is not supported in LakeFS (Service: Amazon S3; Status Code: 405; Error Code: ERRLakeFSNotSupported; Request ID: 781cd3ce-a05d-4f2e-af75-2f01a5294a38; S3 Extended Request ID: B04FD60EDB27F4FA; Proxy: null), S3 Extended Request ID: B04FD60EDB27F4FA
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1403)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1372)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5445)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5392)
at com.amazonaws.services.s3.AmazonS3Client.getAcl(AmazonS3Client.java:4050)
at com.amazonaws.services.s3.AmazonS3Client.getBucketAcl(AmazonS3Client.java:1273)
at com.amazonaws.services.s3.AmazonS3Client.getBucketAcl(AmazonS3Client.java:1263)
at com.amazonaws.services.s3.AmazonS3Client.doesBucketExistV2(AmazonS3Client.java:1401)
at io.confluent.connect.s3.storage.S3Storage.bucketExists(S3Storage.java:197)
at io.confluent.connect.s3.S3SinkTask.start(S3SinkTask.java:112)
... 8 more"
}
],
"type": "sink"
}
So, I ran Kafka Connect in DEBUG mode and found additional info (kafka-stream-demo is the repo name):
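(Recent Connect versions can toggle wire-level logging at runtime through the dynamic logging REST endpoint, so no worker restart is needed; a sketch, assuming the admin endpoint is enabled on the sample's worker:)

import requests

# Sketch: raise the Apache HttpClient wire logger to DEBUG at runtime via
# Kafka Connect's dynamic logging API (PUT /admin/loggers/{logger}).
resp = requests.put(
    "http://connect:8083/admin/loggers/org.apache.http.wire",
    json={"level": "DEBUG"},
)
resp.raise_for_status()
print(resp.json())

With wire logging enabled, the captured exchange is: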
[2025-08-21 17:31:02,007] DEBUG http-outgoing-0 >> "GET /kafka-stream-demo/?acl HTTP/1.1[\r][\n]" (org.apache.http.wire)
[2025-08-21 17:31:02,007] DEBUG http-outgoing-0 >> "Host: lakefs:8000[\r][\n]" (org.apache.http.wire)
[2025-08-21 17:31:02,007] DEBUG http-outgoing-0 >> "amz-sdk-invocation-id: 313c4c2a-d33a-3ea2-316f-13da93970136[\r][\n]" (org.apache.http.wire)
[2025-08-21 17:31:02,007] DEBUG http-outgoing-0 >> "amz-sdk-request: attempt=1;max=4[\r][\n]" (org.apache.http.wire)
[2025-08-21 17:31:02,007] DEBUG http-outgoing-0 >> "amz-sdk-retry: 0/0/500[\r][\n]" (org.apache.http.wire)
[2025-08-21 17:31:02,007] DEBUG http-outgoing-0 >> "Authorization: AWS4-HMAC-SHA256 Credential=AKIAIOSFOLKFSSAMPLES/20250821/us-east-1/s3/aws4_request, SignedHeaders=amz-sdk-invocation-id;amz-sdk-request;amz-sdk-retry;content-type;host;user-agent;x-amz-content-sha256;x-amz-date, Signature=d3c0e297b765c4511125cb5924c9ddb7c55071f38d33b2a6ffc099b5b4a4b998[\r][\n]" (org.apache.http.wire)
[2025-08-21 17:31:02,007] DEBUG http-outgoing-0 >> "Content-Type: application/octet-stream[\r][\n]" (org.apache.http.wire)
[2025-08-21 17:31:02,007] DEBUG http-outgoing-0 >> "User-Agent: APN/1.0 Confluent/1.0 KafkaS3Connector/10.0.5, aws-sdk-java/1.11.1015 Linux/6.10.14-linuxkit OpenJDK_64-Bit_Server_VM/11.0.18+10-LTS java/11.0.18 scala/2.13.10 kotlin/1.6.0 vendor/Azul_Systems,_Inc. cfg/retry-mode/legacy[\r][\n]" (org.apache.http.wire)
[2025-08-21 17:31:02,007] DEBUG http-outgoing-0 >> "x-amz-content-sha256: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855[\r][\n]" (org.apache.http.wire)
[2025-08-21 17:31:02,007] DEBUG http-outgoing-0 >> "X-Amz-Date: 20250821T173101Z[\r][\n]" (org.apache.http.wire)
[2025-08-21 17:31:02,007] DEBUG http-outgoing-0 >> "Content-Length: 0[\r][\n]" (org.apache.http.wire)
[2025-08-21 17:31:02,007] DEBUG http-outgoing-0 >> "Connection: Keep-Alive[\r][\n]" (org.apache.http.wire)
[2025-08-21 17:31:02,007] DEBUG http-outgoing-0 >> "[\r][\n]" (org.apache.http.wire)
[2025-08-21 17:31:02,008] DEBUG http-outgoing-0 << "HTTP/1.1 405 Method Not Allowed[\r][\n]" (org.apache.http.wire)
[2025-08-21 17:31:02,008] DEBUG http-outgoing-0 << "Content-Type: application/xml[\r][\n]" (org.apache.http.wire)
[2025-08-21 17:31:02,008] DEBUG http-outgoing-0 << "X-Amz-Request-Id: 208f671b-d20c-4e3f-9ef7-9a2262dc5396[\r][\n]" (org.apache.http.wire)
[2025-08-21 17:31:02,008] DEBUG http-outgoing-0 << "Date: Thu, 21 Aug 2025 17:31:02 GMT[\r][\n]" (org.apache.http.wire)
[2025-08-21 17:31:02,008] DEBUG http-outgoing-0 << "Content-Length: 287[\r][\n]" (org.apache.http.wire)
[2025-08-21 17:31:02,008] DEBUG http-outgoing-0 << "[\r][\n]" (org.apache.http.wire)
[2025-08-21 17:31:02,008] DEBUG http-outgoing-0 << "<?xml version="1.0" encoding="UTF-8"?>[\n]" (org.apache.http.wire)
[2025-08-21 17:31:02,008] DEBUG http-outgoing-0 << "<Error><Code>ERRLakeFSNotSupported</Code><Message>This operation is not supported in LakeFS</Message><Resource></Resource><Region>us-east-1</Region><RequestId>208f671b-d20c-4e3f-9ef7-9a2262dc5396</RequestId><HostId>33895F07ACCAA729</HostId></Error>" (org.apache.http.wire)
[2025-08-21 17:31:02,009] DEBUG http-outgoing-0 << HTTP/1.1 405 Method Not Allowed (org.apache.http.headers)
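The wire log pinpoints the failing request: GET /kafka-stream-demo/?acl, i.e. the S3 GetBucketAcl operation, which the lakeFS gateway rejects with 405 ERRLakeFSNotSupported. The connector never asks for ACLs directly; the AWS SDK's doesBucketExistV2 probes the bucket ACL under the hood (see the trace above). The same response can be reproduced without Kafka Connect at all; a sketch using boto3 against the sample's gateway, with the same placeholder credentials as above:

import boto3
from botocore.exceptions import ClientError

# Sketch: issue GetBucketAcl directly against the lakeFS S3 gateway to
# reproduce the 405 from the Connect trace. Replace the secret key.
s3 = boto3.client(
    "s3",
    endpoint_url="http://lakefs:8000",
    aws_access_key_id="AKIAIOSFOLKFSSAMPLES",
    aws_secret_access_key="<lakefs-secret-key>",
)

try:
    s3.get_bucket_acl(Bucket="kafka-stream-demo")
except ClientError as e:
    # Expected: Code "ERRLakeFSNotSupported", HTTP status 405.
    print(e.response["Error"]["Code"],
          e.response["ResponseMetadata"]["HTTPStatusCode"])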
This issue started with lakeFS v1.3.1.
@ACodingfreak This sample works fine with lakeFS v1.3.0. While we are working to resolve the issue, I have temporarily changed the Kafka demo to use lakeFS v1.3.0. Please git pull the latest samples and try again.
This PR might be related: https://github.com/treeverse/lakeFS/pull/7028
@kesarwam is this still relevant after the #7028 fix?