
Have a way to launch a long-lived postaction process into middleware

Open arseniycloud opened this issue 6 months ago • 6 comments

Is your feature request related to a problem? Please describe.

I decided to try using postAction middleware for load testing, to simulate sending callbacks after a call under a load of >~150 RPS. For this purpose, we allocated resources in our Kubernetes infrastructure:

requests:
  cpu: 2
  memory: 4Gi
limits:
  cpu: 4
  memory: 8Gi

My Use Case: I’ve implemented a custom Python script as a postAction hook in Hoverfly to handle a variety of webhook-related logic. The functions include:

•	detect_service_from_path
•	extract_id_from_mock_request
•	get_webhook_url
•	create_webhook_data
•	log_call_async
•	send_webhook_async
•	process_payload_async
•	async_main
•	main(entrypoint)

To optimize performance, I’ve utilized the following high-performance Python libraries:

•	aiohttp
•	asyncio
•	orjson

Here’s an example of the main.py implementation, optimized for 50 concurrent requests:
import asyncio
import os
import sys

import aiohttp
import orjson

# (CUSTOM_HEADERS, RESULT_HISTORY, get_timestamp and process_payload_async
#  are defined elsewhere in the script)


async def main_async() -> None:
    """Reads payload from stdin, creates aiohttp session, processes payload, ensures session close."""
    global RESULT_HISTORY
    RESULT_HISTORY.clear()
    payload_str = ""
    original_stdout_data = "{}"
    try:
        # Define Aiohttp settings inside main_async
        aiohttp_tcp_limit = int(os.environ.get("AIOHTTP_TCP_LIMIT", "1000"))
        aiohttp_tcp_limit_per_host = int(os.environ.get("AIOHTTP_TCP_LIMIT_PER_HOST", "500"))
        aiohttp_tcp_keepalive_timeout = float(os.environ.get("AIOHTTP_TCP_KEEPALIVE_TIMEOUT", "60.0"))
        request_timeout_total = float(os.environ.get("REQUEST_TIMEOUT", "30.0"))

        async with aiohttp.ClientSession(
                timeout=aiohttp.ClientTimeout(total=request_timeout_total),
                headers=CUSTOM_HEADERS,
                json_serialize=lambda obj: orjson.dumps(obj).decode("utf-8"),
                raise_for_status=False,
                connector=aiohttp.TCPConnector(
                    limit=aiohttp_tcp_limit, limit_per_host=aiohttp_tcp_limit_per_host,
                    keepalive_timeout=aiohttp_tcp_keepalive_timeout, ssl=False, enable_cleanup_closed=True
                )
        ) as session:
            try:
                payload_str = sys.stdin.read()
                original_stdout_data = payload_str
                try:
                    payload = orjson.loads(payload_str.encode("utf-8"))
                except orjson.JSONDecodeError as e:
                    timestamp = get_timestamp()
                    print(f"[ERROR] {timestamp} - parser - Failed to parse input as JSON: {str(e)}")
                    print(original_stdout_data)
                    return
                payload_str = None  # release the raw payload string as early as possible
                success, webhook_count = await process_payload_async(session, payload)
                del payload
                print(original_stdout_data)
                timestamp = get_timestamp()
                if webhook_count > 1:
                    status_msg = f"[STATUS] {timestamp} - batch - Sent {success}/{webhook_count} webhooks successfully"
                else:
                    status = "success" if success == 1 else "failure"
                    status_msg = f"[STATUS] {timestamp} - webhook - {status}"
                print(status_msg)
                if webhook_count > 0 and success < webhook_count:
                    timestamp_err = get_timestamp()
                    if webhook_count == 1:
                        print(f"[FAILURE] {timestamp_err} - webhook - failure")
                    else:
                        print(
                            f"[FAILURE] {timestamp_err} - batch - Sent only {success}/{webhook_count} webhooks successfully")
            except Exception as e:
                timestamp = get_timestamp()
                print(f"[ERROR] {timestamp} - system - Error processing data: {str(e)}")
                print(original_stdout_data)
    except Exception as e:
        timestamp = get_timestamp()
        print(f"[ERROR] {timestamp} - system - Error setting up/running main_async: {str(e)}")
        print(original_stdout_data)


def main() -> None:
    """Entry point: Runs async main."""
    try:
        asyncio.run(main_async())
    except Exception as e:
        timestamp = get_timestamp()
        print(f"[ERROR] {timestamp} - critical - Failed during main execution: {e}")
        sys.exit(1)

Load Testing: I used Pandora (aka Yandex Tank) as the load-testing tool. A screenshot of the load profile is attached.

Issue: Unfortunately, under moderate load (~40 requests per second), we began to experience anomalies, increased latency, and eventually crashes. Here is the Hoverfly pod's history in Kubernetes: OOMKilled (screenshot attached).

After investigation, it appears that Hoverfly spawns a new process for each request, which severely limits scalability and is likely the root cause of the instability. This behavior makes it almost impossible to efficiently use async Python code in high-throughput scenarios.
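The per-request overhead is easy to demonstrate in isolation. The snippet below is a hypothetical benchmark (not part of the report above): it times how long it takes to spawn a fresh Python interpreter that merely echoes stdin, which is roughly the fixed cost Hoverfly's middleware model pays on every proxied request before the hook does any real work:

```python
import subprocess
import sys
import time


def measure_spawn_overhead(n: int = 20) -> float:
    """Average wall-clock cost of spawning a fresh Python interpreter
    that reads stdin and echoes it back -- a stand-in for a do-nothing
    middleware hook invoked once per request."""
    start = time.perf_counter()
    for _ in range(n):
        subprocess.run(
            [sys.executable, "-c", "import sys; sys.stdout.write(sys.stdin.read())"],
            input=b"{}",
            capture_output=True,
            check=True,
        )
    return (time.perf_counter() - start) / n


if __name__ == "__main__":
    avg = measure_spawn_overhead()
    print(f"average spawn + echo cost: {avg * 1000:.1f} ms per request")
```

At ~40 RPS, even a few tens of milliseconds of interpreter start-up per request means dozens of short-lived processes alive at any moment, each with its own heap, which is consistent with the memory growth and the eventual OOMKilled shown above.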

It would be incredibly beneficial to have a way to launch a long-lived external process (e.g., an async Python service) alongside the main Hoverfly Go process, rather than forking a fresh subprocess for each request.

This would drastically improve performance and resource usage for advanced use cases that rely on efficient, asynchronous outbound HTTP requests.
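For reference, here is a minimal sketch of what such a long-lived middleware process could look like. This is purely hypothetical (Hoverfly does not support this mode today): a single process reads one JSON payload per line from stdin and writes one line back, so interpreter start-up, event loop, and connection pools are paid for exactly once:

```python
import json
import sys


def handle_line(line: str) -> str:
    """Process one request payload and return the line to emit.
    In a real hook, this is where webhook work would be enqueued."""
    try:
        payload = json.loads(line)
    except json.JSONDecodeError:
        return line  # echo unparseable input unchanged so the response is never lost
    # ... enqueue async webhook dispatch here ...
    return json.dumps(payload)


def serve_forever() -> None:
    """Long-lived loop: one process handles many payloads,
    one JSON document per line, instead of a fork per request."""
    for line in sys.stdin:
        line = line.strip()
        if line:
            print(handle_line(line), flush=True)


if __name__ == "__main__":
    serve_forever()
```

The newline-delimited framing is just one possible protocol choice; a Unix socket or gRPC channel between Hoverfly and the sidecar would work equally well, as long as the process outlives individual requests.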

arseniycloud — May 06 '25 10:05