
Logs missing when using result_serializer with a GCS Block

Open essamik opened this issue 8 months ago • 4 comments

Bug summary

When I pass a GCS Block as result_storage in my flow decorator, logs appear neither in my local IDE (when running a deployment locally from a Python main) nor on my self-hosted Prefect server (when running a deployment directly on k8s).

Removing the result_storage parameter fixes the problem, and the logs are visible again.


@flow(
    name="flow-name",
    result_serializer="json",
    result_storage="gcs-bucket/gcs-prefect-block", # Removing this line and logs show up again
    persist_result=True
)
def my_flow(day_delta: int = 30) -> None:
    ...  # flow body omitted

It's odd that setting result_storage has a side effect on logging; this seems like a bug to me.

See the attached image: there should be many more logs at the task level.

Version info

Version:             3.4.0
API version:         0.8.4
Python version:      3.12.3
Git commit:          c80e4442
Built:               Fri, May 02, 2025 08:02 PM
OS/Arch:             darwin/arm64
Profile:             dev
Server type:         server
Pydantic version:    2.10.6
Integrations:
  prefect-gcp:       0.6.2
  prefect-docker:    0.6.3
  prefect-kubernetes: 0.5.9

Additional context

I checked the log table in the Prefect DB, and the logs do not appear there either. I'm using the get_run_logger helper to log in my tasks. The log level is set to its default value.

essamik avatar May 08 '25 10:05 essamik

hi @essamik - thanks for the issue. can you provide a complete MRE here? this does not appear to reproduce

from prefect import flow, get_run_logger

@flow(
    name="flow-name",
    result_serializer="json",
    result_storage="gcs-bucket/gcs-prefect-block",  # logs show regardless of this kwarg
    persist_result=True,
)
def my_flow() -> None:
    logger = get_run_logger()
    logger.info("Hello, world!")


if __name__ == "__main__":
    my_flow()

ie it seems there must be something more specific going on in your case where you don't see logs

zzstoatzz avatar May 09 '25 14:05 zzstoatzz

Thanks for the reply. After some debugging, I narrowed the issue down to the use of concurrency/parallelism (adding submit/result/wait to the tasks in my flow). I took the GitHub stars example from the Prefect tutorial (https://docs.prefect.io/v3/tutorials/pipelines#run-your-improved-flow) and added the GCS result storage.

from typing import Any

import httpx
from prefect import flow, get_run_logger, task


@task
def fetch_stats(github_repo: str) -> dict[str, Any]:
    get_run_logger().info("fetch stats") # Added a log here
    return httpx.get(f"https://api.github.com/repos/{github_repo}").json()


@task
def get_stars(repo_stats: dict[str, Any]) -> int:
    get_run_logger().info("get_stars") # Added a log here
    return repo_stats["stargazers_count"]

@flow(
    log_prints=True,
    result_storage="gcs-bucket/gcs-prefect-block" # the problematic line
)
def show_stars(github_repos: list[str]) -> None:
    stats_futures = fetch_stats.map(github_repos)
    stars = get_stars.map(stats_futures).result()

    for repo, star_count in zip(github_repos, stars):
        print(f"{repo}: {star_count} stars")


# Run the flow
if __name__ == "__main__":
    show_stars([
        "PrefectHQ/prefect",
        "pydantic/pydantic",
        "huggingface/transformers"
    ])

Here are the logs I get in my console output without the result_storage config:

12:59:43.784 | INFO    | Flow run 'tireless-ladybug' - Beginning flow run 'tireless-ladybug' for flow 'show-stars'
12:59:43.788 | INFO    | Flow run 'tireless-ladybug' - View at http://prefect.dev.int.com/runs/flow-run/345b0e31-c2ff-42e8-8aea-4ce7cf321064
12:59:44.897 | INFO    | Task run 'fetch_stats-7ac' - fetch stats
12:59:44.897 | INFO    | Task run 'fetch_stats-ac4' - fetch stats
12:59:44.899 | INFO    | Task run 'fetch_stats-a8f' - fetch stats
12:59:45.130 | INFO    | Task run 'fetch_stats-7ac' - Finished in state Completed()
12:59:45.137 | INFO    | Task run 'get_stars-b90' - get_stars
12:59:45.139 | INFO    | Task run 'fetch_stats-ac4' - Finished in state Completed()
12:59:45.140 | INFO    | Task run 'get_stars-b90' - Finished in state Completed()
12:59:45.144 | INFO    | Task run 'get_stars-878' - get_stars
12:59:45.145 | INFO    | Task run 'get_stars-878' - Finished in state Completed()
12:59:45.307 | INFO    | Task run 'fetch_stats-a8f' - Finished in state Completed()
12:59:45.314 | INFO    | Task run 'get_stars-db0' - get_stars
12:59:45.315 | INFO    | Task run 'get_stars-db0' - Finished in state Completed()
12:59:45.317 | INFO    | Flow run 'tireless-ladybug' - PrefectHQ/prefect: 19237 stars
12:59:45.318 | INFO    | Flow run 'tireless-ladybug' - pydantic/pydantic: 23782 stars
12:59:45.318 | INFO    | Flow run 'tireless-ladybug' - huggingface/transformers: 144157 stars
12:59:45.777 | INFO    | Flow run 'tireless-ladybug' - Finished in state Completed()

Process finished with exit code 0

And here is the same run with result_storage set to the GCS block:

12:58:36.071 | INFO    | Flow run 'soft-badger' - Beginning flow run 'soft-badger' for flow 'show-stars'
12:58:36.099 | INFO    | Flow run 'soft-badger' - View at http://prefect.dev.int.com/runs/flow-run/8540ad76-c805-4266-9b75-ee43d1f5231f
12:58:38.661 | INFO    | Task run 'fetch_stats-3fc' - fetch stats

Process finished with exit code 0

There are no side effects other than on logging: the flow executes correctly and creates the desired output.

essamik avatar May 11 '25 11:05 essamik

Experiencing similar problems with my setup

mdturp avatar May 19 '25 13:05 mdturp

@zzstoatzz Do you need more details on this case?

essamik avatar Jun 10 '25 06:06 essamik

I am also experiencing a similar thing. Since introducing GCS storage into my flows, the logging has become very unreliable.

slothtopus avatar Nov 14 '25 00:11 slothtopus

Turns out this is due to the prefect_gcp.cloud_storage module using the prefect.logging.loggers.disable_run_logger context manager in a kind of reckless way.

The cloud storage module uses disable_run_logger to hide some logging during its i/o calls (for example https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-gcp/prefect_gcp/cloud_storage.py#L807). This shuts down logging globally, which already isn't great, and because disable_run_logger is not thread safe, the logger very often gets stuck as disabled.
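
To illustrate why a save-and-restore context manager can leave a logger stuck, here is a minimal sketch using only the standard library. It assumes the context manager works by saving the logger's `disabled` flag, setting it to True, and restoring the saved value on exit. `naive_disable_logger` and `demo_stuck_logger` are hypothetical names made up for this illustration and are not Prefect APIs. The events only force the unlucky interleaving that real threads would otherwise hit by chance.

```python
import logging
import threading
from contextlib import contextmanager


@contextmanager
def naive_disable_logger(name: str):
    """Hypothetical stand-in: save the flag, disable, restore the saved value."""
    logger = logging.getLogger(name)
    saved = logger.disabled
    logger.disabled = True
    try:
        yield
    finally:
        logger.disabled = saved


def demo_stuck_logger(name: str = "demo.run_logger") -> bool:
    """Force two overlapping uses and return the logger's final `disabled` flag."""
    logger = logging.getLogger(name)
    logger.disabled = False
    a_entered, b_entered, a_exited = (threading.Event() for _ in range(3))

    def worker_a() -> None:
        with naive_disable_logger(name):  # saves False
            a_entered.set()
            b_entered.wait()              # stay inside until B has entered
        a_exited.set()                    # A has restored False

    def worker_b() -> None:
        a_entered.wait()
        with naive_disable_logger(name):  # saves True, because A is still inside
            b_entered.set()
            a_exited.wait()               # leave only after A has restored
        # B restores the True it saved, so the logger stays disabled

    threads = [threading.Thread(target=worker_a), threading.Thread(target=worker_b)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return logger.disabled
```

Calling `demo_stuck_logger()` returns True: both threads have left the context manager, yet the logger remains disabled because the second thread restored a value it captured while the first was still inside.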

I've fixed this in my project by monkey patching disable_logger (which is used by disable_run_logger) to be a noop when called from prefect_gcp.cloud_storage.

import inspect
from contextlib import contextmanager
import prefect.logging.loggers as pf_loggers

_original_disable_logger = pf_loggers.disable_logger

@contextmanager
def selective_disable_logger(name: str):
    """
    Wrapper around Prefect's disable_logger.

    - If called from 'prefect_gcp.cloud_storage' -> no-op.
    - Otherwise -> delegate to original disable_logger.
    """
    stack = inspect.stack()
    caller_modules = {f.frame.f_globals.get("__name__", "") for f in stack[1:5]}

    if 'prefect_gcp.cloud_storage' in caller_modules:
        yield
    else:
        with _original_disable_logger(name):
            yield

pf_loggers.disable_logger = selective_disable_logger

slothtopus avatar Nov 18 '25 22:11 slothtopus

oof ty for the sleuthwork @slothtopus - we'll get this fixed or happy to review a PR in the meantime

zzstoatzz avatar Nov 18 '25 22:11 zzstoatzz

PR here

zzstoatzz avatar Nov 19 '25 17:11 zzstoatzz