kafka-connect-bigquery
Unauthorized; error code: 401 when using schema registry basic auth
Issue: when using a Confluent Cloud hosted Schema Registry, users get the following error:
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401
The cause is the outdated Confluent version pinned in the build:
https://github.com/wepay/kafka-connect-bigquery/blob/1.2.0/build.gradle#L15
ioConfluentVersion = '5.1.1'
The version above has no support for basic auth against the Schema Registry, so the REST call drops the credentials and always fails with error code 401.
As a workaround, you can recompile the code following the README steps after changing the version in the file mentioned above to 5.3.1.
The proper fix is a PR that bumps this version in the repository:
https://github.com/wepay/kafka-connect-bigquery/pull/209
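For anyone who needs the local workaround before a release with the fix is available, here is a minimal sketch of the version bump described above. The helper name `bump_confluent_version` is hypothetical, and the sketch assumes the version is declared in `build.gradle` as a single-quoted `ioConfluentVersion = '...'` assignment, as in the 1.2.0 tag linked above.

```shell
# Hypothetical helper: rewrite the ioConfluentVersion line in a Gradle file.
# Assumes the value is single-quoted, e.g. ioConfluentVersion = '5.1.1'.
bump_confluent_version() {
  gradle_file="$1"
  new_version="$2"
  # Write to a temp file and move it back, which avoids the
  # GNU/BSD differences of `sed -i`.
  sed "s/ioConfluentVersion *= *'[^']*'/ioConfluentVersion = '${new_version}'/" \
    "$gradle_file" > "${gradle_file}.tmp" && mv "${gradle_file}.tmp" "$gradle_file"
}

# Usage, from a checkout of the repository:
#   bump_confluent_version build.gradle 5.3.1
# then rebuild the connector following the README steps.
```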
@MosheBlumbergX this can be closed now, right?
Hey guys, is this resolved?
We use a hosted schema registry that supports basic auth. We pass the credentials in through the URL, but we're seeing:
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unexpected character ('<' (code 60)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
@alexhwoods it's difficult to debug just based on that message. Could you supply a complete log and your connector config?
of course!
{
"name": "alex-clay-testing-1",
"connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
"topics": "outbound-orders.public.sales_order_request",
"topicsToTables": "\\.=$2",
"project": "alex-clay-test-1",
"datasets": ".*=testing1",
"schemaRetriever": "com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever",
"credentials": "<service-account-key>",
"sanitizeTopics": "true",
"autoCreateTables": "true",
"schemaRegistryLocation": "<url-to-schema-registry>",
"autoUpdateSchemas": "true",
"keySource": "JSON"
}
And when we hit the REST endpoint checking the status of the connector, we get:
curl <kafka-connect-rest-url>/connectors/alex-clay-testing-1/status
{
"name": "alex-clay-testing-1",
"connector": {
"state": "FAILED",
"worker_id": "<worker-id>:8083",
"trace": "org.apache.kafka.connect.errors.ConnectException: Exception while fetching latest schema metadata for topic=outbound-orders.public.sales_order_request, subject=outbound-orders.public.sales_order_request-value\n\tat com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever.retrieveSchema(SchemaRegistrySchemaRetriever.java:72)\n\tat com.wepay.kafka.connect.bigquery.SchemaManager.createTable(SchemaManager.java:48)\n\tat com.wepay.kafka.connect.bigquery.BigQuerySinkConnector.ensureExistingTables(BigQuerySinkConnector.java:117)\n\tat com.wepay.kafka.connect.bigquery.BigQuerySinkConnector.ensureExistingTables(BigQuerySinkConnector.java:140)\n\tat com.wepay.kafka.connect.bigquery.BigQuerySinkConnector.start(BigQuerySinkConnector.java:159)\n\tat org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:110)\n\tat org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:135)\n\tat org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:195)\n\tat org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:257)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:1183)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.processConnectorConfigUpdates(DistributedHerder.java:541)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:388)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:282)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\nCaused by: 
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unexpected character ('<' (code 60)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')\n at [Source: sun.net.www.protocol.http.HttpURLConnection$HttpInputStream@4a049f1a; line: 1, column: 2]; error code: 50005\n\tat io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:209)\n\tat io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:235)\n\tat io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:444)\n\tat io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:436)\n\tat io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getLatestSchemaMetadata(CachedSchemaRegistryClient.java:209)\n\tat com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever.retrieveSchema(SchemaRegistrySchemaRetriever.java:68)\n\t... 17 more\n"
},
"tasks": [],
"type": "sink"
}
The key line there is:
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unexpected character ('<' (code 60)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
That's why I think it's having trouble authenticating with the schema registry. The unexpected character '<' is the start of HTML, and the only HTML response I've seen from the schema registry is the following:
<html><body><h1>401 Unauthorized</h1>You need a valid user and password to access this content.</body></html>
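A rough way to check this reasoning against a captured response body is to look at its first non-blank character. This is only a sketch: `classify_body` is a hypothetical helper, and a leading '<' suggests an HTML error page rather than proving a 401.

```shell
# Hypothetical helper: guess the payload type from its first non-blank character.
classify_body() {
  first_char=$(printf '%s' "$1" | tr -d ' \t\r\n' | cut -c1)
  case "$first_char" in
    '<') echo "html" ;;          # what the JSON parser choked on (code 60)
    '{'|'[') echo "json" ;;      # what the client expects from the registry
    *) echo "unknown" ;;
  esac
}

classify_body '<html><body><h1>401 Unauthorized</h1></body></html>'   # prints "html"
classify_body '[1, 2, 3]'                                             # prints "json"
```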
And with our schema registry, when you just do
curl <url>/subjects/<subject>/versions
with the basic auth credentials in the URL, it works fine, which is why I'm confused.
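One possible explanation for the difference: curl itself turns credentials embedded in the URL into a Basic Authorization header, so the curl call succeeds regardless of what the connector's Schema Registry client does with URL-embedded credentials (that depends on the client version and configuration). The sketch below builds the same header by hand; `basic_auth_header` is a hypothetical helper and `username:password` is a placeholder.

```shell
# Hypothetical helper: build the header curl derives from user:password in a URL.
basic_auth_header() {
  # Some base64 implementations wrap long output, so strip newlines.
  encoded=$(printf '%s' "$1" | base64 | tr -d '\n')
  printf 'Authorization: Basic %s\n' "$encoded"
}

basic_auth_header 'username:password'
# prints: Authorization: Basic dXNlcm5hbWU6cGFzc3dvcmQ=

# Equivalent explicit request (placeholders as in the thread, not run here):
#   curl -H "$(basic_auth_header 'username:password')" <url>/subjects/<subject>/versions
```

If the explicit header works but the connector still fails, the credentials are fine and the client is most likely not sending them.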
We got this working by digging into the code. There are undocumented config options that make it work:
"schemaRegistryClient.basic.auth.credentials.source": "USER_INFO",
"schemaRegistryClient.basic.auth.user.info": "username:password",
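For reference, here is a sketch of how the two options fit into the connector config posted earlier in the thread. `connector_config` is a hypothetical helper that only prints the JSON; every value is a placeholder or copied from the earlier config, and the commented curl call assumes the standard Kafka Connect REST endpoint for updating a connector's config.

```shell
# Hypothetical helper: print the earlier connector config plus the two auth options.
connector_config() {
  cat <<'EOF'
{
  "name": "alex-clay-testing-1",
  "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
  "topics": "outbound-orders.public.sales_order_request",
  "topicsToTables": "\\.=$2",
  "project": "alex-clay-test-1",
  "datasets": ".*=testing1",
  "schemaRetriever": "com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever",
  "credentials": "<service-account-key>",
  "sanitizeTopics": "true",
  "autoCreateTables": "true",
  "schemaRegistryLocation": "<url-to-schema-registry>",
  "schemaRegistryClient.basic.auth.credentials.source": "USER_INFO",
  "schemaRegistryClient.basic.auth.user.info": "username:password",
  "autoUpdateSchemas": "true",
  "keySource": "JSON"
}
EOF
}

# Applying it (not run here; <kafka-connect-rest-url> is a placeholder):
#   connector_config | curl -X PUT -H "Content-Type: application/json" \
#     --data @- <kafka-connect-rest-url>/connectors/alex-clay-testing-1/config
```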