zerocode
Kafka - KSQL testing in action
Open issue: https://github.com/confluentinc/ksql/issues/2126
@authorjapps please can you describe what your issue is? And provide details about your environment—versions, deployment approach e.g. Docker or not, etc.
Hello @rmoff mate, thanks for coming back on this.
Issue-1
I faced a similar issue while bringing up ksql-cli. See the error message below:
$ docker exec -it compose_ksql-cli_1 bash
root@692642cfa0cb:/# ksql
===========================================
= _ __ _____ ____ _ =
= | |/ // ____|/ __ \| | =
= | ' /| (___ | | | | | =
= | < \___ \| | | | | =
= | . \ ____) | |__| | |____ =
= |_|\_\_____/ \___\_\______| =
= =
= Streaming SQL Engine for Apache Kafka® =
===========================================
Copyright 2017-2018 Confluent Inc.
CLI v5.1.0, Server v<unknown> located at http://localhost:8088
Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
**************** ERROR ********************
Remote server address may not be valid.
Address: http://localhost:8088
Error issuing GET to KSQL server. path:/info
Caused by: java.net.ConnectException: Connection refused (Connection refused)
Caused by: Could not connect to the server.
*******************************************
ksql>
ksql> list topics;
Error issuing POST to KSQL server. path:ksql
Caused by: java.net.ConnectException: Connection refused (Connection refused)
Caused by: Could not connect to the server.
ksql> exit
Exiting KSQL.
But at the same time, list topics (or show topics) passed when queried directly from the ksql-server via HTTP. See the output below (the test-step DSL is here).
***Step PASSED:Consume via KSQL query->load_kafka
2019-01-17 21:32:40,305 [main] INFO
--------- TEST-STEP-CORRELATION-ID: 0f91dca4-7300-4a7c-83fd-7056c8225c6f ---------
*requestTimeStamp:2019-01-17T21:32:38.621
step:load_kafka
url:kafka-topic:demo-ksql
method:load
request:
{
"records" : [ {
"key" : "1547760758613",
"value" : "Hello, Created for KSQL demo"
} ]
}
--------- TEST-STEP-CORRELATION-ID: 0f91dca4-7300-4a7c-83fd-7056c8225c6f ---------
Response:
{
"status" : "Ok",
"recordMetadata" : {
"offset" : 0,
"timestamp" : 1547760760264,
"serializedKeySize" : 13,
"serializedValueSize" : 34,
"topicPartition" : {
"hash" : 749715182,
"partition" : 0,
"topic" : "demo-ksql"
}
}
}
*responseTimeStamp:2019-01-17T21:32:40.298
*Response delay:1677.0 milli-secs
---------> Assertion: <----------
{
"status" : "Ok"
}
-done-
2019-01-17 21:32:40,306 [main] INFO org.jsmart.zerocode.core.runner.ZeroCodeMultiStepsScenarioRunnerImpl -
Overridden the header key:Accept, with value:application/vnd.ksql.v1+json
***Step PASSED:Consume via KSQL query->ksql_show_topics
--------- TEST-STEP-CORRELATION-ID: 5f9ea949-1f37-41b8-a157-6d69b334c023 ---------
*requestTimeStamp:2019-01-17T21:32:40.306
step:ksql_show_topics
url:http://localhost:8088/ksql
method:POST
request:
{
"headers" : {
"Content-Type" : "application/vnd.ksql.v1+json; charset=utf-8",
"Accept" : "application/vnd.ksql.v1+json"
},
"body" : {
"ksql" : "SHOW TOPICS;",
"streamsProperties" : { }
}
}
--------- TEST-STEP-CORRELATION-ID: 5f9ea949-1f37-41b8-a157-6d69b334c023 ---------
Response:
{
"status" : 200,
"headers" : {
"Date" : [ "Thu, 17 Jan 2019 21:32:40 GMT" ],
"Server" : [ "Jetty(9.4.12.v20180830)" ],
"Transfer-Encoding" : [ "chunked" ],
"Vary" : [ "Accept-Encoding, User-Agent" ],
"Content-Type" : [ "application/vnd.ksql.v1+json" ]
},
"body" : [ {
"@type" : "kafka_topics",
"statementText" : "SHOW TOPICS;",
"topics" : [ {
"name" : "_schemas",
"registered" : false,
"replicaInfo" : [ 1 ],
"consumerCount" : 0,
"consumerGroupCount" : 0
}, {
"name" : "demo-ksql",
"registered" : false,
"replicaInfo" : [ 1 ],
"consumerCount" : 0,
"consumerGroupCount" : 0
} ]
} ]
}
*responseTimeStamp:2019-01-17T21:32:40.670
*Response delay:364.0 milli-secs
---------> Assertion: <----------
{
"status" : 200,
"body" : [ {
"topics.SIZE" : "$GT.0",
"topics[?(@.name=='demo-ksql')].registered.SIZE" : 1
} ]
}
-done-
The schema registry and bootstrap server configs are below:
kafka.bootstrap.servers=localhost:9092
kafka.producer.properties=kafka_servers/kafka_producer_avro.properties
kafka.consumer.properties=kafka_servers/kafka_consumer_avro.properties
# Kafka REST Proxy end point for sending avro messages
web.application.endpoint.host=http://localhost
web.application.endpoint.port=8082
web.application.endpoint.context=
# URL of Kafka KSQL server
kafka-ksql-server-fqdn=http://localhost:8088
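As an illustration of how these Java-style .properties entries resolve, here is a minimal, hypothetical Python sketch of parsing them and picking out the KSQL server URL (the function names are mine, not zerocode's API):

```python
# Hypothetical sketch: parse simple Java-style key=value properties,
# as in kafka_test_server_avro.properties above, and look up the KSQL URL.
def parse_properties(text: str) -> dict:
    """Parse key=value lines, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props

SAMPLE = """\
kafka.bootstrap.servers=localhost:9092
# URL of Kafka KSQL server
kafka-ksql-server-fqdn=http://localhost:8088
"""

props = parse_properties(SAMPLE)
print(props["kafka-ksql-server-fqdn"])  # http://localhost:8088
```

The `${kafka-ksql-server-fqdn}` placeholder in the test-step JSON further down is substituted from exactly this kind of lookup.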
The test case to run directly is below:
@TargetEnv("kafka_servers/kafka_test_server_avro.properties")
@RunWith(ZeroCodeUnitRunner.class)
public class KafkaKsqlTest {
...
}
Note: It seems the issue has been closed at your end, but I haven't had a chance to look at it with the fresh instructions you provided. Will keep you posted here once I rerun.
Issue-2
When I query the ksql-server directly via an HTTP call, it throws an error:
print 'demo-ksql' from beginning;
java.lang.String cannot be cast to org.apache.avro.generic.GenericRecord
Is it because of non-JSON content?
See log here:
--------- TEST-STEP-CORRELATION-ID: 34ce0a62-8811-4a3b-8892-9eb3d5cf1d7c ---------
*requestTimeStamp:2019-01-17T22:01:15.180
step:ksql_print_records
url:http://localhost:8088/query
method:POST
request:
{
"headers" : {
"Content-Type" : "application/vnd.ksql.v1+json; charset=utf-8",
"Accept" : "application/vnd.ksql.v1+json"
},
"body" : {
"ksql" : "print 'demo-ksql' from beginning;",
"streamsProperties" : { }
}
}
--------- TEST-STEP-CORRELATION-ID: 34ce0a62-8811-4a3b-8892-9eb3d5cf1d7c ---------
Response:
{
"status" : 200,
"headers" : {
"Date" : [ "Thu, 17 Jan 2019 22:01:15 GMT" ],
"Server" : [ "Jetty(9.4.12.v20180830)" ],
"Transfer-Encoding" : [ "chunked" ],
"Vary" : [ "Accept-Encoding, User-Agent" ],
"Content-Type" : [ "application/vnd.ksql.v1+json" ]
},
"rawBody" : "java.lang.String cannot be cast to org.apache.avro.generic.GenericRecord\n"
}
*responseTimeStamp:2019-01-17T22:01:16.057
*Response delay:877.0 milli-secs
---------> Assertion: <----------
{
"status" : 200,
"body" : "$NOT.NULL"
}
-done-
java.lang.RuntimeException: Assertion failed for :-
[Consume via KSQL query]
|
|
+---Step --> [ksql_print_records]
Failures:
---------
Assertion path '$.body' with actual value 'null' did not match the expected value 'NOT NULL'
Note- I haven't tried this via ksql-cli, will update you here after trying.
The test step executed for the above result is below:
{
"name": "ksql_print_records",
"url": "${kafka-ksql-server-fqdn}/query",
"operation": "POST",
"request": {
"headers": {
"Content-Type": "application/vnd.ksql.v1+json; charset=utf-8",
"Accept": "application/vnd.ksql.v1+json"
},
"body": {
"ksql": "print 'demo-ksql' from beginning;",
"streamsProperties": {}
}
},
"assertions": {
"status": 200,
"body": "$NOT.NULL"
}
}
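The zerocode step above is just a plain HTTP POST under the hood. As a rough Python sketch (my own helper name, not zerocode's), the same request could be built like this; actually sending it needs the live server at the configured address, so only the request construction is shown:

```python
import json

def build_ksql_query_request(server: str, statement: str):
    """Build the POST the step above issues against the KSQL /query endpoint.
    Returns (url, headers, body) without sending it."""
    url = f"{server}/query"
    headers = {
        "Content-Type": "application/vnd.ksql.v1+json; charset=utf-8",
        "Accept": "application/vnd.ksql.v1+json",
    }
    body = json.dumps({"ksql": statement, "streamsProperties": {}})
    return url, headers, body

url, headers, body = build_ksql_query_request(
    "http://localhost:8088", "print 'demo-ksql' from beginning;"
)
print(url)  # http://localhost:8088/query
```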
Re. the CLI, you need to provide the KSQL Server address for it to connect to. So based on https://github.com/authorjapps/zerocode/blob/master/docker/compose/kafka-schema-registry.yml you should invoke it as ksql http://ksql-server:8088, not just ksql.
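The CLI's startup error above was a failed GET to /info. A quick, hedged way to sanity-check reachability from code is probing that same endpoint (a sketch using only the standard library; inside the ksql-cli container, localhost is the CLI container itself, which is why the connection was refused):

```python
import urllib.request
import urllib.error

def ksql_server_reachable(base_url: str, timeout: float = 2.0) -> bool:
    """Probe the KSQL server's /info endpoint, as the CLI does on startup."""
    try:
        with urllib.request.urlopen(f"{base_url}/info", timeout=timeout) as resp:
            return resp.status == 200
    except (urllib.error.URLError, OSError):
        # Connection refused / unresolved host / timeout
        return False

# Nothing listens on port 9 here, so this mirrors the CLI's failure mode:
print(ksql_server_reachable("http://localhost:9"))  # False
```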
For the java.lang.String cannot be cast to org.apache.avro.generic.GenericRecord error, check the KSQL server log; it sounds like there may be something up with the message payload. Delimited (CSV), JSON, and Avro are all supported.
@authorjapps wrote: Thanks a lot. That worked 👍 . Issue-1 sorted.
root@692642cfa0cb:/# ksql http://ksql-server:8088
CLI v5.1.0, Server v5.1.0 located at http://ksql-server:8088
ksql> list topics;
Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
-----------------------------------------------------------------------------------------
_schemas | false | 1 | 1 | 0 | 0
demo-ksql | false | 1 | 1 | 1 | 1
-----------------------------------------------------------------------------------------
KSQL server log
But the response status code is 200 on the server:
[2019-01-18 06:16:10,916] INFO Building AST for list topics;. (io.confluent.ksql.KsqlEngine:323)
[2019-01-18 06:16:10,972] INFO 172.31.0.3 - - [18/Jan/2019:06:16:10 +0000] "POST /ksql HTTP/1.1" 200 265 66 (io.confluent.rest-utils.requests:60)
[2019-01-18 06:16:15,522] INFO Building AST for print 'demo-ksql' from beginning;. (io.confluent.ksql.KsqlEngine:323)
[2019-01-18 06:16:15,558] INFO Printing topic 'demo-ksql' (io.confluent.ksql.rest.server.resources.streaming.StreamedQueryResource:132)
[2019-01-18 06:16:15,579] ERROR Exception encountered while writing to output stream (io.confluent.ksql.rest.server.resources.streaming.TopicStreamWriter:112)
java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.avro.generic.GenericRecord
at io.confluent.ksql.rest.server.resources.streaming.TopicStream$Format$2$1.print(TopicStream.java:132)
at io.confluent.ksql.rest.server.resources.streaming.TopicStream$RecordFormatter.lambda$format$1(TopicStream.java:75)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
at java.util.Iterator.forEachRemaining(Iterator.java:116)
at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at io.confluent.ksql.rest.server.resources.streaming.TopicStream$RecordFormatter.format(TopicStream.java:82)
at io.confluent.ksql.rest.server.resources.streaming.TopicStreamWriter.write(TopicStreamWriter.java:95)
at org.glassfish.jersey.message.internal.StreamingOutputProvider.writeTo(StreamingOutputProvider.java:79)
at org.glassfish.jersey.message.internal.StreamingOutputProvider.writeTo(StreamingOutputProvider.java:61)
at org.glassfish.jersey.message.internal.WriterInterceptorExecutor$TerminalWriterInterceptor.invokeWriteTo(WriterInterceptorExecutor.java:266)
at org.glassfish.jersey.message.internal.WriterInterceptorExecutor$TerminalWriterInterceptor.aroundWriteTo(WriterInterceptorExecutor.java:251)
at org.glassfish.jersey.message.internal.WriterInterceptorExecutor.proceed(WriterInterceptorExecutor.java:163)
at org.glassfish.jersey.server.internal.JsonWithPaddingInterceptor.aroundWriteTo(JsonWithPaddingInterceptor.java:109)
at org.glassfish.jersey.message.internal.WriterInterceptorExecutor.proceed(WriterInterceptorExecutor.java:163)
at org.glassfish.jersey.server.internal.MappableExceptionWrapperInterceptor.aroundWriteTo(MappableExceptionWrapperInterceptor.java:85)
at org.glassfish.jersey.message.internal.WriterInterceptorExecutor.proceed(WriterInterceptorExecutor.java:163)
at org.glassfish.jersey.message.internal.MessageBodyFactory.writeTo(MessageBodyFactory.java:1135)
at org.glassfish.jersey.server.ServerRuntime$Responder.writeResponse(ServerRuntime.java:662)
at org.glassfish.jersey.server.ServerRuntime$Responder.processResponse(ServerRuntime.java:395)
at org.glassfish.jersey.server.ServerRuntime$Responder.process(ServerRuntime.java:385)
at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:280)
at org.glassfish.jersey.internal.Errors$1.call(Errors.java:272)
at org.glassfish.jersey.internal.Errors$1.call(Errors.java:268)
at org.glassfish.jersey.internal.Errors.process(Errors.java:316)
at org.glassfish.jersey.internal.Errors.process(Errors.java:298)
at org.glassfish.jersey.internal.Errors.process(Errors.java:268)
at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:289)
at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:256)
at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:703)
at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:416)
at org.glassfish.jersey.servlet.ServletContainer.serviceImpl(ServletContainer.java:409)
at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:584)
at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:525)
at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:462)
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1642)
at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:533)
at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:255)
at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1595)
at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:255)
at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1340)
at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:203)
at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:473)
at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1564)
at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:201)
at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1242)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:144)
at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:126)
at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:174)
at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:220)
at org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:740)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
at org.eclipse.jetty.server.Server.handle(Server.java:503)
at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:364)
at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:260)
at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:305)
at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:118)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:333)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:310)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:168)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:126)
at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:366)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:765)
at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:683)
at java.lang.Thread.run(Thread.java:748)
[2019-01-18 06:16:16,093] INFO 172.31.0.3 - - [18/Jan/2019:06:16:15 +0000] "POST /query HTTP/1.1" 200 73 573 (io.confluent.rest-utils.requests:60)
KSQL cli output
root@692642cfa0cb:/# date
Fri Jan 18 06:07:46 UTC 2019
root@692642cfa0cb:/# ksql http://ksql-server:8088
===========================================
= _ __ _____ ____ _ =
= | |/ // ____|/ __ \| | =
= | ' /| (___ | | | | | =
= | < \___ \| | | | | =
= | . \ ____) | |__| | |____ =
= |_|\_\_____/ \___\_\______| =
= =
= Streaming SQL Engine for Apache Kafka® =
===========================================
Copyright 2017-2018 Confluent Inc.
CLI v5.1.0, Server v5.1.0 located at http://ksql-server:8088
Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
ksql> list topics;
Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
-----------------------------------------------------------------------------------------
_schemas | false | 1 | 1 | 0 | 0
demo-ksql | false | 1 | 1 | 1 | 1
-----------------------------------------------------------------------------------------
ksql> print 'demo-ksql' from beginning;
java.lang.String cannot be cast to org.apache.avro.generic.GenericRecord
ksql>
Kafka console-consumer
$ docker exec compose_kafka_1 kafka-console-consumer --bootstrap-server localhost:29092 --topic demo-ksql --from-beginning
8Hello, Created for KSQL demo
8Hello, Created for KSQL demo
Hello World
:::Note:::
I am able to consume via the Java client (via JUnit) perfectly fine; see below (the last step has the records; it consumed one record because it had committed up to the previous offset):
***Step PASSED:Unload - consume a message from kafka->load_kafka
2019-01-18 06:21:01,431 [main] INFO
--------- TEST-STEP-CORRELATION-ID: 31229a85-ece9-486c-b39e-7f5181c2ecc7 ---------
*requestTimeStamp:2019-01-18T06:21:00.808
step:load_kafka
url:kafka-topic:demo-ksql
method:load
request:
{
"records" : [ {
"key" : "1547792460796",
"value" : "Hello World"
} ]
}
--------- TEST-STEP-CORRELATION-ID: 31229a85-ece9-486c-b39e-7f5181c2ecc7 ---------
Response:
{
"status" : "Ok",
"recordMetadata" : {
"offset" : 3,
"timestamp" : 1547792461364,
"serializedKeySize" : 13,
"serializedValueSize" : 11,
"topicPartition" : {
"hash" : 749715182,
"partition" : 0,
"topic" : "demo-ksql"
}
}
}
*responseTimeStamp:2019-01-18T06:21:01.422
*Response delay:614.0 milli-secs
---------> Assertion: <----------
{
"status" : "Ok"
}
-done-
2019-01-18 06:21:01,433 [main] INFO
---------------------------------------------------------
kafka.bootstrap.servers - localhost:9092
---------------------------------------------------------
2019-01-18 06:21:01,456 [main] INFO org.jsmart.zerocode.core.kafka.receive.KafkaReceiver -
### Kafka Consumer Effective configs:ConsumerLocalConfigs{recordType='null', fileDumpTo='target/temp/demo.txt', commitAsync=false, commitSync=true, showRecordsConsumed=true, maxNoOfRetryPollsOrTimeouts=5, pollingTime=1000, seek=null}
2019-01-18 06:21:01,467 [main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.dns.lookup = default
client.id =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = consumerGroup14
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 2
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
org.jsmart.zerocode.core.kafka.helper.KafkaConsumerHelper -
Record Key - 1547792460796 , Record value - Hello World, Record partition - 0, Record offset - 3
2019-01-18 06:21:10,609 [main] INFO org.jsmart.zerocode.core.runner.StepNotificationHandler -
***Step PASSED:Unload - consume a message from kafka->onload_kafka
--------- TEST-STEP-CORRELATION-ID: 81e13c39-588e-451d-b8e9-f8677c389c1c ---------
*requestTimeStamp:2019-01-18T06:21:01.433
step:onload_kafka
url:kafka-topic:demo-ksql
method:unload
request:
{ }
--------- TEST-STEP-CORRELATION-ID: 81e13c39-588e-451d-b8e9-f8677c389c1c ---------
Response:
{
"records" : [ {
"topic" : "demo-ksql",
"partition" : 0,
"offset" : 3,
"timestamp" : 1547792461364,
"timestampType" : "CREATE_TIME",
"serializedKeySize" : 13,
"serializedValueSize" : 11,
"headers" : {
"headers" : [ ],
"isReadOnly" : false
},
"key" : "1547792460796",
"value" : "Hello World",
"leaderEpoch" : {
"value" : 0
}
} ],
"size" : 1
}
*responseTimeStamp:2019-01-18T06:21:10.607
*Response delay:9174.0 milli-secs
---------> Assertion: <----------
{
"size" : "$GT.0"
}
-done-
2019-01-18 06:21:10,730 [main] INFO org.jsmart.zerocode.core.runner.ZeroCodeUnitRunner -
**FINISHED executing all Steps for [Unload - consume a message from kafka] **.
Steps were:[load_kafka, onload_kafka]
The records here were raw text messages. Please let me know if you want me to try with JSON messages; happy to share the same workflow logs.
:::Another thing to note:::
- The kafka-console-consumer doesn't print the key, whereas the Java client prints it.
- There is also some other weird behaviour I have spotted with this console-consumer; happy to share, but it is not part of this issue, as it is not blocking me.
You can put what you want onto a Kafka topic, and using the Consumer API you can read what you want. KSQL is a higher level abstraction, and as such requires the data to be in a format that it expects, namely, CSV, JSON, or Avro.
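One way to get the topic into a shape KSQL's format detection handles is to JSON-encode the record value before producing it. Actually producing to Kafka needs a client library (e.g. kafka-python, not shown here); this sketch only illustrates the value encoding:

```python
import json

# The raw-string values on the topic aren't Avro, JSON, or CSV-like enough
# for KSQL here. JSON-encoding the value gives PRINT well-formed bytes:
record_value = json.dumps({
    "key": "1547760758613",
    "value": "Hello, Created for KSQL demo",
}).encode("utf-8")

# A format sniffer would see parseable JSON:
decoded = json.loads(record_value)
print(decoded["value"])
```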
However, I can't reproduce the error you're getting with just the data you've shown above:
ksql> PRINT 'test' FROM BEGINNING;
Format:STRING
1/18/19 12:22:07 PM UTC , NULL , foo,bar
1/18/19 12:22:29 PM UTC , NULL , Hello world
1/18/19 12:22:58 PM UTC , NULL , 8Hello, Created for KSQL demo
1/18/19 12:23:10 PM UTC , NULL , 8Hello, Created for KSQL demo
1/18/19 12:23:17 PM UTC , NULL , Hello World
What I would suggest is that you either just put a simple JSON message on the topic, or, if you want to debug the current error further, use kafkacat to dump the full message and raise it as a bug over on the KSQL project: https://github.com/confluentinc/ksql/issues/new
e.g.
kafkacat -b kafka:29092 -t test -C -f '\nKey (%K bytes): %k\t\nValue (%S bytes): %s\nTimestamp: %T\tPartition: %p\tOffset: %o\n--\n'
Key (-1 bytes):
Value (7 bytes): foo,bar
Timestamp: 1547814127770 Partition: 0 Offset: 0
--
Key (-1 bytes):
Value (11 bytes): Hello world
Timestamp: 1547814149934 Partition: 0 Offset: 1
--
Key (-1 bytes):
Value (29 bytes): 8Hello, Created for KSQL demo
Timestamp: 1547814178652 Partition: 0 Offset: 2
--
Key (-1 bytes):
Value (29 bytes): 8Hello, Created for KSQL demo
Timestamp: 1547814190700 Partition: 0 Offset: 3
--
Key (-1 bytes):
Value (11 bytes): Hello World
Timestamp: 1547814197870 Partition: 0 Offset: 4
--
Feel free to head over to http://cnfl.io/slack and the #ksql channel for more help on this if you want.
Thanks mate, it's on the #ksql channel now 👍. Let's see if anyone helps us; otherwise, one of our team folks will raise it as a bug on the KSQL project if it helps in solving the problem.
I was also thinking: in the case of an error response from the Kafka KSQL server over a REST call, 200 doesn't sound right. Better would be a 4xx or 5xx status to indicate that the call did not succeed.
Response:
{
"status" : 200,
"headers" : {
"Date" : [ "Thu, 17 Jan 2019 22:01:15 GMT" ],
"Server" : [ "Jetty(9.4.12.v20180830)" ],
"Content-Type" : [ "application/vnd.ksql.v1+json" ]
},
"rawBody" : "java.lang.String cannot be cast to org.apache.avro.generic.GenericRecord\n"
}
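Pending any server-side change, a test client can guard against this by treating a 200 whose body looks like a Java exception message as a failure. A minimal, hypothetical sketch (the heuristic strings are assumptions based on the response above, not a KSQL API contract):

```python
def ksql_response_ok(status: int, raw_body: str) -> bool:
    """Hypothetical client-side guard: a 200 whose body carries a Java
    exception message is still a failure, since the server returned 200
    even when the streaming response blew up."""
    if status >= 400:
        return False
    return "cannot be cast" not in raw_body and "Exception" not in raw_body

ok = ksql_response_ok(
    200, "java.lang.String cannot be cast to org.apache.avro.generic.GenericRecord\n"
)
print(ok)  # False
```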
I will try with JSON message and come back here on this.
Thanks for your support 🙏
https://github.com/confluentinc/ksql/issues/2386 has been raised now.