
Batch improvement

chishankar-work opened this issue on Mar 10, 2021 · 1 comment

The idea is to emulate the way JdbcIO writes to SQL.

When write_record is called, it runs .execute() and then .commit() in sequence, writing and committing each record to disk one at a time. The proposed change allows for more effective batching while better managing connection pooling to CSQL.
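For reference, the current per-record flow looks roughly like this. This is a simplified sketch assuming SQLAlchemy's session API, not the exact beam-nuggets code:

# Simplified sketch of the current behavior: every element pays the cost
# of a full commit, because .execute() and .commit() run back to back.
def write_record(session, table, record):
    session.execute(table.insert(), record)  # write one row
    session.commit()                         # commit immediately, once per record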

  • Removed the session.commit() from write_record, which is called on every element in the _WriteToRelationalDBFn ParDo. Instead, we call .execute() on each record and commit the batch to disk all at once (see the DoFn sketch after the usage example below).

  • Instead of building the engine at the start of each bundle, moved self._db = SqlAlchemyDB(self.source_config) to the .setup() method, so the engine is created once per DoFn instance and connection pooling can serve the sessions that are opened and closed at the start and end of each bundle.

  • Handled the .commit() logic in the DoFn. In start_bundle, initialize record_counter = 0 and records = []. This lets us build commits up to a fixed size and ensures they don't grow too large.

  • In cases where the bundle is small or divides unevenly, leaving a chunk with fewer than 1000 records, we call commit_records directly in finish_bundle() to handle the remaining elements and flush the buffer.

  • Made max_batch_size a configurable value with a default of 1000. A user can change it easily, for example:

relational_db.Write(
    source_config=source_config,
    table_config=table_config,
    max_batch_size=1500
)
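For concreteness, the pieces above could fit together roughly as follows. This is an illustrative sketch, not the actual patch: the import path and the SqlAlchemyDB methods used here (start_session, write_record, commit, close_session) are assumed stand-ins for beam-nuggets internals.

import apache_beam as beam
# Assumed internal import path for illustration; the real module layout may differ.
from beam_nuggets.io.relational_db_api import SqlAlchemyDB

class _WriteToRelationalDBFn(beam.DoFn):
    """Illustrative sketch of the proposed batching DoFn (not the actual patch)."""

    def __init__(self, source_config, table_config, max_batch_size=1000):
        self.source_config = source_config
        self.table_config = table_config
        self.max_batch_size = max_batch_size

    def setup(self):
        # Build the engine once per DoFn instance instead of once per bundle,
        # so the connection pool is reused across bundles.
        self._db = SqlAlchemyDB(self.source_config)

    def start_bundle(self):
        self._db.start_session()  # assumed helper: opens a session for this bundle
        self.record_counter = 0
        self.records = []

    def process(self, element):
        # Buffer the element; the commit is deferred until the batch is full.
        self.records.append(element)
        self.record_counter += 1
        if self.record_counter >= self.max_batch_size:
            self.commit_records()

    def finish_bundle(self):
        # Flush any remainder when the bundle holds fewer than max_batch_size
        # records, then close the session.
        if self.record_counter > 0:
            self.commit_records()
        self._db.close_session()  # assumed helper

    def commit_records(self):
        # Execute each buffered record, then commit the whole batch at once.
        for record in self.records:
            self._db.write_record(self.table_config, record)  # assumed: executes without committing
        self._db.commit()  # assumed helper wrapping a single session.commit()
        self.records = []
        self.record_counter = 0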

chishankar-work commented on Mar 10, 2021:

GH-36
