HelloGitHub
HelloGitHub copied to clipboard
【开源自荐】broadcast-service 一个轻量级的python发布订阅广播库
推荐项目
- 类别:Python
- 项目标题:broadcast-service 一个轻量级的python发布发布订阅广播库
- 项目描述:broadcast-service是一个轻量级的python发布发布订阅广播库,你可以通过这个库轻松地构造发布订阅者模式,并且支持同步、异步、多主题订阅等不同场景下的模式建立。
- 亮点:
- 可以用十分简单的语法构建起一个发布订阅者模式
- 支持异步、同步等不同的应用场景
- 提供lambda、回调函数、装饰器等不同的语法编写模式
- 支持同时订阅多个主题回调,同时向多个主题发布信息
- 文档完善 document
- 示例代码:
from broadcast_service import broadcast_service
# callback of common method
def handle_msg(params):
print(f"handle_msg receive params: {params}")
# callback of decorator
@broadcast_service.on_listen(['my_topic'])
def handle_decorator_msg(params):
print(f"handle_decorator_msg receive params: {params}")
if __name__ == '__main__':
info = 'This is very important msg'
# subscribe topic
broadcast_service.subscribe('my_topic', handle_msg)
# publish broadcast
broadcast_service.publish('my_topic', info)
"""
This example show how to use in a class. The ueage is very
similar to the common method.
"""
from broadcast_service import broadcast_service
class Person:
def __init__(self, name: str) -> None:
self.name = name
def subscribe(self, topic):
broadcast_service.listen(topic, self.take_milk)
print(f"{self.name} subscribe the milk service")
def take_milk(self):
print(f"{self.name} takes the milk")
class Market:
def send_milk(self):
print("Market send milk")
broadcast_service.broadcast("milk")
def main():
p1 = Person("Jack")
p1.subscribe("milk")
p2 = Person("Tom")
p2.subscribe("milk")
m = Market()
m.send_milk()
if __name__ == '__main__':
main()
- 后续更新计划:
- 优化装饰器等场景下的语法表达能力
- 探索更多复杂的应用场景,为更多复杂场景提供解决方案
- 增加模糊订阅功能
- 增加单次订阅回调功能