[Bug]: Pickling error in save_main_session with STORAGE_WRITE_API in Dataflow Python pipeline
What happened?
`WriteToBigQuery` with the `STORAGE_WRITE_API` method can cause a pickling error [1] if
- the `WriteToBigQuery` step is applied after a multi-output step,
- and the pipeline variable is in "main" scope,
- and `--save_main_session` is `True`.

See this example of code and run script to reproduce this error.
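The failure mechanism can be illustrated without Beam. This is a minimal sketch, not the actual SDK code: `threading.Lock` stands in for `grpc._cython.cygrpc.Channel`, since both lack a usable `__reduce__`. With `--save_main_session`, the SDK pickles the `__main__` namespace; one unpicklable value in it, such as a pipeline object holding a live gRPC channel, aborts the whole save.

```python
import pickle
import threading

# Stand-in for grpc._cython.cygrpc.Channel: like the Cython channel,
# a lock has no usable __reduce__, so the default pickler rejects it.
unpicklable = threading.Lock()

# --save_main_session pickles the __main__ namespace as a dict. If any
# value (e.g. a pipeline variable holding a live gRPC channel) cannot
# be pickled, the entire save fails with a TypeError, as in [1].
main_namespace = {"pipeline": unpicklable, "table": "project:dataset.table"}

try:
    pickle.dumps(main_namespace)
except TypeError as err:
    print(f"pickling failed: {err}")
```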
There are three ways to mitigate this:
- Apply `WriteToBigQuery` after a single-output step (example).
- Or, define the pipeline graph inside a function (example).
- Or, use `--pickle_library=cloudpickle`.
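For the third mitigation, the flag is passed at launch time. A sketch of a Dataflow run command, where the script name, project, bucket, and region are placeholders:

```shell
# All values below are placeholders; the relevant flag is --pickle_library.
python main.py \
  --runner=DataflowRunner \
  --project=my-project \
  --region=us-central1 \
  --temp_location=gs://my-bucket/temp \
  --save_main_session \
  --pickle_library=cloudpickle
```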
[1]
```
  File "/Users/baeminbo/.pyenv/versions/3.11.4/lib/python3.11/pickle.py", line 972, in save_dict
    self._batch_setitems(obj.items())
  File "/Users/baeminbo/.pyenv/versions/3.11.4/lib/python3.11/pickle.py", line 998, in _batch_setitems
    save(v)
  File "/Users/baeminbo/.pyenv/versions/3.11.4/lib/python3.11/pickle.py", line 578, in save
    rv = reduce(self.proto)
         ^^^^^^^^^^^^^^^^^^
  File "stringsource", line 2, in grpc._cython.cygrpc.Channel.__reduce_cython__
TypeError: no default __reduce__ due to non-trivial __cinit__
```
See the full output at https://gist.github.com/baeminbo/bd23df65e5604cf24213c2e1d6a46a25
Issue Priority
Priority: 3 (minor)
Issue Components
- [X] Component: Python SDK
- [ ] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [X] Component: Google Cloud Dataflow Runner
Thanks @baeminbo for a detailed repro.
Eventually, we plan to switch to the cloudpickle pickler, which doesn't require saving the main session.
Structuring a pipeline as a package is the best way to avoid having to pass `--save_main_session` and can also help provide better structure for complex pipelines. A few examples:
- https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/#multiple-file-dependencies
- https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/dataflow/flex-templates/pipeline_with_dependencies/main.py
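For reference, a minimal layout of that shape (all names here are illustrative, not taken from the linked samples):

```
my_pipeline/
├── main.py          # thin entry point: parses options, calls my_package.run()
├── setup.py         # ships my_package to the workers as an extra package
└── my_package/
    ├── __init__.py
    └── pipeline.py  # pipeline graph built inside run(), not in __main__ scope
```

Because the pipeline graph and its DoFns live in an importable package rather than in `__main__`, workers can import them directly and nothing in the main session needs to be pickled.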