HelloGitHub icon indicating copy to clipboard operation
HelloGitHub copied to clipboard

【开源自荐】broadcast-service 一个轻量级的python发布订阅广播库

Open Undertone0809 opened this issue 1 year ago • 0 comments

推荐项目

  • 类别:Python
  • 项目标题:broadcast-service 一个轻量级的python发布发布订阅广播库
  • 项目描述:broadcast-service是一个轻量级的python发布发布订阅广播库,你可以通过这个库轻松地构造发布订阅者模式,并且支持同步、异步、多主题订阅等不同场景下的模式建立。
  • 亮点:
  1. 可以用十分简单的语法构建起一个发布订阅者模式
  2. 支持异步、同步等不同的应用场景
  3. 提供lambda、回调函数、装饰器等不同的语法编写模式
  4. 支持同时订阅多个主题回调,同时向多个主题发布信息
  5. 文档完善 document
  • 示例代码:
from broadcast_service import broadcast_service


# callback of common method
def handle_msg(params):
    print(f"handle_msg receive params: {params}")


# callback of decorator
@broadcast_service.on_listen(['my_topic'])
def handle_decorator_msg(params):
    print(f"handle_decorator_msg receive params: {params}")

if __name__ == '__main__':
    info = 'This is very important msg'

    # subscribe topic
    broadcast_service.subscribe('my_topic', handle_msg)

    # publish broadcast
    broadcast_service.publish('my_topic', info)
"""
This example show how to use in a class. The ueage is very
similar to the common method.
"""


from broadcast_service import broadcast_service


class Person:

    def __init__(self, name: str) -> None:
        self.name = name

    def subscribe(self, topic):
        broadcast_service.listen(topic, self.take_milk)
        print(f"{self.name} subscribe the milk service")

    def take_milk(self):
        print(f"{self.name} takes the milk")


class Market:

    def send_milk(self):
        print("Market send milk")
        broadcast_service.broadcast("milk")


def main():
    p1 = Person("Jack")
    p1.subscribe("milk")
    p2 = Person("Tom")
    p2.subscribe("milk")
    m = Market()
    m.send_milk()


if __name__ == '__main__':
    main()
  • 后续更新计划:
  1. 优化装饰器等场景下的语法表达能力
  2. 探索更多复杂的应用场景,为更多复杂场景提供解决方案
  3. 增加模糊订阅功能
  4. 增加单次订阅回调功能

Undertone0809 avatar Jan 03 '23 10:01 Undertone0809