Logs missing when using result_storage with a GCS Block
Bug summary
When I pass a GCS Block to the result_storage parameter of my flow decorator, logs appear neither in my local IDE (running the deployment locally from the Python main) nor on my self-hosted Prefect server (running the deployment directly on k8s).
Removing the result_storage line fixes the problem and the logs are visible again.
from prefect import flow
@flow(
    name="flow-name",
    result_serializer="json",
    result_storage="gcs-bucket/gcs-prefect-block",  # Removing this line and logs show up again
    persist_result=True,
)
def my_flow(day_delta: int = 30) -> None:
    ...  # flow body omitted in the original report
It's weird that setting result_storage has a side effect on logging; it seems like a bug to me.
See image, there should be a lot more logs at task level.
Version info
Version: 3.4.0
API version: 0.8.4
Python version: 3.12.3
Git commit: c80e4442
Built: Fri, May 02, 2025 08:02 PM
OS/Arch: darwin/arm64
Profile: dev
Server type: server
Pydantic version: 2.10.6
Integrations:
prefect-gcp: 0.6.2
prefect-docker: 0.6.3
prefect-kubernetes: 0.5.9
Additional context
I checked the log table in the prefect DB and they also do not appear there.
I'm using the get_run_logger helper to log in my tasks.
Log level is correctly set to default value.
hi @essamik - thanks for the issue. can you provide a complete MRE here? this does not appear to reproduce
from prefect import flow, get_run_logger
@flow(
    name="flow-name",
    result_serializer="json",
    result_storage="gcs-bucket/gcs-prefect-block",  # logs show regardless of this kwarg
    persist_result=True,
)
def my_flow() -> None:
    logger = get_run_logger()
    logger.info("Hello, world!")
if __name__ == "__main__":
    my_flow()
ie it seems there must be something more specific going on in your case where you don't see logs
Thanks for the reply, after some debugging I could narrow the issue to the usage of concurrency/parallelism (adding submit/result/wait to my tasks in my flow). I took the github star example from the Prefect tutorial (https://docs.prefect.io/v3/tutorials/pipelines#run-your-improved-flow) and added the GCS result storage.
from typing import Any
import httpx
from prefect import flow, get_run_logger, task
@task
def fetch_stats(github_repo: str) -> dict[str, Any]:
    get_run_logger().info("fetch stats")  # Added a log here
    return httpx.get(f"https://api.github.com/repos/{github_repo}").json()
@task
def get_stars(repo_stats: dict[str, Any]) -> int:
    get_run_logger().info("get_stars")  # Added a log here
    return repo_stats["stargazers_count"]
@flow(
    log_prints=True,
    result_storage="gcs-bucket/gcs-prefect-block",  # the problematic line
)
def show_stars(github_repos: list[str]) -> None:
    stats_futures = fetch_stats.map(github_repos)
    stars = get_stars.map(stats_futures).result()
    for repo, star_count in zip(github_repos, stars):
        print(f"{repo}: {star_count} stars")
# Run the flow
if __name__ == "__main__":
    show_stars([
        "PrefectHQ/prefect",
        "pydantic/pydantic",
        "huggingface/transformers",
    ])
Here are the logs I get in my console output without the result_storage config:
12:59:43.784 | INFO | Flow run 'tireless-ladybug' - Beginning flow run 'tireless-ladybug' for flow 'show-stars'
12:59:43.788 | INFO | Flow run 'tireless-ladybug' - View at http://prefect.dev.int.com/runs/flow-run/345b0e31-c2ff-42e8-8aea-4ce7cf321064
12:59:44.897 | INFO | Task run 'fetch_stats-7ac' - fetch stats
12:59:44.897 | INFO | Task run 'fetch_stats-ac4' - fetch stats
12:59:44.899 | INFO | Task run 'fetch_stats-a8f' - fetch stats
12:59:45.130 | INFO | Task run 'fetch_stats-7ac' - Finished in state Completed()
12:59:45.137 | INFO | Task run 'get_stars-b90' - get_stars
12:59:45.139 | INFO | Task run 'fetch_stats-ac4' - Finished in state Completed()
12:59:45.140 | INFO | Task run 'get_stars-b90' - Finished in state Completed()
12:59:45.144 | INFO | Task run 'get_stars-878' - get_stars
12:59:45.145 | INFO | Task run 'get_stars-878' - Finished in state Completed()
12:59:45.307 | INFO | Task run 'fetch_stats-a8f' - Finished in state Completed()
12:59:45.314 | INFO | Task run 'get_stars-db0' - get_stars
12:59:45.315 | INFO | Task run 'get_stars-db0' - Finished in state Completed()
12:59:45.317 | INFO | Flow run 'tireless-ladybug' - PrefectHQ/prefect: 19237 stars
12:59:45.318 | INFO | Flow run 'tireless-ladybug' - pydantic/pydantic: 23782 stars
12:59:45.318 | INFO | Flow run 'tireless-ladybug' - huggingface/transformers: 144157 stars
12:59:45.777 | INFO | Flow run 'tireless-ladybug' - Finished in state Completed()
Process finished with exit code 0
And here is the same run with result_storage set to the GCS block:
12:58:36.071 | INFO | Flow run 'soft-badger' - Beginning flow run 'soft-badger' for flow 'show-stars'
12:58:36.099 | INFO | Flow run 'soft-badger' - View at http://prefect.dev.int.com/runs/flow-run/8540ad76-c805-4266-9b75-ee43d1f5231f
12:58:38.661 | INFO | Task run 'fetch_stats-3fc' - fetch stats
Process finished with exit code 0
There are no other side effects besides the missing logs: the flow executes correctly and creates the desired output.
Experiencing similar problems with my setup
@zzstoatzz Do you need more details regarding this case?
I am also experiencing a similar thing. Since introducing GCS storage into my flows, the logging has become very unreliable.
Turns out this is due to the prefect_gcp.cloud_storage module using the prefect.logging.loggers.disable_run_logger context manager in a kind of reckless way.
The cloud storage module uses disable_run_logger to hide some logging during its i/o calls (for example https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-gcp/prefect_gcp/cloud_storage.py#L807). This shuts down logging globally, which already isn't great, and because disable_run_logger is not thread safe, the logger very often gets stuck as disabled.
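To illustrate why a save/restore style context manager can leave a logger stuck as disabled, here is a minimal sketch that uses only the standard library. It assumes the context manager saves the logger's `disabled` flag on entry and restores it on exit; `naive_disable_logger` and `demo_stuck_logger` are hypothetical names made up for this example, not Prefect APIs, and the events only exist to force the unlucky interleaving deterministically.

```python
import logging
import threading
from contextlib import contextmanager


@contextmanager
def naive_disable_logger(name: str):
    """Hypothetical stand-in: save the flag, disable, restore the saved value on exit."""
    logger = logging.getLogger(name)
    saved = logger.disabled
    logger.disabled = True
    try:
        yield
    finally:
        logger.disabled = saved


def demo_stuck_logger(name: str = "demo.run_logger") -> bool:
    """Force the bad interleaving of two threads; return the final `disabled` flag."""
    logger = logging.getLogger(name)
    logger.disabled = False
    a_entered = threading.Event()
    b_entered = threading.Event()
    a_exited = threading.Event()

    def thread_a() -> None:
        with naive_disable_logger(name):  # saves False
            a_entered.set()
            b_entered.wait()  # stay inside until B has entered
        a_exited.set()  # A has restored False

    def thread_b() -> None:
        a_entered.wait()
        with naive_disable_logger(name):  # saves True, because A is still inside
            b_entered.set()
            a_exited.wait()  # leave only after A has left
        # B restores the True it saved, so the logger stays disabled

    threads = [threading.Thread(target=thread_a), threading.Thread(target=thread_b)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return logger.disabled


if __name__ == "__main__":
    print("logger stuck disabled:", demo_stuck_logger())
```

With mapped or submitted tasks running in threads, overlapping entries like this would explain logs disappearing partway through a run, as in the console output earlier in the thread.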
I've fixed this in my project by monkey patching disable_logger (which is used by disable_run_logger) to be a noop when called from prefect_gcp.cloud_storage.
import inspect
from contextlib import contextmanager
import prefect.logging.loggers as pf_loggers
_original_disable_logger = pf_loggers.disable_logger
@contextmanager
def selective_disable_logger(name: str):
    """
    Wrapper around Prefect's disable_logger.
    - If called from 'prefect_gcp.cloud_storage' -> no-op.
    - Otherwise -> delegate to original disable_logger.
    """
    stack = inspect.stack()
    caller_modules = {f.frame.f_globals.get("__name__", "") for f in stack[1:5]}
    if 'prefect_gcp.cloud_storage' in caller_modules:
        yield
    else:
        with _original_disable_logger(name):
            yield
pf_loggers.disable_logger = selective_disable_logger
oof ty for the sleuthwork @slothtopus - we'll get this fixed or happy to review a PR in the meantime
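For anyone considering a PR, one possible direction is a reference-counted variant guarded by a lock, so the logger is only restored when the last overlapping caller exits. This is a sketch under assumptions, not the fix Prefect adopted or plans to adopt; `counted_disable_logger` and the module-level dictionaries are hypothetical names for illustration.

```python
import logging
import threading
from contextlib import contextmanager

_lock = threading.Lock()
_depth: dict[str, int] = {}     # number of active disable contexts per logger name
_initial: dict[str, bool] = {}  # flag value seen by the first (outermost) entrant


@contextmanager
def counted_disable_logger(name: str):
    """Hypothetical thread-safe variant: restore only when the last holder exits."""
    logger = logging.getLogger(name)
    with _lock:
        if _depth.get(name, 0) == 0:
            _initial[name] = logger.disabled  # remember the state before anyone disabled it
        _depth[name] = _depth.get(name, 0) + 1
        logger.disabled = True
    try:
        yield
    finally:
        with _lock:
            _depth[name] -= 1
            if _depth[name] == 0:
                logger.disabled = _initial.pop(name)
```

Note that this still silences the logger for every thread while any caller is inside the context, so it only addresses the "stuck as disabled" part, not the global shutdown mentioned above.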