refactor!: Introduce new storage client system
Description
- I consolidated all commits from https://github.com/apify/crawlee-python/pull/1107 into this new PR.
- The previous storage-clients implementation was completely replaced with redesigned clients, including:
- new storage-client interface,
- in-memory storage client,
- file-system storage client,
- Apify storage client (implemented in the SDK; see https://github.com/apify/apify-sdk-python/pull/470),
- and various adjustments elsewhere in the codebase.
- The old "memory plus persist" client has been split into separate memory and file-system implementations.
- The `Configuration.persist_storage` and `Configuration.persist_metadata` options were removed.
- All old collection clients have been removed; they're no longer needed.
- Each storage client now prints warnings if you pass method arguments it does not support.
- The creation management modules in the storage clients and storages were removed.
- Storage client parameters (e.g. `purge_on_start`, or `token` and `base_api_url` for the Apify client) are configured via the `Configuration`.
- Every storage, and its corresponding client, now provides both a `purge` method (which clears all items but preserves the storage and metadata) and a `drop` method (which removes the entire storage, metadata included); see the sketch after this list.
- All unused types, models, and helper utilities have been removed.
- The detailed, per-storage/client changes are listed below...
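A minimal sketch of the `purge`/`drop` distinction (assuming the in-memory client is exported as `MemoryStorageClient`):

```python
import asyncio

from crawlee.storage_clients import MemoryStorageClient
from crawlee.storages import Dataset


async def main() -> None:
    dataset = await Dataset.open(name='demo', storage_client=MemoryStorageClient())
    await dataset.push_data({'status': 'ok'})

    # purge: clears all items, but the storage and its metadata survive,
    # so the same instance keeps accepting data.
    await dataset.purge()
    await dataset.push_data({'status': 'ok again'})

    # drop: removes the entire storage, metadata included.
    await dataset.drop()


if __name__ == '__main__':
    asyncio.run(main())
```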
Dataset
- Properties: `id`, `name`, `metadata`
- Methods: `open`, `purge` (new method), `drop`, `push_data`, `get_data`, `iterate_items`, `list_items` (new method), `export_to`
- Breaking changes:
  - `from_storage_object` method has been removed - use the `open` method with `name` or `id` instead.
  - `get_info` -> `metadata` property.
  - `storage_object` -> `metadata` property.
  - `set_metadata` method has been removed (it wasn't propagated to clients) - do we want to support it (e.g. for renaming)?
  - `write_to_json` method has been removed, use `export_to` instead.
  - `write_to_csv` method has been removed, use `export_to` instead.
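For migration, a minimal sketch of the property change (the helper function is hypothetical; the `metadata` fields shown come from the new models):

```python
from crawlee.storages import Dataset


async def show_metadata(dataset: Dataset) -> None:
    # Before (removed): info = await dataset.get_info()
    # After: metadata is a plain property carrying the same information.
    print(dataset.metadata.id, dataset.metadata.name)
```

A full usage example: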
```python
import asyncio

from crawlee.storage_clients import FileSystemStorageClient
from crawlee.storages import Dataset


async def main() -> None:
    dataset = await Dataset.open(storage_client=FileSystemStorageClient())
    print(f'default dataset - ID: {dataset.id}, name: {dataset.name}')

    await dataset.push_data({'name': 'John'})
    await dataset.push_data({'name': 'John', 'age': 20})
    await dataset.push_data({})

    dataset_with_name = await Dataset.open(
        name='my_dataset',
        storage_client=FileSystemStorageClient(),
    )
    print(f'named dataset - ID: {dataset_with_name.id}, name: {dataset_with_name.name}')

    await dataset_with_name.push_data([{'age': 30}, {'age': 25}])

    print('Default dataset items:')
    async for item in dataset.iterate_items(skip_empty=True):
        print(item)

    print('Named dataset items:')
    async for item in dataset_with_name.iterate_items():
        print(item)

    items = await dataset.get_data()
    print(items)

    dataset_by_id = await Dataset.open(id=dataset_with_name.id)
    print(f'dataset by ID - ID: {dataset_by_id.id}, name: {dataset_by_id.name}')


if __name__ == '__main__':
    asyncio.run(main())
```
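The example above doesn't touch `export_to`, which replaces `write_to_json`/`write_to_csv`; a short sketch (the exact `export_to` parameters are assumptions based on the removed JSON/CSV helpers):

```python
import asyncio

from crawlee.storages import Dataset


async def main() -> None:
    dataset = await Dataset.open()
    await dataset.push_data([{'name': 'John'}, {'name': 'Jane'}])

    # Exports all items into the default key-value store under the given key;
    # 'json' and 'csv' mirror the removed write_to_json / write_to_csv helpers.
    await dataset.export_to(key='dataset.json', content_type='json')
    await dataset.export_to(key='dataset.csv', content_type='csv')


if __name__ == '__main__':
    asyncio.run(main())
```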
Key-value store
- Properties: `id`, `name`, `metadata`
- Methods: `open`, `purge` (new method), `drop`, `get_value`, `set_value`, `delete_value` (new method; the Apify platform's `set_value` supports setting an empty value for a key, so a separate method for deleting is useful), `iterate_keys`, `list_keys` (new method), `get_public_url`, `get_auto_saved_value`, `persist_autosaved_values`
- Breaking changes:
  - `from_storage_object` method has been removed - use the `open` method with `name` or `id` instead.
  - `get_info` -> `metadata` property.
  - `storage_object` -> `metadata` property.
  - `set_metadata` method has been removed (it wasn't propagated to clients) - do we want to support it (e.g. for renaming)?
```python
import asyncio

import requests

from crawlee.storage_clients import FileSystemStorageClient
from crawlee.storages import KeyValueStore


async def main() -> None:
    print('Opening key-value store "my_kvs"...')
    storage_client = FileSystemStorageClient()
    kvs = await KeyValueStore.open(name='my_kvs', storage_client=storage_client)

    print('Setting value to "file.json"...')
    await kvs.set_value('file.json', {'key': 'value'})

    print('Setting value to "file.jpg"...')
    response = requests.get('https://avatars.githubusercontent.com/u/25082181')
    await kvs.set_value('file.jpg', response.content)

    print('Iterating over keys:')
    async for key in kvs.iterate_keys():
        print(f'Key: {key}')

    print('Listing keys:')
    keys = [key.key for key in await kvs.list_keys()]
    print(f'Keys: {keys}')

    for key in keys:
        print(f'Getting value of {key}...')
        value = await kvs.get_value(key)
        print(f'Value: {str(value)[:100]}')

    print('Deleting value of "file.json"...')
    await kvs.delete_value('file.json')

    kvs_default = await KeyValueStore.open(storage_client=storage_client)

    special_key = 'key with spaces/and/slashes!@#$%^&*()'
    test_value = 'Special key value'
    await kvs_default.set_value(key=special_key, value=test_value)

    record = await kvs_default.get_value(key=special_key)
    assert record is not None
    assert record == test_value

    result = await kvs_default.list_keys()
    print(f'kvs_default list keys = {result}')

    kvs_2 = await KeyValueStore.open()
    result = await kvs_2.list_keys()
    print(f'kvs_2 list keys = {result}')


if __name__ == '__main__':
    asyncio.run(main())
```
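The example above doesn't touch `get_auto_saved_value` or `persist_autosaved_values`; a minimal sketch of the intended flow (assuming the auto-saved value behaves as a mutable dict persisted in the background):

```python
import asyncio

from crawlee.storages import KeyValueStore


async def main() -> None:
    kvs = await KeyValueStore.open()

    # Returns a cached dict tied to the given key; mutations are persisted
    # automatically, without an explicit set_value call.
    state = await kvs.get_auto_saved_value('crawl-state', {'processed': 0})
    state['processed'] += 1

    # Force an immediate flush of all auto-saved values.
    await kvs.persist_autosaved_values()


if __name__ == '__main__':
    asyncio.run(main())
```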
Request queue
- Properties: `id`, `name`, `metadata`
- Methods: `open`, `purge` (new method), `drop`, `add_request`, `add_requests_batched` -> `add_requests`, `fetch_next_request`, `get_request`, `mark_request_as_handled`, `reclaim_request`, `is_empty`, `is_finished`
- Breaking changes:
  - `from_storage_object` method has been removed - use the `open` method with `name` or `id` instead.
  - `get_info` -> `metadata` property.
  - `storage_object` -> `metadata` property.
  - `set_metadata` method has been removed (it wasn't propagated to clients) - do we want to support it (e.g. for renaming)?
  - `get_handled_count` method has been removed - use `metadata.handled_request_count` instead.
  - `get_total_count` method has been removed - use `metadata.total_request_count` instead.
  - `resource_directory` from the `RequestQueueMetadata` was removed, use the `path_to...` property instead.
  - `RequestQueueHead` model has been removed - use `RequestQueueHeadWithLocks` instead.
- Notes:
  - The new RQ `add_requests` contains a `forefront` arg (the Apify API supports it).
```python
import asyncio

from crawlee import Request
from crawlee.configuration import Configuration
from crawlee.storage_clients import FileSystemStorageClient
from crawlee.storages import RequestQueue


async def main() -> None:
    rq = await RequestQueue.open(
        name='my-queue',
        storage_client=FileSystemStorageClient(),
        configuration=Configuration(purge_on_start=True),
    )
    print(f'RequestQueue: {rq}')
    print(f'RequestQueue client: {rq._client}')

    await rq.add_requests(
        requests=[
            Request.from_url('https://example.com', use_extended_unique_key=True),
            Request.from_url('https://crawlee.dev', use_extended_unique_key=True),
            Request.from_url('https://apify.com', use_extended_unique_key=True),
        ],
    )
    print('Requests were added to the queue')

    is_empty = await rq.is_empty()
    is_finished = await rq.is_finished()
    print(f'Is empty: {is_empty}')
    print(f'Is finished: {is_finished}')

    request = await rq.fetch_next_request()
    print(f'Fetched request: {request}')

    await rq.add_request('https://facebook.com', forefront=True)
    request = await rq.fetch_next_request()
    print(f'Fetched request: {request}')

    rq_default = await RequestQueue.open(
        storage_client=FileSystemStorageClient(),
        configuration=Configuration(purge_on_start=True),
    )

    await rq_default.add_request('https://example.com/1')
    await rq_default.add_requests(
        [
            'https://example.com/priority-1',
            'https://example.com/priority-2',
            'https://example.com/priority-3',
        ]
    )
    await rq_default.add_request('https://example.com/2')


if __name__ == '__main__':
    asyncio.run(main())
```
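The example above never resolves the fetched requests; a short consumer-loop sketch combining `fetch_next_request`, `mark_request_as_handled`, and `reclaim_request` (the handling body is a placeholder):

```python
import asyncio

from crawlee.storages import RequestQueue


async def main() -> None:
    rq = await RequestQueue.open()
    await rq.add_requests(['https://example.com/a', 'https://example.com/b'])

    while not await rq.is_finished():
        request = await rq.fetch_next_request()
        if request is None:
            break
        try:
            print(f'Processing {request.url}...')  # placeholder for real handling
        except Exception:
            # Put the request back so it can be retried later.
            await rq.reclaim_request(request)
        else:
            await rq.mark_request_as_handled(request)


if __name__ == '__main__':
    asyncio.run(main())
```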
BaseDatasetClient
- Properties: `metadata`
- Methods: `open`, `purge`, `drop`, `push_data`, `get_data`, `iterate_items`
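For illustration, the rough shape a concrete dataset client exposes (method names are taken from the list above; this class and its signatures are assumptions, not the real base class):

```python
from __future__ import annotations

from typing import Any, AsyncIterator


class SketchDatasetClient:
    """Hypothetical in-memory shape of a dataset client; not the actual ABC."""

    def __init__(self) -> None:
        self._items: list[dict[str, Any]] = []

    async def purge(self) -> None:
        # Remove all items but keep the storage (and its metadata) alive.
        self._items.clear()

    async def drop(self) -> None:
        # Remove the whole storage, metadata included.
        self._items = []

    async def push_data(self, data: dict[str, Any] | list[dict[str, Any]]) -> None:
        self._items.extend(data if isinstance(data, list) else [data])

    async def get_data(self) -> list[dict[str, Any]]:
        return list(self._items)

    async def iterate_items(self) -> AsyncIterator[dict[str, Any]]:
        for item in self._items:
            yield item
```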
BaseKeyValueStoreClient
- Properties: `metadata`
- Methods: `open`, `purge`, `drop`, `get_value`, `set_value`, `delete_value`, `iterate_keys`, `get_public_url`
BaseRequestQueueClient
- Properties: `metadata`
- Methods: `open`, `purge`, `drop`, `add_requests_batch` -> `add_batch_of_requests` (one backend method for 2 frontend methods), `get_request`, `fetch_next_request`, `mark_request_as_handled`, `reclaim_request`, `is_empty`
- Models:
  - `RequestQueueHeadWithLocks` -> `RequestQueueHead`
  - `BatchRequestsOperationResponse` -> `AddRequestsResponse`
- Notes:
  - The old file-system (memory) version didn't persist the in-progress requests.
  - The old file-system (memory) version didn't persist the forefront values (there is now an FS-specific `_sequence` field in the FS Request).
  - The methods manipulating locks and listing heads are now only internal methods of the Apify RQ client.
Issues
- Closes: #92
- Closes: #147
- Closes: #783
- Relates: #1175
- Relates: #1191
Testing
- The original tests were mostly removed and replaced with new ones.
- Each storage-client implementation now has its own dedicated tests at the client level (more targeted/edge-case coverage).
- On top of that, there are storage-level tests that use a parametrized fixture for each storage client (`file-system` and `memory`), ensuring every storage test runs against every client implementation; a sketch follows below.
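A sketch of how such a parametrized fixture might look (assuming the in-memory client is exported as `MemoryStorageClient` and pytest-asyncio is used; the test body is illustrative):

```python
import pytest

from crawlee.storage_clients import FileSystemStorageClient, MemoryStorageClient
from crawlee.storages import Dataset


@pytest.fixture(params=['memory', 'file_system'])
def storage_client(request: pytest.FixtureRequest) -> MemoryStorageClient | FileSystemStorageClient:
    # Every test that takes this fixture runs once per client implementation.
    if request.param == 'memory':
        return MemoryStorageClient()
    return FileSystemStorageClient()


@pytest.mark.asyncio
async def test_push_and_iterate(storage_client: MemoryStorageClient | FileSystemStorageClient) -> None:
    dataset = await Dataset.open(storage_client=storage_client)
    await dataset.push_data({'key': 'value'})
    items = [item async for item in dataset.iterate_items()]
    assert items == [{'key': 'value'}]
```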
Checklist
- [x] CI passed
That's excellent work!
All the feedback was addressed, including the upgrading guide. Could you guys please take a second look?
(FYI: this commit https://github.com/apify/crawlee-python/pull/1194/commits/9f10b955c6d8c0d82940ad1c6bec0be6d5274565 broke the docs build; @barjin will take a look at it later.)
Awesome work @vdusek! Did you get a chance to run the SDK integration tests (with these changes https://github.com/apify/apify-sdk-python/pull/470) against the updated Crawlee?
I'm asking because I'd love to avoid the situation where we need to make hotfix releases after we discover that we can't make the SDK work with these changes.
Benchmark
- 1000 requests to a local HTTP server.
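For context, a sketch of the measurement loop behind the numbers below (`run_crawler` is a placeholder for the actual 1000-request crawl):

```python
import asyncio
import statistics
import time


async def run_crawler() -> None:
    """Placeholder: crawl 1000 requests against the local HTTP server."""
    await asyncio.sleep(0)


async def main() -> None:
    runtimes: list[float] = []
    for run in range(1, 11):
        started = time.perf_counter()
        await run_crawler()
        runtimes.append(time.perf_counter() - started)
        print(f'- Run {run}: {runtimes[-1]:.6f}s')
    print(f'Average crawler runtime: {statistics.mean(runtimes):.6f}s')


if __name__ == '__main__':
    asyncio.run(main())
```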
Crawlee Py - Old memory client
- Old memory client = memory with persistence false
All runtimes:
- Run 1: 4.280095s
- Run 2: 4.388267s
- Run 3: 4.322075s
- Run 4: 4.520383s
- Run 5: 4.222991s
- Run 6: 4.359523s
- Run 7: 4.182811s
- Run 8: 4.322229s
- Run 9: 4.072315s
- Run 10: 4.032425s
Average crawler runtime: 4.270311s
Crawlee Py - Old file-system client
- Old file-system client = memory with persistence true
All runtimes:
- Run 1: 5.351646s
- Run 2: 5.934761s
- Run 3: 5.218038s
- Run 4: 4.867312s
- Run 5: 4.890084s
- Run 6: 4.935311s
- Run 7: 4.923271s
- Run 8: 4.752518s
- Run 9: 4.724725s
- Run 10: 4.865203s
Average crawler runtime: 5.046287s
Crawlee Py - New memory client
All runtimes:
- Run 1: 1.582967s
- Run 2: 1.723083s
- Run 3: 1.539048s
- Run 4: 1.622284s
- Run 5: 1.802081s
- Run 6: 1.556861s
- Run 7: 1.436224s
- Run 8: 1.635982s
- Run 9: 1.633467s
- Run 10: 1.727041s
Average crawler runtime: 1.625904s
Crawlee Py - New file-system client
All runtimes:
- Run 1: 4.299179s
- Run 2: 4.576746s
- Run 3: 4.359626s
- Run 4: 4.305971s
- Run 5: 4.480797s
- Run 6: 4.511054s
- Run 7: 4.316566s
- Run 8: 4.503595s
- Run 9: 4.378998s
- Run 10: 4.427795s
Average crawler runtime: 4.416033s
Crawlee TS - Memory client
- Memory client = memory with persistence false
All runtimes:
- Run 1: 2.268000s
- Run 2: 2.220000s
- Run 3: 2.254000s
- Run 4: 2.297000s
- Run 5: 2.283000s
- Run 6: 2.279000s
- Run 7: 2.212000s
- Run 8: 2.307000s
- Run 9: 2.127000s
- Run 10: 2.256000s
Average crawler runtime: 2.2503s
Crawlee TS - File-system client
- File-system client = memory with persistence true
All runtimes:
- Run 1: 3.446000s
- Run 2: 3.186000s
- Run 3: 3.390000s
- Run 4: 3.145000s
- Run 5: 3.102000s
- Run 6: 3.260000s
- Run 7: 3.178000s
- Run 8: 3.160000s
- Run 9: 3.369000s
- Run 10: 3.247000s
Average crawler runtime: 3.2483s
Scrapy - memory*
- Scrapy provides only in-memory storage for requests.
All runtimes:
- Run 1: 1.476767s
- Run 2: 1.451156s
- Run 3: 1.463033s
- Run 4: 1.521124s
- Run 5: 1.498111s
- Run 6: 1.494774s
- Run 7: 1.487637s
- Run 8: 1.479461s
- Run 9: 1.459569s
- Run 10: 1.447364s
Average crawler runtime: 1.477900s
Summary
| Configuration | Average Runtime (s) |
|---|---|
| Crawlee Py - Old memory client | 4.270311 |
| Crawlee Py - Old file-system client | 5.046287 |
| Crawlee Py - New memory client | 1.625904 |
| Crawlee Py - New file-system client | 4.416033 |
| Crawlee TS - Memory client | 2.2503 |
| Crawlee TS - File-system client | 3.2483 |
| Scrapy - memory* | 1.477900 |
end of an era 🎉