sdk-java icon indicating copy to clipboard operation
sdk-java copied to clipboard

Time skipping server drops headers in workflow signal

Open tconley1428 opened this issue 6 months ago • 0 comments

Expected Behavior

Headers set in the client interceptor should appear in the inbound workflow interceptor.

Actual Behavior

Headers are blank when the signal reaches the inbound workflow interceptor.

Steps to Reproduce the Problem

Using the Python SDK:

class HeaderWorkerInterceptor(temporalio.worker.Interceptor):
    """Worker interceptor that supplies the header-checking workflow interceptor."""

    def workflow_interceptor_class(
        self, input: temporalio.worker.WorkflowInterceptorClassInput
    ) -> Optional[Type[temporalio.worker.WorkflowInboundInterceptor]]:
        """Return the inbound workflow interceptor class to install.

        ``input`` is not consulted; the same class is returned on every call.
        """
        interceptor_cls = HeaderWorkflowInboundInterceptor
        return interceptor_cls


class HeaderWorkflowInboundInterceptor(temporalio.worker.WorkflowInboundInterceptor):
    """Inbound workflow interceptor that verifies the ``foo`` header on signals."""

    def init(self, outbound: temporalio.worker.WorkflowOutboundInterceptor) -> None:
        """Wrap ``outbound`` in the header-setting outbound interceptor."""
        wrapped_outbound = HeaderWorkflowOutboundInterceptor(outbound)
        super().init(wrapped_outbound)

    async def handle_signal(self, input: HandleSignalInput) -> None:
        """Check the incoming signal's ``foo`` header, then delegate.

        Raises ``KeyError`` if the header is missing and ``AssertionError`` if
        its data is not ``b"bar"``.
        """
        foo_header = input.headers["foo"]
        assert foo_header.data == b"bar"
        await super().handle_signal(input)


class HeaderWorkflowOutboundInterceptor(temporalio.worker.WorkflowOutboundInterceptor):
    """Outbound workflow interceptor that sets a ``foo`` header on activity calls."""

    def start_activity(
        self, input: temporalio.worker.StartActivityInput
    ) -> workflow.ActivityHandle:
        """Replace the activity input's headers with ``foo`` and delegate."""
        # Any headers already present on the input are overwritten.
        activity_header = Payload(data=b"bar")
        input.headers = {"foo": activity_header}
        activity_handle = super().start_activity(input)
        return activity_handle


class HeaderClientInterceptor(temporalio.client.Interceptor):
    """Client interceptor that carries a header payload for outbound calls."""

    def __init__(self, header: Payload):
        """Store ``header`` so it can be handed to the outbound interceptor."""
        self.header = header
        super().__init__()

    def intercept_client(
        self, next: temporalio.client.OutboundInterceptor
    ) -> temporalio.client.OutboundInterceptor:
        """Wrap the base-class interceptor chain with the header-setting one."""
        inner = super().intercept_client(next)
        outbound = HeaderClientOutboundInterceptor(inner, self.header)
        return outbound


class HeaderClientOutboundInterceptor(temporalio.client.OutboundInterceptor):
    """Client outbound interceptor that sets a ``foo`` header on workflow signals."""

    def __init__(
        self, next: temporalio.client.OutboundInterceptor, header: Payload
    ) -> None:
        """Store ``header`` and chain to ``next``.

        Args:
            next: The next outbound interceptor in the chain.
            header: Payload attached under the ``foo`` key of each signal.
        """
        self.header = header
        super().__init__(next)

    async def signal_workflow(self, input: SignalWorkflowInput) -> None:
        """Replace the signal's headers with a copy of the stored header.

        Any headers already present on ``input`` are overwritten.
        """
        # Imported locally so this block does not depend on the file's import
        # section, which is not shown in the reproduction.
        import copy

        # Use the public copy API rather than calling ``__deepcopy__`` directly.
        # Each signal gets its own copy, so later changes to the sent header
        # cannot affect the payload stored on this interceptor.
        input.headers = {"foo": copy.deepcopy(self.header)}
        return await super().signal_workflow(input)



@pytest.mark.parametrize(
    "header_codec_behavior",
    [
        HeaderCodecBehavior.NO_CODEC,
        HeaderCodecBehavior.CODEC,
        HeaderCodecBehavior.WORKFLOW_ONLY_CODEC,
    ],
)
async def test_workflow_headers_with_codec(
    client: Client, env: WorkflowEnvironment, header_codec_behavior: HeaderCodecBehavior
):
    """Signal a workflow through header interceptors and inspect its history.

    Runs once per ``HeaderCodecBehavior`` value. The client is rebuilt with a
    ``SimpleCodec`` data converter and a ``HeaderClientInterceptor``; the worker
    runs with ``HeaderWorkerInterceptor``. ``env`` is accepted but not used in
    the body.
    """
    header_payload = Payload(data=b"bar")
    if header_codec_behavior == HeaderCodecBehavior.WORKFLOW_ONLY_CODEC:
        # In this mode the payload is encoded by the test before it is given
        # to the client interceptor.
        encoded_payloads = await SimpleCodec().encode([header_payload])
        header_payload = encoded_payloads[0]

    # Rebuild the client with the codec, the header interceptor, and the
    # header codec behavior under test.
    client_config = client.config()
    client_config["data_converter"] = DataConverter(payload_codec=SimpleCodec())
    client_config["interceptors"] = [HeaderClientInterceptor(header_payload)]
    client_config["header_codec_behavior"] = header_codec_behavior
    client = Client(**client_config)

    # Publish the behavior under test through the module-level global.
    global global_header_codec_behavior
    global_header_codec_behavior = header_codec_behavior

    async with new_worker(
        client,
        SimpleActivityWorkflow,
        SignalAndQueryWorkflow,
        activities=[say_hello],
        interceptors=[HeaderWorkerInterceptor()],
    ) as worker:
        workflow_id = f"workflow-{uuid.uuid4()}"
        wf_handle = await client.start_workflow(
            SignalAndQueryWorkflow.run,
            id=workflow_id,
            task_queue=worker.task_queue,
        )

        # Send one signal, then confirm via query that the workflow saw it.
        await wf_handle.signal(SignalAndQueryWorkflow.signal1, "some arg")
        last_event = await wf_handle.query(SignalAndQueryWorkflow.last_event)
        assert "signal1: some arg" == last_event

        # Check the "foo" header recorded on each signaled event in history.
        async for event in wf_handle.fetch_history_events():
            if not event.HasField("workflow_execution_signaled_event_attributes"):
                continue
            signaled_attrs = event.workflow_execution_signaled_event_attributes
            foo_header = signaled_attrs.header.fields["foo"]
            if header_codec_behavior == HeaderCodecBehavior.CODEC:
                assert "simple-codec" in foo_header.metadata

poe test -k "test_workflow_headers_with_codec" --workflow-environment time-skipping

Specifications

  • Version:
  • Platform:

tconley1428 avatar Jul 08 '25 17:07 tconley1428