`PGClient.listen`, `Stream.asyncPush`, or `Stream.runForEach` bug: Stream or PG Channel silently stops working
What version of Effect is running?
^3.8.4
What steps can reproduce the bug?
It's on my todo list today to attempt to reproduce this in a minimal repo, but here is the code:
const postgres = yield* Postgres; // wrapped around PGClient
yield* Effect.forkScoped(
  postgres.listen("job_queue_channel").pipe(
    Stream.runForEach((jobId) =>
      postgres.getJobById(jobId).pipe(
        Effect.flatten,
        Effect.andThen((job) => Message.decode(job.payload)),
        Effect.andThen((message) => [jobId, message] as const),
        Effect.andThen(handleMessage),
        Effect.onError(handleError),
      ),
    ),
  ),
);
What is the expected behavior?
The stream should process items from the pg channel indefinitely.
What do you see instead?
After some period of time, the stream stops processing items without any indication of why and without the fiber dying.
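To get at least some indication of where it stalls, one thing I can add is a tap on the stream before the handler. Below is a sketch only: `Postgres` is the service from the repro above, and `processJob` is a hypothetical stand-in for the getJobById → decode → handleMessage pipeline.

import { Effect, Stream } from "effect";

// Sketch: log every payload the moment the stream emits it, so a stall can
// be attributed to "no notifications arriving" vs "notifications arriving
// but not being processed". `processJob` is a stand-in for the real pipeline.
const listenWithLogging = Effect.gen(function* () {
  const postgres = yield* Postgres;
  yield* Effect.forkScoped(
    postgres.listen("job_queue_channel").pipe(
      Stream.tap((jobId) => Effect.logInfo(`notification received: '${jobId}'`)),
      Stream.runForEach((jobId) => processJob(jobId)),
    ),
  );
});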
Additional information
const make = Effect.gen(function* () {
  yield* Effect.logInfo("Starting PostgresClient");
  yield* Effect.acquireRelease(Effect.logInfo("PostgresClient started"), () =>
    Effect.logInfo("PostgresClient stopped"),
  );

  const sql = yield* PgClient.PgClient;

  const getJobByIdAndLock = (jobId: string) =>
    Effect.gen(function* () {
      yield* Effect.logInfo(`Getting job '${jobId}'`);
      return yield* sql<Job>`
        update job_queue
        set status='locked', locked_at = ${new Date()}
        where id = (
          select id from job_queue
          where id = ${jobId}
          for update skip locked
          limit 1
        )
        returning *;
      `.pipe(Effect.map(Array.get(0)));
    });

  const setJobStatus = (jobId: string, status: Job["status"]) =>
    Effect.gen(function* () {
      yield* Effect.logInfo(`Setting status to '${status}' for job '${jobId}'`);
      if (status === "completed") {
        yield* sql<Job>`
          update job_queue set status = ${status}, completed_at = ${new Date()} where id = ${jobId};
        `;
        return;
      }
      yield* sql<Job>`
        update job_queue set status = ${status} where id = ${jobId};
      `;
    });

  return {
    getJobById: getJobByIdAndLock,
    setJobStatus,
    listen: sql.listen,
  } as const satisfies IPostgres;
});
export const PostgresClient = PgClient.layer({
  url: Config.redacted("DATABASE_URL"),
  debug: Config.succeed((...args) => {
    const [connection, query, parameters, paramTypes] = args;
    console.log(
      `PgClient debug: ${JSON.stringify({
        fn: "debug",
        connection,
        query,
        parameters,
        paramTypes,
      })}`,
    );
  }),
  onnotice: Config.succeed((notice) => {
    console.log(`PgClient onnotice: ${JSON.stringify(notice)}`);
  }),
}).pipe(Layer.annotateLogs({ module: "PgClient" }));
function handleMessage([jobId, message]: readonly [string, Message]) {
  return Effect.gen(function* () {
    yield* Effect.logInfo(`Handling job '${jobId}' of type '${message._tag}'`);
    const retryPolicy = yield* RetryPolicy;
    const postgres = yield* Postgres;
    const withRetry = Effect.retry(retryPolicy);

    const run = <A, E, R>(effect: Effect.Effect<A, E, R>) => {
      return Effect.matchEffect(withRetry(effect), {
        onFailure(cause) {
          return Effect.gen(function* () {
            yield* Effect.logError(
              `An error occurred while handling job '${jobId}' of type '${message._tag}': ${cause}`,
            );
            yield* withRetry(postgres.setJobStatus(jobId, "error"));
            return yield* Effect.fail(cause);
          });
        },
        onSuccess(a) {
          return Effect.gen(function* () {
            yield* Effect.logInfo(
              `Successfully handled job '${jobId}' of type '${message._tag}'`,
            );
            yield* withRetry(postgres.setJobStatus(jobId, "completed"));
            return yield* Effect.succeed(a);
          });
        },
      });
    };

    switch (message._tag) {
      case "send_verification_email": {
        return yield* Effect.forkScoped(run(handleSendVerificationEmail(message)));
      }
      case "process_metrics_upload": {
        return yield* Effect.forkScoped(run(handleProccesMetricsUpload(message)));
      }
      case "ontraport_create_contact": {
        return yield* Effect.forkScoped(run(handleOntraportCreateContact(message)));
      }
    }
  });
}
I'll wait for the minimal repro before going any further.
Will need to see at which point the messages are getting stuck, just to make sure it isn't the underlying postgres.js library.
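If it helps to isolate the layer, a minimal check against postgres.js directly (no Effect involved) would look roughly like this — a sketch assuming the same DATABASE_URL and channel name:

import postgres from "postgres";

// Sketch: subscribe with postgres.js directly. If payloads also stop showing
// up here after a while, the problem is below the Stream layer.
const sql = postgres(process.env.DATABASE_URL!, { debug: console.log });

await sql.listen(
  "job_queue_channel",
  (payload) => console.log(new Date().toISOString(), "payload:", payload),
  () => console.log(new Date().toISOString(), "listen (re)established"),
);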
I've spent a ton of time digging into this and have learned more than I ever wanted to about postgres and tcp 😂 I'm inclined to believe that this is not an Effect bug at this point.
If I had to point to a potential point of failure that I want to spend more time on, it would be this in @effect/sql-pg:
listen: (channel: string) =>
  Stream.asyncPush<string, SqlError>((emit) =>
    Effect.acquireRelease(
      Effect.tryPromise({
        try: () => client.listen(channel, (payload) => emit.single(payload)),
        catch: (cause) => new SqlError({ cause, message: "Failed to listen" })
      }),
      ({ unlisten }) => Effect.promise(() => unlisten())
    )
  ),
client.listen takes a third argument, onlisten, that might be useful to tap into (sketch after the list below). Under the covers it is called in the following circumstances:
- Initial connection
- Reconnects
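For illustration, tapping into it from the Stream.asyncPush wrapper could look roughly like this (not a proposed patch to @effect/sql-pg, just a sketch of the idea):

listen: (channel: string) =>
  Stream.asyncPush<string, SqlError>((emit) =>
    Effect.acquireRelease(
      Effect.tryPromise({
        try: () =>
          client.listen(
            channel,
            (payload) => emit.single(payload),
            // onlisten fires on the initial subscription and again after each
            // reconnect, so at minimum it makes silent drops visible.
            () => console.log(`listen (re)established on '${channel}'`)
          ),
        catch: (cause) => new SqlError({ cause, message: "Failed to listen" })
      }),
      ({ unlisten }) => Effect.promise(() => unlisten())
    )
  ),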
This is what the postgres npm library's implementation of listen looks like:
async function listen(name, fn, onlisten) {
  const listener = { fn, onlisten }
  const sql = listen.sql || (listen.sql = Postgres({
    ...options,
    max: 1,
    idle_timeout: null,
    max_lifetime: null,
    fetch_types: false,
    onclose() {
      Object.entries(listen.channels).forEach(([name, { listeners }]) => {
        delete listen.channels[name]
        Promise.all(listeners.map(l => listen(name, l.fn, l.onlisten).catch(() => { /* noop */ })))
      })
    },
    onnotify(c, x) {
      c in listen.channels && listen.channels[c].listeners.forEach(l => l.fn(x))
    }
  }))

  const channels = listen.channels || (listen.channels = {})
      , exists = name in channels

  if (exists) {
    channels[name].listeners.push(listener)
    const result = await channels[name].result
    listener.onlisten && listener.onlisten()
    return { state: result.state, unlisten }
  }

  channels[name] = { result: sql`listen ${
    sql.unsafe('"' + name.replace(/"/g, '""') + '"')
  }`, listeners: [listener] }
  const result = await channels[name].result
  listener.onlisten && listener.onlisten()
  return { state: result.state, unlisten }
Specifically, this block is where reconnects happen:
onclose() {
  Object.entries(listen.channels).forEach(([name, { listeners }]) => {
    delete listen.channels[name]
    Promise.all(listeners.map(l => listen(name, l.fn, l.onlisten).catch(() => { /* noop */ })))
  })
},
The recursive calls for retries/reconnects seem like they should be fine with how y'all are using emit from Stream.asyncPush, but I haven't had the time to patch deps and add logging yet.
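The logging I have in mind is nothing more than a couple of console.log calls patched into that block (hypothetically, e.g. via patch-package), so closes and failed re-listens stop being silent:

// Hypothetical patch of the onclose handler shown above: log the close and
// any failed re-listen instead of swallowing it in the noop catch.
onclose() {
  console.log("listen connection closed, re-subscribing channels");
  Object.entries(listen.channels).forEach(([name, { listeners }]) => {
    delete listen.channels[name]
    Promise.all(listeners.map(l =>
      listen(name, l.fn, l.onlisten).catch((err) =>
        console.error(`re-listen on '${name}' failed:`, err)
      )
    ))
  })
},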
How frequent are the messages? And how is postgres hosted in this case?
I think I've resolved the issue. The frequency of messages is very low at the moment since it's just my testing. That said, switching from Supabase's pooler (pooled connection) to a direct connection seems to have resolved it.
It's still concerning that the underlying postgres library gave no indication of disconnects or errors, though.
It seems they are disabling timeouts (idle_timeout, max_lifetime) for the listen connection, which could be an issue too.
Might be a Supabase issue? https://github.com/supabase/supavisor/issues/85
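One way to at least make these silent drops visible (a sketch, with the assumption that a "ping" payload can be filtered out by the consumer): periodically NOTIFY the same channel from the regular pool, so the dedicated listen connection should keep seeing traffic; if the pings stop arriving at the stream, the listener connection has died without surfacing an error.

import { PgClient } from "@effect/sql-pg";
import { Effect, Schedule } from "effect";

// Sketch of a heartbeat: send a NOTIFY through the normal pool on a fixed
// schedule. The dedicated listen connection should see every "ping"; if the
// stream stops receiving them, the listener died silently (e.g. dropped by
// the pooler). The consumer needs to ignore "ping" payloads.
const heartbeat = Effect.gen(function* () {
  const sql = yield* PgClient.PgClient;
  yield* sql`select pg_notify('job_queue_channel', 'ping')`.pipe(
    Effect.repeat(Schedule.fixed("30 seconds")),
    Effect.forkScoped,
  );
});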