Add incremental updating of open streams count and closed_streams state
This fixes https://github.com/python-hyper/hyper-h2/issues/1184
With the old code, here is how test_concurrent_stream_open_performance runs:
============================================================= FAILURES =============================================================
___________________________ TestConcurrentStreamOpenPerformance.test_concurrent_stream_open_performance ____________________________
self = <test_concurrent_stream_open.TestConcurrentStreamOpenPerformance object at 0x107d86a50>
frame_factory = <helpers.FrameFactory object at 0x107d86cd0>
def test_concurrent_stream_open_performance(self, frame_factory):
"""
Opening many concurrent streams is constant time operation
"""
num_concurrent_streams = 10000
c = h2.connection.H2Connection()
c.initiate_connection()
start = time.time()
for i in xrange(num_concurrent_streams):
c.send_headers(1 + (2 * i), self.example_request_headers, end_stream=False)
c.clear_outbound_data_buffer()
end = time.time()
run_time = end - start
> assert run_time < 3
E assert 36.598387002944946 < 3
test/test_concurrent_stream_open.py:51: AssertionError
==================================================== 1 failed in 36.66 seconds =====================================================
New code:
============================================================================================================ test session starts =============================================================================================================
platform darwin -- Python 2.7.15, pytest-3.4.2, py-1.7.0, pluggy-0.6.0
hypothesis profile 'default' -> database=DirectoryBasedExampleDatabase('/Users/andrew/workspace/hyper-h2/.hypothesis/examples')
rootdir: /Users/andrew/workspace/hyper-h2, inifile:
plugins: xdist-1.22.2, profiling-1.6.0, forked-1.0.2, cov-2.5.1, hypothesis-4.5.11
collected 1 item
test/test_concurrent_stream_open.py . [100%]
========================================================================================================== 1 passed in 1.06 seconds ==========================================================================================================
Note that for this example, I commented out the second "test" in the test_concurrent_stream_open_performance function, since the old code fails the first "test".
As you can see, this is roughly a 35x perf improvement (36.6s down to 1.06s).
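The speedup comes from maintaining the open stream count incrementally instead of recounting it on every operation. A toy sketch (hypothetical class and method names, not the hyper-h2 code) contrasting the two approaches:

```python
# Toy sketch (not the hyper-h2 implementation): contrasting a per-call
# recount of open streams with an incrementally maintained counter.


class RecountingConnection(object):
    """Recounts open streams on every open: O(n) per call."""
    def __init__(self):
        self.streams = {}

    def open_stream(self, stream_id):
        self.streams[stream_id] = 'open'
        # The expensive part: walk every stream to count the open ones.
        return sum(1 for s in self.streams.values() if s == 'open')


class IncrementalConnection(object):
    """Updates the count as streams change state: O(1) per call."""
    def __init__(self):
        self.streams = {}
        self.open_stream_count = 0

    def open_stream(self, stream_id):
        self.streams[stream_id] = 'open'
        self.open_stream_count += 1
        return self.open_stream_count
```

Both report the same count; the incremental version just avoids the scan, so opening n streams is O(n) total instead of O(n^2), which is what the benchmark above is measuring.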
So the lints I can clean up (I assume I can just run autopep8 or something similar?), but the code coverage tests are failing on some Python versions for these lines:
h2/stream.py 479 0 106 1 99% 809->814
Pasting the code here for reference:
def sync_state_change(func):
    def wrapper(self, *args, **kwargs):
        # Collect state at the beginning.
        start_state = self.state_machine.state
        started_open = self.open
        started_closed = not started_open

        # Do the state change (if any).
        result = func(self, *args, **kwargs)

        # Collect state at the end.
        end_state = self.state_machine.state
        ended_open = self.open
        ended_closed = not ended_open

        # If at any point we've transitioned to the CLOSED state
        # from any other state, close our stream.
        if end_state == StreamState.CLOSED and start_state != end_state:
            if self._close_stream_callback:
                self._close_stream_callback(self.stream_id)
                # Clear callback so we only call this once per stream
                self._close_stream_callback = None

        # If we were open, but are now closed, decrement
        # the open stream count, and call the close callback.
        if started_open and ended_closed:
            if self._decrement_open_stream_count_callback:
                self._decrement_open_stream_count_callback(self.stream_id, -1)
                # Clear callback so we only call this once per stream
                self._decrement_open_stream_count_callback = None
        # If we were closed, but are now open, increment
        # the open stream count.
        elif started_closed and ended_open:
            if self._increment_open_stream_count_callback:
                self._increment_open_stream_count_callback(self.stream_id, 1)
                # Clear callback so we only call this once per stream
                self._increment_open_stream_count_callback = None

        return result
    return wrapper
Which is odd, since I can insert a print statement there and verify that the code is getting called, not to mention the counts of open outbound/inbound streams would be completely wrong if that code wasn't getting called.
Is this a quirk with the code coverage tool?
Oh, I see: it's because the conditional never evaluates to False. If I remove the conditional, the coverage tests pass.
That conditional is there for defensive reasons: a function wrapped by sync_state_change can call another function that is also wrapped by sync_state_change, and we don't want to update state twice.
I can write a specific test to exercise this behavior for that conditional.
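A standalone toy sketch (simplified, hypothetical names, not the real H2Stream) of why the defensive conditional matters: when one wrapped method calls another wrapped method, the inner wrapper fires the callback and clears it, and the guard stops the outer wrapper from firing it a second time.

```python
# Toy sketch (hypothetical, not hyper-h2 code) of the defensive
# conditional in a sync_state_change-style decorator: nested wrapped
# calls must not fire the callback twice.

calls = []


def sync_state_change(func):
    def wrapper(self, *args, **kwargs):
        started_open = self.open
        result = func(self, *args, **kwargs)
        ended_open = self.open
        if started_open and not ended_open:
            if self._decrement_callback:          # the guarded branch
                self._decrement_callback(self.stream_id)
                self._decrement_callback = None   # fire at most once
        return result
    return wrapper


class ToyStream(object):
    def __init__(self, stream_id):
        self.stream_id = stream_id
        self.open = True
        self._decrement_callback = calls.append

    @sync_state_change
    def close(self):
        self.open = False

    @sync_state_change
    def reset(self):
        # A wrapped method calling another wrapped method: without the
        # conditional, both wrappers would fire the callback.
        self.close()
```

Calling reset() on an open stream records the stream id exactly once, even though two wrappers observe the open-to-closed transition.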
@Lukasa @pgjones This is ready for review (picking these names based on recent merge commits)
Thanks, I've managed to find time to understand the problem - but I'm not sure about the solution. It would be helpful if @Lukasa could comment on the general solution and how it fits in with the codebase. I'd then be happy to comment on the details.
Any updates on this @pgjones @Lukasa ?
I'm very glad someone already thought that this open stream count may be a problem 😍
However, the @sync_state_change annotation on every method in H2Stream here seems invasive and possibly error-prone. Its flip-side, callbacks, don't seem very Pythonic 🤔
The open stream count is essentially a cache; could it be implemented in some other way?
For example, could it be something akin to a weakset/dict?
Or, perhaps the count could be recalculated on demand (i.e. on new stream or stream end mark cache dirty; next time recalculate)?
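A hedged sketch of that recalculate-on-demand idea (hypothetical names, not the hyper-h2 API): state changes only mark the cached count dirty, and the O(n) recount happens lazily on the next read.

```python
# Sketch of a dirty-flag cache for the open stream count
# (hypothetical names, not the hyper-h2 API).


class ToyStream(object):
    def __init__(self, open_):
        self.open = open_


class StreamCountCache(object):
    def __init__(self, streams):
        self._streams = streams   # mapping: stream_id -> stream
        self._count = 0
        self._dirty = True        # force a recount on first read

    def mark_dirty(self):
        # Called whenever a stream opens or closes; O(1).
        self._dirty = True

    @property
    def open_stream_count(self):
        # Recount only when something has actually changed.
        if self._dirty:
            self._count = sum(
                1 for s in self._streams.values() if s.open
            )
            self._dirty = False
        return self._count
```

This keeps invalidation cheap on the hot path, at the cost of an O(n) scan on the first read after a change.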
Or, perhaps the stream state machine could have an output STREAM_CLOSED that the connection receives after pumping the stream's events to the state machine, at which point the stream can be removed from .streams, as opposed to removal during counting?
Or, maybe, .streams could simply be split into .inbound_streams/.outbound_streams/.promises, so that only the specific set is ever evaluated (against our own or the peer's MAX_CONCURRENT_STREAMS)?
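A rough sketch of that split-collection idea (hypothetical structure, not the hyper-h2 code): with inbound and outbound streams in separate dicts, each MAX_CONCURRENT_STREAMS check is just a len() on the relevant set, provided closed streams are removed promptly.

```python
# Sketch of splitting the stream collection by direction
# (hypothetical names, not the hyper-h2 code).


class SplitStreams(object):
    def __init__(self):
        self.inbound_streams = {}
        self.outbound_streams = {}

    def open_outbound(self, stream_id, max_concurrent):
        # Concurrency check is len() on one dict, not a scan of all
        # streams in both directions.
        if len(self.outbound_streams) >= max_concurrent:
            raise ValueError("too many outbound streams")
        self.outbound_streams[stream_id] = 'open'

    def close(self, stream_id):
        # Removing on close keeps len() an accurate open count.
        self.inbound_streams.pop(stream_id, None)
        self.outbound_streams.pop(stream_id, None)
```

The trade-off is that every state transition must reliably remove closed streams, which is essentially the same bookkeeping problem the callbacks solve.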