
feat: influx inspect export parquet

Open alespour opened this issue 1 year ago • 8 comments

Closes https://github.com/influxdata/edge/issues/672

This PR extends the influx_inspect export command with Parquet output. The code in the cmd/influx_inspect/export/parquet folder is the Parquet exporter and related code ported from idpe (the snapshot store tsm export command), reduced to the minimal subset required for the export.

New influx_inspect options:

  • -measurement - selects the measurement to be exported (required)
  • -parquet - selects Parquet output instead of line protocol
  • -chunk-size - size, in bytes, at which output is partitioned into multiple Parquet files (default 100000000)

Output file(s) are created in the folder specified via the existing -out option. The limitations are:

  • -database, -retention and -measurement must be specified
  • only TSM files are exported (not WAL files, unlike when exporting to line protocol) - WAL support can easily be added if requested
  • export to Parquet is done per TSM file. The reading code apparently does not sort TSM files by the time of the data they contain, so the output files are not time-ordered either: for example, table-00001.parquet may contain older data than table-00000.parquet. This seems irrelevant for import.

  • [X] I've read the contributing section of the project README.

alespour avatar Jun 07 '24 16:06 alespour

Export example cmd:

influx_inspect export -datadir /var/lib/influxdb/data/ -waldir /var/lib/influxdb/wal/ -out /bigdata/export/ -database benchmark_db -retention autogen -measurement cpu -parquet

Import via telegraf:

[[inputs.file]]
   files = ["/bigdata/export/table-*.parquet"]
   name_override = "cpu"
   data_format = "parquet"
   tag_columns = ["datacenter","hostname","os","rack","region","service","team"]
   timestamp_column = "time"
   timestamp_format = "unix_ns"

telegraf --once

alespour avatar Jun 12 '24 09:06 alespour

@davidby-influx could we get Stuart's review on this PR? While not urgent, it would be nice to keep up the momentum on this.

@alespour I have two comments:

  • In the README I'd rather see some examples of running with this new option + the required params
  • Is there a reason you are using arrow v14 and not v16? I assume that is copied over as well?

powersj avatar Jun 12 '24 14:06 powersj

@powersj Yes, arrow v14 is what the v2 exporter uses and it was just copied over. I'll update the dep to v16 and add some examples of running the tool with Parquet output.

alespour avatar Jun 12 '24 14:06 alespour

Thank you very much for your input, @stuartcarnie . Given the apparent need for deeper insight into the TSM and exporter code itself, I am beginning to wonder whether a more feasible approach would be to use the tool's existing capability to export the data to line protocol, then scan the output to extract the table schemas (1st pass), and then parse it again and save it in Parquet format (2nd pass).

alespour avatar Jul 25 '24 09:07 alespour

I am beginning to wonder whether a more feasible approach would be to use the tool's existing capability to export the data to line protocol, then scan the output to extract the table schemas (1st pass), and then parse it again and save it in Parquet format (2nd pass).

That would be very inefficient for exporting large databases. If you have access to the code I wrote for Cloud 2, that could be made to work with OSS, and it should be very efficient.

stuartcarnie avatar Aug 02 '24 01:08 stuartcarnie

@alespour I'd like to recommend you consider an alternate approach using some higher-level APIs, rather than a TSMReader.

I would study the influx_tools export command:

https://github.com/influxdata/influxdb/blob/cc26b7653c7d9c383b855c3765b5deb3ec803c51/cmd/influx_tools/export

and consider adding a new command to influx_tools, called export-parquet, unless the team has strong feelings otherwise.


At a high level, I suggest processing the export per shard. That will mean using the Shard type:

https://github.com/influxdata/influxdb/blob/e484c4d87193a475466c0285c018d16f168139e6/tsdb/shard.go#L161-L162

You'll use a combination of the CreateSeriesCursor API:

https://github.com/influxdata/influxdb/blob/e484c4d87193a475466c0285c018d16f168139e6/tsdb/shard.go#L913

which is responsible for iterating over all the series keys of a shard. The series keys are produced in order.

This, in combination with the CreateCursorIterator API:

https://github.com/influxdata/influxdb/blob/e484c4d87193a475466c0285c018d16f168139e6/tsdb/shard.go#L921

is used to produce data for each field.
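
To make the flow concrete, here is a rough, untested sketch of how the two APIs might be combined per shard. The SeriesCursorRequest, SeriesCursorRow, and CursorRequest shapes are assumptions from reading the tsdb package at these commits, and the single field parameter stands in for the real per-field loop:

package export

import (
	"context"

	"github.com/influxdata/influxdb/models"
	"github.com/influxdata/influxdb/tsdb"
)

// exportShard iterates every series key of a shard and opens a typed data
// cursor for one field of each series. Sketch only: Parquet writing and the
// per-field loop are elided.
func exportShard(ctx context.Context, sh *tsdb.Shard, field string) error {
	// Empty request assumed to iterate every series in the shard; the
	// series keys are produced in order.
	sc, err := sh.CreateSeriesCursor(ctx, tsdb.SeriesCursorRequest{}, nil)
	if err != nil {
		return err
	}
	defer sc.Close()

	ci, err := sh.CreateCursorIterator(ctx)
	if err != nil {
		return err
	}

	for {
		row, err := sc.Next()
		if err != nil {
			return err
		}
		if row == nil {
			break // all series keys consumed
		}

		cur, err := ci.Next(ctx, &tsdb.CursorRequest{
			Name:      row.Name, // measurement name
			Tags:      row.Tags, // tag set identifying this series
			Field:     field,    // repeat for every field in a real exporter
			Ascending: true,
			StartTime: models.MinNanoTime,
			EndTime:   models.MaxNanoTime,
		})
		if err != nil {
			return err
		}
		if cur == nil {
			continue // no data for this series/field combination
		}
		// Type-switch on cur (e.g. tsdb.FloatArrayCursor) and append its
		// timestamp/value arrays to the Parquet column writers here.
		cur.Close()
	}
	return nil
}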

You can see these APIs being used in the cmd/influx_tools/internal/storage package, starting with a call to the Read API:

https://github.com/influxdata/influxdb/blob/0887b38a655d1c742e228f0e149420b5bf64956e/cmd/influx_tools/internal/storage/store.go#L28-L29

which returns a ResultSet:

https://github.com/influxdata/influxdb/blob/fe6c64b21ed7e0757375e57b8eca21e9c05f3c89/cmd/influx_tools/internal/storage/resultset.go#L11-L15
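
For orientation, a call into that entry point might look roughly like the following (untested; the ReadRequest field names are assumptions inferred from how the export command drives this internal package):

// Sketch only: obtain a ResultSet for the shards being exported. 'store' is
// the internal storage package's Store over the opened tsdb store.
rs, err := store.Read(ctx, &storage.ReadRequest{
	Database: "benchmark_db", // reusing names from the export example above
	RP:       "autogen",
	Shards:   shards, // []*tsdb.Shard for the group being exported
	Start:    models.MinNanoTime,
	End:      models.MaxNanoTime,
})
if err != nil {
	return err
}
defer rs.Close()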

The existing code wraps the list of shards in a ShardGroup and so it obtains the list of fields as follows:

https://github.com/influxdata/influxdb/blob/0887b38a655d1c742e228f0e149420b5bf64956e/cmd/influx_tools/internal/storage/series_cursor.go#L51-L68

As you will be working with a Shard (not a ShardGroup), you can obtain all the fields for the measurement you are processing via the MeasurementFields API directly, which takes the name of a measurement:

https://github.com/influxdata/influxdb/blob/e484c4d87193a475466c0285c018d16f168139e6/tsdb/shard.go#L853-L854

You then use the FieldKeys API to retrieve all the fields for the measurement; importantly, the result is sorted, and you must maintain that sort order.

You can then get the type information for each field returned by the FieldKeys API using the Field API:

https://github.com/influxdata/influxdb/blob/e484c4d87193a475466c0285c018d16f168139e6/tsdb/shard.go#L1639-L1640
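
Putting those three calls together, schema resolution for a single measurement might look like this sketch (imports elided: fmt, tsdb; Field.Type being an influxql.DataType is an assumption):

// listFields resolves the sorted field list and per-field types for one
// measurement directly from a shard. Sketch only; the measurement name
// would come from the -measurement flag.
func listFields(sh *tsdb.Shard, measurement string) error {
	mf := sh.MeasurementFields([]byte(measurement))
	if mf == nil {
		return fmt.Errorf("measurement %q not found in shard", measurement)
	}
	// FieldKeys returns the keys sorted; the Parquet schema must preserve
	// this order across all output chunks.
	for _, key := range mf.FieldKeys() {
		f := mf.Field(key)
		fmt.Printf("field %q has influxql type %v\n", key, f.Type)
	}
	return nil
}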

You then treat the ResultSet as an iterator, calling the Next API to iterate over each series key and field:

https://github.com/influxdata/influxdb/blob/fe6c64b21ed7e0757375e57b8eca21e9c05f3c89/cmd/influx_tools/internal/storage/resultset.go#L36-L37
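
The consuming loop itself can stay very simple; only Next is named in the linked code, so the loop body below is deliberately left as comments:

// Sketch only: each call to Next positions the ResultSet at the next
// (series key, field) pair, in series-key order.
for rs.Next() {
	// Read the typed cursor for the current series key + field and append
	// its timestamp/value arrays to the matching Parquet column chunk.
}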

[!NOTE]

As stated in previous comments, you will need to iterate over all the series keys first, to determine all the tag keys, to ensure the Parquet table schema is complete.
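
A sketch of that first pass, reusing the CreateSeriesCursor API from above to build the union of tag keys (shapes again assumed, untested; also needs the sort import):

// collectTagKeys walks every series key of a shard once and returns the
// sorted union of tag keys, so the Parquet schema gets a column for every
// tag that occurs in any series. Sketch only.
func collectTagKeys(ctx context.Context, sh *tsdb.Shard) ([]string, error) {
	sc, err := sh.CreateSeriesCursor(ctx, tsdb.SeriesCursorRequest{}, nil)
	if err != nil {
		return nil, err
	}
	defer sc.Close()

	set := map[string]struct{}{}
	for {
		row, err := sc.Next()
		if err != nil {
			return nil, err
		}
		if row == nil {
			break
		}
		for _, t := range row.Tags {
			set[string(t.Key)] = struct{}{}
		}
	}

	keys := make([]string, 0, len(set))
	for k := range set {
		keys = append(keys, k)
	}
	sort.Strings(keys) // deterministic tag column order for the schema
	return keys, nil
}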

Ultimately, your goal is to replace:

https://github.com/influxdata/influxdb/blob/5b5d6ee724e5255426d59820c07f90e26647064a/cmd/influx_inspect/export/export_parquet.go#L231-L238

with a version that consumes a Shard directly.

stuartcarnie avatar Aug 02 '24 04:08 stuartcarnie

Just a quick update from us: We are on track to use influx_tools export as a base tool to convert TSM data into Parquet format. @alespour has successfully customized the tool to iterate over shards for data access during the export phase, and we are now exploring how to integrate influx_tools export with the existing code for the Parquet exporter.

bednar avatar Aug 14 '24 12:08 bednar

Status update from Bonitoo: We've prepared a refactored version of the exporter based on influx_tools, detailed in PR https://github.com/influxdata/influxdb/pull/25253. We are currently waiting for feedback on how to correctly create the series key in the exported file. For more information, please check out this Slack conversation: https://influxdata.slack.com/archives/C5BSZ026L/p1724142258571929?thread_ts=1721781280.449769&cid=C5BSZ026L

bednar avatar Aug 21 '24 11:08 bednar