Add ability to Write to GCP PubSub with an orderingKey
There was a prior dev mailing list thread about the PubSub ordering key[1], but that was in reference to reads. It should be possible to support PubSub writes with an ordering key. For example, we might be able to have users supply a serializable function to extract the ordering key from each message.
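The extractor idea could look roughly like this in the Java SDK. This is a sketch, assuming Beam 2.43.0 or later (where `PubsubMessage` has a constructor that takes an ordering key) and a `PCollection<String>` of UTF-8 payloads. `OrderingKeyMessages`, `toMessage`, and `withOrderingKey` are hypothetical names, not SDK APIs.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;

// Hypothetical helper: wraps each String payload in a PubsubMessage whose
// ordering key is computed by a user-supplied serializable function.
final class OrderingKeyMessages {
  private OrderingKeyMessages() {}

  // Builds one message. messageId stays null because Pub/Sub assigns it on publish.
  static PubsubMessage toMessage(String payload, SerializableFunction<String, String> keyFn) {
    return new PubsubMessage(
        payload.getBytes(StandardCharsets.UTF_8),
        Collections.emptyMap(),
        null,
        keyFn.apply(payload));
  }

  // Composite step intended to sit right before PubsubIO.writeMessages().
  static PTransform<PCollection<String>, PCollection<PubsubMessage>> withOrderingKey(
      SerializableFunction<String, String> keyFn) {
    return new PTransform<PCollection<String>, PCollection<PubsubMessage>>() {
      @Override
      public PCollection<PubsubMessage> expand(PCollection<String> input) {
        return input
            .apply(
                MapElements.into(TypeDescriptor.of(PubsubMessage.class))
                    .via((String payload) -> toMessage(payload, keyFn)))
            // Set the coder explicitly so the default PubsubMessage coder
            // does not drop the ordering key (see apache/beam#23525).
            .setCoder(PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder.of());
      }
    };
  }
}
```

In a pipeline it would be applied just before the write, e.g. `payloads.apply(OrderingKeyMessages.withOrderingKey(keyFn)).apply(PubsubIO.writeMessages().to(topic))`. The explicit `setCoder` is there because the default `PubsubMessage` coder can drop the ordering key, as tracked in https://github.com/apache/beam/issues/23525.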
Imported from Jira BEAM-13148. Original Jira may contain additional context. Reported by: egalpin.
Any updates on this? The Python API has this, but the Java API lacks it. Can it be prioritised?
@egalpin may have context
Yes, publishing messages with an ordering key is now supported in the Java SDK as well; it was added by https://github.com/apache/beam/pull/22216. I believe this should be included in SDK v2.43.0 and higher.
Ordering keys are supported via PubsubIO.writeMessages[1], where messages need to be instantiated using the appropriate constructor[2] that accepts an ordering key. It's also worth noting that PubsubIO writes can use regional endpoints[3], which might be required for your use case to fully ensure ordering.
[1] https://beam.apache.org/releases/javadoc/2.43.0/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.html#writeMessages--
[2] https://beam.apache.org/releases/javadoc/2.43.0/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.html
[3] https://beam.apache.org/releases/javadoc/2.43.0/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.Write.html#withPubsubRootUrl-java.lang.String-
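A minimal sketch of that usage, assuming Beam 2.43.0+ with the GCP IO module on the classpath. `OrderedPubsubWrite`, `orderedMessage`, and `regionalWrite` are hypothetical helper names, and the `https://<region>-pubsub.googleapis.com` root URL pattern is an assumption about Pub/Sub's regional endpoint format that should be checked against the Pub/Sub documentation for your region.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;

// Hypothetical helpers around the constructor [2] and write options [1][3] linked above.
final class OrderedPubsubWrite {
  private OrderedPubsubWrite() {}

  // Four-argument constructor: payload, attributes, messageId, orderingKey.
  // messageId is left null; Pub/Sub assigns IDs on publish.
  static PubsubMessage orderedMessage(
      String payload, Map<String, String> attributes, String orderingKey) {
    return new PubsubMessage(
        payload.getBytes(StandardCharsets.UTF_8), attributes, null, orderingKey);
  }

  // Writes to a topic through a regional endpoint.
  // ASSUMPTION: the root URL pattern below; verify it for your region.
  static PubsubIO.Write<PubsubMessage> regionalWrite(String topicPath, String region) {
    return PubsubIO.writeMessages()
        .to(topicPath)
        .withPubsubRootUrl("https://" + region + "-pubsub.googleapis.com");
  }
}
```

A pipeline would apply `regionalWrite(...)` to a `PCollection<PubsubMessage>` built with `orderedMessage(...)`; Pub/Sub then delivers messages that share an ordering key in order, provided the subscription has message ordering enabled.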
Interesting. I used 2.43 with the appropriate constructor that takes messageId and orderingKey, but neither the messageId nor the orderingKey showed up in Pubsub. I am using the Dataflow runner.
Could it be related to #23525 ?
Yes, good callout; this is exactly the issue in https://github.com/apache/beam/issues/23525. You can safely work around it by calling setCoder on the PCollection of Pub/Sub messages that have ordering keys, specifying PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder as the coder.
Note that, as far as I understand, any messageId set on a Pub/Sub message will be dropped before the message is published, because message IDs are assigned on the server side of Pub/Sub (again, this is my own understanding and may not be accurate).
I used PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder on the PCollection of Pubsub messages, but messageId (which I don't care about) and orderingKey are still null.
I used it as follows:
results.setCoder(new PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder())
    .apply("Write Result updates to PubSub", PubsubIO.writeMessages().to(options.getLitResultTopic()));
Am I not using it correctly?
This is indeed the correct usage, and I was under the impression that it should work. Based on what you're seeing, I suspect the coder might not be propagated through the PubsubIO write code path. I'll trace the code to confirm and validate the behaviour.
@codertimu you could try to set the default coder for the pubsub message class[1] to see if that will unblock you:
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
CoderRegistry cr = p.getCoderRegistry();
cr.registerCoder(PubsubMessage.class, PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder.class);
[1] https://beam.apache.org/documentation/programming-guide/#setting-default-coder
@egalpin registerCoder does not exist. Do you mean like this:
cr.registerCoderForClass(PubsubMessage.class, new PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder());
I tried the above and also this:
cr.registerCoderForType(TypeDescriptor.of(PubsubMessage.class), new PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder());
but nothing changed.
Thanks for confirming. I haven't had time to trace or validate this yet. Depending on your level of comfort, one other option would be to modify the Beam source and vendor in the PubsubIO module. I previously noted my steps for doing so, specifically with Dataflow: https://lists.apache.org/thread/c6929ms0bjxtcw9ho4tdb5y3t8wnwnfy. This is not a long-term solution, but rather a stopgap until the same change can be made in the official SDK as per https://github.com/apache/beam/issues/23525. Given that setting the coder on the PCollection appears to be insufficient, I think https://github.com/apache/beam/issues/23525 should be given higher priority.
You'd want to modify the PubsubCoderProviderRegistrar[1] such that the PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder appears first in the list (or as the sole list element), then follow the vendoring steps mentioned.
[1] https://github.com/apache/beam/blob/9a22bf68767b8fa27aa381d6ff84e381a21af5a4/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderProviderRegistrar.java
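The registrar change described above might look roughly like the following. This is a sketch assuming Beam 2.43.0+; `OrderingKeyFirstCoderProviderRegistrar` and `defaultPubsubMessageCoder` are hypothetical stand-ins, since in a vendored copy you would edit the list returned by the existing PubsubCoderProviderRegistrar rather than add a new class.

```java
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderProvider;
import org.apache.beam.sdk.coders.CoderProviderRegistrar;
import org.apache.beam.sdk.coders.CoderProviders;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder;
import org.apache.beam.sdk.values.TypeDescriptor;

// Hypothetical stand-in showing the edited provider list. It is not registered
// anywhere; in the vendored module the same list change goes into
// PubsubCoderProviderRegistrar itself.
final class OrderingKeyFirstCoderProviderRegistrar implements CoderProviderRegistrar {
  @Override
  public List<CoderProvider> getCoderProviders() {
    // Sole entry, so PubsubMessage is encoded with messageId and orderingKey by default.
    return Collections.singletonList(
        CoderProviders.forCoder(
            TypeDescriptor.of(PubsubMessage.class),
            PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder.of()));
  }

  // Resolves the coder the first provider hands out for PubsubMessage.
  static Coder<PubsubMessage> defaultPubsubMessageCoder() {
    try {
      return new OrderingKeyFirstCoderProviderRegistrar()
          .getCoderProviders()
          .get(0)
          .coderFor(TypeDescriptor.of(PubsubMessage.class), Collections.<Coder<?>>emptyList());
    } catch (CannotProvideCoderException e) {
      throw new IllegalStateException(e);
    }
  }
}
```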
@codertimu can you confirm whether your pipeline is bounded or unbounded? It may help me trace the issue.
It is unbounded, but I was using the test pipeline for the last few examples I tried here. I didn't attempt to run them on Dataflow.
@egalpin just to clarify, PubsubIO is not involved at all in my test pipeline. Even in a simple PTransform, the orderingKey and messageId are not preserved in the PCollection.
Thanks @codertimu, I can reproduce the issue of the lost orderingKey locally as well, using something like this:
pipeline.apply(
    Create.of(
        new PubsubMessage(
            "{\"baz\": \"jazz\"}".getBytes(StandardCharsets.UTF_8),
            Collections.singletonMap("foo", "bar"),
            null,
            "1")))
    .setCoder(PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder.of());
...
Attaching a debugger and stepping through, I can see that the Coder in Create is PubsubMessageWithAttributesCoder (the first element in the PubsubCoderProviderRegistrar).
This obviously needs to be fixed, and I believe it will be fixed by altering the registrar to use PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder as the default (i.e. fixing https://github.com/apache/beam/issues/23525). In the meantime, if you're able to, could you verify whether this behaviour persists with any runner other than the test runner?
I can confirm that the issue persists with the Dataflow runner as well.
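The coder behaviour can also be checked without running a pipeline by round-tripping the same message used in the reproduction through each coder with CoderUtils. This is a sketch assuming Beam 2.43.0+; `OrderingKeyRoundTrip`, `sample`, and `roundTrip` are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.util.CoderUtils;

final class OrderingKeyRoundTrip {
  private OrderingKeyRoundTrip() {}

  // Same message as in the reproduction: orderingKey "1", no messageId.
  static PubsubMessage sample() {
    return new PubsubMessage(
        "{\"baz\": \"jazz\"}".getBytes(StandardCharsets.UTF_8),
        Collections.singletonMap("foo", "bar"),
        null,
        "1");
  }

  // Encodes then decodes one message, roughly what a runner does when it
  // materializes an element with the PCollection's coder.
  static PubsubMessage roundTrip(Coder<PubsubMessage> coder, PubsubMessage message) {
    try {
      return CoderUtils.decodeFromByteArray(coder, CoderUtils.encodeToByteArray(coder, message));
    } catch (CoderException e) {
      throw new IllegalStateException(e);
    }
  }
}
```

With `PubsubMessageWithAttributesCoder.of()` the decoded orderingKey comes back null, while `PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder.of()` keeps "1". Separately, because setCoder only changes the coder of the PCollection it is called on, passing the coder to Create itself (`Create.of(...).withCoder(PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder.of())`) may keep the key through Create in the reproduction; downstream transforms whose output coder is inferred would presumably still need setCoder until the registrar default changes.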
@codertimu @egalpin is this issue still of interest to you? I believe #31608 fixes it. Feel free to review!
Hi @ahmedabu98,
Thank you for your PR! Are you planning to complete it?
Best regards