generating schemas from arbitrary map[string]interface{} (parquet, avro)

Open loicalleyne opened this issue 2 years ago • 8 comments

When reading from a source (e.g. avro-ocf, parquet) where the reader outputs a map[string]interface{}, it would be great if it weren't necessary to redefine the output schema. Instead, the input schema could be converted to the equivalent schema for the output writer.

Could this be done by iterating over the map and using type assertions to assemble the schema for different writers? And then perhaps configurable filters/regex for mapping combinations of field name and primitive type to logical types (e.g. field-name: (event[[:graph:]]*) type:INT64 logical-type:TIMESTAMP unit:MILLIS).
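
The idea above could be sketched roughly as follows: a type switch over the map values picks a Parquet primitive name, and an optional list of regex rules attaches a logical type when both the field name and the primitive match. The `Field`, `LogicalRule`, `NewRule` and `InferFields` names are hypothetical and not part of Benthos or any Parquet library, and mapping Go `int` to INT64 is a conservative assumption rather than a settled choice.

```go
package main

import (
	"fmt"
	"regexp"
	"sort"
)

// Field is a hypothetical, simplified schema node (not a Benthos or Parquet type).
type Field struct {
	Name        string
	Type        string // Parquet primitive name; empty for nested groups
	LogicalType string // set only when a rule matches
	Unit        string // set only when a rule matches
	Children    []Field
}

// LogicalRule is a hypothetical filter: a field whose name matches Pattern and
// whose inferred primitive equals Primitive receives Logical and Unit.
type LogicalRule struct {
	Pattern   *regexp.Regexp
	Primitive string
	Logical   string
	Unit      string
}

// NewRule compiles the pattern and panics if it is invalid.
func NewRule(pattern, primitive, logical, unit string) LogicalRule {
	return LogicalRule{regexp.MustCompile(pattern), primitive, logical, unit}
}

// InferFields walks a decoded map and derives one Field per key.
func InferFields(m map[string]interface{}, rules []LogicalRule) []Field {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	sort.Strings(keys) // map iteration order is random; sort for stable output

	fields := make([]Field, 0, len(keys))
	for _, k := range keys {
		f := Field{Name: k}
		switch v := m[k].(type) {
		case map[string]interface{}:
			f.Children = InferFields(v, rules)
		case bool:
			f.Type = "BOOLEAN"
		case int32:
			f.Type = "INT32"
		case int, int64: // assumption: treat platform int as 64-bit
			f.Type = "INT64"
		case float32:
			f.Type = "FLOAT"
		case float64:
			f.Type = "DOUBLE"
		default:
			// strings, nil and anything unrecognised fall back to bytes
			f.Type = "BYTE_ARRAY"
		}
		for _, r := range rules {
			if f.Type == r.Primitive && r.Pattern.MatchString(k) {
				f.LogicalType, f.Unit = r.Logical, r.Unit
				break
			}
		}
		fields = append(fields, f)
	}
	return fields
}

func main() {
	rules := []LogicalRule{NewRule(`^event[[:graph:]]*$`, "INT64", "TIMESTAMP", "MILLIS")}
	msg := map[string]interface{}{
		"event_time": int64(1659150000000),
		"user":       map[string]interface{}{"id": 39, "active": true},
	}
	for _, f := range InferFields(msg, rules) {
		fmt.Printf("%+v\n", f)
	}
}
```

Each writer would still need its own step to turn `Field` values into its native schema format; this sketch only covers the inference pass.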

Use case brainstorm: data stream sinks to object storage; transferring OLTP DB data to an OLAP DB using federated tables (e.g. BigQuery external tables); converting from row-based to column-based format.

loicalleyne avatar Jul 30 '22 02:07 loicalleyne

Hey @loicalleyne, there's potential here but we'd need to be very specific about how we handle a range of edge cases.

Jeffail avatar Jul 30 '22 15:07 Jeffail

On a high level I'm thinking the first step is that output writer would need a new configuration option for schema where the writer will accept a schema passed by the input or processor preceding it. For the schema itself, are the Go native types conserved in the message?

loicalleyne avatar Jul 30 '22 22:07 loicalleyne

On a high level I'm thinking the first step is that the output writer would need a new configuration option for schema, where the writer will accept a schema passed by the input or processor preceding it. Are the Go native types conserved in the message? In other words, do we need to parse the schema for each type of input, or can we rely on the output of the reader libraries to do most of the work?

loicalleyne avatar Jul 31 '22 14:07 loicalleyne

I wrote a proof of concept that takes a map and outputs what I think is a compliant YAML schema for the parquet encoder, plus the beginning of an Avro schema to Parquet schema converter. It looks OK for primitive types. Logical type support would need some more work, but as far as I can make out logical types aren't supported in the encoder yet. I would appreciate your feedback if you have time to look at it: https://github.com/loicalleyne/map-to-parquet-schema
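
For the Avro-to-Parquet direction, the primitive-type part could look something like the sketch below. It is not the code from the linked repository. It parses an Avro record schema with the standard library, maps Avro primitive names to Parquet physical types, and treats a two-member union containing "null" as an optional field. `ParquetField` and `ConvertAvroRecord` are hypothetical names, and nested records, arrays, maps and logical types are deliberately rejected.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// avroToParquet maps Avro primitive type names to Parquet physical types.
var avroToParquet = map[string]string{
	"boolean": "BOOLEAN",
	"int":     "INT32",
	"long":    "INT64",
	"float":   "FLOAT",
	"double":  "DOUBLE",
	"bytes":   "BYTE_ARRAY",
	"string":  "BYTE_ARRAY",
}

// ParquetField is a hypothetical output shape for this sketch.
type ParquetField struct {
	Name     string
	Type     string
	Optional bool
}

// avroRecord mirrors only the parts of an Avro record schema used here.
type avroRecord struct {
	Fields []struct {
		Name string          `json:"name"`
		Type json.RawMessage `json:"type"`
	} `json:"fields"`
}

// ConvertAvroRecord converts the primitive fields of an Avro record schema.
func ConvertAvroRecord(schema string) ([]ParquetField, error) {
	var rec avroRecord
	if err := json.Unmarshal([]byte(schema), &rec); err != nil {
		return nil, err
	}
	out := make([]ParquetField, 0, len(rec.Fields))
	for _, f := range rec.Fields {
		pf := ParquetField{Name: f.Name}
		var name string
		if err := json.Unmarshal(f.Type, &name); err != nil {
			// Not a plain string: try a union such as ["null", "long"].
			var union []string
			if err := json.Unmarshal(f.Type, &union); err != nil {
				return nil, fmt.Errorf("field %q: unsupported type %s", f.Name, f.Type)
			}
			nonNull := 0
			for _, t := range union {
				if t == "null" {
					pf.Optional = true
				} else {
					name = t
					nonNull++
				}
			}
			if nonNull != 1 {
				return nil, fmt.Errorf("field %q: only nullable unions of one type are handled", f.Name)
			}
		}
		pt, ok := avroToParquet[name]
		if !ok {
			return nil, fmt.Errorf("field %q: no Parquet mapping for %q", f.Name, name)
		}
		pf.Type = pt
		out = append(out, pf)
	}
	return out, nil
}

func main() {
	schema := `{"type":"record","name":"r","fields":[
		{"name":"id","type":"long"},
		{"name":"note","type":["null","string"]}]}`
	fields, err := ConvertAvroRecord(schema)
	if err != nil {
		panic(err)
	}
	for _, f := range fields {
		fmt.Printf("%+v\n", f)
	}
}
```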

loicalleyne avatar Aug 17 '22 02:08 loicalleyne

Without looking at the code, just curious, how are you planning to handle cases when you need to infer the schema for something like this:

{
  "foo": [
    1,
    {"bar": {"x": "y"}},
    2,
    "z"
  ]
}

"are the Go native types conserved in the message?"

Yeah, the supported types are listed here: https://www.benthos.dev/docs/guides/bloblang/methods#type. Message objects have the AsStructured() method which returns an object of type interface{} containing the structured data.
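
One thing worth checking when relying on dynamic types is how numbers arrive in the interface{} value. The sketch below uses only the standard library and does not call any Benthos API. It shows that plain `encoding/json` decoding yields float64 for every JSON number, while a decoder with `UseNumber` yields `json.Number`. Which of these a given input produces is an assumption to verify, not something established in this thread. `DecodedType` is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// DecodedType decodes a JSON document into interface{} and reports the
// Go dynamic type of the result.
func DecodedType(doc string, useNumber bool) (string, error) {
	dec := json.NewDecoder(strings.NewReader(doc))
	if useNumber {
		dec.UseNumber() // keep numbers as json.Number instead of float64
	}
	var v interface{}
	if err := dec.Decode(&v); err != nil {
		return "", err
	}
	return fmt.Sprintf("%T", v), nil
}

func main() {
	for _, useNumber := range []bool{false, true} {
		t, err := DecodedType("86233", useNumber)
		if err != nil {
			panic(err)
		}
		fmt.Printf("UseNumber=%v -> %s\n", useNumber, t)
	}
}
```

A schema inferrer that only switches on int and float64 would therefore see integers as DOUBLE, or miss them entirely, depending on how the message was decoded.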

mihaitodor avatar Aug 17 '22 11:08 mihaitodor

As far as I know, Parquet only supports a union of ["primitive_type", "null"] or ["null", "primitive_type"] (see https://github.com/apache/parquet-format/pull/44). In the scenario you provided, my code would infer { "name":"foo","type":"BYTE_ARRAY", repeated: true }, using BYTE_ARRAY as a catch-all. I'm not sure whether this works in practice; I would have to test it.
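
A minimal sketch of that catch-all rule, assuming the array has already been decoded into []interface{}: if every element is the same recognised scalar, use that primitive, otherwise fall back to BYTE_ARRAY. `primitiveOf` and `ElementType` are hypothetical names, and how mixed elements would then be serialised into bytes is left open.

```go
package main

import "fmt"

// primitiveOf returns a Parquet primitive name for a scalar value, or ""
// when the value is not a scalar this sketch recognises (maps, slices, nil).
func primitiveOf(v interface{}) string {
	switch v.(type) {
	case bool:
		return "BOOLEAN"
	case int, int32, int64:
		return "INT64"
	case float32, float64:
		return "DOUBLE"
	case string:
		return "BYTE_ARRAY"
	}
	return ""
}

// ElementType inspects every element of a decoded array. It returns the shared
// primitive and true when all elements agree, or "BYTE_ARRAY" and false when
// the array is empty, mixed, or contains non-scalar elements.
func ElementType(items []interface{}) (string, bool) {
	typ := ""
	for _, it := range items {
		p := primitiveOf(it)
		if p == "" || (typ != "" && p != typ) {
			return "BYTE_ARRAY", false
		}
		typ = p
	}
	if typ == "" {
		return "BYTE_ARRAY", false
	}
	return typ, true
}

func main() {
	mixed := []interface{}{
		1,
		map[string]interface{}{"bar": map[string]interface{}{"x": "y"}},
		2,
		"z",
	}
	typ, uniform := ElementType(mixed)
	fmt.Println(typ, uniform)

	typ, uniform = ElementType([]interface{}{132, 453535, 13412341})
	fmt.Println(typ, uniform)
}
```

Returning the `uniform` flag alongside the type lets a caller log or reject mixed arrays instead of silently widening them.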

loicalleyne avatar Aug 17 '22 13:08 loicalleyne

As an example of what gets inferred from an arbitrary map, here are the input and output examples:

Input:

var exampleMap = map[string]interface{}{
	"request": map[string]interface{}{
		"datetime":    "2021-07-27 02:59:59",
		"ip":          "172.222.233.111",
		"host":        "www.domain.com",
		"uri":         "/api/v1/",
		"request_uri": "/api/v1/",
		"referer":     "",
		"useragent":   "",
	},
	"metadata": map[string]interface{}{
		"compression_ratio": 1,
		"coroutine_uuid":    "46c45675-5c10-4094-a124-f8615f2b10db",
		"hostname":          "server.domain.com",
	},
	"incoming_request": map[string]interface{}{
		"id":                "1d25bb6a-3e09-4a8e-8b0f-93a0e9383904",
		"external_id":       "afwef",
		"direct_connection": true,
		"remote_ip":         "67.49.160.53",
	},
	"entity": map[string]interface{}{
		"id":                      39,
		"api_key":                 "1d6492bb-490e-4cb9-1d24-8wr4db8d07e7",
		"tracking_protocol":       "emulator",
		"event_proxy":             nil,
		"feature_support_enabled": true,
		"primary_currency":        "EUR",
		"rate_percent":            10,
	},
	"screen": map[string]interface{}{
		"id": 86233,
		"ids": []interface{}{
			132, 453535, 13412341,
		},
		"external_id": "com.domain:215426709",
		"width":       1080,
		"height":      1920,
	},
	"geo": map[string]interface{}{
		"ip":        "0.0.0.0",
		"country":   "NL",
		"region":    "06",
		"city":      "Noordhoek",
		"dma":       "1234",
		"zip":       "1345-A",
		"latitude":  51.958441162109375,
		"longitude": 5.8780440068664551,
	},
	"schedule": map[string]interface{}{
		"id": 18604307,
	},
	"audience": map[string]interface{}{
		"origin":    "file",
		"SOMEFLOAT": 5.255000114440918,
	},
	"provider": map[string]interface{}{
		"id":               42,
		"protocol":         "openrtb",
		"feature1_enabled": false,
		"feature2_enabled": true,
		"primary_currency": "USD",
	},
	"outgoing_request": map[string]interface{}{
		"string_array": []interface{}{
			"P0OW42XDHA",
			"KLTFJYG9FX",
		},
		"id": "9abbb401-d29a-4025-bd45-416b2ebf13e3",
	},
}

Output:

- name: screen
  optional: true
  fields:
  - { "name":"ids","type":"BYTE_ARRAY", repeated: true }
  - { "name":"external_id","type":"BYTE_ARRAY" }
  - { "name":"width","type":"INT32" }
  - { "name":"height","type":"INT32" }
  - { "name":"id","type":"INT32" }
- name: geo
  optional: true
  fields:
  - { "name":"zip","type":"BYTE_ARRAY" }
  - { "name":"latitude","type":"DOUBLE" }
  - { "name":"longitude","type":"DOUBLE" }
  - { "name":"ip","type":"BYTE_ARRAY" }
  - { "name":"country","type":"BYTE_ARRAY" }
  - { "name":"region","type":"BYTE_ARRAY" }
  - { "name":"city","type":"BYTE_ARRAY" }
  - { "name":"dma","type":"BYTE_ARRAY" }
- name: outgoing_request
  optional: true
  fields:
  - { "name":"id","type":"BYTE_ARRAY" }
  - { "name":"string_array","type":"BYTE_ARRAY", repeated: true }
- name: request
  optional: true
  fields:
  - { "name":"useragent","type":"BYTE_ARRAY" }
  - { "name":"datetime","type":"BYTE_ARRAY" }
  - { "name":"ip","type":"BYTE_ARRAY" }
  - { "name":"host","type":"BYTE_ARRAY" }
  - { "name":"uri","type":"BYTE_ARRAY" }
  - { "name":"request_uri","type":"BYTE_ARRAY" }
  - { "name":"referer","type":"BYTE_ARRAY" }
- name: metadata
  optional: true
  fields:
  - { "name":"compression_ratio","type":"INT32" }
  - { "name":"coroutine_uuid","type":"BYTE_ARRAY" }
  - { "name":"hostname","type":"BYTE_ARRAY" }
- name: incoming_request
  optional: true
  fields:
  - { "name":"direct_connection","type":"BOOLEAN" }
  - { "name":"remote_ip","type":"BYTE_ARRAY" }
  - { "name":"id","type":"BYTE_ARRAY" }
  - { "name":"external_id","type":"BYTE_ARRAY" }
- name: entity
  optional: true
  fields:
  - { "name":"event_proxy","type":"BYTE_ARRAY" }
  - { "name":"id","type":"INT32" }
  - { "name":"api_key","type":"BYTE_ARRAY" }
  - { "name":"tracking_protocol","type":"BYTE_ARRAY" }
  - { "name":"feature_support_enabled","type":"BOOLEAN" }
  - { "name":"primary_currency","type":"BYTE_ARRAY" }
  - { "name":"rate_percent","type":"INT32" }
- name: schedule
  optional: true
  fields:
  - { "name":"id","type":"INT32" }
- name: audience
  optional: true
  fields:
  - { "name":"origin","type":"BYTE_ARRAY" }
  - { "name":"SOMEFLOAT","type":"DOUBLE" }
- name: provider
  optional: true
  fields:
  - { "name":"feature1_enabled","type":"BOOLEAN" }
  - { "name":"feature2_enabled","type":"BOOLEAN" }
  - { "name":"primary_currency","type":"BYTE_ARRAY" }
  - { "name":"id","type":"INT32" }
  - { "name":"protocol","type":"BYTE_ARRAY" }
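
Note that the groups and fields above come out in a different order from the input, which is what Go's randomised map iteration produces. If stable output matters (for diffing or caching a schema), keys can be sorted before rendering. The sketch below is not the proof-of-concept code. It mirrors the type mapping visible in the output above (int to INT32, float64 to DOUBLE, arrays to repeated BYTE_ARRAY) and handles only one level of nesting. `RenderSchema` and its helpers are hypothetical names.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// sortedKeys returns the map keys in byte-wise sorted order.
func sortedKeys(m map[string]interface{}) []string {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

// leafLine renders one leaf field in the flow-style format shown above.
func leafLine(name string, v interface{}) string {
	typ, repeated := "BYTE_ARRAY", false
	switch v.(type) {
	case bool:
		typ = "BOOLEAN"
	case int:
		typ = "INT32"
	case float64:
		typ = "DOUBLE"
	case []interface{}:
		repeated = true // arrays are emitted as repeated BYTE_ARRAY
	}
	if repeated {
		return fmt.Sprintf(`  - { "name":%q,"type":%q, repeated: true }`, name, typ)
	}
	return fmt.Sprintf(`  - { "name":%q,"type":%q }`, name, typ)
}

// RenderSchema renders each top-level group with its fields in sorted order.
// Top-level values that are not maps are skipped in this sketch.
func RenderSchema(m map[string]interface{}) string {
	var b strings.Builder
	for _, group := range sortedKeys(m) {
		child, ok := m[group].(map[string]interface{})
		if !ok {
			continue
		}
		fmt.Fprintf(&b, "- name: %s\n  optional: true\n  fields:\n", group)
		for _, name := range sortedKeys(child) {
			b.WriteString(leafLine(name, child[name]) + "\n")
		}
	}
	return b.String()
}

func main() {
	fmt.Print(RenderSchema(map[string]interface{}{
		"schedule": map[string]interface{}{"id": 18604307},
		"audience": map[string]interface{}{"origin": "file", "SOMEFLOAT": 5.255000114440918},
	}))
}
```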

loicalleyne avatar Aug 17 '22 13:08 loicalleyne

I just saw https://github.com/benthosdev/benthos/commit/07ed81b150778a362e25e52428c59a05ca21369b, and I'd like to work on enabling parquet output without having to specify a schema in the config file. I was wondering:

If input messages are structured, is the structure guaranteed to stay the same across all messages from that input? Does it depend on the input source?

Is it preferable to get an Avro or other schema from the input and pass that as metadata to be dynamically converted to a Parquet schema? If so, in the case where there's a Bloblang processor, would the metadata of the input be mutated to add new mappings or to change field types?

My standalone parquet YAML schema generator proof of concept tries to cover both the arbitrary map and the defined schema scenarios. Which one dovetails better with the way Benthos works?
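
If the answer to the first question is that structure is not guaranteed, an inferred schema would have to be checked against each new message. One possible approach, sketched under the assumption that a schema can be represented as a flat map from dotted field path to Go type name (`Schema`, `Flatten` and `Merge` are hypothetical names): flatten each message, merge it into the accumulated schema, and report any path whose type changed.

```go
package main

import (
	"fmt"
	"sort"
)

// Schema maps a dotted field path to the Go type name seen at that path.
type Schema map[string]string

// Flatten records the dynamic type of every leaf in m under its dotted path.
func Flatten(prefix string, m map[string]interface{}, out Schema) {
	for k, v := range m {
		path := k
		if prefix != "" {
			path = prefix + "." + k
		}
		switch t := v.(type) {
		case map[string]interface{}:
			Flatten(path, t, out)
		case nil:
			// nil carries no type information; leave the path unset
		default:
			out[path] = fmt.Sprintf("%T", t)
		}
	}
}

// Merge folds next into acc. New paths are added; paths whose type differs
// from the one already recorded are returned, sorted, and left unchanged.
func Merge(acc, next Schema) []string {
	var conflicts []string
	for path, typ := range next {
		if prev, seen := acc[path]; seen && prev != typ {
			conflicts = append(conflicts, path)
			continue
		}
		acc[path] = typ
	}
	sort.Strings(conflicts)
	return conflicts
}

func main() {
	acc := Schema{}
	Flatten("", map[string]interface{}{
		"entity": map[string]interface{}{"id": 39, "event_proxy": nil},
	}, acc)

	next := Schema{}
	Flatten("", map[string]interface{}{
		"entity": map[string]interface{}{"id": "39", "event_proxy": "proxy-a"},
	}, next)

	fmt.Println("conflicts:", Merge(acc, next))
	fmt.Println("schema:", acc)
}
```

What to do on a conflict (fail, widen to BYTE_ARRAY, or start a new file) is a policy decision this sketch does not make.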

loicalleyne avatar Aug 31 '22 17:08 loicalleyne