
JDBC sources: verify handling of array data types

Open · sashaNeshcheret opened this issue 2 years ago · 1 comment

Current Behavior

Adding columns whose data type the connector does not expect (such as Postgres arrays) to a source table results in unexpected failures, as shown here.

Expected Behavior

The connector should handle every data type that Postgres supports.

Logs

{
    "message": "Internal Server Error: malformed JsonSchema array type, must have items field in {\"type\":\"array\"}",
    "exceptionClassName": "java.lang.IllegalArgumentException",
    "exceptionStack": [
        "java.lang.IllegalArgumentException: malformed JsonSchema array type, must have items field in {\"type\":\"array\"}",
        "\tat io.airbyte.commons.json.JsonSchemas.traverseJsonSchemaInternal(JsonSchemas.java:191)",
        "\tat io.airbyte.commons.json.JsonSchemas.traverseJsonSchemaInternal(JsonSchemas.java:201)",
        "\tat io.airbyte.commons.json.JsonSchemas.traverseJsonSchema(JsonSchemas.java:102)",
        "\tat io.airbyte.commons.json.JsonSchemas.traverseJsonSchemaWithFilteredCollector(JsonSchemas.java:137)",
        "\tat io.airbyte.commons.json.JsonSchemas.traverseJsonSchemaWithCollector(JsonSchemas.java:120)",
        "\tat io.airbyte.protocol.models.CatalogHelpers.getFullyQualifiedFieldNamesWithTypes(CatalogHelpers.java:224)",
        "\tat io.airbyte.protocol.models.CatalogHelpers.getStreamDiff(CatalogHelpers.java:319)",
        "\tat io.airbyte.protocol.models.CatalogHelpers.lambda$getCatalogDiff$7(CatalogHelpers.java:309)",
        "\tat java.base/java.lang.Iterable.forEach(Iterable.java:75)",
        "\tat io.airbyte.protocol.models.CatalogHelpers.getCatalogDiff(CatalogHelpers.java:305)",
        "\tat io.airbyte.server.handlers.ConnectionsHandler.getDiff(ConnectionsHandler.java:268)",
        "\tat io.airbyte.server.handlers.WebBackendConnectionsHandler.webBackendGetConnection(WebBackendConnectionsHandler.java:261)",
        "\tat io.airbyte.server.wrapped.ConfigurationApiWrapped.lambda$webBackendGetConnection$113(ConfigurationApiWrapped.java:1126)",
        "\tat io.airbyte.cloud.auth.AuthHelper.execute(AuthHelper.java:204)",
        "\tat io.airbyte.cloud.auth.AuthHelper.authForWorkspace(AuthHelper.java:173)",
        "\tat io.airbyte.server.wrapped.ConfigurationApiWrapped.webBackendGetConnection(ConfigurationApiWrapped.java:1123)",
        "\tat jdk.internal.reflect.GeneratedMethodAccessor658.invoke(Unknown Source)",
        "\tat java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)",
        "\tat java.base/java.lang.reflect.Method.invoke(Method.java:568)",
        "\tat org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory.lambda$static$0(ResourceMethodInvocationHandlerFactory.java:52)",
        "\tat org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:124)",
        "\tat org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:167)",
        "\tat org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$TypeOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:219)",
        "\tat org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:79)",
        "\tat org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:469)",
        "\tat org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:391)",
        "\tat org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:80)",
        "\tat org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:253)",
        "\tat org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)",
        "\tat org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)",
        "\tat org.glassfish.jersey.internal.Errors.process(Errors.java:292)",
        "\tat org.glassfish.jersey.internal.Errors.process(Errors.java:274)",
        "\tat org.glassfish.jersey.internal.Errors.process(Errors.java:244)",
        "\tat org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265)",
        "\tat org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:232)",
        "\tat org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:680)",
        "\tat org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:394)",
        "\tat org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346)",
        "\tat org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:366)",
        "\tat org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:319)",
        "\tat org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205)",
        "\tat org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:763)",
        "\tat org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:569)",
        "\tat org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)",
        "\tat org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1377)",
        "\tat org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)",
        "\tat org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:507)",
        "\tat org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)",
        "\tat org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1292)",
        "\tat org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)",
        "\tat org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)",
        "\tat org.eclipse.jetty.server.Server.handle(Server.java:501)",
        "\tat org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:383)",
        "\tat org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:556)",
        "\tat org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:375)",
        "\tat org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:273)",
        "\tat org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)",
        "\tat org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:105)",
        "\tat org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104)",
        "\tat org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:336)",
        "\tat org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:313)",
        "\tat org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:171)",
        "\tat org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:129)",
        "\tat org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:375)",
        "\tat org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:806)",
        "\tat org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:938)",
        "\tat java.base/java.lang.Thread.run(Thread.java:833)"
    ],
    "rootCauseExceptionStack": []
}

Steps to Reproduce

Are you willing to submit a PR?

To address this, every "array" type in a stream schema must be required to include an "items" field, as referenced here.
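A minimal sketch of that enforcement, assuming the stream schema has already been parsed into nested `java.util.Map` objects. The class and method names are hypothetical and this is not Airbyte's `JsonSchemas` API. It reports every node whose `type` is (or includes) `"array"` but has no `items`, which is the condition behind the `IllegalArgumentException` in the logs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical validator: finds "array" nodes in a JSON schema that lack "items". */
public class ArraySchemaValidator {

  /** Returns JSON-pointer-like paths of array nodes with no "items" field. */
  public static List<String> findArraysWithoutItems(Map<String, Object> schema) {
    List<String> problems = new ArrayList<>();
    walk(schema, "", problems);
    return problems;
  }

  @SuppressWarnings("unchecked")
  private static void walk(Map<String, Object> node, String path, List<String> problems) {
    if (isArrayType(node.get("type"))) {
      Object items = node.get("items");
      if (items == null) {
        problems.add(path.isEmpty() ? "/" : path); // the malformed case
      } else if (items instanceof Map) {
        walk((Map<String, Object>) items, path + "/items", problems); // nested element schema
      }
    }
    Object properties = node.get("properties");
    if (properties instanceof Map) {
      for (Map.Entry<String, Object> e : ((Map<String, Object>) properties).entrySet()) {
        if (e.getValue() instanceof Map) {
          walk((Map<String, Object>) e.getValue(), path + "/properties/" + e.getKey(), problems);
        }
      }
    }
  }

  /** "type" may be a single string or a list such as ["null", "array"]. */
  private static boolean isArrayType(Object type) {
    if (type instanceof String) {
      return type.equals("array");
    }
    if (type instanceof List) {
      return ((List<?>) type).contains("array");
    }
    return false;
  }

  public static void main(String[] args) {
    Map<String, Object> schema = Map.of(
        "type", "object",
        "properties", Map.of(
            "tags", Map.of("type", "array"),
            "flags", Map.of("type", "array", "items", Map.of("type", "boolean"))));
    System.out.println(findArraysWithoutItems(schema)); // [/properties/tags]
  }
}
```

Running a check like this when a catalog is discovered would surface the bad schema up front, instead of failing later inside `CatalogHelpers.getStreamDiff`.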

An initial PR attempted to address this issue by adding an "items" field, but only for string element types within JsonSchema.ARRAY.

sashaNeshcheret, Aug 01 '22 11:08

The first step is to create an E2E test scenario that syncs a source data set containing various array types from Postgres Source to a single GA destination. The goal of the test is to confirm whether (and which) source array types break syncs with Postgres Source.
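As a sketch of the fixture data such a scenario might use, here is a hypothetical helper that emits a `CREATE TABLE` and one `INSERT` covering several Postgres array types, including a `NULL` element, a quoted element containing a comma, and a two-dimensional array. The table name, column names, and sample values are illustrative assumptions, not the project's actual test data.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical SQL builder for an E2E fixture table with Postgres array columns. */
public class ArrayFixtureSql {

  /** Column name -> {Postgres type, sample value literal}; insertion order is kept. */
  static final Map<String, String[]> COLUMNS = new LinkedHashMap<>();

  static {
    COLUMNS.put("bool_arr", new String[] {"boolean[]", "'{true,false}'"});
    COLUMNS.put("int_arr", new String[] {"integer[]", "'{1,2,3}'"});
    COLUMNS.put("numeric_arr", new String[] {"numeric[]", "'{1.5,-2.25}'"});
    COLUMNS.put("text_arr", new String[] {"text[]", "'{\"a\",\"b,c\",NULL}'"}); // NULL element
    COLUMNS.put("date_arr", new String[] {"date[]", "'{2022-01-01,2022-12-31}'"});
    COLUMNS.put("int_matrix", new String[] {"integer[][]", "'{{1,2},{3,4}}'"}); // 2-D array
  }

  static String createTable(String table) {
    StringJoiner cols =
        new StringJoiner(", ", "CREATE TABLE " + table + " (id integer PRIMARY KEY, ", ");");
    COLUMNS.forEach((name, spec) -> cols.add(name + " " + spec[0]));
    return cols.toString();
  }

  static String insertRow(String table, int id) {
    StringJoiner names = new StringJoiner(", ", "(id, ", ")");
    StringJoiner values = new StringJoiner(", ", "(" + id + ", ", ")");
    COLUMNS.forEach((name, spec) -> {
      names.add(name);
      values.add(spec[1]);
    });
    return "INSERT INTO " + table + " " + names + " VALUES " + values + ";";
  }

  public static void main(String[] args) {
    System.out.println(createTable("array_types_test"));
    System.out.println(insertRow("array_types_test", 1));
  }
}
```

Postgres does not enforce the declared number of dimensions, so `integer[][]` behaves the same as `integer[]`. The multidimensional value is what exercises the connector.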

grishick, Sep 07 '22 17:09

Implementation details:

  1. Create a custom PostgresType based on JdbcType
  2. Extend PostgresType with all supported array data types
  3. Modify PostgresSourceOperations to extend AbstractJdbcCompatibleSourceOperations<PostgresType>
  4. Modify JsonSchemaType to support arrays with items
  5. Map each newly created PostgresType onto a JsonSchemaType, for example:

         PostgresType.BOOL_ARRAY -> JsonSchemaType.builder(JsonSchemaPrimitive.ARRAY)
             .withItems(JsonSchemaPrimitive.BOOLEAN.name().toLowerCase())
             .withAirbyteType("bool_array")
             .build();

  6. Map JSON values for all supported array data types in both full refresh and incremental sync modes
  7. Write data type tests
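Steps 2, 4, and 5 can be sketched in a self-contained form. The sketch uses a hypothetical standalone enum, `PostgresArrayType`, instead of the proposed `PostgresType`, and plain maps instead of `JsonSchemaType`, so it compiles on its own. It relies on Postgres naming array types after their element type with a leading underscore (`_bool`, `_int4`, `_text`), which is what JDBC column metadata typically reports for array columns. The element-to-JSON-type table and the string fallback are assumptions.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical mapping from Postgres array type names to JSON schemas with "items". */
public class PostgresArraySchemas {

  /** Stand-in for the proposed PostgresType array constants. */
  enum PostgresArrayType {
    BOOL_ARRAY("_bool", "boolean"),
    INT2_ARRAY("_int2", "integer"),
    INT4_ARRAY("_int4", "integer"),
    INT8_ARRAY("_int8", "integer"),
    FLOAT4_ARRAY("_float4", "number"),
    FLOAT8_ARRAY("_float8", "number"),
    NUMERIC_ARRAY("_numeric", "number"),
    TEXT_ARRAY("_text", "string"),
    VARCHAR_ARRAY("_varchar", "string");

    final String pgTypeName;    // Postgres array type name (element name with "_" prefix)
    final String itemsJsonType; // JSON schema type of each element

    PostgresArrayType(String pgTypeName, String itemsJsonType) {
      this.pgTypeName = pgTypeName;
      this.itemsJsonType = itemsJsonType;
    }

    static Optional<PostgresArrayType> fromTypeName(String typeName) {
      for (PostgresArrayType t : values()) {
        if (t.pgTypeName.equalsIgnoreCase(typeName)) {
          return Optional.of(t);
        }
      }
      return Optional.empty();
    }
  }

  /** Builds {"type": "array", "items": {"type": ...}} for an array column's type name. */
  static Map<String, Object> toArraySchema(String columnTypeName) {
    String itemsType = PostgresArrayType.fromTypeName(columnTypeName)
        .map(t -> t.itemsJsonType)
        // Assumption: unrecognized element types fall back to string items,
        // so an "array" schema is never emitted without "items".
        .orElse("string");
    Map<String, Object> schema = new LinkedHashMap<>();
    schema.put("type", "array");
    schema.put("items", Map.of("type", itemsType));
    return schema;
  }

  public static void main(String[] args) {
    System.out.println(toArraySchema("_bool")); // {type=array, items={type=boolean}}
  }
}
```

The fallback is one possible design choice. It keeps unknown array types syncable as arrays of strings rather than crashing, at the cost of losing element typing for those columns.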

VitaliiMaltsev, Sep 23 '22 08:09

We should definitely not crash, but I'm not clear on the goal here. Is it to support arrays (with any subtype) or to render them as strings? @misteryeo @grishick @subodh1810

bleonard, Sep 30 '22 21:09

@edgao and @VitaliiMaltsev met earlier this week about Vitalii's proposal. The goal is to support arrays as arrays and not render them as strings.

grishick, Sep 30 '22 23:09
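This sketch illustrates the difference between supporting arrays as arrays and rendering them as strings. A string rendering would emit Postgres's text form, such as `"{1,2,NULL}"`. The hypothetical converter below instead turns the elements (for example, the `Object[]` that `java.sql.Array.getArray()` returns for a boxed element type) into a real JSON array. Recursion makes multidimensional arrays nested JSON arrays. The handling of non-numeric, non-boolean elements as quoted strings is an assumption.

```java
/** Hypothetical converter from array elements to JSON array text, so arrays stay arrays. */
public class ArrayToJson {

  static String toJson(Object value) {
    if (value == null) {
      return "null"; // SQL NULL element -> JSON null
    }
    if (value instanceof Object[]) {
      Object[] elements = (Object[]) value;
      StringBuilder sb = new StringBuilder("[");
      for (int i = 0; i < elements.length; i++) {
        if (i > 0) {
          sb.append(',');
        }
        // Recursing turns multidimensional arrays into nested JSON arrays.
        sb.append(toJson(elements[i]));
      }
      return sb.append(']').toString();
    }
    if (value instanceof Double || value instanceof Float) {
      double d = ((Number) value).doubleValue();
      // JSON has no NaN/Infinity literals, so emit those as strings.
      return Double.isFinite(d) ? value.toString() : quote(value.toString());
    }
    if (value instanceof Number || value instanceof Boolean) {
      return value.toString();
    }
    // Everything else (text, dates, UUIDs, ...) falls back to its quoted string form.
    return quote(value.toString());
  }

  static String quote(String s) {
    StringBuilder sb = new StringBuilder("\"");
    for (int i = 0; i < s.length(); i++) {
      char c = s.charAt(i);
      if (c == '"' || c == '\\') {
        sb.append('\\').append(c);
      } else if (c < 0x20) {
        sb.append(String.format("\\u%04x", (int) c)); // escape control characters
      } else {
        sb.append(c);
      }
    }
    return sb.append('"').toString();
  }

  public static void main(String[] args) {
    System.out.println(toJson(new Object[] {1, 2, null}));       // [1,2,null]
    System.out.println(toJson(new Object[][] {{1, 2}, {3, 4}})); // [[1,2],[3,4]]
    System.out.println(toJson(new Object[] {"a", "b,c"}));       // ["a","b,c"]
  }
}
```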