
Hibernate Reactive doesn't play well with async Azure SDKs, it seems

Open RJJdeVries opened this issue 1 year ago • 0 comments

Hi there,

I'm having some trouble implementing Hibernate Reactive Panache while also using the async client of the Azure Blob SDK (which uses Project Reactor under the hood).

I have a handler which does a couple of things in order:

  1. First store an entity in the database
  2. Then upload blobs to a storage container
  3. Then send a Service Bus message to a queue

Now my Azure Blob adapter integration test is working fine. I am able to upload one or more blobs to an Azurite container running locally.

It gets interesting when I run the integration test for my handler. I get the following error message:

org.hibernate.HibernateException: java.lang.IllegalStateException: HR000069: Detected use of the reactive Session from a different Thread than the one which was used to open the reactive Session - this suggests an invalid integration; original thread [265]: 'vert.x-eventloop-thread-1' current Thread [243]: 'reactor-http-nio-4'
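The message describes a thread-affinity check: the session remembers the thread it was opened on and rejects use from any other thread. Below is a minimal plain-JDK sketch of that situation. It is an analogy only, not Hibernate Reactive's actual implementation: `GuardedSession`, the executor names, and the manually completed future standing in for the upload are all hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ThreadAffinityDemo {

    /** Hypothetical stand-in for a reactive session: remembers the thread that opened it. */
    static final class GuardedSession {
        private final Thread owner = Thread.currentThread();

        String use() {
            Thread current = Thread.currentThread();
            if (current != owner) {
                throw new IllegalStateException("opened on '" + owner.getName()
                        + "', used on '" + current.getName() + "'");
            }
            return "ok on " + current.getName();
        }
    }

    /** Opens a session on one thread, then touches it from a callback fired by another thread. */
    static String useSessionAfterForeignCompletion() {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor(r -> new Thread(r, "event-loop"));
        ExecutorService ioPool = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-client"));
        try {
            // The session is opened on the event-loop thread.
            GuardedSession session =
                    CompletableFuture.supplyAsync(() -> new GuardedSession(), eventLoop).join();

            // Stand-in for the async upload: nothing has completed it yet.
            CompletableFuture<String> upload = new CompletableFuture<>();

            // A non-async continuation runs on whichever thread completes the future.
            CompletableFuture<String> next = upload.thenApply(done -> session.use());

            // The "client" completes the upload from its own thread, so the
            // continuation above (and the session access in it) runs there too.
            ioPool.execute(() -> upload.complete("uploaded"));

            return next.join();
        } catch (CompletionException e) {
            // The guard's IllegalStateException arrives wrapped in a CompletionException.
            return e.getCause().getMessage();
        } finally {
            eventLoop.shutdown();
            ioPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(useSessionAfterForeignCompletion());
    }
}
```

In this sketch the continuation inherits the thread of whoever completes the future, which mirrors how a pipeline step that follows the blob upload can end up on a `reactor-http-nio` thread instead of the Vert.x event loop.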

So I have implemented a BlobHandler which uses the async client to upload blobs. In a nutshell, I use AdaptersToFlow.publisher() to wrap the async call, so that the Reactor publisher returned by the SDK can be consumed as a Mutiny Uni.

    private Uni<Void> storeSingle(BlobData blob) {
        System.out.println("Going to store a single blob.");
        return Uni
                .createFrom()
                .publisher(
                        AdaptersToFlow.publisher(
                                blobContainerClient
                                        .getBlobAsyncClient(blob.getBlobName())
                                        .uploadFromFile(blob.getFile().getPath()))
                )
                .onFailure()
                .recoverWithUni(() -> Uni.createFrom().failure(new BlobProblem(
                        new ProblemDetail(
                                "BLOB_DOWNLOAD_PROBLEM",
                                "A problem occurred while downloading a blob.",
                                Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
                                List.of()
                        )
                )));
    }

    @Override
    public Uni<Void> storeMultiple(List<BlobData> blobs) {
        System.out.println("Going to store multiple blobs.");
        return Uni
                .join()
                .all(blobs.stream().map(this::storeSingle).toList())
                .usingConcurrencyOf(blobConfig.uploadConcurrency())
                .andFailFast()
                .onItem()
                .ignore()
                .andContinueWithNull()
                .onFailure()
                .recoverWithUni(exception -> {
                    System.out.println("Failure occurred while trying to store multiple blobs.");
                    Log.error("Upload multiple blobs", "A problem occurred while uploading one or more blobs.", exception);

                    return Uni.createFrom().failure(new BlobProblem(
                            new ProblemDetail(
                                    "MULTIPLE_BLOBS_UPLOAD_PROBLEM",
                                    "A problem occurred while uploading one or more blobs.",
                                    Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
                                    List.of()
                            )
                    ));
                });
    }

I am using the blob handler in my main handler, like this:

        return Uni
                .join()
                .all(listOfUnis)
                .andFailFast()
                .chain(() -> storeConversionRequest.store(conversionRequest))
                .map(ignored -> {
                    System.out.println("After storing conversion request");

                    return null;
                })
                .chain(() -> storeBlob.storeMultiple(newListOfBlobData))
                .map(ignored -> {
                    System.out.println("After uploading all the blobs");

                    return null;
                })
                .chain(() -> sendConversionRequest.send(new ConversionRequestCommand(
                                conversionRequest.getId(),
                                blobs.stream().map(BlobData::getBlobName).toList(),
                                "",
                                targetFileType
                        )
                ))
                .map(ignored -> {
                    System.out.println("After sending the command");

                    return null;
                })
                .map(ignored -> conversionRequest.getId())
                .onFailure()
                .recoverWithUni(exception -> {
                    System.out.println("Error occurred while handling a conversion request.");
                    Log.error(
                            conversionRequest.getId().toString(),
                            "An error occurred while handling new conversion request.",
                            exception
                    );

                    return Uni.createFrom().failure(exception);
                });

These are only excerpts, since it's company code.

I have tried the Mutiny method .runSubscriptionOn(MutinyHelper.executor(vertx.getOrCreateContext())), but I then get an error message stating 'not using duplicated context'.
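For context on that attempt: in Mutiny, `runSubscriptionOn` chooses the thread on which the subscription (the start of the upstream work) happens, while `emitOn` chooses the thread on which items and failures are delivered downstream. The plain-JDK sketch below illustrates that distinction by analogy only. It uses `CompletableFuture` rather than Mutiny or Vert.x, and `asyncUpload`, the latch, and the executor names are hypothetical. In both variants the call is started on the event-loop thread; only the variant that explicitly resumes on the originating executor runs the next step there.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ResumeOnOriginDemo {

    /** Hypothetical async client: completes its future from its own thread once released. */
    static CompletableFuture<String> asyncUpload(Executor ioPool, CountDownLatch release) {
        CompletableFuture<String> result = new CompletableFuture<>();
        ioPool.execute(() -> {
            try {
                // Keeps the demo deterministic: complete only after callbacks are attached.
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            result.complete("uploaded");
        });
        return result;
    }

    /** Returns the name of the thread on which the step after the upload ran. */
    static String threadOfNextStep(boolean resumeOnOrigin) {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor(r -> new Thread(r, "event-loop"));
        ExecutorService ioPool = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-client"));
        CompletableFuture<String> observed = new CompletableFuture<>();
        CountDownLatch release = new CountDownLatch(1);
        try {
            // In both variants the call is *started* on the event loop.
            eventLoop.execute(() -> {
                CompletableFuture<String> upload = asyncUpload(ioPool, release);
                CompletableFuture<String> next = resumeOnOrigin
                        // Delivery side: hand the continuation back to the originating executor.
                        ? upload.thenApplyAsync(done -> Thread.currentThread().getName(), eventLoop)
                        // No hand-back: the continuation runs on the completing thread.
                        : upload.thenApply(done -> Thread.currentThread().getName());
                next.thenAccept(observed::complete);
                release.countDown();
            });
            return observed.join();
        } finally {
            eventLoop.shutdown();
            ioPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(threadOfNextStep(false));
        System.out.println(threadOfNextStep(true));
    }
}
```

If the analogy carries over, the operator to look at after the blob upload would be one that moves downstream emissions back to the context the session was opened on, rather than one that moves the subscription. Whether that also resolves the 'duplicated context' message is not something this sketch can show.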

I don't think this is an issue with Hibernate per se, but I just can't get it to work and I'm starting to get a bit frustrated. Since the error message originates from Hibernate Reactive, I'm trying my luck here!

RJJdeVries, May 20 '24 13:05