feat: influx inspect export parquet
Closes https://github.com/influxdata/edge/issues/672
This PR extends the influx_inspect export command with Parquet output. The code in the cmd/influx_inspect/export/parquet folder is the Parquet exporter and related code ported from idpe (the snapshot store tsm export command), just the minimal subset required for the export.
New influx_inspect options:
- `-measurement` - selects the measurement to be exported (required)
- `-parquet` - selects Parquet output instead of line protocol
- `-chunk-size` - size, in bytes, to partition Parquet files (default 100000000)
Output file(s) are created in a folder specified via the existing `-out` option. The limitations are:
- `-database`, `-retention` and `-measurement` must be specified
- only TSM files are exported (not WAL files, unlike when exporting to line protocol); if requested, this can be easily implemented
- export to Parquet file(s) is done per TSM file. The reading code apparently does not sort the TSM files by the time of the data they contain, so neither are the output files sorted. For example, `table-00001.parquet` may contain older data than `table-00000.parquet`. This seems irrelevant for import.
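To illustrate the `-chunk-size` behavior described above, here is a minimal sketch of how an exporter might roll over to a new output file once a size threshold is reached, producing the `table-NNNNN.parquet` names seen in the export. The function name and the idea of sizing by row-group byte counts are assumptions for illustration, not the actual exporter code.

```go
package main

import "fmt"

// chunkNames assigns successive row groups (given as byte sizes) to output
// files, starting a new table-NNNNN.parquet file whenever adding the next
// row group would push the current file past chunkSize bytes.
func chunkNames(rowGroupSizes []int64, chunkSize int64) []string {
	var (
		names []string
		file  int
		used  int64
	)
	for _, sz := range rowGroupSizes {
		// roll to a new file when the current one would exceed the chunk size
		if used > 0 && used+sz > chunkSize {
			file++
			used = 0
		}
		used += sz
		names = append(names, fmt.Sprintf("table-%05d.parquet", file))
	}
	return names
}

func main() {
	// With a chunk size of 100 bytes, 60+60 overflows, as does 90 after 60+30.
	fmt.Println(chunkNames([]int64{60, 60, 30, 90}, 100))
	// → [table-00000.parquet table-00001.parquet table-00001.parquet table-00002.parquet]
}
```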
- [X] I've read the contributing section of the project README.
Export example cmd:
```
influx_inspect export -datadir /var/lib/influxdb/data/ -waldir /var/lib/influxdb/wal/ -out /bigdata/export/ -database benchmark_db -retention autogen -measurement cpu -parquet
```
Import via telegraf:
```toml
[[inputs.file]]
  files = ["/bigdata/export/table-*.parquet"]
  name_override = "cpu"
  data_format = "parquet"
  tag_columns = ["datacenter","hostname","os","rack","region","service","team"]
  timestamp_column = "time"
  timestamp_format = "unix_ns"
```

```
telegraf --once
```
@davidby-influx could we get Stuart's review on this PR? While not urgent, it would be nice to keep up the momentum on this.
@alespour I have two comments:
- In the README I'd rather see some examples of running with this new option + the required params
- Is there a reason you are using arrow v14 and not v16? I assume that is copied over as well?
@powersj Yes, arrow v14 is used in the v2 exporter and it was just copied over. I'll update the dependency to v16 and add some examples of running the tool with Parquet output.
Thank you very much for your input, @stuartcarnie. Given the apparent need for better insight into the TSM and exporter code itself, I am beginning to wonder whether a more feasible approach would be to use the tool's existing capability to export the data into line protocol, then scan the output to extract the table schemas (1st pass), and then parse it again and save it in Parquet format (2nd pass).
> I am beginning to wonder whether a more feasible approach would be to use the tool's existing capability to export the data into line protocol, then scan the output to extract the table schemas (1st pass), and then parse it again and save it in Parquet format (2nd pass).
That would be very inefficient for exporting large databases. If you have access to the code I wrote for Cloud 2, that could be made to work with OSS, and it should be very efficient.
@alespour I'd like to recommend you consider an alternate approach using some higher-level APIs, rather than a TSMReader.
I would study the influx_tools export command:
https://github.com/influxdata/influxdb/blob/cc26b7653c7d9c383b855c3765b5deb3ec803c51/cmd/influx_tools/export
and consider adding a new command to influx_tools, called export-parquet, unless the team has strong feelings otherwise.
At a high level, I suggest processing the export per shard. That will mean using the Shard type:
https://github.com/influxdata/influxdb/blob/e484c4d87193a475466c0285c018d16f168139e6/tsdb/shard.go#L161-L162
You'll use a combination of the CreateSeriesCursor API:
https://github.com/influxdata/influxdb/blob/e484c4d87193a475466c0285c018d16f168139e6/tsdb/shard.go#L913
which is responsible for iterating over all the series keys of a shard. The series keys are produced in order.
This, in combination with the CreateCursorIterator API:
https://github.com/influxdata/influxdb/blob/e484c4d87193a475466c0285c018d16f168139e6/tsdb/shard.go#L921
is used to produce the data for each field.
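The suggested combination of the two APIs boils down to a nested loop: walk the series keys in order, and for each series open one cursor per field and drain it. The sketch below models that loop with illustrative stand-in types; the struct shapes, the `open` callback, and the function names are assumptions for demonstration, not the real `tsdb` signatures (see the linked shard.go for those).

```go
package main

import "fmt"

// series stands in for one row produced by a series cursor: a series key
// plus its tag set.
type series struct {
	Key  string            // e.g. "cpu,hostname=h1"
	Tags map[string]string // tags parsed from the series key
}

// pointCursor stands in for a per-series, per-field cursor of
// (timestamp, value) pairs, as a cursor iterator would produce.
type pointCursor struct {
	times  []int64
	values []float64
	i      int
}

// Next returns the next point, or ok=false when the cursor is exhausted.
func (c *pointCursor) Next() (t int64, v float64, ok bool) {
	if c.i >= len(c.times) {
		return 0, 0, false
	}
	t, v = c.times[c.i], c.values[c.i]
	c.i++
	return t, v, true
}

// exportShard mirrors the suggested loop: for every series key (produced in
// sorted order by the series cursor), open one cursor per field and drain it.
func exportShard(seriesList []series, fields []string,
	open func(s series, field string) *pointCursor) []string {
	var rows []string
	for _, s := range seriesList {
		for _, f := range fields { // fields in sorted order
			cur := open(s, f)
			for {
				t, v, ok := cur.Next()
				if !ok {
					break
				}
				rows = append(rows, fmt.Sprintf("%s %s=%g %d", s.Key, f, v, t))
			}
		}
	}
	return rows
}

func main() {
	open := func(s series, field string) *pointCursor {
		return &pointCursor{times: []int64{1, 2}, values: []float64{0.5, 0.6}}
	}
	rows := exportShard([]series{{Key: "cpu,hostname=h1"}}, []string{"usage"}, open)
	for _, r := range rows {
		fmt.Println(r)
	}
	// prints:
	// cpu,hostname=h1 usage=0.5 1
	// cpu,hostname=h1 usage=0.6 2
}
```

In the real code the rows would be appended to a Parquet row group rather than formatted as strings, but the iteration order is the same.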
You can see these APIs being used in the cmd/influx_tools/internal/storage package, starting with a call to the Read API:
https://github.com/influxdata/influxdb/blob/0887b38a655d1c742e228f0e149420b5bf64956e/cmd/influx_tools/internal/storage/store.go#L28-L29
which returns a ResultSet:
https://github.com/influxdata/influxdb/blob/fe6c64b21ed7e0757375e57b8eca21e9c05f3c89/cmd/influx_tools/internal/storage/resultset.go#L11-L15
The existing code wraps the list of shards in a ShardGroup, so it obtains the list of fields as follows:
https://github.com/influxdata/influxdb/blob/0887b38a655d1c742e228f0e149420b5bf64956e/cmd/influx_tools/internal/storage/series_cursor.go#L51-L68
As you will be working with a Shard (not a ShardGroup), you can obtain all the fields for the measurement you are processing via the MeasurementFields API directly, which takes the name of a measurement:
https://github.com/influxdata/influxdb/blob/e484c4d87193a475466c0285c018d16f168139e6/tsdb/shard.go#L853-L854
You then use the FieldKeys API to retrieve all the fields for the measurement, which, importantly, are returned sorted; you must maintain that sort order.
You can then get the type information for each field returned by the FieldKeys API using the Field API:
https://github.com/influxdata/influxdb/blob/e484c4d87193a475466c0285c018d16f168139e6/tsdb/shard.go#L1639-L1640
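Putting the last two steps together, the schema-building pass amounts to: take the sorted field keys, look up each field's type, and emit Parquet columns in that same order. The sketch below uses plain strings for the type names; the function name and the string-based types are illustrative assumptions, not the real influxql/Parquet type constants.

```go
package main

import (
	"fmt"
	"sort"
)

// parquetFieldColumns emits one column declaration per field, in sorted
// field-key order, mirroring the requirement that the order returned by
// FieldKeys be preserved in the Parquet schema.
func parquetFieldColumns(fields map[string]string) []string {
	keys := make([]string, 0, len(fields))
	for k := range fields {
		keys = append(keys, k)
	}
	sort.Strings(keys) // FieldKeys returns sorted keys; preserve that order
	cols := make([]string, 0, len(keys))
	for _, k := range keys {
		cols = append(cols, fmt.Sprintf("%s: %s", k, fields[k]))
	}
	return cols
}

func main() {
	fmt.Println(parquetFieldColumns(map[string]string{
		"usage_user": "DOUBLE",
		"usage_idle": "DOUBLE",
		"uptime":     "INT64",
	}))
	// → [uptime: INT64 usage_idle: DOUBLE usage_user: DOUBLE]
}
```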
You then treat the ResultSet as an iterator, calling the Next API to iterate over each series key and field:
https://github.com/influxdata/influxdb/blob/fe6c64b21ed7e0757375e57b8eca21e9c05f3c89/cmd/influx_tools/internal/storage/resultset.go#L36-L37
> [!NOTE]
> As stated in previous comments, you will need to iterate over all the series keys first, to determine all the tag keys, to ensure the Parquet table schema is complete.
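That first pass is just a union over the tag keys of every series. A minimal sketch, assuming series keys in the familiar `measurement,tag1=v1,tag2=v2` form (real code would read them from the series cursor rather than parse strings, and the function name is hypothetical):

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// collectTagKeys walks every series key of a shard and accumulates the union
// of tag keys, so the Parquet schema also covers tags that appear in only
// some of the series.
func collectTagKeys(seriesKeys []string) []string {
	set := map[string]struct{}{}
	for _, key := range seriesKeys {
		parts := strings.Split(key, ",")
		for _, p := range parts[1:] { // parts[0] is the measurement name
			if i := strings.IndexByte(p, '='); i > 0 {
				set[p[:i]] = struct{}{}
			}
		}
	}
	keys := make([]string, 0, len(set))
	for k := range set {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func main() {
	fmt.Println(collectTagKeys([]string{
		"cpu,hostname=h1,region=west",
		"cpu,hostname=h2,rack=r9",
	}))
	// → [hostname rack region]
}
```

Note that `rack` and `region` each appear in only one series, yet both must become columns in the table schema.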
Ultimately, your goal is to replace:
https://github.com/influxdata/influxdb/blob/5b5d6ee724e5255426d59820c07f90e26647064a/cmd/influx_inspect/export/export_parquet.go#L231-L238
with a version that consumes a Shard directly.
Just a quick update from us: We are on track to use influx_tools export as a base tool to convert TSM data into Parquet format. @alespour has successfully customized the tool to iterate over shards for data access during the export phase, and we are now exploring how to integrate influx_tools export with the existing code for the Parquet exporter.
Status update from Bonitoo: We've prepared a refactored version of the exporter based on influx_tools, detailed in PR https://github.com/influxdata/influxdb/pull/25253. We are currently waiting for feedback on how to correctly create the series key in the exported file. For more information, please check out this Slack conversation: https://influxdata.slack.com/archives/C5BSZ026L/p1724142258571929?thread_ts=1721781280.449769&cid=C5BSZ026L