Parse CSV or TSV content in Events
Some Events will have a single line from a comma-separated value (CSV) or tab-separated value (TSV) file. Data Prepper should be able to parse an individual CSV line and add fields to the Event for that CSV line.
Pipeline authors should have the option to have each CSV/TSV line produce either of the following outputs (sketched below):
- Specific key names from column indices (e.g. "column 2 maps to status code")
- The columns as an array based on their index
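For illustration only, a single line such as `200,GET,/index.html` could be emitted in either of these two shapes; the field names here (`status_code`, `column`, etc.) are hypothetical examples rather than a finalized schema.
Named keys per column:
{"status_code": "200", "method": "GET", "path": "/index.html"}
Columns as an array:
{"column": ["200", "GET", "/index.html"]}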
- [x] #1613
- [x] #1614
- [x] #1615
- [x] #1616
- [x] #1617
- [ ] #1618
- [x] #1619
Request: add support to automatically detect the format (fields) based on header columns for CSV or TSV.
For example, I have the following CSV content (delimited by a space) as the source:
key value
a b
c d
e f
The expected result is to output each line to the destination:
[
  {"key": "a", "value": "b"},
  {"key": "c", "value": "d"},
  {"key": "e", "value": "f"}
]
I should just need to specify 'header' (true or false) and 'delimiter'. If a header is not included in the data, I should be able to specify the header columns somewhere else.
An example of such a use case is processing VPC Flow Logs: customers can customize which columns are included, and the log file will include a header row.
@daixba, I do believe that could be a useful extension.
Regarding VPC Flow Logs, we expect that these will come from the S3 Source, which is discussed in #251. As currently proposed, the S3 Source would parse each line from the S3 object and create an individual record, which it then sends to downstream processors. Do you believe it would be better to have the whole object sent and then break apart the object downstream? (This might require a change to the current Event model.)
@dlvenable I think both ways work for me. It makes more sense to me to parse each line in the source as in #251. At least there is no need to deal with S3 in the processor, not to mention that the whole S3 object could be huge (on the order of GBs after decompression).
But I'm not sure if it's possible to have a special codec just for CSV or TSV with options such as 'header', 'delimiter', or 'heading columns', and then perhaps use something like parse_json to parse the event message (each line). For example:
source:
  s3:
    notification_type: sqs
    sqs:
      queue_url: "https://sqs.us-east-2.amazonaws.com/123456789012/MyS3EventQueue"
    codec:
      csv:
        header: true
        delimiter: ' '
processor:
  ...
If the S3 source only needs to pass the raw message (each line) to downstream processors, then the downstream processors (csv/tsv) should have similar options.
An extra question is whether there is a need to make the CSV/TSV support more generic so that it also works with other sources, such as the file source (a rough sketch follows).
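As a rough illustration of that idea only (the processor option names below simply mirror the codec options sketched above and are not an agreed-upon interface), a generic CSV processor with a file source could look something like:
source:
  file:
    path: "/path/to/flow-logs.csv"  # hypothetical local file path
processor:
  - csv:
      source: message   # field to parse on each Event
      header: true      # placeholder option mirroring the codec sketch above
      delimiter: ' '    # space-delimited, matching the earlier example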
@daixba, Regarding the S3 Events, it may be reasonable to include the CSV header in each Event created. This way, the header is considered context for future processing.
The CSV processor could then be configured to read the header from a given field in the Event.
Example config:
source:
  s3:
    notification_type: sqs
    sqs:
      queue_url: "https://sqs.us-east-2.amazonaws.com/123456789012/MyS3EventQueue"
    codec:
      newline:
        header_field: myHeader # Tells S3 Source to include the header in the myHeader field for all events
processor:
  - csv:
      source: message
      header_source: myHeader # Tells CSV processor to get the header from the field myHeader
Let's say we have a CSV file that looks like the following:
id name email
123 Bob [email protected]
456 Max [email protected]
The S3 Source would output two Events:
{
  "message": "123 Bob [email protected]",
  "myHeader": "id name email"
}
{
  "message": "456 Max [email protected]",
  "myHeader": "id name email"
}
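For illustration, with the configuration above (and assuming the CSV processor is also configured to split on the space delimiter used in these examples), the first Event could then end up looking roughly like:
{
  "message": "123 Bob [email protected]",
  "myHeader": "id name email",
  "id": "123",
  "name": "Bob",
  "email": "[email protected]"
}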
I'll work on this.
Hello, here are the proposed configuration options for the CSV processor:
- `source` — The field in the Event that will be parsed.
  - Default: `message`
- `quote_character` — The character used to escape quotes within a field.
  - Default: Double quote (`"`)
- `delimiter` — The character separating each column.
  - Default: `,`
- `delete_header` (Optional, boolean) — If specified, the header on the Event (`column_names_source_key`) will be deleted after the Event is parsed. If there is no header on the Event, then nothing will happen.
  - Default: `true`
- `column_names_source_key` — (Optional) The field in the Event that specifies the CSV column names, which will be autodetected. If more column names are needed than this field provides, they will be autogenerated according to their index.
  - There is no default.
  - Note: If `column_names` is also defined, the header in `column_names_source_key` will be used to generate the Event fields as long as the header source key is present in the Event.
  - Note: If too few columns are specified in this field, the remaining column names will be autogenerated (not taken from `column_names`). If too many column names are specified in this field, then the extra column names will be omitted.
- `column_names` — (Optional) User-specified names for the CSV columns.
  - Default: `[column1, column2, ..., column{N}]` if there are N columns of data in the CSV record and `column_names_source_key` is not defined.
  - Note: If `column_names_source_key` is defined and the header source key is present on the Event, the header in `column_names_source_key` will be used to generate the Event fields.
  - Note: If too few columns are specified in this field, the remaining column names will be autogenerated. If too many column names are specified in this field, then the extra column names will be omitted.
Here are two configurations that illustrate the behavior of the CSV processor:
Below: Example configuration reading CSV from S3 Source with modifications to newline config.
csv-pipeline:
  source:
    s3:
      ...
      codec:
        newline:
          header_destination: "column_names"
  processor:
    - csv:
        column_names_source_key: "column_names"
  ...
Pseudocode of an Event passing through a CSV Processor with this configuration.
{Event: {"message": "sample_name, sample_date,50",
"column_names": ["name", "date", "age"]}}
----(CSV-processor)----->
{Event: {"message": "sample_name, sample_date,50",
"column_names": ["name", "date", "age"],
"name": "sample_name",
"date": " sample_date",
"age": "50" }
}
Below: Config with a file source and manually specified columns (with column autogeneration for the last column).
csv-pipeline:
  source:
    file:
      ...
  processor:
    - csv:
        column_names: ["name","date"]
  ...
{Event: {"message": "sample_name, sample_date,50"}}
----(CSV-processor)----->
{Event: {"message": "sample_name, sample_date,50",
"name": "sample_name",
"date": " sample_date",
"column2": "50" }
}
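For completeness, here is a rough sketch of a configuration that exercises the remaining options (`delimiter`, `quote_character`, and `delete_header`); the tab delimiter and the pipeline name are illustrative assumptions, not part of the proposal above.
tsv-pipeline:
  source:
    s3:
      ...
      codec:
        newline:
          header_destination: "column_names"
  processor:
    - csv:
        source: "message"                        # field to parse (default: message)
        delimiter: "\t"                          # parse tab-separated values instead of commas
        quote_character: "\""                    # double quote is the default; shown for clarity
        column_names_source_key: "column_names"  # read the column names from the Event's header field
        delete_header: true                      # drop the column_names field after parsing (default)
  ...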
We also plan to add a CSV codec to the S3 Source down the line.
Anyone should feel free to comment on this issue with any input, questions, or concerns about the CSV processor.
> Note: If `column_names_source_key` is defined, the header in `column_names_source_key` will be used to generate the Event fields.

I think this is good, but I might expand upon it some: If the `column_names_source_key` is defined *and the source key is present in the Event*, the header in `column_names_source_key` will be used to generate the Event fields. The italics are the changes I recommend.
In this approach, `column_names` could act as a fallback for any given Event which lacks the source key.
Thoughts on this approach?
I agree with this approach and I'll add your expansion to the comment.
Another question about using `column_names` as a fallback is the case where a `column_names_source_key` is defined and the source key is present in the Event, but the header on the Event has too few columns. Right now, we autogenerate the extra columns, but this could be changed. We could get some extra columns from the end of `column_names` as long as `column_names` contains more columns than the source key (and then autogenerate any remaining columns), as sketched below.
Thoughts on adding this implementation detail?
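To make that concrete, here is a purely illustrative sketch of both behaviors in the same pseudocode style as above. Assume the processor is configured with `column_names_source_key: "column_names"` and `column_names: ["name", "date", "age"]`, the header on the Event has only two names, and the data row has four columns (the values are made up):
{Event: {"message": "sample_name,sample_date,50,US",
         "column_names": ["name", "date"]}}
----(CSV-processor)----->
# Current behavior: autogenerate every column name the header does not cover
{Event: {..., "name": "sample_name", "date": "sample_date",
         "column2": "50", "column3": "US"}}
# Proposed alternative: borrow "age" from the end of column_names, then autogenerate the rest
{Event: {..., "name": "sample_name", "date": "sample_date",
         "age": "50", "column3": "US"}}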
I split out #1618 from this task. We can complete the CSV processor without it for Data Prepper 2.0 and add conversion later.