
Getting error while using JdbcIO.ReadWithPartitions

Open · umeshPathak opened this issue 6 months ago • 1 comment

I want to read the Customer table from MySQL and store the records in a CSV file. Very simple and straightforward. I want to read 4 million records in partitions.

I am using `<java.version>21</java.version>` and `<beam.version>2.65.0</beam.version>`. This is the code:

```java
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline pipeline = Pipeline.create(options);

JdbcIO.ReadWithPartitions<Customer1, Long> readTransform = JdbcIO.<Customer1>readWithPartitions()
        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                "com.mysql.cj.jdbc.Driver",
                "jdbc:mysql://localhost:3306/customerdb")
                .withUsername("")
                .withPassword(""))
        .withTable("Customer1")
        .withPartitionColumn("customerId")
        .withLowerBound(0L)
        .withUpperBound(1000L)
        .withNumPartitions(5)
        .withRowMapper(
                new JdbcIO.RowMapper<Customer1>() {
                    private static final long serialVersionUID = 1L;

                    public Customer1 mapRow(ResultSet rs) throws SQLException {
                        Customer1 cust = new Customer1();
                        cust.setCustomerId(rs.getInt("customerId"));
                        cust.setName(rs.getString("name"));
                        cust.setEmail(rs.getString("email"));
                        cust.setAddress(rs.getString("address"));
                        return cust;
                    }
                })
        .withCoder(SerializableCoder.of(Customer1.class));

pipeline.apply("ReadFromMySQL", readTransform)
        .apply("ToCSV", MapElements.via(new CustomerToCSV()))
        .apply("WriteCSV", TextIO.write()
                .to("output/customers")
                .withHeader("customerId,name,email,address")
                .withSuffix(".csv")
                .withNumShards(1));
pipeline.run().waitUntilFinish();
```
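
For context, the snippet references two classes that are not shown in the post. Presumably they look roughly like the following hypothetical sketch (assuming `Customer1` is a plain `Serializable` POJO and `CustomerToCSV` is a `SimpleFunction<Customer1, String>`; both shapes are guesses, not taken from the original code):

```java
import java.io.Serializable;

import org.apache.beam.sdk.transforms.SimpleFunction;

// Hypothetical reconstruction of the POJO referenced above; the original post
// does not include it.
public class Customer1 implements Serializable {
    private int customerId;
    private String name;
    private String email;
    private String address;

    public int getCustomerId() { return customerId; }
    public void setCustomerId(int customerId) { this.customerId = customerId; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public String getEmail() { return email; }
    public void setEmail(String email) { this.email = email; }
    public String getAddress() { return address; }
    public void setAddress(String address) { this.address = address; }
}

// Hypothetical reconstruction of the CSV formatter referenced above.
class CustomerToCSV extends SimpleFunction<Customer1, String> {
    @Override
    public String apply(Customer1 c) {
        // Naive rendering; real code would escape commas and quotes.
        return c.getCustomerId() + "," + c.getName() + "," + c.getEmail() + "," + c.getAddress();
    }
}
```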

Getting the below exception:

```
org.apache.beam.sdk.transforms.display.DisplayData$InternalBuilder$PopulateDisplayDataException: Error while populating display data for component 'org.apache.beam.sdk.io.jdbc.JdbcIO$ReadWithPartitions': Input display value cannot be null
	at org.apache.beam.sdk.transforms.display.DisplayData$InternalBuilder.include(DisplayData.java:789)
	at org.apache.beam.sdk.transforms.display.DisplayData$InternalBuilder.access$100(DisplayData.java:703)
	at org.apache.beam.sdk.transforms.display.DisplayData.from(DisplayData.java:81)
	at org.apache.beam.runners.direct.DisplayDataValidator.evaluateDisplayData(DisplayDataValidator.java:50)
	at org.apache.beam.runners.direct.DisplayDataValidator.access$100(DisplayDataValidator.java:33)
	at org.apache.beam.runners.direct.DisplayDataValidator$Visitor.enterCompositeTransform(DisplayDataValidator.java:59)
	at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:581)
	at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
	at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500(TransformHierarchy.java:240)
	at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:214)
	at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:477)
	at org.apache.beam.runners.direct.DisplayDataValidator.validateTransforms(DisplayDataValidator.java:46)
	at org.apache.beam.runners.direct.DisplayDataValidator.validatePipeline(DisplayDataValidator.java:38)
	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:185)
	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:325)
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310)
	at com.example.apachebeamwiththjoine.ApacheBeamWithJoinApplication.main(ApacheBeamWithJoinApplication.java:79)
Caused by: java.lang.NullPointerException: Input display value cannot be null
	at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:921)
	at org.apache.beam.sdk.transforms.display.DisplayData$InternalBuilder.addItemIf(DisplayData.java:830)
	at org.apache.beam.sdk.transforms.display.DisplayData$InternalBuilder.add(DisplayData.java:808)
	at org.apache.beam.sdk.io.jdbc.JdbcIO$ReadWithPartitions.populateDisplayData(JdbcIO.java:1617)
	at org.apache.beam.sdk.transforms.display.DisplayData$InternalBuilder.include(DisplayData.java:780)
	... 17 more
```

umeshPathak · Jun 14 '25 13:06

I think the NullPointerException during display data population is a classic symptom of using an anonymous inner class for a transform's logic, in this case the `JdbcIO.RowMapper`. To fix this, refactor the `RowMapper` into a static nested class.

Try this diff:

```diff
--- Original User Code (Problematic)
+++ Corrected Production-Quality Code

- // Anonymous inner class for RowMapper
- .withRowMapper(
-     new JdbcIO.RowMapper<Customer1>() {
-         private static final long serialVersionUID = 1L;
-         public Customer1 mapRow(ResultSet rs) throws SQLException {
-             Customer1 cust = new Customer1();
-             cust.setCustomerId(rs.getInt("customerId"));
-             cust.setName(rs.getString("name"));
-             cust.setEmail(rs.getString("email"));
-             cust.setAddress(rs.getString("address"));
-             return cust;
-         }
-     })
- .withCoder(SerializableCoder.of(Customer1.class));
+ // Define the RowMapper as a static nested class elsewhere in the file
+ private static class CustomerRowMapper implements JdbcIO.RowMapper<Customer1> {
+     @Override
+     public Customer1 mapRow(ResultSet rs) throws SQLException {
+         Customer1 cust = new Customer1();
+         cust.setCustomerId(rs.getInt("customerId"));
+         cust.setName(rs.getString("name"));
+         cust.setEmail(rs.getString("email"));
+         cust.setAddress(rs.getString("address"));
+         return cust;
+     }
+ }

+ // In the main pipeline construction, keep both type parameters
+ // (element type and partition-column type) on the declaration:
+ JdbcIO.ReadWithPartitions<Customer1, Long> readTransform = JdbcIO.<Customer1>readWithPartitions()
+     // ... (DataSourceConfiguration, table, partition settings, etc.)
+     // Use the new static nested class for the RowMapper
+     .withRowMapper(new CustomerRowMapper());
+     // The .withCoder() call is removed; the coder can now be inferred automatically.

- pipeline.apply("ReadFromMySQL", readTransform)
-     .apply("ToCSV", MapElements.via(new CustomerToCSV()))
+ pipeline.apply("ReadFromMySQL", readTransform)
+     // Add a type hint for robustness
+     .apply("ToCSV", MapElements.into(TypeDescriptors.strings()).via(new CustomerToCSV()))
```


liferoad · Jun 14 '25 19:06

This issue has been marked as stale due to 150 days of inactivity. It will be closed in 30 days if no further activity occurs. If you think that’s incorrect or this issue still needs to be addressed, please simply write any comment. If closed, you can reopen the issue at any time. Thank you for your contributions.

github-actions[bot] · Nov 12 '25 12:11