nodestream
[REQUEST] Record Schema Inference and Enforcement
Is your feature request related to a problem? Please describe. When you aggregate data from many data sources contributed by many teams, it's possible for a schema to change underneath you. When this happens and you run with a TTL system, you may only notice the change when data starts to expire.
Describe the solution you'd like
Basic API
Introduce a Filter type that is capable of inferring and enforcing the record schema.
filters:
  - implementation: nodestream.filters:SchemaEnforcement
    arguments:
      mode: "ENFORCE" # One of ENFORCE, WARN, INFER
      storage:
        location: s3
        bucket: my-awesome-s3-schema-bucket
        key: schemas/pipelines/my-schema-for-this-cool-pipeline.json
      inference: # only used when mode is INFER
        sample_size: 10000
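To make the shape of these arguments concrete, here is a minimal sketch of how they might be parsed into a typed object. The names `Mode`, `SchemaEnforcementArguments`, and `from_dict` are hypothetical and not part of nodestream, and the fallback `sample_size` of 10000 is only an assumption borrowed from the example above.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict


class Mode(Enum):
    """The three modes named in the proposed configuration."""

    ENFORCE = "ENFORCE"
    WARN = "WARN"
    INFER = "INFER"


@dataclass
class SchemaEnforcementArguments:
    """Hypothetical typed view of the filter's `arguments` mapping."""

    mode: Mode
    storage: Dict[str, Any]
    sample_size: int = 10000  # assumed default; only meaningful in INFER mode

    @classmethod
    def from_dict(cls, arguments: Dict[str, Any]) -> "SchemaEnforcementArguments":
        # Mode(...) raises ValueError for anything other than ENFORCE, WARN, INFER.
        mode = Mode(arguments["mode"])
        # The `inference` section is optional because it is only used when inferring.
        inference = arguments.get("inference") or {}
        return cls(
            mode=mode,
            storage=dict(arguments["storage"]),
            sample_size=int(inference.get("sample_size", 10000)),
        )
```

Parsing the mode into an enum up front means a typo such as "ENFORCED" fails when the pipeline is loaded rather than partway through a run.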
Implementation Details
- We can validate schemas with the jsonschema library.
- Enforcement mode will filter out the offending record and produce two log messages: one at the ERROR log level stating that a record was filtered, and one at the DEBUG level containing the filtered record.
- Warn mode will not filter anything, but will produce two log messages when something does not match the schema: one at the WARN log level stating that there was a violation, and one at the DEBUG level containing the would-be-filtered record.
- INFER mode will accumulate sample_size records and then generate a schema using the genson library.
- We can decouple storage modes from the core filter. Provide a File System and S3 variant.
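The ENFORCE and WARN behaviour and the storage split described above could be sketched roughly as follows. This is an illustration under stated assumptions, not nodestream's API: `apply_schema`, `Validator`, `SchemaStore`, and `FileSystemSchemaStore` are hypothetical names. The validator is passed in as a plain callable so the sketch runs with the standard library alone; in the proposal it would wrap the jsonschema library. INFER mode is left out because it depends on genson.

```python
import json
import logging
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Callable, Iterable, Iterator, Optional

logger = logging.getLogger("schema_enforcement_sketch")

# Hypothetical validator contract: return an error message, or None if the record is valid.
Validator = Callable[[dict], Optional[str]]


def apply_schema(records: Iterable[dict], validate: Validator, mode: str) -> Iterator[dict]:
    """Yield records following the ENFORCE or WARN semantics from the list above."""
    for record in records:
        error = validate(record)
        if error is None:
            yield record
        elif mode == "ENFORCE":
            # Drop the record: one ERROR message, plus the record itself at DEBUG.
            logger.error("Record filtered by schema enforcement: %s", error)
            logger.debug("Filtered record: %r", record)
        else:
            # WARN mode: keep the record, but log the violation and the record.
            logger.warning("Record violates schema: %s", error)
            logger.debug("Record that would have been filtered: %r", record)
            yield record


class SchemaStore(ABC):
    """Storage is kept separate from the filter so variants can be swapped."""

    @abstractmethod
    def load(self) -> dict: ...

    @abstractmethod
    def save(self, schema: dict) -> None: ...


class FileSystemSchemaStore(SchemaStore):
    """File system variant; an S3 variant would implement the same two methods."""

    def __init__(self, path: str) -> None:
        self.path = Path(path)

    def load(self) -> dict:
        with self.path.open() as handle:
            return json.load(handle)

    def save(self, schema: dict) -> None:
        self.path.parent.mkdir(parents=True, exist_ok=True)
        with self.path.open("w") as handle:
            json.dump(schema, handle, indent=2)
```

Keeping the record body at DEBUG means potentially sensitive payloads only reach the logs when someone opts in, while the ERROR or WARN line still makes the violation visible by default.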
Describe alternatives you've considered N/A
Additional context N/A