astro dev start - "Waiting for Webserver" message hangs forever if the XCom backend config fails to import

Open • alex-astronomer opened this issue 3 years ago • 0 comments

Describe the bug

When the configured XCom backend fails to import (for example, because one of its dependencies is missing), the dev start command hangs indefinitely while waiting for the webserver to start.

What CLI Version did you experience this bug?

1.5.0

What Operating System is the above CLI installed on?

Mac OSX

🪜 Steps To Reproduce

  1. Put this backend in the include folder (as RedisXComBackend.py, to match the module path used for AIRFLOW__CORE__XCOM_BACKEND):
from uuid import uuid4
from airflow.models.xcom import BaseXCom
import msgpack  # third-party; this is the import that fails when msgpack is not installed
from functools import lru_cache
from typing import Any, Optional
import redis
from airflow.hooks.base import BaseHook


MAX_REDIS_STR_LIMIT = 536870912


def chunk_string(string, length):
    return (string[0 + i : length + i] for i in range(0, len(string), length))


class RedisXComBackend(BaseXCom):
    @staticmethod
    @lru_cache(maxsize=1)
    def redis_conn():
        conn = BaseHook.get_connection("redis_default")
        return redis.Redis(host=conn.host, port=conn.port, db=0)

    @staticmethod
    def serialize_value(
        value: Any,
        *,
        key: Optional[str] = None,
        task_id: Optional[str] = None,
        dag_id: Optional[str] = None,
        run_id: Optional[str] = None,
        map_index: Optional[int] = None,
    ):
        redis_key = f"{key}_{task_id}_{dag_id}_{run_id}_{map_index}"
        print(redis_key)
        packed = msgpack.packb(value)
        if len(packed) > MAX_REDIS_STR_LIMIT:
            chunks = chunk_string(packed, MAX_REDIS_STR_LIMIT)
            pipe = RedisXComBackend.redis_conn().pipeline()
            for chunk in chunks:
                pipe.lpush(redis_key, chunk)
            pipe.execute()
        else:
            RedisXComBackend.redis_conn().lpush(redis_key, packed)
        return BaseXCom.serialize_value(redis_key)

    @staticmethod
    def deserialize_value(result: "XCom") -> Any:
        redis_key = BaseXCom.deserialize_value(result)
        decoded = RedisXComBackend.redis_conn().lrange(name=redis_key, start=0, end=-1)
        decoded = b"".join(reversed(decoded))
        decoded = msgpack.unpackb(decoded)
        return decoded

    def orm_deserialize_value(self) -> Any:
        val = RedisXComBackend.deserialize_value(self)
        return val

  2. Set AIRFLOW__CORE__XCOM_BACKEND=include.RedisXComBackend.RedisXComBackend

  3. Make sure msgpack, which this backend requires, is not installed

  4. Run astrocloud dev start and see that the "Waiting for webserver" step hangs indefinitely
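
To confirm that the hang comes from the backend import rather than from something else, the dotted path can be resolved by hand inside the project (for example, from a Python shell in the scheduler container or the project root). The snippet below is a minimal, stdlib-only sketch. The helper name backend_import_error is hypothetical, and it only approximates how a dotted class path gets loaded; it is not Airflow's own loader.

```python
import importlib
from typing import Optional


def backend_import_error(dotted_path: str) -> Optional[str]:
    """Return None if `module.Class` resolves, else a short failure description."""
    # Split "include.RedisXComBackend.RedisXComBackend" into module and class.
    module_path, _, class_name = dotted_path.rpartition(".")
    if not module_path:
        return f"{dotted_path!r} is not a dotted 'module.Class' path"
    try:
        # A missing dependency (such as msgpack) raises ModuleNotFoundError,
        # which is a subclass of ImportError.
        module = importlib.import_module(module_path)
    except ImportError as exc:
        return f"cannot import {module_path!r}: {exc}"
    if not hasattr(module, class_name):
        return f"module {module_path!r} has no attribute {class_name!r}"
    return None


# Example (run where the `include` package is importable):
# print(backend_import_error("include.RedisXComBackend.RedisXComBackend"))
```

With msgpack missing, this should return a message naming the missing module instead of hanging, which makes the root cause visible before running dev start.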

alex-astronomer • Jul 01 '22 15:07