Spring Cloud Contract - Spring Kafka
As per the discussion with Marcin on Twitter, Spring Cloud Contract does not have implementation to support pure Spring Kafka Template. As, there is no feature for spring cloud contract to support spring kafka template.
Can I please request to have the implementation?
Is there any plans when it will be done?
Hello, it's in the backlog @IrishkA13 and it will be done; will try to add it for Hoxton release.
I think I have a prototype working. Does anyone have a sample I could play with?
@jakubnabrdalik
Here is the version with the Kafka protytpe https://github.com/spring-cloud/spring-cloud-contract/tree/issues_%23877_kafka
$ git clone https://github.com/spring-cloud/spring-cloud-contract.git
$ cd spring-cloud-contract
$ git checkout issues_#877_kafka
# install the snapshots
$ ./mvnw clean install -Pfast -T 4 -DskipTests
This is a sample that works with kafka https://github.com/spring-cloud-samples/spring-cloud-contract-samples/pull/111/files
$ git clone https://github.com/spring-cloud-samples/spring-cloud-contract-samples.git
$ cd spring-cloud-contract-samples
$ git checkout kafka
# now you can play around with the producer_kafka / consumer_kafka samples
Since I'm not an expert on Kafka, I would definitely need some feedback on this. The prerequisites for the feature are
- you need to use
spring-kafka - you need to use the EmbeddedKafkaBroker (via
spring-kafka-test) - you need to provide a
groupIdfor your consumer
Hi! In which release & when does it plan to be completed?
@IrishkA13 2.2.0.M3, so I assume it goes to future release 2.2.0.RELEASE (so it's not released yet, just M3)
@mfolnovic thanks for reply Are there any planned dates for 2.2.0.RELEASE?
Please check https://github.com/spring-cloud/spring-cloud-release/milestones for release dates
Hi @marcingrzejszczak
I added one field to your sample kafka contract.
label("trigger")
input {
triggeredBy("trigger()")
}
outputMessage {
sentTo("topic1")
body([
foo: 'example',
bar: 'example2'
])
}
}
and change application-test.yaml (consumer side)
kafka:
bootstrap-servers: ${spring.embedded.kafka.brokers}
producer:
properties:
"value.serializer": "org.springframework.kafka.support.serializer.JsonSerializer"
"spring.json.trusted.packages": "*"
consumer:
properties:
"value.deserializer": "org.springframework.kafka.support.serializer.JsonDeserializer"
"value.serializer": "org.springframework.kafka.support.serializer.JsonSerializer"
"spring.json.trusted.packages": "*"
group-id: groupId
I also add "bar" field to Foo2 class on the consumer side. Now if I run the ApplicationTests on the consumer side I have exception like this:
Caused by: org.springframework.kafka.support.converter.ConversionException: Failed to convert from JSON; nested exception is com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot construct instance of `com.common.Foo2` (although at least one Creator exists): no String-argument constructor/factory method to deserialize from String value ('{"foo":"example","bar":"example2"}') at [Source: (String)""{\"foo\":\"example\",\"bar\":\"example2\"}""; line: 1, column: 1]
This exception is caused by:
- extra " sign at the start and end of the message
- backslashes
When I add this hack everythinks works. Is there any other way to solve this?
@Bean
@Primary
public RecordMessageConverter converter() {
return new StringJsonMessageConverter() {
@Override
protected Object convertPayload(Message<?> message) {
// return super.convertPayload(message);
String value = (String) super.convertPayload(message);
if (value.length() > 2 && value.startsWith("\"") && value.endsWith("\"")) {
value = value.substring(1, value.length() - 1);
}
return StringEscapeUtils.unescapeJson(value); //apache commons
}
};
}
I think I had a similar issue so to speak. In the tests (https://github.com/spring-cloud/spring-cloud-contract/tree/master/tests/spring-cloud-contract-stub-runner-kafka), what I did was:
application.yml
spring:
kafka:
bootstrap-servers: ${spring.embedded.kafka.brokers}
producer:
properties:
"value.serializer": "org.springframework.kafka.support.serializer.JsonSerializer"
"spring.json.trusted.packages": "*"
consumer:
properties:
"value.deserializer": "org.springframework.kafka.support.serializer.JsonDeserializer"
"value.serializer": "org.springframework.kafka.support.serializer.JsonSerializer"
"spring.json.trusted.packages": "*"
group-id: ${random.value}
that's how I received the message
private boolean assertThatBodyContainsBookNameFoo(Object payload) {
log.info("Got payload [" + payload + "]")
String objectAsString = payload instanceof String ? payload :
JsonOutput.toJson(payload)
def json = new JsonSlurper().parseText(objectAsString)
return json.bookName == 'foo'
}
I'm no Kafka expert, so maybe what I did is just a hack. I guess we would need to ask @olegz , @sobychacko or @garyrussell for help if what I did is actually a hack .
BTW this is how we construct the message you then receive (https://github.com/spring-cloud/spring-cloud-contract/blob/master/spring-cloud-contract-verifier/src/main/java/org/springframework/cloud/contract/verifier/messaging/kafka/KafkaStubMessages.java#L97-L173)
class Record {
private final ConsumerRecord record;
Record(ConsumerRecord record) {
this.record = record;
}
private Map<String, Object> toMap(Headers headers) {
Map<String, Object> map = new HashMap<>();
for (Header header : headers) {
map.put(header.key(), header.value());
}
return map;
}
Message toMessage() {
Object textPayload = record.value();
// sometimes it's a message sometimes just payload
MessageHeaders headers = new MessageHeaders(toMap(record.headers()));
if (textPayload instanceof String && ((String) textPayload).contains("payload")
&& ((String) textPayload).contains("headers")) {
try {
Object object = new JSONParser(JSONParser.DEFAULT_PERMISSIVE_MODE)
.parse((String) textPayload);
JSONObject jo = (JSONObject) object;
String payload = (String) jo.get("payload");
JSONObject headersInJson = (JSONObject) jo.get("headers");
Map newHeaders = new HashMap(headers);
newHeaders.putAll(headersInJson);
return MessageBuilder.createMessage(unquoted(payload),
new MessageHeaders(newHeaders));
}
catch (ParseException ex) {
throw new IllegalStateException(ex);
}
}
return MessageBuilder.createMessage(unquoted(textPayload), headers);
}
private Object unquoted(Object value) {
String textPayload = value instanceof byte[] ? new String((byte[]) value)
: value.toString();
if (textPayload.startsWith("\"") && textPayload.endsWith("\"")) {
return textPayload.substring(1, textPayload.length() - 1).replace("\\\"",
"\"");
}
return textPayload;
}
}
I found the root cause of the error.
StubRunnerExecutor.java
private void sendMessage(Contract groovyDsl) {
OutputMessage outputMessage = groovyDsl.getOutputMessage();
DslProperty<?> body = outputMessage.getBody();
Headers headers = outputMessage.getHeaders();
// TODO: Json is harcoded here
this.contractVerifierMessaging.send(
JsonOutput.toJson(BodyExtractor.extractClientValueFromBody(
body == null ? null : body.getClientValue())),
headers == null ? null : headers.asStubSideMap(),
outputMessage.getSentTo().getClientValue());
}
The message is serialized to json before send to Kafka topic. Then KafkaTemplate use RecordMessageConverter to serialize already serialized json.
Finally, we have a twice serialized json message. If we receive message like this the deserializer cannot deserialize it to target object because after deserialization we have a String containing json (created by StubRunnerExecutor).
The message passed to KafkaTemplage cannot be serialized to json. This will allow Record MessageConverter to serialize the message to the target form.
Hey @DawidSwinoga are you willing to create a sample that replicates this? That way it would be easier to fix this problem
We've decided to go with the middleware based approach for messaging as presented in https://github.com/spring-cloud-samples/spring-cloud-contract-samples/tree/main/producer_kafka_middleware and https://github.com/spring-cloud-samples/spring-cloud-contract-samples/tree/main/consumer_kafka_middleware .
That means we want to encourage the user to use e.g. testcontainers to start an actual broker and then use the MessageVerifierSender / MessageVerifierReceiver implementations to tell us how to send and receive a message