flink
[hotfix][python] Fix wrong Java class name in PyFlink JdbcSink
What is the purpose of the change
When using PyFlink 1.19, calling JdbcSink.sink() fails with the following error:
Traceback (most recent call last):
  File "d:\pyflink_s/sink_02_jdbc.py", line 70, in <module>
    jdbc_sink = JdbcSink.sink(
  File "D:\pyflink_s/.venv/lib/site-packages/pyflink/datastream/connectors/jdbc.py", line 66, in sink
    j_builder_method = output_format_clz.getDeclaredMethod('createRowJdbcStatementBuilder',
  File "D:/pyflink_s/.venv/lib/site-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "D:/pyflink_s/.venv/lib/site-packages/pyflink/util/exceptions.py", line 146, in deco
    return f(*a, **kw)
  File "D:/pyflink_s/.venv/lib/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o96.getDeclaredMethod.
: java.lang.NoSuchMethodException: org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.createRowJdbcStatementBuilder([I)
    at java.lang.Class.getDeclaredMethod(Class.java:2130)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
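In the exception message, [I is the name that Java's Class.getName() gives to the int[] array type, so the failed lookup was for createRowJdbcStatementBuilder(int[]). The helper below is only an illustrative sketch (java_type_name is a hypothetical name, not part of PyFlink or py4j) that decodes such JVM array names into Java source syntax:

```python
# JVM single-letter codes for primitive element types, as they appear in
# Class.getName() for arrays (e.g. int[] -> "[I").
_PRIMITIVES = {
    "Z": "boolean", "B": "byte", "C": "char", "S": "short",
    "I": "int", "J": "long", "F": "float", "D": "double",
}


def java_type_name(jvm_name: str) -> str:
    """Hypothetical helper: turn a Class.getName()-style name such as '[I'
    into Java source syntax such as 'int[]'."""
    dims = 0
    # Each leading '[' adds one array dimension.
    while jvm_name.startswith("["):
        dims += 1
        jvm_name = jvm_name[1:]
    if dims == 0:
        # Not an array: getName() already returns a readable name.
        return jvm_name
    if jvm_name.startswith("L") and jvm_name.endswith(";"):
        # Reference element type, e.g. '[Ljava.lang.String;'.
        base = jvm_name[1:-1]
    else:
        base = _PRIMITIVES[jvm_name]
    return base + "[]" * dims


print(java_type_name("[I"))
print(java_type_name("[Ljava.lang.String;"))
```

The first call prints int[], which matches the parameter type PyFlink passed to getDeclaredMethod in the traceback.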
I checked the source of the JdbcOutputFormat class, and it does not define a method named createRowJdbcStatementBuilder; that method is defined in the updated RowJdbcOutputFormat class. The fix therefore looks the method up on RowJdbcOutputFormat instead of JdbcOutputFormat.
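The failure comes from how Java reflection resolves the method: Class.getDeclaredMethod only searches methods declared directly by the class it is called on, so asking the wrong class raises NoSuchMethodException. The pure-Python analogy below is a sketch under that assumption only; the two classes are stand-ins named after the Java classes, and get_declared_method is a hypothetical helper, not PyFlink or py4j code.

```python
class JdbcOutputFormat:
    """Stand-in for the class PyFlink 1.19 queried: it lacks the method."""


class RowJdbcOutputFormat:
    """Stand-in for the class that actually declares the factory method."""

    @staticmethod
    def createRowJdbcStatementBuilder(field_types):
        return f"statement builder for {len(field_types)} fields"


def get_declared_method(cls, name):
    """Rough analogue of Class.getDeclaredMethod: only members declared
    directly on cls are searched (inherited ones are ignored). Unlike Java,
    parameter types are not checked here."""
    if name not in vars(cls):
        raise LookupError(f"NoSuchMethod: {cls.__name__}.{name}")
    return getattr(cls, name)


# Looking the method up on the wrong class fails, as in the traceback.
try:
    get_declared_method(JdbcOutputFormat, "createRowJdbcStatementBuilder")
except LookupError as exc:
    print(exc)

# Looking it up on the class that declares it succeeds.
builder = get_declared_method(RowJdbcOutputFormat, "createRowJdbcStatementBuilder")
print(builder([1, 2, 3]))
```

Because the lookup is restricted to declared members, changing only the class name passed to the reflective call is enough to resolve this kind of error.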
Verifying this change
No tests were added; however, the fix resolves the exception above.
The following table definition and script were used to test inserting rows into the database table:
CREATE TABLE customers (
id INTEGER NOT NULL,
name TEXT NOT NULL,
city TEXT NOT NULL
);
from pyflink.common import Types
from pyflink.datastream import DataStream, StreamExecutionEnvironment
from pyflink.datastream.connectors.jdbc import (
    JdbcConnectionOptions,
    JdbcExecutionOptions,
    JdbcSink,
)

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
# The flink-connector-jdbc JAR and the PostgreSQL JDBC driver must be on the classpath,
# e.g. via env.add_jars("file:///path/to/connector.jar", ...) or Flink's lib/ directory.

# Row type matching the customers table: (id INTEGER, name TEXT, city TEXT).
type_info = Types.ROW([Types.INT(), Types.STRING(), Types.STRING()])
data = [(1, "Sobhy", "Cairo"), (2, "Samar", "Dokki"), (3, "Gad", "Cairo"), (4, "Sabry", "Giza")]
stream: DataStream = env.from_collection(collection=data, type_info=type_info)

# PostgreSQL connection settings.
database_name = "postgres"
pg_host = "localhost"
pg_port = 5432
pg_jdbc_url = f"jdbc:postgresql://{pg_host}:{pg_port}/{database_name}"
pg_driver_name = "org.postgresql.Driver"
pg_username = "postgres"
pg_password = "myC0mpl!x"

jdbc_connection_options = (
    JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
    .with_url(pg_jdbc_url)
    .with_driver_name(pg_driver_name)
    .with_user_name(pg_username)
    .with_password(pg_password)
    .build()
)

# Flush buffered rows every 1000 ms or once 200 rows are buffered; retry failed writes up to 5 times.
jdbc_execution_options = (
    JdbcExecutionOptions.builder()
    .with_batch_interval_ms(1000)
    .with_batch_size(200)
    .with_max_retries(5)
    .build()
)

# This is the call that raised NoSuchMethodException before the fix.
jdbc_sink = JdbcSink.sink(
    sql="INSERT INTO customers (id,name,city) VALUES (?,?,?)",
    type_info=type_info,
    jdbc_connection_options=jdbc_connection_options,
    jdbc_execution_options=jdbc_execution_options,
)
stream.add_sink(jdbc_sink)
env.execute()
Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (yes / no)
- The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
- The serializers: (yes / no / don't know)
- The runtime per-record code paths (performance sensitive): (yes / no / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
- The S3 file system connector: (yes / no / don't know)
Documentation
- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
CI report:
- 0efa9a276fe17970772afc229d5611fe7fe2fee2 Azure: FAILURE