vertx-lang-kotlin
ReadStream<T>.toReceiveChannel returns elements in random order
Questions
Hi all. When I use HttpClientResponse.toReceiveChannel(Vertx) to process a response in parts, the parts are consumed in random order, or some of them are lost.
Version
vert.x 4.2.1, kotlin 1.5.31, coroutines 1.5.2
Do you have a reproducer?
flow {
    val requestOptions = RequestOptions()
    val req = _client.request(requestOptions).await()
    val resp = req.send(Buffer.buffer(body)).await()
    val statusCode = resp.statusCode()
    if (statusCode == 200) {
        val receiveChannel = resp.toReceiveChannel(vertx)
        println("=====START RECEIVE=====")
        receiveChannel.consumeEach {
            println(it.getString(0, 10) + "..." + it.getString(it.length() - 10, it.length()))
            val bytes = it.bytes
            emit(bytes)
        }
    }
}
Steps to reproduce
In the normal case, the first row starts like a JSON document, and the end of each row matches up with the start of the next row:
=====START RECEIVE=====
{"logs.pro..."}}},{"dou
bles":{"ma...yzer":"log
speak"},"m...lds":{"key
word":{"ty...le"}}},"em
ail":{"typ...eyword":{"
type":"key...long"}}}}}
or
=====START RECEIVE=====
{"took":43...odales vel
\n purus ...icitudin e
ros. Donec...2:00:00 AM
\"} ], \"i...pe\": \"ap
plication/...":"^1[./0-
9$","profi...dolor vel
\n elit lo...,\"ip\": \
"167.197.1... mauris. F
usce a var...AM"},{"pro
fileId":34...bh, aliqua
m ut ipsum...": 30,\"pr
ofileAdded...89010]}]}}
Bad case (the output starts in the middle of the document):
=====START RECEIVE=====
os. Donec ...:00:00 AM\
"} ], \"ip...e\": \"app
lication/v...:"^1[./0-9
$","profil...33030]}]}}
=====START RECEIVE=====
\n purus S...citudin er
os. Donec ...:00:00 AM\
"} ], \"ip...e\": \"app
lication/v...:"^1[./0-9
$","profil...33030]}]}}
All requests were sent to Elasticsearch. With other HTTP clients (AsyncHttpClient, Netty) I did not have any problems processing the response in parts.
This may be helpful: I get the same kind of problem with ReadStream<T>.handler:
val resp = req.send(Buffer.buffer(query.body.readAllBytes())).await()
resp.endHandler {
    println("CLOSED")
    respCh.close()
}
resp.exceptionHandler {
    it.printStackTrace()
}
resp.handler {
    println(it.getString(0, 10) + "..." + it.getString(it.length() - 10, it.length()))
}
=====START RECEIVE=====
\n purus S...citudin er
os. Donec ...:00:00 AM\
"} ], \"ip...e\": \"app
lication/v...:"^1[./0-9
$","profil...33030]}]}}
CLOSED
But when I remove .await() from the send call and use Future.onSuccess() instead, I get the full response every time:
req.send(Buffer.buffer(query.body.readAllBytes())).onSuccess { resp ->
    resp.endHandler {
        println("CLOSED")
    }
    resp.exceptionHandler {
        it.printStackTrace()
    }
    resp.handler {
        println(it.getString(0, 10) + "..." + it.getString(it.length() - 10, it.length()))
    }
}
{"took":23...psum primi
s in fauci...","_id":"2
201756-2-0...ue dictum,
efficitur...oncus vari
us. Aenean...uster":"te
st","sub_c...dimentum d
ui rutrum ...tricies, a
t vehicula...,\"connect
ion_data\"..., hendreri
t vestibul...68020]}]}}
CLOSED
Experimentally, I found a solution for my case:
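import io.vertx.core.Future
import io.vertx.core.http.HttpClientResponse
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.suspendCancellableCoroutine

suspend fun Future<HttpClientResponse>.await(): HttpClientResponse = when {
    succeeded() -> result().pause() // <--- solution
    failed() -> throw cause()
    else -> suspendCancellableCoroutine { cont: CancellableContinuation<HttpClientResponse> ->
        onComplete { asyncResult ->
            if (asyncResult.succeeded()) cont.resume(asyncResult.result().pause()) // <--- solution
            else cont.resumeWithException(asyncResult.cause())
        }
    }
}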
I call HttpClientResponse.pause() immediately after obtaining the result. Between getting the HttpClientResponse and processing the response parts, I call HttpClientResponse.statusCode(). Can that trigger reading of the first part before any handlers are set?
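For reference, the same idea can be sketched without replacing await(): pause the response inside a Future.onSuccess() callback, so it is paused before the coroutine resumes. This is only a sketch under assumptions, not a confirmed fix. The helper name bodyChunks and its parameters are made up for illustration, and it assumes that toReceiveChannel(vertx) drives the paused stream itself, as the workaround above suggests (it needed no explicit resume()).

```kotlin
import io.vertx.core.Vertx
import io.vertx.core.buffer.Buffer
import io.vertx.core.http.HttpClient
import io.vertx.core.http.RequestOptions
import io.vertx.kotlin.coroutines.await
import io.vertx.kotlin.coroutines.toReceiveChannel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow

// Hypothetical helper (not part of Vert.x): emits the response body part by part.
fun bodyChunks(
    vertx: Vertx,
    client: HttpClient,
    options: RequestOptions,
    body: ByteArray
): Flow<ByteArray> = flow {
    val req = client.request(options).await()
    val resp = req.send(Buffer.buffer(body))
        // Registered before await(), so the response is paused in the
        // completion callback, before the coroutine continues.
        .onSuccess { it.pause() }
        .await()
    if (resp.statusCode() == 200) {
        // Assumption: the channel adapter fetches from the paused stream on
        // its own, so resume() is not called here.
        val receiveChannel = resp.toReceiveChannel(vertx)
        receiveChannel.consumeEach { emit(it.bytes) }
    }
}
```

If handlers are set directly with resp.handler { ... } instead of going through a channel, the paused response would need an explicit resp.resume() once all handlers are in place.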
Is there a bug we should fix?
Can you provide a reproducer project, please?
Hi, sorry, I missed the GitHub notification email. I will prepare a reproducer project when I get some free time. For now, though, I think the problem lies in some specific details of the Elasticsearch response (such as a long warning header containing text, or chunked transfer encoding).
Please reopen if you can provide a reproducer.