amazon_photos
amazon_photos copied to clipboard
Add async upload functions
Would you consider adding some native async methods like this?
async def aupload(self, files: list[Path | str] | Generator, chunk_size=64 * 1024) -> list[dict]:
    """
    Upload files to Amazon Photos concurrently

    Each file is read from disk twice in `chunk_size` pieces: once to compute
    its MD5 checksum and once to stream the request body, so a whole file is
    never held in memory. All uploads are scheduled at once through
    `asyncio.gather`, which returns results in the same order as `files`.

    @param files: list (or generator) of files to upload, as `Path` or `str`
    @param chunk_size: optional size to chunk files into during stream upload
    @return: the upload response of each file, as decoded JSON
    """

    async def stream_bytes(file: Path):
        """Async generator yielding the file content in `chunk_size` pieces."""
        async with aiofiles.open(file, 'rb') as f:
            while chunk := await f.read(chunk_size):
                yield chunk

    async def md5_hex(file: Path) -> str:
        """Hex MD5 digest of the file, computed incrementally chunk by chunk."""
        digest = hashlib.md5()
        async for chunk in stream_bytes(file):
            digest.update(chunk)
        return digest.hexdigest()

    async def upload_file(client: AsyncClient, file: Path | str) -> dict:
        """Upload a single file and return the decoded JSON response."""
        # Normalise before the first `.name` access: callers may pass plain strings.
        file = Path(file)
        logger.debug(f'Upload start: {file.name}')
        # Size comes from the filesystem so the content need not be loaded.
        file_size = str(file.stat().st_size)
        checksum = await md5_hex(file)
        r = await client.post(
            'https://content-na.drive.amazonaws.com/v2/upload',
            # NOTE(review): assumes `AsyncClient` is httpx's, where a raw byte
            # stream goes in `content=` (`data=` is reserved for form fields).
            content=stream_bytes(file),
            params={
                'conflictResolution': 'RENAME',
                'fileSize': file_size,
                'name': file.name,
                'parentNodeId': self.root,
            },
            headers={
                'content-length': file_size,
                'x-amzn-file-md5': checksum,
            },
        )
        data = r.json()
        logger.debug(f'Upload complete: {file.name}')
        logger.debug(f'{data = }')
        return data

    # A dedicated async client that reuses the headers and cookies of the
    # existing `self.client`.
    async with AsyncClient(
        http2=True,
        timeout=60,
        follow_redirects=True,
        headers=self.client.headers,
        cookies=self.client.cookies,
    ) as client:
        uploads = [upload_file(client, file) for file in files]
        return await asyncio.gather(*uploads)