Timeout on callback function when using ListenStream
Issue Summary
When using ListenStream to monitor stream activity, the connector process is used as a callback to check if the stream is alive. However, if the callback involves a long-running operation, the connector may stop unexpectedly. This happens because a security mechanism terminates the connector if there is no activity within 45 seconds. As a result, any long call that exceeds this timeout causes the connector to crash.
Steps to Reproduce
- Use ListenStream with a callback that performs a lengthy operation.
- Ensure the operation takes longer than 45 seconds.
- Observe that the connector stops due to inactivity.
Expected Behavior
The connector should handle long-running callbacks gracefully and not terminate as long as the operation is still in progress.
Actual Behavior
The connector stops if there is no activity within 45 seconds, even if the callback is still running.
Additional Context
This behavior is problematic for use cases where processing messages or data may occasionally take longer than the configured inactivity timeout. It would be helpful to either allow configurable timeouts or ensure the connector remains alive while a callback is executing
Solution Proposal
To prevent the connector from stopping during long-running callback operations, wrap the callback function in a future using asyncio. By doing so, you can obtain a promise and continuously check for a response in a loop. This approach ensures that the connector remains responsive and does not hit the inactivity timeout while the callback is still executing.
This will improve reliability for scenarios where callbacks might take longer than the inactivity timeout
Implementation Outline:
Wrap the callback in an asyncio future:
- Use asyncio.create_task() to schedule the callback as a coroutine.
Monitor the future in a loop:
- Implement a while True loop that periodically checks if the future is done, allowing the event loop to process other tasks and preventing the connector from being marked as inactive.
Handle the result:
- Once the future completes, retrieve and process the result as needed.