Batch improvement
The idea is to emulate the way JDBCIO writes to SQL.
Currently, `write_record` calls `.execute()` and then `.commit()` in sequence, writing and committing each record to disk one at a time. The proposed change allows for more effective batching while better managing connection pooling to CSQL.
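At a high level, the change splits the execute and commit steps. A minimal sketch of the before/after shape of the write path, assuming a SQLAlchemy session and table (the function names here are illustrative, not the library's exact API):

```python
# Before: every record is executed AND committed individually.
def write_record_old(session, table, record):
    session.execute(table.insert().values(**record))
    session.commit()  # one commit (disk flush) per record

# After: records are only executed; the batching logic in the DoFn
# decides when to commit the accumulated batch.
def write_record_new(session, table, record):
    session.execute(table.insert().values(**record))
```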
- Removed the `session.commit()` from `write_record`, which is called on every element in the `_WriteToRelationalDBFn` ParDo. Instead, we just call `.execute()` on each record and then commit the batch to disk all at once.
- Instead of building the engine at the start of each bundle, moved `self._db = SqlAlchemyDB(self.source_config)` to the `.setup()` method so it's only created once for the object and handles connection pooling for the sessions that are opened and closed at the start and finish of each bundle.
- Handled the `.commit()` logic in the DoFn. In `start_bundle` we create a `record_counter = 0` and `records = []`, which lets us build commits up to a fixed batch size and ensures they don't get too big (see the DoFn sketch after this list).
- In cases where the bundles are small or divide unevenly, leaving a chunk with fewer than 1000 records, we directly call `commit_records` in `finish_bundle()` to take care of the remaining elements in the bundle and flush the buffer.
- Made `max_batch_size` a configurable value with a default of 1000. The user can easily change it, for example:
```python
relational_db.Write(
    source_config=source_config,
    table_config=table_config,
    max_batch_size=1500
)
```
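Putting the pieces together, here is a minimal sketch of the batched DoFn described above. `SqlAlchemyDB`, `write_record`, `commit_records`, `setup`, `start_bundle`, `finish_bundle`, and `max_batch_size` come from the bullets; the session helper names (`start_session`, `commit_session`, `close_session`) and the import path are assumptions and may differ from the actual wrapper API:

```python
import apache_beam as beam

from beam_nuggets.io.relational_db_api import SqlAlchemyDB  # import path assumed


class _WriteToRelationalDBFn(beam.DoFn):
    """Sketch of the batched write DoFn (not the exact implementation)."""

    def __init__(self, source_config, table_config, max_batch_size=1000):
        self.source_config = source_config
        self.table_config = table_config
        self.max_batch_size = max_batch_size

    def setup(self):
        # Built once per DoFn instance instead of once per bundle, so the
        # engine's connection pool is reused across bundles.
        self._db = SqlAlchemyDB(self.source_config)

    def start_bundle(self):
        self._db.start_session()  # assumed helper: opens a session
        self.record_counter = 0
        self.records = []

    def process(self, element):
        # write_record now only .execute()s the insert; no per-record commit.
        self._db.write_record(self.table_config, element)
        self.records.append(element)
        self.record_counter += 1
        if self.record_counter >= self.max_batch_size:
            self.commit_records()

    def commit_records(self):
        # One commit for the whole batch, then reset the buffer.
        self._db.commit_session()  # assumed helper wrapping session.commit()
        self.record_counter = 0
        self.records = []

    def finish_bundle(self):
        # Flush a final partial batch (fewer than max_batch_size records).
        if self.record_counter > 0:
            self.commit_records()
        self._db.close_session()  # assumed helper: closes the session
```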
GH-36