feat: postgresql output
This PR provides a PostgreSQL output plugin.
This is a continuation of #3428 but massively reworked. There are a lot of breaking changes to address issues/limitations, as well as performance optimizations.
The performance optimizations are not minor either. The original code from #3428 benchmarked at around 250 points per second (using real production data). It's been a while since I last ran a benchmark, but last time I did I was able to get around 60,000 points per second.
closes #3408 closes #3428
Major changes from #3428:
- Deterministic tags when using `tags_as_foreign_keys`.
  Previous code utilized an auto-generated serial to obtain the tag ID for a set of tags. This resulted in a severe bottleneck, as the tag IDs had to be looked up from the database for every single series. The new code hashes the tags & their values (within telegraf) to generate the tag ID (a minimal sketch of this approach is shown after this list).
- Connection pooling & concurrency.
  The plugin allows multiple simultaneous inserts into the database. Each batch is split into a separate insert (COPY) per measurement/table, which can run in parallel. If the plugin receives batches faster than a single connection can write, each batch will also be inserted in parallel.
- Use of `numeric` data type for `uint64`.
  Previous code used `bigint` for `uint64`. This would result in overflow errors when inserting values larger than an `int64`, as `bigint` is a signed 64-bit integer. `numeric` is an arbitrary-precision exact numeric data type. It is less performant than `bigint`, but it's the only data type that can hold the full `uint64` range.
- More powerful schema modification template statements.
  For table creation & column addition, a more powerful mechanism was implemented. The new template statements use Go's text/template library with lots of provided variables, methods, & functions to allow for virtually any use case. Some example configs are provided below to demonstrate. Comprehensive documentation on this functionality will be added.
- Better error handling.
  The old code didn't use transactions, and would infinitely retry the entire batch on error. This resulted in things like duplicate inserts. As mentioned earlier, each measurement is a separate sub-batch, so this mitigates some of the scope of errors. Each sub-batch is inserted in a transaction, so there's no risk of duplicates. In addition, the plugin is able to discern between temporary and permanent errors. A permanent error is something like bad SQL that will always fail no matter how many times you retry it. A temporary error is something like a failed connection, where a retry may succeed. Temporary errors will retry indefinitely (with incremental backoff), while permanent errors will discard the sub-batch.
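To make the deterministic-tag-ID idea concrete, here is a minimal, hedged sketch (the hash function and separators are assumptions, not the plugin's actual code) of hashing a tag set into a stable `tag_id` entirely inside telegraf, so no database lookup is needed per series:

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// tagID derives a stable signed 64-bit identifier from a tag set, so the same
// tags always map to the same tag_id on every telegraf instance.
func tagID(tags map[string]string) int64 {
	keys := make([]string, 0, len(tags))
	for k := range tags {
		keys = append(keys, k)
	}
	sort.Strings(keys) // deterministic ordering regardless of map iteration order

	h := fnv.New64a()
	for _, k := range keys {
		h.Write([]byte(k))
		h.Write([]byte{0}) // separator to avoid ambiguous key/value concatenations
		h.Write([]byte(tags[k]))
		h.Write([]byte{0})
	}
	return int64(h.Sum64())
}

func main() {
	fmt.Println(tagID(map[string]string{"host": "db01", "region": "eu"}))
}
```

Because the ID is a pure function of the tag values, concurrent agents writing the same series compute the same `tag_id`, which is what removes the per-series database lookup bottleneck (and why the `tag_id` column is now `bigint`; see the breaking changes below).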
Note that there are breaking changes in the db schema:
- If using `tags_as_foreign_keys`, the `tag_id` column type is now `bigint`. However, if the column type is changed to `bigint` while preserving the records, then while the plugin will use new `tag_id` values and insert new records, joins should be functional.
- If the field type is `uint64`, the column type is now `numeric` instead of `bigint`. Leaving the column type as `bigint` should still work unless values exceed the maximum of `bigint`.
- Tag columns, when not using `tags_as_foreign_keys`, must be commented with the string `tag` at the beginning. Failure to do so will result in some of the add column template functionality not working properly.
Right now this PR is a draft as I want to run it in an environment with a TimescaleDB similar to how we plan on deploying it to production. However there are several blocking issues with TimescaleDB preventing me from being able to do this. All the tests are still for the old code, and have not been updated (and thus won't even compile). There's also still a few more minor changes to make, as well as general cleanup. So I'm taking this opportunity to put it out for preview so that feedback can be gathered and any architectural changes can be made.
I have not done exhaustive testing, so there may be bugs. I do not know of any right now, so if any are found, please raise them.
Example templating
Below are some example templates I've been using in my testing. The scenario is for use with TimescaleDB. The templates basically allow creating the tables in the `telegraf` schema, and then a view in the `public` schema which joins the tag table and the data table, making it easier to work with. In addition, since you cannot add columns to a TimescaleDB hypertable with compression, it creates a new table when columns are added, and creates another view which `UNION`s the old and new tables.
This is probably one of the most complex use cases possible, but it demonstrates the power and flexibility of the templating.
```toml
tags_as_foreign_keys = true
schema = 'telegraf'

create_templates = [
    '''CREATE TABLE {{ .table }} ({{ .allColumns }})''',
    '''SELECT create_hypertable({{ .table|quoteLiteral }}, 'time', chunk_time_interval => INTERVAL '1h')''',
    '''ALTER TABLE {{ .table }} SET (timescaledb.compress, timescaledb.compress_segmentby = 'tag_id')''',
    '''SELECT add_compression_policy({{ .table|quoteLiteral }}, INTERVAL '2h')''',
    '''CREATE VIEW {{ .table.WithSuffix "_data" }} AS SELECT {{ .allColumns.Selectors | join "," }} FROM {{ .table }}''',
    '''CREATE VIEW {{ .table.WithSchema "public" }} AS SELECT time, {{ (.tagTable.Columns.Tags.Concat .allColumns.Fields).Identifiers | join "," }} FROM {{ .table.WithSuffix "_data" }} t, {{ .tagTable }} tt WHERE t.tag_id = tt.tag_id''',
]

add_column_templates = [
    '''ALTER TABLE {{ .table }} RENAME TO {{ (.table.WithSuffix "_" .table.Columns.Hash).WithSchema "" }}''',
    '''ALTER VIEW {{ .table.WithSuffix "_data" }} RENAME TO {{ (.table.WithSuffix "_" .table.Columns.Hash "_data").WithSchema "" }}''',
    '''DROP VIEW {{ .table.WithSchema "public" }}''',
    '''CREATE TABLE {{ .table }} ({{ .allColumns }})''',
    '''SELECT create_hypertable({{ .table|quoteLiteral }}, 'time', chunk_time_interval => INTERVAL '1h')''',
    '''ALTER TABLE {{ .table }} SET (timescaledb.compress, timescaledb.compress_segmentby = 'tag_id')''',
    '''SELECT add_compression_policy({{ .table|quoteLiteral }}, INTERVAL '2h')''',
    '''CREATE VIEW {{ .table.WithSuffix "_data" }} AS SELECT {{ .allColumns.Selectors | join "," }} FROM {{ .table }} UNION ALL SELECT {{ (.allColumns.Union .table.Columns).Selectors | join "," }} FROM {{ .table.WithSuffix "_" .table.Columns.Hash "_data" }}''',
    '''CREATE VIEW {{ .table.WithSchema "public" }} AS SELECT time, {{ (.tagTable.Columns.Tags.Concat .allColumns.Fields).Identifiers | join "," }} FROM {{ .table.WithSuffix "_data" }} t, {{ .tagTable }} tt WHERE t.tag_id = tt.tag_id''',
]
```
Required for all PRs:
- [x] Signed CLA.
- [x] Associated README.md updated.
- [x] Has appropriate unit tests.
@phemmer Does this PR cover the copy method outlined in https://github.com/influxdata/telegraf/pull/3912#issuecomment-731124375?
We'd like to have this be the single Postgresql output PR moving forward.
Yes, COPY is utilized for all data inserts.
> Yes, COPY is utilized for all data inserts.
Perfect. Let's move forward with this PR then. Is this in a state yet to have someone on the Influx team to review or still in draft mode?
Yes it's still a draft. None of the tests have been written. Was waiting to see if there were any major changes requested. Since it seems quiet on that front, I can assume there are none and proceed.
Hey @phemmer! I really want to see this tremendous work merged especially as there seems to be huge interest from the community. However, ~2,500 LoC are impossible to review (even with a lot of beer ;-)), so we need a strategy to get this in using a staged approach. I've seen the series_grouper (beautiful btw) split out from this work. Do you see a way to split out a very basic version of this plugin for me to review? Maybe leaving out all the fancy stuff like auto-table-creation etc.? I'd be happy to review it!
If you are like me and wanted a docker build embedding the latest available postgresql/timescaledb output with telegraf 1.17.2:
```
docker pull fgribreau/telegraf-timescaledb-docker:1.17.2
```
- source code (will accept MR :+1:)
- docker hub
Are there any risks involved with two telegraf agents on different hosts attempting to create tables (for new metrics) or add columns at the same time?
Also, you provide a very complex templating example with compression; is there any way in the PR's current form for different metrics to have different Postgres/Timescale configurations re: compression or time partition window?
@phemmer any news here?
> Are there any risks involved with two telegraf agents on different hosts attempting to create tables (for new metrics) or add columns at the same time?
Off the top of my head, yes. If both operations occur at the exact same moment (which will be pretty difficult, as the time window between check & create is very small), one of the clients will get an error and drop the metrics it can't insert (in the case of a table or tag column) or drop the fields (in the case of a field column).
However this should be very easy to compensate for by configuring the error code for "table already exists" and "column already exists" into temporary errors, which will cause the plugin to retry.
I'll add this to the tests.
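As a hedged illustration of that idea (the SQLSTATE codes and the `isTemporary` helper below are assumptions for this sketch, not the plugin's actual configuration mechanism), pgx's error type exposes the code needed to treat "already exists" races between agents as retryable:

```go
package main

import (
	"errors"
	"fmt"

	"github.com/jackc/pgconn"
)

// isTemporary reports whether a write error is worth retrying.
func isTemporary(err error) bool {
	var pgErr *pgconn.PgError
	if !errors.As(err, &pgErr) {
		// Non-PostgreSQL errors (e.g. dropped connections) are assumed retryable.
		return true
	}
	switch pgErr.Code {
	case "42P07", // duplicate_table: another agent created the table first
		"42701": // duplicate_column: another agent added the column first
		return true
	}
	// SQLSTATE class 08 covers connection exceptions, which are transient.
	return len(pgErr.Code) >= 2 && pgErr.Code[:2] == "08"
}

func main() {
	fmt.Println(isTemporary(&pgconn.PgError{Code: "42P07"})) // true: retry and carry on
	fmt.Println(isTemporary(&pgconn.PgError{Code: "42601"})) // false: bad SQL is permanent
}
```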
> Also, you provide a very complex templating example with compression; is there any way in the PR's current form for different metrics to have different Postgres/Timescale configurations re: compression or time partition window?
Yes. You can add a conditional clause to the template that would use different SQL based on your chosen condition.
But you can also have 2 postgres outputs to the same database.
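As a toy illustration of such a conditional clause, the snippet below renders different SQL from a single template using Go's text/template (which the plugin's templates are built on); the `.measurement` variable and the SQL here are assumptions for this sketch, not the plugin's real template context:

```go
package main

import (
	"os"
	"text/template"
)

func main() {
	// Choose a different compression delay depending on the measurement name.
	tmpl := template.Must(template.New("policy").Parse(
		`SELECT add_compression_policy('{{ .measurement }}', INTERVAL '{{ if eq .measurement "cpu" }}2h{{ else }}24h{{ end }}')`))

	// Render the same template for two measurements to show the branch taken.
	for _, m := range []string{"cpu", "disk"} {
		tmpl.Execute(os.Stdout, map[string]string{"measurement": m})
		os.Stdout.WriteString("\n")
	}
}
```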
> @phemmer any news here?
I've been working on some other higher priority stuff, so haven't had much time to get back to this. I expect to be able to get back on it next week.
@phemmer just let me know if you can split out PRs for me to review... :-)
I don't think a split is likely. I won't dismiss the idea outright, and I'll give it honest consideration, but I suspect ripping out chunks of functionality and substituting them with simpler versions is likely too much work. Plus, at that point is it really a code review if the final thing isn't what's being reviewed?
I'll make a final pass at organizing the code before taking the PR out of draft, but right now the distribution of code seems fairly sane:

```
 26 ./columns.go
323 ./postgresql.go
311 ./table_manager.go
380 ./table_source.go
279 ./template/template.go
 66 ./utils/types.go
147 ./utils/utils.go
```
I've been trying this PR out in my local setup; I've had success sending it metrics from `inputs.cpu` and a custom `socket_listener` via the `telegraf-ruby` gem, but when I hooked up an `inputs.statsd` to it, I started getting the following panic:

```
panic: Tried to perform an invalid column drop. This should not have happened. measurement=Rack_Server_All_GC_minor_gc_count name=time role=1

goroutine 99 [running]:
github.com/influxdata/telegraf/plugins/outputs/postgresql.(*TableSource).DropColumn(0xc00090f290, 0x69fb8ae, 0x4, 0x6a3f8c3, 0x18, 0x1)
	/Users/machty/code/exc/research/telegraf/plugins/outputs/postgresql/table_source.go:148 +0x1e5
```
Full telegraf log:
https://gist.github.com/machty/fc3242a4a743917698b2c81d22c33e8e
Is this an issue with my setup or with the PR code?
Solved
The issue was that the metrics were coming in uppercase, and uppercase table names in postgres require special escaping/quoting. The easiest solution for me was to put in a telegraf string processor to downcase all measurement names before the postgres output was reached:

```toml
[[processors.strings]]
  [[processors.strings.lowercase]]
    measurement = "*"
```
> I've been trying this PR out in my local setup; I've had success sending it metrics from `inputs.cpu` and a custom `socket_listener` via the `telegraf-ruby` gem, but when I hooked up an `inputs.statsd` to it, I started getting the following panic:
>
> ```
> panic: Tried to perform an invalid column drop. This should not have happened. measurement=Rack_Server_All_GC_minor_gc_count name=time role=1
> goroutine 99 [running]:
> github.com/influxdata/telegraf/plugins/outputs/postgresql.(*TableSource).DropColumn(0xc00090f290, 0x69fb8ae, 0x4, 0x6a3f8c3, 0x18, 0x1)
> /Users/machty/code/exc/research/telegraf/plugins/outputs/postgresql/table_source.go:148 +0x1e5
> ```
>
> Full telegraf log:
> https://gist.github.com/machty/fc3242a4a743917698b2c81d22c33e8e
>
> Is this an issue with my setup or with the PR code?
The PR code.
> Solved
> The issue was that the metrics were coming in uppercase, and uppercase table names in postgres require special escaping/quoting.
Spot on.
I thought I had all uses of table names properly quoted. But it turns out there was an obscure one I didn't expect: the `(table_schema||'.'||table_name)::regclass::oid` from https://github.com/influxdata/telegraf/blob/9a01472c1ff9b7d516ef18a10838108c20984f1e/plugins/outputs/postgresql/table_manager.go#L15
I've fixed this locally. But there are a few other pending changes I don't want to push just yet.
Thanks for reporting. This was an interesting one to track down.
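For illustration (an assumption about one possible approach, not necessarily the fix that was pushed), pgx's `Identifier` helper can produce a properly quoted, schema-qualified name that survives a `::regclass` cast even for mixed-case table names:

```go
package main

import (
	"fmt"

	"github.com/jackc/pgx/v4"
)

func main() {
	schema, table := "telegraf", "Rack_Server_All_GC_minor_gc_count"

	// Sanitize() double-quotes each part, so mixed-case names are preserved
	// instead of being folded to lower case by PostgreSQL.
	rel := pgx.Identifier{schema, table}.Sanitize()

	// The quoted name can then be resolved server-side, e.g. as a bind
	// parameter: conn.QueryRow(ctx, "SELECT $1::regclass::oid", rel)
	fmt.Println(rel) // "telegraf"."Rack_Server_All_GC_minor_gc_count"
}
```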
I wonder if downcasing metric names should be an (opt-out-able) default for this PR (better to introduce it now as a default than change it later). So many downstream use cases are made more annoying/difficult/footgunny by letting in case-sensitive table names.
FYI I didn't keep track of the exact error, but I was also running into a situation where very long names exceeded the 63-char Postgres table name max, and it was causing some issues until I introduced a few string processors to shorten known long strings in the metric name (e.g. "Rack_Server_All_GC_minor_gc_count" to "rs_all_gc_minor_gc_count"). That said, this could be something in `pg_partman`, which I'm using for setting up / maintaining time-partitioned tables (evaluating this approach alongside switching to Timescale).
Here's my create_templates:

```toml
create_templates = [
    '''CREATE TABLE {{ .table }} ({{ .allColumns }}) PARTITION BY RANGE (time);''',
    '''CREATE INDEX ON {{ .table }} (time);''',
    '''SELECT create_parent(replace('{{ .table }}', '"', ''), 'time', 'native', '1 week');''',
    '''UPDATE part_config SET infinite_time_partitions = true WHERE parent_table = replace('{{ .table }}', '"', '');''',
]
```
> I wonder if downcasing metric names should be an (opt-out-able) default for this PR (better to introduce it now as a default than change it later). So many downstream use cases are made more annoying/difficult/footgunny by letting in case-sensitive table names.
I'm of the opinion that the plugin shouldn't mess with the casing. It shouldn't be something that surprises people down the road, as it's immediately noticeable. It's easy enough to change in the config like you did.
There are other things which I think are likely things everyone wants, such as always merging fields which have been split over multiple metrics (aggregators/merge). But I don't want to start putting in tons of defaults which then requires more option toggles to disable.
Probably worth mentioning in the README for the plugin though as "things which you probably want to handle in your config".
But I'm open to feedback on the issue.
> FYI I didn't keep track of the exact error, but I was also running into a situation where very long names exceeded the 63-char Postgres table name max, and it was causing some issues until I introduced a few string processors to shorten known long strings in the metric name (e.g. "Rack_Server_All_GC_minor_gc_count" to "rs_all_gc_minor_gc_count"). That said, this could be something in `pg_partman`, which I'm using for setting up / maintaining time-partitioned tables (evaluating this approach alongside switching to Timescale).
Yes, that's another one. Right now the plugin should just spit back the error it gets from postgres. But again, I don't think I want to add some automatic behavior to truncate strings, as truncation might end up merging different measurements into the same table that shouldn't be merged. I think it better to present the error to the user so they can ensure their table names have meaningful values.
> Here's my create_templates:
>
> ```toml
> create_templates = [
>     '''CREATE TABLE {{ .table }} ({{ .allColumns }}) PARTITION BY RANGE (time);''',
>     '''CREATE INDEX ON {{ .table }} (time);''',
>     '''SELECT create_parent(replace('{{ .table }}', '"', ''), 'time', 'native', '1 week');''',
>     '''UPDATE part_config SET infinite_time_partitions = true WHERE parent_table = replace('{{ .table }}', '"', '');''',
> ]
> ```
The latter 2 statements raise an interesting use case. I'm not familiar with pg_partman, but how would you safely pass table names with special characters if you can't use quoting? Looking at pg_partman's changelog, they clearly support it, as it was mentioned in the version 2.1.0 release.
I'm not sure about pg_partman. From my limited experience, it seems to track managed tables in the `part_config` table, in the `parent_table` column. Looking at that table now, there are lots of tables with namespaced names like `telegraf.rs_gc_heap_marked_slots`, but prior to me downcasing everything, I think some quoted names started to show up in that table, which seemed weird and seemed to cause issues for me.
Have taken the PR out of draft status.
At this point I'm just doing cleanup operations. Just now noticed I need to update the readme. There's also that commented line in `go.mod`. Failing tests. Want to simplify some of the code. Few other minor tidbits. But it's late, so I'll tackle them another day. Happy to accept feedback though. All functionality should be in place, no known bugs, all tests written.
@phemmer first of all a big cheers :raised_hands: for your great effort on this plugin. We are using it for about a month or two and it was running perfectly so far. We have now enabled SMART data collection on a couple of machines and encountered an issue with the tag processing. Telegraf panics with the following message:
```
panic: runtime error: index out of range [5] with length 5

goroutine 355 [running]:
github.com/influxdata/telegraf/plugins/outputs/postgresql.(*TagTableSource).values(0xc0000e9770, 0x989d237878e9ef04, 0x0, 0x0)
	/INSTALL/telegraf_custom/telegraf_2021_05_21/plugins/outputs/postgresql/table_source.go:394 +0x4b7
github.com/influxdata/telegraf/plugins/outputs/postgresql.(*TagTableSource).Next(0xc0000e9770, 0xc000158800)
	/INSTALL/telegraf_custom/telegraf_2021_05_21/plugins/outputs/postgresql/table_source.go:372 +0x8d
github.com/jackc/pgx/v4.(*copyFrom).buildCopyBuf(0xc0048a8000, 0xc000158800, 0x13, 0x400, 0xc0048a8050, 0xc001e42f68, 0xc001e42f60, 0x2, 0x1, 0xc00597c201, ...)
	/root/go/pkg/mod/github.com/jackc/pgx/[email protected]/copy_from.go:170 +0x7e
github.com/jackc/pgx/v4.(*copyFrom).run.func1(0xc000116840, 0xc0048a8000, 0xc0048a8050, 0xc00018fe10)
	/root/go/pkg/mod/github.com/jackc/pgx/[email protected]/copy_from.go:123 +0x18d
created by github.com/jackc/pgx/v4.(*copyFrom).run
	/root/go/pkg/mod/github.com/jackc/pgx/[email protected]/copy_from.go:110 +0x4f7
```
I've no experience with go, but tried to narrow it down a bit by printing some variables to stdout:
```go
if !ttsrc.postgresql.TagsAsJsonb {
	values = make([]interface{}, len(tagSet)+1)
	for _, tag := range tagSet {
		fmt.Println(tag.Key)
		fmt.Println(tag.Value)
		fmt.Println(ttsrc.TableSource.tagColumns.indices[tag.Key])
		values[ttsrc.TableSource.tagColumns.indices[tag.Key]+1] = tag.Value // +1 to account for tag_id column
	}
} else {
```
The following output is generated now:
```
0
device
sdd
1
enabled
Enabled
2
host
machine1.example.intern
3
model
ST16000NM001G-2KK103
4
serial_no
WL200QWE
5
wwn
5000c500d56ac404
6
capacity
16000900661248
0
device
sdb
1
enabled
Enabled
2
host
machine1.example.intern
3
model
ST16000NM001G-2KK103
4
serial_no
WL200ASD
5
wwn
5000c500d5687e47
6
device
nvme0
1
host
machine1.example.intern
3
model
Samsung SSD 980 PRO 1TB
4
panic: runtime error: index out of range [5] with length 5

goroutine 355 [running]:
github.com/influxdata/telegraf/plugins/outputs/postgresql.(*TagTableSource).values(0xc0000e9770, 0x989d237878e9ef04, 0x0, 0x0)
	/INSTALL/telegraf_custom/telegraf_2021_05_21/plugins/outputs/postgresql/table_source.go:394 +0x4b7
github.com/influxdata/telegraf/plugins/outputs/postgresql.(*TagTableSource).Next(0xc0000e9770, 0xc000158800)
	/INSTALL/telegraf_custom/telegraf_2021_05_21/plugins/outputs/postgresql/table_source.go:372 +0x8d
github.com/jackc/pgx/v4.(*copyFrom).buildCopyBuf(0xc0048a8000, 0xc000158800, 0x13, 0x400, 0xc0048a8050, 0xc001e42f68, 0xc001e42f60, 0x2, 0x1, 0xc00597c201, ...)
	/root/go/pkg/mod/github.com/jackc/pgx/[email protected]/copy_from.go:170 +0x7e
github.com/jackc/pgx/v4.(*copyFrom).run.func1(0xc000116840, 0xc0048a8000, 0xc0048a8050, 0xc00018fe10)
	/root/go/pkg/mod/github.com/jackc/pgx/[email protected]/copy_from.go:123 +0x18d
created by github.com/jackc/pgx/v4.(*copyFrom).run
	/root/go/pkg/mod/github.com/jackc/pgx/[email protected]/copy_from.go:110 +0x4f7
```
Unfortunately my go experience is way too little to get a better picture on what could be the problem and was hoping that you could help me to narrow it down.
Not sure if this is the right place to report this error, so please let me know if I should put this somewhere else.
> first of all a big cheers :raised_hands: for your great effort on this plugin. We are using it for about a month or two and it was running perfectly so far.
Thanks, that's great to hear!
> We have now enabled SMART data collection on a couple of machines and encountered an issue with the tag processing. Telegraf panics with the following message:
Not good :-(
> I've no experience with go, but tried to narrow it down a bit by printing some variables to stdout:
That information looks useful. Thanks.
From what I can see, it's receiving a metric which is missing some of the tags, and it would appear this is messing up the indexing. This is a little surprising, as the code should be handling that. But obviously there's something different about this case.
> Not sure if this is the right place to report this error, so please let me know if I should put this somewhere else.
This is the correct place. I'll look into the issue, and let you know what I find.
> This is the correct place. I'll look into the issue, and let you know what I find.
:metal:
> From what I can see, it's receiving a metric which is missing some of the tags, and it would appear this is messing up the indexing. This is a little surprising, as the code should be handling that. But obviously there's something different about this case.
I've run `outputs.file` in parallel to gather some sample data. It doesn't look like the data ends at the same position, but the following records look alike:

```
smart_device,capacity=16000900661248,device=sdb,enabled=Enabled,host=machine1.example.intern,model=ST16000NM001G-2KK103,serial_no=WL200ASD,wwn=5000c500d5687e47 udma_crc_errors=0i,exit_status=0i,health_ok=true,read_error_rate=21762728i,seek_error_rate=5503461i,temp_c=31i 1621598100000000000
smart_device,device=nvme0,host=machine1.example.intern,model=Samsung\ SSD\ 980\ PRO\ 1TB,serial_no=S5GXNF0R218401A health_ok=true,temp_c=71i,exit_status=0i 1621598100000000000
```
A convenient reproduction for the above issue seems to be the docker input plugin, if that's helpful
Thanks for the ping. This completely fell off my radar. The tagging panic has been found, fixed, and has test cases. I've pushed it so that anyone interested can do a build.
I should have lots of time this week, so I'll look at addressing the rest of the cleanup comments.
@srebhan I think all comments have been either addressed, or have replies. Some of them I disagree with, but you are the project maintainer, so if you still prefer the changes be made, I can make them (or you can, whichever).
Looks like there's a minor bug around table creation & errors. I've run into it a few times where 2 clients try to create the same table at the same time and get an error (which is expected), but then also get an idle-in-transaction error soon after (not expected). It recovers on the next write, but error handling shouldn't be leaving transactions open. Will need to track down.
I also want to add support for pguint, so we can avoid the nasty "numeric" data type for storing unsigned 64-bit integers (will be optional for people who have the extension).
So the above commits (and builds) contain the changes/fixes I was talking about in the previous message.
The issue with transactions being left open is fixed. Though I was wrong on initial assessment. It had nothing to do with error handling, but was a deadlock between multiple telegraf processes creating the same table. In the process of fixing, I also removed some unnecessary transactioning, which should also give a slight boost to performance.
I also added support for pguint and the `uint8` data type. However I could use some feedback on this as I'm not fond of the configuration.
Currently this functionality is enabled by setting the `unsigned_integers = true` config param. There are a couple of alternatives I'm considering:

- An explicit setting such as `uinteger_type = "uint8"`.
- A type mapping such as:

```toml
[[outputs.postgresql]]
  [outputs.postgresql.typemap]
    uint64 = "uint8"
```

The first would be for that one data type only; the second would allow overriding any data type mapping.
The latter is more flexible, but I'm not sure it warrants the extra complexity.
Could be other ideas too. Open to suggestions.
> @srebhan I think all comments have been either addressed, or have replies. Some of them I disagree with, but you are the project maintainer, so if you still prefer the changes be made, I can make them (or you can, whichever).
Hey @phemmer, I'm by no means the project maintainer, only a poor (community) soul reviewing PRs... :-) As I'm currently very short on time (yes, I do have a day-job :-P), I'll try to point some of the Influx guys to this PR. Hope to get you another review round soon.
If all that fails, please trigger me again and I will see what I can do, but no promises...
I just wanted to comment here that this PR has been very useful to me so far; it's extremely nice to be able to stick with the proven, known, "boring" technology that is `postgres` for more pieces of my tech stack.
Looking forward to seeing this merged, with many thanks to @phemmer.
> I just wanted to comment here that this PR has been very useful to me so far; it's extremely nice to be able to stick with the proven, known, "boring" technology that is `postgres` for more pieces of my tech stack. Looking forward to seeing this merged, with many thanks to @phemmer.
Agree, I really appreciate this work.