Generating schemas from arbitrary map[string]interface{} (Parquet, Avro)
When reading from a source (e.g. avro-ocf, parquet) whose reader outputs a map[string]interface{}, it would be great if the output schema didn't have to be redefined by hand. Instead, the input schema could be converted to the equivalent schema for the output writer.
Could this be done by iterating over the map and using type assertions to assemble the schema for the different writers? Configurable filters/regexes could then map combinations of field name and primitive type to logical types (e.g. field-name: (event[[:graph:]]*) type:INT64 logical-type:TIMESTAMP unit:MILLIS).
Use case brainstorm: data stream sinks to object storage, transferring OLTP DB data to an OLAP DB via federated tables (e.g. BigQuery external tables), converting from row-based to column-based format.
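To make the filter idea concrete, here is a minimal sketch of how such a rule could be matched. The LogicalRule type, matchLogical, and defaultRules are hypothetical names invented for illustration, and the pattern is anchored with ^ and $, which the example above does not do.

```go
package main

import (
	"fmt"
	"regexp"
)

// LogicalRule is a hypothetical rule: a field whose name matches Name and
// whose inferred primitive type equals Primitive gets the logical type.
type LogicalRule struct {
	Name      *regexp.Regexp
	Primitive string
	Logical   string
	Unit      string
}

// defaultRules returns the single rule from the example in this thread.
func defaultRules() []LogicalRule {
	return []LogicalRule{{
		Name:      regexp.MustCompile(`^event[[:graph:]]*$`),
		Primitive: "INT64",
		Logical:   "TIMESTAMP",
		Unit:      "MILLIS",
	}}
}

// matchLogical returns the first rule that matches both the field name and
// the primitive type, or false when no rule applies.
func matchLogical(rules []LogicalRule, field, primitive string) (LogicalRule, bool) {
	for _, r := range rules {
		if r.Primitive == primitive && r.Name.MatchString(field) {
			return r, true
		}
	}
	return LogicalRule{}, false
}

func main() {
	fields := []struct{ name, typ string }{
		{"event_time", "INT64"},
		{"event_name", "BYTE_ARRAY"},
		{"id", "INT64"},
	}
	for _, f := range fields {
		if r, ok := matchLogical(defaultRules(), f.name, f.typ); ok {
			fmt.Printf("%s: %s -> %s(%s)\n", f.name, f.typ, r.Logical, r.Unit)
		} else {
			fmt.Printf("%s: %s (no logical type)\n", f.name, f.typ)
		}
	}
}
```

Only event_time picks up the TIMESTAMP logical type here: event_name matches the pattern but not the primitive type, and id matches the type but not the pattern.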
Hey @loicalleyne, there's potential here but we'd need to be very specific about how we handle a range of edge cases.
On a high level I'm thinking the first step is that the output writer would need a new configuration option for the schema, so that the writer accepts a schema passed by the input or processor preceding it. Are the Go native types conserved in the message? In other words, do we need to parse the schema for each type of input, or can we rely on the output of the reader libraries to do most of the work?
I wrote a proof of concept that takes a map and outputs what I think is a compliant YAML schema for the parquet encoder, plus the beginning of an Avro schema to Parquet schema converter. It looks OK for primitive types; logical type support would need some more work, but as far as I can make out logical types aren't supported in the encoder yet. I'd appreciate your feedback if you have time to look at it: https://github.com/loicalleyne/map-to-parquet-schema
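For the primitive types, the Avro side of such a converter can be little more than a lookup table. The sketch below is not taken from the linked repository; avroToParquet and convertPrimitive are hypothetical names, and complex Avro types (record, array, union and so on) are deliberately rejected rather than handled.

```go
package main

import "fmt"

// avroToParquet maps Avro primitive type names to Parquet physical types.
var avroToParquet = map[string]string{
	"boolean": "BOOLEAN",
	"int":     "INT32",
	"long":    "INT64",
	"float":   "FLOAT",
	"double":  "DOUBLE",
	"bytes":   "BYTE_ARRAY",
	"string":  "BYTE_ARRAY", // a full converter would also attach a string logical type
}

// convertPrimitive returns the Parquet physical type for an Avro primitive,
// or an error for anything this sketch does not cover.
func convertPrimitive(avroType string) (string, error) {
	if p, ok := avroToParquet[avroType]; ok {
		return p, nil
	}
	return "", fmt.Errorf("unsupported Avro type %q", avroType)
}

func main() {
	for _, t := range []string{"long", "string", "record"} {
		p, err := convertPrimitive(t)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("%s -> %s\n", t, p)
	}
}
```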
Without looking at the code, just curious, how are you planning to handle cases when you need to infer the schema for something like this:
{
  "foo": [
    1,
    {"bar": {"x": "y"}},
    2,
    "z"
  ]
}
are the Go native types conserved in the message?
Yeah, the supported types are listed here: https://www.benthos.dev/docs/guides/bloblang/methods#type. Message objects have the AsStructured() method, which returns an object of type interface{} containing the structured data.
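Which concrete types sit behind that interface{} matters for any inference based on type assertions. As a stdlib-only illustration (it says nothing about how Benthos itself decodes payloads), the sketch below shows that encoding/json turns every JSON number into float64 unless the decoder is told to keep json.Number. The decode and goTypeName helpers are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// goTypeName reports the dynamic Go type of a decoded value.
func goTypeName(v interface{}) string {
	return fmt.Sprintf("%T", v)
}

// decode parses a JSON document into interface{}. With useNumber set,
// numbers are kept as json.Number instead of being converted to float64.
func decode(doc string, useNumber bool) (interface{}, error) {
	dec := json.NewDecoder(strings.NewReader(doc))
	if useNumber {
		dec.UseNumber()
	}
	var v interface{}
	err := dec.Decode(&v)
	return v, err
}

func main() {
	const doc = `{"id": 39, "latitude": 51.958441162109375}`
	for _, useNumber := range []bool{false, true} {
		v, err := decode(doc, useNumber)
		if err != nil {
			panic(err)
		}
		m := v.(map[string]interface{})
		fmt.Println(useNumber, goTypeName(m["id"]), goTypeName(m["latitude"]))
	}
}
```

In both modes an integer such as 39 is indistinguishable from a float by Go type alone, so a schema generator fed from JSON would need to inspect the value itself to choose between an integer and a floating-point column.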
As far as I know Parquet only supports a union of ["primitive_type", "null"] or ["null", "primitive_type"]
https://github.com/apache/parquet-format/pull/44
In the scenario you provided, my code would infer the following:
- { "name":"foo","type":"BYTE_ARRAY", repeated: true }
It uses BYTE_ARRAY as a catch-all. I'm not sure this works in practice; I would have to test it.
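One possible way to arrive at that catch-all is sketched below. It is a variant, not the proof of concept's logic: it keeps the element type when all elements agree and only falls back to BYTE_ARRAY for empty or mixed arrays. The function names are hypothetical, and mapping Go's int to INT32 assumes the values fit in 32 bits.

```go
package main

import "fmt"

// primitiveType maps a Go value to a Parquet physical type name.
// Assumption: int values fit in 32 bits; otherwise INT64 would be needed.
func primitiveType(v interface{}) string {
	switch v.(type) {
	case bool:
		return "BOOLEAN"
	case int, int32:
		return "INT32"
	case int64:
		return "INT64"
	case float32:
		return "FLOAT"
	case float64:
		return "DOUBLE"
	default: // strings, nil, nested maps, anything unrecognised
		return "BYTE_ARRAY"
	}
}

// elementType returns the type shared by all elements, or BYTE_ARRAY when
// the slice is empty or its elements disagree.
func elementType(items []interface{}) string {
	if len(items) == 0 {
		return "BYTE_ARRAY"
	}
	first := primitiveType(items[0])
	for _, it := range items[1:] {
		if primitiveType(it) != first {
			return "BYTE_ARRAY"
		}
	}
	return first
}

func main() {
	// The "foo" array from the question above.
	foo := []interface{}{
		1,
		map[string]interface{}{"bar": map[string]interface{}{"x": "y"}},
		2,
		"z",
	}
	fmt.Println(elementType(foo))                     // BYTE_ARRAY
	fmt.Println(elementType([]interface{}{1.5, 2.5})) // DOUBLE
}
```

Falling back to BYTE_ARRAY only fixes the schema. The writer would still need to serialise each mixed element into bytes somehow, which is the part that remains untested.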
As an example of what gets inferred from an arbitrary map, here are the input and output examples: Input:
var exampleMap = map[string]interface{}{
    "request": map[string]interface{}{
        "datetime": "2021-07-27 02:59:59",
        "ip": "172.222.233.111",
        "host": "www.domain.com",
        "uri": "/api/v1/",
        "request_uri": "/api/v1/",
        "referer": "",
        "useragent": "",
    },
    "metadata": map[string]interface{}{
        "compression_ratio": 1,
        "coroutine_uuid": "46c45675-5c10-4094-a124-f8615f2b10db",
        "hostname": "server.domain.com",
    },
    "incoming_request": map[string]interface{}{
        "id": "1d25bb6a-3e09-4a8e-8b0f-93a0e9383904",
        "external_id": "afwef",
        "direct_connection": true,
        "remote_ip": "67.49.160.53",
    },
    "entity": map[string]interface{}{
        "id": 39,
        "api_key": "1d6492bb-490e-4cb9-1d24-8wr4db8d07e7",
        "tracking_protocol": "emulator",
        "event_proxy": nil,
        "feature_support_enabled": true,
        "primary_currency": "EUR",
        "rate_percent": 10,
    },
    "screen": map[string]interface{}{
        "id": 86233,
        "ids": []interface{}{
            132, 453535, 13412341,
        },
        "external_id": "com.domain:215426709",
        "width": 1080,
        "height": 1920,
    },
    "geo": map[string]interface{}{
        "ip": "0.0.0.0",
        "country": "NL",
        "region": "06",
        "city": "Noordhoek",
        "dma": "1234",
        "zip": "1345-A",
        "latitude": 51.958441162109375,
        "longitude": 5.8780440068664551,
    },
    "schedule": map[string]interface{}{
        "id": 18604307,
    },
    "audience": map[string]interface{}{
        "origin": "file",
        "SOMEFLOAT": 5.255000114440918,
    },
    "provider": map[string]interface{}{
        "id": 42,
        "protocol": "openrtb",
        "feature1_enabled": false,
        "feature2_enabled": true,
        "primary_currency": "USD",
    },
    "outgoing_request": map[string]interface{}{
        "string_array": []interface{}{
            "P0OW42XDHA",
            "KLTFJYG9FX",
        },
        "id": "9abbb401-d29a-4025-bd45-416b2ebf13e3",
    },
}
Output:
- name: screen
  optional: true
  fields:
    - { "name":"ids","type":"BYTE_ARRAY", repeated: true }
    - { "name":"external_id","type":"BYTE_ARRAY" }
    - { "name":"width","type":"INT32" }
    - { "name":"height","type":"INT32" }
    - { "name":"id","type":"INT32" }
- name: geo
  optional: true
  fields:
    - { "name":"zip","type":"BYTE_ARRAY" }
    - { "name":"latitude","type":"DOUBLE" }
    - { "name":"longitude","type":"DOUBLE" }
    - { "name":"ip","type":"BYTE_ARRAY" }
    - { "name":"country","type":"BYTE_ARRAY" }
    - { "name":"region","type":"BYTE_ARRAY" }
    - { "name":"city","type":"BYTE_ARRAY" }
    - { "name":"dma","type":"BYTE_ARRAY" }
- name: outgoing_request
  optional: true
  fields:
    - { "name":"id","type":"BYTE_ARRAY" }
    - { "name":"string_array","type":"BYTE_ARRAY", repeated: true }
- name: request
  optional: true
  fields:
    - { "name":"useragent","type":"BYTE_ARRAY" }
    - { "name":"datetime","type":"BYTE_ARRAY" }
    - { "name":"ip","type":"BYTE_ARRAY" }
    - { "name":"host","type":"BYTE_ARRAY" }
    - { "name":"uri","type":"BYTE_ARRAY" }
    - { "name":"request_uri","type":"BYTE_ARRAY" }
    - { "name":"referer","type":"BYTE_ARRAY" }
- name: metadata
  optional: true
  fields:
    - { "name":"compression_ratio","type":"INT32" }
    - { "name":"coroutine_uuid","type":"BYTE_ARRAY" }
    - { "name":"hostname","type":"BYTE_ARRAY" }
- name: incoming_request
  optional: true
  fields:
    - { "name":"direct_connection","type":"BOOLEAN" }
    - { "name":"remote_ip","type":"BYTE_ARRAY" }
    - { "name":"id","type":"BYTE_ARRAY" }
    - { "name":"external_id","type":"BYTE_ARRAY" }
- name: entity
  optional: true
  fields:
    - { "name":"event_proxy","type":"BYTE_ARRAY" }
    - { "name":"id","type":"INT32" }
    - { "name":"api_key","type":"BYTE_ARRAY" }
    - { "name":"tracking_protocol","type":"BYTE_ARRAY" }
    - { "name":"feature_support_enabled","type":"BOOLEAN" }
    - { "name":"primary_currency","type":"BYTE_ARRAY" }
    - { "name":"rate_percent","type":"INT32" }
- name: schedule
  optional: true
  fields:
    - { "name":"id","type":"INT32" }
- name: audience
  optional: true
  fields:
    - { "name":"origin","type":"BYTE_ARRAY" }
    - { "name":"SOMEFLOAT","type":"DOUBLE" }
- name: provider
  optional: true
  fields:
    - { "name":"feature1_enabled","type":"BOOLEAN" }
    - { "name":"feature2_enabled","type":"BOOLEAN" }
    - { "name":"primary_currency","type":"BYTE_ARRAY" }
    - { "name":"id","type":"INT32" }
    - { "name":"protocol","type":"BYTE_ARRAY" }
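The field order in the output above differs from the input, which is what Go's randomised map iteration produces. A rough sketch of a walker that sorts keys first, so that repeated runs yield the same schema, could look like the following. The Field type, infer, and render are hypothetical and are not the proof of concept's code; the type choices simply mirror the output above, and marking every group as optional is an assumption.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// Field is a hypothetical schema node: a leaf with Type, or a group with Fields.
type Field struct {
	Name     string
	Type     string
	Repeated bool
	Fields   []Field
}

// infer walks the map in sorted key order so the result is deterministic.
func infer(m map[string]interface{}) []Field {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	fields := make([]Field, 0, len(keys))
	for _, k := range keys {
		f := Field{Name: k}
		switch v := m[k].(type) {
		case map[string]interface{}:
			f.Fields = infer(v) // nested group
		case []interface{}:
			f.Type, f.Repeated = "BYTE_ARRAY", true // catch-all for arrays
		case bool:
			f.Type = "BOOLEAN"
		case int:
			f.Type = "INT32" // assumes the value fits in 32 bits
		case float64:
			f.Type = "DOUBLE"
		default:
			f.Type = "BYTE_ARRAY" // strings, nil, and anything unrecognised
		}
		fields = append(fields, f)
	}
	return fields
}

// render prints the fields in the YAML-like layout used in this thread.
func render(fields []Field, indent string) string {
	var b strings.Builder
	for _, f := range fields {
		switch {
		case f.Fields != nil:
			fmt.Fprintf(&b, "%s- name: %s\n%s  optional: true\n%s  fields:\n",
				indent, f.Name, indent, indent)
			b.WriteString(render(f.Fields, indent+"    "))
		case f.Repeated:
			fmt.Fprintf(&b, "%s- { \"name\":%q,\"type\":%q, repeated: true }\n",
				indent, f.Name, f.Type)
		default:
			fmt.Fprintf(&b, "%s- { \"name\":%q,\"type\":%q }\n", indent, f.Name, f.Type)
		}
	}
	return b.String()
}

func main() {
	example := map[string]interface{}{
		"schedule": map[string]interface{}{"id": 18604307},
		"audience": map[string]interface{}{"origin": "file", "SOMEFLOAT": 5.255000114440918},
	}
	fmt.Print(render(infer(example), ""))
}
```

Sorting is only one option. If the column order of the source matters, the schema would have to come from the reader's own schema rather than from a map, since a Go map does not preserve insertion order.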
Just saw https://github.com/benthosdev/benthos/commit/07ed81b150778a362e25e52428c59a05ca21369b. I'd like to work on enabling Parquet output without having to specify a schema in the config file. I was wondering:
If input messages are structured, is the structure guaranteed to stay the same across all messages from that input? Does it depend on the input source?
Is it preferable to get an Avro or other schema from the input and pass that as metadata to be dynamically converted to a Parquet schema? If so, in the case where there's a Bloblang processor, would the metadata of the input be mutated to add new mappings or to change a field type?
My standalone Parquet YAML schema generator proof of concept tries to cover both the arbitrary map and the defined schema scenarios. Which one dovetails better with the way Benthos works?