Fix databricks job request serialization

Open pbrogan12 opened this issue 1 year ago • 6 comments

Why are the changes needed?

Ran into an error in the create method of the Databricks agent when testing v1.11.0.

Last Error: USER:: rpc error: code = Internal desc = failed to create spark task with error: Object of type RepeatedScalarContainer is not JSON serializable

The spark_python_task parameter contains a reference to container.args, which is of type RepeatedScalarContainer from the protobuf library.

https://github.com/flyteorg/flytekit/blob/4d42399079535c3582c9adfad5791253dcbd045e/plugins/flytekit-spark/flytekitplugins/spark/agent.py#L51

This type is not JSON serializable, which results in an error when the job request is converted to JSON.

https://github.com/flyteorg/flytekit/blob/4d42399079535c3582c9adfad5791253dcbd045e/plugins/flytekit-spark/flytekitplugins/spark/agent.py#L62
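
The failure mode can be reproduced without Flyte or protobuf. The sketch below uses a hypothetical stand-in class, FakeRepeatedContainer, that behaves like a sequence but is not a list, which is the property of RepeatedScalarContainer that matters here. The payload shape and the argument values are assumptions for illustration, not the exact Databricks job request.

```python
import json
from collections.abc import Sequence


class FakeRepeatedContainer(Sequence):
    """Hypothetical stand-in for protobuf's RepeatedScalarContainer.

    It is iterable and indexable but does not subclass list, so the
    standard json encoder has no rule for serializing it.
    """

    def __init__(self, items):
        self._items = list(items)

    def __getitem__(self, index):
        return self._items[index]

    def __len__(self):
        return len(self._items)


# Illustrative arguments only; real container args will differ.
args = FakeRepeatedContainer(["pyflyte-execute", "--inputs", "s3://bucket/inputs.pb"])

# Illustrative payload shape only.
job_request = {"spark_python_task": {"parameters": args}}

try:
    json.dumps(job_request)
    serialization_error = None
except TypeError as exc:
    # json falls through to its default handler, which raises TypeError.
    serialization_error = str(exc)

print(serialization_error)
```

The json encoder only handles dict, list, tuple, str, numbers, booleans, and None out of the box, so any other sequence type reaches the default handler and raises.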

What changes were proposed in this pull request?

Modified the Container.from_flyte_idl method to convert args to a plain list of strings.
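
As a rough illustration of converting at the model boundary, the sketch below uses a hypothetical, simplified class named ContainerSketch with a hypothetical from_idl_like constructor. It is not the actual flytekit Container implementation, and the argument values are made up.

```python
import json
from dataclasses import dataclass, field
from typing import Iterable, List


@dataclass
class ContainerSketch:
    """Hypothetical, simplified stand-in for flytekit's Container model."""

    image: str
    args: List[str] = field(default_factory=list)

    @classmethod
    def from_idl_like(cls, image: str, args: Iterable[str]) -> "ContainerSketch":
        # Copy the repeated field into a plain list of strings so that
        # downstream code never sees the protobuf container type.
        return cls(image=image, args=[str(arg) for arg in args])


# An iterator stands in for the protobuf repeated field in this sketch;
# like that type, json cannot serialize it directly.
idl_args = iter(("pyflyte-execute", "--task-module", "tasks.databricks"))
container = ContainerSketch.from_idl_like("databricksruntime/standard:14.3-LTS", idl_args)

# args is now a plain list, so it serializes without a custom encoder.
payload = json.dumps({"parameters": container.args})
print(payload)
```

Converting once in the constructor means every consumer of the model, not only the Spark agent, receives a plain list.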

How was this patch tested?

Running locally

Setup process

Screenshots

Check all the applicable boxes

  • [ ] I updated the documentation accordingly.
  • [ ] All new and existing tests passed.
  • [ ] All commits are signed-off.

Related PRs

Docs link

pbrogan12 avatar Mar 19 '24 16:03 pbrogan12

Codecov Report

All modified and coverable lines are covered by tests :white_check_mark:

Project coverage is 83.34%. Comparing base (4d42399) to head (1c402bb). Report is 40 commits behind head on master.

Additional details and impacted files
@@            Coverage Diff             @@
##           master    #2286      +/-   ##
==========================================
- Coverage   83.73%   83.34%   -0.40%     
==========================================
  Files         327      311      -16     
  Lines       24870    24191     -679     
  Branches     3706     3496     -210     
==========================================
- Hits        20826    20162     -664     
+ Misses       3418     3401      -17     
- Partials      626      628       +2     

codecov[bot] avatar Mar 19 '24 18:03 codecov[bot]

Would it be better to hardcode them in spark/agent.py? I'm not sure whether other arguments would be affected.

From looking at the unit tests, I think it's preferable to fix this in the class method. It looks like the args attribute of the Container class is expected to be initialized with a list of strings. https://github.com/flyteorg/flytekit/blob/3642ec6771985f504c2b6a9dfd2598a84219ac58/tests/flytekit/unit/models/test_tasks.py#L252-L259

pbrogan12 avatar Mar 20 '24 15:03 pbrogan12

Makes sense, but I need more time to trace the code. Thank you so much.

Future-Outlier avatar Mar 20 '24 15:03 Future-Outlier

@pbrogan12 Can you sign off the commits in this PR?

Future-Outlier avatar Mar 20 '24 15:03 Future-Outlier

Yep @Future-Outlier

Launched the agent service from Docker:

docker run -it -p 8000:8000  --entrypoint pyflyte ghcr.io/flyteorg/flyteagent:1.11.0 --verbose serve agent

Made a simple task:

from flytekit import task
from flytekitplugins.spark import Databricks

@task(
    container_image="databricksruntime/standard:14.3-LTS",
    task_config=Databricks(
        spark_conf={"spark.executor.cores": "1"},
        databricks_instance="example.cloud.databricks.com",
        databricks_conf={
            "run_name": "flyte-databricks",
            "new_cluster": {
                "spark_version": "14.3.x-scala2.12",
                "node_type_id": "r5.xlarge",
                "num_workers": 1,
                "aws_attributes": {
                    "instance_profile_arn": "arn:aws:iam::123456789:instance-profile/flyte-databricks",
                    "ebs_volume_size": 100,
                    "ebs_volume_type": "GENERAL_PURPOSE_SSD",
                    "ebs_volume_count": 1,
                },
            },
        },
    ),
)
def databricks_hello_world() -> str:
    return "Hello world"

Compiled the task using pyflyte package and unpacked the tar archive.

pyflyte -k tasks package  
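
For the unpacking step, a minimal sketch using only the Python standard library is shown below. The helper name unpack_package and the destination directory are hypothetical; the archive name flyte-package.tgz is taken from the attachment mentioned in this thread.

```python
import tarfile
from pathlib import Path
from typing import List


def unpack_package(archive: str, dest: str) -> List[str]:
    """Extract a gzipped tar archive and return the names of the .pb files found."""
    dest_path = Path(dest)
    dest_path.mkdir(parents=True, exist_ok=True)
    with tarfile.open(archive, "r:gz") as tar:
        # Only extract archives you trust; this does not filter member paths.
        tar.extractall(dest_path)
    return sorted(p.name for p in dest_path.rglob("*.pb"))


# Example usage (assumes the archive exists in the working directory):
# pb_files = unpack_package("flyte-package.tgz", "./unpacked")
# print(pb_files)
```

The returned list should include the serialized task file that is parsed in the next step.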

Finally, formed the CreateTask request and sent it to the local agent service.

import grpc

from flyteidl.core.compiler_pb2 import CompiledTask
from flyteidl.service.agent_pb2_grpc import AsyncAgentServiceStub
from flyteidl.admin.agent_pb2 import CreateTaskRequest

task = CompiledTask()

with open("./0_tasks.databricks.databricks_hello_world_1.pb", "rb") as f:
    task.ParseFromString(f.read())

request = CreateTaskRequest()
request.template.CopyFrom(task.template)

channel = grpc.insecure_channel("localhost:8000")
stub = AsyncAgentServiceStub(channel)
response = stub.CreateTask(request)

Here is the error message I get:

grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
	status = StatusCode.INTERNAL
	details = "failed to create spark task with error: Object of type RepeatedScalarContainer is not JSON serializable."
	debug_error_string = "UNKNOWN:Error received from peer ipv4:127.0.0.1:8000 {grpc_message:"failed to create spark task with error: Object of type RepeatedScalarContainer is not JSON serializable.", grpc_status:13, created_time:"2024-03-21T08:51:16.611258-04:00"}"
>

Attached the compiled task protobuf that I used.

flyte-package.tgz

pbrogan12 avatar Mar 21 '24 12:03 pbrogan12

Thank you, this is valuable. I will help you tomorrow and discuss it with the maintainers <3

Future-Outlier avatar Mar 21 '24 14:03 Future-Outlier

@pbrogan12 Thank you. It also fixes my issue.

pingsutw avatar Apr 04 '24 00:04 pingsutw