Fix Databricks job request serialization
Why are the changes needed?
Ran into an error in the `create` method of the Databricks agent when testing v1.11.0:

```
Last Error: USER:: rpc error: code = Internal desc = failed to create spark task with error: Object of type RepeatedScalarContainer is not JSON serializable
```
The `spark_python_task` parameter contains a reference to `container.args`, which is of type `RepeatedScalarContainer` from the protobuf library:
https://github.com/flyteorg/flytekit/blob/4d42399079535c3582c9adfad5791253dcbd045e/plugins/flytekit-spark/flytekitplugins/spark/agent.py#L51
This type is not JSON serializable, which results in an error when the job request is converted to JSON:
https://github.com/flyteorg/flytekit/blob/4d42399079535c3582c9adfad5791253dcbd045e/plugins/flytekit-spark/flytekitplugins/spark/agent.py#L62
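For illustration, a minimal, self-contained sketch of the failure mode, using `google.protobuf.FieldMask` purely as a stand-in message with a repeated string field (it is not part of the actual code path):

```python
import json

from google.protobuf.field_mask_pb2 import FieldMask

# FieldMask.paths is a repeated string field backed by the same
# RepeatedScalarContainer type as the task template's container.args.
mask = FieldMask(paths=["pyflyte-execute", "--inputs", "s3://..."])

try:
    json.dumps({"parameters": mask.paths})
except TypeError as e:
    # TypeError: Object of type RepeatedScalarContainer is not JSON serializable
    print(e)

# Materializing the repeated field as a plain list fixes the serialization.
print(json.dumps({"parameters": list(mask.paths)}))
```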
What changes were proposed in this pull request?
Modified the `Container.from_flyte_idl` method to convert `args` to a list of strings.
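As a rough sketch of the shape of the change (not the verbatim method; the real class carries more fields such as `image`, `command`, `resources`, `env`, and `config`):

```python
class Container:
    """Stripped-down sketch of flytekit.models.task.Container."""

    def __init__(self, args):
        self._args = args

    @classmethod
    def from_flyte_idl(cls, pb2_object):
        # The fix: materialize the protobuf RepeatedScalarContainer into a
        # plain list of strings instead of storing the container directly,
        # so later json.dumps calls on the model succeed.
        return cls(args=[str(arg) for arg in pb2_object.args])
```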
How was this patch tested?
Tested by running locally; see the reproduction steps in the comments below.
Setup process
Screenshots
Check all the applicable boxes
- [ ] I updated the documentation accordingly.
- [ ] All new and existing tests passed.
- [ ] All commits are signed-off.
Related PRs
Docs link
Codecov Report
All modified and coverable lines are covered by tests :white_check_mark:
Project coverage is 83.34%. Comparing base (4d42399) to head (1c402bb). Report is 40 commits behind head on master.
Additional details and impacted files
```diff
@@            Coverage Diff             @@
##           master    #2286      +/-   ##
==========================================
- Coverage   83.73%   83.34%   -0.40%
==========================================
  Files         327      311      -16
  Lines       24870    24191     -679
  Branches     3706     3496     -210
==========================================
- Hits        20826    20162     -664
+ Misses       3418     3401      -17
- Partials      626      628       +2
```
Would it be better to hardcode them in `spark/agent.py`? I'm not sure whether other arguments would be affected.
I think it's preferable to fix it in the class method. From looking at the unit tests, it looks like the `Container` class's `args` attribute is initialized with a list of strings: https://github.com/flyteorg/flytekit/blob/3642ec6771985f504c2b6a9dfd2598a84219ac58/tests/flytekit/unit/models/test_tasks.py#L252-L259
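For reference, the construction in that test looks roughly like this (sketched from memory of the `flytekit.models.task.Container` signature; the field values are placeholders):

```python
from flytekit.models import task as task_models

# args is already a plain list of strings when the model is constructed
# directly; from_flyte_idl should yield the same shape.
container = task_models.Container(
    image="my_image",
    command=["cmd"],
    args=["arg1", "arg2"],
    resources=task_models.Resources(requests=[], limits=[]),
    env={"a": "b"},
    config={"c": "d"},
)
```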
Makes sense, but I need more time to trace the code. Thank you so much!
@pbrogan12 Can you help sign off on this PR?
Yep @Future-Outlier

Launched the agent service from Docker:

```shell
docker run -it -p 8000:8000 --entrypoint pyflyte ghcr.io/flyteorg/flyteagent:1.11.0 --verbose serve agent
```
Made a simple task:

```python
from flytekit import task
from flytekitplugins.spark import Databricks

@task(
    container_image="databricksruntime/standard:14.3-LTS",
    task_config=Databricks(
        spark_conf={"spark.executor.cores": "1"},
        databricks_instance="example.cloud.databricks.com",
        databricks_conf={
            "run_name": "flyte-databricks",
            "new_cluster": {
                "spark_version": "14.3.x-scala2.12",
                "node_type_id": "r5.xlarge",
                "num_workers": 1,
                "aws_attributes": {
                    "instance_profile_arn": "arn:aws:iam:123456789:instance-profile/flyte-databricks",
                    "ebs_volume_size": 100,
                    "ebs_volume_type": "GENERAL_PURPOSE_SSD",
                    "ebs_volume_count": 1,
                },
            },
        },
    ),
)
def databricks_hello_world() -> str:
    return "Hello world"
```
Compiled the task using pyflyte package and unpacked the tar archive.
```shell
pyflyte -k tasks package
```
Finally, formed the `CreateTask` request and sent it to the local agent service:
```python
import grpc
from flyteidl.core.compiler_pb2 import CompiledTask
from flyteidl.service.agent_pb2_grpc import AsyncAgentServiceStub
from flyteidl.admin.agent_pb2 import CreateTaskRequest

# Load the compiled task produced by pyflyte package
task = CompiledTask()
with open("./0_tasks.databricks.databricks_hello_world_1.pb", "rb") as f:
    task.ParseFromString(f.read())

# Build the CreateTask request from the task template
request = CreateTaskRequest()
request.template.CopyFrom(task.template)

# Send it to the locally running agent service
channel = grpc.insecure_channel("localhost:8000")
stub = AsyncAgentServiceStub(channel)
response = stub.CreateTask(request)
```
Here is the error message I get:

```
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
    status = StatusCode.INTERNAL
    details = "failed to create spark task with error: Object of type RepeatedScalarContainer is not JSON serializable."
    debug_error_string = "UNKNOWN:Error received from peer ipv4:127.0.0.1:8000 {grpc_message:"failed to create spark task with error: Object of type RepeatedScalarContainer is not JSON serializable.", grpc_status:13, created_time:"2024-03-21T08:51:16.611258-04:00"}"
>
```
Attached the compiled task protobuf that I used.
Thank you, this is valuable content. I will help you tomorrow and discuss it with the maintainers <3
@pbrogan12 thank you. It also fixes my issue.