
[Bug]: Running Kafka sample example fails

ACodingfreak opened this issue

What happened?


Running the Kafka sample at the link below fails on execution:

https://github.com/treeverse/lakeFS-samples/tree/main/01_standalone_examples/kafka

Steps to Reproduce:

  1. docker compose --profile local-lakefs up
  2. Open the Jupyter UI at http://localhost:8890/ in your web browser, open the "Kafka Streaming Demo" notebook, and start running the cells.

Creating test_connector in Kafka Connect then fails with the following logs:
[2025-08-15 21:10:47,451] ERROR [Worker clientId=connect-1, groupId=connect-group] Failed to start connector 'test_connector' (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
org.apache.kafka.connect.errors.ConnectException: Failed to start connector: test_connector
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$startConnector$25(DistributedHerder.java:1461)
        at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:334)
        at org.apache.kafka.connect.runtime.WorkerConnector.doRun(WorkerConnector.java:140)
        at org.apache.kafka.connect.runtime.WorkerConnector.run(WorkerConnector.java:117)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to transition connector test_connector to state STARTED
        ... 8 more
Caused by: org.apache.kafka.common.config.ConfigException: timezone configuration must be set when using rotate.schedule.interval.ms
        at io.confluent.connect.s3.S3SinkConnectorConfig.validateTimezone(S3SinkConnectorConfig.java:693)
        at io.confluent.connect.s3.S3SinkConnectorConfig.<init>(S3SinkConnectorConfig.java:685)
        at io.confluent.connect.s3.S3SinkConnectorConfig.<init>(S3SinkConnectorConfig.java:671)
        at io.confluent.connect.s3.S3SinkConnector.start(S3SinkConnector.java:59)
        at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:184)
        at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:209)
        at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:348)
        at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:331)
        ... 7 more
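The first failure is the ConfigException above: the Confluent S3 sink requires a timezone whenever rotate.schedule.interval.ms is set. Below is a minimal sketch of registering the connector with that property added via the Connect REST API; everything except the "timezone" line is a placeholder, not the sample notebook's exact configuration.

# Sketch: register the connector with an explicit "timezone", which the
# Confluent S3 sink requires whenever rotate.schedule.interval.ms is set.
# All values except "timezone" are placeholders, not the sample's exact config.
import json
import requests

connector = {
    "name": "test_connector",
    "config": {
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "topics": "quickstart",
        "rotate.schedule.interval.ms": "10000",  # placeholder interval
        "timezone": "UTC",                       # the missing setting
        # ... remaining S3/lakeFS settings from the sample notebook ...
    },
}

resp = requests.post(
    "http://connect:8083/connectors",
    headers={"Content-Type": "application/json"},
    data=json.dumps(connector),
)
print(resp.status_code, resp.text)

Once a timezone is supplied, the connector itself starts, but its task then fails, as the status check below shows.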

! curl -s http://connect:8083/connectors/test_connector/status

{"name":"test_connector","connector":{"state":"RUNNING","worker_id":"connect:8083"},"tasks":[{"id":0,"state":"FAILED","worker_id":"connect:8083","trace":"org.apache.kafka.connect.errors.ConnectException: com.amazonaws.services.s3.model.AmazonS3Exception: This operation is not supported in LakeFS (Service: Amazon S3; Status Code: 405; Error Code: ERRLakeFSNotSupported; Request ID: 38a50271-eef1-4a54-a3d2-cbdffb69b50d; S3 Extended Request ID: 244B7112FEE1340C; Proxy: null), S3 Extended Request ID: 244B7112FEE1340C\n\tat io.confluent.connect.s3.S3SinkTask.start(S3SinkTask.java:138)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:312)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:186)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: com.amazonaws.services.s3.model.AmazonS3Exception: This operation is not supported in LakeFS (Service: Amazon S3; Status Code: 405; Error Code: ERRLakeFSNotSupported; Request ID: 38a50271-eef1-4a54-a3d2-cbdffb69b50d; S3 Extended Request ID: 244B7112FEE1340C; Proxy: null), S3 Extended Request ID: 244B7112FEE1340C\n\tat com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819)\n\tat com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1403)\n\tat com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1372)\n\tat com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)\n\tat com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)\n\tat com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)\n\tat com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)\n\tat com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)\n\tat com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)\n\tat com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)\n\tat com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)\n\tat com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5445)\n\tat com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5392)\n\tat com.amazonaws.services.s3.AmazonS3Client.getAcl(AmazonS3Client.java:4050)\n\tat com.amazonaws.services.s3.AmazonS3Client.getBucketAcl(AmazonS3Client.java:1273)\n\tat com.amazonaws.services.s3.AmazonS3Client.getBucketAcl(AmazonS3Client.java:1263)\n\tat com.amazonaws.services.s3.AmazonS3Client.doesBucketExistV2(AmazonS3Client.java:1401)\n\tat io.confluent.connect.s3.storage.S3Storage.bucketExists(S3Storage.java:197)\n\tat io.confluent.connect.s3.S3SinkTask.start(S3SinkTask.java:112)\n\t... 8 more\n"}],"type":"sink"}

Produce some streaming data

# Producer setup assumed from earlier in the notebook (broker address is a guess):
from time import sleep
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers="kafka:9092")

for e in range(100):
    producer.send('quickstart', bytes(f"message-{e}", 'utf-8'))

sleep(3)
producer.flush()

Read the streaming data written by the sink to the lakeFS repo

dataPath = f"s3a://{repo_name}/{streaming_branch}/ingest/quickstart/partition=0/"
print(f"Reading ingested data from {dataPath}")
df = spark.read.csv(dataPath).withColumnRenamed("_c0","data")
df.show()
Reading ingested data from s3a://kafka-stream-demo/streaming_20250815T203546/ingest/quickstart/partition=0/

---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
Cell In[32], line 3
      1 dataPath = f"s3a://{repo_name}/{streaming_branch}/ingest/quickstart/partition=0/"
      2 print(f"Reading ingested data from {dataPath}")
----> 3 df = spark.read.csv(dataPath).withColumnRenamed("_c0","data")
      4 df.show()

File /usr/local/spark/python/pyspark/sql/readwriter.py:535, in DataFrameReader.csv(self, path, schema, sep, encoding, quote, escape, comment, header, inferSchema, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf, dateFormat, timestampFormat, maxColumns, maxCharsPerColumn, maxMalformedLogPerPartition, mode, columnNameOfCorruptRecord, multiLine, charToEscapeQuoteEscaping, samplingRatio, enforceSchema, emptyValue, locale, lineSep, pathGlobFilter, recursiveFileLookup, modifiedBefore, modifiedAfter, unescapedQuoteHandling)
    533 if type(path) == list:
    534     assert self._spark._sc._jvm is not None
--> 535     return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
    536 elif isinstance(path, RDD):
    538     def func(iterator):

File /usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
   1315 command = proto.CALL_COMMAND_NAME +\
   1316     self.command_header +\
   1317     args_command +\
   1318     proto.END_COMMAND_PART
   1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
   1322     answer, self.gateway_client, self.target_id, self.name)
   1324 for temp_arg in temp_args:
   1325     temp_arg._detach()

File /usr/local/spark/python/pyspark/sql/utils.py:196, in capture_sql_exception.<locals>.deco(*a, **kw)
    192 converted = convert_exception(e.java_exception)
    193 if not isinstance(converted, UnknownException):
    194     # Hide where the exception came from that shows a non-Pythonic
    195     # JVM exception message.
--> 196     raise converted from None
    197 else:
    198     raise

AnalysisException: Path does not exist: s3a://kafka-stream-demo/streaming_20250815T203546/ingest/quickstart/partition=0
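The AnalysisException follows directly from the failed sink task: since the task never started, no objects were written under the prefix, so the path does not exist. As a quick check before calling spark.read, the prefix can be listed through the lakeFS S3 gateway. A minimal sketch, assuming the sample's endpoint and quickstart access key (the secret key is elided):

# Sketch: list the target prefix via the lakeFS S3 gateway to confirm
# whether the sink wrote any objects. Endpoint and credentials follow the
# sample's docker-compose setup and are assumptions here.
import boto3

s3 = boto3.client(
    "s3",
    endpoint_url="http://lakefs:8000",
    aws_access_key_id="AKIAIOSFOLKFSSAMPLES",
    aws_secret_access_key="...",  # the sample's secret key, elided here
)
listing = s3.list_objects_v2(
    Bucket=repo_name,  # defined earlier in the notebook
    Prefix=f"{streaming_branch}/ingest/quickstart/",
)
print(listing.get("KeyCount", 0), "objects found")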

Expected behavior

Creation of test_connector should just work.

lakeFS version

No response

How lakeFS is installed

Installed via Docker Compose as described above

Affected clients

No response

Relevant log output

Shared above

Contact details

No response

ACodingfreak (Aug 19 '25 16:08)

I tested this Kafka sample with the latest lakeFS, v1.65.2. It used to work fine with a much older version. Kafka Connect is making a getAcl call to lakeFS and failing. Here are the logs:

{
  "name": "test_connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "connect:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "connect:8083",
      "trace": "org.apache.kafka.connect.errors.ConnectException: com.amazonaws.services.s3.model.AmazonS3Exception: This operation is not supported in LakeFS (Service: Amazon S3; Status Code: 405; Error Code: ERRLakeFSNotSupported; Request ID: 781cd3ce-a05d-4f2e-af75-2f01a5294a38; S3 Extended Request ID: B04FD60EDB27F4FA; Proxy: null), S3 Extended Request ID: B04FD60EDB27F4FA
      at io.confluent.connect.s3.S3SinkTask.start(S3SinkTask.java:138)
      at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:312)
      at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:186)
      at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
      at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
      at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
      at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
      at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
      at java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: com.amazonaws.services.s3.model.AmazonS3Exception: This operation is not supported in LakeFS (Service: Amazon S3; Status Code: 405; Error Code: ERRLakeFSNotSupported; Request ID: 781cd3ce-a05d-4f2e-af75-2f01a5294a38; S3 Extended Request ID: B04FD60EDB27F4FA; Proxy: null), S3 Extended Request ID: B04FD60EDB27F4FA
      at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819)
      at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1403)
      at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1372)
      at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
      at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
      at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
      at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
      at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
      at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
      at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
      at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
      at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5445)
      at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5392)
      at com.amazonaws.services.s3.AmazonS3Client.getAcl(AmazonS3Client.java:4050)
      at com.amazonaws.services.s3.AmazonS3Client.getBucketAcl(AmazonS3Client.java:1273)
      at com.amazonaws.services.s3.AmazonS3Client.getBucketAcl(AmazonS3Client.java:1263)
      at com.amazonaws.services.s3.AmazonS3Client.doesBucketExistV2(AmazonS3Client.java:1401)
      at io.confluent.connect.s3.storage.S3Storage.bucketExists(S3Storage.java:197)
      at io.confluent.connect.s3.S3SinkTask.start(S3SinkTask.java:112)
      ... 8 more\n"
    }
  ],
  "type": "sink"
}

So, I ran Kafka Connect in DEBUG mode and found additional info (kafka-stream-demo is the repo name):

[2025-08-21 17:31:02,007] DEBUG http-outgoing-0 >> "GET /kafka-stream-demo/?acl HTTP/1.1[\r][\n]" (org.apache.http.wire)

[2025-08-21 17:31:02,007] DEBUG http-outgoing-0 >> "Host: lakefs:8000[\r][\n]" (org.apache.http.wire)

[2025-08-21 17:31:02,007] DEBUG http-outgoing-0 >> "amz-sdk-invocation-id: 313c4c2a-d33a-3ea2-316f-13da93970136[\r][\n]" (org.apache.http.wire)

[2025-08-21 17:31:02,007] DEBUG http-outgoing-0 >> "amz-sdk-request: attempt=1;max=4[\r][\n]" (org.apache.http.wire)

[2025-08-21 17:31:02,007] DEBUG http-outgoing-0 >> "amz-sdk-retry: 0/0/500[\r][\n]" (org.apache.http.wire)

[2025-08-21 17:31:02,007] DEBUG http-outgoing-0 >> "Authorization: AWS4-HMAC-SHA256 Credential=AKIAIOSFOLKFSSAMPLES/20250821/us-east-1/s3/aws4_request, SignedHeaders=amz-sdk-invocation-id;amz-sdk-request;amz-sdk-retry;content-type;host;user-agent;x-amz-content-sha256;x-amz-date, Signature=d3c0e297b765c4511125cb5924c9ddb7c55071f38d33b2a6ffc099b5b4a4b998[\r][\n]" (org.apache.http.wire)

[2025-08-21 17:31:02,007] DEBUG http-outgoing-0 >> "Content-Type: application/octet-stream[\r][\n]" (org.apache.http.wire)

[2025-08-21 17:31:02,007] DEBUG http-outgoing-0 >> "User-Agent: APN/1.0 Confluent/1.0 KafkaS3Connector/10.0.5, aws-sdk-java/1.11.1015 Linux/6.10.14-linuxkit OpenJDK_64-Bit_Server_VM/11.0.18+10-LTS java/11.0.18 scala/2.13.10 kotlin/1.6.0 vendor/Azul_Systems,_Inc. cfg/retry-mode/legacy[\r][\n]" (org.apache.http.wire)

[2025-08-21 17:31:02,007] DEBUG http-outgoing-0 >> "x-amz-content-sha256: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855[\r][\n]" (org.apache.http.wire)

[2025-08-21 17:31:02,007] DEBUG http-outgoing-0 >> "X-Amz-Date: 20250821T173101Z[\r][\n]" (org.apache.http.wire)

[2025-08-21 17:31:02,007] DEBUG http-outgoing-0 >> "Content-Length: 0[\r][\n]" (org.apache.http.wire)

[2025-08-21 17:31:02,007] DEBUG http-outgoing-0 >> "Connection: Keep-Alive[\r][\n]" (org.apache.http.wire)

[2025-08-21 17:31:02,007] DEBUG http-outgoing-0 >> "[\r][\n]" (org.apache.http.wire)

[2025-08-21 17:31:02,008] DEBUG http-outgoing-0 << "HTTP/1.1 405 Method Not Allowed[\r][\n]" (org.apache.http.wire)

[2025-08-21 17:31:02,008] DEBUG http-outgoing-0 << "Content-Type: application/xml[\r][\n]" (org.apache.http.wire)

[2025-08-21 17:31:02,008] DEBUG http-outgoing-0 << "X-Amz-Request-Id: 208f671b-d20c-4e3f-9ef7-9a2262dc5396[\r][\n]" (org.apache.http.wire)

[2025-08-21 17:31:02,008] DEBUG http-outgoing-0 << "Date: Thu, 21 Aug 2025 17:31:02 GMT[\r][\n]" (org.apache.http.wire)

[2025-08-21 17:31:02,008] DEBUG http-outgoing-0 << "Content-Length: 287[\r][\n]" (org.apache.http.wire)

[2025-08-21 17:31:02,008] DEBUG http-outgoing-0 << "[\r][\n]" (org.apache.http.wire)

[2025-08-21 17:31:02,008] DEBUG http-outgoing-0 << "<?xml version="1.0" encoding="UTF-8"?>[\n]" (org.apache.http.wire)

[2025-08-21 17:31:02,008] DEBUG http-outgoing-0 << "<Error><Code>ERRLakeFSNotSupported</Code><Message>This operation is not supported in LakeFS</Message><Resource></Resource><Region>us-east-1</Region><RequestId>208f671b-d20c-4e3f-9ef7-9a2262dc5396</RequestId><HostId>33895F07ACCAA729</HostId></Error>" (org.apache.http.wire)

[2025-08-21 17:31:02,009] DEBUG http-outgoing-0 << HTTP/1.1 405 Method Not Allowed (org.apache.http.headers)
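The wire log confirms what the stack trace shows: the S3 sink's doesBucketExistV2 check issues GetBucketAcl (GET /<repo>/?acl), and the lakeFS S3 gateway answers 405 ERRLakeFSNotSupported. (On Confluent's Connect images, DEBUG wire logging like the above can usually be enabled via the CONNECT_LOG4J_LOGGERS environment variable.) The rejected call can be reproduced outside Kafka Connect; a minimal boto3 sketch, reusing the endpoint and access key from the log above (secret key elided):

# Sketch: reproduce the rejected ACL call against the lakeFS S3 gateway.
# Endpoint and access key are taken from the wire log above; the secret
# key is elided and must come from the sample's docker-compose setup.
import boto3
from botocore.exceptions import ClientError

s3 = boto3.client(
    "s3",
    endpoint_url="http://lakefs:8000",
    aws_access_key_id="AKIAIOSFOLKFSSAMPLES",
    aws_secret_access_key="...",
)
try:
    s3.get_bucket_acl(Bucket="kafka-stream-demo")
except ClientError as e:
    print(e.response["Error"]["Code"])  # expect ERRLakeFSNotSupported (405)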

kesarwam (Aug 21 '25 17:08)

This issue started with lakeFS v1.3.1.

@ACodingfreak This sample works fine with lakeFS v1.3.0. While we work on resolving the issue, I have temporarily changed the Kafka demo to use lakeFS v1.3.0. Please git pull the latest samples and try again.

kesarwam (Aug 21 '25 19:08)

This PR might be connected: https://github.com/treeverse/lakeFS/pull/7028

guy-har (Sep 04 '25 13:09)

@kesarwam is this still relevant after the #7028 fix?

nopcoder (Dec 10 '25 14:12)