Support multipart blob download
Tracking issue
https://github.com/flyteorg/flyte/issues/3632
Why are the changes needed?
Supporting multipart blob downloads lets the copilot downloader copy an entire remote directory into the task's input path.
What changes were proposed in this pull request?
- Use the new storage List API to collect the items under the container before downloading
- Implement the List API for memory storage
- Download in parallel
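The list-then-download flow above can be sketched in Python. This is a minimal illustration under stated assumptions, not the PR's Go implementation. InMemoryStore, list_keys, read and download_dir are hypothetical names. The store is assumed to return absolute keys (s3://bucket/...) from its list call, and a thread pool stands in for whatever concurrency the real downloader uses.

```python
import os
import tempfile
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List


class InMemoryStore:
    """Hypothetical blob store whose keys are absolute URIs such as s3://bucket/key."""

    def __init__(self, blobs: Dict[str, bytes]) -> None:
        self._blobs = dict(blobs)

    def list_keys(self, prefix: str) -> List[str]:
        # Stand-in for a List API: every absolute key under the prefix, sorted
        return sorted(k for k in self._blobs if k.startswith(prefix))

    def read(self, key: str) -> bytes:
        return self._blobs[key]


def download_dir(store: InMemoryStore, prefix: str, dest: str, workers: int = 4) -> List[str]:
    """List all blobs under prefix, then download them concurrently into dest."""
    if not prefix.endswith("/"):
        prefix += "/"  # so "dir" does not also match a sibling such as "dir-other"
    keys = store.list_keys(prefix)

    def fetch(key: str) -> str:
        # Recreate the remote layout, relative to the prefix, under dest
        rel = key[len(prefix):]
        local_path = os.path.join(dest, *rel.split("/"))
        os.makedirs(os.path.dirname(local_path), exist_ok=True)
        with open(local_path, "wb") as f:
            f.write(store.read(key))
        return rel

    with ThreadPoolExecutor(max_workers=workers) as pool:
        # map() keeps input order and re-raises a download error when results are collected
        return list(pool.map(fetch, keys))


if __name__ == "__main__":
    store = InMemoryStore({
        "s3://bucket/inputs/dir/a.txt": b"a",
        "s3://bucket/inputs/dir/sub/b.txt": b"b",
        "s3://bucket/inputs/dir-other/c.txt": b"c",
    })
    with tempfile.TemporaryDirectory() as tmp:
        print(download_dir(store, "s3://bucket/inputs/dir", tmp))  # ['a.txt', 'sub/b.txt']
```

Listing first gives the complete set of objects up front. That set can then be fanned out to workers, and no object under the prefix is missed.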
How was this patch tested?
Unit tests, specifically in download_test.go.
Setup process
Screenshots
Check all the applicable boxes
- [x] I updated the documentation accordingly.
- [x] All new and existing tests passed.
- [x] All commits are signed-off.
Related PRs
https://github.com/flyteorg/flytekit/pull/2258
Docs link
NA
Codecov Report
Attention: Patch coverage is 59.11950% with 65 lines in your changes missing coverage. Please review.
Project coverage is 36.90%. Comparing base (fef67b8) to head (dbbd8c3). Report is 134 commits behind head on master.
@@ Coverage Diff @@
## master #5715 +/- ##
==========================================
+ Coverage 36.85% 36.90% +0.04%
==========================================
Files 1310 1310
Lines 131246 131372 +126
==========================================
+ Hits 48377 48477 +100
- Misses 78670 78682 +12
- Partials 4199 4213 +14
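The percentages in the coverage diff above can be reproduced from the hit, miss and partial counts. This is a minimal sketch that assumes Codecov's default definition: hits divided by all tracked lines, with partials counted as not covered. It also assumes Codecov's default round-down to two decimal places. The helper names coverage and fmt_down are hypothetical.

```python
import math
from fractions import Fraction


def coverage(hits: int, misses: int, partials: int) -> Fraction:
    # Partials count against coverage, so the denominator is every tracked line
    return Fraction(hits, hits + misses + partials)


def fmt_down(ratio: Fraction) -> str:
    # Express as hundredths of a percent and round down (truncate) to two decimals
    if ratio < 0:
        raise ValueError("this sketch only formats non-negative ratios")
    hundredths = math.floor(ratio * 10000)
    return f"{hundredths // 100}.{hundredths % 100:02d}"


base = coverage(48377, 78670, 4199)  # master
head = coverage(48477, 78682, 4213)  # this PR
print(fmt_down(base), fmt_down(head), fmt_down(head - base))  # 36.85 36.90 0.04
```

Truncation explains why master is reported as 36.85% rather than 36.86%: the exact value is about 36.8598%.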
| Flag | Coverage Δ | |
|---|---|---|
| unittests-datacatalog | 51.58% <ø> (ø) | |
| unittests-flyteadmin | 54.05% <ø> (ø) | |
| unittests-flytecopilot | 22.23% <64.38%> (+10.50%) | :arrow_up: |
| unittests-flytectl | 62.39% <ø> (ø) | |
| unittests-flyteidl | 6.92% <ø> (ø) | |
| unittests-flyteplugins | 53.84% <ø> (ø) | |
| unittests-flytepropeller | 42.90% <ø> (ø) | |
| unittests-flytestdlib | 55.31% <0.00%> (-0.07%) | :arrow_down: |
Flags with carried forward coverage won't be shown.
There is also this PR, https://github.com/flyteorg/flyte/pull/5674/files, which I think we should merge first. The change to the core API should probably be done separately.
@wayner0628 https://github.com/flyteorg/flyte/pull/5741 was just merged; it adds a List API to the storage client. Would you mind using the new interface for this?
@wild-endeavor No problem, I'll update this PR to align with the new interface.
Hi @Future-Outlier and @wild-endeavor,
I’ve been encountering an issue while running a Flytekit test case. The error I'm seeing is as follows:
[1/1] currentAttempt done. Last Error: USER::Pod failed. No message received from kubernetes.
[flyte-copilot-downloader] terminated with exit code (1). Reason [Error]. Message:
Type to use [iam, accesskey]. (default "iam")
--storage.connection.disable-ssl Disables SSL connection. Should only be used for development.
--storage.connection.endpoint string URL for storage client to connect to.
--storage.connection.region string Region to connect to. (default "us-east-1")
--storage.connection.secret-key string Secret to use when accesskey is set.
--storage.container string Initial container (in s3 a bucket) to create -if it doesn't exist-.'
--storage.defaultHttpClient.timeout string Sets time out on the http client. (default "0s")
--storage.enable-multicontainer If this is true, then the container argument is overlooked and redundant. This config will automatically open new connections to new containers/buckets as they are encountered
--storage.limits.maxDownloadMBs int Maximum allowed download size (in MBs) per call. (default 2)
--storage.stow.config stringToString Configuration for stow backend. Refer to github/graymeta/stow (default [])
--storage.stow.kind string Kind of Stow backend to use. Refer to github/graymeta/stow
--storage.type string Sets the type of storage to configure [s3/minio/local/mem/stow]. (default "s3")
--tls-server-name string If provided, this name will be used to validate server certificate. If this is not provided, hostname used to contact the server is used.
--token string Bearer token for authentication to the API server
--user string The name of the kubeconfig user to use
--username string Username for basic authentication to the API server
-v, --v Level number for the log level verbosity
--vmodule moduleSpec comma-separated list of pattern=N settings for file-filtered logging
.
The flyte-copilot-downloader error persists even when I run the raw_container.py example from Flytesnacks on the master branch:
flytectl demo start --dev
POD_NAMESPACE=flyte ./flyte start --config flyte-single-binary-local.yaml
pyflyte run --remote raw_container.py calculate_ellipse_area_shell --a 1.1 --b 1.2
Environment Details:
- Flytekit: 1.13.7
- Flyte: up-to-date Master branch
- Config: No modifications from the master branch
- Python 3.9.12
- go version go1.22.4 darwin/arm64
I built, tagged, and pushed the modified Docker image when testing this PR. The Flytesnacks example did not use the modified image, and it still failed.
This has been blocking me for a couple of weeks now. I’ll continue investigating, but any help or guidance you could provide would be greatly appreciated!
Thank you in advance.
Can you show me your config file?
It's the original one I used to run Flytesnacks:
plugins:
  logs:
    kubernetes-enabled: true
    kubernetes-template-uri: http://localhost:30080/kubernetes-dashboard/#/log/{{.namespace }}/{{ .podName }}/pod?namespace={{ .namespace }}
    cloudwatch-enabled: false
    stackdriver-enabled: false
  k8s:
    default-env-vars:
      - FLYTE_AWS_ENDPOINT: http://flyte-sandbox-minio.flyte:9000
      - FLYTE_AWS_ACCESS_KEY_ID: minio
      - FLYTE_AWS_SECRET_ACCESS_KEY: miniostorage
    image-pull-policy: Always # Helps in better iteration of flytekit changes
  k8s-array:
    logs:
      config:
        kubernetes-enabled: true
        kubernetes-template-uri: http://localhost:30080/kubernetes-dashboard/#/log/{{.namespace }}/{{ .podName }}/pod?namespace={{ .namespace }}
        cloudwatch-enabled: false
        stackdriver-enabled: false
You have to add the co-pilot image:
k8s:
  default-env-vars:
    - FLYTE_AWS_ENDPOINT: "http://flyte-sandbox-minio.flyte:9000"
    - FLYTE_AWS_ACCESS_KEY_ID: minio
    - FLYTE_AWS_SECRET_ACCESS_KEY: miniostorage
    - MLFLOW_TRACKING_URI: postgresql+psycopg2://postgres:@postgres.flyte.svc.cluster.local:5432/flyteadmin
  co-pilot:
    image: "cr.flyte.org/flyteorg/flytecopilot:v1.13.1"
@Future-Outlier, I'll try it later, thank you.
@Future-Outlier, I added the copilot image (image: "cr.flyte.org/flyteorg/flytecopilot:v1.13.1") to the config, but the task still failed. Do I need to push the image or do anything else?
@wayner0628, show me your Python code and your whole k8s config.
import logging
from flytekit import ContainerTask, kwtypes, task, workflow
logger = logging.getLogger(__file__)
# A `flytekit.ContainerTask` denotes an arbitrary container. In the following example, the name of the task
# is `calculate_ellipse_area_shell`. This name has to be unique in the entire project. Users can specify:
#
# - `input_data_dir` -> where inputs will be written to.
# - `output_data_dir` -> where Flyte will expect the outputs to exist.
# `inputs` and `outputs` specify the interface for the task; thus it should be an ordered dictionary of typed input and
# output variables.
calculate_ellipse_area_shell = ContainerTask(
    name="ellipse-area-metadata-shell",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs=kwtypes(a=float, b=float),
    outputs=kwtypes(area=float, metadata=str),
    image="ghcr.io/flyteorg/rawcontainers-shell:v2",
    command=[
        "./calculate-ellipse-area.sh",
        "{{.inputs.a}}",
        "{{.inputs.b}}",
        "/var/outputs",
    ],
)
calculate_ellipse_area_python = ContainerTask(
    name="ellipse-area-metadata-python",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs=kwtypes(a=float, b=float),
    outputs=kwtypes(area=float, metadata=str),
    image="ghcr.io/flyteorg/rawcontainers-python:v2",
    command=[
        "python",
        "calculate-ellipse-area.py",
        "{{.inputs.a}}",
        "{{.inputs.b}}",
        "/var/outputs",
    ],
)
calculate_ellipse_area_r = ContainerTask(
    name="ellipse-area-metadata-r",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs=kwtypes(a=float, b=float),
    outputs=kwtypes(area=float, metadata=str),
    image="ghcr.io/flyteorg/rawcontainers-r:v2",
    command=[
        "Rscript",
        "--vanilla",
        "calculate-ellipse-area.R",
        "{{.inputs.a}}",
        "{{.inputs.b}}",
        "/var/outputs",
    ],
)
calculate_ellipse_area_haskell = ContainerTask(
    name="ellipse-area-metadata-haskell",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs=kwtypes(a=float, b=float),
    outputs=kwtypes(area=float, metadata=str),
    image="ghcr.io/flyteorg/rawcontainers-haskell:v2",
    command=[
        "./calculate-ellipse-area",
        "{{.inputs.a}}",
        "{{.inputs.b}}",
        "/var/outputs",
    ],
)
calculate_ellipse_area_julia = ContainerTask(
    name="ellipse-area-metadata-julia",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs=kwtypes(a=float, b=float),
    outputs=kwtypes(area=float, metadata=str),
    image="ghcr.io/flyteorg/rawcontainers-julia:v2",
    command=[
        "julia",
        "calculate-ellipse-area.jl",
        "{{.inputs.a}}",
        "{{.inputs.b}}",
        "/var/outputs",
    ],
)
@task
def report_all_calculated_areas(
    area_shell: float,
    metadata_shell: str,
    area_python: float,
    metadata_python: str,
    area_r: float,
    metadata_r: str,
    area_haskell: float,
    metadata_haskell: str,
    area_julia: float,
    metadata_julia: str,
):
    logger.info(f"shell: area={area_shell}, metadata={metadata_shell}")
    logger.info(f"python: area={area_python}, metadata={metadata_python}")
    logger.info(f"r: area={area_r}, metadata={metadata_r}")
    logger.info(f"haskell: area={area_haskell}, metadata={metadata_haskell}")
    logger.info(f"julia: area={area_julia}, metadata={metadata_julia}")
# If you’re using Flytekit version >= v1.11.1, you can execute it locally.
# For example, `pyflyte run raw_container.py calculate_ellipse_area_shell --a 1.1 --b 1.2`
#
# As can be seen in this example, `ContainerTask`s can be interacted with like normal Python functions, whose inputs
# correspond to the declared input variables. All data returned by the tasks are consumed and logged by a Flyte task.
@workflow
def wf(a: float, b: float):
    # Calculate area in all languages
    area_shell, metadata_shell = calculate_ellipse_area_shell(a=a, b=b)
    area_python, metadata_python = calculate_ellipse_area_python(a=a, b=b)
    area_r, metadata_r = calculate_ellipse_area_r(a=a, b=b)
    area_haskell, metadata_haskell = calculate_ellipse_area_haskell(a=a, b=b)
    area_julia, metadata_julia = calculate_ellipse_area_julia(a=a, b=b)
    # Report on all results in a single task to simplify comparison
    report_all_calculated_areas(
        area_shell=area_shell,
        metadata_shell=metadata_shell,
        area_python=area_python,
        metadata_python=metadata_python,
        area_r=area_r,
        metadata_r=metadata_r,
        area_haskell=area_haskell,
        metadata_haskell=metadata_haskell,
        area_julia=area_julia,
        metadata_julia=metadata_julia,
    )
@Future-Outlier, by k8s config, do you mean flyte-single-binary-local.yaml?
# This is a sample configuration file for running single-binary Flyte locally against
# a sandbox.
admin:
  # This endpoint is used by flytepropeller to talk to admin
  # and artifacts to talk to admin,
  # and _also_, admin to talk to artifacts
  endpoint: localhost:30080
  insecure: true
catalog-cache:
  endpoint: localhost:8081
  insecure: true
  type: datacatalog
cluster_resources:
  standaloneDeployment: false
  templatePath: $HOME/.flyte/sandbox/cluster-resource-templates
logger:
  show-source: true
  level: 5
propeller:
  create-flyteworkflow-crd: true
  kube-config: $HOME/.flyte/sandbox/kubeconfig
  rawoutput-prefix: s3://my-s3-bucket/data
server:
  kube-config: $HOME/.flyte/sandbox/kubeconfig
webhook:
  certDir: $HOME/.flyte/webhook-certs
  localCert: true
  secretName: flyte-sandbox-webhook-secret
  serviceName: flyte-sandbox-local
  servicePort: 9443
tasks:
  task-plugins:
    enabled-plugins:
      - container
      - sidecar
      - K8S-ARRAY
      - agent-service
      - echo
    default-for-task-types:
      - container: container
      - container_array: K8S-ARRAY
plugins:
  logs:
    kubernetes-enabled: true
    kubernetes-template-uri: http://localhost:30080/kubernetes-dashboard/#/log/{{.namespace }}/{{ .podName }}/pod?namespace={{ .namespace }}
    cloudwatch-enabled: false
    stackdriver-enabled: false
  k8s:
    default-env-vars:
      - FLYTE_AWS_ENDPOINT: "http://flyte-sandbox-minio.flyte:9000"
      - FLYTE_AWS_ACCESS_KEY_ID: minio
      - FLYTE_AWS_SECRET_ACCESS_KEY: miniostorage
      - MLFLOW_TRACKING_URI: postgresql+psycopg2://postgres:@postgres.flyte.svc.cluster.local:5432/flyteadmin
    co-pilot:
      image: "cr.flyte.org/flyteorg/flytecopilot:v1.13.1"
    image-pull-policy: Always # Helps in better iteration of flytekit changes
  k8s-array:
    logs:
      config:
        kubernetes-enabled: true
        kubernetes-template-uri: http://localhost:30080/kubernetes-dashboard/#/log/{{.namespace }}/{{ .podName }}/pod?namespace={{ .namespace }}
        cloudwatch-enabled: false
        stackdriver-enabled: false
database:
  postgres:
    username: postgres
    password: postgres
    host: 127.0.0.1
    port: 30001
    dbname: flyte
    options: "sslmode=disable"
storage:
  type: stow
  stow:
    kind: s3
    config:
      region: us-east-1
      disable_ssl: true
      v2_signing: true
      endpoint: http://localhost:30002
      auth_type: accesskey
      access_key_id: minio
      secret_key: miniostorage
  container: my-s3-bucket
task_resources:
  defaults:
    cpu: 500m
    memory: 500Mi
  limits:
    cpu: 4
    memory: 4Gi
Thank you. I just tested it and found that this was broken by someone else's change.
Thanks for the insights, @Future-Outlier. I was able to test my copilot image and noticed an error in write_flytedir.py in your example:
Error: s3:/my-s3-bucket/data/9x/al2ts9fhllvlghff6dx6-n0-0/a0a112e1a40ca0f61ee2b4e6f4d3911c does not exist or is not a directory.
It seems that a / is missing after s3:/ (it should be s3://). The corresponding FlyteDirectory and multipart blob paths do not have this problem. I'm currently working on resolving this. Testing with the Copilot setup takes some time, so thank you for your patience while I continue to investigate.
No problem. Reply here anytime; I'll be around.
Hi @Future-Outlier, I have addressed the technical aspects of the issues, but I need some clarification on a couple of conceptual points before I can finalize the PR:
- Memory storage List API: In the unit test function TestHandleBlobMultipart, the memory storage List API did not match the stow storage implementation, which should use absolute paths. I've updated both the API and the downloader to follow this behavior. ✅
- Error explanation: Error: s3:/my-s3-bucket/data/9x/al2ts9fhllvlghff6dx6-n0-0/a0a112e1a40ca0f61ee2b4e6f4d3911c does not exist or is not a directory.
  This error seems to be caused by input_path = Path(sys.argv[1]), because pathlib removes the second slash. However, one part confuses me: why does the container task receive an S3 path directly instead of a local path that it could use to copy the input directory?
Once I have clarity on these points, I believe I can deliver the final PR very quickly. Thank you!
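The pathlib behavior described above can be reproduced with the standard library. PurePosixPath parses the string the same way Path(sys.argv[1]) does on Linux: it treats the URI as a filesystem path and collapses the // after s3:. split_s3_uri is a hypothetical helper. It is a sketch of one way a script could keep an s3:// argument intact if it really does receive a remote URI. It is not part of flytekit or of this PR.

```python
from pathlib import PurePosixPath
from typing import Tuple
from urllib.parse import urlparse

URI = "s3://my-s3-bucket/data/9x/al2ts9fhllvlghff6dx6-n0-0/a0a112e1a40ca0f61ee2b4e6f4d3911c"

# pathlib treats the URI as a POSIX path and collapses "//" into "/",
# which reproduces the malformed path from the error message
mangled = str(PurePosixPath(URI))


def split_s3_uri(uri: str) -> Tuple[str, str]:
    """Split an s3:// URI into (bucket, key) without normalizing its slashes."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"not an s3 URI: {uri!r}")
    return parsed.netloc, parsed.path.lstrip("/")


if __name__ == "__main__":
    print(mangled)
    print(split_s3_uri(URI))
```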
Hi all,
I've tested the downloader with a Python task that writes to a FlyteDirectory, and the raw container reads it successfully.
I also tested the scenario where a raw container writes a directory and a downstream Python task reads it. The sidecar (the uploader for the container task) does not appear to support multipart blobs yet. Addressing this limitation will take more time. Since the downloader already supports directories, I suggest we merge this PR and handle sidecar support in a follow-up PR (https://github.com/flyteorg/flyte/issues/5924).
I'll add the test .py code and this screenshot to the description. Thank you!
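For the follow-up in #5924, uploading a directory is roughly the mirror image of the download: walk the task's output directory and put each file under one remote prefix. This is a minimal sketch under that assumption. plan_directory_upload is a hypothetical helper that only computes (local file, remote key) pairs. It does not call any Flyte or storage API.

```python
import os
import tempfile
from typing import List, Tuple


def plan_directory_upload(local_dir: str, remote_prefix: str) -> List[Tuple[str, str]]:
    """Pair every file under local_dir with a key under remote_prefix (hypothetical helper)."""
    prefix = remote_prefix.rstrip("/")
    plan: List[Tuple[str, str]] = []
    for root, _dirs, files in os.walk(local_dir):
        for name in files:
            local_path = os.path.join(root, name)
            # Remote keys always use "/", whatever the local path separator is
            rel = os.path.relpath(local_path, local_dir).replace(os.sep, "/")
            plan.append((local_path, f"{prefix}/{rel}"))
    # os.walk order depends on the filesystem, so sort by key for a stable plan
    return sorted(plan, key=lambda pair: pair[1])


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as out_dir:
        os.makedirs(os.path.join(out_dir, "sub"))
        for rel in ("a.txt", os.path.join("sub", "b.txt")):
            with open(os.path.join(out_dir, rel), "w") as f:
                f.write(rel)
        for _, key in plan_directory_upload(out_dir, "s3://bucket/outputs/dir/"):
            print(key)
```

Each pair could then be uploaded independently, and concurrently, just as the downloader fetches listed objects in parallel.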
It's looking pretty good. I'll come back to this on Thursday or Friday. Thank you, Wayner.
Hi @Future-Outlier, I’ve pushed some modifications based on your feedback. Please take a look!
Can the stow API install more than one file at a time? (We think not.)
I’m not sure what this means. Could you clarify what changes you’d like here?
Yes, no problem; I'll do it today. You can ignore the stow API question for now. We just want to make sure there's no way to batch download files.
