
Flux Query - Parsing Annotated CSV is too convoluted

Open arialpew opened this issue 3 years ago • 0 comments

When InfluxDB processes a Flux query, the response is returned in annotated CSV.

The format is well described in the InfluxDB documentation. You can expect many tables, each with many rows. Each table consists of annotations (headers) and data. From the user's point of view, a table is often called a "series". Each table has a layout.
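For reference, a minimal annotated CSV response looks roughly like this (the column values are illustrative, not taken from a real query):

```csv
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string
#group,false,false,true,true,false,false,true,true
#default,_result,,,,,,,
,result,table,_start,_stop,_time,_value,_field,_measurement
,,0,2022-07-19T00:00:00Z,2022-07-19T01:00:00Z,2022-07-19T00:00:10Z,23.5,temperature,sensors
,,0,2022-07-19T00:00:00Z,2022-07-19T01:00:00Z,2022-07-19T00:00:20Z,23.7,temperature,sensors
```

The three `#`-prefixed annotation rows describe datatypes, group keys, and default values; the next row is the header, and the remaining rows are data.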

Sometimes InfluxDB "merges" / "coerces" two tables that have the same layout in the annotated CSV response.

Because of this merge/coerce side effect, parsing annotated CSV is a nightmare: there are now two ways the table ID can increase. Not only do you need to look at the "table" counter to know which table you are in, but you also have to be very careful when InfluxDB merges/coerces tables, because your iterator needs to re-use the same annotations as before.

Why is this necessary? It certainly saves some space when transmitting data over the network, but it makes parsing annotated CSV far more complex and error-prone.

Even if two tables have the same layout, they don't belong to the same series, because their table IDs are different. At the very least, you could provide an option to disable this merge/coerce behavior, at the cost of increased network bandwidth - that would make the format more consistent for the end user and faster to parse.
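To make the merge/coerce behavior concrete, here is a sketch of what it looks like on the wire (again with made-up values): two tables with the same layout share a single annotation block and header, and only the "table" column tells you where one series ends and the next begins:

```csv
#datatype,string,long,dateTime:RFC3339,double,string,string
#group,false,false,false,false,true,true
#default,_result,,,,,
,result,table,_time,_value,_field,_measurement
,,0,2022-07-19T00:00:10Z,23.5,temperature,sensor_a
,,0,2022-07-19T00:00:20Z,23.7,temperature,sensor_a
,,1,2022-07-19T00:00:10Z,19.2,temperature,sensor_b
,,1,2022-07-19T00:00:20Z,19.4,temperature,sensor_b
```

A parser that only looks for annotation blocks sees a single table here; it must also watch the "table" column to find the boundary between table 0 and table 1.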

impl TryFrom<String> for QueryResponse {
    type Error = InfluxClientError;

    fn try_from(response: String) -> Result<Self, Self::Error> {
        let mut series = Vec::new();

        // Each table carries a header with all columns and their datatypes.
        // To know which table we are in, there's a "table" column.
        // Each table maps to a series.
        //
        // With Flux, you can use grouping and pivot to group/ungroup series.
        // That means we can't really make any assumption about the order of columns, not even about the InfluxDB system columns that start with an underscore.
        //
        // The only assumption we can rely on is that each table has a separate unique ID. We should traverse each table and create a series for each.
        // As long as the ID doesn't change, we can keep putting dataframes into the same series.
        // Once the table ID changes, even if we are still in the same table from the iteration's point of view, we should create a new series.
        // This is because InfluxDB can "automerge" series that have the same layout into a single table.
        // But we never want that, because it creates inconsistent behavior.
        // We want as many series as there are distinct tables, because each table maps to a series.
        let tables = response.split(TABLE_SEPARATOR);
        // It's unlikely we have more than 2^16 tables/series.
        let mut table_counter: u16 = 0;
        let mut serie = SerieBuilder::new();

        for table in tables {
            let mut rows = table.split(LINE_SEPARATOR);

            let datatypes = rows
                .next()
                .ok_or(InfluxClientError::ResponseParseError(
                    ResponseParseError::ParseMissingDataTypeAnnotation,
                ))?
                .split(VALUE_SEPARATOR)
                .collect::<Vec<_>>();

            let header = rows
                .next()
                .ok_or(InfluxClientError::ResponseParseError(
                    ResponseParseError::ParseMissingHeader,
                ))?
                .split(VALUE_SEPARATOR)
                .collect::<Vec<_>>();

            for row in rows {
                let row: Vec<&str> = row.split(VALUE_SEPARATOR).collect();
                let mut it = header.iter().zip(row.iter()).zip(datatypes.iter());
                let mut dataframe = DataframeBuilder::new();

                for ((column, value), datatype) in &mut it {
                    match *column {
                        "" | "result" | "_start" | "_stop" => {
                            // Ignore.
                        }
                        "table" => {
                            let new_table_counter = value.parse::<u16>().unwrap();

                            // We should create a new serie if the table ID change.
                            // This can happen if InfluxDB merge / coerce tables that have the same layout.
                            if new_table_counter != table_counter {
                                table_counter = new_table_counter;

                                let current_serie = std::mem::take(&mut serie);

                                series.push(current_serie.build());
                            }
                        }
                        "_measurement" => {
                            dataframe = dataframe.measurement(
                                RessourceName::try_from((*value).to_owned()).map_err(|_| {
                                    InfluxClientError::ResponseParseError(
                                        ResponseParseError::UnableToParseColumn(
                                            "_measurement".to_owned(),
                                        ),
                                    )
                                })?,
                            );
                        }
                        "_time" => {
                            dataframe = dataframe.time(
                                Rfc3339::try_from((*value).to_owned()).map_err(|_| {
                                    InfluxClientError::ResponseParseError(
                                        ResponseParseError::UnableToParseColumn("_time".to_owned()),
                                    )
                                })?,
                            );
                        }
                        "_field" => {
                            dataframe = dataframe.field(
                                RessourceName::try_from((*value).to_owned()).map_err(|_| {
                                    InfluxClientError::ResponseParseError(
                                        ResponseParseError::UnableToParseColumn(
                                            "_field".to_owned(),
                                        ),
                                    )
                                })?,
                            );
                        }
                        "_value" => {
                            dataframe = dataframe.value(
                                Value::try_from((*value, *datatype)).map_err(|_| {
                                    InfluxClientError::ResponseParseError(
                                        ResponseParseError::UnableToParseColumn(
                                            "_value".to_owned(),
                                        ),
                                    )
                                })?,
                            );
                        }
                        _ => {
                            dataframe = dataframe.insert(
                                RessourceName::try_from((*column).to_owned()).map_err(|_| {
                                    InfluxClientError::ResponseParseError(
                                        ResponseParseError::UnableToParseColumn(
                                            (*column).to_owned(),
                                        ),
                                    )
                                })?,
                                Value::try_from((*value, *datatype)).map_err(|_| {
                                    InfluxClientError::ResponseParseError(
                                        ResponseParseError::UnableToParseColumn(
                                            (*column).to_owned(),
                                        ),
                                    )
                                })?,
                            );
                        }
                    };
                }

                serie = serie.insert(dataframe.build()?);
            }

            // The table counter increases since we've consumed all rows of the current table.
            // The next table is a new series.
            table_counter += 1;

            let current_serie = std::mem::take(&mut serie);

            series.push(current_serie.build());
        }

        Ok(Self(series))
    }
}

In this example, you can clearly see there are two code paths that change "table_counter" because of this issue, which makes parsing annotated CSV far more convoluted than necessary.
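For contrast, here is a minimal sketch of the single-path alternative (the function name and test data are mine, not this client's API): if the "table" column is treated as the only source of truth for series boundaries, there is exactly one place where a new series starts, no matter where annotation blocks fall:

```rust
// Sketch: split annotated CSV rows into series using only the
// "table" column, so a merged table block is handled the same way
// as a fresh annotation block.
fn split_series(csv: &str) -> Vec<Vec<Vec<String>>> {
    let mut series: Vec<Vec<Vec<String>>> = Vec::new();
    let mut current_id: Option<String> = None;
    let mut table_col = 0;
    let mut expect_header = false;

    for line in csv.lines() {
        if line.is_empty() {
            continue;
        }
        if line.starts_with('#') {
            // Annotation row: the next non-annotation row is a header.
            expect_header = true;
            continue;
        }
        let cells: Vec<&str> = line.split(',').collect();
        if expect_header {
            // Header row: remember where "table" sits, since Flux
            // group/pivot means we can't assume any column order.
            expect_header = false;
            table_col = cells
                .iter()
                .position(|c| *c == "table")
                .expect("header has no \"table\" column");
            continue;
        }
        let id = cells[table_col].to_owned();
        if current_id.as_ref() != Some(&id) {
            // The table ID changed: start a new series. This is the
            // only branch that does so, whether or not the change
            // came with a fresh annotation block.
            current_id = Some(id);
            series.push(Vec::new());
        }
        let row = cells.iter().map(|c| c.to_string()).collect();
        series.last_mut().unwrap().push(row);
    }
    series
}

fn main() {
    // Two tables merged under one annotation block (IDs 0 and 1).
    let csv = "#datatype,string,long,dateTime:RFC3339,double\n\
               ,result,table,_time,_value\n\
               ,,0,2022-07-19T00:00:10Z,23.5\n\
               ,,0,2022-07-19T00:00:20Z,23.7\n\
               ,,1,2022-07-19T00:00:10Z,19.2\n";
    let series = split_series(csv);
    assert_eq!(series.len(), 2);
    assert_eq!(series[0].len(), 2);
    assert_eq!(series[1].len(), 1);
}
```

This flattens away the real work (datatype mapping, building dataframes), but it shows that a single comparison against the "table" column is enough to delimit series.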

Beyond that, I think the "table" and "result" columns should be InfluxDB system columns like "_start" and "_stop". Users should be forbidden from creating any field key or tag key that starts with an underscore, so that InfluxDB system columns are always reserved and always start with an underscore. This is necessary because, with Flux pivot and group with no arguments, you can ungroup all your tables into a single table, and that can lead to collisions and confusion.

InfluxDB system buckets already start with an underscore, and you can't create a bucket that starts with an underscore, for good reason.

This would solve https://github.com/influxdata/influxdb/issues/20821 and https://github.com/influxdata/influxdb/issues/20820 - but it's likely a breaking change, since users who already have stored keys that start with an underscore, or keys equal to "table" or "result", would need to rename their keys.

arialpew avatar Jul 19 '22 12:07 arialpew