quarkus-langchain4j icon indicating copy to clipboard operation
quarkus-langchain4j copied to clipboard

RegisterAiService is locked down to ChatLanguageModel

Open vietk opened this issue 2 years ago • 27 comments

Hello,

Currently the RegisterAiService cannot find or be configured to use a StreamingChatLanguageModel, it's locked to a "Blocking"ChatLanguageModel It would be nice to have streaming here to write chat bots with provider that support streaming of response.

Regards

vietk avatar Dec 06 '23 23:12 vietk

Makes sense!

geoand avatar Dec 07 '23 04:12 geoand

Actually, this would only make sense if the response of the service is an async type of String

geoand avatar Dec 12 '23 10:12 geoand

Yes, and apparenlty the underlying langchain API, TokenStream, handles only Consumer<String>

On my side I have made a trial with OpenAI client where in the code you can hack the TokenStream return method.

    interface Bot {
      @SystemMessage("""
          xxx
          """)
      TokenStream chat(@UserMessage String question);
    }

If you add the support for Async methods (CompletableFuture/Mutiny) types there it would do the trick

Then I need to link it with AIService using manual registration and it worked well.

return AiServices.builder(Bot.class).streamingChatLanguageModel(model).retriever(retriever).build();

At the end, a wrapper class sends back Multi<String> instead of TokenStream ( that is not very friendly API)

vietk avatar Dec 12 '23 10:12 vietk

By the way, Are you planning to add this feature soon? I could try to contribute it, just give me your status on that

vietk avatar Dec 12 '23 11:12 vietk

You can certainly take a swing at it if you like! I have not started looking into

geoand avatar Dec 12 '23 11:12 geoand

I'm implementing the StreamingChatLanguageModel for the bam and watsonx module and I noticed the problem of "locking" on the ChatLanguageModel.

Is there any news on this issue? I could help to "unlock" it 😄

andreadimaio avatar Feb 18 '24 22:02 andreadimaio

Hello I have something in the middle :D but haven't time to finish it yet So to me, feel free to contribute

vietk avatar Feb 19 '24 06:02 vietk

At first glance at the code, the "unlock" operation should be simple. What is missing is the addInjectionPoint of the StreamingChatModel into the AiServicesProcessor and also adding the injected instance into the AiServicesRecorder class (quarkusAiServices.streamingChatLanguageModel).

I need to think if it is possible to avoid using the TokenStream class. Any suggestions are appreciated.

andreadimaio avatar Feb 19 '24 08:02 andreadimaio

I'm going to share with you a draft version of the code to unlock the StreamingChatModel and allow the user to use Multi<String> instead of TokenStream. The idea is to share with you what I've done so far to understand if I'm on the right track or if I need to do a big rollback 💥.

You can find the commit in my repository https://github.com/andreadimaio/quarkus-langchain4j. First thing to explain... you will find a lot of changes in different classes, this is because I need a fine control to choose what to enable/disable in the recorder classes. In detail, I changed the .enableIntegration() to .enableChatModel(), .enableStreamingChatModel() and .enableEmbeddingModel.

Now let's focus on the major changes in the core project. The first changes are in AiServicesProcessor.java where I added the addInjectionPoint for the StreamingChatModel class and also in AiServicesRecorder.java where I updated the context, this enables the StreamingChatModel.

To add the possibility of using the Multi<String> what I have done is to wrap a TokenStream as you can see in the AiServiceMethodImplementationSupport.java

if (returnType.equals(Multi.class)) {
            return Multi.createFrom().emitter(new Consumer<MultiEmitter<? super String>>() {

                @Override
                public void accept(MultiEmitter<? super String> em) {
                    new AiServiceTokenStream(messages, context, memoryId)
                        .onNext(em::emit)
                        .onComplete(s -> em.complete())
                        .onError(em::fail)
                        .start();
                }
            });
        }

I'm testing these changes with the bam module and everything seems fine except for some unusual behavior that I'd like to share with you.

  1. Tests I have execute on my local environment the junit tests, is all green except for the simple-ollama.
[ERROR] Failed to execute goal io.quarkus:quarkus-maven-plugin:3.7.3:build (default) on project quarkus-langchain4j-integration-test-simple-ollama: Failed to build quarkus application: io.quarkus.builder.BuildException: Build failure: Build failed due to errors
[ERROR]         [error]: Build step io.quarkus.arc.deployment.ArcProcessor#validate threw an exception: jakarta.enterprise.inject.spi.DeploymentException: jakarta.enterprise.inject.UnsatisfiedResolutionException: Unsatisfied dependency for type dev.langchain4j.model.chat.StreamingChatLanguageModel and qualifiers [@jakarta.enterprise.inject.Default]
[ERROR]         - synthetic injection point
[ERROR]         - declared on SYNTHETIC bean [types=[io.quarkiverse.langchain4j.runtime.aiservice.QuarkusAiServiceContext, java.lang.Object], qualifiers=[@Any, @io.quarkiverse.langchain4j.runtime.aiservice.QuarkusAiServiceContextQualifier("org.acme.Assistant")], target=n/a]
[ERROR]         at io.quarkus.arc.processor.BeanDeployment.processErrors(BeanDeployment.java:1508)
[ERROR]         at io.quarkus.arc.processor.BeanDeployment.init(BeanDeployment.java:320)
[ERROR]         at io.quarkus.arc.processor.BeanProcessor.initialize(BeanProcessor.java:160)
[ERROR]         at io.quarkus.arc.deployment.ArcProcessor.validate(ArcProcessor.java:488)
[ERROR]         at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
[ERROR]         at java.base/java.lang.reflect.Method.invoke(Method.java:580)
[ERROR]         at io.quarkus.deployment.ExtensionLoader$3.execute(ExtensionLoader.java:849)
[ERROR]         at io.quarkus.builder.BuildContext.run(BuildContext.java:256)
[ERROR]         at org.jboss.threads.ContextHandler$1.runWith(ContextHandler.java:18)
[ERROR]         at org.jboss.threads.EnhancedQueueExecutor$Task.run(EnhancedQueueExecutor.java:2513)
[ERROR]         at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1538)
[ERROR]         at java.base/java.lang.Thread.run(Thread.java:1583)
[ERROR]         at org.jboss.threads.JBossThread.run(JBossThread.java:501)
[ERROR] Caused by: jakarta.enterprise.inject.UnsatisfiedResolutionException: Unsatisfied dependency for type dev.langchain4j.model.chat.StreamingChatLanguageModel and qualifiers [@jakarta.enterprise.inject.Default]
[ERROR]         - synthetic injection point
[ERROR]         - declared on SYNTHETIC bean [types=[io.quarkiverse.langchain4j.runtime.aiservice.QuarkusAiServiceContext, java.lang.Object], qualifiers=[@Any, @io.quarkiverse.langchain4j.runtime.aiservice.QuarkusAiServiceContextQualifier("org.acme.Assistant")], target=n/a]
[ERROR]         at io.quarkus.arc.processor.Beans.resolveInjectionPoint(Beans.java:518)
[ERROR]         at io.quarkus.arc.processor.BeanInfo.init(BeanInfo.java:638)
[ERROR]         at io.quarkus.arc.processor.BeanDeployment.init(BeanDeployment.java:308)
[ERROR]         ... 11 more
[ERROR] -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException
[ERROR]
[ERROR] After correcting the problems, you can resume the build with the command
[ERROR]   mvn <args> -rf :quarkus-langchain4j-integration-test-simple-ollama

I'm investigating why I'm getting this error, the .enableStreamingChatModel() should fix it, but it seems this change doesn't work for DevServices.

  1. Multi with QuarkusRestClientBuilder To enable the StreamingChatModel for the bam module I'm using the Multi<String> as a response in the BamRestApi class, and when I use this kind of response type the QuarkusJsonCodecFactory.SnakeCaseObjectMapperHolder.MAPPER doesn't work, so I have to force it with the @JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class) at the class level.

  2. In the project that I'm using as test (git clone https://github.com/andreadimaio/multistreamtest.git), I have a PoemService interface

@RegisterAiService
public interface PoemService {
    
    @SystemMessage("You are a poet")
    @UserMessage("Write a poem of 5 lines about {topic}")
    public TokenStream poemTokenStream(String topic);

    @SystemMessage("You are a poet")
    @UserMessage("Write a poem of 5 lines about {topic}")
    public Multi<String> poemMulti(String topic);
}

used and exposed by the PoemResource class

@Path("/poem")
public class PoemResource {

    @Inject
    PoemService service;

    @GET
    @Path("/token_stream")
    @Produces(MediaType.TEXT_PLAIN)
    public Multi<String> tokenStream() {
        return Multi.createFrom().emitter(em -> {
            service.poemTokenStream("dog")
                .onNext(em::emit)
                .onComplete(s -> {
                    System.out.println(s);
                    em.complete();
                })
                .onError(em::fail)
                .start();
        });
    }
    
    @GET
    @Path("/multi_stream")
    @Produces(MediaType.TEXT_PLAIN)
    public Multi<String> multiStream() {
        return service.poemMulti("dog");
    }
}

In the first endpoint the LLM interface returns a TokenStream which I remapped with a Multi and the result is what I expect.

> curl http://localhost:8080/poem/token_stream
  A furry friend so loyal and true,
Tail wagging, eyes shining bright,
Bringing joy and comfort to you,
A companion through day and night,
A love that will never take flight.

In the second endpoint, the LLM interface returns a Multi and in this case, as you can see, the response is always a JSON, also using the same prompt.

> curl http://localhost:8080/poem/multi_stream
  {
"barks": "loud",
"fur": "soft",
"tail": "wagging",
"eyes": "brown",
"love": "unconditional"
}

I think this behavior is due to the return type, but I don't know where in the code I can check this.

@geoand :pray:

andreadimaio avatar Feb 21 '24 10:02 andreadimaio

Nice!

Can you open a draft PR please? That will make it a lot easier to collaborate on what the remaining issues.

geoand avatar Feb 21 '24 10:02 geoand

Hello, just my thoughts on this because I wanted to achieve the same: I've been stopped by the fact that when the AIService method is synchronous there's a bit of logic that I was not able to include before returning a multi from the token stream.

vietk avatar Feb 21 '24 11:02 vietk

Some of the business logic should be in the AiServiceStreamingResponseHandler class used by the async workflow, but I haven't tested it yet. What is definitely missing is the part related to the audit and moderation. Maybe these could be added in the onComplete handler.

andreadimaio avatar Feb 21 '24 12:02 andreadimaio

Hey there ... the LLM-Streaming feature would be really cool as it could come in handy for a project and would greatly improve the usability and user experience! Is there some news about that? Thanks.

wfrank2509 avatar Mar 26 '24 09:03 wfrank2509

The preliminary work was done by @andreadimaio and will be available with our next release which is due in a few days

geoand avatar Mar 26 '24 09:03 geoand

@wfrank2509, you must use Multi<String> as the return type of a method, if you want to use the streaming functionality.

@RegisterAiService
public interface LLMService {

    @SystemMessage("...")
    @UserMessage("...")
    Multi<String> poem(...);
}

andreadimaio avatar Mar 28 '24 14:03 andreadimaio

Sounds cool - Thank you!

wfrank2509 avatar Mar 28 '24 15:03 wfrank2509

Hey @andreadimaio,

thanks for the implementation. I had a look at the PR and I was wondering, does this only work for WatsonX? Or does it also work for OpenAI?

Thanks!

andreas-repp avatar Mar 28 '24 16:03 andreas-repp

It should work for OpenAI as well

geoand avatar Mar 28 '24 16:03 geoand

Hey @andreadimaio,

thanks for the implementation. I had a look at the PR and I was wondering, does this only work for WatsonX? Or does it also work for OpenAI?

Thanks!

In general the Multi<String> should works for all the module that implement the StreamingChatModel interface.

andreadimaio avatar Mar 28 '24 16:03 andreadimaio

How do I enable this for the OpenAI integration? I see that the extension has a OpenAiStreamingChatModel but it doesn't seem to work. In my code, I get the following error where it seems it cannot find a bean. Do you know what I might be doing wrong?

2024-03-28 17:35:51,681 ERROR [io.qua.dep.dev.IsolatedDevModeMain] (main) Failed to start quarkus: java.lang.RuntimeException: io.quarkus.builder.BuildException: Build failure: Build failed due to errors
	[error]: Build step io.quarkus.arc.deployment.ArcProcessor#validate threw an exception: jakarta.enterprise.inject.spi.DeploymentException: jakarta.enterprise.inject.UnsatisfiedResolutionException: Unsatisfied dependency for type dev.langchain4j.model.chat.StreamingChatLanguageModel and qualifiers [@jakarta.enterprise.inject.Default]
	- synthetic injection point
	- declared on SYNTHETIC bean [types=[io.quarkiverse.langchain4j.runtime.aiservice.QuarkusAiServiceContext, java.lang.Object], qualifiers=[@Any, @io.quarkiverse.langchain4j.runtime.aiservice.QuarkusAiServiceContextQualifier("com.arconsis.youtube.quarkus.langchain.services.ai.SimplePoemAiService")], target=n/a]
	at io.quarkus.arc.processor.BeanDeployment.processErrors(BeanDeployment.java:1508)
	at io.quarkus.arc.processor.BeanDeployment.init(BeanDeployment.java:320)
	at io.quarkus.arc.processor.BeanProcessor.initialize(BeanProcessor.java:160)
	at io.quarkus.arc.deployment.ArcProcessor.validate(ArcProcessor.java:488)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at io.quarkus.deployment.ExtensionLoader$3.execute(ExtensionLoader.java:849)
	at io.quarkus.builder.BuildContext.run(BuildContext.java:256)
	at org.jboss.threads.ContextHandler$1.runWith(ContextHandler.java:18)
	at org.jboss.threads.EnhancedQueueExecutor$Task.doRunWith(EnhancedQueueExecutor.java:2516)
	at org.jboss.threads.EnhancedQueueExecutor$Task.run(EnhancedQueueExecutor.java:2495)
	at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1521)
	at java.base/java.lang.Thread.run(Thread.java:1583)
	at org.jboss.threads.JBossThread.run(JBossThread.java:483)
Caused by: jakarta.enterprise.inject.UnsatisfiedResolutionException: Unsatisfied dependency for type dev.langchain4j.model.chat.StreamingChatLanguageModel and qualifiers [@jakarta.enterprise.inject.Default]
	- synthetic injection point
	- declared on SYNTHETIC bean [types=[io.quarkiverse.langchain4j.runtime.aiservice.QuarkusAiServiceContext, java.lang.Object], qualifiers=[@Any, @io.quarkiverse.langchain4j.runtime.aiservice.QuarkusAiServiceContextQualifier("com.arconsis.youtube.quarkus.langchain.services.ai.SimplePoemAiService")], target=n/a]
	at io.quarkus.arc.processor.Beans.resolveInjectionPoint(Beans.java:519)
	at io.quarkus.arc.processor.BeanInfo.init(BeanInfo.java:638)
	at io.quarkus.arc.processor.BeanDeployment.init(BeanDeployment.java:308)
	... 12 more

	at io.quarkus.runner.bootstrap.AugmentActionImpl.runAugment(AugmentActionImpl.java:334)
	at io.quarkus.runner.bootstrap.AugmentActionImpl.createInitialRuntimeApplication(AugmentActionImpl.java:251)
	at io.quarkus.runner.bootstrap.AugmentActionImpl.createInitialRuntimeApplication(AugmentActionImpl.java:60)
	at io.quarkus.deployment.dev.IsolatedDevModeMain.firstStart(IsolatedDevModeMain.java:112)
	at io.quarkus.deployment.dev.IsolatedDevModeMain.accept(IsolatedDevModeMain.java:433)
	at io.quarkus.deployment.dev.IsolatedDevModeMain.accept(IsolatedDevModeMain.java:55)
	at io.quarkus.bootstrap.app.CuratedApplication.runInCl(CuratedApplication.java:138)
	at io.quarkus.bootstrap.app.CuratedApplication.runInAugmentClassLoader(CuratedApplication.java:93)
	at io.quarkus.deployment.dev.DevModeMain.start(DevModeMain.java:131)
	at io.quarkus.deployment.dev.DevModeMain.main(DevModeMain.java:62)
Caused by: io.quarkus.builder.BuildException: Build failure: Build failed due to errors
	[error]: Build step io.quarkus.arc.deployment.ArcProcessor#validate threw an exception: jakarta.enterprise.inject.spi.DeploymentException: jakarta.enterprise.inject.UnsatisfiedResolutionException: Unsatisfied dependency for type dev.langchain4j.model.chat.StreamingChatLanguageModel and qualifiers [@jakarta.enterprise.inject.Default]
	- synthetic injection point
	- declared on SYNTHETIC bean [types=[io.quarkiverse.langchain4j.runtime.aiservice.QuarkusAiServiceContext, java.lang.Object], qualifiers=[@Any, @io.quarkiverse.langchain4j.runtime.aiservice.QuarkusAiServiceContextQualifier("com.arconsis.youtube.quarkus.langchain.services.ai.SimplePoemAiService")], target=n/a]
	at io.quarkus.arc.processor.BeanDeployment.processErrors(BeanDeployment.java:1508)
	at io.quarkus.arc.processor.BeanDeployment.init(BeanDeployment.java:320)
	at io.quarkus.arc.processor.BeanProcessor.initialize(BeanProcessor.java:160)
	at io.quarkus.arc.deployment.ArcProcessor.validate(ArcProcessor.java:488)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at io.quarkus.deployment.ExtensionLoader$3.execute(ExtensionLoader.java:849)
	at io.quarkus.builder.BuildContext.run(BuildContext.java:256)
	at org.jboss.threads.ContextHandler$1.runWith(ContextHandler.java:18)
	at org.jboss.threads.EnhancedQueueExecutor$Task.doRunWith(EnhancedQueueExecutor.java:2516)
	at org.jboss.threads.EnhancedQueueExecutor$Task.run(EnhancedQueueExecutor.java:2495)
	at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1521)
	at java.base/java.lang.Thread.run(Thread.java:1583)
	at org.jboss.threads.JBossThread.run(JBossThread.java:483)
Caused by: jakarta.enterprise.inject.UnsatisfiedResolutionException: Unsatisfied dependency for type dev.langchain4j.model.chat.StreamingChatLanguageModel and qualifiers [@jakarta.enterprise.inject.Default]
	- synthetic injection point
	- declared on SYNTHETIC bean [types=[io.quarkiverse.langchain4j.runtime.aiservice.QuarkusAiServiceContext, java.lang.Object], qualifiers=[@Any, @io.quarkiverse.langchain4j.runtime.aiservice.QuarkusAiServiceContextQualifier("com.arconsis.youtube.quarkus.langchain.services.ai.SimplePoemAiService")], target=n/a]
	at io.quarkus.arc.processor.Beans.resolveInjectionPoint(Beans.java:519)
	at io.quarkus.arc.processor.BeanInfo.init(BeanInfo.java:638)
	at io.quarkus.arc.processor.BeanDeployment.init(BeanDeployment.java:308)
	... 12 more

	at io.quarkus.builder.Execution.run(Execution.java:123)
	at io.quarkus.builder.BuildExecutionBuilder.execute(BuildExecutionBuilder.java:79)
	at io.quarkus.deployment.QuarkusAugmentor.run(QuarkusAugmentor.java:160)
	at io.quarkus.runner.bootstrap.AugmentActionImpl.runAugment(AugmentActionImpl.java:330)
	... 9 more
Caused by: jakarta.enterprise.inject.spi.DeploymentException: jakarta.enterprise.inject.UnsatisfiedResolutionException: Unsatisfied dependency for type dev.langchain4j.model.chat.StreamingChatLanguageModel and qualifiers [@jakarta.enterprise.inject.Default]
	- synthetic injection point
	- declared on SYNTHETIC bean [types=[io.quarkiverse.langchain4j.runtime.aiservice.QuarkusAiServiceContext, java.lang.Object], qualifiers=[@Any, @io.quarkiverse.langchain4j.runtime.aiservice.QuarkusAiServiceContextQualifier("com.arconsis.youtube.quarkus.langchain.services.ai.SimplePoemAiService")], target=n/a]
	at io.quarkus.arc.processor.BeanDeployment.processErrors(BeanDeployment.java:1508)
	at io.quarkus.arc.processor.BeanDeployment.init(BeanDeployment.java:320)
	at io.quarkus.arc.processor.BeanProcessor.initialize(BeanProcessor.java:160)
	at io.quarkus.arc.deployment.ArcProcessor.validate(ArcProcessor.java:488)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at io.quarkus.deployment.ExtensionLoader$3.execute(ExtensionLoader.java:849)
	at io.quarkus.builder.BuildContext.run(BuildContext.java:256)
	at org.jboss.threads.ContextHandler$1.runWith(ContextHandler.java:18)
	at org.jboss.threads.EnhancedQueueExecutor$Task.doRunWith(EnhancedQueueExecutor.java:2516)
	at org.jboss.threads.EnhancedQueueExecutor$Task.run(EnhancedQueueExecutor.java:2495)
	at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1521)
	at java.base/java.lang.Thread.run(Thread.java:1583)
	at org.jboss.threads.JBossThread.run(JBossThread.java:483)
Caused by: jakarta.enterprise.inject.UnsatisfiedResolutionException: Unsatisfied dependency for type dev.langchain4j.model.chat.StreamingChatLanguageModel and qualifiers [@jakarta.enterprise.inject.Default]
	- synthetic injection point
	- declared on SYNTHETIC bean [types=[io.quarkiverse.langchain4j.runtime.aiservice.QuarkusAiServiceContext, java.lang.Object], qualifiers=[@Any, @io.quarkiverse.langchain4j.runtime.aiservice.QuarkusAiServiceContextQualifier("com.arconsis.youtube.quarkus.langchain.services.ai.SimplePoemAiService")], target=n/a]
	at io.quarkus.arc.processor.Beans.resolveInjectionPoint(Beans.java:519)
	at io.quarkus.arc.processor.BeanInfo.init(BeanInfo.java:638)
	at io.quarkus.arc.processor.BeanDeployment.init(BeanDeployment.java:308)
	... 12 more

andreas-repp avatar Mar 28 '24 16:03 andreas-repp

Just doing:

    @RegisterAiService
    interface Assistant {

        Multi<String> chat(String message);
    }

works for me

geoand avatar Mar 28 '24 16:03 geoand

What version are you using? I tested it now and everything works (with OpenAI)

andreadimaio avatar Mar 28 '24 16:03 andreadimaio

It should work with 0.10.z.

BTW, I will be releasing 0.10.2 in a few minutes (if nothing unexpected comes up)

geoand avatar Mar 28 '24 16:03 geoand

Could it be because I use Kotlin? I'm using 0.10.1.

andreas-repp avatar Mar 28 '24 16:03 andreas-repp

I doubt it, but with Kotlin, nothing surprises me...

If you attach a sample, I will take a look tomorrow

geoand avatar Mar 28 '24 16:03 geoand

I found my issue... I accidentially still had the huggingface extension enabled in my playground project and that doesn't support the streaming... sorry for that :(

andreas-repp avatar Mar 28 '24 17:03 andreas-repp

No problem :)

geoand avatar Mar 28 '24 17:03 geoand