[Bug] Flink Agents Example Fails with 'ModuleNotFoundError: No module named 'encodings'' in YARN Cluster Mode

Open bydeath opened this issue 2 months ago • 10 comments

Search before asking

  • [x] I searched in the issues and found nothing similar.

Description

Summary

The official Flink Agents example, workflow_single_agent_example.py, fails in YARN Application Cluster mode with a ModuleNotFoundError: No module named 'encodings', indicating an issue with the embedded Python interpreter's ability to find its standard library.

Crucially:

  1. The job runs successfully in Standalone Flink Cluster mode using the exact same Python virtual environment and code.
  2. A standard PyFlink DataStream example (word_count.py) also runs successfully in YARN mode.
  3. The YARN NodeManager machines DO NOT have a system-wide Python installation. The entire Python interpreter, necessary libraries, and dependencies are all contained within the archived virtual environment (venv.tar.gz).

This confirms that the environment setup is entirely dependent on the archived file, and the failure occurs when the Pemja-embedded interpreter attempts to load this self-contained environment.

Error Log Snippet (taskmanager.log)

2025-11-02 07:46:21,055 INFO  org.apache.beam.runners.fnexecution.data.GrpcDataService     [] - Beam Fn Data client connected.
2025-11-02 07:46:21,053 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - action-execute-operator -> Map, Map -> Sink: Print to Std. Out (1/1)#0 (0b196afe82a3bfbc08fa3a8d12240d81_90bea66de1c231edf33913ecd54406c1_0_0) switched from INITIALIZING to FAILED with failure cause:
java.lang.RuntimeException: Failed to find libpython
	at pemja.utils.CommonUtils.getPythonLibrary(CommonUtils.java:161) ~[flink-python-1.20.3.jar:1.20.3]
	at pemja.utils.CommonUtils.loadPython(CommonUtils.java:44) ~[flink-python-1.20.3.jar:1.20.3]
	at pemja.core.PythonInterpreter$MainInterpreter.initialize(PythonInterpreter.java:365) ~[flink-python-1.20.3.jar:1.20.3]
	at pemja.core.PythonInterpreter.initialize(PythonInterpreter.java:144) ~[flink-python-1.20.3.jar:1.20.3]
	at pemja.core.PythonInterpreter.<init>(PythonInterpreter.java:45) ~[flink-python-1.20.3.jar:1.20.3]
	at org.apache.flink.agents.runtime.env.EmbeddedPythonEnvironment.getInterpreter(EmbeddedPythonEnvironment.java:45) ~[flink-agents-dist-0.1.0.jar:0.1.0]
	at org.apache.flink.agents.runtime.python.utils.PythonActionExecutor.open(PythonActionExecutor.java:80) ~[flink-agents-dist-0.1.0.jar:0.1.0]
	at org.apache.flink.agents.runtime.operator.ActionExecutionOperator.initPythonActionExecutor(ActionExecutionOperator.java:504) ~[flink-agents-dist-0.1.0.jar:0.1.0]
	at org.apache.flink.agents.runtime.operator.ActionExecutionOperator.open(ActionExecutionOperator.java:247) ~[flink-agents-dist-0.1.0.jar:0.1.0]
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107) ~[flink-dist-1.20.3.jar:1.20.3]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858) ~[flink-dist-1.20.3.jar:1.20.3]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812) ~[flink-dist-1.20.3.jar:1.20.3]
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-dist-1.20.3.jar:1.20.3]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812) ~[flink-dist-1.20.3.jar:1.20.3]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771) ~[flink-dist-1.20.3.jar:1.20.3]
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970) ~[flink-dist-1.20.3.jar:1.20.3]
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939) [flink-dist-1.20.3.jar:1.20.3]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763) [flink-dist-1.20.3.jar:1.20.3]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) [flink-dist-1.20.3.jar:1.20.3]
	at java.lang.Thread.run(Thread.java:834) [?:?]
Caused by: java.io.IOException: Failed to execute the command: /tmp/hadoop-hadoop/nm-local-dir/usercache/hadoop/appcache/application_1762069162598_0003/python-dist-6062dfd1-ba6b-4ea0-a82a-a45d53469610/python-archives/venv.tar.gz/bin/python -c from find_libpython import find_libpython;print(find_libpython())
output: Python path configuration:
  PYTHONHOME = 'venv.tar.gz'
  PYTHONPATH = (not set)
  program name = '/tmp/hadoop-hadoop/nm-local-dir/usercache/hadoop/appcache/application_1762069162598_0003/python-dist-6062dfd1-ba6b-4ea0-a82a-a45d53469610/python-archives/venv.tar.gz/bin/python'
  isolated = 0
  environment = 1
  user site = 1
  import site = 1
  sys._base_executable = '/tmp/hadoop-hadoop/nm-local-dir/usercache/hadoop/appcache/application_1762069162598_0003/python-dist-6062dfd1-ba6b-4ea0-a82a-a45d53469610/python-archives/venv.tar.gz/bin/python'
  sys.base_prefix = 'venv.tar.gz'
  sys.base_exec_prefix = 'venv.tar.gz'
  sys.platlibdir = 'lib'
  sys.executable = '/tmp/hadoop-hadoop/nm-local-dir/usercache/hadoop/appcache/application_1762069162598_0003/python-dist-6062dfd1-ba6b-4ea0-a82a-a45d53469610/python-archives/venv.tar.gz/bin/python'
  sys.prefix = 'venv.tar.gz'
  sys.exec_prefix = 'venv.tar.gz'
  sys.path = [
    'venv.tar.gz/lib/python310.zip',
    'venv.tar.gz/lib/python3.10',
    'venv.tar.gz/lib/python3.10/lib-dynload',
  ]
Fatal Python error: init_fs_encoding: failed to get the Python codec of the filesystem encoding
Python runtime state: core initialized
ModuleNotFoundError: No module named 'encodings'

Current thread 0x00007d1077b7f740 (most recent call first):
  <no Python frame>

	at pemja.utils.CommonUtils.execute(CommonUtils.java:180) ~[flink-python-1.20.3.jar:1.20.3]
	at pemja.utils.CommonUtils.getPythonLibrary(CommonUtils.java:157) ~[flink-python-1.20.3.jar:1.20.3]
	... 19 more
  • Behavior When Not Setting PYTHONHOME: If the parameter -Dcontainerized.taskmanager.env.PYTHONHOME=venv.tar.gz is removed, the TaskManager fails with the exact same Python core error (printed to taskmanager.err), only without the detailed Java stack trace. This confirms that the core issue lies in the initialization of the embedded Python interpreter from the archived environment, regardless of whether PYTHONHOME is manually specified.

  • Conclusion: The Pemja-embedded Python interpreter (used by Flink Agents) fails to locate the standard library (encodings module) of the self-contained, Conda-created virtual environment when deployed via YARN, even though the standard PyFlink Python worker process runs successfully. This suggests a path resolution failure specific to how Pemja initializes the embedded environment on YARN.
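For readers following along, the sys.path entries in the log above can be reproduced from the PYTHONHOME value alone. Below is a minimal sketch, not Pemja's or CPython's actual code: the helper names default_stdlib_paths and can_find_encodings are hypothetical, and the layout is simplified to the three entries the log prints for Python 3.10. It illustrates that a relative prefix such as venv.tar.gz only works if the process's working directory happens to contain that directory.

```python
import os


def default_stdlib_paths(python_home, major=3, minor=10):
    """Mimic the three sys.path entries printed in the log for a given home.

    Simplified illustration only: real CPython path computation has more cases.
    """
    lib = f"{python_home}/lib"
    return [
        f"{lib}/python{major}{minor}.zip",
        f"{lib}/python{major}.{minor}",
        f"{lib}/python{major}.{minor}/lib-dynload",
    ]


def can_find_encodings(paths, cwd):
    """Return True if an 'encodings' package exists under any path entry.

    Relative entries are resolved against cwd, as the OS would resolve them;
    os.path.join ignores cwd when the entry is already absolute.
    """
    for entry in paths:
        candidate = os.path.join(cwd, entry, "encodings", "__init__.py")
        if os.path.isfile(candidate):
            return True
    return False


if __name__ == "__main__":
    # Same relative value as in the submit command from this report.
    for path in default_stdlib_paths("venv.tar.gz"):
        print(path)
```

Calling can_find_encodings with the container's working directory shows whether the relative entries can resolve at all; if that directory has no venv.tar.gz/ inside it, every entry misses.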

How to reproduce

./flink-1.20.3/bin/flink run-application -t yarn-application \
      -Dcontainerized.master.env.JAVA_HOME=/usr/lib/jvm/jre-11 \
      -Dcontainerized.taskmanager.env.JAVA_HOME=/usr/lib/jvm/jre-11 \
      -Djobmanager.memory.process.size=1024m \
      -Dcontainerized.taskmanager.env.PYTHONHOME=venv.tar.gz \
      -Dtaskmanager.memory.process.size=1024m \
      -Dyarn.application.name=flink-agents-workflow \
      -Dyarn.ship-files=./shipfiles \
      -pyarch shipfiles/venv.tar.gz \
      -pyclientexec venv.tar.gz/bin/python \
      -pyexec venv.tar.gz/bin/python \
      -pyfs shipfiles \
      -pym workflow_single_agent_example

Version and environment

  • Flink Version: 1.20.3
  • Flink Agents Version: 0.1.0
  • Deployment Mode: YARN Application Cluster (-t yarn-application)
  • Python Version (in venv): Python 3.10
  • Python Virtual Environment: Created and archived using Conda (venv.tar.gz)
  • YARN Setup: NodeManagers lack a system Python; environment is self-contained in the archive.

Are you willing to submit a PR?

  • [ ] I'm willing to submit a PR!

bydeath avatar Nov 05 '25 03:11 bydeath

@bydeath, thanks for the detailed report on this.

@GreatEugenius, could you please take a look into this?

xintongsong avatar Nov 05 '25 03:11 xintongsong

I think this is a known bug in pemja: https://issues.apache.org/jira/browse/FLINK-38585, and it has been fixed in pemja recently: https://github.com/alibaba/pemja/pull/87.

But because Flink-Agents depends on pemja indirectly through pyflink, Flink-Agents must wait until Flink releases a version containing the pemja fix before this issue can be resolved.

wenjin272 avatar Nov 06 '25 02:11 wenjin272

We've been discussing preparing a Flink Agents 0.1.1 bugfix release. The schedule is not decided yet. We want to wait a bit longer for bugs like this to emerge. I think we can try to push for a Flink 1.20.4 bugfix release before Flink Agents 0.1.1, so that this bugfix can be included.

xintongsong avatar Nov 06 '25 02:11 xintongsong

Hi @wenjin272,

Thank you for your response and for pointing out the related JIRA issue, FLINK-38585.

Based on my analysis and the stack trace, it appears that my issue with flink-agents is distinct from FLINK-38585, which specifically addresses problems in PyFlink's thread mode execution based on Pemja. Furthermore, I have not observed flink-agents explicitly configuring Pemja to use thread mode (e.g., by setting python.execution-mode to thread), suggesting the nature of the Pemja usage is fundamentally different from the one targeted by FLINK-38585.

The failure I am encountering occurs when the flink-agents framework attempts to initialize its own embedded Python environment. The stack trace clearly indicates that the failure happens directly within the flink-agents operator loading the Python interpreter via Pemja:

// ... (omitted)
	at pemja.core.PythonInterpreter.<init>(PythonInterpreter.java:45) ~[flink-python-1.20.3.jar:1.20.3]
	at org.apache.flink.agents.runtime.env.EmbeddedPythonEnvironment.getInterpreter(EmbeddedPythonEnvironment.java:45) ~[flink-agents-dist-0.1.0.jar:0.1.0]
	at org.apache.flink.agents.runtime.python.utils.PythonActionExecutor.open(PythonActionExecutor.java:80) ~[flink-agents-dist-0.1.0.jar:0.1.0]
	at org.apache.flink.agents.runtime.operator.ActionExecutionOperator.initPythonActionExecutor(ActionExecutionOperator.java:504) ~[flink-agents-dist-0.1.0.jar:0.1.0]
// ... (omitted)
Caused by: java.io.IOException: Failed to execute the command: ... /venv.tar.gz/bin/python -c from find_libpython import find_libpython;print(find_libpython())
Fatal Python error: init_fs_encoding: failed to get the Python codec of the filesystem encoding
ModuleNotFoundError: No module named 'encodings'

As you can see, the call sequence confirms that flink-agents is directly using Pemja's PythonInterpreter for environment initialization:

  1. The EmbeddedPythonEnvironment.getInterpreter() method (source: EmbeddedPythonEnvironment.java#L45) returns the PythonInterpreter instance.

  2. This interpreter instance is created via the logic in PythonEnvironmentManager.createEnvironment() (source: PythonEnvironmentManager.java#L45-L83).

  3. The failure (ModuleNotFoundError: No module named 'encodings') happens inside Pemja's constructor (PythonInterpreter.<init>) during this direct initialization call.

The core issue remains that Pemja fails to initialize the self-contained Conda environment when launched by flink-agents in YARN mode. I suspect that even after a Pemja bug fix is incorporated into a new Flink release, this specific issue with flink-agents may not be resolved, because the problem appears tied to path resolution logic within flink-agents' direct usage of Pemja, and not just the execution model addressed in FLINK-38585.

Thank you!

bydeath avatar Nov 07 '25 10:11 bydeath

@bydeath, thanks for your detailed analysis.

Actually, the JIRA issue FLINK-38585 was discovered when @Sxnan tried to submit a flink-agents job to a k8s cluster. After offline discussion and debugging, @bgeng777 fixed the issue in PEMJA and created the Jira issue.

If I understand correctly, the thread mode of pyflink is built on pemja, meaning that using PEMJA implies thread mode, so flink-agents using pemja directly behaves just like pyflink thread mode.

I indeed can't determine whether this is the same issue as the one described in the Jira issue. @GreatEugenius has already reproduced this bug on a YARN cluster and is currently verifying whether it is the same issue. We can wait for his analysis.

Thank you again for your detailed description and analysis.

wenjin272 avatar Nov 08 '25 16:11 wenjin272

Hi @bydeath,
Thank you for your detailed analysis and for highlighting the distinction from FLINK-38585. You are absolutely correct—this issue arises from a different root cause.

We have reproduced the issue in a YARN cluster and confirmed that upgrading the pemja dependency to resolve initialization-order problems does not fix the current error.

We are actively debugging to identify the cause and will implement a solution as soon as possible. We appreciate your patience and will provide updates as soon as we have actionable insights. cc: @wenjin272

GreatEugenius avatar Nov 10 '25 01:11 GreatEugenius

Hi folks, some information after debugging this issue. I see that the reproduce command sets PYTHONHOME via -Dcontainerized.taskmanager.env.PYTHONHOME. This is not recommended, because pyflink will try to use the shipped Python env (i.e. venv.tar.gz) as its execution environment. The shipped tar.gz/zip is decompressed, and on YARN the unpacked dir usually looks like /xxx/nm-local-dir/usercache/xxx/appcache/application_xx3921_0020/python-dist-8d697226-383a-486c-ae01-df4b096e8a70/python-archives/venv.tar.gz/ (note that python-archives/venv.tar.gz/ is a directory, which contains the specified Python interpreter, e.g. python-dist-8d697226-383a-486c-ae01-df4b096e8a70/python-archives/venv.tar.gz/bin/python).

So, when users set containerized.taskmanager.env.PYTHONHOME, it makes pemja use the wrong PYTHONHOME to find packages, which leads to errors like ModuleNotFoundError: No module named 'encodings'. I think we should either improve pemja's logic to handle this case and output a warning, or at least tell users that this setting can cause unexpected issues. cc @dianfu
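Until something like that exists, a pre-submit check on the client side is one way to surface the problem early. This is only a sketch under assumptions: check_submit_args is a hypothetical helper, not part of Flink, PyFlink, or pemja, and the only option it knows about is the one used in the reproduce command of this issue.

```python
import os

# Option name taken from the submit command in this issue.
PYTHONHOME_OPTION = "-Dcontainerized.taskmanager.env.PYTHONHOME="


def check_submit_args(argv):
    """Return warnings for PYTHONHOME overrides found in a flink CLI argument list."""
    warnings = []
    for arg in argv:
        if not arg.startswith(PYTHONHOME_OPTION):
            continue
        value = arg[len(PYTHONHOME_OPTION):]
        kind = "absolute" if os.path.isabs(value) else "relative"
        warnings.append(
            f"PYTHONHOME is set to the {kind} path {value!r}; "
            "with -pyarch the archive is unpacked under a per-application "
            "directory, so this value may not point at the shipped environment."
        )
    return warnings


if __name__ == "__main__":
    example = [
        "run-application", "-t", "yarn-application",
        "-Dcontainerized.taskmanager.env.PYTHONHOME=venv.tar.gz",
        "-pyarch", "shipfiles/venv.tar.gz",
        "-pyexec", "venv.tar.gz/bin/python",
    ]
    for message in check_submit_args(example):
        print("WARNING:", message)
```

The check deliberately flags any override, relative or absolute, since the unpacked location differs per application.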

As a result, to run the example on YARN for now, users should use the pemja wheel package from https://github.com/alibaba/pemja/pull/8. and should NOT set containerized.taskmanager.env.PYTHONHOME. Maybe @GreatEugenius can offer more detailed instructions.

thanks

bgeng777 avatar Nov 10 '25 08:11 bgeng777

Hi @bydeath,

We have identified the root cause of this bug and summarize it as follows:

Root Cause

  1. Pemja initialization order issue (GitHub Issue #86) — This has been fixed in the 0.5.dev version.
  2. Incorrect configuration of -Dcontainerized.taskmanager.env.PYTHONHOME — This caused Pemja to fail in locating the correct Python environment path, leading to the ModuleNotFoundError: No module named 'encodings' error.

Solutions

Temporary Solution (Immediate Fix):

  • Upgrade Pemja to the fixed 0.5.dev version using the following command:
pip uninstall pemja
pip install /path/to/pemja-0.5.dev0-*.whl
  • Package your virtual environment into venv.tar.gz:
conda pack -o venv.tar.gz
  • When submitting the job, remove the -Dcontainerized.taskmanager.env.PYTHONHOME configuration, and use the following command:
./flink-1.20.3/bin/flink run-application -t yarn-application \
  -Dcontainerized.master.env.JAVA_HOME=/usr/lib/jvm/jre-11 \
  -Dcontainerized.taskmanager.env.JAVA_HOME=/usr/lib/jvm/jre-11 \
  -Djobmanager.memory.process.size=1024m \
  -Dtaskmanager.memory.process.size=1024m \
  -Dyarn.application.name=flink-agents-workflow \
  -Dyarn.ship-files=./shipfiles \
  -pyarch shipfiles/venv.tar.gz \
  -pyclientexec venv.tar.gz/bin/python \
  -pyexec venv.tar.gz/bin/python \
  -pyfs shipfiles \
  -pym workflow_single_agent_example

Final Solution (Long-term Resolution):

Since Flink-Agents depends on Pemja indirectly through PyFlink, we should close the issue after updating the Flink dependency version to a new release that includes the Pemja fix. We will also update the Flink Agents documentation to include deployment notes for YARN mode, to help users avoid misconfigurations like the incorrect use of -Dcontainerized.taskmanager.env.PYTHONHOME.

Thank you for your patience and support! If you have any further questions or feedback, feel free to let us know.

GreatEugenius avatar Nov 10 '25 10:11 GreatEugenius

Hi @GreatEugenius,

Thank you very much for your quick response and for providing the suggested workaround!

I will install the development version of pemja and repackage my environment, then submit the job without setting the PYTHONHOME configuration, as you advised.

Regarding the initial PYTHONHOME setting in my submission command:

I included PYTHONHOME=venv.tar.gz because preliminary testing showed the environment discovery was the core issue: manually unzipping the archive on every NodeManager and setting PYTHONHOME to that known absolute path allowed the job to run successfully. Since the absolute path is unknown in YARN containers, I explicitly set PYTHONHOME=venv.tar.gz (the relative path) as a direct attempt to fix the environment dynamically. For reference, when running without any PYTHONHOME setting, the job failed with the same ModuleNotFoundError but lacked any stack trace in the logs.
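To compare a working setup (standalone mode, or the manually unpacked archive) with a failing one, it can help to print what a given interpreter believes about its own location. Here is a minimal sketch using only the standard library; the function name describe_python_env is hypothetical. Note that an interpreter that cannot find encodings aborts before running any script, so this only helps on the setups that do start.

```python
import importlib.util
import os
import sys


def describe_python_env():
    """Collect the values that matter when an interpreter cannot find its stdlib."""
    spec = importlib.util.find_spec("encodings")
    return {
        "PYTHONHOME": os.environ.get("PYTHONHOME"),
        "cwd": os.getcwd(),
        "executable": sys.executable,
        "prefix": sys.prefix,
        "base_prefix": sys.base_prefix,
        # Where the running interpreter actually loaded 'encodings' from.
        "encodings_origin": spec.origin if spec else None,
    }


if __name__ == "__main__":
    for key, value in describe_python_env().items():
        print(f"{key} = {value}")
```

Running it with the interpreter from the unpacked archive shows whether prefix and encodings_origin point inside the archive directory or somewhere else.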

Thanks again for your guidance! I will report back with the test results.

bydeath avatar Nov 11 '25 07:11 bydeath

Hi @GreatEugenius,

I'm happy to confirm the suggested workaround is effective!

I installed the development version of pemja and the flink-agents example now runs successfully in YARN mode without the PYTHONHOME setting.

Thank you very much for your quick assistance and the fix. Big thanks to everyone involved.

bydeath avatar Nov 12 '25 08:11 bydeath