analog icon indicating copy to clipboard operation
analog copied to clipboard

更新版本

Open 1u0Hun opened this issue 4 years ago • 3 comments

目前没有更新,作者大佬是否考虑继续更新实时的呢,可以一起做做

1u0Hun avatar Nov 23 '20 12:11 1u0Hun

Hi,很抱歉这么久才回复。 主要原因还是因为觉得该项目缺一个文档支持,这段时间把analog的文档补齐了,希望你能够从中获得更多信息。 此外,在 code review的过程中,我重写了一些功能(其实还挺多的),并加入了一个简易的文件更新功能,我先抛砖引玉,说一下我的实现思路:

  1. 另起两个线程,一个线程作为生产者,监视文件变化,而另一个作为消费者,将日志文件改动持续地插入数据库中;
  2. 生产者线程supervisor,每过一个时间间隔(暂时设置为5s)检查当前日志文件夹的快照和之前的文件夹快照是否一致,不一致则将新建文件和改动文件的文件路径加入到队列中,快照的功能由watchdog实现,核心代码如下:
# 重载Thread,一定时间间隔触发执行函数
class ConsecutiveThread(Thread):
    def __init__(self, stop_event, func, args=None, kwargs=None, interval=5, daemon=True):
        Thread.__init__(self, daemon=daemon)
        self.stopped = stop_event
        self.func = func
        self.args = args if args is not None else tuple()
        self.kwargs = kwargs if kwargs is not None else dict()
        self.interval = interval

    def run(self):
        while not self.stopped.wait(self.interval):
            self.func(*self.args, **self.kwargs)


class FileSupervisor:

    def __init__(self, _path, _queue: Queue, interval=5, recursive=True, daemon=True, log_path=None, log_name=None):
        """
        文件监视类
        :param _path: 要监视的目录
        :param interval: 时间间隔,即每interval检查一次变更
        :param recursive: 是否递归监视
        :param daemon: 守护线程
        """
        super().__init__()
        self.path = _path
        self.interval = interval
        self.snap = DirectorySnapshot(self.path, recursive=recursive)
        self.daemon = daemon
        self.stop_flag = Event()
        self.logger = None
        self.queue = _queue
        self.thread = ConsecutiveThread(self.stop_flag, self.get_diff, daemon=True)

    def start(self):
        self.thread.start()

    def get_diff(self):
        current_snap = DirectorySnapshot(self.path)
        diff = DirectorySnapshotDiff(self.snap, current_snap)
        self.snap = current_snap
        changes = diff.files_modified + diff.files_created
        if len(changes) != 0:
            self.queue.put(changes)
  1. 消费者线程update线程,也是一定时间间隔读取队列中的改动列表,获取到改动列表后逐个将文件倒序读取日志,将日期新于数据库最新时间的日志插入数据库中。这一做法是适应服务器动态更新日志的行为。
def update_thread_func(self):
	while True:
		changes = None
		if self.update_thread_stop_flag:
			return
		try:
			if not self.file_queue.empty():
				changes = self.file_queue.get(timeout=100)
		except Empty:
			sleep(1)
			continue
		if changes is not None:
			self.update_logs(changes)
		else:
			sleep(1)
self.update_thread = Thread(target=self.update_thread_func, daemon=True)
self.update_thread.start()

Testzero-wz avatar Dec 08 '20 12:12 Testzero-wz

感谢您的真诚回信,我会认真拜读下您的思维

------------------ 原始邮件 ------------------ 发件人: "Testzero-wz/analog" <[email protected]>; 发送时间: 2020年12月8日(星期二) 晚上8:12 收件人: "Testzero-wz/analog"<[email protected]>; 抄送: "1105364442"<[email protected]>;"Author"<[email protected]>; 主题: Re: [Testzero-wz/analog] 更新版本 (#3)

Hi,很抱歉这么久才回复。 主要原因还是因为觉得该项目缺一个文档支持,这段时间把analog的文档补齐了,希望你能够从中获得更多信息。 此外,在 code review的过程中,我重写了一些功能(其实还挺多的),并加入了一个简易的文件更新功能,我先抛砖引玉,说一下我的实现思路:

另起两个线程,一个线程作为生产者,监视文件变化,而另一个作为消费者,将日志文件改动持续地插入数据库中;

生产者线程supervisor,每过一个时间间隔(暂时设置为5s)检查当前日志文件夹的快照和之前的文件夹快照是否一致,不一致则将新建文件和改动文件的文件路径加入到队列中,快照的功能由watchdog实现,核心代码如下:

重载Thread,一定时间间隔触发执行函数 class ConsecutiveThread(Thread): def init(self, stop_event, func, args=None, kwargs=None, interval=5, daemon=True): Thread.init(self, daemon=daemon) self.stopped = stop_event self.func = func self.args = args if args is not None else tuple() self.kwargs = kwargs if kwargs is not None else dict() self.interval = interval def run(self): while not self.stopped.wait(self.interval): self.func(*self.args, **self.kwargs) class FileSupervisor: def init(self, _path, _queue: Queue, interval=5, recursive=True, daemon=True, log_path=None, log_name=None): """ 文件监视类 :param _path: 要监视的目录 :param interval: 时间间隔,即每interval检查一次变更 :param recursive: 是否递归监视 :param daemon: 守护线程 """ super().init() self.path = _path self.interval = interval self.snap = DirectorySnapshot(self.path, recursive=recursive) self.daemon = daemon self.stop_flag = Event() self.logger = None self.queue = _queue self.thread = ConsecutiveThread(self.stop_flag, self.get_diff, daemon=True) def start(self): self.thread.start() def get_diff(self): current_snap = DirectorySnapshot(self.path) diff = DirectorySnapshotDiff(self.snap, current_snap) self.snap = current_snap changes = diff.files_modified + diff.files_created if len(changes) != 0: self.queue.put(changes)

消费者线程update线程,也是一定时间间隔读取队列中的改动列表,获取到改动列表后逐个将文件倒序读取日志,将日期新于数据库最新时间的日志插入数据库中。这一做法是适应服务器动态更新日志的行为。 def update_thread_func(self): while True: changes = None if self.update_thread_stop_flag: return try: if not self.file_queue.empty(): changes = self.file_queue.get(timeout=100) except Empty: sleep(1) continue if changes is not None: self.update_logs(changes) else: sleep(1) self.update_thread = Thread(target=self.update_thread_func, daemon=True) self.update_thread.start()

— You are receiving this because you authored the thread. Reply to this email directly, view it on GitHub, or unsubscribe.

1u0Hun avatar Dec 08 '20 12:12 1u0Hun

大佬能不能加一下联系方式

xcszh avatar Feb 27 '23 09:02 xcszh