set_types memory usage with extremely wide data
Hey,
I have a strange dataset here that has 2500 columns and only 60 rows. The set_types processor slowly gobbles up all of the memory when called with all 2500 columns.
Here's the data: Coral%20ESVs_Galaxaura.xlsx
And the pipeline-spec.yaml: pipeline-spec.yaml.txt
Note the recursion limit parameter, which does not exist in the standard load processor; you will probably have to set your Python recursion limit somewhere:

```python
import sys
sys.setrecursionlimit(_recursion_limit)
```
I think the main problem is that it is creating ~2500 set_type flow steps, which for some reason use up a ton of memory. I'm guessing that if there were a single set_types flow in dataflows, calling it wouldn't run out of memory.
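For illustration, this is a sketch of what I assume the standard processor ends up building (my assumption, not the actual implementation; the file and column names are made up): one dataflows set_type step per declared type, all chained into a single Flow.

```python
# Sketch of my assumption about what the standard set_types processor builds:
# one dataflows set_type step per declared type, so 2500 type declarations
# become ~2500 chained flow steps. File and column names are placeholders.
from dataflows import Flow, load, set_type

types = {f"col_{i}": {"type": "number"} for i in range(2500)}

Flow(
    load("data.xlsx"),  # stand-in for the attached spreadsheet
    *[set_type(name, **options) for name, options in types.items()],
).process()
```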
@roll
My new running theory is that the problem is that for every field passed into the set_types processor, the entire resource (including every field) is validated by the set_type flow. So you end up with (w * h) * w operations, where w is the number of columns and h is the number of rows.
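If that were the case, then with w = 2500 and h = 60 this dataset would need (2500 * 60) * 2500 = 375 million cell validations.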
edit: Maybe that's wrong, as I see now that only the single field that is updated is being validated...
I created this custom processor that seems to fix the issue:
```python
from dataflows import Flow, PackageWrapper, schema_validator
from dataflows.helpers.resource_matcher import ResourceMatcher
from datapackage_pipelines.wrapper import ingest
from datapackage_pipelines.utilities.flow_utils import spew_flow
import re


def set_types(parameters, resources=None, regex=None, types={}):
    def func(package: PackageWrapper):
        matcher = ResourceMatcher(resources, package.pkg)
        # Update the field descriptors of every matching resource in one pass
        for resource in package.pkg.descriptor["resources"]:
            if matcher.match(resource["name"]):
                fields = resource["schema"]["fields"]
                for name, options in types.items():
                    if not regex:
                        name = re.escape(name)
                    name = re.compile(f"^{name}$")
                    for field in fields:
                        if name.match(field["name"]):
                            field.update(options)
        yield package.pkg
        # Validate each matching resource once, against the updated schema
        for rows in package:
            if matcher.match(rows.res.name):
                yield schema_validator(rows.res, rows)
            else:
                yield rows
        yield from package
    return func


def flow(parameters):
    resources = parameters.get("resources", None)
    regex = parameters.get("regex", True)
    types = parameters.get("types", {})
    return Flow(set_types(parameters, resources=resources, regex=regex, types=types))


if __name__ == "__main__":
    with ingest() as ctx:
        spew_flow(flow(ctx.parameters), ctx)
```
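For reference, here's a sketch of the inner set_types step used directly in a dataflows Flow, outside of datapackage-pipelines (the file and column names are placeholders):

```python
# Sketch: using the set_types step defined above directly in a dataflows Flow.
# File and column names are placeholders.
from dataflows import Flow, load, dump_to_path

Flow(
    load("data.xlsx"),
    set_types({}, resources=None, regex=False,
              types={"ESV_1": {"type": "number"}, "ESV_2": {"type": "number"}}),
    dump_to_path("out"),
).process()
```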
Hi @cschloer, would you like to PR?
Sure - though it would require a PR in both dataflows and datapackage_pipelines. @akariv is there any reason you only implemented the singular set_type in dataflows instead of a set_types?
I would add an 'update_field' method (to complement the existing 'update_package', 'update_resource' and 'update_schema'). Then your flow would contain multiple 'update_field' calls and a single 'validate' at the end. Nevertheless, I think that for 2500 consecutive columns (definitely an unusual case :) ) a processor like yours might be a better solution.
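Roughly, the resulting flow would look something like this (a sketch only: update_field is hypothetical and does not exist in dataflows yet, and the file and column names are placeholders):

```python
# Hypothetical sketch of the proposed pattern: patch each field descriptor with a
# small update_field-style step (no per-field row validation), then validate once.
# update_field below is a stand-in, not an existing dataflows helper.
from dataflows import Flow, PackageWrapper, load, validate, dump_to_path


def update_field(name, options):
    def func(package: PackageWrapper):
        # Only touch the schema descriptor; rows pass through untouched
        for resource in package.pkg.descriptor["resources"]:
            for field in resource["schema"]["fields"]:
                if field["name"] == name:
                    field.update(options)
        yield package.pkg
        yield from package
    return func


types = {"ESV_1": {"type": "number"}, "ESV_2": {"type": "number"}}  # placeholders

Flow(
    load("data.xlsx"),
    *[update_field(name, options) for name, options in types.items()],
    validate(),  # single validation pass at the end
    dump_to_path("out"),
).process()
```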
OK - should I hold off on creating a PR then, since this is specific to the unusual case of having 2500 columns?