CockroachDB Core Changefeed Support
I've been working on my own CRDB Changefeeds as a standalone repo, but I think it might fit really well in here.
I'd want support for many of the options, such as WITH UPDATED, as well as tracking the offset timestamp to disk for restart durability (to prevent re-emitting as much as possible).
One thing to note: for this to be reliable I would want to bring in the pgx package, as it has a very elegant way of handling changefeeds and cancellable queries. It's not something that is currently a dependency.
Hey @danthegoodman1, that'd be awesome. I've been hoping to get some CDC support eventually (https://github.com/Jeffail/benthos/issues/82) but that's been mostly blocked on getting dedicated help from someone with experience so you're very welcome here.
Bringing in new dependencies is fine, the way that components are defined is such that they're included in the main project (at https://github.com/Jeffail/benthos/tree/master/internal/impl) but they're optional imports (https://github.com/Jeffail/benthos/blob/master/public/components/all/package.go), so it'd still be possible to build leaner versions of Benthos without it. I would imagine pgx was likely coming in at some point anyway.
For persisting offsets it'd be good to support our concept of caches (https://www.benthos.dev/docs/components/caches/about), which would allow users to choose remote destinations if needed. That would mean adding a string field cache to the config spec of the input you define, and using the *Resources provided to the constructor you register (https://pkg.go.dev/github.com/Jeffail/benthos/[email protected]/public/service#InputConstructor) to access it with https://pkg.go.dev/github.com/Jeffail/benthos/[email protected]/public/service#Resources.AccessCache, so you don't need to build a cache client yourself.
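To give a rough idea of the shape (a sketch only, the field name, cache key and helper names here are placeholders rather than a prescribed design):

package crdb

import (
    "context"

    "github.com/Jeffail/benthos/v3/public/service"
)

// Sketch: a `cache` string field on the input's config spec.
func changefeedSpec() *service.ConfigSpec {
    return service.NewConfigSpec().
        Summary("Listens to a CockroachDB core changefeed.").
        Field(service.NewStringField("cache").
            Description("A cache resource in which the changefeed cursor is persisted.").
            Default(""))
}

// loadCursor fetches a previously stored cursor, if any, from the configured
// cache resource via the *service.Resources handed to the input constructor.
func loadCursor(ctx context.Context, res *service.Resources, cacheName string) (string, error) {
    var cursor []byte
    var getErr error
    if err := res.AccessCache(ctx, cacheName, func(c service.Cache) {
        cursor, getErr = c.Get(ctx, "cursor")
    }); err != nil {
        return "", err
    }
    if getErr != nil {
        return "", getErr // includes a not-found error on first run
    }
    return string(cursor), nil
}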
Let me know if you need any support.
Great to hear @Jeffail, I'm sure I'll need plenty of help getting going and I'll know more once I get started :)
As for existing examples using the new impl API for writing integrations, which one would you suggest following? It seems maybe pulsar might be a good one to look at? Or is there an interface with specific methods that these integrations need to implement?
@danthegoodman1 the sql_select input might be worth copying from as there's some overlap: https://github.com/Jeffail/benthos/blob/master/internal/impl/sql/input_sql_select.go. The implementation is exactly the same as public plugins, so the documentation is all here: https://pkg.go.dev/github.com/Jeffail/benthos/[email protected]/public/service.
There's also a video walkthrough for processor plugins that covers how the config fields are defined and used: https://youtu.be/uH6mKw-Ly0g
@Jeffail Thanks, I'll take a look and try to get started today
Hey @danthegoodman1 I have been playing with cockroach CDC also.
The open and closed versions have slightly different ways of doing CDC: https://www.cockroachlabs.com/docs/stable/stream-data-out-of-cockroachdb-using-changefeeds.html
Which are you using?
@gedw99 Core changefeeds
@Jeffail So I think I've gotten far enough to where I have some questions, see https://github.com/danthegoodman1/benthos/blob/crdb-changefeed-input/internal/impl/crdb/input_changefeed.go
- For the Read() function, should I expect that it keeps getting called such that I process one row per read? The way CDC works is that there is basically a single query with a never-ending stream of rows (it blocks forever), so I am not sure how to handle returning single messages (rows). I tried to base this off what I saw from the NATS JetStream input.
- For the Close() function, all that is needed for graceful shutdown of the CDC is to cancel the query with the cancel func I have in the struct. I just want to make sure I am handling the order of closing correctly (again based on the JetStream input).
Note: I still need to figure out caching and overriding of the CURSOR option, but I have some docs in place warning about its usage.
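For context, here's a trimmed-down sketch of the shape I'm going for (not the exact code in my branch; the table name is hard-coded and error handling is trimmed purely for illustration):

package crdb

import (
    "context"

    "github.com/Jeffail/benthos/v3/public/service"
    "github.com/jackc/pgx/v4"
    "github.com/jackc/pgx/v4/pgxpool"
)

type crdbChangefeedInput struct {
    dsn      string
    pgPool   *pgxpool.Pool
    rows     pgx.Rows
    cancelFn context.CancelFunc
}

func (c *crdbChangefeedInput) Connect(ctx context.Context) (err error) {
    if c.pgPool, err = pgxpool.Connect(ctx, c.dsn); err != nil {
        return err
    }
    // The changefeed query never returns, so it gets its own cancellable
    // context that Close can use to abort it.
    queryCtx, cancel := context.WithCancel(context.Background())
    c.cancelFn = cancel
    c.rows, err = c.pgPool.Query(queryCtx, "EXPERIMENTAL CHANGEFEED FOR strm_2 WITH UPDATED")
    return err
}

func (c *crdbChangefeedInput) Read(ctx context.Context) (*service.Message, service.AckFunc, error) {
    if c.rows == nil {
        return nil, nil, service.ErrNotConnected
    }
    if !c.rows.Next() { // blocks until the changefeed emits a row
        return nil, nil, service.ErrEndOfInput
    }
    var table string
    var key, value []byte
    if err := c.rows.Scan(&table, &key, &value); err != nil {
        return nil, nil, err
    }
    msg := service.NewMessage(value)
    msg.MetaSet("table", table)
    return msg, func(context.Context, error) error { return nil }, nil
}

func (c *crdbChangefeedInput) Close(ctx context.Context) error {
    c.cancelFn() // aborts the blocking changefeed query
    c.pgPool.Close()
    return nil
}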
sounds awesome @danthegoodman1
Alright so I think the docs in the public input service answer my question for (1) and it seems I am doing it correctly.
As for the caches: because core changefeeds stream a single row at a time, lots of big changing queries can make rows stream in extremely fast. There are two ways we can handle this:
- If we use the caching already in Benthos, then we probably want some ticker that updates the cache on some interval so we don't hammer something that shouldn't be hit on every message. Also, I haven't looked into the file cache, but it would have to support super high velocity writes.
- We could add a KV store like Badger to handle local cursor tracking. This would be able to keep up with millions of rows/sec, but we would lose the built-in caching features.
I'd like to get the changefeeds working without the cache first, but just laying down some thoughts on the matter.
@danthegoodman1 yeah, a ticker-based commit would probably be the best option here, and then a final flush on Close. Benthos is quite strict about graceful termination so you can be pretty confident that Close will be called and waited on before shutting down (within certain configurable timeouts).
I'm not 100% opposed to using a specific cache implementation as it's what we do with the kinesis input and some other unique cases, but ideally I'd want to be able to point to either a local filestore for testing or a remote destination for production.
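Something like this is roughly what I have in mind (a sketch only, all of the names here are placeholders rather than anything from your branch):

package crdb

import (
    "context"
    "sync"
    "time"

    "github.com/Jeffail/benthos/v3/public/service"
)

// cursorCommitter flushes the most recently observed cursor to a cache
// resource on a ticker, plus one final flush on close.
type cursorCommitter struct {
    mut       sync.Mutex
    latest    string
    cacheName string
    res       *service.Resources
    shutdown  chan struct{}
}

func (cc *cursorCommitter) run(interval time.Duration) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    for {
        select {
        case <-ticker.C:
            cc.flush(context.Background())
        case <-cc.shutdown:
            return
        }
    }
}

func (cc *cursorCommitter) flush(ctx context.Context) {
    cc.mut.Lock()
    cursor := cc.latest
    cc.mut.Unlock()
    if cursor == "" {
        return
    }
    _ = cc.res.AccessCache(ctx, cc.cacheName, func(c service.Cache) {
        _ = c.Set(ctx, "cursor", []byte(cursor), nil)
    })
}

// close stops the ticker loop and performs the final flush; the input's Close
// would call this before tearing down the connection.
func (cc *cursorCommitter) close(ctx context.Context) {
    close(cc.shutdown)
    cc.flush(ctx)
}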
@Jeffail Have there been any performance tests regarding the local filestore? I imagine Redis would be quite good as a remote cache for this regardless. Maybe I'm prematurely optimizing.
I think the ticker based commit would be something configurable so it can be tuned based on the cache.
Maybe something like the badger KV store might be a good addition down the road for built-in caching KV style... but that's another story :)
Regarding the data / file store:
NATS now has a KV store that is clustered and synchronised. It was introduced a month ago and really changes the game with NATS.
https://nats.io/blog/kv-cli/
---
Local caches using Genji with NATS are also a proven approach.
https://github.com/simpleiot/simpleiot has NATS and Genji synchronised, explained here: https://github.com/simpleiot/simpleiot/blob/master/docs/architecture.md
With Genji you get a MongoDB-like Golang store with no cgo and a SQL API on top.
@Jeffail A bit stuck and not sure where to go about debugging. I added lots of debug fmt.Println() statements and observe the following output:
dangoodman: ~/clusterSpaceCode/benthos git:(crdb-changefeed-input) ✗ docker run --rm -v /Users/dangoodman/clusterSpaceCode/benthos/config.yml:/config.yml -v /tmp/data:/data -p 4195:4195 jeffail/benthos -c /config.yml
## REGISTERED INPUT
## MAKING CRDB
## FINISHING INIT
{"@timestamp":"2021-11-25T13:29:34Z","@service":"benthos","component":"benthos","level":"INFO","message":"Launching a benthos instance, use CTRL+C to close."}
{"@timestamp":"2021-11-25T13:29:34Z","@service":"benthos","component":"benthos","level":"INFO","message":"Listening for HTTP requests at: http://0.0.0.0:4195"}
## CONNECTING
{"@timestamp":"2021-11-25T13:29:34Z","@service":"benthos","component":"benthos.input","level":"DEBUG","message":"Waiting for pending acks to resolve before shutting down."}
{"@timestamp":"2021-11-25T13:29:34Z","@service":"benthos","component":"benthos.input","level":"DEBUG","message":"Pending acks resolved."}
## CLOSING
## CANCELLED CONTEXT
## SHUTDOWN
(see the code for where these debug statements lie)
3 interesting things:
- Connect never looks like it finishes, but it does not throw any errors
- The shutdown happens immediately and by itself
- The shutdown hangs for some reason (haven't poked at this one yet)
Not sure if you know why this might be the case; I would expect everything to work. I can see the crdb_changefeed input in the docs, and it is imported and created properly from my config:
input:
  crdb_changefeed:
    dsn: postgresql://dan:[email protected]:26257/defaultdb?sslmode=require&options=--cluster%3Dportly-impala-2852
    tables:
      - strm_2
    options:
      - UPDATED
logger:
  prefix: benthos
  level: DEBUG
  format: json
  add_timestamp: true
  static_fields:
    '@service': benthos
output:
  label: ""
  stdout:
    codec: lines
(That DSN is something I use successfully in my local testing of core changefeeds with pgx)
The full output of the logs:
dangoodman: ~/clusterSpaceCode/benthos git:(crdb-changefeed-input) ✗ docker run --rm -v /Users/dangoodman/clusterSpaceCode/benthos/config.yml:/config.yml -v /tmp/data:/data -p 4195:4195 jeffail/benthos -c /config.yml
## REGISTERED INPUT
## MAKING CRDB
## FINISHING INIT
{"@timestamp":"2021-11-25T13:29:34Z","@service":"benthos","component":"benthos","level":"INFO","message":"Launching a benthos instance, use CTRL+C to close."}
{"@timestamp":"2021-11-25T13:29:34Z","@service":"benthos","component":"benthos","level":"INFO","message":"Listening for HTTP requests at: http://0.0.0.0:4195"}
## CONNECTING
{"@timestamp":"2021-11-25T13:29:34Z","@service":"benthos","component":"benthos.input","level":"DEBUG","message":"Waiting for pending acks to resolve before shutting down."}
{"@timestamp":"2021-11-25T13:29:34Z","@service":"benthos","component":"benthos.input","level":"DEBUG","message":"Pending acks resolved."}
## CLOSING
## CANCELLED CONTEXT
## SHUTDOWN
^C{"@timestamp":"2021-11-25T13:29:39Z","@service":"benthos","component":"benthos","level":"INFO","message":"Received SIGTERM, the service is closing."}
{"@timestamp":"2021-11-25T13:29:54Z","@service":"benthos","component":"benthos","level":"INFO","message":"Unable to fully drain buffered messages within target time."}
{"@timestamp":"2021-11-25T13:29:59Z","@service":"benthos","component":"benthos","level":"ERROR","message":"Failed to stop stream gracefully within target time."}
{"@timestamp":"2021-11-25T13:29:59Z","@service":"benthos","component":"benthos","level":"DEBUG","message":"goroutine profile: total 14\n6 @ 0x4380d6 0x40640c 0x405e78 0xe3a774 0x468c21\n#\t0xe3a773\tgithub.com/klauspost/compress/zstd.(*blockDec).startDecoder+0x93\t/go/src/github.com/Jeffail/benthos/vendor/github.com/klauspost/compress/zstd/blockdec.go:212\n\n1 @ 0x40b8f4 0x4651b8 0x1d9bd39 0x468c21\n#\t0x4651b7\tos/signal.signal_recv+0x97\t/usr/local/go/src/runtime/sigqueue.go:169\n#\t0x1d9bd38\tos/signal.loop+0x18\t\t/usr/local/go/src/os/signal/signal_unix.go:24\n\n1 @ 0x4380d6 0x40640c 0x405e38 0x1170c2d 0x468c21\n#\t0x1170c2c\tgithub.com/ClickHouse/clickhouse-go.init.0.func1+0x2c\t/go/src/github.com/Jeffail/benthos/vendor/github.com/ClickHouse/clickhouse-go/bootstrap.go:48\n\n1 @ 0x4380d6 0x40640c 0x405e38 0x1dbc465 0x468c21\n#\t0x1dbc464\tgithub.com/Jeffail/benthos/v3/lib/service.cmdService.func3.2+0x44\t/go/src/github.com/Jeffail/benthos/lib/service/service.go:437\n\n1 @ 0x4380d6 0x40640c 0x405e78 0x13d74ea 0x468c21\n#\t0x13d74e9\tgithub.com/golang/glog.(*loggingT).flushDaemon+0x69\t/go/src/github.com/Jeffail/benthos/vendor/github.com/golang/glog/glog.go:882\n\n1 @ 0x4380d6 0x447e72 0x154a359 0x468c21\n#\t0x154a358\tgo.opencensus.io/stats/view.(*worker).start+0xb8\t/go/src/github.com/Jeffail/benthos/vendor/go.opencensus.io/stats/view/worker.go:276\n\n1 @ 0x4380d6 0x447e72 0x1dcbcef 0x199a11f 0x435035 0x44ba47 0x44ba17 0x1ec2be9 0x1ec7f51 0x1dcbe6c 0x1dcb82b 0x1999d96 0x1998fe2 0x468c21\n#\t0x1dcbcee\tgithub.com/Jeffail/benthos/v3/public/service.(*airGapReader).WaitForClose+0x6e\t\t/go/src/github.com/Jeffail/benthos/public/service/input.go:147\n#\t0x199a11e\tgithub.com/Jeffail/benthos/v3/lib/input.(*AsyncReader).loop.func1+0x5e\t\t\t/go/src/github.com/Jeffail/benthos/lib/input/async_reader.go:84\n#\t0x435034\truntime.gopanic+0x214\t\t\t\t\t\t\t\t\t/usr/local/go/src/runtime/panic.go:1038\n#\t0x44ba46\truntime.panicmem+0x326\t\t\t\t\t\t\t\t\t/usr/local/go/src/runtime/panic.go:221\n#\t0x44ba16\truntime.sigpanic+0x2f6\t\t\t\t\t\t\t\t\t/usr/local/go/src/runtime/signal_unix.go:735\n#\t0x1ec2be8\tgithub.com/jackc/pgx/v4/pgxpool.ConnectConfig+0x28\t\t\t\t\t/go/src/github.com/Jeffail/benthos/vendor/github.com/jackc/pgx/v4/pgxpool/pool.go:162\n#\t0x1ec7f50\tgithub.com/Jeffail/benthos/v3/internal/impl/crdb.(*crdbChangefeedInput).Connect+0x90\t/go/src/github.com/Jeffail/benthos/internal/impl/crdb/input_changefeed.go:138\n#\t0x1dcbe6b\tgithub.com/Jeffail/benthos/v3/public/service.(*autoRetryInput).Connect+0x2b\t\t/go/src/github.com/Jeffail/benthos/public/service/input_auto_retry.go:52\n#\t0x1dcb82a\tgithub.com/Jeffail/benthos/v3/public/service.(*airGapReader).ConnectWithContext+0x2a\t/go/src/github.com/Jeffail/benthos/public/service/input.go:114\n#\t0x1999d95\tgithub.com/Jeffail/benthos/v3/lib/input.(*AsyncReader).loop.func3+0xb5\t\t\t/go/src/github.com/Jeffail/benthos/lib/input/async_reader.go:105\n#\t0x1998fe1\tgithub.com/Jeffail/benthos/v3/lib/input.(*AsyncReader).loop+0x301\t\t\t/go/src/github.com/Jeffail/benthos/lib/input/async_reader.go:122\n\n1 @ 0x4380d6 0x447e72 0x1ec8925 0x1dcc6ec 0x1dcbc58 
0x468c21\n#\t0x1ec8924\tgithub.com/Jeffail/benthos/v3/internal/impl/crdb.(*crdbChangefeedInput).Close+0x1a4\t/go/src/github.com/Jeffail/benthos/internal/impl/crdb/input_changefeed.go:201\n#\t0x1dcc6eb\tgithub.com/Jeffail/benthos/v3/public/service.(*autoRetryInput).Close+0x2b\t\t/go/src/github.com/Jeffail/benthos/public/service/input_auto_retry.go:111\n#\t0x1dcbc57\tgithub.com/Jeffail/benthos/v3/public/service.(*airGapReader).CloseAsync.func1+0x37\t/go/src/github.com/Jeffail/benthos/public/service/input.go:140\n\n1 @ 0x462e25 0xad1d75 0xad1b8d 0xaced0b 0x1d4f25b 0x1db9df3 0x1dbc2e3 0x1dbbcb3 0x1dbe93c 0x1d6cc68 0x1db884e 0x1db8827 0x2146697 0x437d07 0x468c21\n#\t0x462e24\truntime/pprof.runtime_goroutineProfileWithLabels+0x24\t\t\t/usr/local/go/src/runtime/mprof.go:746\n#\t0xad1d74\truntime/pprof.writeRuntimeProfile+0xb4\t\t\t\t\t/usr/local/go/src/runtime/pprof/pprof.go:724\n#\t0xad1b8c\truntime/pprof.writeGoroutine+0x4c\t\t\t\t\t/usr/local/go/src/runtime/pprof/pprof.go:684\n#\t0xaced0a\truntime/pprof.(*Profile).WriteTo+0x14a\t\t\t\t\t/usr/local/go/src/runtime/pprof/pprof.go:331\n#\t0x1d4f25a\tgithub.com/Jeffail/benthos/v3/lib/stream.(*Type).Stop+0x23a\t\t/go/src/github.com/Jeffail/benthos/lib/stream/type.go:369\n#\t0x1db9df2\tgithub.com/Jeffail/benthos/v3/lib/service.(*swappableStopper).Stop+0xd2\t/go/src/github.com/Jeffail/benthos/lib/service/service.go:207\n#\t0x1dbc2e2\tgithub.com/Jeffail/benthos/v3/lib/service.cmdService.func3+0x262\t/go/src/github.com/Jeffail/benthos/lib/service/service.go:452\n#\t0x1dbbcb2\tgithub.com/Jeffail/benthos/v3/lib/service.cmdService+0x1012\t\t/go/src/github.com/Jeffail/benthos/lib/service/service.go:480\n#\t0x1dbe93b\tgithub.com/Jeffail/benthos/v3/lib/service.Run.func2+0x2db\t\t/go/src/github.com/Jeffail/benthos/lib/service/run.go:218\n#\t0x1d6cc67\tgithub.com/urfave/cli/v2.(*App).RunContext+0x7a7\t\t\t/go/src/github.com/Jeffail/benthos/vendor/github.com/urfave/cli/v2/app.go:322\n#\t0x1db884d\tgithub.com/urfave/cli/v2.(*App).Run+0xd6d\t\t\t\t/go/src/github.com/Jeffail/benthos/vendor/github.com/urfave/cli/v2/app.go:224\n#\t0x1db8826\tgithub.com/Jeffail/benthos/v3/lib/service.Run+0xd46\t\t\t/go/src/github.com/Jeffail/benthos/lib/service/run.go:359\n#\t0x2146696\tmain.main+0x16\t\t\t\t\t\t\t\t/go/src/github.com/Jeffail/benthos/cmd/benthos/main.go:12\n#\t0x437d06\truntime.main+0x226\t\t\t\t\t\t\t/usr/local/go/src/runtime/proc.go:255\n"}
Sneaking suspicion it has something to do with those ACKs, but I have no idea why because there are messages in the changefeed it should read immediately.
The stack trace looks as though Connect is being blocked on pgxpool.ConnectConfig. Not sure why the service would shut down immediately; maybe try without your plugin, using something like http_server, to make sure the problem isn't unrelated, and maybe also try removing service.AutoRetryNacks to see if there's some odd interaction going on.
Didn't think about the nacks, will try that.
@Jeffail http seems to be fine as an input, and removing the nack did not seem to help
My guess for the immediate shutdown would be this; however, I never get that READING log:
fmt.Println("## READING")
if c.pgPool == nil && c.rows == nil {
return nil, nil, service.ErrNotConnected
}
if c.rows == nil {
return nil, nil, service.ErrEndOfInput
}
Hmm, when I change the line to fmt.Println("## CONNECTING to", c.pgConfig.ConnString()) I don't get the log... that tells me a lot...
Lmao, I figured it out: I had everything under an if err != nil {} block :)
There it is:
## READING
{"primary_key":"[\"414b8f21-9b1c-4a97-b2c8-eb27295431d9\", 1]","row":"{\"after\": {\"k\": \"414b8f21-9b1c-4a97-b2c8-eb27295431d9\", \"v\": 1}, \"updated\": \"1637874650209908152.0000000000\"}","table":"strm_2"}
I'll get this cleaned up and try to write some tests either today or tomorrow :)
Still having issues with closing. I'm looking for similar bugs, but it does not seem like the Close() function is being called on ^C:
## SENDING NEW MESSAGE
## READING
{"primary_key":"[\"0ab7abb5-94ca-4cbe-9a59-0065f39ecc23\", 2]","row":"{\"after\": {\"k\": \"0ab7abb5-94ca-4cbe-9a59-0065f39ecc23\", \"v\": 2}, \"updated\": \"1637874693016437872.0000000000\"}","table":"strm_2"}
^C{"@timestamp":"2021-11-25T21:12:02Z","@service":"benthos","component":"benthos","level":"INFO","message":"Received SIGTERM, the service is closing."}
{"@timestamp":"2021-11-25T21:12:17Z","@service":"benthos","component":"benthos","level":"INFO","message":"Unable to fully drain buffered messages within target time."}
{"@timestamp":"2021-11-25T21:12:22Z","@service":"benthos","component":"benthos","level":"ERROR","message":"Failed to stop stream gracefully within target time."}
I think the issue here could be that the if c.rows.Next() { line will block, since it will keep waiting for a row. My guess is that this prevents closure, since Benthos is still waiting for Read() to return.
@Jeffail Is there any way that I can tell benthos it is allowed to abort an in-progress read without waiting for the timeout?
I wonder if what you're looking for is shutdown_timeout. See the "Full" section here. The default value is 20s.
@mihaitodor Sorry if I have not been clear enough. The issue is that the Close() function never gets called. Shutdown happens fine after the timeout, but I'd rather get closure working properly instead of reducing the timeout.
My guess as to why it is never called is that the Read() function (in particular the c.rows.Next() call inside it) blocks forever until there is a message. I haven't found it in the code, but my guess is that this blocks the Close() function from being called since there is still a read in progress: Benthos will let the read gracefully finish.
I looked at some of the Pulsar code and saw some ReadWithContext() and CloseAsync() functions, and wonder if I might need to implement it that way. Immediately changing Close() to CloseAsync() did not seem to invoke it either.
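For reference, here's a rough sketch of the direction I'm considering for getting around the blocking rows.Next() call (placeholder names, not the code in my branch): consume the changefeed in a background goroutine and hand rows over on a channel, so Read can select on its context and shutdown isn't held up waiting on a quiet feed.

package crdb

import (
    "context"

    "github.com/Jeffail/benthos/v3/public/service"
    "github.com/jackc/pgx/v4"
)

type rowMsg struct {
    table string
    value []byte
}

type changefeedPump struct {
    rows     pgx.Rows
    queryCtx context.Context // cancelled on close to abort the changefeed query
    rowChan  chan rowMsg
}

func (p *changefeedPump) start() {
    p.rowChan = make(chan rowMsg)
    go func() {
        defer close(p.rowChan)
        for p.rows.Next() { // unblocks once the query context is cancelled
            var table string
            var key, value []byte
            if err := p.rows.Scan(&table, &key, &value); err != nil {
                return
            }
            select {
            case p.rowChan <- rowMsg{table: table, value: value}:
            case <-p.queryCtx.Done():
                return
            }
        }
    }()
}

// read is what the input's Read method would delegate to.
func (p *changefeedPump) read(ctx context.Context) (*service.Message, service.AckFunc, error) {
    select {
    case r, open := <-p.rowChan:
        if !open {
            return nil, nil, service.ErrEndOfInput
        }
        msg := service.NewMessage(r.value)
        msg.MetaSet("table", r.table)
        return msg, func(context.Context, error) error { return nil }, nil
    case <-ctx.Done():
        // Return promptly so Benthos isn't stuck waiting on Read during shutdown.
        return nil, nil, ctx.Err()
    }
}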