
get_uris_and_datetimes_in_cdxj() in replay can be more efficient

Open machawk1 opened this issue 6 years ago • 8 comments

The function getURIsAndDatetimesInCDXJ() in replay.py iterates through every line in a list of lines to extract the datetime, mime, and HTTP status from a CDXJ file to be used by the JavaScript in the ipwb user interface.
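
For context, each CDXJ record is a space-separated SURT key, a 14-digit datetime, and a JSON block; an illustrative (made-up) line:

com,example)/ 20190622220000 {"mime_type": "text/html", "status_code": "200", "title": "Example Domain"}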

The current implementation (c18c13e0ad44e759fb9d956b6c53ef63eba83053) iterates through each line. Per comments in #604, this could be way more efficient using some sort of parallelism like Python 3's asyncio.

For very large CDXJ files (e.g., the 500,000-URI-M sample in #604), this could make extracting the relevant information from the CDXJ for the user interface much faster.

machawk1 avatar Jun 22 '19 22:06 machawk1

On further inspection of getURIsAndDatetimesInCDXJ(), the function initially calls getIndexFileContents() and puts the CDXJ lines into memory, namely a Python list, and only then begins to iteratively process the lines.
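
One incremental improvement, independent of any concurrency, would be to stream the index instead of materializing it. A minimal sketch with a hypothetical helper (not ipwb's current API), assuming the index is a local file:

def iterateCDXJLines(cdxjFilePath):
    # Hypothetical helper: yield lines lazily so the whole CDXJ
    # never has to sit in memory as a list
    with open(cdxjFilePath) as index:
        for line in index:
            yield line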

machawk1 avatar Aug 05 '19 20:08 machawk1

I'm still getting a handle on implementing async/await in Py3, particularly on how to do batch queueing and return a value once a line is consumed. The current logic expects a results list (uris), global to the loop, that can be checked and manipulated.

I am searching for an example to accomplish this. While I can process the lines, I cannot yet return the results. Most examples using asyncio Queues simply have the consumer print out the result. I need a way to pass the results by reference...if possible, though this sounds like it might lead to a race condition.
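
One possible shape for this, sketched from standard asyncio producer/consumer patterns rather than from ipwb code: because the event loop runs in a single thread, consumers can append to a shared results list passed in as an argument; list appends between awaits cannot interleave, so no lock is needed.

import asyncio

async def consumer(queue, results):
    # Single-threaded event loop: list.append cannot race with
    # other consumers, so no synchronization is required
    while True:
        line = await queue.get()
        results.append(line.strip())  # stand-in for real CDXJ parsing
        queue.task_done()

async def main(lines):
    queue = asyncio.Queue()
    results = []
    tasks = [asyncio.ensure_future(consumer(queue, results))
             for _ in range(4)]
    for line in lines:
        await queue.put(line)
    await queue.join()  # Block until every queued line is processed
    for t in tasks:
        t.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return results

loop = asyncio.get_event_loop()
print(loop.run_until_complete(main(['com,a)/ 1 {}', 'com,b)/ 2 {}'])))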

machawk1 avatar Aug 05 '19 20:08 machawk1

Something like the following might work, adapted from the current code (& thx to @N0taN3rd):

import asyncio
import aiofiles
import json

def unsurt(surt):
    try:
        index = surt.index(')/')
        parts = surt[0:index].split(',')
        parts.reverse()
        host = '.'.join(parts)
        host += surt[index+1:]
        return host

    except ValueError:
        # May not be a valid surt
        return surt

def parseCDXJLine(l):
    cdxjFields = l.strip().split(' ', 2)
    if len(cdxjFields) < 3:  # Skip blank or malformed lines
        return None, None

    uri = unsurt(cdxjFields[0])
    datetime = cdxjFields[1]

    try:
        jsonFields = json.loads(cdxjFields[2])
    except ValueError:  # Skip lines w/o a valid JSON block
        return None, None

    mementoAsJSON = {
        'datetime': datetime,
        'mime': jsonFields['mime_type'] or '',
        'status': jsonFields['status_code']
    }
    if 'title' in jsonFields:
        mementoAsJSON['title'] = jsonFields['title']

    return uri, mementoAsJSON

async def getURIsAndDatetimesInCDXJ(cdxjFilePath):
    uris = {}

    async with aiofiles.open(cdxjFilePath, mode='r') as f:
        async for line in f:
            (uri, mementoAsJSON) = parseCDXJLine(line)
            if uri is None:  # Unparseable line; skip it
                continue
            if uri not in uris:
                uris[uri] = []
            uris[uri].append(mementoAsJSON)

    return uris


loop = asyncio.get_event_loop()
task = loop.create_task(getURIsAndDatetimesInCDXJ('/tmp/sample_sorted.cdxj'))
loop.run_until_complete(task)

machawk1 avatar Aug 05 '19 22:08 machawk1

I do not think making CDX lookup async is the right thing to invest energy in. We would like async fetches of headers and payload parts from IPFS. As far as the index is concerned, we just need to focus on implementing in-file binary search.
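
For reference, in-file binary search over a sorted index could look roughly like this (a minimal sketch assuming byte-ordered SURT keys and one record per line, not an ipwb implementation):

def firstLineAtOrAfter(f, offset):
    # Return the first complete line beginning at or after `offset`
    if offset == 0:
        f.seek(0)
    else:
        f.seek(offset - 1)
        f.readline()  # Consume through the next newline
    return f.readline()

def binarySearchCDXJ(cdxjFilePath, key):
    # Find the first record whose SURT key is >= `key` without
    # reading the whole index into memory
    with open(cdxjFilePath, 'rb') as f:
        f.seek(0, 2)  # Seek to EOF to learn the file size
        lo, hi = 0, f.tell()
        while lo < hi:
            mid = (lo + hi) // 2
            line = firstLineAtOrAfter(f, mid)
            if line and line.split(b' ', 1)[0] < key:
                lo = mid + 1
            else:
                hi = mid
        return firstLineAtOrAfter(f, lo) or None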

ibnesayeed avatar Aug 05 '19 22:08 ibnesayeed

@ibnesayeed The function in question requires parsing every line in series. This GitHub issue is about making that function more efficient; it does not relate to making fetches from IPFS more efficient. It is not even a search function, simply one of parsing and extraction, which can be optimized.

Feel free to invest your energy where you see it most useful. 😊

machawk1 avatar Aug 05 '19 22:08 machawk1

I understand what you are doing here, but I do not think it will improve the experience much. Line reads from the index file are still sequential and bound by the I/O read speed. If parsing each line were a complex and time-consuming process, or involved any other I/O (such as network), then concurrency would benefit us here. Asynchronous code comes with its own cost and should only be used if the gain is reasonable. That said, if you insist on working on this approach, then I would suggest profiling the time taken in parsing, both with and without async in place.
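
For a per-function breakdown (rather than just wall-clock totals), the standard library's cProfile is a one-liner, e.g., run against a synchronous parsing function like the sync() in the next comment:

import cProfile

cProfile.run('sync()', sort='cumtime')  # Shows where the parsing time goes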

ibnesayeed avatar Aug 05 '19 23:08 ibnesayeed

Above revised somewhat:

import asyncio
import time
import json

async def consumer(queue):
    while True:
        line = await queue.get()
        doWork(line)
        queue.task_done()

async def producer():
    N_TASKS = 10
    loop = asyncio.get_event_loop()
    queue = asyncio.Queue(N_TASKS)
    tasks = [loop.create_task(consumer(queue)) for _ in range(N_TASKS)]
    try:
        with open('sample.cdxj') as f:
            for line in f:
                await queue.put(line)
        await queue.join()
    finally:
        for t in tasks:
            t.cancel()
        # Let the cancellations propagate so no tasks linger
        await asyncio.gather(*tasks, return_exceptions=True)

def sync():
    with open('sample.cdxj') as f:
        for line in f:
            doWork(line)


def doWork(line):
    cdxjFields = line.split(' ', 2)
    if len(cdxjFields) < 3:  # Skip blank or malformed lines
        return None

    uri = unsurt(cdxjFields[0])
    datetime = cdxjFields[1]

    try:
        jsonFields = json.loads(cdxjFields[2])
    except ValueError:  # Skip lines w/o a valid JSON block
        return None

    mementoAsJSON = {
        'datetime': datetime,
        'mime': jsonFields['mime_type'] or '',
        'status': jsonFields['status_code']
    }

    if 'title' in jsonFields:
        mementoAsJSON['title'] = jsonFields['title']

    return uri, mementoAsJSON

def unsurt(surt):
    try:
        index = surt.index(')/')
        parts = surt[0:index].split(',')
        parts.reverse()
        host = '.'.join(parts)
        host += surt[index+1:]
        return host
    except ValueError:  # Not a valid SURT
        return surt

start1 = time.time()
loop = asyncio.get_event_loop()
loop.run_until_complete(producer())
print('Async run time: ', time.time() - start1)

start2 = time.time()
sync()
print('Sync run time: ', time.time() - start2)

Async run time:  9.692363977432251
Sync run time:  3.3537070751190186

Not too convincing, but that might be down to my understanding of async. Since the parsing is CPU-bound and asyncio runs everything in a single thread, the queue plumbing may only be adding scheduling overhead.

machawk1 avatar Nov 23 '19 23:11 machawk1

A simpler example using the built-in multiprocessing module looks to be a little faster than the linear synchronous approach:

from multiprocessing import Pool
import time

def multiprocess():
    # Distribute the per-line parsing across four worker processes
    with Pool(4) as pool:
        with open('sample.cdxj', 'r') as f:
            pool.map(doWork, f)

start3 = time.time()
multiprocess()
print('Multiprocess run time: ', time.time() - start3, 'seconds')

On a 500,000 line cdxj file:

Sync run time:  2.866218090057373 seconds
Multiprocess run time:  2.458813190460205 seconds

With about 4,000,000 lines:

Sync run time:  25.190104961395264 seconds
Multiprocess run time:  27.536958932876587 seconds
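
One caveat with Pool.map here: with this little work per line, much of the time likely goes to pickling lines and shuttling them between processes. Passing a larger chunksize (the 10,000 below is an arbitrary value to tune, not a recommendation) may amortize that overhead:

pool.map(doWork, f, chunksize=10000)  # Batch lines to reduce IPC overhead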

machawk1 avatar Dec 16 '19 19:12 machawk1