Multiple threads

Open adildg opened this issue 2 years ago • 5 comments

Hello,

I would like to be able to check multiple domains at the same time. Is it okay to use multithreading?

adildg avatar Nov 07 '22 15:11 adildg

This package is currently based on the widely used Requests package, which is unfortunately not thread-safe. That means that if you want to make requests from multiple threads, you should create a separate WaybackClient instance in each of those threads.

For example:

import threading
from concurrent.futures import ThreadPoolExecutor
import wayback

mementos_to_get = [...]  # Your list of CDX records or memento URLs.

# One shared threading.local object; its attributes are separate for each thread.
thread_data = threading.local()

# Get a unique WaybackClient for whatever thread you're on.
def get_wayback_client():
    if not hasattr(thread_data, 'wayback'):
        thread_data.wayback = wayback.WaybackClient()
    return thread_data.wayback

def get_memento_safely(*args, **kwargs):
    return get_wayback_client().get_memento(*args, **kwargs)

with ThreadPoolExecutor(max_workers=4) as executor:
    for memento in executor.map(get_memento_safely, mementos_to_get):
        # Do something with each memento result
        ...

Or using classic thread classes:

import queue
import threading
import wayback

mementos_to_get = [...]  # Your list of CDX records or memento URLs.

class Worker(threading.Thread):
    def __init__(self, input_queue, output_queue):
        super().__init__()
        self.input_queue = input_queue
        self.output_queue = output_queue

    def run(self):
        # Make a client for this thread and use it:
        with wayback.WaybackClient() as client:
            while True:
                try:
                    # This expects the queue to already be full, and not be added to in real time.
                    # Otherwise you should use get() instead of get_nowait().
                    item = self.input_queue.get_nowait()
                except queue.Empty:
                    # This thread is done, so let the run() method end.
                    break

                try:
                    # Pass whatever other arguments you need here.
                    memento = client.get_memento(item)
                    self.output_queue.put(memento)
                except Exception as error:
                    self.output_queue.put(error)
                finally:
                    # Only mark items that were actually taken off the queue as done.
                    self.input_queue.task_done()

processing_queue = queue.Queue()
results_queue = queue.Queue()
for item in mementos_to_get:
    processing_queue.put_nowait(item)
threads = [Worker(processing_queue, results_queue) for i in range(4)]
for thread in threads:
    thread.start()

# Wait for them all to finish:
processing_queue.join()
# Start reading the results:
while not results_queue.empty():
    memento_or_error = results_queue.get()
    # Do something with the result

You can use WaybackSession to share a pool of connections across threads, but it’s really complicated and I don’t recommend it. Here’s an example: https://github.com/edgi-govdata-archiving/web-monitoring-processing/blob/fcfb36341090bf1a2b560a9008c711386ef8da17/web_monitoring/cli/cli.py

That said, thread safety is one of my next two priorities (the other is the Wayback Machine’s new, beta CDX search API). v0.4.0 will be out in the next couple of days, and then thread safety should be in v0.5.0. When that’s done, you can just use one client wherever you want, without worrying about whether you are on different threads. But that will take a lot of work, since it means moving off the Requests package. I don’t have a clear timeframe for it. (See #58.)

Mr0grog avatar Nov 07 '22 18:11 Mr0grog

Relatedly, if your use case is basically:

  1. Use search() to find a list of mementos, then
  2. Get those mementos efficiently on a bunch of threads

I’d appreciate any feedback on how we could or should make a nice wrapper for that in #17. (It will probably be a while before that gets implemented, though!)
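
As a starting point for discussion, here is a minimal sketch of what such a wrapper might look like. The names `fetch_all` and `make_client` are hypothetical and not part of this package. The sketch only assumes that each client has `get_memento(item)` and `close()` methods, and it creates one client per worker thread, as recommended above.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed


def fetch_all(items, make_client, max_workers=4):
    """Yield (item, memento_or_error) pairs as each fetch completes.

    `make_client` is a hypothetical zero-argument factory that returns an
    object with get_memento(item) and close() methods.
    """
    local = threading.local()
    clients = []

    def fetch(item):
        # Lazily create exactly one client for each worker thread.
        if not hasattr(local, 'client'):
            local.client = make_client()
            clients.append(local.client)
        return local.client.get_memento(item)

    try:
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = {executor.submit(fetch, item): item for item in items}
            for future in as_completed(futures):
                try:
                    result = future.result()
                except Exception as error:
                    # Hand errors back to the caller instead of stopping early.
                    result = error
                yield futures[future], result
    finally:
        # Close every per-thread client once all the work is finished.
        for client in clients:
            client.close()
```

With this package, usage might look like collecting `records = list(client.search('example.com'))` on one client and then looping over `fetch_all(records, wayback.WaybackClient)`. Results arrive in completion order rather than input order, which is why each one is paired with its item.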

Mr0grog avatar Nov 07 '22 18:11 Mr0grog

Amazing! thank you so so much for your explanation <3

adildg avatar Nov 08 '22 18:11 adildg

I've tried multithreading and got blocked by the website. If you are trying it, I recommend adding a time.sleep() call between requests.
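
One way to space requests out across several threads is a small shared throttle. This is only a sketch: the `Throttle` class and its `interval` parameter are hypothetical names, not part of the wayback package, and the right interval is something you would have to find by experiment.

```python
import threading
import time


class Throttle:
    """Allow at most one call per `interval` seconds across all threads."""

    def __init__(self, interval):
        self.interval = interval
        self._lock = threading.Lock()
        self._next_time = time.monotonic()

    def wait(self):
        # Reserve the next free time slot while holding the lock, then sleep
        # outside the lock so other threads can reserve their own slots.
        with self._lock:
            now = time.monotonic()
            slot = max(now, self._next_time)
            self._next_time = slot + self.interval
        delay = slot - now
        if delay > 0:
            time.sleep(delay)
```

Each worker would share a single `Throttle` instance and call `throttle.wait()` right before every request, so the total request rate stays roughly constant no matter how many threads are running.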

kyungsub1108 avatar Oct 09 '23 20:10 kyungsub1108

Quick update: I’m considering this a duplicate of #58, which I am pretty committed to actually solving this month.

@kyungsub1108 we made a bunch of rate limiting improvements recently in v0.4.4, and have some even bigger ones coming in v0.5.0 later this month (along with actual thread safety, so you can use a single client across multiple threads). Hopefully those help with situations like yours.

Mr0grog avatar Dec 13 '23 01:12 Mr0grog