vertx-lang-kotlin

ReadStream<T>.toReceiveChannel returns elements in random order

Open iyfedorov opened this issue 4 years ago • 5 comments

Questions

Hi all. When I use HttpClientResponse.toReceiveChannel(Vertx) to process a response in parts, the parts are consumed in random order, or some parts are lost.

Version

vert.x 4.2.1, kotlin 1.5.31, coroutines 1.5.2

Do you have a reproducer?

flow {
        val requestOptions = RequestOptions()
                 
        val req = _client.request(requestOptions).await()
        val resp = req.send(Buffer.buffer(body)).await()
        val statusCode = resp.statusCode()
        if (statusCode == 200) {
            val receiveChannel = resp.toReceiveChannel(vertx)
            println("=====START RECEIVE=====")
            receiveChannel.consumeEach {
                println(it.getString(0, 10) +"..."+ it.getString(it.length()-10, it.length()))
                val bytes = it.bytes
                emit(bytes)
            }
        }
    }

Steps to reproduce

In the normal case, the first row starts like a JSON document, and the end of each row matches the start of the next row:

=====START RECEIVE=====
{"logs.pro..."}}},{"dou
bles":{"ma...yzer":"log
speak"},"m...lds":{"key
word":{"ty...le"}}},"em
ail":{"typ...eyword":{"
type":"key...long"}}}}}

or

=====START RECEIVE=====
{"took":43...odales vel
 \n purus ...icitudin e
ros. Donec...2:00:00 AM
\"} ], \"i...pe\": \"ap
plication/...":"^1[./0-
9$","profi...dolor vel 
\n elit lo...,\"ip\": \
"167.197.1... mauris. F
usce a var...AM"},{"pro
fileId":34...bh, aliqua
m ut ipsum...": 30,\"pr
ofileAdded...89010]}]}}

Bad case (the output starts from the middle of the document):

=====START RECEIVE=====
os. Donec ...:00:00 AM\
"} ], \"ip...e\": \"app
lication/v...:"^1[./0-9
$","profil...33030]}]}}
=====START RECEIVE=====
\n purus S...citudin er
os. Donec ...:00:00 AM\
"} ], \"ip...e\": \"app
lication/v...:"^1[./0-9
$","profil...33030]}]}}

All requests were sent to Elasticsearch. With other HTTP clients (AsyncHttpClient, Netty) I did not have any problems processing the response in parts.

iyfedorov avatar Nov 11 '21 16:11 iyfedorov

This may be helpful: I get a similar problem with ReadStream<T>.handler

val resp = req.send(Buffer.buffer(query.body.readAllBytes())).await()
resp.endHandler {
    println("CLOSED")
    respCh.close()
}
resp.exceptionHandler {
    it.printStackTrace()
}
resp.handler {
    println(it.getString(0, 10) +"..."+ it.getString(it.length()-10, it.length()))
}  
=====START RECEIVE=====
\n purus S...citudin er
os. Donec ...:00:00 AM\
"} ], \"ip...e\": \"app
lication/v...:"^1[./0-9
$","profil...33030]}]}}
CLOSED

But when I remove .await() from the send call and use Future.onSuccess() instead, I get a normal response every time

req.send(Buffer.buffer(query.body.readAllBytes())).onSuccess { resp ->
    resp.endHandler {
        println("CLOSED")
    }
    resp.exceptionHandler {
      it.printStackTrace()
    }
    resp.handler {
        println(it.getString(0, 10) +"..."+ it.getString(it.length()-10, it.length()))
    }
}
{"took":23...psum primi
s in fauci...","_id":"2
201756-2-0...ue dictum,
 efficitur...oncus vari
us. Aenean...uster":"te
st","sub_c...dimentum d
ui rutrum ...tricies, a
t vehicula...,\"connect
ion_data\"..., hendreri
t vestibul...68020]}]}}
CLOSED
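
One way the difference between the two variants could arise is purely from timing. The following is a toy model in plain Kotlin, not Vert.x code: `ToyLoop`, `simulate`, and the chunk names are all hypothetical. It assumes that chunks are delivered only to a handler that is already attached, and that the awaiting coroutine is resumed as a separate, later task rather than in the same tick as the completion callback. Whether that assumption holds in the reporter's setup is not established in this thread.

```kotlin
// Toy single-threaded event loop (hypothetical): tasks run in FIFO order.
class ToyLoop {
    private val tasks = ArrayDeque<() -> Unit>()
    fun post(task: () -> Unit) { tasks.addLast(task) }
    fun run() { while (tasks.isNotEmpty()) tasks.removeFirst().invoke() }
}

fun simulate(attachInCallback: Boolean): List<String> {
    val loop = ToyLoop()
    val received = mutableListOf<String>()
    var handler: ((String) -> Unit)? = null

    fun attach() { handler = { chunk -> received.add(chunk) } }
    // A chunk that arrives while no handler is attached is simply dropped.
    fun deliver(chunk: String) { handler?.invoke(chunk) }

    // Task 1: the response headers arrive and the "future" completes.
    loop.post {
        if (attachInCallback) attach()   // onSuccess-style: handler set in the same tick
        else loop.post { attach() }      // await-style (assumed): resumed as a later task
    }
    // Task 2: the first body chunk is already queued; more data follows later.
    loop.post {
        deliver("chunk-1")
        loop.post { deliver("chunk-2") }
    }
    loop.run()
    return received
}

fun main() {
    println(simulate(attachInCallback = true))   // prints [chunk-1, chunk-2]
    println(simulate(attachInCallback = false))  // prints [chunk-2]
}
```

In the second run the output starts from the middle of the body, which resembles the bad case shown earlier in the issue.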

iyfedorov avatar Nov 12 '21 10:11 iyfedorov

Experimentally, I found a solution for my case:

suspend fun Future<HttpClientResponse>.await(): HttpClientResponse = when {
    succeeded() -> result().pause() // <--- solution
    failed() -> throw cause()
    else -> suspendCancellableCoroutine { cont: CancellableContinuation<HttpClientResponse> ->
        onComplete { asyncResult ->
            if (asyncResult.succeeded()) cont.resume(asyncResult.result().pause()) // <--- solution
            else cont.resumeWithException(asyncResult.cause())
        }
    }
}

I call HttpClientResponse.pause() immediately after obtaining the result. Between getting the HttpClientResponse and processing the response batches, I call HttpClientResponse.statusCode(). Could that trigger reading of the first batch before any handlers are set?
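
Why pausing early might help can be sketched with another toy model in plain Kotlin. This is not Vert.x code: `ToyStream` and `receive` are hypothetical names. It assumes that a flowing stream drops chunks when no handler is attached, while a paused stream buffers them until it is resumed.

```kotlin
// Toy push-based stream (hypothetical, not the Vert.x implementation).
class ToyStream {
    private var handler: ((String) -> Unit)? = null
    private var paused = false
    private val pending = ArrayDeque<String>()

    fun handler(h: (String) -> Unit) { handler = h }

    fun pause(): ToyStream { paused = true; return this }

    // Leave paused mode and flush everything buffered while paused.
    fun resume() {
        paused = false
        while (pending.isNotEmpty()) handler?.invoke(pending.removeFirst())
    }

    // Called by the "network" when a chunk arrives.
    fun emit(chunk: String) {
        if (paused) pending.addLast(chunk)   // buffered until resume()
        else handler?.invoke(chunk)          // no handler attached: chunk is lost
    }
}

fun receive(pauseFirst: Boolean): List<String> {
    val stream = ToyStream()
    val received = mutableListOf<String>()
    if (pauseFirst) stream.pause()
    // The first chunk arrives before the consumer has attached its handler.
    stream.emit("{\"took\":")
    stream.handler { received.add(it) }
    stream.resume()
    stream.emit("43}")
    return received
}

fun main() {
    println(receive(pauseFirst = false))  // prints [43}]
    println(receive(pauseFirst = true))   // prints [{"took":, 43}]
}
```

Under these assumptions, what matters is that the stream is paused before the first chunk can be delivered. The model says nothing about whether statusCode() itself triggers any reading.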

iyfedorov avatar Dec 28 '21 15:12 iyfedorov

Is there a bug we should fix?

vietj avatar Jan 03 '22 09:01 vietj

Can you provide a reproducer project, please?

vietj avatar Jan 03 '22 09:01 vietj

Hi, sorry, I missed the GitHub notification email. I will prepare a reproducer project when I get some free time. For now, though, I think the problem lies in some specific details of the Elasticsearch response (such as a long warning header with text, or chunked transfer encoding).

iyfedorov avatar Jan 27 '22 12:01 iyfedorov

Please reopen if you can provide a reproducer.

tsegismont avatar Oct 06 '23 12:10 tsegismont