Bug: unknown method LifecycleOnCreated
Bug description
I'm trialling Conduit for our use case of CDCing data from postgres -> elasticsearch.
Using:
I'm noticing this error on start:
ERR node stopped error="node postgres-to-elastic:elastic stopped with error: could not open destination connector: error while triggering lifecycle event \"created\": unknown method LifecycleOnCreated for service connector.v1.DestinationPlugin: method not implemented" component=lifecycle.Service node_id=postgres-to-elastic:elastic stack=[{"file":"/app/pkg/lifecycle/service.go","func":"github.com/conduitio/conduit/pkg/lifecycle.(*Service).runPipeline.func2","line":743},{"file":"/app/pkg/lifecycle/stream/destination.go","func":"github.com/conduitio/conduit/pkg/lifecycle/stream.(*DestinationNode).Run","line":81},{"file":"/app/pkg/connector/destination.go","func":"github.com/conduitio/conduit/pkg/connector.(*Destination).triggerLifecycleEvent","line":330}]
Looking at the code, it does look like the elasticsearch connector is old, and doesn't support the lifecycle methods, but I thought that Conduit was meant to allow backwards compatibility in those cases as per https://github.com/ConduitIO/conduit/blob/0345e4ba7a191f4e2be66408b93fc740cb535fa9/pkg/connector/destination.go#L317-L318
Steps to reproduce
Environment
Configuration
docker-compose.yaml

```yaml
version: '3.8'

services:
  conduit:
    image: conduit.docker.scarf.sh/conduitio/conduit:latest
    volumes:
      - ./connectors:/app/connectors:rw
      - ./pipelines:/app/pipelines:rw
    network_mode: host

  postgres:
    image: postgres:16
    environment:
      - POSTGRES_DB=mydata
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=password
    volumes:
      - postgres_data:/var/lib/postgresql/data
      - ./postgresql.conf:/etc/postgresql/postgresql.conf
    command: ["postgres", "-c", "config_file=/etc/postgresql/postgresql.conf"]
    network_mode: host

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.16.1
    environment:
      - discovery.type=single-node
    volumes:
      - elasticsearch_data:/usr/share/elasticsearch/data
    network_mode: host

volumes:
  postgres_data:
  elasticsearch_data:
```
./pipelines/my-pipeline.yaml

```yaml
version: "2.2"
pipelines:
  - id: postgres-to-elastic
    status: running
    name: "postgres-to-elastic"
    connectors:
      - id: postgres
        type: source
        plugin: "postgres"
        settings:
          url: 'postgresql://postgres:password@localhost:5432/mydata'
          tables: 'users'
          cdcMode: 'logrepl'
          logrepl.publicationName: 'demo'
          logrepl.slotName: 'demo'
      - id: elastic
        type: destination
        plugin: "standalone:elasticsearch"
        settings:
          version: 8
          host: 'http://localhost:9200'
          index: 'mydata'
          bulkSize: 1000
          retries: 100
```
Version
v0.12.3 x86_64
@mm-zacharydavison Thanks for reporting the issue! You're right in that the lifecycle methods shouldn't be an issue. I'll look into this. Btw, we've updated the SDK in the Elasticsearch connector. It's currently in the main branch. I'll check with the team if we're good to publish a release.
I've managed to get this working by building the latest connector from master.
However, now I have another issue:
2024-12-05T15:47:44+00:00 ERR error writing record error="failed to prepare data with key=\x02: json: error calling MarshalJSON for type json.RawMessage: invalid character 'j' looking for beginning of value" component=plugin.standalone connector_id=postgres-to-elastic:elastic plugin_name=conduit-connector-elasticsearch record_position="{\"type\":1,\"snapshots\":{\"users\":{\"last_read\":1,\"snapshot_end\":3}}}"
2024-12-05T15:47:44+00:00 ERR error writing record error="failed to prepare data with key=\x04: json: error calling MarshalJSON for type json.RawMessage: invalid character 'j' looking for beginning of value" component=plugin.standalone connector_id=postgres-to-elastic:elastic plugin_name=conduit-connector-elasticsearch record_position="{\"type\":1,\"snapshots\":{\"users\":{\"last_read\":2,\"snapshot_end\":3}}}"
2024-12-05T15:47:44+00:00 ERR error writing record error="failed to prepare data with key=\x06: json: error calling MarshalJSON for type json.RawMessage: invalid character '\\x1e' looking for beginning of value" component=plugin.standalone connector_id=postgres-to-elastic:elastic plugin_name=conduit-connector-elasticsearch record_position="{\"type\":1,\"snapshots\":{\"users\":{\"last_read\":3,\"snapshot_end\":3}}}"
2024-12-05T15:47:44+00:00 ERR destination acker node stopped before processing all messages error="nacked 2 messages when stopping destination acker node, 2 nacks failed: another message failed to be acked/nacked\nanother message failed to be acked/nacked" component=DestinationAckerNode node_id=postgres-to-elastic:elastic-acker pipeline_id=postgres-to-elastic stack=[{"file":"/app/pkg/lifecycle/stream/destination_acker.go","func":"github.com/conduitio/conduit/pkg/lifecycle/stream.(*DestinationAckerNode).teardown","line":216}]
It looks like there's an issue with the connector in latest master still.
For debugging purposes, here's the output from a log connector, from postgres, which looks fine to me.
2024-12-05T15:47:44+00:00 INF [DEBUG] component=plugin connector_id=postgres-to-elastic:debug-log plugin_name=log plugin_type=destination record={"key":{"id":1},"metadata":{"conduit.source.connector.id":"postgres-to-elastic:postgres","opencdc.key.schema.subject":"postgres-to-elastic:postgres:users_key","opencdc.key.schema.version":"1","opencdc.payload.schema.subject":"postgres-to-elastic:postgres:users_payload","opencdc.payload.schema.version":"1","opencdc.readAt":"1733413664292754682","postgres.table":"users"},"operation":"snapshot","payload":{"after":{"email":"[email protected]","id":1,"name":"John Doe"},"before":null},"position":"eyJ0eXBlIjoxLCJzbmFwc2hvdHMiOnsidXNlcnMiOnsibGFzdF9yZWFkIjoxLCJzbmFwc2hvdF9lbmQiOjN9fX0="}
2024-12-05T15:47:44+00:00 INF [DEBUG] component=plugin connector_id=postgres-to-elastic:debug-log plugin_name=log plugin_type=destination record={"key":{"id":2},"metadata":{"conduit.source.connector.id":"postgres-to-elastic:postgres","opencdc.key.schema.subject":"postgres-to-elastic:postgres:users_key","opencdc.key.schema.version":"1","opencdc.payload.schema.subject":"postgres-to-elastic:postgres:users_payload","opencdc.payload.schema.version":"1","opencdc.readAt":"1733413664293619964","postgres.table":"users"},"operation":"snapshot","payload":{"after":{"email":"[email protected]","id":2,"name":"Jane Smith"},"before":null},"position":"eyJ0eXBlIjoxLCJzbmFwc2hvdHMiOnsidXNlcnMiOnsibGFzdF9yZWFkIjoyLCJzbmFwc2hvdF9lbmQiOjN9fX0="}
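As a side note, the `position` field in those debug lines is just the base64-encoded record position; decoding the first one shows it matches the `record_position` JSON in the error logs above, so the positions themselves line up fine:

```go
package main

import (
	"encoding/base64"
	"fmt"
)

// decodePosition turns Conduit's base64-encoded record position back into
// the underlying JSON string.
func decodePosition(pos string) string {
	b, err := base64.StdEncoding.DecodeString(pos)
	if err != nil {
		panic(err)
	}
	return string(b)
}

func main() {
	// Position copied from the first debug log line above.
	fmt.Println(decodePosition("eyJ0eXBlIjoxLCJzbmFwc2hvdHMiOnsidXNlcnMiOnsibGFzdF9yZWFkIjoxLCJzbmFwc2hvdF9lbmQiOjN9fX0="))
	// → {"type":1,"snapshots":{"users":{"last_read":1,"snapshot_end":3}}}
}
```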
Is there potentially a compatibility issue with the protobufs used? Do I need to also build latest conduit from master?
I don't think that should be needed, but I'll check this out, it's an interesting issue. I see a postgresql.conf file being used, is there anything interesting in it?
Just the wal_level = logical needed for replication. No other configuration.
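In other words, the only non-default line in the mounted config file would be something like:

```ini
# postgresql.conf — only non-default setting, per the comment above
wal_level = logical
```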
Gotcha... So with that, and also with the fact the log destination is working, we can be 99% sure it's the ES connector, and I don't think using latest Conduit will help. My guess is that the automatic schema decoding in the ES connector is somehow wrong, I'll confirm very soon.
@mm-zacharydavison Please try the latest main branch from the ES connector now, the issue should be fixed. If it works for your case too, I'll cut a release.
Great! I'll try this tomorrow. Thanks for the super-fast response, saved me a lot of headache figuring it out myself.
No problem at all, you helped us more by filing the issue with very detailed instructions on how to reproduce it. :muscle:
Btw, the issue was the following:
The Postgres connector is encoding all records using Avro. The ES connector got those records in their encoded form (hence that error message about bytes). That was because the ES connector wasn't using the SDK middleware for automatically decoding the records.
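That failure mode is easy to reproduce outside the connector: if non-JSON bytes (such as an Avro-encoded record body) end up in a `json.RawMessage`, marshaling it produces exactly the error from the logs above. A minimal sketch (the payload bytes here are made up for illustration):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// marshalRaw wraps arbitrary bytes in a json.RawMessage and tries to marshal
// them, roughly the way a destination would when building a JSON document.
func marshalRaw(b []byte) error {
	_, err := json.Marshal(json.RawMessage(b))
	return err
}

func main() {
	// Still-Avro-encoded field values are not JSON, so the encoder
	// rejects the first character it sees — here a literal 'j'.
	err := marshalRaw([]byte("jane smith"))
	fmt.Println(err)
	// → json: error calling MarshalJSON for type json.RawMessage: invalid character 'j' looking for beginning of value
}
```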
Actually, I'll reopen this issue, so we can look into why backwards compatibility doesn't work.
Works perfectly! Thanks.
You're welcome!