rstream
Maybe no buffered_messages_lock?
It's unnecessary to acquire/release buffered_messages_lock frequently when a large number of messages need to be sent out. To remove the lock, the _timer function can be written in one of the following two ways:
1. Use a copy of _buffered_messages and clear it before _send_batch (with a data copy).
2. Track the pending_write_length of _buffered_messages[stream] and del the sent slice after _send_batch (with a new pending_write_length argument on _send_batch).
async def _timer(self):
    send_batch = self._send_batch
    delay = self._default_batch_publishing_delay
    while True:
        await asyncio.sleep(delay)
        for stream, messages in self._buffered_messages.items():
            if messages:
                messages_copy = messages[:]
                messages.clear()
                # pending_write_length = len(messages)
                await send_batch(stream, messages_copy)
                # del messages[:pending_write_length]
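The two commented lines above hint at option 2. Spelled out, a minimal sketch of that variant could look like the following, assuming _send_batch is extended with a pending_write_length argument and only sends that many messages from the front of the buffer:

async def _timer(self):
    send_batch = self._send_batch
    delay = self._default_batch_publishing_delay
    while True:
        await asyncio.sleep(delay)
        for stream, messages in self._buffered_messages.items():
            if messages:
                # remember how many messages are buffered right now; anything
                # appended while send_batch awaits is kept for the next tick
                pending_write_length = len(messages)
                await send_batch(stream, messages, pending_write_length)
                # drop only the slice that was actually sent
                del messages[:pending_write_length]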
And another thing about concurrent writes: there is one Producer and one _timer task, so the published messages are strictly in order from the perspective of the underlying StreamWriter. I don't know how many connections the Producer will open; it may depend on the RabbitMQ topology. But if there are multiple connections in one Producer, it should send messages concurrently. In that case, the _timer task should live at the client connection level, like the _listener task, rather than in the Producer.
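A rough, hypothetical sketch of what a connection-level timer could look like; ConnectionBuffer, flush_batch, and the wiring here are illustrative names, not part of rstream:

import asyncio
from collections import defaultdict


class ConnectionBuffer:
    """Hypothetical per-connection buffer: one timer task per client connection."""

    def __init__(self, flush_batch, delay: float):
        # flush_batch is an async callable (stream, messages) that writes to this
        # connection's StreamWriter; delay mirrors default_batch_publishing_delay
        self._flush_batch = flush_batch
        self._delay = delay
        self._buffered: dict[str, list] = defaultdict(list)
        self._task = asyncio.create_task(self._timer())

    def append(self, stream: str, message) -> None:
        self._buffered[stream].append(message)

    async def _timer(self) -> None:
        while True:
            await asyncio.sleep(self._delay)
            for stream, messages in self._buffered.items():
                if messages:
                    pending = len(messages)
                    await self._flush_batch(stream, messages[:pending])
                    del messages[:pending]

    async def close(self) -> None:
        self._task.cancel()

Buffers of different connections could then be flushed concurrently, while writes within one connection stay strictly ordered.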
And is it possible to move the _lock at line 233 in producer.py into _get_or_create_publisher? Maybe _lock is only required when _send_batch needs to create a new publisher. In addition, if a stream (partition) corresponds to only one publisher, lines 233 and 244 can be moved out of the for loop.
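A minimal sketch of that check-lock-check idea (the _create_publisher helper is hypothetical; the real creation steps live in producer.py):

async def _get_or_create_publisher(self, stream: str):
    # fast path: no lock needed when the publisher for this stream already exists
    if (publisher := self._publishers.get(stream)) is not None:
        return publisher
    async with self._lock:
        # re-check under the lock in case another task created it in the meantime
        if (publisher := self._publishers.get(stream)) is not None:
            return publisher
        publisher = await self._create_publisher(stream)  # hypothetical helper
        self._publishers[stream] = publisher
        return publisher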
Here is a simple wrapper for Producer, with the following modifications:
1. remove _buffered_messages_lock
2. move _lock into _get_or_create_publisher
3. add _buffered_sub_entry_messages
4. remove the arg sync from _send_batch
5. redesign publisher_name (add a publisher_name check in send)
6. optimize some Python statements (attribute lookups, etc.)
About point 5:
Inside Producer.send, publisher_name and message are used together to construct a _MessageNotification, but publisher_name is actually redundant: each partition corresponds to one publisher, so publisher_name can be looked up from the partition name. To support this lookup, Producer needs a dict _publishers_name mapping stream to publisher_name.
There may be a performance improvement of close to 20%, but I'm not sure whether there are any bugs in the code.
class MyProducer(Producer):
    def __init__(
        self,
        host: str,
        port: int = 5552,
        *,
        ssl_context: Optional[ssl.SSLContext] = None,
        vhost: str = "/",
        username: str,
        password: str,
        frame_max: int = 1 * 1024 * 1024,
        heartbeat: int = 60,
        load_balancer_mode: bool = False,
        max_retries: int = 20,
        default_batch_publishing_delay: float = 0.2,
        connection_closed_handler: Optional[CB[Exception]] = None,
    ):
        super().__init__(
            host,
            port,
            ssl_context=ssl_context,
            vhost=vhost,
            username=username,
            password=password,
            frame_max=frame_max,
            heartbeat=heartbeat,
            load_balancer_mode=load_balancer_mode,
            max_retries=max_retries,
            default_batch_publishing_delay=default_batch_publishing_delay,
            connection_closed_handler=connection_closed_handler,
        )
        self._buffered_sub_entry_messages: dict[str, list] = defaultdict(list)
        self._publishers_name: dict[str, str] = {}  # stream : publisher_name
        self._publishing_ids_callback: dict[
            str, defaultdict[CB[ConfirmationStatus], set[int]]
        ] = {}  # use the same dict in `_send_batch` & `_send_sub_entry_batch`
    async def close(self) -> None:
        if self.task is not None:
            for stream, messages in self._buffered_messages.items():
                if messages:
                    with suppress(BaseException):
                        await self._send_batch(stream, messages)
                    messages.clear()
            for stream, messages in self._buffered_sub_entry_messages.items():
                if messages:
                    with suppress(BaseException):
                        await self._send_sub_entry_batch(stream, messages)
                    messages.clear()
            self.task.cancel()
        for publisher in self._publishers.values():
            client = publisher.client
            with suppress(BaseException):
                await client.delete_publisher(publisher.id)
            publisher_reference = publisher.reference
            client.remove_handler(schema.PublishConfirm, publisher_reference)
            client.remove_handler(schema.PublishError, publisher_reference)
        self._publishers.clear()
        await self._pool.close()
        self._clients.clear()
        self._waiting_for_confirm.clear()
        self._default_client = None
    async def _get_or_create_client(self, stream: str) -> Client:
        clients = self._clients
        if (client := clients.get(stream)) is not None:
            return client
        leader, _ = await self.default_client.query_leader_and_replicas(stream)
        clients[stream] = client = await self._pool.get(
            Addr(leader.host, leader.port), self._connection_closed_handler
        )
        return client
    async def _get_or_create_publisher(
        self,
        stream: str,
        publisher_name: Optional[str] = None,
    ) -> _Publisher:
        publishers = self._publishers
        if (publisher := publishers.get(stream)) is not None:
            if publisher_name is not None:
                assert publisher.reference == publisher_name
            return publisher
        async with self._lock:
            if (publisher := publishers.get(stream)) is not None:
                if publisher_name is not None:
                    assert publisher.reference == publisher_name
                return publisher
            client = await self._get_or_create_client(stream)
            # We can have multiple publishers sharing same connection, so their ids must be distinct
            publisher_id = (
                len([p for p in publishers.values() if p.client is client]) + 1
            )
            if publisher_name is None:
                publisher_name = f"{stream}_publisher_{publisher_id}"
            self._publishers_name[stream] = publisher_name
            reference = publisher_name
            publisher = publishers[stream] = _Publisher(
                id=publisher_id,
                stream=stream,
                reference=reference,
                sequence=utils.MonotonicSeq(),
                client=client,
            )
            await client.declare_publisher(
                stream=stream,
                reference=reference,
                publisher_id=publisher_id,
            )
            sequence = await client.query_publisher_sequence(
                stream=stream,
                reference=reference,
            )
            publisher.sequence.set(sequence + 1)
            client.add_handler(
                schema.PublishConfirm,
                partial(self._on_publish_confirm, publisher=publisher),
                name=reference,
            )
            client.add_handler(
                schema.PublishError,
                partial(self._on_publish_error, publisher=publisher),
                name=reference,
            )
            return publisher
    async def send_batch(
        self,
        stream: str,
        batch: list[MessageT],
        publisher_name: Optional[str] = None,
        on_publish_confirm: Optional[CB[ConfirmationStatus]] = None,
    ) -> list[int]:
        if (
            publisher_name is not None
            and publisher_name
            != self._publishers_name.setdefault(stream, publisher_name)
        ):
            raise Exception
        wrapped_batch = [
            _MessageNotification(entry=entry, callback=on_publish_confirm)
            for entry in batch
        ]
        return await self._send_batch(stream, wrapped_batch, publisher_name)
    async def _send_batch(
        self,
        stream: str,
        batch: list[_MessageNotification],
        publisher_name: Optional[str] = None,
        pending_length: Optional[int] = None,
        return_ids: bool = True,
    ) -> Optional[list[int]]:
        if len(batch) == 0:
            raise ValueError("Empty batch")
        messages = []
        messages_append = messages.append
        # messages = [None] * pending_length
        publishing_ids_callback = self._publishing_ids_callback.setdefault(
            stream, defaultdict(set)
        )
        publisher = await self._get_or_create_publisher(stream, publisher_name)
        sequence_next = publisher.sequence.next
        for item in islice(batch, pending_length):
            entry = item.entry
            msg = RawMessage(entry) if isinstance(entry, bytes) else entry
            if (publishing_id := msg.publishing_id) is None:
                msg.publishing_id = publishing_id = sequence_next()
            if (callback := item.callback) is not None:
                publishing_ids_callback[callback].add(publishing_id)
            messages_append(
                schema.Message(
                    publishing_id=publishing_id,
                    data=bytes(msg),
                )
            )
        await publisher.client.send_frame(
            schema.Publish(
                publisher_id=publisher.id,
                messages=messages,
            ),
        )
        if publishing_ids_callback:
            waiters = self._waiting_for_confirm[publisher.reference]
            for callback, ids in publishing_ids_callback.items():
                waiters.setdefault(callback, set()).update(ids)
            publishing_ids_callback.clear()
        if return_ids:
            return list(set(m.publishing_id for m in messages))
    async def _send_sub_entry_batch(
        self,
        stream: str,
        batch: list[_MessageNotification],
        publisher_name: Optional[str] = None,
        pending_length: Optional[int] = None,
    ):
        publishing_ids = []
        publishing_ids_append = publishing_ids.append
        publishing_ids_callback = self._publishing_ids_callback.setdefault(
            stream, defaultdict(set)
        )
        publisher = await self._get_or_create_publisher(stream, publisher_name)
        publisher_id, sequence_next = publisher.id, publisher.sequence.next
        send_frame = publisher.client.send_frame
        for item in islice(batch, pending_length):
            entry = item.entry
            messages_count = entry.messages_count()
            for _ in repeat(None, messages_count):
                publishing_id = sequence_next()
                publishing_ids_append(publishing_id)
            if (callback := item.callback) is not None:
                publishing_ids_callback[callback].add(publishing_id)
            await send_frame(
                schema.PublishSubBatching(
                    publisher_id=publisher_id,
                    number_of_root_messages=1,
                    publishing_id=publishing_id,
                    compress_type=0x80 | entry.compression_type() << 4,
                    subbatching_message_count=messages_count,
                    uncompressed_data_size=entry.uncompressed_size(),
                    compressed_data_size=entry.compressed_size(),
                    messages=entry.data(),
                ),
            )
        if publishing_ids_callback:
            waiters = self._waiting_for_confirm[publisher.reference]
            for callback, ids in publishing_ids_callback.items():
                waiters.setdefault(callback, set()).update(ids)
            publishing_ids_callback.clear()
        return publishing_ids
    async def send_wait(
        self,
        stream: str,
        message: MessageT,
        publisher_name: Optional[str] = None,
    ) -> int:
        if (
            publisher_name is not None
            and publisher_name
            != self._publishers_name.setdefault(stream, publisher_name)
        ):
            raise Exception
        publisher = await self._get_or_create_publisher(stream, publisher_name)
        msg = RawMessage(message) if isinstance(message, bytes) else message
        if (publishing_id := msg.publishing_id) is None:
            msg.publishing_id = publishing_id = publisher.sequence.next()
        await publisher.client.send_frame(
            schema.Publish(
                publisher_id=publisher.id,
                messages=[
                    schema.Message(
                        publishing_id=publishing_id,
                        data=bytes(msg),
                    )
                ],
            ),
        )
        future: asyncio.Future[None] = asyncio.Future()
        self._waiting_for_confirm[publisher.reference][future] = {publishing_id}
        await future
        return publishing_id
    async def send(
        self,
        stream: str,
        message: MessageT,
        publisher_name: Optional[str] = None,  # may be removed
        on_publish_confirm: Optional[CB[ConfirmationStatus]] = None,
    ):
        if (
            publisher_name is not None
            and publisher_name
            != self._publishers_name.setdefault(stream, publisher_name)
        ):
            raise Exception
        # start the background task to send buffered messages
        if self.task is None:
            self.task = asyncio.create_task(self._timer())
            self.task.add_done_callback(self._timer_completed)
        wrapped_message = _MessageNotification(
            entry=message, callback=on_publish_confirm
        )
        self._buffered_messages[stream].append(wrapped_message)
        self._default_context_switch_counter += 1
        if self._default_context_switch_counter > self._default_context_switch_value:
            await asyncio.sleep(0)
            self._default_context_switch_counter = 0
    async def send_sub_entry(
        self,
        stream: str,
        sub_entry_messages: list[MessageT],
        compression_type: CompressionType = CompressionType.No,
        publisher_name: Optional[str] = None,  # may be removed
        on_publish_confirm: Optional[CB[ConfirmationStatus]] = None,
    ):
        if (
            publisher_name is not None
            and publisher_name
            != self._publishers_name.setdefault(stream, publisher_name)
        ):
            raise Exception
        if len(sub_entry_messages) == 0:
            raise ValueError("Empty batch")
        # start the background task to send buffered messages
        if self.task is None:
            self.task = asyncio.create_task(self._timer())
            self.task.add_done_callback(self._timer_completed)
        compression_codec = CompressionHelper.compress(
            sub_entry_messages, compression_type
        )
        wrapped_message = _MessageNotification(
            entry=compression_codec, callback=on_publish_confirm
        )
        self._buffered_sub_entry_messages[stream].append(wrapped_message)
        await asyncio.sleep(0)
    async def _timer(self):
        delay = self._default_batch_publishing_delay
        buffered_messages = self._buffered_messages
        buffered_sub_entry_messages = self._buffered_sub_entry_messages
        publishers_name_getter = self._publishers_name.get
        send_batch, send_sub_entry_batch = self._send_batch, self._send_sub_entry_batch
        while True:
            await asyncio.sleep(delay)
            for stream, messages in buffered_messages.items():
                if messages:
                    pending_length = len(messages)
                    await send_batch(
                        stream,
                        messages,
                        publishers_name_getter(stream),
                        pending_length,
                        return_ids=False,
                    )
                    del messages[:pending_length]
            if buffered_sub_entry_messages:
                for stream, messages in buffered_sub_entry_messages.items():
                    if messages:
                        pending_length = len(messages)
                        await send_sub_entry_batch(
                            stream,
                            messages,
                            publishers_name_getter(stream),
                            pending_length,
                        )
                        del messages[:pending_length]
    async def _on_publish_confirm(
        self, frame: schema.PublishConfirm, publisher: _Publisher
    ) -> None:
        if frame.publisher_id != publisher.id:
            return
        waiting = self._waiting_for_confirm[publisher.reference]
        for confirmation in list(waiting):
            ids = waiting[confirmation]
            frame_publishing_ids = frame.publishing_ids
            ids_to_call = ids.intersection(frame_publishing_ids)
            ids.difference_update(frame_publishing_ids)
            if not ids:
                del waiting[confirmation]
            if not isinstance(confirmation, asyncio.Future):
                for id in ids_to_call:
                    confirmation_status = ConfirmationStatus(id, True)
                    result = confirmation(confirmation_status)
                    if result is not None and hasattr(result, "__await__"):
                        await result
            elif not ids:
                confirmation.set_result(None)
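For reference, a minimal usage sketch of the wrapper; the host, credentials and stream name are placeholders, and it assumes the producer is entered as an async context manager as in the rstream examples:

import asyncio


async def main() -> None:
    async with MyProducer("localhost", username="guest", password="guest") as producer:
        for i in range(10_000):
            # bytes payloads are wrapped into RawMessage inside _send_batch
            await producer.send("mystream", f"message {i}".encode())
    # leaving the context manager calls close(), which flushes any remaining
    # buffered messages before cancelling the _timer task


asyncio.run(main())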
Thank you for your feedback, but we don't think the lock is the performance bottleneck. I would invest the time in https://github.com/qweeze/rstream/issues/133
Here is a scenario: _timer blocks at writer.drain(), and at this point there are a lot of other tasks calling send concurrently. These tasks must wait for _timer because of this lock.
Once _timer completes and the lock is released, the large number of waiters are woken up in sequence, only one per event loop iteration. I think this will affect performance in a high-concurrency web server.
I'll close this issue, as we are not planning this implementation at the moment. We are noting it in the Performance issue anyway, in case we want to look at it again in the future.