spring-cloud-contract icon indicating copy to clipboard operation
spring-cloud-contract copied to clipboard

Spring Cloud Contract - Spring Kafka

Open Jenani-Srinivasan opened this issue 6 years ago • 14 comments

As per the discussion with Marcin on Twitter, Spring Cloud Contract does not have implementation to support pure Spring Kafka Template. As, there is no feature for spring cloud contract to support spring kafka template.

Can I please request to have the implementation?

Jenani-Srinivasan avatar Feb 07 '19 09:02 Jenani-Srinivasan

Is there any plans when it will be done?

IrishkA13 avatar Apr 17 '19 13:04 IrishkA13

Hello, it's in the backlog @IrishkA13 and it will be done; will try to add it for Hoxton release.

OlgaMaciaszek avatar Apr 17 '19 16:04 OlgaMaciaszek

I think I have a prototype working. Does anyone have a sample I could play with?

marcingrzejszczak avatar Sep 11 '19 14:09 marcingrzejszczak

@jakubnabrdalik

pszymczyk avatar Sep 11 '19 17:09 pszymczyk

Here is the version with the Kafka protytpe https://github.com/spring-cloud/spring-cloud-contract/tree/issues_%23877_kafka

$ git clone https://github.com/spring-cloud/spring-cloud-contract.git
$ cd spring-cloud-contract
$ git checkout issues_#877_kafka
# install the snapshots
$ ./mvnw clean install -Pfast -T 4 -DskipTests

This is a sample that works with kafka https://github.com/spring-cloud-samples/spring-cloud-contract-samples/pull/111/files

$ git clone https://github.com/spring-cloud-samples/spring-cloud-contract-samples.git
$ cd spring-cloud-contract-samples
$ git checkout kafka
# now you can play around with the producer_kafka / consumer_kafka samples

Since I'm not an expert on Kafka, I would definitely need some feedback on this. The prerequisites for the feature are

  • you need to use spring-kafka
  • you need to use the EmbeddedKafkaBroker (via spring-kafka-test)
  • you need to provide a groupId for your consumer

marcingrzejszczak avatar Sep 12 '19 11:09 marcingrzejszczak

Hi! In which release & when does it plan to be completed?

IrishkA13 avatar Oct 17 '19 15:10 IrishkA13

@IrishkA13 2.2.0.M3, so I assume it goes to future release 2.2.0.RELEASE (so it's not released yet, just M3)

mfolnovic avatar Oct 17 '19 16:10 mfolnovic

@mfolnovic thanks for reply Are there any planned dates for 2.2.0.RELEASE?

IrishkA13 avatar Oct 17 '19 16:10 IrishkA13

Please check https://github.com/spring-cloud/spring-cloud-release/milestones for release dates

marcingrzejszczak avatar Oct 28 '19 17:10 marcingrzejszczak

Hi @marcingrzejszczak

I added one field to your sample kafka contract.

	label("trigger")
	input {
		triggeredBy("trigger()")
	}
	outputMessage {
		sentTo("topic1")
		body([
		        foo: 'example',
			bar: 'example2'
		])
	}
}

and change application-test.yaml (consumer side)

  kafka:
    bootstrap-servers: ${spring.embedded.kafka.brokers}
    producer:
      properties:
        "value.serializer": "org.springframework.kafka.support.serializer.JsonSerializer"
        "spring.json.trusted.packages": "*"
    consumer:
      properties:
        "value.deserializer": "org.springframework.kafka.support.serializer.JsonDeserializer"
        "value.serializer": "org.springframework.kafka.support.serializer.JsonSerializer"
        "spring.json.trusted.packages": "*"
      group-id: groupId

I also add "bar" field to Foo2 class on the consumer side. Now if I run the ApplicationTests on the consumer side I have exception like this: Caused by: org.springframework.kafka.support.converter.ConversionException: Failed to convert from JSON; nested exception is com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot construct instance of `com.common.Foo2` (although at least one Creator exists): no String-argument constructor/factory method to deserialize from String value ('{"foo":"example","bar":"example2"}') at [Source: (String)""{\"foo\":\"example\",\"bar\":\"example2\"}""; line: 1, column: 1]

This exception is caused by:

  • extra " sign at the start and end of the message
  • backslashes

When I add this hack everythinks works. Is there any other way to solve this?

	@Bean
	@Primary
	public RecordMessageConverter converter() {
		return new StringJsonMessageConverter() {
			@Override
			protected Object convertPayload(Message<?> message) {
//				return super.convertPayload(message);
				String value = (String) super.convertPayload(message);
				if (value.length() > 2 && value.startsWith("\"") && value.endsWith("\"")) {
					value = value.substring(1, value.length() - 1);

				}
				return StringEscapeUtils.unescapeJson(value); //apache commons
			}
		};
	}

DawidSwinoga avatar Nov 19 '19 10:11 DawidSwinoga

I think I had a similar issue so to speak. In the tests (https://github.com/spring-cloud/spring-cloud-contract/tree/master/tests/spring-cloud-contract-stub-runner-kafka), what I did was:

application.yml

spring:
  kafka:
    bootstrap-servers: ${spring.embedded.kafka.brokers}
    producer:
      properties:
        "value.serializer": "org.springframework.kafka.support.serializer.JsonSerializer"
        "spring.json.trusted.packages": "*"
    consumer:
      properties:
        "value.deserializer": "org.springframework.kafka.support.serializer.JsonDeserializer"
        "value.serializer": "org.springframework.kafka.support.serializer.JsonSerializer"
        "spring.json.trusted.packages": "*"
      group-id: ${random.value}

that's how I received the message

private boolean assertThatBodyContainsBookNameFoo(Object payload) {
		log.info("Got payload [" + payload + "]")
		String objectAsString = payload instanceof String ? payload :
				JsonOutput.toJson(payload)
		def json = new JsonSlurper().parseText(objectAsString)
		return json.bookName == 'foo'
	}

I'm no Kafka expert, so maybe what I did is just a hack. I guess we would need to ask @olegz , @sobychacko or @garyrussell for help if what I did is actually a hack .

marcingrzejszczak avatar Nov 20 '19 10:11 marcingrzejszczak

BTW this is how we construct the message you then receive (https://github.com/spring-cloud/spring-cloud-contract/blob/master/spring-cloud-contract-verifier/src/main/java/org/springframework/cloud/contract/verifier/messaging/kafka/KafkaStubMessages.java#L97-L173)


class Record {

	private final ConsumerRecord record;

	Record(ConsumerRecord record) {
		this.record = record;
	}

	private Map<String, Object> toMap(Headers headers) {
		Map<String, Object> map = new HashMap<>();
		for (Header header : headers) {
			map.put(header.key(), header.value());
		}
		return map;
	}

	Message toMessage() {
		Object textPayload = record.value();
		// sometimes it's a message sometimes just payload
		MessageHeaders headers = new MessageHeaders(toMap(record.headers()));
		if (textPayload instanceof String && ((String) textPayload).contains("payload")
				&& ((String) textPayload).contains("headers")) {
			try {
				Object object = new JSONParser(JSONParser.DEFAULT_PERMISSIVE_MODE)
						.parse((String) textPayload);
				JSONObject jo = (JSONObject) object;
				String payload = (String) jo.get("payload");
				JSONObject headersInJson = (JSONObject) jo.get("headers");
				Map newHeaders = new HashMap(headers);
				newHeaders.putAll(headersInJson);
				return MessageBuilder.createMessage(unquoted(payload),
						new MessageHeaders(newHeaders));
			}
			catch (ParseException ex) {
				throw new IllegalStateException(ex);
			}
		}
		return MessageBuilder.createMessage(unquoted(textPayload), headers);
	}

	private Object unquoted(Object value) {
		String textPayload = value instanceof byte[] ? new String((byte[]) value)
				: value.toString();
		if (textPayload.startsWith("\"") && textPayload.endsWith("\"")) {
			return textPayload.substring(1, textPayload.length() - 1).replace("\\\"",
					"\"");
		}
		return textPayload;
	}

}

marcingrzejszczak avatar Nov 20 '19 10:11 marcingrzejszczak

I found the root cause of the error.

StubRunnerExecutor.java

	private void sendMessage(Contract groovyDsl) {
		OutputMessage outputMessage = groovyDsl.getOutputMessage();
		DslProperty<?> body = outputMessage.getBody();
		Headers headers = outputMessage.getHeaders();
		// TODO: Json is harcoded here
		this.contractVerifierMessaging.send(
				JsonOutput.toJson(BodyExtractor.extractClientValueFromBody(
						body == null ? null : body.getClientValue())),
				headers == null ? null : headers.asStubSideMap(),
				outputMessage.getSentTo().getClientValue());
	}

The message is serialized to json before send to Kafka topic. Then KafkaTemplate use RecordMessageConverter to serialize already serialized json.

Finally, we have a twice serialized json message. If we receive message like this the deserializer cannot deserialize it to target object because after deserialization we have a String containing json (created by StubRunnerExecutor).

The message passed to KafkaTemplage cannot be serialized to json. This will allow Record MessageConverter to serialize the message to the target form.

DawidSwinoga avatar Nov 23 '19 16:11 DawidSwinoga

Hey @DawidSwinoga are you willing to create a sample that replicates this? That way it would be easier to fix this problem

marcingrzejszczak avatar Aug 21 '20 16:08 marcingrzejszczak

We've decided to go with the middleware based approach for messaging as presented in https://github.com/spring-cloud-samples/spring-cloud-contract-samples/tree/main/producer_kafka_middleware and https://github.com/spring-cloud-samples/spring-cloud-contract-samples/tree/main/consumer_kafka_middleware .

That means we want to encourage the user to use e.g. testcontainers to start an actual broker and then use the MessageVerifierSender / MessageVerifierReceiver implementations to tell us how to send and receive a message

marcingrzejszczak avatar Nov 16 '22 17:11 marcingrzejszczak