
Using the connector with JSON-based topics

Open ofirsharony opened this issue 7 years ago • 20 comments

Hi guys,

I've successfully used the connector to stream data from an Avro-based topic to BigQuery. Does the connector support streaming JSON-based topics as well?

Thanks.

ofirsharony avatar May 08 '17 09:05 ofirsharony

I believe so, but I've not given it a try personally. I think @C0urante set things up so that kcbq-confluent is a separate package from kcbq-connector (which has the core connector stuff).

That said, I think you'll have to implement a SchemaRetriever to make the connector work. With the Confluent implementation, the schema is fetched from the Avro schema registry. Without a schema registry, you'll have to get the proper schema for a given table from somewhere (or you'll have to manually create the tables or something).
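For reference, the Avro path that does work is wired up roughly like the sketch below. The converter settings are the standard Confluent ones, but the schemaRetriever/schemaRegistryLocation property names are from memory and may differ between connector versions, so treat this as an illustration rather than a verified config:

    # Sketch of the Avro + Schema Registry setup (the path known to work).
    # Retriever property names may vary by connector version.
    connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
    topics=my-avro-topic
    key.converter=io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url=http://localhost:8081
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=http://localhost:8081
    schemaRetriever=com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever
    schemaRegistryLocation=http://localhost:8081

A JSON setup would need something equivalent in place of the registry-backed retriever: either tables created ahead of time, or a custom SchemaRetriever implementation that can supply a schema for each topic.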

@C0urante and @mtagle might have more thoughts on this.

Please post your findings. I think you might be the first to use JSON with this. :) Would be good to get feedback.

criccomini avatar May 08 '17 15:05 criccomini

Believe @criccomini has it right. Theoretically (I have to qualify this since I haven't actually seen it tested yet and things may have changed since the last time I touched this project), all you'd have to worry about would be A) ensuring the necessary tables were created somehow (either by hand or via the SchemaRetriever API) and B) making sure that the JSON data flowing into the connector matched the format of the table(s) it would be flowing into.
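To make (A) and (B) concrete, here's a minimal hypothetical example (dataset, table, and field names are made up). The table could be created by hand with the bq CLI:

    bq mk --table my_dataset.pageviews user_id:STRING,page:STRING,view_ts:TIMESTAMP

and each JSON record on the topic would then need to line up with that shape, e.g.:

    {"user_id": "u-123", "page": "/home", "view_ts": "2017-05-08 09:00:00 UTC"}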

C0urante avatar May 08 '17 15:05 C0urante

@ofirsharony any update here?

criccomini avatar Jun 16 '17 16:06 criccomini

@criccomini Not at the moment. I've decided to work only with Avro-based topics for now, saving on storage and using its schema evolution.

ofirsharony avatar Jun 17 '17 05:06 ofirsharony

I just looked into this. I don't think it's as simple as creating the table manually and configuring the connector to disable schemas.

i.e. the following config:

    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=false
    value.converter.schemas.enable=false

resulted in this exception for me:

        at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.put(BigQuerySinkTask.java:165)
        at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.getRecordRow(BigQuerySinkTask.java:143)
        at com.wepay.kafka.connect.bigquery.convert.BigQueryRecordConverter.convertRecord(BigQueryRecordConverter.java:51)
        at com.wepay.kafka.connect.bigquery.convert.BigQueryRecordConverter.convertRecord(BigQueryRecordConverter.java:69)
Caused by: java.lang.NullPointerException

Looking at that code it seems like the expectation of a schema is pretty baked in.
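One untested possibility, given that the converter seems to require a Connect schema: keep schemas enabled and publish JSON in the JsonConverter envelope, so every record carries its own schema. A sketch with made-up field names:

    value.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable=true

with messages shaped like:

    {
      "schema": {
        "type": "struct",
        "name": "pageview",
        "optional": false,
        "fields": [
          {"field": "user_id", "type": "string", "optional": false},
          {"field": "page", "type": "string", "optional": true}
        ]
      },
      "payload": {"user_id": "u-123", "page": "/home"}
    }

The embedded schema adds noticeable per-record overhead, which is part of why Avro plus Schema Registry tends to be the smoother path with this connector.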

natb1 avatar Mar 09 '18 21:03 natb1

Tried something similar and faced a different exception, although in my case I have a string field and a timestamp, and apparently the timestamp is the problem:

[2018-06-13 13:48:26,639] ERROR WorkerSinkTask{id=bigquery-connector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
java.lang.NoSuchMethodError: org.apache.kafka.connect.sink.SinkRecord.<init>(Ljava/lang/String;ILorg/apache/kafka/connect/data/Schema;Ljava/lang/Object;Lorg/apache/kafka/connect/data/Schema;Ljava/lang/Object;JLjava/lang/Long;Lorg/apache/kafka/common/record/TimestampType;Ljava/lang/Iterable;)V
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:471)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

tkaymak avatar Jun 13 '18 11:06 tkaymak

@james-woods, your issue sounds like you have conflicting versions of Kafka JARs on your classpath.

criccomini avatar Jun 13 '18 16:06 criccomini

Hey, any updates on this? I am trying to do something similar in #174

rohinrohin avatar Jul 15 '19 12:07 rohinrohin

Any update on this issue? I'm facing the same problem. Hopefully there is a good solution for this.

bmd-benitaclarissa avatar Aug 06 '19 09:08 bmd-benitaclarissa

I've successfully used the connector to stream data from an Avro-based topic to BigQuery. Does the connector support streaming JSON-based topics as well?

Could you please lend a struggling person a hand here in getting this connection working? I have been trying for two weeks to connect MySQL > Debezium > Kafka > BigQuery Sink Connector > GBQ. I managed to start the MySQL > Debezium connector and it is working, but when I start the BigQuery Sink Connector it instantly breaks. After checking the status I get:

org.apache.kafka.connect.errors.ConnectException: Exception encountered while trying to fetch latest schema metadata from Schema Registry
        at com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever.retrieveSchema(SchemaRegistrySchemaRetriever.java:67)
        at com.wepay.kafka.connect.bigquery.SchemaManager.createTable(SchemaManager.java:48)
        at com.wepay.kafka.connect.bigquery.BigQuerySinkConnector.ensureExistingTables(BigQuerySinkConnector.java:113)
        at com.wepay.kafka.connect.bigquery.BigQuerySinkConnector.ensureExistingTables(BigQuerySinkConnector.java:136)
        ... etc

kmillanr avatar Sep 06 '19 14:09 kmillanr

Hey, I don't think it works with JSON. What you can do, which is what I did as well, is launch a KSQL server to convert the JSON data into Avro and write it to another topic. Once configured, it is really easy; KSQL takes care of the schema creation for you as well. Hope this helps.
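Concretely, that conversion boils down to two KSQL statements along these lines (stream, topic, and column names here are placeholders, not from the original comment):

    -- Register the existing JSON topic as a stream; its columns must be declared.
    CREATE STREAM source_json (user_id VARCHAR, page VARCHAR, view_ts BIGINT)
      WITH (KAFKA_TOPIC='my-json-topic', VALUE_FORMAT='JSON');

    -- Re-serialize the same data as Avro into a new topic for the BigQuery connector to consume.
    CREATE STREAM target_avro
      WITH (KAFKA_TOPIC='my-avro-topic', VALUE_FORMAT='AVRO')
      AS SELECT * FROM source_json;

The second statement registers the Avro schema with Schema Registry as a side effect, which is what the connector's SchemaRegistrySchemaRetriever relies on.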

PedroEFLourenco avatar Sep 06 '19 14:09 PedroEFLourenco

I will try this approach right away, thank you for your response! I'll get back to you as soon as I've tried it out. Any specific documentation you would recommend for doing the conversion?

kmillanr avatar Sep 06 '19 14:09 kmillanr

Yep, inside KSQL, query wise, this is all you need: https://www.confluent.io/stream-processing-cookbook/ksql-recipes/changing-data-serialization-format-json-avro

The biggest trouble I had was configuring the server... lots of silly issues connecting to Schema Registry through SSL. My final and functional set of properties is the following (keys listed without values hold environment-specific settings or secrets):

    bootstrap.servers
    security.protocol=SSL
    ssl.keystore.type=PKCS12
    ssl.truststore.location
    ssl.truststore.password
    ssl.keystore.location
    ssl.keystore.password
    ksql.schema.registry.url
    listeners=http://localhost:8083
    ksql.schema.registry.ssl.truststore.location
    ksql.schema.registry.ssl.truststore.password
    ksql.schema.registry.ssl.keystore.location
    ksql.schema.registry.ssl.keystore.password
    ksql.schema.registry.basic.auth.credentials.source=USER_INFO
    ksql.schema.registry.basic.auth.user.info
    ksql.schema.registry.security.protocol=SSL
    ksql.schema.registry.ssl.keystore.type=PKCS12
    ksql.key.converter=io.confluent.connect.avro.AvroConverter
    ksql.value.converter=io.confluent.connect.avro.AvroConverter
    ksql.internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    ksql.internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    ksql.internal.key.converter.schemas.enable=false
    ksql.internal.value.converter.schemas.enable=false
    ksql.streams.auto.offset.reset=earliest

PedroEFLourenco avatar Sep 06 '19 15:09 PedroEFLourenco

Yep, inside KSQL, query wise, this is all you need: https://www.confluent.io/stream-processing-cookbook/ksql-recipes/changing-data-serialization-format-json-avro

Hey Pedro, it took me a while to get the hang of KSQL. After printing the topic in KSQL, it showed the format was already AVRO... but I'm still getting the Schema Registry error. This is very strange to me because, according to the documentation (I'm very new to Kafka), the Confluent Platform's Schema Registry tool "allows schemas for data to be stored and updated with configurable compatibility settings (forwards, backwards, both, none)." My understanding is that I shouldn't have to specify the schemas of a given table. So why am I getting this error... can you shed any light?

kmillanr avatar Sep 10 '19 15:09 kmillanr

Yep, inside KSQL, query wise, this is all you need: https://www.confluent.io/stream-processing-cookbook/ksql-recipes/changing-data-serialization-format-json-avro

Btw, where can I set the properties you showed me? I am using solely the command line for everything at this point. The last thing I would ask about is schema definition: according to the link you sent me, I would have to define the schema myself. The thing is, my company has over 40 thousand tables; would I have to define a schema for each one?

kmillanr avatar Sep 10 '19 15:09 kmillanr

Hey, you can put the properties in a .properties file, and pass them into KSQL by starting it like this: ./ksql-server-start properties_file.properties

Regarding schemas, I'm not sure how you'd do that. Maybe you need some development on your side to analyze the data and derive schemas from it.

I still think KSQL has schema creation capabilities based on this for instance: https://docs.confluent.io/current/ksql/docs/installation/server-config/avro-schema.html

But I am not 100% sure it will match the level of independence you need.

PedroEFLourenco avatar Sep 10 '19 23:09 PedroEFLourenco

Hey, you can put the properties in a .properties file, and pass them into KSQL by starting it like this: ./ksql-server-start properties_file.properties

Hey man, thanks for the response. Actually you are right; I've been doing some research and yes, KSQL is supposed to do all that for you. Now I've been looking into stream creation. One more question: how do I save my KSQL stream into a new topic? Is that what you were referring to in your first response?

kmillanr avatar Sep 10 '19 23:09 kmillanr

Yes, that's exactly it. Step 2 from the link I posted in my first response. :)

PedroEFLourenco avatar Sep 11 '19 00:09 PedroEFLourenco

Got it, will give it a shot tomorrow and get back to you. thx

kmillanr avatar Sep 11 '19 00:09 kmillanr

Finally got it to work, thank you Pedro! But now I'm facing another issue. When I add a new table in MySQL (the producer), Kafka creates the new topic and I can see it listed; however, the corresponding table is not created in BigQuery. Is there a way to have a new table created in the source automatically flow through as a new topic/table on the consumer side?

kmillanr avatar Sep 19 '19 14:09 kmillanr