
flow.from_source doesn't handle local paths

Open EmilRex opened this issue 1 year ago • 3 comments

First check

  • [X] I added a descriptive title to this issue.
  • [X] I used the GitHub search to find a similar issue and didn't find it.
  • [X] I searched the Prefect documentation for this issue.
  • [X] I checked that this issue is related to Prefect and not one of its dependencies.

Bug summary

I would expect to be able to pass a local file path to flow.from_source, but doing so raises an error. The flow code is in /Users/emilchristensen/github/hello.py. I have tried the following combinations:

  • /Users/emilchristensen/github
  • file:///Users/emilchristensen/github
  • file:////Users/emilchristensen/github

At the very least, it's not clear to me how to specify that the code is stored locally. The intent here is to chain from_source with .deploy and then execute the flow as a deployment with a worker.
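The intended chaining looks roughly like this (deployment and work pool names are placeholders; this is exactly what currently fails for local paths):

from prefect import flow

# Intended usage: load the flow from a local directory, then create a
# deployment from it so a worker can execute it.
flow.from_source(
    source="/Users/emilchristensen/github",
    entrypoint="hello.py:hello",
).deploy(
    name="hello-deployment",        # placeholder deployment name
    work_pool_name="my-work-pool",  # placeholder work pool
)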

Reproduction

from prefect import flow

@flow
def hello(name: str = "Marvin"):
    print(f"Hello {name}!")

if __name__ == "__main__":
    flow.from_source(
        source="/Users/emilchristensen/github",
        entrypoint="hello.py:hello"
    )

Error

(default) ➜  github python /Users/emilchristensen/github/hello.py 
Traceback (most recent call last):
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/prefect/runner/storage.py", line 431, in pull_code
    await from_async.wait_for_call_in_new_thread(
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/prefect/_internal/concurrency/api.py", line 196, in wait_for_call_in_new_thread
    return call.result()
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 284, in result
    return self.future.result(timeout=timeout)
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 168, in result
    return self.__get_result()
  File "/opt/homebrew/Cellar/[email protected]/3.9.18_1/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 318, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/fsspec/spec.py", line 798, in get
    rpaths = self.expand_path(rpath, recursive=recursive)
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/fsspec/spec.py", line 893, in expand_path
    out = self.expand_path([path], recursive, maxdepth)
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/fsspec/spec.py", line 917, in expand_path
    raise FileNotFoundError(path)
FileNotFoundError: ['/Users/emilchristensen/github/Users/emilchristensen/github']

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/emilchristensen/github/hello.py", line 8, in <module>
    flow.from_source(
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 255, in coroutine_wrapper
    return call()
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 398, in __call__
    return self.result()
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 284, in result
    return self.future.result(timeout=timeout)
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 168, in result
    return self.__get_result()
  File "/opt/homebrew/Cellar/[email protected]/3.9.18_1/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 355, in _run_async
    result = await coro
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/prefect/flows.py", line 846, in from_source
    await storage.pull_code()
  File "/Users/emilchristensen/.virtualenvs/default/lib/python3.9/site-packages/prefect/runner/storage.py", line 440, in pull_code
    raise RuntimeError(
RuntimeError: Failed to pull contents from remote storage '/Users/emilchristensen/github' to PosixPath('/var/folders/yc/p317ld6d0gn55d0qqwh2pwrw0000gn/T/tmpofkq4bdg/Users/emilchristensen/github')

Versions

Version:             2.14.12
API version:         0.8.4
Python version:      3.9.18
Git commit:          5a2482d0
Built:               Wed, Dec 20, 2023 4:58 PM
OS/Arch:             darwin/arm64
Profile:             personal
Server type:         cloud

Additional context

No response

EmilRex avatar Dec 29 '23 21:12 EmilRex

Hi @EmilRex, provided that the file is called hello.py, I don't think you need .from_source here at all.

from prefect import flow

@flow
def hello(name: str = "Marvin"):
    print(f"Hello {name}!")

if __name__ == "__main__":
    hello.deploy(
        name="my-hello-deployment",
        ...
    )

Can you explain more about why this wouldn't work for you?

serinamarie avatar Jan 02 '24 16:01 serinamarie

@serinamarie I had assumed that you needed to specify a source to deploy something, but this is definitely better. What if I need to specify a specific path, though? For example, suppose I call .deploy in a GitHub action (/code/hello.py), but on the server where the flow eventually runs it's under a different path (/Users/emilchristensen/github/hello.py).

EmilRex avatar Jan 03 '24 18:01 EmilRex

@EmilRex In the case that you mentioned, we handle the resolution of the flow source code either via a pull step that is generated from the source used in .from_source, or we build a Docker image and store the path to the flow entry point relative to the working directory of the image.

If the flow code is located in a different directory, then I'm assuming it is moved to the execution environment outside of Prefect's usual operations. We generally do not recommend this pattern. I can give a better recommendation with more information about the use case, but if users want to deploy a local flow, we recommend using flow.deploy to build and push a Docker image for the given flow.
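Roughly, those two routes look like this (the repository URL, image name, and work pool names are placeholders):

from prefect import flow

@flow
def hello(name: str = "Marvin"):
    print(f"Hello {name}!")

if __name__ == "__main__":
    # Route 1: remote storage. from_source records a pull step that the
    # worker uses to fetch the flow code at run time.
    flow.from_source(
        source="https://github.com/org/repo",  # placeholder repository URL
        entrypoint="hello.py:hello",
    ).deploy(
        name="hello-from-git",
        work_pool_name="my-work-pool",  # placeholder work pool
    )

    # Route 2: bake the code into a Docker image. The entrypoint path is
    # stored relative to the image's working directory.
    hello.deploy(
        name="hello-from-image",
        work_pool_name="my-docker-pool",   # placeholder Docker work pool
        image="my-registry/hello:latest",  # placeholder image name
    )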

desertaxle avatar Feb 13 '24 02:02 desertaxle

@desertaxle I have a use case where I want to store the flow code on the worker container itself, so technically I want to deploy from local storage when the worker starts. Each time the worker restarts, the deployment will be triggered and the path + entrypoint will get updated. I'm trying to deploy without using from_source, and I'm running into an error:

ValueError: Either an image or remote storage location must be provided when deploying a deployment.

Code sample:

test_parallel_sleeper.deploy(
    name=f"run_{environment}_sleeper_test",
    work_pool_name=os.environ["PREFECT_DEFAULT_WORK_POOL_NAME"],
    build=False,
    push=False,
    work_queue_name="default",
    is_schedule_active=False,
    tags=["testing"],
    version="0.1",
    description="Run a test flow.",
    enforce_parameter_schema=False,
    print_next_steps=True,
)

If I try to provide a path, as in the original issue, I run into the same FileNotFoundError, and I'm fairly certain something unexpected is happening in this line: https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flows.py#L903

avinash-santhanagopalan avatar Mar 04 '24 15:03 avinash-santhanagopalan

Hey @avinash-santhanagopalan, this type of question is great for our Slack community; if you ask it there it will get more visibility. I will say that if you want to store your code in this way, we recommend baking your flow code directly into a Docker image so that the path stays stable between restarts. That's why you get an error when you try to deploy with the code sample you posted. If you want to colocate your flow code and your worker, it's also possible that flow.serve is better suited for your use case.
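A minimal sketch of the baked-image route (the flow body, image name, and work pool name are placeholders):

from prefect import flow

@flow
def test_parallel_sleeper():
    ...

if __name__ == "__main__":
    # Build an image that contains the flow code so the path inside the
    # container stays stable across worker restarts.
    test_parallel_sleeper.deploy(
        name="sleeper-test",
        work_pool_name="my-docker-pool",       # placeholder work pool
        image="my-registry/sleeper-test:0.1",  # placeholder image name
    )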

desertaxle avatar Mar 04 '24 15:03 desertaxle

Thanks @desertaxle. I am baking the code into a Docker image and then trying to .deploy from there, but I'm still running into this issue. I will reach out in the Slack community to get more clarity. I did try flow.serve and it worked, but the serve command never exits (at least the way I wrote it), so my worker never gets started. The script looks like this:

# test.py calls .serve(), which blocks indefinitely, so this line never returns...
python flows/test.py
# ...and the worker below never gets started.
exec prefect worker start --pool "$PREFECT_DEFAULT_WORK_POOL_NAME" --type "$PREFECT_DEFAULT_WORK_POOL_TYPE"

Example: test.py

if __name__ == "__main__":
    test_parallel_sleeper.serve(
        name=f"run_{environment}_sleeper_test",
        is_schedule_active=False,
        tags=["testing"],
        version="0.1",
        description="Run a test flow.",
        enforce_parameter_schema=False,
        limit=1,
    )

I will have to dig around a little more, but I really wish .from_source allowed creating deployments from local paths.

avinash-santhanagopalan avatar Mar 05 '24 19:03 avinash-santhanagopalan

I'm trying to run a flow locally via a process-type work pool. I created the pool, and here is my hello world:

from prefect import flow, task

@task
def print_hello(name):
    print(f"Hello {name}!")

@flow(name="Hello Flow")
def hello_world(name="world"):
    print_hello(name)

if __name__ == "__main__":
    hello_world.deploy(
        name="my-hello-deployment",
        work_pool_name="process-pool"
    )

Running it gives the error:

ValueError: Either an image or remote storage location must be provided when deploying a deployment.

It's confusing that I can't easily simulate a work pool and worker setup locally without Docker when I'm specifically using a work pool of type process.


UPD: here is the local Docker setup that works, with no issues with networking or anything else:

from prefect import flow, task
from prefect.deployments import DeploymentImage

@task
def print_hello(name):
    print(f"Hello {name}!")

@flow(name="Hello Flow")
def hello_world(name="world"):
    print_hello(name)

if __name__ == "__main__":
    hello_world.deploy(
        name="hello-world-image-deployment", 
        work_pool_name="docker-pool",
        image="local/hello-world:1.0",
        push=False,
    )

povisenko avatar Mar 28 '24 19:03 povisenko

Same here, I wanted to use a Prefect process worker and deploy a flow locally.

Actually, by using a prefect.yaml you can achieve this.

However, I would like to do it through the Python API with flow.deploy() ...


  • Initialize your prefect.yaml
$ prefect deploy $PWD/hello.py:flow -n my-deployment
...
  • Check your file
$ cat prefect.yaml
name: hello
build: null
push: null
pull:
- prefect.deployments.steps.set_working_directory:
    directory: /home/xxx/hello/

deployments:
- name: my-deployment
  version: null
  tags: []
  description: null
  entrypoint: hello.py:flow
  parameters:
  work_pool:
    name: my-work-pool
    work_queue_name: null
  schedules: []

  • Deploy it

$ prefect deploy -n my-deployment

  • Launch a worker process

$ prefect worker start --pool "my-work-pool"

steph-ben avatar May 22 '24 10:05 steph-ben

Should be solved by #13981

EmilRex avatar Jun 12 '24 20:06 EmilRex