Add a way to notify the webserver that Pushpin is ready to receive a held response
I'm interested in using Pushpin to have certain HTTP requests handled transparently by background jobs. The workflow is basically like this:
- When the HTTP request comes, I start a background job and return a GRIP response with `Grip-Hold: response` and `Grip-Channel: <random-id>`.
- When the job completes, it publishes the real response to the `<random-id>` channel.
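For illustration, here is a rough sketch of those two steps (the handler plumbing and helper names are made up; the publish call assumes Pushpin's default HTTP publish endpoint on port 5561):

import json
import uuid
from urllib.request import Request, urlopen

def handle_http_request(start_job):
    # Step 1: start the background job and tell Pushpin to hold the request,
    # subscribed to a per-request channel.
    channel = uuid.uuid4().hex
    start_job(channel)
    headers = {"Grip-Hold": "response", "Grip-Channel": channel}
    return 200, headers, b"timeout\n"  # instruction response returned to Pushpin

def on_job_complete(channel, body):
    # Step 2: the background job publishes the real response to that channel.
    command = {
        "items": [
            {"channel": channel, "formats": {"http-response": {"body": body}}},
        ]
    }
    request = Request(
        "http://localhost:5561/publish/",
        data=json.dumps(command).encode(),
        headers={"Content-Type": "application/json"},
    )
    with urlopen(request):
        pass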
This works reasonably well, but there's a race condition here. If the background job completes before Pushpin finishes handling the GRIP response, then the real response will just be discarded.
To fix this, I'd like to have a way to ensure that Pushpin is ready to receive the response before starting the job. For example, it would be great if I could add another header to the GRIP response with a URL, which Pushpin would then call in a POST request as soon as it's ready to receive the response.
I have tried using Pushpin's reliability retries for this, and it works, but:
- The retries seem to be delayed by about a second, and the specifics of my use case are such that I'd want to start the background job as soon as possible.
- I'm not sure if I can really rely on retries being present.
Of course, if there's any other way to fix the race condition, I'd be glad to know.
My recommendation is to keep a record somewhere of the completed job and have a way to reproduce the completion result upon request. That way you can publish the result immediately and respond to any retry with the same result.
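A minimal sketch of that idea (the function names are made up; the retry handler mirrors what a GRIP backend would do when it sees a Grip-Last header):

results = {}  # completed results, keyed by channel (in-memory for illustration)

def record_result(channel, body):
    # Store the result before (or in addition to) publishing it, so any
    # retry can be answered even if the publish itself was missed.
    results[channel] = body

def handle_retry(channel):
    # A Pushpin retry request carries a Grip-Last header naming the channel.
    if channel in results:
        # Reproduce the completed result.
        return 200, {}, results[channel]
    # Not finished yet: hold the request again.
    return 200, {"Grip-Hold": "response", "Grip-Channel": channel}, b"timeout\n"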
Otherwise, a workaround could be to publish a hint action after sending the GRIP response, forcing an immediate retry if the subscription has been established; you could tune the timing so that this happens most of the time. You could then treat the incoming retry as a signal to proceed (similar to the extra POST idea you described). The hint would still be subject to the race condition, but in the rare case it's received too early, the 1-second recovery retry will come soon after.
You can rely on the recovery retry always happening if you're using a new random channel for every job.
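For reference, publishing such a hint through the HTTP publish endpoint might look something like this (the helper name is hypothetical; in the GRIP http-response format, a "hint" action prompts a retry of the held request rather than delivering a body):

import json
from urllib.request import Request, urlopen

def publish_hint(channel):
    # Publishing a hint (no body) makes Pushpin retry the held request.
    command = {
        "items": [
            {"channel": channel, "formats": {"http-response": {"action": "hint"}}},
        ]
    }
    request = Request(
        "http://localhost:5561/publish/",
        data=json.dumps(command).encode(),
        headers={"Content-Type": "application/json"},
    )
    with urlopen(request):
        pass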
> My recommendation is to keep a record somewhere of the completed job and have a way to reproduce the completion result upon request. That way you can publish the result immediately and respond to any retry with the same result.
Thanks, I hadn't considered that. I've implemented this, and it seems to mostly work. However, I seem to be hitting another race condition. I think this one might actually be a bug in Pushpin.
I've created a small application to demonstrate this. It emulates a background job by sleeping for a configurable amount of time after sending the GRIP response:
app.py
import asyncio
import json
import logging
import random
import string
from urllib.parse import parse_qsl
from urllib.request import urlopen, Request

random.seed()

# Channels whose "background job" has completed.
ready = set()


async def send_grip_response(send, channel_name):
    # Instruct Pushpin to hold the request open, subscribed to channel_name.
    await send({
        "type": "http.response.start",
        "status": 200,
        "headers": [
            (b"Grip-Hold", b"response"),
            (b"Grip-Channel", channel_name + b"; prev-id=started"),
        ],
    })
    await send({
        "type": "http.response.body",
        "body": b"timeout\n",
    })


async def application(scope, receive, send):
    if scope["type"] != "http":
        raise RuntimeError(f"Unknown scope type {scope['type']}")

    request_headers = {k.lower(): v for k, v in scope["headers"]}

    if b"grip-last" in request_headers:
        # This is a retry of a previously held request.
        grip_last = request_headers[b"grip-last"]
        channel_name = grip_last[:grip_last.index(b";")]
        print(f"RETRY REQUEST {channel_name=}")
        if channel_name in ready:
            print(" result is ready")
            await send({
                "type": "http.response.start",
                "status": 200,
            })
            await send({
                "type": "http.response.body",
                "body": b"retry\n",
            })
        else:
            print(" result is not ready")
            await send_grip_response(send, channel_name)
        return

    channel_name = ''.join(random.choices(string.ascii_lowercase, k=8)).encode()
    print(f"INITIAL REQUEST {channel_name=}")

    qs = dict(parse_qsl(scope["query_string"].decode()))
    delay = float(qs.get("delay", "0"))

    await send_grip_response(send, channel_name)

    # Emulate a background job by sleeping, then publish the real response.
    await asyncio.sleep(delay)
    ready.add(channel_name)
    print(f"FAKE JOB COMPLETED {channel_name=}")

    command = {
        "items": [
            {
                "channel": channel_name.decode(),
                "prev-id": "started",
                "formats": {"http-response": {"body": "publish\n"}},
            },
        ]
    }
    publish_request = Request(
        "http://localhost:5561/publish/",
        data=json.dumps(command).encode(),
        headers={"Content-Type": "application/json"},
    )
    with urlopen(publish_request) as response:
        pass
    print(f"PUBLISHED {channel_name=}")
To see the problem:
- install uvicorn;
- run `uvicorn app:application`;
- run `pushpin --route '* 127.0.0.1:8000' --merge-output --port 127.0.0.1:7999`;
- run `curl http://localhost:7999/?delay=1.1` repeatedly (1.1 is a number that works for me, but it may need to be adjusted for your machine).
Most of the time curl will print "publish" or "retry", which is good. But sometimes it will print "timeout", which means that the "job" wasn't completed at the time of the retry request, and yet the publish command didn't work.
The application output in this case is:
INITIAL REQUEST channel_name=b'iycgzdat'
INFO: 127.0.0.1:34098 - "GET /?delay=1.1 HTTP/1.1" 200 OK
RETRY REQUEST channel_name=b'iycgzdat'
result is not ready
INFO: 127.0.0.1:34098 - "GET /?delay=1.1 HTTP/1.1" 200 OK
FAKE JOB COMPLETED channel_name=b'iycgzdat'
PUBLISHED channel_name=b'iycgzdat'
And the pushpin output is:
[INFO] 2025-05-09 12:17:32.244 [handler] subscribe http://localhost:7999/?delay=1.1 channel=iycgzdat
[INFO] 2025-05-09 12:17:32.255 [proxy] GET http://localhost:7999/?delay=1.1 -> 127.0.0.1:8000 accept
[INFO] 2025-05-09 12:17:33.341 [handler] publish channel=iycgzdat receivers=0
[INFO] 2025-05-09 12:17:33.344 [handler] subscribe http://localhost:7999/?delay=1.1 channel=iycgzdat retry
[INFO] 2025-05-09 12:17:33.354 [proxy] GET http://localhost:7999/?delay=1.1 -> 127.0.0.1:8000 accept retry
In particular, that `receivers=0` seems really suspicious: this happens nearly a full second after the initial request, so how could there be no receivers?
A connection undergoing a retry is not considered a subscriber, which can explain the receivers=0. This is in part because a retry might not result in a hold. However, if it does result in a hold and a publish had been attempted in the interim, the connection should retry again in order to resolve everything. Maybe this is not happening. I'll try your sample code and report back.
Ah, reproduced using your sample code and figured out the issue. Here's a fix:
--- a/app.py
+++ b/app.py
@@ -70,6 +70,7 @@ async def application(scope, receive, send):
         "items": [
             {
                 "channel": channel_name.decode(),
+                "id": "finished",
                 "prev-id": "started",
                 "formats": {"http-response": {"body": "publish\n"}},
             },
Basically, for a re-held request to be retried again when a publish happens during the previous retry, the published item needs an ID. Then you get a log message like:
[DEBUG] 2025-10-27 13:33:04.416 [handler] last ID inconsistency (got=started, expected=finished), retrying
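For completeness, with that change the publish command in the sample app becomes:

command = {
    "items": [
        {
            "channel": channel_name.decode(),
            "id": "finished",       # ID of this published item
            "prev-id": "started",   # matches the prev-id declared in Grip-Channel
            "formats": {"http-response": {"body": "publish\n"}},
        },
    ]
}

so when the request is re-held with a last ID of "started" after the "finished" item has already been published, Pushpin notices the inconsistency and retries again.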
Okay, thanks! This does seem to work.
This should hopefully resolve my last problem, so I'll close the issue.