[Bug] Can not find catalog table with factoryId [Postgres]

Open Asura7969 opened this issue 1 year ago • 11 comments

Search before asking

  • [X] I had searched in the issues and found no similar issues.

What happened

Can not find catalog table with factoryId [Postgres]

SeaTunnel Version

2.3.4

SeaTunnel Config

nothing

Running Command

{
  "env": {
    "job.mode": "STREAMING",
    "parallelism": 1,
    "checkpoint.interval": 5000
  },
  "source": [
    {
      "base-url": "jdbc:postgresql://xxxx:9999/n2db?loggerLevel=OFF",
      "password": "xxxx",
      "hostname": "xxxx",
      "exactly_once": true,
      "startup.mode": "INITIAL",
      "port": 9999,
      "debezium": {
        "publication.name": "dbz_publication"
      },
      "slot.name": "hipot_results_slot",
      "database-name": [
        "n2db"
      ],
      "table-names": [
        "n2db.n2admin.hipot_results"
      ],
      "plugin_name": "Postgres-CDC",
      "username": "replica"
    }
  ],
  "sink": [
    {
      "base-url": "jdbc:mysql://xxxx:9030/test?useSSL=true",
      "enable_upsert_delete": "true",
      "password": "123456",
      "database": "test",
      "save_mode_create_template": "CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` (\n${rowtype_primary_key},\n${rowtype_fields}\n) ENGINE=OLAP\n PRIMARY KEY (${rowtype_primary_key})\nDISTRIBUTED BY HASH (${rowtype_primary_key})PROPERTIES (\n    \"replication_num\" = \"2\" \n)",
      "max_retries": 3,
      "starrocks.config": {
        "format": "json"
      },
      "labelPrefix": "test_pg_cdc",
      "nodeUrls": [
        "xxxx:8030"
      ],
      "plugin_name": "StarRocks",
      "table": "test_pg_cdc",
      "username": "seatunnel"
    }
  ]
}

Error Exception

2024-03-20 17:00:01,698 INFO  [o.a.s.a.t.c.CatalogTableUtil  ] [hz.main.cached.thread-13] - Get catalog tables, cost time: 89
2024-03-20 17:00:01,698 INFO  [.s.c.s.j.c.AbstractJdbcCatalog] [hz.main.cached.thread-13] - Catalog Postgres closing
2024-03-20 17:00:01,698 WARN  [Log4j2HttpPostCommandProcessor] [hz.main.cached.thread-13] - [10.9.30.107]:6801 [seatunnel] [5.1] An error occurred while handling request HttpCommand [HTTP_POST]{uri='/hazelcast/rest/maps/submit-job'}AbstractTextCommand[HTTP_POST]{requestId=2}
org.apache.seatunnel.api.table.factory.FactoryException: ErrorCode:[API-06], ErrorDescription:[Factory initialize failed] - Unable to create a source for identifier 'Postgres-CDC'.
        at org.apache.seatunnel.api.table.factory.FactoryUtil.createAndPrepareSource(FactoryUtil.java:100) ~[seatunnel-starter.jar:2.3.4]
        at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parseSource(MultipleTableJobConfigParser.java:320) ~[seatunnel-starter.jar:2.3.4]
        at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parse(MultipleTableJobConfigParser.java:181) ~[seatunnel-starter.jar:2.3.4]
        at org.apache.seatunnel.engine.server.rest.RestJobExecutionEnvironment.getLogicalDag(RestJobExecutionEnvironment.java:76) ~[seatunnel-starter.jar:2.3.4]
        at org.apache.seatunnel.engine.server.rest.RestJobExecutionEnvironment.build(RestJobExecutionEnvironment.java:99) ~[seatunnel-starter.jar:2.3.4]
        at org.apache.seatunnel.engine.server.rest.RestHttpPostCommandProcessor.handleSubmitJob(RestHttpPostCommandProcessor.java:136) ~[seatunnel-starter.jar:2.3.4]
        at org.apache.seatunnel.engine.server.rest.RestHttpPostCommandProcessor.handle(RestHttpPostCommandProcessor.java:82) ~[seatunnel-starter.jar:2.3.4]
        at org.apache.seatunnel.engine.server.rest.RestHttpPostCommandProcessor.handle(RestHttpPostCommandProcessor.java:60) ~[seatunnel-starter.jar:2.3.4]
        at com.hazelcast.internal.ascii.TextCommandServiceImpl$CommandExecutor.run(TextCommandServiceImpl.java:402) ~[seatunnel-starter.jar:2.3.4]
        at com.hazelcast.internal.util.executor.CachedExecutorServiceDelegate$Worker.run(CachedExecutorServiceDelegate.java:217) ~[seatunnel-starter.jar:2.3.4]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_381]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_381]
        at java.lang.Thread.run(Thread.java:750) [?:1.8.0_381]
        at com.hazelcast.internal.util.executor.HazelcastManagedThread.executeRun(HazelcastManagedThread.java:76) ~[seatunnel-starter.jar:2.3.4]
        at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102) ~[seatunnel-starter.jar:2.3.4]
Caused by: org.apache.seatunnel.common.utils.SeaTunnelException: Can not find catalog table with factoryId [Postgres]
        at org.apache.seatunnel.api.table.catalog.CatalogTableUtil.lambda$getCatalogTables$0(CatalogTableUtil.java:129) ~[seatunnel-starter.jar:2.3.4]
        at java.util.Optional.map(Optional.java:215) ~[?:1.8.0_381]
        at org.apache.seatunnel.api.table.catalog.CatalogTableUtil.getCatalogTables(CatalogTableUtil.java:116) ~[seatunnel-starter.jar:2.3.4]
        at org.apache.seatunnel.api.table.catalog.CatalogTableUtil.getCatalogTables(CatalogTableUtil.java:96) ~[seatunnel-starter.jar:2.3.4]
        at org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.PostgresIncrementalSourceFactory.lambda$createSource$1(PostgresIncrementalSourceFactory.java:91) ~[?:?]
        at org.apache.seatunnel.api.table.factory.FactoryUtil.createAndPrepareSource(FactoryUtil.java:112) ~[seatunnel-starter.jar:2.3.4]
        at org.apache.seatunnel.api.table.factory.FactoryUtil.createAndPrepareSource(FactoryUtil.java:73) ~[seatunnel-starter.jar:2.3.4]
        ... 14 more

Zeta or Flink or Spark Version

Zeta

Java or Scala Version

No response

Screenshots

No response

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

Asura7969 avatar Mar 20 '24 09:03 Asura7969

@Carl-Zhou-CN cc

Asura7969 avatar Mar 20 '24 09:03 Asura7969

Hi @Asura7969, most likely the catalog failed to find the table you specified.

Carl-Zhou-CN avatar Mar 20 '24 11:03 Carl-Zhou-CN

> Hi @Asura7969, most likely the catalog failed to find the table you specified.

Are you referring to the issue of account permissions for connecting to pg?

Asura7969 avatar Mar 20 '24 12:03 Asura7969

> Are you referring to the issue of account permissions for connecting to pg?

It could be. It could also be caused by a connection timeout or by an error in the table name definition.
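Of the three causes listed, the table name definition is the one that can be checked without touching the database. The following is a minimal Python sketch of such a check, not part of SeaTunnel. It assumes, based only on the configs posted in this thread, that each `table-names` entry has the form `database.schema.table` and that the database part should match the database in `base-url`. The function names `database_from_jdbc_url` and `check_table_names` are hypothetical.

```python
from urllib.parse import urlparse


def database_from_jdbc_url(base_url: str) -> str:
    """Extract the database name from a jdbc:postgresql://host:port/db?params URL."""
    prefix = "jdbc:"
    if not base_url.startswith(prefix):
        raise ValueError(f"not a JDBC URL: {base_url!r}")
    # Strip the "jdbc:" prefix so the rest parses as an ordinary URL.
    parsed = urlparse(base_url[len(prefix):])
    return parsed.path.lstrip("/")


def check_table_names(base_url: str, table_names: list) -> list:
    """Return a list of problems found; an empty list means nothing suspicious.

    Assumption: entries are plain database.schema.table paths.
    Quoted identifiers that contain dots are not handled.
    """
    problems = []
    database = database_from_jdbc_url(base_url)
    for name in table_names:
        parts = name.split(".")
        if len(parts) != 3 or not all(parts):
            problems.append(f"{name!r}: expected database.schema.table")
            continue
        if parts[0] != database:
            problems.append(
                f"{name!r}: database {parts[0]!r} differs from {database!r} in base-url"
            )
    return problems


if __name__ == "__main__":
    # Values taken from the source config at the top of this issue.
    url = "jdbc:postgresql://xxxx:9999/n2db?loggerLevel=OFF"
    for problem in check_table_names(url, ["n2db.n2admin.hipot_results"]):
        print(problem)
```

An empty result only rules out a malformed name. It says nothing about permissions or connectivity, which need a check against the database itself.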

Carl-Zhou-CN avatar Mar 21 '24 01:03 Carl-Zhou-CN

> > Are you referring to the issue of account permissions for connecting to pg?

> It could be. It could also be caused by a connection timeout or by an error in the table name definition.

The task runs normally, but no data is written to StarRocks, and there is no exception log.

Asura7969 avatar Mar 21 '24 15:03 Asura7969

I have the same issue with SeaTunnel 2.3.4.

PostgreSQL: 12.18
PostgreSQL JDBC Driver: 42.7.3

ab92015359 avatar Apr 08 '24 14:04 ab92015359

Seatunnel Log

2024-04-08 14:21:24,171 INFO  [o.a.s.c.s.u.ConfigBuilder     ] [main] - Parsed config file:
{
    "env" : {
        "execution.parallelism" : 1,
        "job.mode" : "STREAMING",
        "checkpoint.interval" : 5000,
        "read_limit.bytes_per_second" : 7000000,
        "read_limit.rows_per_second" : 400
    },
    "source" : [
        {
            "base-url" : "jdbc:postgresql://X.X.X.X:5434/postgres",
            "password" : "XXXX",
            "table-names" : [
                "postgres.public.table_name"
            ],
            "result_table_name" : "customers_Postgre_cdc",
            "database-names" : [
                "postgres"
            ],
            "schema-names" : [
                "public"
            ],
            "plugin_name" : "Postgres-CDC",
            "username" : "XXXX"
        }
    ],
    "transform" : [],
    "sink" : [
        {
            "plugin_name" : "Console"
        }
    ]
}

Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:199)
        at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
        at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.api.table.factory.FactoryException: ErrorCode:[API-06], ErrorDescription:[Factory initialize failed] - Unable to create a source for identifier 'Postgres-CDC'.
        at org.apache.seatunnel.api.table.factory.FactoryUtil.createAndPrepareSource(FactoryUtil.java:100)
        at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parseSource(MultipleTableJobConfigParser.java:320)
        at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parse(MultipleTableJobConfigParser.java:181)
        at org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment.getLogicalDag(ClientJobExecutionEnvironment.java:88)
        at org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment.execute(ClientJobExecutionEnvironment.java:161)
        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:146)
        ... 2 more
Caused by: org.apache.seatunnel.common.utils.SeaTunnelException: Can not find catalog table with factoryId [Postgres]
        at org.apache.seatunnel.api.table.catalog.CatalogTableUtil.lambda$getCatalogTables$0(CatalogTableUtil.java:129)
        at java.util.Optional.map(Optional.java:215)
        at org.apache.seatunnel.api.table.catalog.CatalogTableUtil.getCatalogTables(CatalogTableUtil.java:116)
        at org.apache.seatunnel.api.table.catalog.CatalogTableUtil.getCatalogTables(CatalogTableUtil.java:96)
        at org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.PostgresIncrementalSourceFactory.lambda$createSource$1(PostgresIncrementalSourceFactory.java:91)
        at org.apache.seatunnel.api.table.factory.FactoryUtil.createAndPrepareSource(FactoryUtil.java:112)
        at org.apache.seatunnel.api.table.factory.FactoryUtil.createAndPrepareSource(FactoryUtil.java:73)
        ... 7 more

PG Log

2024-04-09 15:25:18.370 UTC [128133] postgres@postgres LOG:  execute <unnamed>: SET extra_float_digits = 3
2024-04-09 15:25:18.371 UTC [128133] postgres@postgres LOG:  execute <unnamed>: SET application_name = 'PostgreSQL JDBC Driver'
2024-04-09 15:25:18.383 UTC [128133] postgres@postgres LOG:  execute <unnamed>: select datname from pg_database
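The PG log above ends with the database listing query and shows no table lookup. To check separately whether the connecting account can see the table, one option is a query against `information_schema.tables`, which in PostgreSQL lists only tables the current user owns or has some privilege on. The Python sketch below only builds that statement. The helper name `visible_table_query` is hypothetical, and the `%s` placeholders assume a DB-API driver with that parameter style, such as psycopg2.

```python
def visible_table_query(schema: str, table: str):
    """Build a parameterized query that returns a row only if the
    current database user can see schema.table.

    Returns (sql, params) for use with cursor.execute(sql, params).
    """
    sql = (
        "SELECT table_schema, table_name "
        "FROM information_schema.tables "
        "WHERE table_schema = %s AND table_name = %s"
    )
    return sql, (schema, table)


if __name__ == "__main__":
    # Schema and table taken from the config posted in this comment.
    sql, params = visible_table_query("public", "table_name")
    print(sql)
    print(params)
```

If the query returns no row when run as the CDC user, the table either does not exist under that exact name or is not visible to that account. Unquoted identifiers are folded to lowercase by PostgreSQL, so the values must match the stored case.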

ab92015359 avatar Apr 08 '24 14:04 ab92015359

It is OK now, thanks.

ab92015359 avatar Apr 10 '24 16:04 ab92015359

> > > Are you referring to the issue of account permissions for connecting to pg?

> > It could be. It could also be caused by a connection timeout or by an error in the table name definition.

> The task runs normally, but no data is written to StarRocks, and there is no exception log.

There is no specific information, so I cannot determine why.

Carl-Zhou-CN avatar Apr 11 '24 02:04 Carl-Zhou-CN

> > > > Are you referring to the issue of account permissions for connecting to pg?

> > > It could be. It could also be caused by a connection timeout or by an error in the table name definition.

> > The task runs normally, but no data is written to StarRocks, and there is no exception log.

> There is no specific information, so I cannot determine why.

Already solved, thanks.

Asura7969 avatar Apr 11 '24 06:04 Asura7969

This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in next 7 days if no further activity occurs.

github-actions[bot] avatar May 12 '24 00:05 github-actions[bot]

@Asura7969 Hi, how did you resolve it? I have the same error.

chess3cake avatar Jun 03 '24 10:06 chess3cake

> @Asura7969 Hi, how did you resolve it? I have the same error.

@Carl-Zhou-CN my config is like this:

env {
  # You can set engine configuration here
  execution.parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
  read_limit.bytes_per_second=7000000
  read_limit.rows_per_second=400
}

source {
  Postgres-CDC {
    startup.mode = "INITIAL"
    format= "COMPATIBLE_DEBEZIUM_JSON"
    username = "user"
    password = "wpwd"
    table-names = ["postgres.schema.table"]
    base-url = "jdbc:postgresql://url/postgres"
  }
}

transform {

}

sink {
  Doris {
    fenodes = "url"
    username = root
    password = "pw"
    database = "${database_name}"
    table = "${table_name}"
    sink.label-prefix = "some_sink"
    sink.enable-2pc = "true"
    sink.enable-delete = "true"
    needs_unsupported_type_casting=true
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
    data_save_mode = "APPEND_DATA"
    doris.config {
      format = "json"
      read_json_by_line = "true"
    }
  }
}

chess3cake avatar Jun 03 '24 10:06 chess3cake