Kafka - KSQL testing in action

Open authorjapps opened this issue 5 years ago • 8 comments

Open issue: https://github.com/confluentinc/ksql/issues/2126

authorjapps avatar Jan 13 '19 10:01 authorjapps

@authorjapps please can you describe what your issue is? And provide details about your environment—versions, deployment approach e.g. Docker or not, etc.

rmoff avatar Jan 16 '19 17:01 rmoff

Hello @rmoff mate, thanks for coming back on this.

Issue-1

I faced a similar issue while bringing up ksql-cli. See the error message below:

$ docker exec -it compose_ksql-cli_1 bash
root@692642cfa0cb:/# ksql
                  
                  ===========================================
                  =        _  __ _____  ____  _             =
                  =       | |/ // ____|/ __ \| |            =
                  =       | ' /| (___ | |  | | |            =
                  =       |  <  \___ \| |  | | |            =
                  =       | . \ ____) | |__| | |____        =
                  =       |_|\_\_____/ \___\_\______|       =
                  =                                         =
                  =  Streaming SQL Engine for Apache Kafka® =
                  ===========================================

Copyright 2017-2018 Confluent Inc.

CLI v5.1.0, Server v<unknown> located at http://localhost:8088

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!


**************** ERROR ********************
Remote server address may not be valid.
Address: http://localhost:8088
Error issuing GET to KSQL server. path:/info
Caused by: java.net.ConnectException: Connection refused (Connection refused)
Caused by: Could not connect to the server.
*******************************************
ksql> 
ksql> list topics;
Error issuing POST to KSQL server. path:ksql
Caused by: java.net.ConnectException: Connection refused (Connection refused)
Caused by: Could not connect to the server.
ksql> exit
Exiting KSQL.

At the same time, list topics (or show topics) passed when queried directly from the ksql-server over HTTP. See the output below (the test steps DSL is here).

***Step PASSED:Consume via KSQL query->load_kafka
2019-01-17 21:32:40,305 [main] INFO  
--------- TEST-STEP-CORRELATION-ID: 0f91dca4-7300-4a7c-83fd-7056c8225c6f ---------
*requestTimeStamp:2019-01-17T21:32:38.621
step:load_kafka
url:kafka-topic:demo-ksql
method:load
request:
{
  "records" : [ {
    "key" : "1547760758613",
    "value" : "Hello, Created for KSQL demo"
  } ]
} 
--------- TEST-STEP-CORRELATION-ID: 0f91dca4-7300-4a7c-83fd-7056c8225c6f ---------
Response:
{
  "status" : "Ok",
  "recordMetadata" : {
    "offset" : 0,
    "timestamp" : 1547760760264,
    "serializedKeySize" : 13,
    "serializedValueSize" : 34,
    "topicPartition" : {
      "hash" : 749715182,
      "partition" : 0,
      "topic" : "demo-ksql"
    }
  }
}
*responseTimeStamp:2019-01-17T21:32:40.298 
*Response delay:1677.0 milli-secs 
---------> Assertion: <----------
{
  "status" : "Ok"
} 
-done-

2019-01-17 21:32:40,306 [main] INFO  org.jsmart.zerocode.core.runner.ZeroCodeMultiStepsScenarioRunnerImpl - 
Overridden the header key:Accept, with value:application/vnd.ksql.v1+json

***Step PASSED:Consume via KSQL query->ksql_show_topics

--------- TEST-STEP-CORRELATION-ID: 5f9ea949-1f37-41b8-a157-6d69b334c023 ---------
*requestTimeStamp:2019-01-17T21:32:40.306
step:ksql_show_topics
url:http://localhost:8088/ksql
method:POST
request:
{
  "headers" : {
    "Content-Type" : "application/vnd.ksql.v1+json; charset=utf-8",
    "Accept" : "application/vnd.ksql.v1+json"
  },
  "body" : {
    "ksql" : "SHOW TOPICS;",
    "streamsProperties" : { }
  }
} 
--------- TEST-STEP-CORRELATION-ID: 5f9ea949-1f37-41b8-a157-6d69b334c023 ---------
Response:
{
  "status" : 200,
  "headers" : {
    "Date" : [ "Thu, 17 Jan 2019 21:32:40 GMT" ],
    "Server" : [ "Jetty(9.4.12.v20180830)" ],
    "Transfer-Encoding" : [ "chunked" ],
    "Vary" : [ "Accept-Encoding, User-Agent" ],
    "Content-Type" : [ "application/vnd.ksql.v1+json" ]
  },
  "body" : [ {
    "@type" : "kafka_topics",
    "statementText" : "SHOW TOPICS;",
    "topics" : [ {
      "name" : "_schemas",
      "registered" : false,
      "replicaInfo" : [ 1 ],
      "consumerCount" : 0,
      "consumerGroupCount" : 0
    }, {
      "name" : "demo-ksql",
      "registered" : false,
      "replicaInfo" : [ 1 ],
      "consumerCount" : 0,
      "consumerGroupCount" : 0
    } ]
  } ]
}
*responseTimeStamp:2019-01-17T21:32:40.670 
*Response delay:364.0 milli-secs 
---------> Assertion: <----------
{
  "status" : 200,
  "body" : [ {
    "topics.SIZE" : "$GT.0",
    "topics[?(@.name=='demo-ksql')].registered.SIZE" : 1
  } ]
} 
-done-
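For readers unfamiliar with the assertion block above, here is a rough plain-Java equivalent of the two checks. It is only a sketch under stated assumptions: the class and method names are hypothetical, it is not how zerocode evaluates the expressions internally, and it assumes Java 9+ for List.of and Map.of.

```java
import java.util.List;
import java.util.Map;

final class ShowTopicsAssertionSketch {

    // Mirrors "topics.SIZE" : "$GT.0" -> the topics array must not be empty.
    static boolean hasTopics(List<Map<String, Object>> topics) {
        return topics.size() > 0;
    }

    // Mirrors "topics[?(@.name=='demo-ksql')].registered.SIZE" : 1 ->
    // exactly one topic has the given name and carries a "registered" field.
    static boolean exactlyOneNamed(List<Map<String, Object>> topics, String name) {
        return topics.stream()
                .filter(t -> name.equals(t.get("name")))
                .filter(t -> t.containsKey("registered"))
                .count() == 1;
    }

    public static void main(String[] args) {
        // Same two topics as in the SHOW TOPICS response above (fields trimmed).
        List<Map<String, Object>> topics = List.of(
                Map.<String, Object>of("name", "_schemas", "registered", false),
                Map.<String, Object>of("name", "demo-ksql", "registered", false));
        System.out.println(hasTopics(topics));
        System.out.println(exactlyOneNamed(topics, "demo-ksql"));
    }
}
```

Note that the second check only counts matches; it does not require registered to be true, which is why the step passes even though the topic is reported as unregistered.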

The schema registry and bootstrap server configs are below:

kafka.bootstrap.servers=localhost:9092
kafka.producer.properties=kafka_servers/kafka_producer_avro.properties
kafka.consumer.properties=kafka_servers/kafka_consumer_avro.properties
# Kafka REST Proxy end point for sending avro messages
web.application.endpoint.host=http://localhost
web.application.endpoint.port=8082
web.application.endpoint.context=
# URL of Kafka KSQL server
kafka-ksql-server-fqdn=http://localhost:8088

The test case below can be run directly.

  • Link to JUnit test case is here
  • JSON test case with steps is here
  • Docker file with ksql-cli is here
@TargetEnv("kafka_servers/kafka_test_server_avro.properties")
@RunWith(ZeroCodeUnitRunner.class)
public class KafkaKsqlTest {
 ...
}

Note: It seems the issue has been closed at your end, but I haven't had a chance to look at it with the fresh instructions you provided. I will keep you posted here once I rerun.

Issue-2

When I run the statement below directly against the ksql-server via an HTTP call, it throws an error.

print 'demo-ksql' from beginning;
java.lang.String cannot be cast to org.apache.avro.generic.GenericRecord

Is it because of non-JSON content?

See log here:

--------- TEST-STEP-CORRELATION-ID: 34ce0a62-8811-4a3b-8892-9eb3d5cf1d7c ---------
*requestTimeStamp:2019-01-17T22:01:15.180
step:ksql_print_records
url:http://localhost:8088/query
method:POST
request:
{
  "headers" : {
    "Content-Type" : "application/vnd.ksql.v1+json; charset=utf-8",
    "Accept" : "application/vnd.ksql.v1+json"
  },
  "body" : {
    "ksql" : "print 'demo-ksql' from beginning;",
    "streamsProperties" : { }
  }
} 
--------- TEST-STEP-CORRELATION-ID: 34ce0a62-8811-4a3b-8892-9eb3d5cf1d7c ---------
Response:
{
  "status" : 200,
  "headers" : {
    "Date" : [ "Thu, 17 Jan 2019 22:01:15 GMT" ],
    "Server" : [ "Jetty(9.4.12.v20180830)" ],
    "Transfer-Encoding" : [ "chunked" ],
    "Vary" : [ "Accept-Encoding, User-Agent" ],
    "Content-Type" : [ "application/vnd.ksql.v1+json" ]
  },
  "rawBody" : "java.lang.String cannot be cast to org.apache.avro.generic.GenericRecord\n"
}
*responseTimeStamp:2019-01-17T22:01:16.057 
*Response delay:877.0 milli-secs 
---------> Assertion: <----------
{
  "status" : 200,
  "body" : "$NOT.NULL"
} 
-done-


java.lang.RuntimeException: Assertion failed for :- 

[Consume via KSQL query] 
	|
	|
	+---Step --> [ksql_print_records] 

Failures:
--------- 
Assertion path '$.body' with actual value 'null' did not match the expected value 'NOT NULL'

Note: I haven't tried this via ksql-cli yet; I will update you here after trying.

The test step that was executed for the above result is below:

        {
            "name": "ksql_print_records",
            "url": "${kafka-ksql-server-fqdn}/query",
            "operation": "POST",
            "request": {
                "headers": {
                    "Content-Type": "application/vnd.ksql.v1+json; charset=utf-8",
                    "Accept": "application/vnd.ksql.v1+json"
                },
                "body": {
                    "ksql": "print 'demo-ksql' from beginning;",
                    "streamsProperties": {}
                }
            },
            "assertions": {
                "status": 200,
                "body": "$NOT.NULL"
            }
        }

authorjapps avatar Jan 17 '19 21:01 authorjapps

Re. the CLI, you need to provide the KSQL Server address for it to connect to. So based on https://github.com/authorjapps/zerocode/blob/master/docker/compose/kafka-schema-registry.yml your test should be invoked as ksql http://ksql-server:8088, not just ksql.
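A quick way to check which address is reachable from wherever the client runs is to request the /info path, the same path the CLI calls at startup (see the error output earlier in this thread). This is a minimal sketch assuming Java 11+ for java.net.http; the class and method names are hypothetical and not part of KSQL or zerocode.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

final class KsqlInfoProbe {

    // Builds <serverAddress>/info, tolerating a trailing slash on the address.
    static URI infoUri(String serverAddress) {
        String base = serverAddress.endsWith("/")
                ? serverAddress.substring(0, serverAddress.length() - 1)
                : serverAddress;
        return URI.create(base + "/info");
    }

    // Returns true only if GET /info answers with HTTP 200.
    static boolean isReachable(String serverAddress) {
        HttpRequest request = HttpRequest.newBuilder(infoUri(serverAddress))
                .timeout(Duration.ofSeconds(5))
                .GET()
                .build();
        try {
            HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofString());
            return response.statusCode() == 200;
        } catch (IOException e) {
            // Covers "Connection refused" and similar connection failures.
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults to the address the CLI used when no argument was given.
        String address = args.length > 0 ? args[0] : "http://localhost:8088";
        System.out.println(address + " reachable: " + isReachable(address));
    }
}
```

From inside the ksql-cli container the address to pass would be http://ksql-server:8088; from the host, the tests in this thread use http://localhost:8088.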

For java.lang.String cannot be cast to org.apache.avro.generic.GenericRecord check the KSQL server log; it sounds like there may be something up with the message payload. Delimited (CSV), JSON, and Avro are all supported.

@authorjapps wrote: Thanks a lot. That worked 👍 . Issue-1 sorted.

root@692642cfa0cb:/# ksql http://ksql-server:8088
CLI v5.1.0, Server v5.1.0 located at http://ksql-server:8088

ksql> list topics;

 Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups 
-----------------------------------------------------------------------------------------
 _schemas    | false      | 1          | 1                  | 0         | 0              
 demo-ksql   | false      | 1          | 1                  | 1         | 1              
-----------------------------------------------------------------------------------------

rmoff avatar Jan 17 '19 23:01 rmoff

KSQL server log

However, the response status code logged by the server is 200:

[2019-01-18 06:16:16,093] INFO 172.31.0.3 - - [18/Jan/2019:06:16:15 +0000] "POST /query HTTP/1.1" 200 73 573 (io.confluent.rest-utils.requests:60)

[2019-01-18 06:16:10,916] INFO Building AST for list topics;. (io.confluent.ksql.KsqlEngine:323)
[2019-01-18 06:16:10,972] INFO 172.31.0.3 - - [18/Jan/2019:06:16:10 +0000] "POST /ksql HTTP/1.1" 200 265  66 (io.confluent.rest-utils.requests:60)
[2019-01-18 06:16:15,522] INFO Building AST for print 'demo-ksql' from beginning;. (io.confluent.ksql.KsqlEngine:323)
[2019-01-18 06:16:15,558] INFO Printing topic 'demo-ksql' (io.confluent.ksql.rest.server.resources.streaming.StreamedQueryResource:132)
[2019-01-18 06:16:15,579] ERROR Exception encountered while writing to output stream (io.confluent.ksql.rest.server.resources.streaming.TopicStreamWriter:112)
java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.avro.generic.GenericRecord
	at io.confluent.ksql.rest.server.resources.streaming.TopicStream$Format$2$1.print(TopicStream.java:132)
	at io.confluent.ksql.rest.server.resources.streaming.TopicStream$RecordFormatter.lambda$format$1(TopicStream.java:75)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
	at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
	at java.util.Iterator.forEachRemaining(Iterator.java:116)
	at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
	at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
	at io.confluent.ksql.rest.server.resources.streaming.TopicStream$RecordFormatter.format(TopicStream.java:82)
	at io.confluent.ksql.rest.server.resources.streaming.TopicStreamWriter.write(TopicStreamWriter.java:95)
	at org.glassfish.jersey.message.internal.StreamingOutputProvider.writeTo(StreamingOutputProvider.java:79)
	at org.glassfish.jersey.message.internal.StreamingOutputProvider.writeTo(StreamingOutputProvider.java:61)
	at org.glassfish.jersey.message.internal.WriterInterceptorExecutor$TerminalWriterInterceptor.invokeWriteTo(WriterInterceptorExecutor.java:266)
	at org.glassfish.jersey.message.internal.WriterInterceptorExecutor$TerminalWriterInterceptor.aroundWriteTo(WriterInterceptorExecutor.java:251)
	at org.glassfish.jersey.message.internal.WriterInterceptorExecutor.proceed(WriterInterceptorExecutor.java:163)
	at org.glassfish.jersey.server.internal.JsonWithPaddingInterceptor.aroundWriteTo(JsonWithPaddingInterceptor.java:109)
	at org.glassfish.jersey.message.internal.WriterInterceptorExecutor.proceed(WriterInterceptorExecutor.java:163)
	at org.glassfish.jersey.server.internal.MappableExceptionWrapperInterceptor.aroundWriteTo(MappableExceptionWrapperInterceptor.java:85)
	at org.glassfish.jersey.message.internal.WriterInterceptorExecutor.proceed(WriterInterceptorExecutor.java:163)
	at org.glassfish.jersey.message.internal.MessageBodyFactory.writeTo(MessageBodyFactory.java:1135)
	at org.glassfish.jersey.server.ServerRuntime$Responder.writeResponse(ServerRuntime.java:662)
	at org.glassfish.jersey.server.ServerRuntime$Responder.processResponse(ServerRuntime.java:395)
	at org.glassfish.jersey.server.ServerRuntime$Responder.process(ServerRuntime.java:385)
	at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:280)
	at org.glassfish.jersey.internal.Errors$1.call(Errors.java:272)
	at org.glassfish.jersey.internal.Errors$1.call(Errors.java:268)
	at org.glassfish.jersey.internal.Errors.process(Errors.java:316)
	at org.glassfish.jersey.internal.Errors.process(Errors.java:298)
	at org.glassfish.jersey.internal.Errors.process(Errors.java:268)
	at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:289)
	at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:256)
	at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:703)
	at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:416)
	at org.glassfish.jersey.servlet.ServletContainer.serviceImpl(ServletContainer.java:409)
	at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:584)
	at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:525)
	at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:462)
	at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1642)
	at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:533)
	at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:255)
	at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1595)
	at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:255)
	at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1340)
	at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:203)
	at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:473)
	at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1564)
	at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:201)
	at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1242)
	at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:144)
	at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:126)
	at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:174)
	at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:220)
	at org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:740)
	at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
	at org.eclipse.jetty.server.Server.handle(Server.java:503)
	at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:364)
	at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:260)
	at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:305)
	at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
	at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:118)
	at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:333)
	at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:310)
	at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:168)
	at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:126)
	at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:366)
	at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:765)
	at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:683)
	at java.lang.Thread.run(Thread.java:748)
[2019-01-18 06:16:16,093] INFO 172.31.0.3 - - [18/Jan/2019:06:16:15 +0000] "POST /query HTTP/1.1" 200 73  573 (io.confluent.rest-utils.requests:60)

KSQL cli output

root@692642cfa0cb:/# date
Fri Jan 18 06:07:46 UTC 2019
root@692642cfa0cb:/# ksql http://ksql-server:8088
                  
                  ===========================================
                  =        _  __ _____  ____  _             =
                  =       | |/ // ____|/ __ \| |            =
                  =       | ' /| (___ | |  | | |            =
                  =       |  <  \___ \| |  | | |            =
                  =       | . \ ____) | |__| | |____        =
                  =       |_|\_\_____/ \___\_\______|       =
                  =                                         =
                  =  Streaming SQL Engine for Apache Kafka® =
                  ===========================================

Copyright 2017-2018 Confluent Inc.

CLI v5.1.0, Server v5.1.0 located at http://ksql-server:8088

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql> list topics;

 Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups 
-----------------------------------------------------------------------------------------
 _schemas    | false      | 1          | 1                  | 0         | 0              
 demo-ksql   | false      | 1          | 1                  | 1         | 1              
-----------------------------------------------------------------------------------------
ksql> print 'demo-ksql' from beginning;
java.lang.String cannot be cast to org.apache.avro.generic.GenericRecord
ksql> 

Kafka console-consumer

$ docker exec compose_kafka_1  kafka-console-consumer --bootstrap-server localhost:29092 --topic demo-ksql  --from-beginning
8Hello, Created for KSQL demo
8Hello, Created for KSQL demo
Hello World
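One possible reading of the leading 8 in the output above, offered as an assumption rather than something confirmed in this thread: the first two records may have been written by an Avro serializer using the schema-registry framing (one magic byte 0, a four-byte schema ID, then the Avro payload). An Avro string starts with its byte length as a zigzag varint, and for a 28-byte string that is the single byte 0x38, which prints as the ASCII character 8. The sketch below reproduces that arithmetic in plain Java; the class name and the schema ID 1 are made up for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class AvroStringWireFormat {

    // Avro writes a long as a zigzag varint; for a string, this long is the UTF-8 byte length.
    static byte[] zigZagVarint(long n) {
        long z = (n << 1) ^ (n >> 63);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80));
            z >>>= 7;
        }
        out.write((int) z);
        return out.toByteArray();
    }

    // Framing: magic byte 0, big-endian 4-byte schema ID, then the Avro-encoded string.
    static byte[] encode(int schemaId, String value) {
        byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
        byte[] length = zigZagVarint(utf8.length);
        return ByteBuffer.allocate(1 + 4 + length.length + utf8.length)
                .put((byte) 0)
                .putInt(schemaId)
                .put(length)
                .put(utf8)
                .array();
    }

    public static void main(String[] args) {
        // Schema ID 1 is a placeholder; the real ID is not shown in this thread.
        byte[] bytes = encode(1, "Hello, Created for KSQL demo");
        System.out.println("total bytes: " + bytes.length);                 // total bytes: 34
        System.out.println("length prefix as ASCII: " + (char) bytes[5]);   // length prefix as ASCII: 8
    }
}
```

Under that assumption the total comes to 34 bytes, which matches the serializedValueSize reported for the first record, while the later Hello World record (serializedValueSize 11) would be a plain string with no framing.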

:::Note:::

I am able to consume via the Java client (via JUnit) perfectly fine; see below. The last step has the records. It consumed only one record because the consumer group had already committed up to the previous offset.

***Step PASSED:Unload - consume a message from kafka->load_kafka
2019-01-18 06:21:01,431 [main] INFO  
--------- TEST-STEP-CORRELATION-ID: 31229a85-ece9-486c-b39e-7f5181c2ecc7 ---------
*requestTimeStamp:2019-01-18T06:21:00.808
step:load_kafka
url:kafka-topic:demo-ksql
method:load
request:
{
  "records" : [ {
    "key" : "1547792460796",
    "value" : "Hello World"
  } ]
} 
--------- TEST-STEP-CORRELATION-ID: 31229a85-ece9-486c-b39e-7f5181c2ecc7 ---------
Response:
{
  "status" : "Ok",
  "recordMetadata" : {
    "offset" : 3,
    "timestamp" : 1547792461364,
    "serializedKeySize" : 13,
    "serializedValueSize" : 11,
    "topicPartition" : {
      "hash" : 749715182,
      "partition" : 0,
      "topic" : "demo-ksql"
    }
  }
}
*responseTimeStamp:2019-01-18T06:21:01.422 
*Response delay:614.0 milli-secs 
---------> Assertion: <----------
{
  "status" : "Ok"
} 
-done-

2019-01-18 06:21:01,433 [main] INFO  
---------------------------------------------------------
kafka.bootstrap.servers - localhost:9092
---------------------------------------------------------

2019-01-18 06:21:01,456 [main] INFO  org.jsmart.zerocode.core.kafka.receive.KafkaReceiver - 
### Kafka Consumer Effective configs:ConsumerLocalConfigs{recordType='null', fileDumpTo='target/temp/demo.txt', commitAsync=false, commitSync=true, showRecordsConsumed=true, maxNoOfRetryPollsOrTimeouts=5, pollingTime=1000, seek=null}

2019-01-18 06:21:01,467 [main] INFO  org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: 
	auto.commit.interval.ms = 5000
	auto.offset.reset = earliest
	bootstrap.servers = [localhost:9092]
	check.crcs = true
	client.dns.lookup = default
	client.id = 
	connections.max.idle.ms = 540000
	default.api.timeout.ms = 60000
	enable.auto.commit = false
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = consumerGroup14
	heartbeat.interval.ms = 3000
	interceptor.classes = []
	internal.leave.group.on.close = true
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 2
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

org.jsmart.zerocode.core.kafka.helper.KafkaConsumerHelper - 
Record Key - 1547792460796 , Record value - Hello World, Record partition - 0, Record offset - 3

2019-01-18 06:21:10,609 [main] INFO  org.jsmart.zerocode.core.runner.StepNotificationHandler - 
***Step PASSED:Unload - consume a message from kafka->onload_kafka

--------- TEST-STEP-CORRELATION-ID: 81e13c39-588e-451d-b8e9-f8677c389c1c ---------
*requestTimeStamp:2019-01-18T06:21:01.433
step:onload_kafka
url:kafka-topic:demo-ksql
method:unload
request:
{ } 
--------- TEST-STEP-CORRELATION-ID: 81e13c39-588e-451d-b8e9-f8677c389c1c ---------
Response:
{
  "records" : [ {
    "topic" : "demo-ksql",
    "partition" : 0,
    "offset" : 3,
    "timestamp" : 1547792461364,
    "timestampType" : "CREATE_TIME",
    "serializedKeySize" : 13,
    "serializedValueSize" : 11,
    "headers" : {
      "headers" : [ ],
      "isReadOnly" : false
    },
    "key" : "1547792460796",
    "value" : "Hello World",
    "leaderEpoch" : {
      "value" : 0
    }
  } ],
  "size" : 1
}
*responseTimeStamp:2019-01-18T06:21:10.607 
*Response delay:9174.0 milli-secs 
---------> Assertion: <----------
{
  "size" : "$GT.0"
} 
-done-

2019-01-18 06:21:10,730 [main] INFO  org.jsmart.zerocode.core.runner.ZeroCodeUnitRunner - 
**FINISHED executing all Steps for [Unload - consume a message from kafka] **.

Steps were:[load_kafka, onload_kafka]

The records here were raw text messages. Please let me know if you want me to try with JSON messages; I am happy to share the same workflow logs.

:::Another thing to note:::

  • The kafka-console-consumer doesn't print the key, whereas the Java client prints it.
  • I have also spotted some other odd behaviour with this console-consumer. Happy to share, but it is not part of this issue, as it is not blocking me.

authorjapps avatar Jan 18 '19 06:01 authorjapps

You can put what you want onto a Kafka topic, and using the Consumer API you can read what you want. KSQL is a higher level abstraction, and as such requires the data to be in a format that it expects, namely, CSV, JSON, or Avro.

However, I can't reproduce the error you're getting with just the data you've shown above:

ksql> PRINT 'test' FROM BEGINNING;
Format:STRING
1/18/19 12:22:07 PM UTC , NULL , foo,bar
1/18/19 12:22:29 PM UTC , NULL , Hello world
1/18/19 12:22:58 PM UTC , NULL , 8Hello, Created for KSQL demo
1/18/19 12:23:10 PM UTC , NULL , 8Hello, Created for KSQL demo
1/18/19 12:23:17 PM UTC , NULL , Hello World

What I would suggest is that you either just put a simple JSON message on the topic, or if you want to debug the current error further, use kafkacat to dump the full message and raise it as a bug over on the KSQL project https://github.com/confluentinc/ksql/issues/new

e.g.

kafkacat -b kafka:29092 -t test -C -f '\nKey (%K bytes): %k\t\nValue (%S bytes): %s\nTimestamp: %T\tPartition: %p\tOffset: %o\n--\n'

Key (-1 bytes):
Value (7 bytes): foo,bar
Timestamp: 1547814127770        Partition: 0    Offset: 0
--

Key (-1 bytes):
Value (11 bytes): Hello world
Timestamp: 1547814149934        Partition: 0    Offset: 1
--

Key (-1 bytes):
Value (29 bytes): 8Hello, Created for KSQL demo
Timestamp: 1547814178652        Partition: 0    Offset: 2
--

Key (-1 bytes):
Value (29 bytes): 8Hello, Created for KSQL demo
Timestamp: 1547814190700        Partition: 0    Offset: 3
--

Key (-1 bytes):
Value (11 bytes): Hello World
Timestamp: 1547814197870        Partition: 0    Offset: 4
--
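If kafkacat is not at hand, the same kind of inspection can be sketched in Java: read the value as raw bytes (for example with a consumer configured with ByteArrayDeserializer) and print it as hex, so that any non-printable prefix becomes visible. The helper below is only an illustration. The class name is hypothetical, and the leading-zero-byte check is a heuristic for schema-registry framing, not a guarantee.

```java
import java.nio.ByteBuffer;

final class RawValueInspector {

    // Renders each byte as two hex digits separated by spaces.
    static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("%02x ", b));
        }
        return sb.toString().trim();
    }

    // Heuristic: a 0x00 byte followed by at least four more bytes is
    // consistent with a magic byte plus a 4-byte schema ID.
    static boolean looksSchemaRegistryFramed(byte[] bytes) {
        return bytes.length >= 5 && bytes[0] == 0;
    }

    // Reads the big-endian int that follows the magic byte.
    static int schemaId(byte[] bytes) {
        return ByteBuffer.wrap(bytes, 1, 4).getInt();
    }

    public static void main(String[] args) {
        // Made-up sample value: framing bytes, one length byte, then "Hi".
        byte[] value = {0, 0, 0, 0, 1, 0x38, 'H', 'i'};
        System.out.println(toHex(value));
        if (looksSchemaRegistryFramed(value)) {
            System.out.println("possible schema ID: " + schemaId(value));
        }
    }
}
```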

rmoff avatar Jan 18 '19 12:01 rmoff

Feel free to head over to http://cnfl.io/slack and the #ksql channel for more help on this if you want.

rmoff avatar Jan 18 '19 12:01 rmoff

Thanks mate, it's on the ksql channel now 👍. Let's see if anyone can help; otherwise, one of our team will raise it as a bug on the KSQL project if that helps in solving the problem.

I was also thinking that, for an error response from the Kafka KSQL server over a REST call, a 200 status doesn't sound right.

A 4xx or 5xx status would be better, to indicate that the call did not succeed.

Response:
{
  "status" : 200,
  "headers" : {
    "Date" : [ "Thu, 17 Jan 2019 22:01:15 GMT" ],
    "Server" : [ "Jetty(9.4.12.v20180830)" ],
    "Content-Type" : [ "application/vnd.ksql.v1+json" ]
  },
  "rawBody" : "java.lang.String cannot be cast to org.apache.avro.generic.GenericRecord\n"
}

I will try with a JSON message and report back here.

Thanks for your support 🙏

authorjapps avatar Jan 23 '19 06:01 authorjapps

https://github.com/confluentinc/ksql/issues/2386 has been raised now.

authorjapps avatar Jan 23 '19 07:01 authorjapps