
Parse CSV or TSV content in Events

Open dlvenable opened this issue 3 years ago • 8 comments

Some Events will have a single line from a comma-separated value (CSV) or tab-separated value (TSV) file. Data Prepper should be able to parse an individual CSV line and add fields to the Event for that CSV line.

Pipeline authors should have the option to have each CSV/TSV line output either of the following (a hypothetical sketch follows the task list below):

  • Specific key names mapped from column indices (e.g. "column 2 maps to status code")
  • An array of the column values, ordered by index
  • [x] #1613
  • [x] #1614
  • [x] #1615
  • [x] #1616
  • [x] #1617
  • [ ] #1618
  • [x] #1619
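
As a hypothetical sketch of the two output shapes (the field names and values below are made up for illustration only):

Input Event:
{"message": "200,GET,/index.html"}

Option 1, specific key names mapped from column indices:
{"message": "200,GET,/index.html", "status_code": "200", "method": "GET", "path": "/index.html"}

Option 2, columns output as an array indexed by position:
{"message": "200,GET,/index.html", "columns": ["200", "GET", "/index.html"]}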

dlvenable avatar Feb 22 '22 17:02 dlvenable

Request to add support for automatically detecting the format (fields) based on header columns for CSV or TSV.

For example, suppose I have the following CSV content (delimited by spaces) as the source:

key value
a b
c d
e f

The expected result is that each line is output to the destination as follows:

[
{"key": "a", "value": "b"}
{"key": "c", "value": "d"}
{"key": "e", "value": "f"}
]

I should just need to specify 'header' (true or false) and 'delimiter'. If the header is not included in the data, I should be able to specify the header columns somewhere else.

An example of such a use case is processing VPC flow logs: customers can customize which columns are included, and the log file will have a header row included.
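
For the VPC flow log case, the idea is roughly the following (the header and values here are illustrative, not a real log):

First line of the log object (header row):
version account-id interface-id srcaddr dstaddr action

One data line:
2 123456789012 eni-abc123 10.0.0.1 10.0.0.2 ACCEPT

Desired Event after parsing:
{"version": "2", "account-id": "123456789012", "interface-id": "eni-abc123", "srcaddr": "10.0.0.1", "dstaddr": "10.0.0.2", "action": "ACCEPT"}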

daixba avatar May 11 '22 09:05 daixba

@daixba, I do believe that could be a useful extension.

Regarding VPC Flow Logs, we expect that these will come from the S3 Source, which is discussed in #251. As currently proposed, the S3 Source would parse each line from the S3 object and create an individual record, which it then sends to downstream processors. Do you believe it would be better to have the whole object sent and then broken apart downstream? (This might require a change to the current Event model.)

dlvenable avatar May 11 '22 21:05 dlvenable

@dlvenable I think both ways work for me. It makes more sense to me to parse each line in the source, as in #251. At least there is no need to deal with S3 in the processor, not to mention that the whole S3 object could be huge (~GB after decompression).

But I'm not sure if it's possible to have a special codec just for CSV or TSV with options such as 'header', 'delimiter', or 'header columns', and then perhaps use a parse_json processor to parse the event message (each line).

source:
  s3:
    notification_type: sqs
    sqs:
      queue_url: "https://sqs.us-east-2.amazonaws.com/123456789012/MyS3EventQueue"
  codec:
    csv:
      header: true
      delimiter: ' '
processor:
...

If the S3 source only needs to pass the raw message (each line) to downstream processors, then the downstream processors (csv/tsv) should have similar options.

An extra question is whether there is a need to make the CSV/TSV support more generic so that it also works with other sources, such as file.

daixba avatar May 12 '22 01:05 daixba

@daixba, regarding the S3 Events, it may be reasonable to include the CSV header in each Event created. This way, the header is considered context for future processing.

The CSV processor could then be configured to read the headers from a given field in the Event.

Example config:

source:
  s3:
    notification_type: sqs
    sqs:
      queue_url: "https://sqs.us-east-2.amazonaws.com/123456789012/MyS3EventQueue"
  codec:
    newline:
      header_field: myHeader      # Tells S3 Source to include the header in the myHeader field for all events
processor:
  - csv:
       source: message
       header_source: myHeader       # Tells CSV processor to get the header from the field myHeader

Let's say we have a CSV file that looks like the following:

id name email
123 Bob [email protected]
456 Max [email protected]

The S3 Source would output two Events:

{
  "message": "123 Bob [email protected]",
  "myHeader": "id name email"
}
{
  "message": "456 Max [email protected]",
  "myHeader": "id name email"
}
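
If the csv processor were then run with header_source: myHeader (and, for this space-delimited sample, a delimiter of ' '), the parsed Events might look something like:

{
  "message": "123 Bob [email protected]",
  "myHeader": "id name email",
  "id": "123",
  "name": "Bob",
  "email": "[email protected]"
}
{
  "message": "456 Max [email protected]",
  "myHeader": "id name email",
  "id": "456",
  "name": "Max",
  "email": "[email protected]"
}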

dlvenable avatar Jun 29 '22 20:06 dlvenable

I'll work on this.

finnroblin avatar Jul 19 '22 15:07 finnroblin

Hello, here are the proposed configuration options for the CSV processor:

source — The field in the Event that will be parsed.

  • Default: message

quote_character — The character used to escape quotes within a field.

  • Default: Double quote (")

delimiter — The character separating each column.

  • Default: ,

delete_header (Optional, boolean) — If true, the header on the Event (the field named by column_names_source_key) will be deleted after the Event is parsed. If there's no header on the Event, nothing will happen.

  • Default: true

column_names_source_key — (Optional) The field in the Event that specifies the CSV column names, which will be autodetected. If extra column names are needed, they will be autogenerated according to their index.

  • There is no default.
  • Note: If column_names is also defined, the header in column_names_source_key will be used to generate the Event fields as long as the header source key is present in the Event.
  • Note: If too few columns are specified in this field, the remaining column names will be autogenerated (not taken from column_names). If too many column names are specified in this field, then the extra column names will be omitted.

column_names — (Optional) User-specified names for the CSV columns.

  • Default: [column1, column2, ..., column{N}] if there are N columns of data in the CSV record and column_names_source_key is not defined.
  • Note: If column_names_source_key is defined and the header source key is present on the Event, the header in column_names_source_key will be used to generate the Event fields.
  • Note: If too few columns are specified in this field, the remaining column names will be autogenerated. If too many column names are specified in this field, then the extra column names will be omitted.

Here are two configurations that illustrate the behavior of the CSV processor:

Below: Example configuration reading CSV from S3 Source with modifications to newline config.

csv-pipeline:
  source:
    s3:
      ...
      codec:
        newline:
          header_destination: "column_names"
  processor:
    - csv:
        column_names_source_key: "column_names"
  ...   

Pseudocode of an Event passing through the CSV processor with this configuration:

{Event: {"message": "sample_name, sample_date,50", 
        "column_names": ["name", "date", "age"]}}
        ----(CSV-processor)----->
{Event: {"message": "sample_name, sample_date,50", 
        "column_names": ["name", "date", "age"], 
        "name": "sample_name",
        "date": " sample_date",
        "age": "50" }
}

Below: Config with a file source and manually specified columns (with column autogeneration for the last column).

csv-pipeline:
  source:
    file:
      ...
  processor:
    - csv:
        column_names: ["name","date"]
  ...

Pseudocode of an Event passing through the CSV processor with this configuration:

{Event: {"message": "sample_name, sample_date,50"}}
        ----(CSV-processor)----->
{Event: {"message": "sample_name, sample_date,50",
        "name": "sample_name",
        "date": " sample_date",
        "column2": "50" }
}        
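
Not illustrated above is quote_character. As a rough sketch, assuming it acts as the usual CSV text qualifier so that a quoted field can contain the delimiter (the data here is made up):

{Event: {"message": "\"Smith, John\",42"}}
        ----(CSV-processor)----->
{Event: {"message": "\"Smith, John\",42",
        "column1": "Smith, John",
        "column2": "42" }
}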

We also plan to add a CSV codec to the S3 Source down the line.

Anyone should feel free to comment on this issue with any input, questions, or concerns about the CSV processor.

finnroblin avatar Jul 28 '22 18:07 finnroblin

Note: If column_names_source_key is defined, the header in column_names_source_key will be used to generate the Event fields.

I think this is good, but I might expand upon it some: If the column_names_source_key is defined *and the source key is present in the Event*, the header in column_names_source_key will be used to generate the Event fields. The italics are the changes I recommend.

In this approach, the column_names could act as a fallback for any given Event which lacks the source key.
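
Roughly, with a hypothetical config of column_names: ["name", "age"] and column_names_source_key: "column_names" (values made up for illustration):

Event without the source key (column_names acts as the fallback):
{Event: {"message": "Alice,30"}}
        ----(CSV-processor)----->
{Event: {"message": "Alice,30", "name": "Alice", "age": "30"}}

Event with the source key (its header wins):
{Event: {"message": "Bob,41", "column_names": ["person", "years"]}}
        ----(CSV-processor)----->
{Event: {"message": "Bob,41", "column_names": ["person", "years"], "person": "Bob", "years": "41"}}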

Thoughts on this approach?

dlvenable avatar Jul 29 '22 14:07 dlvenable

I agree with this approach and I'll add your expansion to the comment.

Another question about using column_names as a fallback is the case where column_names_source_key is defined and the source key is present in the Event, but the header on the Event has too few columns. Right now, we autogenerate the extra columns, but this could be changed. We could take the extra column names from the end of column_names, as long as column_names contains more columns than the source key (and then autogenerate any remaining columns).
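
As a sketch of that case (made-up data, with a hypothetical config of column_names: ["name", "date", "age"] and a header field carrying only two names, using the same autogenerated-name convention as the example above):

{Event: {"message": "sample_name,sample_date,50",
        "column_names": ["name", "date"]}}

Current behavior (autogenerate the extra column):
{Event: {..., "name": "sample_name", "date": "sample_date", "column2": "50"}}

Possible alternative (take the extra name from the end of column_names):
{Event: {..., "name": "sample_name", "date": "sample_date", "age": "50"}}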

Thoughts on adding this implementation detail?

finnroblin avatar Jul 29 '22 15:07 finnroblin

I split out #1618 from this task. We can complete the CSV processor without it for Data Prepper 2.0 and add conversion later.

dlvenable avatar Sep 13 '22 16:09 dlvenable