sdk-java
sdk-java copied to clipboard
Time-skipping server drops headers in workflow signal
Expected Behavior
Headers set in a client interceptor should appear in the inbound workflow interceptor
Actual Behavior
Headers are blank
Steps to Reproduce the Problem
Using the Python SDK:
class HeaderWorkerInterceptor(temporalio.worker.Interceptor):
    """Worker interceptor that supplies ``HeaderWorkflowInboundInterceptor``."""

    def workflow_interceptor_class(
        self, input: temporalio.worker.WorkflowInterceptorClassInput
    ) -> Optional[Type[temporalio.worker.WorkflowInboundInterceptor]]:
        """Return ``HeaderWorkflowInboundInterceptor`` regardless of ``input``."""
        interceptor_cls = HeaderWorkflowInboundInterceptor
        return interceptor_cls
class HeaderWorkflowInboundInterceptor(temporalio.worker.WorkflowInboundInterceptor):
    """Inbound workflow interceptor that checks the ``foo`` header on signals."""

    def init(self, outbound: temporalio.worker.WorkflowOutboundInterceptor) -> None:
        """Wrap ``outbound`` in ``HeaderWorkflowOutboundInterceptor`` and delegate."""
        wrapped_outbound = HeaderWorkflowOutboundInterceptor(outbound)
        super().init(wrapped_outbound)

    async def handle_signal(self, input: HandleSignalInput) -> None:
        """Assert the ``foo`` header's data is ``b"bar"``, then delegate.

        A missing ``foo`` key raises ``KeyError`` from the lookup; this is the
        failure the issue reports when headers arrive blank.
        """
        foo_header = input.headers["foo"]
        assert foo_header.data == b"bar"
        await super().handle_signal(input)
class HeaderWorkflowOutboundInterceptor(temporalio.worker.WorkflowOutboundInterceptor):
    """Outbound workflow interceptor that stamps a ``foo`` header on activities."""

    def start_activity(
        self, input: temporalio.worker.StartActivityInput
    ) -> workflow.ActivityHandle:
        """Replace ``input.headers`` with a single ``foo`` header and delegate."""
        # The outbound activity call carries exactly this one header; any
        # headers previously set on the input are overwritten.
        activity_headers = {"foo": Payload(data=b"bar")}
        input.headers = activity_headers
        return super().start_activity(input)
class HeaderClientInterceptor(temporalio.client.Interceptor):
    """Client interceptor that hands a fixed header payload to its outbound side."""

    def __init__(self, header: Payload):
        """Store ``header`` for later use by ``intercept_client``."""
        self.header = header
        super().__init__()

    def intercept_client(
        self, next: temporalio.client.OutboundInterceptor
    ) -> temporalio.client.OutboundInterceptor:
        """Return a ``HeaderClientOutboundInterceptor`` wrapping the base chain."""
        base_chain = super().intercept_client(next)
        return HeaderClientOutboundInterceptor(base_chain, self.header)
class HeaderClientOutboundInterceptor(temporalio.client.OutboundInterceptor):
    """Outbound client interceptor that attaches a ``foo`` header to signals."""

    def __init__(
        self, next: temporalio.client.OutboundInterceptor, header: Payload
    ) -> None:
        """Keep ``header`` and initialise the base class with ``next``."""
        self.header = header
        super().__init__(next)

    async def signal_workflow(self, input: SignalWorkflowInput) -> None:
        """Set ``input.headers`` to a copy of the stored header and delegate."""
        # Each signal gets its own copy of the stored payload rather than the
        # shared instance.
        header_copy = self.header.__deepcopy__()
        input.headers = {"foo": header_copy}
        return await super().signal_workflow(input)
@pytest.mark.parametrize(
    "header_codec_behavior",
    [
        HeaderCodecBehavior.NO_CODEC,
        HeaderCodecBehavior.CODEC,
        HeaderCodecBehavior.WORKFLOW_ONLY_CODEC,
    ],
)
async def test_workflow_headers_with_codec(
    client: Client, env: WorkflowEnvironment, header_codec_behavior: HeaderCodecBehavior
):
    """Signal a workflow through header interceptors under each codec behavior.

    The client interceptor attaches a ``foo`` header to the signal. When the
    behavior is ``CODEC``, the header recorded in workflow history is expected
    to carry ``simple-codec`` metadata.
    """
    global global_header_codec_behavior

    foo_payload = Payload(data=b"bar")
    if header_codec_behavior == HeaderCodecBehavior.WORKFLOW_ONLY_CODEC:
        # For this behavior the header is encoded up front with SimpleCodec.
        encoded = await SimpleCodec().encode([foo_payload])
        foo_payload = encoded[0]

    # Build a client that uses the codec, the header interceptor and the
    # requested header codec behavior.
    config = client.config()
    config["data_converter"] = DataConverter(payload_codec=SimpleCodec())
    config["interceptors"] = [HeaderClientInterceptor(foo_payload)]
    config["header_codec_behavior"] = header_codec_behavior
    client = Client(**config)

    global_header_codec_behavior = header_codec_behavior

    async with new_worker(
        client,
        SimpleActivityWorkflow,
        SignalAndQueryWorkflow,
        activities=[say_hello],
        interceptors=[HeaderWorkerInterceptor()],
    ) as worker:
        workflow_id = f"workflow-{uuid.uuid4()}"
        handle = await client.start_workflow(
            SignalAndQueryWorkflow.run,
            id=workflow_id,
            task_queue=worker.task_queue,
        )

        # Simple signals and queries
        await handle.signal(SignalAndQueryWorkflow.signal1, "some arg")
        last_event = await handle.query(SignalAndQueryWorkflow.last_event)
        assert "signal1: some arg" == last_event

        # Inspect the signaled events recorded in history.
        async for event in handle.fetch_history_events():
            if not event.HasField("workflow_execution_signaled_event_attributes"):
                continue
            signaled = event.workflow_execution_signaled_event_attributes
            foo_header = signaled.header.fields["foo"]
            if header_codec_behavior == HeaderCodecBehavior.CODEC:
                assert "simple-codec" in foo_header.metadata
poe test -k "test_workflow_headers_with_codec" --workflow-environment time-skipping
Specifications
- Version:
- Platform: