
Use LOG4J2 to wrap connector logs in JSON format

yurii-bidiuk opened this issue 2 years ago • 81 comments

What

Fixes #659

How

Use LOG4J2 and JsonTemplateLayout to wrap connector logs in a JSON format matching AirbyteLogMessage
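A minimal sketch of how JsonTemplateLayout can be wired into a log4j2 configuration, assuming a console appender and a template file named AirbyteLogMessageTemplate.json on the classpath (the appender name and level here are illustrative, not taken from this PR's diff):

```xml
<!-- Sketch only: illustrative wiring, not this PR's actual config. -->
<Configuration>
  <Appenders>
    <Console name="Default" target="SYSTEM_OUT">
      <!-- JsonTemplateLayout renders each LogEvent through the JSON event template -->
      <JsonTemplateLayout eventTemplateUri="classpath:AirbyteLogMessageTemplate.json"/>
    </Console>
  </Appenders>
  <Loggers>
    <Root level="INFO">
      <AppenderRef ref="Default"/>
    </Root>
  </Loggers>
</Configuration>
```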

Recommended reading order

  1. x.java
  2. y.python

🚨 User Impact 🚨

Are there any breaking changes? What is the end result perceived by the user? If yes, please merge this PR with the 🚨🚨 emoji so changelog authors can further highlight this if needed.

Pre-merge Checklist

Expand the relevant checklist and delete the others.

New Connector

Community member or Airbyter

  • [ ] Community member? Grant edit access to maintainers (instructions)
  • [ ] Secrets in the connector's spec are annotated with airbyte_secret
  • [ ] Unit & integration tests added and passing. Community members, please provide proof of success locally e.g: screenshot or copy-paste unit, integration, and acceptance test output. To run acceptance tests for a Python connector, follow instructions in the README. For java connectors run ./gradlew :airbyte-integrations:connectors:<name>:integrationTest.
  • [ ] Code reviews completed
  • [ ] Documentation updated
    • [ ] Connector's README.md
    • [ ] Connector's bootstrap.md. See description and examples
    • [ ] docs/integrations/<source or destination>/<name>.md including changelog. See changelog example
    • [ ] docs/integrations/README.md
    • [ ] airbyte-integrations/builds.md
  • [ ] PR name follows PR naming conventions

Airbyter

If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.

  • [ ] Create a non-forked branch based on this PR and test the below items on it
  • [ ] Build is successful
  • [ ] If new credentials are required for use in CI, add them to GSM. Instructions.
  • [ ] /test connector=connectors/<name> command is passing
  • [ ] New Connector version released on Dockerhub by running the /publish command described here
  • [ ] After the connector is published, connector added to connector index as described here
  • [ ] Seed specs have been re-generated by building the platform and committing the changes to the seed spec files, as described here
Updating a connector

Community member or Airbyter

  • [ ] Grant edit access to maintainers (instructions)
  • [ ] Secrets in the connector's spec are annotated with airbyte_secret
  • [ ] Unit & integration tests added and passing. Community members, please provide proof of success locally e.g: screenshot or copy-paste unit, integration, and acceptance test output. To run acceptance tests for a Python connector, follow instructions in the README. For java connectors run ./gradlew :airbyte-integrations:connectors:<name>:integrationTest.
  • [ ] Code reviews completed
  • [ ] Documentation updated
    • [ ] Connector's README.md
    • [ ] Connector's bootstrap.md. See description and examples
    • [ ] Changelog updated in docs/integrations/<source or destination>/<name>.md including changelog. See changelog example
  • [ ] PR name follows PR naming conventions

Airbyter

If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.

  • [ ] Create a non-forked branch based on this PR and test the below items on it
  • [ ] Build is successful
  • [ ] If new credentials are required for use in CI, add them to GSM. Instructions.
  • [ ] /test connector=connectors/<name> command is passing
  • [ ] New Connector version released on Dockerhub and connector version bumped by running the /publish command described here
Connector Generator
  • [ ] Issue acceptance criteria met
  • [ ] PR name follows PR naming conventions
  • [ ] If adding a new generator, add it to the list of scaffold modules being tested
  • [ ] The generator test modules (all connectors with -scaffold in their name) have been updated with the latest scaffold by running ./gradlew :airbyte-integrations:connector-templates:generator:testScaffoldTemplates then checking in your changes
  • [ ] Documentation which references the generator is updated as needed

Tests

Unit

Put your unit tests output here.

Integration

Put your integration tests output here.

Acceptance

Put your acceptance tests output here.

yurii-bidiuk avatar Aug 15 '22 18:08 yurii-bidiuk

/test connector=connectors/source-postgres

:clock2: connectors/source-postgres https://github.com/airbytehq/airbyte/actions/runs/2862778188
:x: connectors/source-postgres https://github.com/airbytehq/airbyte/actions/runs/2862778188
:bug: https://gradle.com/s/dwtw7nnscy5us

Build Failed

Test summary info:

Could not find result summary

yurii-bidiuk avatar Aug 15 '22 18:08 yurii-bidiuk

With these changes, all connector logs look like this:

{"type":"LOG","log":{"level":"INFO","message":"Using default value for environment variable WORKER_ENVIRONMENT: 'DOCKER'"}}
{"type":"LOG","log":{"level":"INFO","message":"Using default value for environment variable JOB_MAIN_CONTAINER_CPU_REQUEST: 'null'"}}
{"type":"LOG","log":{"level":"INFO","message":"Using default value for environment variable JOB_MAIN_CONTAINER_CPU_LIMIT: 'null'"}}
{"type":"LOG","log":{"level":"INFO","message":"Using default value for environment variable JOB_MAIN_CONTAINER_MEMORY_REQUEST: 'null'"}}
{"type":"LOG","log":{"level":"INFO","message":"Using default value for environment variable JOB_MAIN_CONTAINER_MEMORY_LIMIT: 'null'"}}
{"type":"LOG","log":{"level":"INFO","message":"Using default value for environment variable JOB_KUBE_TOLERATIONS: ''"}}
{"type":"LOG","log":{"level":"INFO","message":"Using default value for environment variable JOB_KUBE_NODE_SELECTORS: ''"}}
{"type":"LOG","log":{"level":"INFO","message":"Using default value for environment variable JOB_KUBE_ANNOTATIONS: ''"}}
{"type":"LOG","log":{"level":"INFO","message":"Using default value for environment variable JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_SECRET: ''"}}
{"type":"LOG","log":{"level":"INFO","message":"Using default value for environment variable JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_POLICY: 'IfNotPresent'"}}
{"type":"LOG","log":{"level":"INFO","message":"Using default value for environment variable JOB_KUBE_SIDECAR_CONTAINER_IMAGE_PULL_POLICY: 'IfNotPresent'"}}
{"type":"LOG","log":{"level":"INFO","message":"Using default value for environment variable JOB_KUBE_SOCAT_IMAGE: 'alpine/socat:1.7.4.3-r0'"}}
{"type":"LOG","log":{"level":"INFO","message":"Using default value for environment variable JOB_KUBE_BUSYBOX_IMAGE: 'busybox:1.28'"}}
{"type":"LOG","log":{"level":"INFO","message":"Using default value for environment variable JOB_KUBE_CURL_IMAGE: 'curlimages/curl:7.83.1'"}}
{"type":"LOG","log":{"level":"INFO","message":"Using default value for environment variable WORKER_ENVIRONMENT: 'DOCKER'"}}
{"type":"LOG","log":{"level":"INFO","message":"Using default value for environment variable DEPLOYMENT_MODE: 'OSS'"}}
{"type":"LOG","log":{"level":"INFO","message":"Creating container for image: postgres:13-alpine"}}
{"type":"LOG","log":{"level":"INFO","message":"Container postgres:13-alpine is starting: 58a660cf9ec646caed3d5bb360dd6f9c70b1cd99ccf49c81c324d8648298e403"}}
{"type":"LOG","log":{"level":"INFO","message":"Container postgres:13-alpine started in PT1.344788305S"}}
{"type":"LOG","log":{"level":"INFO","message":"HikariPool-5 - Starting..."}}
{"type":"LOG","log":{"level":"INFO","message":"HikariPool-5 - Start completed."}}
{"type":"LOG","log":{"level":"INFO","message":"Using default value for environment variable USE_STREAM_CAPABLE_STATE: 'false'"}}
{"type":"LOG","log":{"level":"INFO","message":"Checking if airbyte/source-postgres:dev exists..."}}
{"type":"LOG","log":{"level":"INFO","message":"airbyte/source-postgres:dev was found locally."}}
{"type":"LOG","log":{"level":"INFO","message":"Creating docker job ID: 0"}}
{"type":"LOG","log":{"level":"INFO","message":"Preparing command: docker run --rm --init -i -w /data/job --log-driver none --name source-postgres-spec-0-0-torcr --network host -v /tmp/airbyte_tests/test7593383916175201673:/data -v /tmp/airbyte_tests/output846556706032332208:/local -e DEPLOYMENT_MODE=OSS -e USE_STREAM_CAPABLE_STATE=false -e WORKER_ENVIRONMENT=DOCKER -e AIRBYTE_ROLE= -e WORKER_CONNECTOR_IMAGE=airbyte/source-postgres:dev -e WORKER_JOB_ATTEMPT=0 -e AIRBYTE_VERSION= -e WORKER_JOB_ID=0 airbyte/source-postgres:dev spec"}}
{"type":"LOG","log":{"level":"INFO","message":"Running source under deployment mode: OSS"}}
{"type":"LOG","log":{"level":"INFO","message":"Starting source: io.airbyte.integrations.base.ssh.SshWrappedSource"}}
{"type":"LOG","log":{"level":"INFO","message":"integration args: {spec=null}"}}
{"type":"LOG","log":{"level":"INFO","message":"Running integration: io.airbyte.integrations.base.ssh.SshWrappedSource"}}
{"type":"LOG","log":{"level":"INFO","message":"Command: SPEC"}}
{"type":"LOG","log":{"level":"INFO","message":"Integration config: IntegrationConfig{command=SPEC, configPath='null', catalogPath='null', statePath='null'}"}}
{"type":"LOG","log":{"level":"INFO","message":"Completed integration: io.airbyte.integrations.base.ssh.SshWrappedSource"}}
{"type":"LOG","log":{"level":"INFO","message":"Completed source: io.airbyte.integrations.base.ssh.SshWrappedSource"}}


We can configure this structure using AirbyteLogMessageTemplate.json.
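For reference, a minimal version of such an event template that would produce the structure shown above could look like this (a sketch consistent with the sample output; the actual file in the PR may include more fields):

```json
{
  "type": "LOG",
  "log": {
    "level": {
      "$resolver": "level",
      "field": "name"
    },
    "message": {
      "$resolver": "message",
      "stringified": true
    }
  }
}
```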

yurii-bidiuk avatar Aug 15 '22 18:08 yurii-bidiuk

/test connector=connectors/source-mysql

:clock2: connectors/source-mysql https://github.com/airbytehq/airbyte/actions/runs/2866705735
:white_check_mark: connectors/source-mysql https://github.com/airbytehq/airbyte/actions/runs/2866705735
No Python unittests run

Build Passed

Test summary info:

All Passed

yurii-bidiuk avatar Aug 16 '22 09:08 yurii-bidiuk

/test connector=connectors/source-postgres

:clock2: connectors/source-postgres https://github.com/airbytehq/airbyte/actions/runs/2867414898
:white_check_mark: connectors/source-postgres https://github.com/airbytehq/airbyte/actions/runs/2867414898
No Python unittests run

Build Passed

Test summary info:

All Passed

yurii-bidiuk avatar Aug 16 '22 11:08 yurii-bidiuk

/test connector=connectors/source-postgres

:clock2: connectors/source-postgres https://github.com/airbytehq/airbyte/actions/runs/2868700148
:white_check_mark: connectors/source-postgres https://github.com/airbytehq/airbyte/actions/runs/2868700148
No Python unittests run

Build Passed

Test summary info:

All Passed

yurii-bidiuk avatar Aug 16 '22 14:08 yurii-bidiuk

@yurii-bidiuk could you show an example of what an exception stacktrace looks like? (i.e. if the connector has a throw new RuntimeException("whatever"), does that also get wrapped correctly?)

And is https://github.com/airbytehq/airbyte-internal-issues/issues/19 an example of how the logs show up in the UI? (if yes: that looks correct to me - airbyte technically expects connectors to only output JSON, so the current behavior is actually violating the airbyte protocol)

edgao avatar Aug 19 '22 00:08 edgao

@edgao I produced an exception (throw new RuntimeException("some error");) in a connector, and the stack trace is not wrapped in JSON. The JSON log contains only the high-level error message:

{"type":"LOG","log":{"level":"ERROR","message":"Something went wrong in the connector. See the logs for more details."}}

Also, there is no original connector stack trace with the message. Instead, we get a stack trace from the worker:

2022-08-22 09:48:28 [1;31mERROR[m i.a.w.g.DefaultReplicationWorker(run):184 - Sync worker failed.
java.util.concurrent.ExecutionException: io.airbyte.workers.general.DefaultReplicationWorker$SourceException: Source cannot be stopped!
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396) ~[?:?]
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073) ~[?:?]
	at io.airbyte.workers.general.DefaultReplicationWorker.run(DefaultReplicationWorker.java:177) ~[io.airbyte-airbyte-workers-0.40.0-alpha.jar:?]
	at io.airbyte.workers.general.DefaultReplicationWorker.run(DefaultReplicationWorker.java:65) ~[io.airbyte-airbyte-workers-0.40.0-alpha.jar:?]
	at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:155) ~[io.airbyte-airbyte-workers-0.40.0-alpha.jar:?]
	at java.lang.Thread.run(Thread.java:1589) [?:?]
	Suppressed: io.airbyte.workers.exception.WorkerException: Source process exit with code 1. This warning is normal if the job was cancelled.
		at io.airbyte.workers.internal.DefaultAirbyteSource.close(DefaultAirbyteSource.java:136) ~[io.airbyte-airbyte-workers-0.40.0-alpha.jar:?]
		at io.airbyte.workers.general.DefaultReplicationWorker.run(DefaultReplicationWorker.java:141) ~[io.airbyte-airbyte-workers-0.40.0-alpha.jar:?]
		at io.airbyte.workers.general.DefaultReplicationWorker.run(DefaultReplicationWorker.java:65) ~[io.airbyte-airbyte-workers-0.40.0-alpha.jar:?]
		at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:155) ~[io.airbyte-airbyte-workers-0.40.0-alpha.jar:?]
		at java.lang.Thread.run(Thread.java:1589) [?:?]
Caused by: io.airbyte.workers.general.DefaultReplicationWorker$SourceException: Source cannot be stopped!
	at io.airbyte.workers.general.DefaultReplicationWorker.lambda$getReplicationRunnable$6(DefaultReplicationWorker.java:340) ~[io.airbyte-airbyte-workers-0.40.0-alpha.jar:?]
	at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
	... 1 more
Caused by: io.airbyte.workers.exception.WorkerException: Source process exit with code 1. This warning is normal if the job was cancelled.
	at io.airbyte.workers.internal.DefaultAirbyteSource.close(DefaultAirbyteSource.java:136) ~[io.airbyte-airbyte-workers-0.40.0-alpha.jar:?]
	at io.airbyte.workers.general.DefaultReplicationWorker.lambda$getReplicationRunnable$6(DefaultReplicationWorker.java:338) ~[io.airbyte-airbyte-workers-0.40.0-alpha.jar:?]
	at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]

The original stack trace with a message is present in the sync summary log.

yurii-bidiuk avatar Aug 22 '22 10:08 yurii-bidiuk

Does the original stacktrace never get logged? That seems problematic. Losing the original stacktrace will make debugging connector crashes a lot more difficult.

https://logging.apache.org/log4j/2.x/manual/json-template-layout.html says there are ways to add the exception stacktrace (the error.stack_trace entry), I think adding that would be really valuable. (not sure how to tell log4j to combine two different resolvers in a single entry though)

edgao avatar Aug 22 '22 18:08 edgao

@edgao the original stack trace is logged only inside the sync summary log.

https://logging.apache.org/log4j/2.x/manual/json-template-layout.html says there are ways to add the exception stacktrace

You're right, and I've already tried this config, but it doesn't work for Airbyte: the stack trace is ignored. Also, I didn't find a way to combine two different resolvers. A possible way is to write a custom resolver, something like the RandomNumberResolver example from https://logging.apache.org/log4j/2.x/manual/json-template-layout.html

yurii-bidiuk avatar Aug 22 '22 20:08 yurii-bidiuk

I'd pretty strongly prefer to have the exception stacktrace logged immediately, rather than captured inside the sync summary log. Otherwise it could hide important information (e.g. timestamp).

edgao avatar Aug 23 '22 15:08 edgao

NOTE :warning: Changes in this PR affect the following connectors. Make sure to run corresponding integration tests:

  • destination-oracle
  • source-db2
  • source-snowflake
  • source-db2-strict-encrypt
  • source-relational-db
  • destination-rockset
  • destination-oracle-strict-encrypt
  • source-mongodb-v2
  • destination-mysql
  • source-clickhouse-strict-encrypt
  • source-cockroachdb-strict-encrypt
  • destination-postgres-strict-encrypt
  • destination-keen
  • destination-mysql-strict-encrypt
  • destination-local-json
  • destination-redshift
  • destination-mongodb-strict-encrypt
  • source-e2e-test
  • destination-mongodb
  • destination-e2e-test
  • source-jdbc
  • source-oracle-strict-encrypt
  • source-scaffold-java-jdbc
  • destination-meilisearch
  • destination-elasticsearch
  • source-mysql
  • source-sftp
  • destination-bigquery
  • destination-mssql-strict-encrypt
  • destination-pubsub
  • source-clickhouse
  • destination-mqtt
  • source-postgres-strict-encrypt
  • source-kafka
  • destination-kafka
  • source-bigquery
  • destination-jdbc
  • destination-bigquery-denormalized
  • destination-databricks
  • destination-clickhouse
  • destination-pulsar
  • source-oracle
  • destination-snowflake
  • destination-dev-null
  • destination-cassandra
  • source-cockroachdb
  • source-mysql-strict-encrypt
  • source-elasticsearch
  • destination-redis
  • destination-kinesis
  • destination-postgres
  • destination-scylla
  • source-e2e-test-cloud
  • destination-dynamodb
  • source-postgres
  • destination-gcs
  • destination-mssql
  • source-mssql
  • destination-s3
  • destination-clickhouse-strict-encrypt
  • source-mssql-strict-encrypt
  • destination-mariadb-columnstore
  • source-mongodb-strict-encrypt
  • destination-csv
  • destination-azure-blob-storage
  • source-redshift
  • source-tidb

github-actions[bot] avatar Aug 26 '22 10:08 github-actions[bot]


@edgao for some reason the stack trace wasn't logged inside AirbyteExceptionHandler. So I manually added the stack trace as part of the message, and now we can see the original connector stack trace as part of the JSON log. From the UI it looks like this (screenshot omitted). And the produced connector log:

{"type":"LOG","log":{"level":"INFO","message":"HikariPool-1 - Shutdown initiated..."}}
{"type":"LOG","log":{"level":"INFO","message":"HikariPool-1 - Shutdown completed."}}
{"type":"LOG","log":{"level":"ERROR","message":"Exception occurred while getting the delegate read iterator, closing SSH tunnel"}}
{"type":"LOG","log":{"level":"ERROR","message":"Something went wrong in the connector. See the logs for more details.java.lang.IllegalStateException: Unexpected state blob, the state contains either multiple global or conflicting state type.\n\tat io.airbyte.config.helpers.StateMessageHelper.getTypedState(StateMessageHelper.java:72)\n\tat io.airbyte.integrations.source.relationaldb.AbstractDbSource.deserializeInitialState(AbstractDbSource.java:540)\n\tat io.airbyte.integrations.source.relationaldb.AbstractDbSource.read(AbstractDbSource.java:118)\n\tat io.airbyte.integrations.source.postgres.PostgresSource.read(PostgresSource.java:287)\n\tat io.airbyte.integrations.base.ssh.SshWrappedSource.read(SshWrappedSource.java:54)\n\tat io.airbyte.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.java:136)\n\tat io.airbyte.integrations.base.IntegrationRunner.run(IntegrationRunner.java:97)\n\tat io.airbyte.integrations.base.adaptive.AdaptiveSourceRunner$Runner.run(AdaptiveSourceRunner.java:86)\n\tat io.airbyte.integrations.source.postgres.PostgresSourceRunner.main(PostgresSourceRunner.java:15)\n"}}
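The manual approach described above, folding the throwable's stack trace into the message string, can be sketched with JDK-only classes (the class and method names here are illustrative, not the PR's actual AirbyteExceptionHandler):

```java
import java.io.PrintWriter;
import java.io.StringWriter;

public class StackTraceLogging {

    // Render a throwable's full stack trace as a String (equivalent in spirit to
    // Apache Commons Lang's ExceptionUtils.getStackTrace, but JDK-only).
    static String stackTraceOf(Throwable t) {
        StringWriter sw = new StringWriter();
        t.printStackTrace(new PrintWriter(sw));
        return sw.toString();
    }

    // Hypothetical handler step: fold the stack trace into the message so the
    // JSON layout's "message" resolver carries it into the AirbyteLogMessage.
    static String wrapForLog(String message, Throwable t) {
        return message + stackTraceOf(t);
    }

    public static void main(String[] args) {
        String wrapped = wrapForLog(
            "Something went wrong in the connector. See the logs for more details.",
            new IllegalStateException("Unexpected state blob"));
        System.out.println(wrapped);
    }
}
```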

Can we move on with this?

yurii-bidiuk avatar Aug 26 '22 13:08 yurii-bidiuk

does this mean that anywhere else we use the LOGGER.log("message", e) call, it needs to be replaced by the ExceptionUtils.getStackTrace style? (that doesn't sound ideal)

also @yurii-bidiuk @grishick feel free to tell me I'm being too nitpicky here :P

I wonder if we should just have a "stacktrace" field at the protocol level. Could be useful as a way to distinguish between technical vs non-technical error message?

edgao avatar Aug 27 '22 00:08 edgao

does this mean that anywhere else we use the LOGGER.log("message", e) call, it needs to be replaced by the ExceptionUtils.getStackTrace style? (that doesn't sound ideal)

This does not sound good.

I wonder if we should just have a "stacktrace" field at the protocol level. Could be useful as a way to distinguish between technical vs non-technical error message?

This seems like a good idea tagging @airbytehq/protocol-reviewers to chime in

grishick avatar Aug 27 '22 00:08 grishick

AirbyteTraceMessage already has these properties: stacktrace, failure-type, internal and external message. It might be the case that all exceptions should be AirbyteTraceMessage rather than AirbyteLogMessage?

It is probably right to say that we shouldn't be "plain" logging exceptions anywhere, and that we should have some sort of formatting utility class. In that utility, we can decide (in a single method) whether we emit AirbyteTraceMessage or AirbyteLogMessage, or both.

evantahler avatar Aug 27 '22 00:08 evantahler

aren't trace messages more "this is why the sync failed"? I think there's cases where we catch an exception but don't fail and still want to log it (e.g. LOGGER.warn("Caught xyz transient exception, retrying", e))

we shouldn't be "plain" logging exceptions anywhere

what do you mean by this? Like require that instead of using a log4j Logger, connector devs need to use some wrapper around it?

edgao avatar Aug 27 '22 18:08 edgao

Ah! We are talking about non-fatal exceptions (because we catch them and retry).

I had initially assumed that exceptions were always fatal, and if that's the case, then we should be using AirbyteTraceMessage. If there are non-fatal exceptions, it's safe to send multiple AirbyteTraceMessages: the platform will use only the latest one as the main FailureReason. I guess the question is whether the connector wants to "elevate" the information from those retryable exceptions to the platform or not.

Like require that instead of using a log4j Logger, connector devs need to use some wrapper around it?

Yep. I would suggest that there's a helper like:

// usage: yes, a new custom method
AirbyteLoggingUtils.logError(new RuntimeException("oh no"));

// sketch of the helper (Java has no default parameter values, so an overload stands in)
static void logError(Exception e) {
  logError(e, true);
}

static void logError(Exception e, boolean sendTraceMessage) {
  LOGGER.error(ExceptionUtils.getStackTrace(e));
  if (sendTraceMessage) { emitAirbyteTraceMessage(e); }
}

CC @alovew

evantahler avatar Aug 29 '22 18:08 evantahler

This PR addresses both fatal and non-fatal exceptions, I'm just thinking through consequences for both cases. The fatal case is probably simple, since we can add a top-level try-catch to wrap it inside whatever we want. The non-fatal case is more interesting, because it requires code change everywhere non-fatal exceptions happen.

My concern with adding a new method is that we can't really enforce that it's used everywhere. E.g. the official BigQuery SDK (used in destination-bigquery) has an internal exception retry handler, which we have no (reasonable) way of modifying to use the airbyte exception logger util. In that case we basically must rely on log4j logs working properly.

(and yes, in that example, either we eventually succeed and therefore don't care about the retries, or it fails and throws a real exception. But I think there are cases where we want to see the exception log without it actually being fatal)

edgao avatar Aug 29 '22 20:08 edgao

I think the right solution is to use AirbyteTraceMessage with AirbyteErrorTraceMessage and a custom JSON template (https://logging.apache.org/log4j/2.x/manual/json-template-layout.html) to place the error message and stack trace in the right fields.

grishick avatar Aug 30 '22 17:08 grishick

@grishick I already tried this approach. I added these fields to AirbyteLogMessageTemplate.json and expected that the new fields (error.type, error.message, error.stack_trace) would appear in the logs.

{
  "type": "LOG",
  "log": {
    "level": {
      "$resolver": "level",
      "field": "name"
    },
    "message": {
      "$resolver": "message",
      "stringified": true
    }
  },
  "error.type": {
    "$resolver": "exception",
    "field": "className"
  },
  "error.message": {
    "$resolver": "exception",
    "field": "message"
  },
  "error.stack_trace": {
    "$resolver" : "exception",
    "field" : "stackTrace",
    "stackTrace" : {
      "stringified" : true
    }
  }
}

But these fields are ignored; in the result we can see only type, level, and message, even if an exception was thrown:

{"type":"LOG","log":{"level":"ERROR","message":"Something went wrong in the connector. See the logs for more details."}}

yurii-bidiuk avatar Aug 30 '22 17:08 yurii-bidiuk

Here's an example of log message template that preserves the stack trace:

{
  "type": "TRACE",
  "trace": {
    "type": "ERROR",
    "emitted_at": {
      "$resolver": "timestamp",
      "pattern": {
        "format": "%d{UNIX_MILLIS}",
        "timeZone": "UTC"
      }
    },
    "error": {
      "message": {
        "$resolver": "message",
        "stringified": true
      },
      "stack_trace": {
        "$resolver": "exception",
        "field": "stackTrace",
        "stackTrace": {
          "stringified": true
        }
      },
      "failure_type": "system_error"
    }
  }
}

@evantahler while I was playing with formatting error messages as AirbyteTraceMessage, I saw that AirbyteTraceMessage only supports type: ERROR. I think it makes sense for it to support the same levels as AirbyteLogMessage. To make this solution complete we'd need to adjust the protocol in one of two ways:

  • add AirbyteErrorTraceMessage property to AirbyteLogMessage similar to how it is a property of AirbyteTraceMessage
  • expand type property of AirbyteTraceMessage to include other error levels (WARN, FATAL, INFO, DEBUG, TRACE)

grishick avatar Aug 30 '22 19:08 grishick

@yurii-bidiuk the problem is that AirbyteLogMessage does not have fields to show stack trace, so either you need to use AirbyteTraceMessage or expand the protocol to include AirbyteErrorTraceMessage in AirbyteLogMessage. See airbyte_protocol.yaml file for details on the protocol.

grishick avatar Aug 30 '22 19:08 grishick

I think that adding "levels" to AirbyteTraceMessage as a top-level property isn't quite right; type talks about intended usage more than severity. Other types in that enum will be "trace", "metric", etc. I think that means I would prefer AirbyteErrorTraceMessage to gain a "level" property instead.

However, if we start without adding AirbyteTraceMessage.error.level, don't we already have enough information to know if the error was fatal or not? e.g.

  • if the sync succeeded, all AirbyteTraceMessages are informational only (warn)
  • if the sync failed, the latest AirbyteTraceMessage is the cause (fatal)
  • if the sync failed, all but the latest AirbyteTraceMessages are informational (warn)

evantahler avatar Aug 30 '22 20:08 evantahler

@evantahler (cc @salima-airbyte ) I think adding level to AirbyteErrorTraceMessage would work as well, but I would probably make it reflect the log level coming from Log4J. I.e., it would look like this:

 "log.level": {
    "$resolver": "level",
    "field": "name"
  }

grishick avatar Aug 30 '22 21:08 grishick

So, I guess, the question is: do we want to keep stack traces for exceptions that have a severity level lower than ERROR? If the answer is yes, then we need to add either a stack-trace property to AirbyteLogMessage or a severity-level property to AirbyteTraceMessage or AirbyteErrorTraceMessage. If the answer is no, then we configure Log4J to output AirbyteTraceMessage for ERROR+ exceptions and AirbyteLogMessage for INFO-WARN exceptions; this can be done with Log4J configuration and JSON message templates for Log4J.

grishick avatar Aug 30 '22 21:08 grishick

@yurii-bidiuk we discussed the approach with @salima-airbyte and @evantahler and came to a conclusion that this PR needs to be modified to use Log4J configs and message template to achieve the following result:

  • Info level messages should be logged as AirbyteLogMessage and should omit the stack trace. This is similar to the code change that you already have.
  • Fatal, Error, Warn, and Debug level messages should be logged as AirbyteTraceMessage, which has a dedicated field for the stack trace. Since neither AirbyteErrorTraceMessage nor AirbyteTraceMessage has a severity or log.level field, in order not to lose the information about the log level of the error, you should use the internal_message field to report the log level. E.g.
 "internal_message": {
    "$resolver": "level",
    "field": "name"
  }
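Combining this level resolver with the trace-message template shown earlier in the thread would give something like the following (a sketch; exact field placement should be double-checked against airbyte_protocol.yaml):

```json
{
  "type": "TRACE",
  "trace": {
    "type": "ERROR",
    "emitted_at": {
      "$resolver": "timestamp",
      "pattern": { "format": "%d{UNIX_MILLIS}", "timeZone": "UTC" }
    },
    "error": {
      "message": { "$resolver": "message", "stringified": true },
      "internal_message": { "$resolver": "level", "field": "name" },
      "stack_trace": {
        "$resolver": "exception",
        "field": "stackTrace",
        "stackTrace": { "stringified": true }
      },
      "failure_type": "system_error"
    }
  }
}
```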

Let me know if you have questions about this.

grishick avatar Aug 31 '22 20:08 grishick

I have some updates:

It looks like we currently assume the first AirbyteErrorTraceMessage for the source or the destination is the reason for the crash. If this PR starts emitting non-fatal error messages as AirbyteErrorTraceMessage, and then there's a fatal exception, this will likely confuse folks.

https://github.com/airbytehq/airbyte/blob/fbac1d3fa5acf73563c34f4c182a44bfd68a4315/airbyte-workers/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java#L270-L272

So, we will need a way to disambiguate these "information only" trace messages as part of this work. One proposal might be to add a new "non-fatal" option to the AirbyteErrorTraceMessage.failure_type enum. Then, we could exclude messages with that type from the errorTraceMessageFailure() method. I'll mock this out in a different PR (https://github.com/airbytehq/airbyte/pull/16240).
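The proposed exclusion could be sketched like this (hypothetical names; the real failure_type enum and errorTraceMessageFailure() live in the protocol definition and AirbyteMessageTracker):

```java
import java.util.List;
import java.util.Optional;

public class TraceFilterSketch {

    // Hypothetical mirror of AirbyteErrorTraceMessage.failure_type,
    // extended with the proposed "non-fatal" option.
    enum FailureType { SYSTEM_ERROR, CONFIG_ERROR, NON_FATAL }

    record ErrorTrace(String message, FailureType failureType) {}

    // Sketch of the disambiguation: pick the first *fatal* error trace,
    // skipping informational (non-fatal) ones.
    static Optional<ErrorTrace> firstFatal(List<ErrorTrace> traces) {
        return traces.stream()
                .filter(t -> t.failureType() != FailureType.NON_FATAL)
                .findFirst();
    }

    public static void main(String[] args) {
        List<ErrorTrace> traces = List.of(
                new ErrorTrace("transient error, retried successfully", FailureType.NON_FATAL),
                new ErrorTrace("source crashed", FailureType.SYSTEM_ERROR));
        System.out.println(firstFatal(traces).orElseThrow().message()); // prints "source crashed"
    }
}
```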

cc @alovew @jdpgrailsdev

evantahler avatar Sep 01 '22 20:09 evantahler

@evantahler is there a reason we want to use Airbyte Error Trace Messages specifically? The PRD for the trace message project specified that the Error Trace Messages are for failures, and I believe we decided to make each Trace Message type have a specific use case because we didn't want them to become too generic. From the PRD:

AirbyteTraceMessage vs AirbyteDisplayMessage

A previous version of this proposal envisioned an AirbyteDisplayMessage. This was changed because "display" did not capture all the ways this message was being used. It also created either too wide a schema to encapsulate all the use-cases, or would need to be too loosely defined in a way that would not be consistently consumable. This was corrected with the addition of sub-types and a new name.

Maybe this is too similar to make a new type? But I'm thinking something like AirbyteNotificationTraceMessage (which is specifically called out in the PRD as one we want to eventually create)

alovew avatar Sep 01 '22 22:09 alovew

@yurii-bidiuk once this PR is merged, you can use AirbyteTraceNotificationMessage to log error messages and exceptions

grishick avatar Sep 03 '22 06:09 grishick