opensearch-migrations
[RFS] Improved RFS Worker exception logging
Description
- Refactor the RFS Worker and select underlying libraries to better log exceptional situations
Issues Resolved
- https://opensearch.atlassian.net/browse/MIGRATIONS-1731
Testing
- Ran the ReindexFromSnapshot.java all-in-one script against the docker compose test setup
- Updated unit tests
- Provoked an exception to demonstrate the new logging behavior, displayed below:
chelma@3c22fba4e266 RFS % gradle runRfsWorker --args='--snapshot-name reindex-from-snapshot --s3-local-dir /tmp/s3_files --s3-repo-uri s3://chelma-iad-rfs-local-testing --s3-region us-east-1 --source-host http://localhost:19200 --target-host http://localhost:29200 --min-replicas 0 --index-template-whitelist "my_index_template" --component-template-whitelist "my_component_template"'
> Task :runRfsWorker
17:46:04.671 INFO Running RfsWorker
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
17:46:04.943 INFO Checking if work remains in the Snapshot Phase...
17:46:05.741 INFO Snapshot not yet completed, entering Snapshot Phase...
17:46:05.741 INFO Snapshot CMS Entry not found, attempting to create it...
17:46:06.001 INFO Snapshot CMS Entry created
17:46:06.001 INFO Attempting to initiate the snapshot...
17:46:06.776 INFO Snapshot repo registration successful
17:46:06.824 ERROR Could not create snapshot: _snapshot/migration_assistant_repo/reindex-from-snapshot. Response Code: 400, Response Message: Bad Request, Response Body: {"error":{"root_cause":[{"type":"invalid_snapshot_name_exception","reason":"[migration_assistant_repo:reindex-from-snapshot] Invalid snapshot name [reindex-from-snapshot], snapshot with the same name already exists"}],"type":"invalid_snapshot_name_exception","reason":"[migration_assistant_repo:reindex-from-snapshot] Invalid snapshot name [reindex-from-snapshot], snapshot with the same name already exists"},"status":400}
17:46:08.232 ERROR Could not create snapshot: _snapshot/migration_assistant_repo/reindex-from-snapshot. Response Code: 400, Response Message: Bad Request, Response Body: {"error":{"root_cause":[{"type":"invalid_snapshot_name_exception","reason":"[migration_assistant_repo:reindex-from-snapshot] Invalid snapshot name [reindex-from-snapshot], snapshot with the same name already exists"}],"type":"invalid_snapshot_name_exception","reason":"[migration_assistant_repo:reindex-from-snapshot] Invalid snapshot name [reindex-from-snapshot], snapshot with the same name already exists"},"status":400}
17:46:09.605 ERROR Could not create snapshot: _snapshot/migration_assistant_repo/reindex-from-snapshot. Response Code: 400, Response Message: Bad Request, Response Body: {"error":{"root_cause":[{"type":"invalid_snapshot_name_exception","reason":"[migration_assistant_repo:reindex-from-snapshot] Invalid snapshot name [reindex-from-snapshot], snapshot with the same name already exists"}],"type":"invalid_snapshot_name_exception","reason":"[migration_assistant_repo:reindex-from-snapshot] Invalid snapshot name [reindex-from-snapshot], snapshot with the same name already exists"},"status":400}
17:46:13.447 ERROR Could not create snapshot: _snapshot/migration_assistant_repo/reindex-from-snapshot. Response Code: 400, Response Message: Bad Request, Response Body: {"error":{"root_cause":[{"type":"invalid_snapshot_name_exception","reason":"[migration_assistant_repo:reindex-from-snapshot] Invalid snapshot name [reindex-from-snapshot], snapshot with the same name already exists"}],"type":"invalid_snapshot_name_exception","reason":"[migration_assistant_repo:reindex-from-snapshot] Invalid snapshot name [reindex-from-snapshot], snapshot with the same name already exists"},"status":400}
17:46:13.448 ERROR Snapshot reindex-from-snapshot creation failed
reactor.core.Exceptions$RetryExhaustedException: Retries exhausted: 3/3
at reactor.core.Exceptions.retryExhausted(Exceptions.java:308) ~[reactor-core-3.6.5.jar:3.6.5]
at reactor.util.retry.RetryBackoffSpec.lambda$static$0(RetryBackoffSpec.java:68) ~[reactor-core-3.6.5.jar:3.6.5]
at reactor.util.retry.RetryBackoffSpec.lambda$null$4(RetryBackoffSpec.java:560) ~[reactor-core-3.6.5.jar:3.6.5]
at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.onNext(FluxConcatMapNoPrefetch.java:183) ~[reactor-core-3.6.5.jar:3.6.5]
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107) ~[reactor-core-3.6.5.jar:3.6.5]
at reactor.core.publisher.SinkManyEmitterProcessor.drain(SinkManyEmitterProcessor.java:476) ~[reactor-core-3.6.5.jar:3.6.5]
at reactor.core.publisher.SinkManyEmitterProcessor.tryEmitNext(SinkManyEmitterProcessor.java:273) ~[reactor-core-3.6.5.jar:3.6.5]
at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100) ~[reactor-core-3.6.5.jar:3.6.5]
at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:27) ~[reactor-core-3.6.5.jar:3.6.5]
at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onError(FluxRetryWhen.java:194) ~[reactor-core-3.6.5.jar:3.6.5]
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onError(MonoPeekTerminal.java:258) ~[reactor-core-3.6.5.jar:3.6.5]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:149) ~[reactor-core-3.6.5.jar:3.6.5]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondComplete(MonoFlatMap.java:245) ~[reactor-core-3.6.5.jar:3.6.5]
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:305) ~[reactor-core-3.6.5.jar:3.6.5]
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107) ~[reactor-core-3.6.5.jar:3.6.5]
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:113) ~[reactor-core-3.6.5.jar:3.6.5]
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:224) ~[reactor-core-3.6.5.jar:3.6.5]
at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:129) ~[reactor-core-3.6.5.jar:3.6.5]
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:224) ~[reactor-core-3.6.5.jar:3.6.5]
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:113) ~[reactor-core-3.6.5.jar:3.6.5]
at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:194) ~[reactor-core-3.6.5.jar:3.6.5]
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107) ~[reactor-core-3.6.5.jar:3.6.5]
at reactor.core.publisher.Operators$BaseFluxToMonoOperator.completePossiblyEmpty(Operators.java:2097) ~[reactor-core-3.6.5.jar:3.6.5]
at reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber.onComplete(MonoCollectList.java:118) ~[reactor-core-3.6.5.jar:3.6.5]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260) ~[reactor-core-3.6.5.jar:3.6.5]
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144) ~[reactor-core-3.6.5.jar:3.6.5]
at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:415) ~[reactor-netty-core-1.1.18.jar:1.1.18]
at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:446) ~[reactor-netty-core-1.1.18.jar:1.1.18]
at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:500) ~[reactor-netty-core-1.1.18.jar:1.1.18]
at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:793) ~[reactor-netty-http-1.1.18.jar:1.1.18]
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:114) ~[reactor-netty-core-1.1.18.jar:1.1.18]
Exception in thread "main" com.rfs.common.SnapshotCreator$SnapshotCreationFailed: Failed to create snapshot reindex-from-snapshot
at com.rfs.common.SnapshotCreator.createSnapshot(SnapshotCreator.java:59)
at com.rfs.worker.SnapshotStep$InitiateSnapshot.run(SnapshotStep.java:109)
at com.rfs.worker.SnapshotRunner.run(SnapshotRunner.java:31)
at com.rfs.RunRfsWorker.main(RunRfsWorker.java:126)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.108.Final.jar:4.1.108.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.108.Final.jar:4.1.108.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.108.Final.jar:4.1.108.Final]
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) ~[netty-transport-4.1.108.Final.jar:4.1.108.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[netty-codec-4.1.108.Final.jar:4.1.108.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) ~[netty-codec-4.1.108.Final.jar:4.1.108.Final]
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) ~[netty-transport-4.1.108.Final.jar:4.1.108.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[netty-transport-4.1.108.Final.jar:4.1.108.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.108.Final.jar:4.1.108.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.108.Final.jar:4.1.108.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.108.Final.jar:4.1.108.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.108.Final.jar:4.1.108.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.108.Final.jar:4.1.108.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.108.Final.jar:4.1.108.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.108.Final.jar:4.1.108.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[netty-transport-4.1.108.Final.jar:4.1.108.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[netty-transport-4.1.108.Final.jar:4.1.108.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[netty-transport-4.1.108.Final.jar:4.1.108.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[netty-transport-4.1.108.Final.jar:4.1.108.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.108.Final.jar:4.1.108.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.108.Final.jar:4.1.108.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.108.Final.jar:4.1.108.Final]
at java.base/java.lang.Thread.run(Thread.java:829) ~[?:?]
Suppressed: java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:104) ~[reactor-core-3.6.5.jar:3.6.5]
at reactor.core.publisher.Mono.block(Mono.java:1779) ~[reactor-core-3.6.5.jar:3.6.5]
at com.rfs.common.OpenSearchClient.createSnapshot(OpenSearchClient.java:118) ~[main/:?]
at com.rfs.common.SnapshotCreator.createSnapshot(SnapshotCreator.java:55) [main/:?]
at com.rfs.worker.SnapshotStep$InitiateSnapshot.run(SnapshotStep.java:109) [main/:?]
at com.rfs.worker.SnapshotRunner.run(SnapshotRunner.java:31) [main/:?]
at com.rfs.RunRfsWorker.main(RunRfsWorker.java:126) [main/:?]
Caused by: com.rfs.common.OpenSearchClient$OperationFailed: Could not create snapshot: _snapshot/migration_assistant_repo/reindex-from-snapshot. Response Code: 400, Response Message: Bad Request, Response Body: {"error":{"root_cause":[{"type":"invalid_snapshot_name_exception","reason":"[migration_assistant_repo:reindex-from-snapshot] Invalid snapshot name [reindex-from-snapshot], snapshot with the same name already exists"}],"type":"invalid_snapshot_name_exception","reason":"[migration_assistant_repo:reindex-from-snapshot] Invalid snapshot name [reindex-from-snapshot], snapshot with the same name already exists"},"status":400}
at com.rfs.common.OpenSearchClient.lambda$createSnapshot$4(OpenSearchClient.java:113) ~[main/:?]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:132) ~[reactor-core-3.6.5.jar:3.6.5]
... 42 more
17:46:13.461 ERROR Snapshot Phase failed w/ an exception
17:46:13.461 ERROR {"exceptionMessage":"Failed to create snapshot reindex-from-snapshot","exceptionClass":"SnapshotCreationFailed","exceptionTrace":"[com.rfs.common.SnapshotCreator.createSnapshot(SnapshotCreator.java:59), com.rfs.worker.SnapshotStep$InitiateSnapshot.run(SnapshotStep.java:109), com.rfs.worker.SnapshotRunner.run(SnapshotRunner.java:31), com.rfs.RunRfsWorker.main(RunRfsWorker.java:126)]","phase":"SNAPSHOT_IN_PROGRESS","currentStep":"InitiateSnapshot","cmsEntry":"null"}
17:46:13.462 ERROR Error running RfsWorker
com.rfs.common.SnapshotCreator$SnapshotCreationFailed: Failed to create snapshot reindex-from-snapshot
at com.rfs.common.SnapshotCreator.createSnapshot(SnapshotCreator.java:59) ~[main/:?]
at com.rfs.worker.SnapshotStep$InitiateSnapshot.run(SnapshotStep.java:109) ~[main/:?]
at com.rfs.worker.SnapshotRunner.run(SnapshotRunner.java:31) ~[main/:?]
at com.rfs.RunRfsWorker.main(RunRfsWorker.java:126) [main/:?]
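The structured JSON error line in the output above (the one with `exceptionMessage`, `exceptionClass`, `exceptionTrace`, `phase`, `currentStep`, and `cmsEntry`) could be produced along the lines of the following minimal sketch. This is not the code from this PR: the class `WorkerErrorRecord` and its methods are hypothetical names chosen for illustration, and only the field names are taken from the log line. It uses the JDK alone, so the JSON escaping is hand-rolled rather than delegated to a JSON library.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper (not part of the RFS codebase): builds a flat,
// single-line JSON record describing a failure and the worker state.
public class WorkerErrorRecord {

    // Collects the fields seen in the log line above, in the same order.
    public static Map<String, String> describe(
            Exception e, String phase, String currentStep, Object cmsEntry) {
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("exceptionMessage", String.valueOf(e.getMessage()));
        // getSimpleName() drops the package and outer class, e.g. "SnapshotCreationFailed"
        fields.put("exceptionClass", e.getClass().getSimpleName());
        // Arrays.toString renders the frames as "[frame1, frame2, ...]"
        fields.put("exceptionTrace", Arrays.toString(e.getStackTrace()));
        fields.put("phase", phase);
        fields.put("currentStep", currentStep);
        // A missing CMS entry is rendered as the string "null"
        fields.put("cmsEntry", String.valueOf(cmsEntry));
        return fields;
    }

    // Serializes the fields as a JSON object whose values are all strings.
    public static String toJson(Map<String, String> fields) {
        return fields.entrySet().stream()
                .map(en -> quote(en.getKey()) + ":" + quote(en.getValue()))
                .collect(Collectors.joining(",", "{", "}"));
    }

    // Minimal JSON string escaping: quotes, backslashes, and control characters.
    private static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }

    public static void main(String[] args) {
        try {
            throw new IllegalStateException("Failed to create snapshot reindex-from-snapshot");
        } catch (IllegalStateException e) {
            // Emit one machine-parseable line alongside the usual stack trace
            System.err.println(toJson(describe(e, "SNAPSHOT_IN_PROGRESS", "InitiateSnapshot", null)));
        }
    }
}
```

Keeping the record on a single line with a fixed set of keys makes it straightforward to filter worker failures by `phase` or `currentStep` in a log aggregator, while the full stack trace remains available in the surrounding ERROR output.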
Check List
- [X] New functionality includes testing
- [X] All tests pass, including unit test, integration test and doctest
- [ ] New functionality has been documented
- [X] Commits are signed per the DCO using --signoff
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license. For more information on following Developer Certificate of Origin and signing off your commits, please check here.