
Optimize write path in protocol.py to reduce copies

Open mykaul opened this issue 1 month ago • 3 comments

Refactored _ProtocolHandler.encode_message to reduce memory copies and allocations.

  • Implemented a "reserve and seek" strategy for the write path.
  • In uncompressed scenarios (including protocol v5+), the body is now written directly into the final buffer instead of an intermediate one, avoiding an extra bytes allocation and a buffer copy.
  • Space for the frame header is reserved up front; the body is written, and the header is then back-filled with the correct length.
  • Unified buffer initialization and header-writing logic for cleaner code.
  • Simplified the conditional checks for compression support.
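The "reserve and seek" idea above can be sketched with `io.BytesIO` (a simplified illustration under assumed names, not the driver's actual `encode_message`; the 9-byte version/flags/stream/opcode/length header layout of CQL protocol v3+ is assumed):

```python
import io
import struct

HEADER_SIZE = 9  # CQL v3+ frame header: version(1) flags(1) stream(2) opcode(1) length(4)

def encode_frame(version, flags, stream_id, opcode, write_body):
    # Illustrative sketch of "reserve and seek": reserve header space,
    # write the body straight into the final buffer, then seek back and
    # back-fill the header once the body length is known.
    buf = io.BytesIO()
    buf.write(b"\x00" * HEADER_SIZE)   # reserve space for the header
    write_body(buf)                    # body goes directly to the final buffer
    body_len = buf.tell() - HEADER_SIZE
    buf.seek(0)                        # back-fill the header in place
    buf.write(struct.pack('>BBhBi', version, flags, stream_id, opcode, body_len))
    return buf.getvalue()

frame = encode_frame(0x04, 0x00, 1, 0x07, lambda b: b.write(b"\x00\x01hi"))
```

The point of the pattern is that no intermediate body buffer is materialized and then copied behind the header; the only seek/rewrite touches the fixed-size header region.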

Pre-review checklist

  • [x] I have split my patch into logically separate commits.
  • [x] All commit messages clearly explain what they change and why.
  • [ ] I added relevant tests for new features and bug fixes.
  • [x] All commits compile, pass static checks, and pass tests.
  • [x] PR description sums up the changes and reasons why they should be introduced.
  • [ ] I have provided docstrings for the public items that I want to introduce.
  • [ ] I have adjusted the documentation in ./docs/source/.
  • [ ] I added appropriate Fixes: annotations to PR description.

mykaul · Jan 09 '26 22:01

CI failure doesn't look related, but I'll check later this week:

=================================== FAILURES ===================================
  ___________ HostConnectionTests.test_successful_wait_for_connection ____________
  
  self = <tests.unit.test_host_connection_pool.HostConnectionTests testMethod=test_successful_wait_for_connection>
  
      def test_successful_wait_for_connection(self):
          host = Mock(spec=Host, address='ip1')
          session = self.make_session()
          conn = HashableMock(spec=Connection, in_flight=0, is_defunct=False, is_closed=False, max_request_id=100,
                                      lock=Lock())
          session.cluster.connection_factory.return_value = conn
      
          pool = self.PoolImpl(host, HostDistance.LOCAL, session)
          session.cluster.connection_factory.assert_called_once_with(host.endpoint, on_orphaned_stream_released=pool.on_orphaned_stream_released)
      
          pool.borrow_connection(timeout=0.01)
          assert 1 == conn.in_flight
      
          def get_second_conn():
              c, request_id = pool.borrow_connection(1.0)
              assert conn is c
              pool.return_connection(c)
      
          t = Thread(target=get_second_conn)
          t.start()
      
  >       pool.return_connection(conn)
  
  /Users/runner/work/python-driver/python-driver/tests/unit/test_host_connection_pool.py:105: 
  _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
  
  self = <cassandra.pool.HostConnection object at 0x000000015958a058>
  connection = <HashableMock name='mock.cluster.connection_factory()' spec='Connection' id='5793776208'>
  stream_was_orphaned = False
  
      def return_connection(self, connection, stream_was_orphaned=False):
          if not stream_was_orphaned:
              with connection.lock:
                  connection.in_flight -= 1
              with self._stream_available_condition:
                  self._stream_available_condition.notify()
      
          if connection.is_defunct or connection.is_closed:
              if connection.signaled_error and not self.shutdown_on_error:
                  return
      
              is_down = False
              if not connection.signaled_error:
                  log.debug("Defunct or closed connection (%s) returned to pool, potentially "
                            "marking host %s as down", id(connection), self.host)
                  is_down = self.host.signal_connection_failure(connection.last_error)
                  connection.signaled_error = True
      
              if self.shutdown_on_error and not is_down:
                  is_down = True
      
              if is_down:
                  self.shutdown()
                  self._session.cluster.on_down(self.host, is_host_addition=False)
              else:
                  connection.close()
                  with self._lock:
                      if self.is_shutdown:
                          return
                      self._connections.pop(connection.features.shard_id, None)
                      if self._is_replacing:
                          return
                      self._is_replacing = True
                      self._session.submit(self._replace, connection)
  >       elif connection in self._trash:
               ^^^^^^^^^^^^^^^^^^^^^^^^^
  E       TypeError: __hash__ method should return an integer not 'MagicMock'
  
  /Users/runner/work/python-driver/python-driver/cassandra/pool.py:582: TypeError
  ------------------------------ Captured log call -------------------------------
  2026-01-09 22:27:07.619 DEBUG [pool:430]: Initializing connection for host <Mock spec='Host' id='5793730976'>
  2026-01-09 22:27:07.620 DEBUG [pool:432]: First connection created to <Mock spec='Host' id='5793730976'> for shard_id=1
  2026-01-09 22:27:07.621 DEBUG [pool:443]: Finished initializing connection for host <Mock spec='Host' id='5793730976'>
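For context on the `TypeError` in the traceback: Python requires `__hash__` to return an `int`, and set membership (`connection in self._trash`) calls `hash()`. A minimal reproduction of that failure mode, with a hypothetical broken class standing in for the mis-mocked `HashableMock` (not the test suite's actual code):

```python
from unittest.mock import MagicMock

class BrokenHash:
    # Simulates the failure: __hash__ resolves to a mock, so calling it
    # returns a MagicMock instead of an int.
    __hash__ = MagicMock()

obj = BrokenHash()
try:
    obj in {1, 2}  # "in" on a set calls hash(obj), which must return an int
    raised = False
except TypeError:
    raised = True

class FixedHash:
    # The fix pattern: define a real __hash__ that returns an int.
    def __hash__(self):
        return id(self)

assert raised
assert isinstance(hash(FixedHash()), int)
```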

mykaul · Jan 11 '26 07:01

Hmm, the CI failure looks like https://github.com/scylladb/python-driver/pull/581 ?

mykaul · Jan 11 '26 07:01

Rebased.

mykaul · Jan 14 '26 17:01

> Rebased.

Thanks, looks great.

dkropachev · Jan 29 '26 18:01