get_uris_and_datetimes_in_cdxj() in replay can be more efficient
The function getURIsAndDatetimesInCDXJ() in replay.py iterates through every line of a CDXJ file (held in memory as a list of lines) to extract the datetime, MIME type, and HTTP status used by the JavaScript in the ipwb user interface.
The current implementation (c18c13e0ad44e759fb9d956b6c53ef63eba83053) processes the lines one at a time. Per the comments in #604, this could likely be made more efficient with some form of concurrency, e.g., Python 3's asyncio.
For very large CDXJ files (e.g., the 500,000 URI-M sample in #604), that could significantly speed up getting the relevant information from the CDXJ into the user interface.
On further inspection, getURIsAndDatetimesInCDXJ() first calls getIndexFileContents(), which loads the CDXJ lines into memory as a Python list, and only then processes the lines iteratively.
I'm still getting a handle on implementing async/await in Python 3, particularly on how to do batch queueing and return a value once a line is consumed. The current logic expects a results collection (uris) that is available globally (to the loop) and can be checked and manipulated.
I am searching for an example that accomplishes this. While I can process the lines, I cannot yet return the results: most examples using asyncio Queues simply have the consumer print the result. I need a way to pass the results by reference...if that is possible, though it sounds like it could lead to a race condition.
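For the sake of discussion, a minimal sketch of the pattern I am after: consumers that write into a shared dict passed in by reference, which the coroutine then returns once the queue is drained. parseLine() below is only a stand-in for the real CDXJ parsing, and since asyncio runs everything in a single thread, the dict updates should not race:

import asyncio


def parseLine(line):
    # Stand-in for the real CDXJ parsing; returns (uri, memento) or (None, None)
    fields = line.split(' ', 2)
    if len(fields) < 3:
        return None, None
    return fields[0], {'datetime': fields[1]}


async def consumer(queue, results):
    # All consumers share one results dict; asyncio schedules them in a
    # single thread, so the dict updates do not race with each other
    while True:
        line = await queue.get()
        uri, memento = parseLine(line)
        if uri is not None:
            results.setdefault(uri, []).append(memento)
        queue.task_done()


async def collectFromCDXJ(cdxjFilePath, nConsumers=4):
    queue = asyncio.Queue()
    results = {}
    loop = asyncio.get_event_loop()
    tasks = [loop.create_task(consumer(queue, results))
             for _ in range(nConsumers)]
    with open(cdxjFilePath) as f:
        for line in f:
            await queue.put(line)
    await queue.join()  # Wait until every queued line has been processed
    for t in tasks:
        t.cancel()
    return results  # The coroutine returns the shared dict


# uris = asyncio.get_event_loop().run_until_complete(
#     collectFromCDXJ('/tmp/sample_sorted.cdxj'))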
Something like the following might work, adapted from the current code (& thx to @N0taN3rd):
import asyncio
import aiofiles
import json


def unsurt(surt):
    try:
        index = surt.index(')/')
        parts = surt[0:index].split(',')
        parts.reverse()
        host = '.'.join(parts)
        host += surt[index + 1:]
        return host
    except ValueError:
        # May not be a valid SURT
        return surt


def parseCDXJLine(l):
    cdxjFields = l.strip().split(' ', 2)
    if len(cdxjFields) < 3:  # Skip blank/metadata lines
        return None, None
    uri = unsurt(cdxjFields[0])
    datetime = cdxjFields[1]
    try:
        jsonFields = json.loads(cdxjFields[2])
    except Exception:  # Skip lines w/o a JSON block
        return None, None
    mementoAsJSON = {
        'datetime': datetime,
        'mime': jsonFields['mime_type'] or '',
        'status': jsonFields['status_code']
    }
    if 'title' in jsonFields:
        mementoAsJSON['title'] = jsonFields['title']
    return uri, mementoAsJSON


async def getURIsAndDatetimesInCDXJ(cdxjFilePath):
    uris = {}
    async with aiofiles.open(cdxjFilePath, mode='r') as f:
        async for line in f:
            (uri, mementoAsJSON) = parseCDXJLine(line)
            if uri is None:  # Line had no usable JSON block
                continue
            if uri not in uris:
                uris[uri] = []
            uris[uri].append(mementoAsJSON)
    return uris


loop = asyncio.get_event_loop()
task = loop.create_task(getURIsAndDatetimesInCDXJ('/tmp/sample_sorted.cdxj'))
uris = loop.run_until_complete(task)
I do not think making CDX lookup async is the right thing to invest energy in. We would like async fetches of headers and payload parts from IPFS. As far as the index is concerned, we just need to focus on implementing in-file binary search.
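For reference, a rough sketch of what in-file binary search could look like over a CDXJ that is sorted by its SURT key; binarySearchCDXJ() and firstLineAtOrAfter() below are hypothetical helpers, not ipwb code:

import os


def firstLineAtOrAfter(f, offset):
    # Return the byte position of the first complete line starting at or after offset
    if offset == 0:
        return 0
    f.seek(offset - 1)
    f.readline()  # Consume the remainder of whatever line offset falls inside
    return f.tell()


def binarySearchCDXJ(cdxjFilePath, surtKey):
    # Return all lines whose first (SURT URI) field equals surtKey
    target = surtKey.encode()
    matches = []
    with open(cdxjFilePath, 'rb') as f:
        size = f.seek(0, os.SEEK_END)
        lo, hi = 0, size
        # Bisect over byte offsets to find the first line with key >= target
        while lo < hi:
            mid = (lo + hi) // 2
            f.seek(firstLineAtOrAfter(f, mid))
            line = f.readline()
            key = line.split(b' ', 1)[0] if line else None
            if key is None or key >= target:
                hi = mid
            else:
                lo = mid + 1
        f.seek(firstLineAtOrAfter(f, lo))
        for line in f:  # Matching lines are consecutive in a sorted index
            if not line.startswith(target + b' '):
                break
            matches.append(line.decode().rstrip())
    return matches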
@ibnesayeed The function in question requires parsing every line in series. This GitHub issue revolves around making this function more efficient; it does not relate to making fetches to IPFS more efficient. It is not even a search function, simply one of parsing and extraction, which can be optimized.
Feel free to invest your energy where you see it most useful. 😊
I understand what you are doing here, but I do not think it will improve the experience much. Line reads from the index file are still sequential and bound by the I/O read speed. If parsing each line were a complex and time-consuming process, or involved other I/O (such as network), then concurrency would have benefited us here. Asynchronous code comes with its own cost and should only be used if the gain is reasonable. That said, if you insist on working on this approach, then I would suggest profiling the time taken in parsing both with and without async in place.
The above, revised somewhat:
import asyncio
import time
import json


async def consumer(queue):
    while True:
        line = await queue.get()
        doWork(line)
        queue.task_done()


async def producer():
    N_TASKS = 10
    loop = asyncio.get_event_loop()
    queue = asyncio.Queue(N_TASKS)
    tasks = [loop.create_task(consumer(queue)) for _ in range(N_TASKS)]
    try:
        with open('sample.cdxj') as f:
            for line in f:
                await queue.put(line)
        await queue.join()
    finally:
        for t in tasks:
            t.cancel()


def sync():
    with open('sample.cdxj') as f:
        for line in f:
            doWork(line)


def doWork(line):
    cdxjFields = line.split(' ', 2)
    uri = unsurt(cdxjFields[0])
    datetime = cdxjFields[1]
    try:
        jsonFields = json.loads(cdxjFields[2])
    except Exception:  # Skip lines w/o a JSON block
        return
    mementoAsJSON = {
        'datetime': datetime,
        'mime': jsonFields['mime_type'] or '',
        'status': jsonFields['status_code']
    }
    if 'title' in jsonFields:
        mementoAsJSON['title'] = jsonFields['title']


def unsurt(surt):
    try:
        index = surt.index(')/')
        parts = surt[0:index].split(',')
        parts.reverse()
        host = '.'.join(parts)
        host += surt[index + 1:]
        return host
    except ValueError:
        return surt


start1 = time.time()
loop = asyncio.get_event_loop()
loop.run_until_complete(producer())
print('Async run time: ', time.time() - start1)

start2 = time.time()
sync()
print('Sync run time: ', time.time() - start2)
Async run time: 9.692363977432251
Sync run time: 3.3537070751190186
Not too convincing, but that might be down to my understanding of async.
A simpler example using the built-in multiprocessing module appears to be a little faster than the linear synchronous approach:
from multiprocessing import Pool


def multiprocess():
    pool = Pool(4)
    with open('sample.cdxj', 'r') as f:
        pool.map(doWork, f)  # doWork() as defined above
    pool.close()
    pool.join()


start3 = time.time()
multiprocess()
print('Multiprocess run time: ', time.time() - start3)
On a 500,000 line cdxj file:
Sync run time: 2.866218090057373 seconds
Multiprocess run time: 2.458813190460205 seconds
With about 4,000,000 lines:
Sync run time: 25.190104961395264 seconds
Multiprocess run time: 27.536958932876587 seconds
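One possible follow-up, not benchmarked here: pool.map() turns an iterable with no known length (such as a file object) into a list before dispatching, and every line plus its (discarded) result gets pickled between processes, which may explain the slowdown on the 4,000,000-line file. imap_unordered() with an explicit chunksize streams the lines to the workers in batches instead; multiprocessChunked and the 10,000 chunk size below are illustrative guesses, not tested values:

from multiprocessing import Pool


def multiprocessChunked():
    # Streams lines to the workers in batches of `chunksize` rather than
    # materializing the whole file first; result order is not preserved,
    # which does not matter since doWork() returns nothing useful
    with Pool(4) as pool:
        with open('sample.cdxj', 'r') as f:
            for _ in pool.imap_unordered(doWork, f, chunksize=10000):
                pass  # Drain the iterator so all submitted work completes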