Use LOG4J2 to wrap connector logs in JSON format
What
Fixes #659
How
Use LOG4J2 and JsonTemplateLayout to wrap connector logs in a JSON format matching AirbyteLogMessage
Recommended reading order
- x.java
- y.python
🚨 User Impact 🚨
Are there any breaking changes? What is the end result perceived by the user? If yes, please merge this PR with the 🚨🚨 emoji so changelog authors can further highlight this if needed.
Pre-merge Checklist
Expand the relevant checklist and delete the others.
New Connector
Community member or Airbyter
- [ ] Community member? Grant edit access to maintainers (instructions)
- [ ] Secrets in the connector's spec are annotated with airbyte_secret
- [ ] Unit & integration tests added and passing. Community members, please provide proof of success locally e.g: screenshot or copy-paste unit, integration, and acceptance test output. To run acceptance tests for a Python connector, follow instructions in the README. For java connectors run ./gradlew :airbyte-integrations:connectors:<name>:integrationTest.
- [ ] Code reviews completed
- [ ] Documentation updated
  - [ ] Connector's README.md
  - [ ] Connector's bootstrap.md. See description and examples
  - [ ] docs/integrations/<source or destination>/<name>.md including changelog. See changelog example
  - [ ] docs/integrations/README.md
  - [ ] airbyte-integrations/builds.md
- [ ] Connector's
- [ ] PR name follows PR naming conventions
Airbyter
If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.
- [ ] Create a non-forked branch based on this PR and test the below items on it
- [ ] Build is successful
- [ ] If new credentials are required for use in CI, add them to GSM. Instructions.
- [ ] /test connector=connectors/<name> command is passing
- [ ] New Connector version released on Dockerhub by running the /publish command described here
- [ ] After the connector is published, connector added to connector index as described here
- [ ] Seed specs have been re-generated by building the platform and committing the changes to the seed spec files, as described here
Updating a connector
Community member or Airbyter
- [ ] Grant edit access to maintainers (instructions)
- [ ] Secrets in the connector's spec are annotated with airbyte_secret
- [ ] Unit & integration tests added and passing. Community members, please provide proof of success locally e.g: screenshot or copy-paste unit, integration, and acceptance test output. To run acceptance tests for a Python connector, follow instructions in the README. For java connectors run ./gradlew :airbyte-integrations:connectors:<name>:integrationTest.
- [ ] Code reviews completed
- [ ] Documentation updated
  - [ ] Connector's README.md
  - [ ] Connector's bootstrap.md. See description and examples
  - [ ] Changelog updated in docs/integrations/<source or destination>/<name>.md including changelog. See changelog example
- [ ] Connector's
- [ ] PR name follows PR naming conventions
Airbyter
If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.
- [ ] Create a non-forked branch based on this PR and test the below items on it
- [ ] Build is successful
- [ ] If new credentials are required for use in CI, add them to GSM. Instructions.
- [ ] /test connector=connectors/<name> command is passing
- [ ] New Connector version released on Dockerhub and connector version bumped by running the /publish command described here
Connector Generator
- [ ] Issue acceptance criteria met
- [ ] PR name follows PR naming conventions
- [ ] If adding a new generator, add it to the list of scaffold modules being tested
- [ ] The generator test modules (all connectors with -scaffold in their name) have been updated with the latest scaffold by running ./gradlew :airbyte-integrations:connector-templates:generator:testScaffoldTemplates then checking in your changes
- [ ] Documentation which references the generator is updated as needed
Tests
Unit
Put your unit tests output here.
Integration
Put your integration tests output here.
Acceptance
Put your acceptance tests output here.
/test connector=connectors/source-postgres
:clock2: connectors/source-postgres https://github.com/airbytehq/airbyte/actions/runs/2862778188 :x: connectors/source-postgres https://github.com/airbytehq/airbyte/actions/runs/2862778188 :bug: https://gradle.com/s/dwtw7nnscy5us
Build Failed
Test summary info:
Could not find result summary
With these changes, all connector logs look like this:
{"type":"LOG","log":{"level":"INFO","message":"Using default value for environment variable WORKER_ENVIRONMENT: 'DOCKER'"}}
{"type":"LOG","log":{"level":"INFO","message":"Using default value for environment variable JOB_MAIN_CONTAINER_CPU_REQUEST: 'null'"}}
{"type":"LOG","log":{"level":"INFO","message":"Using default value for environment variable JOB_MAIN_CONTAINER_CPU_LIMIT: 'null'"}}
{"type":"LOG","log":{"level":"INFO","message":"Using default value for environment variable JOB_MAIN_CONTAINER_MEMORY_REQUEST: 'null'"}}
{"type":"LOG","log":{"level":"INFO","message":"Using default value for environment variable JOB_MAIN_CONTAINER_MEMORY_LIMIT: 'null'"}}
{"type":"LOG","log":{"level":"INFO","message":"Using default value for environment variable JOB_KUBE_TOLERATIONS: ''"}}
{"type":"LOG","log":{"level":"INFO","message":"Using default value for environment variable JOB_KUBE_NODE_SELECTORS: ''"}}
{"type":"LOG","log":{"level":"INFO","message":"Using default value for environment variable JOB_KUBE_ANNOTATIONS: ''"}}
{"type":"LOG","log":{"level":"INFO","message":"Using default value for environment variable JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_SECRET: ''"}}
{"type":"LOG","log":{"level":"INFO","message":"Using default value for environment variable JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_POLICY: 'IfNotPresent'"}}
{"type":"LOG","log":{"level":"INFO","message":"Using default value for environment variable JOB_KUBE_SIDECAR_CONTAINER_IMAGE_PULL_POLICY: 'IfNotPresent'"}}
{"type":"LOG","log":{"level":"INFO","message":"Using default value for environment variable JOB_KUBE_SOCAT_IMAGE: 'alpine/socat:1.7.4.3-r0'"}}
{"type":"LOG","log":{"level":"INFO","message":"Using default value for environment variable JOB_KUBE_BUSYBOX_IMAGE: 'busybox:1.28'"}}
{"type":"LOG","log":{"level":"INFO","message":"Using default value for environment variable JOB_KUBE_CURL_IMAGE: 'curlimages/curl:7.83.1'"}}
{"type":"LOG","log":{"level":"INFO","message":"Using default value for environment variable WORKER_ENVIRONMENT: 'DOCKER'"}}
{"type":"LOG","log":{"level":"INFO","message":"Using default value for environment variable DEPLOYMENT_MODE: 'OSS'"}}
{"type":"LOG","log":{"level":"INFO","message":"Creating container for image: postgres:13-alpine"}}
{"type":"LOG","log":{"level":"INFO","message":"Container postgres:13-alpine is starting: 58a660cf9ec646caed3d5bb360dd6f9c70b1cd99ccf49c81c324d8648298e403"}}
{"type":"LOG","log":{"level":"INFO","message":"Container postgres:13-alpine started in PT1.344788305S"}}
{"type":"LOG","log":{"level":"INFO","message":"HikariPool-5 - Starting..."}}
{"type":"LOG","log":{"level":"INFO","message":"HikariPool-5 - Start completed."}}
{"type":"LOG","log":{"level":"INFO","message":"Using default value for environment variable USE_STREAM_CAPABLE_STATE: 'false'"}}
{"type":"LOG","log":{"level":"INFO","message":"Checking if airbyte/source-postgres:dev exists..."}}
{"type":"LOG","log":{"level":"INFO","message":"airbyte/source-postgres:dev was found locally."}}
{"type":"LOG","log":{"level":"INFO","message":"Creating docker job ID: 0"}}
{"type":"LOG","log":{"level":"INFO","message":"Preparing command: docker run --rm --init -i -w /data/job --log-driver none --name source-postgres-spec-0-0-torcr --network host -v /tmp/airbyte_tests/test7593383916175201673:/data -v /tmp/airbyte_tests/output846556706032332208:/local -e DEPLOYMENT_MODE=OSS -e USE_STREAM_CAPABLE_STATE=false -e WORKER_ENVIRONMENT=DOCKER -e AIRBYTE_ROLE= -e WORKER_CONNECTOR_IMAGE=airbyte/source-postgres:dev -e WORKER_JOB_ATTEMPT=0 -e AIRBYTE_VERSION= -e WORKER_JOB_ID=0 airbyte/source-postgres:dev spec"}}
{"type":"LOG","log":{"level":"INFO","message":"Running source under deployment mode: OSS"}}
{"type":"LOG","log":{"level":"INFO","message":"Starting source: io.airbyte.integrations.base.ssh.SshWrappedSource"}}
{"type":"LOG","log":{"level":"INFO","message":"integration args: {spec=null}"}}
{"type":"LOG","log":{"level":"INFO","message":"Running integration: io.airbyte.integrations.base.ssh.SshWrappedSource"}}
{"type":"LOG","log":{"level":"INFO","message":"Command: SPEC"}}
{"type":"LOG","log":{"level":"INFO","message":"Integration config: IntegrationConfig{command=SPEC, configPath='null', catalogPath='null', statePath='null'}"}}
{"type":"LOG","log":{"level":"INFO","message":"Completed integration: io.airbyte.integrations.base.ssh.SshWrappedSource"}}
{"type":"LOG","log":{"level":"INFO","message":"Completed source: io.airbyte.integrations.base.ssh.SshWrappedSource"}}
We can configure this structure using AirbyteLogMessageTemplate.json.
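For reference, a minimal JsonTemplateLayout event template that would produce lines in this shape could look roughly like the sketch below (based on the log output above and the level/message resolvers documented for JsonTemplateLayout; the actual AirbyteLogMessageTemplate.json in this PR may differ):
{
  "type": "LOG",
  "log": {
    "level": {
      "$resolver": "level",
      "field": "name"
    },
    "message": {
      "$resolver": "message",
      "stringified": true
    }
  }
}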
/test connector=connectors/source-mysql
:clock2: connectors/source-mysql https://github.com/airbytehq/airbyte/actions/runs/2866705735 :white_check_mark: connectors/source-mysql https://github.com/airbytehq/airbyte/actions/runs/2866705735 No Python unittests run
Build Passed
Test summary info:
All Passed
/test connector=connectors/source-postgres
:clock2: connectors/source-postgres https://github.com/airbytehq/airbyte/actions/runs/2867414898 :white_check_mark: connectors/source-postgres https://github.com/airbytehq/airbyte/actions/runs/2867414898 No Python unittests run
Build Passed
Test summary info:
All Passed
/test connector=connectors/source-postgres
:clock2: connectors/source-postgres https://github.com/airbytehq/airbyte/actions/runs/2868700148 :white_check_mark: connectors/source-postgres https://github.com/airbytehq/airbyte/actions/runs/2868700148 No Python unittests run
Build Passed
Test summary info:
All Passed
@yurii-bidiuk could you show an example of what an exception stacktrace looks like? (i.e. if the connector has a throw new RuntimeException("whatever"), does that also get wrapped correctly?)
And is https://github.com/airbytehq/airbyte-internal-issues/issues/19 an example of how the logs show up in the UI? (if yes: that looks correct to me - airbyte technically expects connectors to only output JSON, so the current behavior is actually violating the airbyte protocol)
@edgao
I produced an exception (throw new RuntimeException("some error");) in the connector and the stack trace is not wrapped in JSON. The JSON log contains only the high-level error message.
{"type":"LOG","log":{"level":"ERROR","message":"Something went wrong in the connector. See the logs for more details."}}
Also, there is no original connector stack trace with a message. Instead, we get a stack trace from the worker:
2022-08-22 09:48:28 ERROR i.a.w.g.DefaultReplicationWorker(run):184 - Sync worker failed.
java.util.concurrent.ExecutionException: io.airbyte.workers.general.DefaultReplicationWorker$SourceException: Source cannot be stopped!
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396) ~[?:?]
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073) ~[?:?]
at io.airbyte.workers.general.DefaultReplicationWorker.run(DefaultReplicationWorker.java:177) ~[io.airbyte-airbyte-workers-0.40.0-alpha.jar:?]
at io.airbyte.workers.general.DefaultReplicationWorker.run(DefaultReplicationWorker.java:65) ~[io.airbyte-airbyte-workers-0.40.0-alpha.jar:?]
at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:155) ~[io.airbyte-airbyte-workers-0.40.0-alpha.jar:?]
at java.lang.Thread.run(Thread.java:1589) [?:?]
Suppressed: io.airbyte.workers.exception.WorkerException: Source process exit with code 1. This warning is normal if the job was cancelled.
at io.airbyte.workers.internal.DefaultAirbyteSource.close(DefaultAirbyteSource.java:136) ~[io.airbyte-airbyte-workers-0.40.0-alpha.jar:?]
at io.airbyte.workers.general.DefaultReplicationWorker.run(DefaultReplicationWorker.java:141) ~[io.airbyte-airbyte-workers-0.40.0-alpha.jar:?]
at io.airbyte.workers.general.DefaultReplicationWorker.run(DefaultReplicationWorker.java:65) ~[io.airbyte-airbyte-workers-0.40.0-alpha.jar:?]
at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:155) ~[io.airbyte-airbyte-workers-0.40.0-alpha.jar:?]
at java.lang.Thread.run(Thread.java:1589) [?:?]
Caused by: io.airbyte.workers.general.DefaultReplicationWorker$SourceException: Source cannot be stopped!
at io.airbyte.workers.general.DefaultReplicationWorker.lambda$getReplicationRunnable$6(DefaultReplicationWorker.java:340) ~[io.airbyte-airbyte-workers-0.40.0-alpha.jar:?]
at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
... 1 more
Caused by: io.airbyte.workers.exception.WorkerException: Source process exit with code 1. This warning is normal if the job was cancelled.
at io.airbyte.workers.internal.DefaultAirbyteSource.close(DefaultAirbyteSource.java:136) ~[io.airbyte-airbyte-workers-0.40.0-alpha.jar:?]
at io.airbyte.workers.general.DefaultReplicationWorker.lambda$getReplicationRunnable$6(DefaultReplicationWorker.java:338) ~[io.airbyte-airbyte-workers-0.40.0-alpha.jar:?]
at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
The original stack trace with a message is present in the sync summary log.
Does the original stacktrace never get logged? That seems problematic. Losing the original stacktrace will make debugging connector crashes a lot more difficult.
https://logging.apache.org/log4j/2.x/manual/json-template-layout.html says there are ways to add the exception stacktrace (the error.stack_trace entry); I think adding that would be really valuable. (not sure how to tell log4j to combine two different resolvers in a single entry though)
@edgao the original stack trace is logged only inside the sync summary log.
https://logging.apache.org/log4j/2.x/manual/json-template-layout.html says there are ways to add the exception stacktrace
You're right, and I already tried this config, but it doesn't work for Airbyte and the stack trace is ignored.
Also, I didn't find a way to combine two different resolvers. A possible approach is to write a custom resolver, something like the RandomNumberResolver from https://logging.apache.org/log4j/2.x/manual/json-template-layout.html
I'd pretty strongly prefer to have the exception stacktrace logged immediately, rather than captured inside the sync summary log. Otherwise it could hide important information (e.g. timestamp).
NOTE :warning: Changes in this PR affect the following connectors. Make sure to run corresponding integration tests:
- destination-oracle
- source-db2
- source-snowflake
- source-db2-strict-encrypt
- source-relational-db
- destination-rockset
- destination-oracle-strict-encrypt
- source-mongodb-v2
- destination-mysql
- source-clickhouse-strict-encrypt
- source-cockroachdb-strict-encrypt
- destination-postgres-strict-encrypt
- destination-keen
- destination-mysql-strict-encrypt
- destination-local-json
- destination-redshift
- destination-mongodb-strict-encrypt
- source-e2e-test
- destination-mongodb
- destination-e2e-test
- source-jdbc
- source-oracle-strict-encrypt
- source-scaffold-java-jdbc
- destination-meilisearch
- destination-elasticsearch
- source-mysql
- source-sftp
- destination-bigquery
- destination-mssql-strict-encrypt
- destination-pubsub
- source-clickhouse
- destination-mqtt
- source-postgres-strict-encrypt
- source-kafka
- destination-kafka
- source-bigquery
- destination-jdbc
- destination-bigquery-denormalized
- destination-databricks
- destination-clickhouse
- destination-pulsar
- source-oracle
- destination-snowflake
- destination-dev-null
- destination-cassandra
- source-cockroachdb
- source-mysql-strict-encrypt
- source-elasticsearch
- destination-redis
- destination-kinesis
- destination-postgres
- destination-scylla
- source-e2e-test-cloud
- destination-dynamodb
- source-postgres
- destination-gcs
- destination-mssql
- source-mssql
- destination-s3
- destination-clickhouse-strict-encrypt
- source-mssql-strict-encrypt
- destination-mariadb-columnstore
- source-mongodb-strict-encrypt
- destination-csv
- destination-azure-blob-storage
- source-redshift
- source-tidb
@edgao for some reason the stack trace wasn't logged inside AirbyteExceptionHandler, so I manually added the stack trace as part of the message, and now we can see the original connector stack trace as part of the JSON log.
From the UI it looks like this:
And the produced connector log:
{"type":"LOG","log":{"level":"INFO","message":"HikariPool-1 - Shutdown initiated..."}}
{"type":"LOG","log":{"level":"INFO","message":"HikariPool-1 - Shutdown completed."}}
{"type":"LOG","log":{"level":"ERROR","message":"Exception occurred while getting the delegate read iterator, closing SSH tunnel"}}
{"type":"LOG","log":{"level":"ERROR","message":"Something went wrong in the connector. See the logs for more details.java.lang.IllegalStateException: Unexpected state blob, the state contains either multiple global or conflicting state type.\n\tat io.airbyte.config.helpers.StateMessageHelper.getTypedState(StateMessageHelper.java:72)\n\tat io.airbyte.integrations.source.relationaldb.AbstractDbSource.deserializeInitialState(AbstractDbSource.java:540)\n\tat io.airbyte.integrations.source.relationaldb.AbstractDbSource.read(AbstractDbSource.java:118)\n\tat io.airbyte.integrations.source.postgres.PostgresSource.read(PostgresSource.java:287)\n\tat io.airbyte.integrations.base.ssh.SshWrappedSource.read(SshWrappedSource.java:54)\n\tat io.airbyte.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.java:136)\n\tat io.airbyte.integrations.base.IntegrationRunner.run(IntegrationRunner.java:97)\n\tat io.airbyte.integrations.base.adaptive.AdaptiveSourceRunner$Runner.run(AdaptiveSourceRunner.java:86)\n\tat io.airbyte.integrations.source.postgres.PostgresSourceRunner.main(PostgresSourceRunner.java:15)\n"}}
Can we move on with this?
does this mean that anywhere else we use the LOGGER.log("message", e) call, it needs to be replaced by the ExceptionUtils.getStackTrace style? (that doesn't sound ideal)
also @yurii-bidiuk @grishick feel free to tell me I'm being too nitpicky here :P
I wonder if we should just have a "stacktrace" field at the protocol level. Could be useful as a way to distinguish between technical vs non-technical error message?
does this mean that anywhere else we use the LOGGER.log("message", e) call, it needs to be replaced by the ExceptionUtils.getStackTrace style? (that doesn't sound ideal)
This does not sound good.
I wonder if we should just have a "stacktrace" field at the protocol level. Could be useful as a way to distinguish between technical vs non-technical error message?
This seems like a good idea; tagging @airbytehq/protocol-reviewers to chime in.
AirbyteTraceMessage already has these properties: stacktrace, failure-type, internal and external message. It might be the case that all exceptions should be AirbyteTraceMessage rather than AirbyteLogMessage?
It is probably right to say that we shouldn't be "plain" logging exceptions anywhere, and that we should have some sort of formatting utility class. In that utility, we can decide (in a single method) if we emit an AirbyteTraceMessage or an AirbyteLogMessage, or both.
aren't trace messages more "this is why the sync failed"? I think there are cases where we catch an exception but don't fail and still want to log it (e.g. LOGGER.warn("Caught xyz transient exception, retrying", e))
we shouldn't be "plain" logging exceptions anywhere
what do you mean by this? Like require that instead of using a log4j Logger, connector devs need to use some wrapper around it?
Ah! We are talking about non-fatal exceptions (because we catch them and retry).
I had initially assumed that exceptions were always fatal, and if that's the case, then we should be using AirbyteTraceMessage. If there are non-fatal exceptions, you can still send multiple AirbyteTraceMessages - it's safe to do so, because the platform will use only the latest AirbyteTraceMessage as the main FailureReason. I guess the question is whether the connector wants to "elevate" the information from those retryable exceptions to the platform or not.
Like require that instead of using a log4j Logger, connector devs need to use some wrapper around it?
Yep. I would suggest that there's a helper like:
const e = new Error("oh no");
airbyte.utils.logException(e); // yes, a new custom method

// which might be implemented roughly as:
function logError(Exception e, Boolean toSendTraceMessage = true) {
  LOGGER.log("error", ExceptionUtils.getStackTrace(e));
  if (toSendTraceMessage) { emitAirbyteTraceMessage(e); }
}
CC @alovew
This PR addresses both fatal and non-fatal exceptions; I'm just thinking through the consequences for both cases. The fatal case is probably simple, since we can add a top-level try-catch to wrap it inside whatever we want. The non-fatal case is more interesting, because it requires code changes everywhere non-fatal exceptions happen.
My concern with adding a new method is that we can't really enforce that it's used everywhere. E.g. the official BigQuery SDK (used in destination-bigquery) has an internal exception retry handler, which we have no (reasonable) way of modifying to use the airbyte exception logger util. In that case we basically must rely on log4j logs working properly.
(and yes, in that example, either we eventually succeed and therefore don't care about the retries, or it fails and throws a real exception. But I think there are cases where we want to see the exception log without it actually being fatal)
I think the right solution is to use AirbyteTraceMessage with AirbyteErrorTraceMessage and to use a custom JSON template (https://logging.apache.org/log4j/2.x/manual/json-template-layout.html) to place the error message and stack trace in the right fields.
@grishick I already tried this approach. I added these fields to AirbyteLogMessageTemplate.json and expected that the new fields (error.type, error.message, error.stack_trace) would appear in the logs.
{
  "type": "LOG",
  "log": {
    "level": {
      "$resolver": "level",
      "field": "name"
    },
    "message": {
      "$resolver": "message",
      "stringified": true
    }
  },
  "error.type": {
    "$resolver": "exception",
    "field": "className"
  },
  "error.message": {
    "$resolver": "exception",
    "field": "message"
  },
  "error.stack_trace": {
    "$resolver": "exception",
    "field": "stackTrace",
    "stackTrace": {
      "stringified": true
    }
  }
}
But these fields are ignored, and in the result we can see only type, level, and message, even when an exception was thrown:
{"type":"LOG","log":{"level":"ERROR","message":"Something went wrong in the connector. See the logs for more details."}}
Here's an example of a log message template that preserves the stack trace:
{
  "type": "TRACE",
  "trace": {
    "type": "ERROR",
    "emitted_at": {
      "$resolver": "timestamp",
      "pattern": {
        "format": "%d{UNIX_MILLIS}",
        "timeZone": "UTC"
      }
    },
    "error": {
      "message": {
        "$resolver": "message",
        "stringified": true
      },
      "stack_trace": {
        "$resolver": "exception",
        "field": "stackTrace",
        "stackTrace": {
          "stringified": true
        }
      },
      "failure_type": "system_error"
    }
  }
}
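With a template like this, an uncaught exception would be emitted by the connector as a single JSON line, roughly like the following (illustrative values only, assuming the template above):
{"type":"TRACE","trace":{"type":"ERROR","emitted_at":1661161708000,"error":{"message":"Something went wrong in the connector. See the logs for more details.","stack_trace":"java.lang.RuntimeException: some error\n\tat io.airbyte...","failure_type":"system_error"}}}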
@evantahler while I was playing with formatting error messages as AirbyteTraceMessage, I saw that AirbyteTraceMessage only supports type: ERROR. I think it makes sense for it to support the same levels as AirbyteLogMessage. To make this solution complete we'd need to adjust the protocol in one of two ways:
- add an AirbyteErrorTraceMessage property to AirbyteLogMessage, similar to how it is a property of AirbyteTraceMessage
- expand the type property of AirbyteTraceMessage to include other error levels (WARN, FATAL, INFO, DEBUG, TRACE)
@yurii-bidiuk the problem is that AirbyteLogMessage does not have fields to show a stack trace, so either you need to use AirbyteTraceMessage or expand the protocol to include AirbyteErrorTraceMessage in AirbyteLogMessage. See the airbyte_protocol.yaml file for details on the protocol.
I think that adding "levels" to AirbyteTraceMessage as a top-level property isn't quite right - type talks about intended usage more than severity. Other types in that enum will be "trace", "metric", etc. I think that means that I would prefer AirbyteErrorTraceMessage to then gain a "level" property.
However, if we start without adding AirbyteTraceMessage.error.level, don't we already have enough information to know if the error was fatal or not? e.g.
- if the sync succeeded, all AirbyteTraceMessages are informational only (warn)
- if the sync failed, the latest AirbyteTraceMessage is the cause (fatal)
- if the sync failed, all but the latest AirbyteTraceMessages are informational (warn)
@evantahler (cc @salima-airbyte) I think adding level to AirbyteErrorTraceMessage would work as well, but I would probably make it reflect the log level coming from Log4J. I.e., it would look like this:
"log.level": {
  "$resolver": "level",
  "field": "name"
}
So, I guess, the question is: do we want to keep stack traces for exceptions that have a severity level lower than ERROR? If the answer is yes, then we need to either add a property for stack traces to AirbyteLogMessage, or add a property for severity level to AirbyteTraceMessage or AirbyteErrorTraceMessage. And if the answer is no, then we configure Log4J to output AirbyteTraceMessage for ERROR+ exceptions and AirbyteLogMessage for INFO-WARN exceptions - this can be done with Log4J configuration and JSON message templates for Log4J.
@yurii-bidiuk we discussed the approach with @salima-airbyte and @evantahler and came to the conclusion that this PR needs to be modified to use Log4J configs and message templates to achieve the following result:
- Info level messages should be logged as AirbyteLogMessage and should omit the stack trace. This is similar to the code change that you already have.
- Fatal, Error, Warn, and Debug level messages should be logged as AirbyteTraceMessage, which has a dedicated field for the stack trace. Since neither AirbyteErrorTraceMessage nor AirbyteTraceMessage have a severity or log.level field, in order not to lose the information about the log level of the error, you should use the internal_message field to report the log level. E.g.
  "internal_message": {
    "$resolver": "level",
    "field": "name"
  }
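To make that concrete, a non-fatal exception logged at Warn level might then come out as something like the following (an illustrative sketch assuming the TRACE template shown earlier plus the internal_message resolver above; the values are made up):
{"type":"TRACE","trace":{"type":"ERROR","emitted_at":1661161708000,"error":{"message":"Caught transient exception, retrying","internal_message":"WARN","stack_trace":"java.lang.IllegalStateException: ...\n\tat io.airbyte...","failure_type":"system_error"}}}
Info level lines would keep the {"type":"LOG",...} shape shown at the top of this PR.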
Let me know if you have questions about this.
I have some updates:
It looks like we currently assume the first AirbyteErrorTraceMessage for the source or the destination is the reason for the crash. If this PR starts emitting non-fatal error messages as AirbyteErrorTraceMessage, and then there's a fatal exception, this will likely confuse folks.
https://github.com/airbytehq/airbyte/blob/fbac1d3fa5acf73563c34f4c182a44bfd68a4315/airbyte-workers/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java#L270-L272
So, we will need a way to disambiguate these "information only" trace messages as part of this work. One proposal might be to add a new "non-fatal" option to the AirbyteErrorTraceMessage.failure_type enum. Then, we could exclude messages with that type from the errorTraceMessageFailure() method. I'll mock this out in a different PR (https://github.com/airbytehq/airbyte/pull/16240).
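For illustration, the proposal amounts to roughly the following change to failure_type in the protocol schema (a sketch only; the existing enum values shown here are my assumption, and the exact spelling of the new option is up to that PR):
"failure_type": {
  "type": "string",
  "enum": ["system_error", "config_error", "non-fatal"]
}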
cc @alovew @jdpgrailsdev
@evantahler is there a reason we want to use Airbyte Error Trace Messages specifically? The PRD for the trace message project specified that the Error Trace Messages are for failures, and I believe we decided to make each Trace Message type have a specific use case because we didn't want them to become too generic. From the PRD:
AirbyteTraceMessage vs AirbyteDisplayMessage
A previous version of this proposal envisioned an AirbyteDisplayMessage. This was changed because “display” did not capture all the ways this message was being used. It also created either too wide of a schema to encapsulate all the use-cases, or would need to be too loosely defined in a way that would not be consistently consumable. This was corrected with the addition of sub-types and a new name.
Maybe this is too similar to make a new type? But I'm thinking something like AirbyteNotificationTraceMessage (which is specifically called out in the PRD as one we want to eventually create)
@yurii-bidiuk once this PR is merged, you can use AirbyteTraceNotificationMessage to log error messages and exceptions.