Add scala future test case using multiple threads
What does this PR do?
I wanted to give the scala futures implementation a spin but, hit a snag when using it with multiple threads. It seems like the transaction is not activated/deactivated at the right time in some cases. The existing test cases only use a single transaction so it wouldn't catch it.
Sometimes it fails because current tracer != the tracer passed as parameter.
Also sometimes num is different some the transaction name.
Example output:
thread=35, trace='Transaction7' 00-852e1e7ca03813413465fc7c21b4ab5e-42a231f21779d2aa-01 (4cd1e659) start transaction num=20
num should be the same as the number in the transaction name. It looks like the transaction from one future is still active when starting a different unrelated future.
The test sometimes passes. The Thread.sleep is not necessary for it to fail but makes it happen more often.
Not intended to be merged as-is, just to show the issue.
Checklist
- [ ] This is an enhancement of existing features, or a new feature in existing plugins
- [ ] I have updated CHANGELOG.asciidoc
- [ ] I have added tests that prove my fix is effective or that my feature works
- [ ] Added an API method or config option? Document in which version this will be introduced
- [ ] I have made corresponding changes to the documentation
- [ ] This is a bugfix
- [ ] I have updated CHANGELOG.asciidoc
- [ ] I have added tests that would fail without this fix
- [ ] This is a new plugin
- [ ] I have updated CHANGELOG.asciidoc
- [ ] My code follows the style guidelines of this project
- [ ] I have made corresponding changes to the documentation
- [ ] I have added tests that prove my fix is effective or that my feature works
- [ ] New and existing unit tests pass locally with my changes
- [ ] I have updated CHANGELOG.asciidoc
- [ ] I have updated supported-technologies.asciidoc
- [ ] Added an API method or config option? Document in which version this will be introduced
- [ ] Added an instrumentation plugin? Describe how you made sure that old, non-supported versions are not instrumented by accident.
- [ ] This is something else
- [ ] I have updated CHANGELOG.asciidoc
:broken_heart: Build Failed
the below badges are clickable and redirect to their specific view in the CI or DOCS
![]()
![]()
![]()
![]()
![]()
Expand to view the summary
Build stats
-
Start Time: 2023-07-03T11:39:28.352+0000
-
Duration: 17 min 48 sec
Steps errors 
Expand to view the steps failures
Load a resource file from a library
- Took 0 min 0 sec . View more details here
- Description:
approval-list/elastic/apm-agent-java.yml
mvn install
- Took 10 min 2 sec . View more details here
- Description:
./mvnw clean install -DskipTests=true -Dmaven.javadoc.skip=true
mvn install
- Took 0 min 27 sec . View more details here
- Description:
./mvnw clean install -DskipTests=true -Dmaven.javadoc.skip=true
mvn install
- Took 0 min 27 sec . View more details here
- Description:
./mvnw clean install -DskipTests=true -Dmaven.javadoc.skip=true
mvn install
- Took 0 min 28 sec . View more details here
- Description:
./mvnw clean install -DskipTests=true -Dmaven.javadoc.skip=true
mvn install
- Took 0 min 27 sec . View more details here
- Description:
./mvnw clean install -DskipTests=true -Dmaven.javadoc.skip=true
:grey_exclamation: Flaky test report
No test was executed to be analysed.
:robot: GitHub comments
Expand to view the GitHub comments
To re-run your PR in the CI, just comment with:
-
/test: Re-trigger the build. -
run benchmark tests: Run the benchmark tests. -
run jdk compatibility tests: Run the JDK Compatibility tests. -
run integration tests: Run the Agent Integration tests. -
run end-to-end tests: Run the APM-ITs. -
run windows tests: Build & tests on windows. -
runelasticsearch-ci/docs: Re-trigger the docs validation. (use unformatted text in the comment!)
I will have a look later today :) Looks interesting!
You are indeed correct the current scenario is only tested with 1 transaction. Although I've tested this with a dummy-application as well that has multiple transactions going on at the same time: https://github.com/milanvdm/scala-elastic-apm/tree/master/src/main/scala/me/milan/main/future
@felixbarny I've spent some time investigating this issue again :)
Kamon's implementation has some nice descriptions on how and why they instrument certain methods to make the Future context propagation work correctly.
Seems like currently the part of https://github.com/kamon-io/Kamon/blob/master/instrumentation/kamon-scala-future/src/main/scala-2.13/kamon/instrumentation/futures/scala/FutureChainingInstrumentation.scala#L39-L49 is implemented in the elastic-agent but the other cases are missing. Might be the cause of certain cases not working atm.
I tried implementing these parts but Im stuck on https://github.com/kamon-io/Kamon/blob/master/instrumentation/kamon-scala-future/src/main/scala-2.13/kamon/instrumentation/futures/scala/FutureChainingInstrumentation.scala#L56 where it cleans a context on a certain match. I cannot figure out how that should be done in the elastic-agent.
That's going to be tricky as the Elastic agent allows to be attached at runtime. At that point, the type initializer of Future has most likely already been executed. Also, the Kamon agent adds interfaces/mixins such as HasContext which also can't be done using runtime attachment.
Maybe an alternative could be to avoid context propagation by checking if the future is (as in reference equality (==)) scala.concurrent.Future.unit.
@felixbarny Been debugging this a bit more again :)
I have some strange behavior that I have a question around:
The following code is instrumenting a specific method:
@Nullable
@Advice.OnMethodEnter(suppress = Throwable.class, inline = false)
public static Object onEnter(@Advice.This Object thiz) {
logger.warn(promisesToContext.toString());
AbstractSpan<?> context = promisesToContext.remove(thiz);
if (context != null) {
logger.warn("==============");
logger.warn("ACTIVATE Run on " + Thread.currentThread().getId() + " - " + context);
context.activate();
logger.warn(tracer.currentTransaction().toString());
logger.warn("==============");
// decrements the reference we incremented to avoid that the parent context gets recycled before the promise is run
// because we have activated it, we can be sure it doesn't get recycled until we deactivate in the OnMethodExit advice
context.decrementReferences();
}
return context;
But in the logs I get the following:
2021-06-29 22:38:13,721 [ForkJoinPool-1-worker-13] WARN co.elastic.apm.agent.cache.WeakKeySoftValueLoadingCache - ==============
2021-06-29 22:38:13,721 [ForkJoinPool-1-worker-13] WARN co.elastic.apm.agent.cache.WeakKeySoftValueLoadingCache - ACTIVATE Run on 30 - '10' 00-97d38655e32835994ec9d4277289de50-0e7cd552f87e9631-01 (7c2f827d)
2021-06-29 22:38:13,721 [ForkJoinPool-1-worker-13] WARN co.elastic.apm.agent.cache.WeakKeySoftValueLoadingCache - '6' 00-ae27a70994d59099ff6d696c1b290a03-6bb34461a653557e-01 (fef99bd)
2021-06-29 22:38:13,721 [ForkJoinPool-1-worker-13] WARN co.elastic.apm.agent.cache.WeakKeySoftValueLoadingCache - ==============
thread=30 transactionNumber=10, trace='6' 00-ae27a70994d59099ff6d696c1b290a03-6bb34461a653557e-01 (fef99bd) before futureNumber=1, 6
So even after calling context.activate();, the same thread will return a different context when calling tracer.currentTransaction().toString(). I am probably missing some understanding on how the context.activate() works?
You'll need to call tracer.getActive(), which returns a AbstractSpan<?> (either a Span or a Transaction). The method tracer.currentTransaction() returns the transaction. Transactions are the entry point into the service, whereas spans are operations within that.
@felixbarny Im not sure how the getActive and the currentTransaction can give a different result though.
In the following test:
val fs = (1 to 10).map(transactionNumber => Future {
Thread.sleep(10)
val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName(transactionNumber.toString).activate()
println(s"thread=${Thread.currentThread().getId} transaction=$transactionNumber, span=${tracer.getActive}, trace=${tracer.currentTransaction()} starting transaction")
val futures = (1 to 1)
.map(futureNumber => Future {
Thread.sleep(10)
val currentTransactionNumber = tracer.currentTransaction().getNameAsString
println(s"thread=${Thread.currentThread().getId} transactionNumber=$transactionNumber, span=${tracer.getActive}. trace=${tracer.currentTransaction()} before futureNumber=$futureNumber, $currentTransactionNumber")
assertEquals(transaction, tracer.currentTransaction())
assertEquals(currentTransactionNumber.toInt, transactionNumber)
(transaction, futureNumber)
}
Since there are only rootTransactions started, shouldnt both methods always return the same transaction (as no spans are ever created as a child of the root-transaction)?
The test is failing due to this. It is activating the correct context at the right times. So getActive will correctly propagate the contexts between the Futures. But the currentTransaction method doesn't return the correct context for some reason.
Seems you are activating multiple transactions that belong to different traces on the same thread. That's an illegal state but it seems we currently don't guard against that. What happens when calling currentTransaction() is that the bottom-most span's transaction is returned.
https://github.com/elastic/apm-agent-java/blob/48d697eb4504b50b9dcb844ed2d9d4ec10a94a10/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/ElasticApmTracer.java#L252-L255
That means if you activate multiple transactions, the first activated one will be returned.
I believe the specific issue in your example is that you don't deactivate the transaction after the inner map operation.
@felixbarny It indeed looks like the spans are correctly passed across threads and being linked to the correct Future. Meaning that tracer.getActive() will give the expected results.
As you mention, the difference is with the tracer.currentTransaction. In the current implementation, tracer.currentTransaction will always return the transaction that was created on the thread that the currentTransaction is run on.
So here I am a bit lost. I've got a solution that correctly activates and deactivates spans across different threads correctly for getActive. So I dont see how (and why) tracer.getActive and tracer.currentTransaction return different results.
How can I deactivate and activate transactions for currentTransaction in the same way I do for getActive?
I didn't read through everything, but I hope I can help:
We maintain a stack of transactions/spans for each thread. getActive returns the topmost span/transaction. currentTransaction returns the bottommost transaction or the parent transaction if the bottom is a span.
A span/transaction is pushed to the stack when you activate it and popped when you deactivate it. This has nothing to do with where the transactions/spans are created or ended.
If you run multiple futures on a thread pool, you must make sure any activated transaction is also deactivated before the thread is returned to the pool to run the next task. Otherwise, the next time you activate a transaction on that thread (either explicitly or through the context propagation mechanism), you would have two activated transactions, which is an illegal state and in which case getActive() != currentTransaction().
@henrikno a lot has changed since this was opened. Is this still relevant? Would you want to follow up on this, or should we close it for now?
Hi! We just realized that we haven't looked into this issue in a while. We're sorry! We're labeling this issue as stalled to make it hit our filters and make sure we get back to it in as soon as possible. In the meantime, it'd be extremely helpful if you could take a look at it as well and confirm its relevance. A simple comment with a nice emoji will be enough :+1. Thank you for your contribution!
Hi! This issue has been stale for a while and we're going to close it as part of our cleanup procedure. We appreciate your contribution and would like to apologize if we have not been able to review it, due to the current heavy load of the team. Feel free to re-open this issue if you think it should stay open. Thank you for your contribution!