Add filter data step to the data refresh DAG
Description
In addition to the steps outlined in #4149 and #4146, we also need to add the Filter Data step to the data refresh DAG. Initially, this should be a single Python task which exactly mirrors the behavior of the `clean_image_data` function of `cleanup.py` (only applying the tag-specific steps) on the ingestion server[3]. The easiest way to do this would be to directly map the functionality of the ingestion server onto this step within a single Airflow task. The steps for this task are as follows:
1. Get a batch of records from the database using `CLEANUP_BUFFER_SIZE`
2. Divide the batch into `multiprocessing.cpu_count()` subbatches
3. Split the filtering up into separate workers using multiprocessing
4. On each process:
   - Create a new DB connection & cursor per worker
   - Iterate through each record:
     - Remove tags below the confidence level
     - Remove tags that need to be filtered (denylisted, machine-generated filter list, provider, etc.)
     - Only surface the record if it needs to be changed
   - Update each record one by one with a single `UPDATE`
   - Commit the cursor and close the connection
5. Repeat steps 1-4 until all batches are consumed
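The per-batch portion of the steps above could be sketched roughly as follows. This is a minimal illustration, not the actual implementation: the constant values, the record shape, and the denylist contents are placeholders, and the real task would read records from the catalog database and issue the `UPDATE` statements described above instead of returning the changes.

```python
import multiprocessing

# Illustrative values -- the real CLEANUP_BUFFER_SIZE, confidence
# threshold, and denylist come from the catalog's configuration.
CLEANUP_BUFFER_SIZE = 10_000
TAG_MIN_CONFIDENCE = 0.9
TAG_DENYLIST = {"watermark"}


def filter_tags(tags):
    """Apply the tag-specific cleanup steps to one record's tag list.

    Returns the filtered list if any tag was dropped, or None when the
    record is unchanged and therefore needs no UPDATE.
    """
    filtered = [
        t
        for t in tags
        # Remove tags below the confidence level
        if t.get("accuracy") is None or t["accuracy"] >= TAG_MIN_CONFIDENCE
        # Remove denylisted tags
        if t.get("name") not in TAG_DENYLIST
    ]
    return filtered if len(filtered) != len(tags) else None


def filter_subbatch(records):
    """Worker body. In the real task, each worker would also open its own
    DB connection/cursor here and run one UPDATE per changed record."""
    updates = []
    for identifier, tags in records:
        new_tags = filter_tags(tags)
        if new_tags is not None:
            # Only surface records that actually changed.
            updates.append((identifier, new_tags))
    return updates


def filter_batch(batch):
    """Split one CLEANUP_BUFFER_SIZE batch across cpu_count() workers."""
    n = multiprocessing.cpu_count()
    subbatches = [batch[i::n] for i in range(n)]
    with multiprocessing.Pool(n) as pool:
        results = pool.map(filter_subbatch, subbatches)
    return [update for sub in results for update in sub]
```

The outer loop of the task would then repeatedly fetch the next `CLEANUP_BUFFER_SIZE` rows and pass them to `filter_batch` until the cursor is exhausted.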
Alternatives
See Alternatives in the IP for possible future directions.
There are a number of ways to accomplish the data filtering, including several ways to improve the approach mentioned.
The Airflow scheduler container has access to 4 cores, the same as the ingestion server where this step was originally run. At present, it takes about 8 hours to run all cleanup steps, but that includes the URL cleaning, which is certainly more time-intensive than the tag filtering since it makes outbound requests. Running the tag filtering on Airflow should not impact any of the other running tasks or saturate the instance.
There are a few ways this process could be improved, but none of them are required at this moment. We can follow up after this project is complete to assess what further optimizations might be necessary at this step. Some potential suggestions for that time:
- Instead of single `UPDATE` queries for each affected record, we could insert the records from each subbatch into a temporary table, then update the base table in bulk with an `UPDATE ... FROM`. Since the indices haven't been applied to the base table yet, this should be fairly performant.
- Instead of using multiprocessing, we could pre-define the batches and run the filtering chunks on a set of mapped tasks. Multiprocessing has the benefit of iterating over a cursor on the database rather than having to manage the record ranges explicitly, but mapped tasks would allow further parallelization and task management.
- The indexer workers themselves could be expanded to run on larger chunks of the database for this filtering. This would likely require the most work, as it would involve expanding the indexer workers' API to handle this additional task.
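For the temporary-table suggestion above, the bulk update could look roughly like this. The demonstration uses an in-memory SQLite database purely so it is self-contained; the real step would target PostgreSQL (ideally with a `TEMPORARY` table created on the worker's own connection), and all table and column names here are illustrative.

```python
import sqlite3

# Stand-in for the catalog table; "image" and its columns are placeholders.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE image (identifier TEXT PRIMARY KEY, tags TEXT)")
conn.executemany(
    "INSERT INTO image VALUES (?, ?)",
    [("a", '[{"name": "cat"}]'), ("b", '[{"name": "watermark"}]')],
)

# 1. Stage the filtered rows from one subbatch in a temp table; only
#    records that actually changed are staged.
conn.execute("CREATE TEMP TABLE tag_updates (identifier TEXT, tags TEXT)")
conn.executemany("INSERT INTO tag_updates VALUES (?, ?)", [("b", "[]")])

# 2. Apply all staged changes with one UPDATE ... FROM statement instead
#    of one UPDATE per record.
conn.execute(
    """
    UPDATE image
    SET tags = tag_updates.tags
    FROM tag_updates
    WHERE image.identifier = tag_updates.identifier
    """
)
conn.commit()
rows = dict(conn.execute("SELECT identifier, tags FROM image"))
```

Because the staged rows are written with a bulk insert and the base table is touched by a single statement per subbatch, this trades many round trips for one larger write, which tends to matter more at the batch sizes involved here.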
Additional context
See this section of the IP.