[REQUEST] Record Schema Inference and Enforcement

Open · zprobst opened this issue 1 year ago · 1 comment

Is your feature request related to a problem? Please describe.

When you aggregate data from many data sources contributed by many teams, it's possible for the schema to change underneath you. When this happens and you run with a TTL system, you may only notice the change once data starts to expire.

Describe the solution you'd like

Basic API

Introduce a Filter type that is capable of inferring and enforcing the record schema.

filters:
  - implementation: nodestream.filters:SchemaEnforcement
    arguments:
      mode: "ENFORCE" # One of ENFORCE, WARN, INFER
      storage:
        location: s3
        bucket: my-awesome-s3-schema-bucket
        key: schemas/pipelines/my-schema-for-this-cool-pipeline.json
      inference: # only used when mode is INFER
        sample_size: 10000
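To make the shape of the `arguments` block concrete, here is a minimal sketch of how it could be parsed into a typed config. The class names (`Mode`, `StorageConfig`, `SchemaEnforcementConfig`) and the `from_arguments` helper are hypothetical, not nodestream's API; storage keys other than `location` are kept as free-form options so that non-S3 backends can carry their own settings.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, Optional


class Mode(Enum):
    ENFORCE = "ENFORCE"
    WARN = "WARN"
    INFER = "INFER"


@dataclass
class StorageConfig:
    location: str  # "s3" per the example; other location names are assumptions
    options: Dict[str, Any] = field(default_factory=dict)  # e.g. bucket and key for S3


@dataclass
class SchemaEnforcementConfig:
    mode: Mode
    storage: StorageConfig
    sample_size: Optional[int] = None  # only set in INFER mode

    @classmethod
    def from_arguments(cls, arguments: Dict[str, Any]) -> "SchemaEnforcementConfig":
        # Mode(...) raises ValueError for anything other than ENFORCE, WARN, INFER.
        mode = Mode(arguments["mode"])
        storage_args = dict(arguments["storage"])
        storage = StorageConfig(location=storage_args.pop("location"), options=storage_args)
        sample_size = None
        if mode is Mode.INFER:
            # The inference block is only consulted in INFER mode.
            sample_size = int(arguments["inference"]["sample_size"])
            if sample_size <= 0:
                raise ValueError("inference.sample_size must be positive")
        return cls(mode=mode, storage=storage, sample_size=sample_size)
```

Failing fast on an unknown mode or a missing `sample_size` at pipeline load time seems preferable to discovering the misconfiguration mid-run.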
Implementation Details
  • We can validate schemas with the jsonschema library.
  • ENFORCE mode will filter out the offending record and produce two log messages: one at the ERROR level stating that a record was filtered, and one at the DEBUG level containing the filtered record.
  • WARN mode will not filter anything, but will produce two log messages when a record does not match the schema: one at the WARN level stating that there was a violation, and one at the DEBUG level containing the record that would have been filtered.
  • INFER mode will accumulate sample_size records and then generate a schema from them using the genson library.
  • Storage backends can be decoupled from the core filter, with a file system variant and an S3 variant provided.
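One way to decouple storage from the filter is a small load/save interface with interchangeable backends, sketched below. `SchemaStore`, `store_from_config`, and the `"file"` location with its `path` key are hypothetical; only the `s3` location with `bucket` and `key` comes from the example config. The S3 variant uses boto3's `get_object` and `put_object` and imports boto3 lazily, so the file system variant runs without it installed.

```python
import json
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any, Dict, Optional


class SchemaStore(ABC):
    """Where a pipeline's schema lives; the filter only depends on this interface."""

    @abstractmethod
    def load(self) -> Optional[Dict[str, Any]]:
        """Return the stored schema, or None if none has been written yet."""

    @abstractmethod
    def save(self, schema: Dict[str, Any]) -> None:
        """Persist a schema, e.g. one produced by INFER mode."""


class FileSystemSchemaStore(SchemaStore):
    def __init__(self, path: str):
        self.path = Path(path)

    def load(self) -> Optional[Dict[str, Any]]:
        if not self.path.exists():
            return None
        return json.loads(self.path.read_text())

    def save(self, schema: Dict[str, Any]) -> None:
        self.path.parent.mkdir(parents=True, exist_ok=True)
        self.path.write_text(json.dumps(schema, indent=2))


class S3SchemaStore(SchemaStore):
    def __init__(self, bucket: str, key: str):
        self.bucket = bucket
        self.key = key

    def _client(self):
        import boto3  # imported lazily so other backends work without boto3
        return boto3.client("s3")

    def load(self) -> Optional[Dict[str, Any]]:
        client = self._client()
        try:
            response = client.get_object(Bucket=self.bucket, Key=self.key)
        except client.exceptions.NoSuchKey:
            return None
        return json.loads(response["Body"].read())

    def save(self, schema: Dict[str, Any]) -> None:
        body = json.dumps(schema).encode("utf-8")
        self._client().put_object(Bucket=self.bucket, Key=self.key, Body=body)


def store_from_config(storage: Dict[str, Any]) -> SchemaStore:
    """Pick a backend from the `storage` block of the filter arguments."""
    location = storage["location"]
    if location == "s3":
        return S3SchemaStore(storage["bucket"], storage["key"])
    if location == "file":  # hypothetical location name for the file system variant
        return FileSystemSchemaStore(storage["path"])
    raise ValueError(f"unknown storage location {location!r}")
```

Adding another backend then means adding one class and one branch in `store_from_config`, without touching the enforcement logic.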

Describe alternatives you've considered

N/A

Additional context

N/A

zprobst · Jun 23 '23 16:06