airbyte
airbyte copied to clipboard
JDBC sources: verify handling of arrays data types
Current Behavior
Adding fields to a table that are not of the expected data type in Postgres will result in unexpected failures as shown here
Expected Behavior
Data type handling should be able to handle all data types that Postgres supports
Logs
"message": "Internal Server Error: malformed JsonSchema array type, must have items field in {\"type\":\"array\"}",
"exceptionClassName": "java.lang.IllegalArgumentException",
"exceptionStack": [
"java.lang.IllegalArgumentException: malformed JsonSchema array type, must have items field in {\"type\":\"array\"}",
"\tat io.airbyte.commons.json.JsonSchemas.traverseJsonSchemaInternal(JsonSchemas.java:191)",
"\tat io.airbyte.commons.json.JsonSchemas.traverseJsonSchemaInternal(JsonSchemas.java:201)",
"\tat io.airbyte.commons.json.JsonSchemas.traverseJsonSchema(JsonSchemas.java:102)",
"\tat io.airbyte.commons.json.JsonSchemas.traverseJsonSchemaWithFilteredCollector(JsonSchemas.java:137)",
"\tat io.airbyte.commons.json.JsonSchemas.traverseJsonSchemaWithCollector(JsonSchemas.java:120)",
"\tat io.airbyte.protocol.models.CatalogHelpers.getFullyQualifiedFieldNamesWithTypes(CatalogHelpers.java:224)",
"\tat io.airbyte.protocol.models.CatalogHelpers.getStreamDiff(CatalogHelpers.java:319)",
"\tat io.airbyte.protocol.models.CatalogHelpers.lambda$getCatalogDiff$7(CatalogHelpers.java:309)",
"\tat java.base/java.lang.Iterable.forEach(Iterable.java:75)",
"\tat io.airbyte.protocol.models.CatalogHelpers.getCatalogDiff(CatalogHelpers.java:305)",
"\tat io.airbyte.server.handlers.ConnectionsHandler.getDiff(ConnectionsHandler.java:268)",
"\tat io.airbyte.server.handlers.WebBackendConnectionsHandler.webBackendGetConnection(WebBackendConnectionsHandler.java:261)",
"\tat io.airbyte.server.wrapped.ConfigurationApiWrapped.lambda$webBackendGetConnection$113(ConfigurationApiWrapped.java:1126)",
"\tat io.airbyte.cloud.auth.AuthHelper.execute(AuthHelper.java:204)",
"\tat io.airbyte.cloud.auth.AuthHelper.authForWorkspace(AuthHelper.java:173)",
"\tat io.airbyte.server.wrapped.ConfigurationApiWrapped.webBackendGetConnection(ConfigurationApiWrapped.java:1123)",
"\tat jdk.internal.reflect.GeneratedMethodAccessor658.invoke(Unknown Source)",
"\tat java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)",
"\tat java.base/java.lang.reflect.Method.invoke(Method.java:568)",
"\tat org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory.lambda$static$0(ResourceMethodInvocationHandlerFactory.java:52)",
"\tat org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:124)",
"\tat org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:167)",
"\tat org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$TypeOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:219)",
"\tat org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:79)",
"\tat org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:469)",
"\tat org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:391)",
"\tat org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:80)",
"\tat org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:253)",
"\tat org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)",
"\tat org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)",
"\tat org.glassfish.jersey.internal.Errors.process(Errors.java:292)",
"\tat org.glassfish.jersey.internal.Errors.process(Errors.java:274)",
"\tat org.glassfish.jersey.internal.Errors.process(Errors.java:244)",
"\tat org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265)",
"\tat org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:232)",
"\tat org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:680)",
"\tat org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:394)",
"\tat org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346)",
"\tat org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:366)",
"\tat org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:319)",
"\tat org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205)",
"\tat org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:763)",
"\tat org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:569)",
"\tat org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)",
"\tat org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1377)",
"\tat org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)",
"\tat org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:507)",
"\tat org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)",
"\tat org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1292)",
"\tat org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)",
"\tat org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)",
"\tat org.eclipse.jetty.server.Server.handle(Server.java:501)",
"\tat org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:383)",
"\tat org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:556)",
"\tat org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:375)",
"\tat org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:273)",
"\tat org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)",
"\tat org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:105)",
"\tat org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104)",
"\tat org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:336)",
"\tat org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:313)",
"\tat org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:171)",
"\tat org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:129)",
"\tat org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:375)",
"\tat org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:806)",
"\tat org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:938)",
"\tat java.base/java.lang.Thread.run(Thread.java:833)"
],
"rootCauseExceptionStack": []
}
Steps to Reproduce
Are you willing to submit a PR?
Addressable points are that there needs to be an enforcement of the inclusion of "items"
for any "array"
type stream as referenced here
Initial PR that attempted to address this issue and add an included items
but only for string data types within the JsonSchema.ARRAY
The first step here is to create an E2E test scenario with Postgres Source and any GA destination (e.g., pick one GA destination) with a source data set that has various arrays. The goal of the test is to confirm if (and which) source arrays will break syncs with Postgres Source.
Implementation details:
- Create custom PostgresType on base of JdbcType
- Extend PostgresType with all supported array datatypes
- Modify PostgresSourceOperations to extend AbstractJdbcCompatibleSourceOperations<PostgresType>
- Modify JsonSchemaType to support array with items
- Map newly created PostgresType on JsonSchemaType () example
PostgresType.BOOL_ARRAY -> JsonSchemaType.builder(JsonSchemaPrimitive.ARRAY)
.withItems(JsonSchemaPrimitive.BOOLEAN.name().toLowerCase())
.withAirbyteType("bool_array")
.build();
- Map json value for all supported array datatypes for full refresh and incremental sync modes
- Write datatype tests
We should definitely not crash, but I'm not clear on the goal here. is it to support arrays (with any subtype) or render them as a string? @misteryeo @grishick @subodh1810
@edgao and @VitaliiMaltsev met earlier this week about Vitalii's proposal. The goal is to support arrays as arrays and not render them as strings.