analog
analog copied to clipboard
更新版本
目前没有更新,作者大佬是否考虑继续更新实时的呢,可以一起做做
Hi,很抱歉这么久才回复。 主要原因还是因为觉得该项目缺一个文档支持,这段时间把analog的文档补齐了,希望你能够从中获得更多信息。 此外,在 code review的过程中,我重写了一些功能(其实还挺多的),并加入了一个简易的文件更新功能,我先抛砖引玉,说一下我的实现思路:
- 另起两个线程,一个线程作为生产者,监视文件变化,而另一个作为消费者,将日志文件改动持续地插入数据库中;
- 生产者线程supervisor,每过一个时间间隔(暂时设置为5s)检查当前日志文件夹的快照和之前的文件夹快照是否一致,不一致则将新建文件和改动文件的文件路径加入到队列中,快照的功能由watchdog实现,核心代码如下:
# 重载Thread,一定时间间隔触发执行函数
class ConsecutiveThread(Thread):
def __init__(self, stop_event, func, args=None, kwargs=None, interval=5, daemon=True):
Thread.__init__(self, daemon=daemon)
self.stopped = stop_event
self.func = func
self.args = args if args is not None else tuple()
self.kwargs = kwargs if kwargs is not None else dict()
self.interval = interval
def run(self):
while not self.stopped.wait(self.interval):
self.func(*self.args, **self.kwargs)
class FileSupervisor:
def __init__(self, _path, _queue: Queue, interval=5, recursive=True, daemon=True, log_path=None, log_name=None):
"""
文件监视类
:param _path: 要监视的目录
:param interval: 时间间隔,即每interval检查一次变更
:param recursive: 是否递归监视
:param daemon: 守护线程
"""
super().__init__()
self.path = _path
self.interval = interval
self.snap = DirectorySnapshot(self.path, recursive=recursive)
self.daemon = daemon
self.stop_flag = Event()
self.logger = None
self.queue = _queue
self.thread = ConsecutiveThread(self.stop_flag, self.get_diff, daemon=True)
def start(self):
self.thread.start()
def get_diff(self):
current_snap = DirectorySnapshot(self.path)
diff = DirectorySnapshotDiff(self.snap, current_snap)
self.snap = current_snap
changes = diff.files_modified + diff.files_created
if len(changes) != 0:
self.queue.put(changes)
- 消费者线程update线程,也是一定时间间隔读取队列中的改动列表,获取到改动列表后逐个将文件倒序读取日志,将日期新于数据库最新时间的日志插入数据库中。这一做法是适应服务器动态更新日志的行为。
def update_thread_func(self):
while True:
changes = None
if self.update_thread_stop_flag:
return
try:
if not self.file_queue.empty():
changes = self.file_queue.get(timeout=100)
except Empty:
sleep(1)
continue
if changes is not None:
self.update_logs(changes)
else:
sleep(1)
self.update_thread = Thread(target=self.update_thread_func, daemon=True)
self.update_thread.start()
感谢您的真诚回信,我会认真拜读下您的思维
------------------ 原始邮件 ------------------ 发件人: "Testzero-wz/analog" <[email protected]>; 发送时间: 2020年12月8日(星期二) 晚上8:12 收件人: "Testzero-wz/analog"<[email protected]>; 抄送: "1105364442"<[email protected]>;"Author"<[email protected]>; 主题: Re: [Testzero-wz/analog] 更新版本 (#3)
Hi,很抱歉这么久才回复。 主要原因还是因为觉得该项目缺一个文档支持,这段时间把analog的文档补齐了,希望你能够从中获得更多信息。 此外,在 code review的过程中,我重写了一些功能(其实还挺多的),并加入了一个简易的文件更新功能,我先抛砖引玉,说一下我的实现思路:
另起两个线程,一个线程作为生产者,监视文件变化,而另一个作为消费者,将日志文件改动持续地插入数据库中;
生产者线程supervisor,每过一个时间间隔(暂时设置为5s)检查当前日志文件夹的快照和之前的文件夹快照是否一致,不一致则将新建文件和改动文件的文件路径加入到队列中,快照的功能由watchdog实现,核心代码如下:
重载Thread,一定时间间隔触发执行函数 class ConsecutiveThread(Thread): def init(self, stop_event, func, args=None, kwargs=None, interval=5, daemon=True): Thread.init(self, daemon=daemon) self.stopped = stop_event self.func = func self.args = args if args is not None else tuple() self.kwargs = kwargs if kwargs is not None else dict() self.interval = interval def run(self): while not self.stopped.wait(self.interval): self.func(*self.args, **self.kwargs) class FileSupervisor: def init(self, _path, _queue: Queue, interval=5, recursive=True, daemon=True, log_path=None, log_name=None): """ 文件监视类 :param _path: 要监视的目录 :param interval: 时间间隔,即每interval检查一次变更 :param recursive: 是否递归监视 :param daemon: 守护线程 """ super().init() self.path = _path self.interval = interval self.snap = DirectorySnapshot(self.path, recursive=recursive) self.daemon = daemon self.stop_flag = Event() self.logger = None self.queue = _queue self.thread = ConsecutiveThread(self.stop_flag, self.get_diff, daemon=True) def start(self): self.thread.start() def get_diff(self): current_snap = DirectorySnapshot(self.path) diff = DirectorySnapshotDiff(self.snap, current_snap) self.snap = current_snap changes = diff.files_modified + diff.files_created if len(changes) != 0: self.queue.put(changes)
消费者线程update线程,也是一定时间间隔读取队列中的改动列表,获取到改动列表后逐个将文件倒序读取日志,将日期新于数据库最新时间的日志插入数据库中。这一做法是适应服务器动态更新日志的行为。 def update_thread_func(self): while True: changes = None if self.update_thread_stop_flag: return try: if not self.file_queue.empty(): changes = self.file_queue.get(timeout=100) except Empty: sleep(1) continue if changes is not None: self.update_logs(changes) else: sleep(1) self.update_thread = Thread(target=self.update_thread_func, daemon=True) self.update_thread.start()
— You are receiving this because you authored the thread. Reply to this email directly, view it on GitHub, or unsubscribe.
大佬能不能加一下联系方式