Add docs regarding threads and thread-safety
I detected an issue when trying to make multiple calls concurrently.
Sometimes, when I make 4 concurrent calls to Session.call(), I only get 3 responses; worse, sometimes one of the responses is the payload of another procedure...
So I dug into the autobahn-java code and noticed the following:
In Session.java you have this:
```java
private <T> CompletableFuture<T> reallyCall(
        String procedure,
        List<Object> args, Map<String, Object> kwargs,
        CallOptions options,
        TypeReference<T> resultTypeReference,
        Class<T> resultTypeClass) {
    throwIfNotConnected();
    CompletableFuture<T> future = new CompletableFuture<>();
    long requestID = mIDGenerator.next();
    mCallRequests.put(requestID, new CallRequest(requestID, procedure, future, options,
            resultTypeReference, resultTypeClass));
    if (options == null) {
        send(new Call(requestID, procedure, args, kwargs, 0));
    } else {
        send(new Call(requestID, procedure, args, kwargs, options.timeout));
    }
    return future;
}
```
And in IDGenerator.java this:
```java
public class IDGenerator {
    private long mNext;

    public long next() {
        mNext += 1;
        if (mNext > 9007199254740992L) {
            mNext = 1;
        }
        return mNext;
    }
}
```
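As you can see, this is not thread-safe: neither mNext nor mCallRequests can safely be modified concurrently. For example, `mNext += 1` is a read-modify-write, so two threads can read the same value and both return the same ID.
To demonstrate it, I wrote a small snippet in Kotlin: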
```kotlin
import io.crossbar.autobahn.wamp.utils.IDGenerator
import kotlinx.coroutines.*

fun main() = runBlocking {
    val scope = CoroutineScope(Job() + Dispatchers.IO)
    println("Start generating ids")
    val results = (1..50).map {
        generateIdsConcurrently(scope)
    }
    println("Results: $results")
    println("All successful: ${results.all { it }}")
}

private suspend fun generateIdsConcurrently(scope: CoroutineScope): Boolean {
    val tasks = mutableListOf<Job>()
    val idsMap = HashMap<Int, Int>()
    val numberOfIdsExpected = 10
    val idGenerator = IDGenerator()
    (1..numberOfIdsExpected).onEach { index ->
        val childJob = scope.launch {
            // this delay forces more failures
            delay(100)
            val id = idGenerator.next()
            idsMap[id.toInt()] = index
        }
        tasks.add(childJob)
    }
    tasks.joinAll()
    val expectedIds = idsMap.values.sorted()
    val generatedIds = idsMap.keys.sorted()
    return expectedIds == generatedIds
}
```
If you run this code, you can see that it almost always fails (I run 50 trials to make failures more likely), so the generated IDs are not always unique and sequential.
Similar issues happen with Session.subscribe() (and potentially other methods).
SOLUTION
- First Step
Make the next() method synchronized:
```java
public class IDGenerator {
    private long mNext;

    public synchronized long next() {
        mNext += 1;
        if (mNext > 9007199254740992L) {
            mNext = 1;
        }
        return mNext;
    }
}
```
This improved things quite a lot, but the test still failed occasionally.
- Second Step
Replace the HashMap with a ConcurrentHashMap.
With this, the test passes 100% of the time.
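For reference, a lock-free alternative to the `synchronized` step is `java.util.concurrent.atomic.AtomicLong`, paired with a concurrent collection as in the second step. The sketch below assumes the same 2^53 wrap-around as `IDGenerator`; `AtomicIDGenerator` and `IdStressTest` are hypothetical names, not autobahn-java classes, and making ID generation safe does not by itself make the rest of `Session` thread-safe.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical lock-free variant of IDGenerator; not autobahn-java code.
// Produces the same sequence as the original: 1, 2, ..., 2^53, then back to 1.
class AtomicIDGenerator {
    static final long MAX_ID = 9007199254740992L; // 2^53, the bound used by IDGenerator

    private final AtomicLong current;

    AtomicIDGenerator() {
        this(0);
    }

    // The start value is exposed only so the wrap-around can be exercised.
    AtomicIDGenerator(long start) {
        current = new AtomicLong(start);
    }

    long next() {
        // updateAndGet applies the update atomically (a compare-and-set retry loop),
        // so two concurrent callers never receive the same value.
        return current.updateAndGet(n -> n >= MAX_ID ? 1 : n + 1);
    }
}

class IdStressTest {
    // Generates `perThread` IDs on each of `threads` threads and returns
    // how many distinct IDs were observed (threads * perThread if nothing collided).
    static int distinctIds(int threads, int perThread) {
        AtomicIDGenerator gen = new AtomicIDGenerator();
        Set<Long> seen = ConcurrentHashMap.newKeySet(); // thread-safe set
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < perThread; i++) {
                    seen.add(gen.next());
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen.size();
    }
}
```

`updateAndGet` avoids blocking, but the update function may be retried under contention, so it must stay side-effect-free, as it is here.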
QUESTIONS
- Is my analysis correct, or am I making a mistake?
- Is there any hidden reason why this is not thread-safe?
- Has this been detected before? I didn't find anything about it...
- Is the caller of autobahn expected to externally synchronize the calls for some reason?
Are you using threads with the Autobahn API? The Session class is designed to be single-threaded. Do you have a real reproducer for that issue?
For more info, please also take a look at this issue: https://github.com/crossbario/autobahn-java/issues/329
> Is there any hidden reason why this is not thread-safe?
yes, the library was designed for async style (pls see the discussion in the issue Omer linked), and all objects of a given WAMP session must only be accessed from the same thread.
note that this doesn't mean your program has to be single-threaded: you can still use WAMP objects that live on an IO thread from other threads (e.g. the UI thread) by handing work over to that thread, using the means provided by the run-time (Android) or, generally, thread-safe queues.
note also that this design will generally lead to faster code, since no cost is paid for unnecessary locking.
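A minimal sketch of this confinement idea in plain Java, assuming a single-thread `ExecutorService` plays the role of the session's thread (on Android, the run-time's own message-passing facilities would be the natural choice instead). `UnsafeSession`, `ConfinedSession` and `ConfinementDemo` are hypothetical stand-ins; no Autobahn API is used. Several caller threads "call" concurrently, yet the session itself uses a plain field and a plain `HashMap`, like the code in the report.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical non-thread-safe session: plain counter, plain HashMap, no locks.
class UnsafeSession {
    private long nextId;
    private final Map<Long, String> pending = new HashMap<>();

    long call(String procedure) {
        nextId += 1;
        pending.put(nextId, procedure);
        return nextId;
    }

    int pendingCount() {
        return pending.size();
    }
}

// Confines every access to the session to one thread. Other threads never touch
// the session directly; they hand work over through the executor's queue.
class ConfinedSession {
    private final ExecutorService sessionThread = Executors.newSingleThreadExecutor();
    private final UnsafeSession session = new UnsafeSession();

    CompletableFuture<Long> call(String procedure) {
        return CompletableFuture.supplyAsync(() -> session.call(procedure), sessionThread);
    }

    CompletableFuture<Integer> pendingCount() {
        return CompletableFuture.supplyAsync(session::pendingCount, sessionThread);
    }

    void close() {
        sessionThread.shutdown();
    }
}

class ConfinementDemo {
    // Calls the session from `threads` threads at once and returns how many
    // distinct requests the session recorded (expected: threads * perThread).
    static int run(int threads, int perThread) {
        ConfinedSession confined = new ConfinedSession();
        List<CompletableFuture<Long>> futures = new CopyOnWriteArrayList<>();
        List<Thread> callers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread caller = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    futures.add(confined.call("com.example.procedure"));
                }
            });
            callers.add(caller);
            caller.start();
        }
        try {
            for (Thread caller : callers) {
                caller.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // Wait for every hand-off, then query the session on its own thread.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();
        int recorded = confined.pendingCount().join();
        confined.close();
        return recorded;
    }
}
```

Because every mutation happens on the one session thread, `UnsafeSession` needs no locks; the cross-thread cost is paid once per hand-off rather than on every field access.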
> Is my analysis correct, or am I making a mistake?
yes, it demonstrates that AutobahnJava library objects are not thread-safe
> Is there any hidden reason why this is not thread-safe?
yes, pls see above
> Has this been detected before? I didn't find anything about it...
yes, it was deliberately designed like this (it works as expected)
> Is the caller of autobahn expected to externally synchronize the calls for some reason?
yes, the only valid reason to use threads is to make use of multiple CPU cores for parallel processing.
for concurrent processing you don't need threads; an event-driven, lockless design like AutobahnJava's is sufficient and more efficient
Thanks for the responses @om26er @oberstet 👍
That said, I already implemented the change I mentioned in my fork, and so far it seems to work fine in a multithreaded environment (I'm using Kotlin coroutines).
Do you see any other downside to my solution, besides making things a bit slower due to locking? Is there a chance of missing events or something like that?
Also, I'd suggest stating explicitly in the README that Autobahn must be used from a single thread. Otherwise, the only way to find out is by reading the implementation. And as you can see, I am not the first one to have this doubt.
> Do you see any other downside to my solution, besides making things a bit slower due to locking? Is there a chance of missing events or something like that?
yes, any locking incurs deadlock risks, which you must handle somehow. regarding missed events: I haven't read your fork's code, so I'm not sure. maybe not.
> Also, I'd suggest stating explicitly in the README that Autobahn must be used from a single thread. Otherwise, the only way to find out is by reading the implementation. And as you can see, I am not the first one to have this doubt.
yes, I totally agree. this is actually a pretty important design decision taken in the code. it is opinionated. it must be mentioned in the readme.
btw, another subtle aspect:
- ABJ is designed, from the library user's point of view, as a non-locking, non-thread-safe, run-to-completion, single-threaded library
- however, ABJ itself does use threads internally, e.g. network threads for IO on Android. this is necessary (on Android), and the library uses the Android-provided facilities for inter-thread message passing
check out https://github.com/crossbario/autobahn-java/blob/455c1c5eae70a83db1362ef848383868a676c135/autobahn/src/main/java/io/crossbar/autobahn/websocket/WebSocketConnection.java#L748
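The linked code uses Android's own inter-thread messaging; the sketch below is not that code. As a portable stand-in, assuming a plain `java.util.concurrent.LinkedBlockingQueue`, it shows the general shape of the pattern: an IO thread posts incoming messages, and a single session thread handles them one at a time, each to completion. `MessageLoop` and the `STOP` sentinel are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: an "IO" thread hands incoming messages to the session thread
// through a thread-safe queue. Only the session thread touches `handled`, so that
// state needs no locking.
class MessageLoop {
    private static final String STOP = "__stop__"; // hypothetical shutdown sentinel

    static List<String> runOnce(List<String> incoming) {
        BlockingQueue<String> inbox = new LinkedBlockingQueue<>();
        List<String> handled = new ArrayList<>(); // owned by the session thread only

        Thread sessionThread = new Thread(() -> {
            try {
                while (true) {
                    String msg = inbox.take(); // blocks until the IO thread posts something
                    if (msg.equals(STOP)) {
                        return;
                    }
                    handled.add("handled:" + msg); // run-to-completion handler
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread ioThread = new Thread(() -> {
            for (String msg : incoming) {
                inbox.add(msg); // comparable to posting a message to the session thread
            }
            inbox.add(STOP);
        });

        sessionThread.start();
        ioThread.start();
        try {
            ioThread.join();
            sessionThread.join(); // join() makes the session thread's writes visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handled;
    }
}
```

With a single producer, the queue preserves order, so messages are handled in the order they arrived.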
ABJ can run on Android and on Netty. On Netty, things work differently, and it uses Netty's native WebSocket support:
https://github.com/crossbario/autobahn-java/tree/master/autobahn/src/main/java/io/crossbar/autobahn/wamp/transports