spring-data-opensearch icon indicating copy to clipboard operation
spring-data-opensearch copied to clipboard

[FEATURE] Reactive Client for OpenSearch

Open naumenkovadym opened this issue 1 year ago • 11 comments

Hi OpenSearch Team,

I see that Reactive classes from Spring Data Elasticsearch are missing, like DefaultReactiveElasticsearchClient: https://github.com/spring-projects/spring-data-elasticsearch/tree/main/src/main/java/org/springframework/data/elasticsearch/client/erhlc

https://github.com/opensearch-project/spring-data-opensearch/tree/main/spring-data-opensearch/src/main/java/org/opensearch/data/client/orhlc

How can I use Spring WebFlux with OpenSearch?

Thanks.

naumenkovadym avatar Mar 16 '23 19:03 naumenkovadym

This sounds like it's worth working on. Feel free to propose an architecture here or to submit a PR.

wbeckler avatar Apr 03 '23 18:04 wbeckler

How can I use Spring WebFlux with OpenSearch?

You can use the open search async client with Mono.fromFuture but this is just a workaround, for example Mono.fromFuture(client.search(searchRequest, JsonNode.class))

j3t avatar Apr 06 '23 14:04 j3t

Thanks, I managed to do it using the old spring-data-elasticsearch

naumenkovadym avatar Apr 12 '23 16:04 naumenkovadym

@naumenkovadym Please can you help me how you were able to implement the same as I am stuck at same place and not able to integrate with Spring WebFlux

waytoharish avatar Aug 08 '23 14:08 waytoharish

hi @waytoharish

Sure, here's an example

plugins {
    id 'java'
    id 'org.springframework.boot' version '2.5.14'
    id 'io.spring.dependency-management' version '1.1.0'
}

group = 'com.example'
version = '1.0.0'

configurations {
    compileOnly {
        extendsFrom annotationProcessor
    }
}

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-webflux'
    implementation 'org.springframework.boot:spring-boot-starter-validation'
    implementation('org.springframework.boot:spring-boot-starter-data-elasticsearch') {
        exclude group: 'org.elasticsearch'
        exclude group: 'org.elasticsearch.client'
    }
    implementation "org.elasticsearch.client:elasticsearch-rest-high-level-client:7.13.4"

    implementation 'io.netty:netty-all'

    implementation("com.google.guava:guava:31.1-jre")

    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'

    implementation 'ca.pjer:logback-awslogs-appender:1.6.0'
    implementation 'ch.qos.logback:logback-classic'
    implementation 'ch.qos.logback:logback-classic'

    implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.12.0'

    implementation('com.fasterxml.jackson.core:jackson-core:2.10.0')
    implementation('com.fasterxml.jackson.core:jackson-annotations:2.10.0')
    implementation('com.fasterxml.jackson.core:jackson-databind:2.10.0')

    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'io.projectreactor:reactor-test'
    testImplementation("org.testcontainers:elasticsearch:1.17.6")
    testImplementation 'org.junit.vintage:junit-vintage-engine:5.9.2'
}

Configuration

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
import org.springframework.data.elasticsearch.client.reactive.ReactiveRestClients;
import org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations;
import org.springframework.data.elasticsearch.core.ReactiveElasticsearchTemplate;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.convert.MappingElasticsearchConverter;
import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext;
import org.springframework.web.reactive.function.client.ExchangeStrategies;

@Configuration
public class OpenSearchConfiguration {

    private static final int DEFAULT_CODEC_MAX_MEMORY_SIZE = 16 * 1024 * 1024;

    //vpc-...
    @Value("${opensearch.host}")
    private String host;

    //443
    @Value("${opensearch.port}")
    private int port;

    //https
    @Value("${opensearch.scheme}")
    private String scheme;

    @Value("${opensearch.user}")
    private String user;

    @Value("${opensearch.pass}")
    private String pass;

    @Bean
    public RestHighLevelClient restHighLevelClient() {

        String httpHostAddress = scheme + "://" + host + ":" + port;
        RestClientBuilder restClientBuilder = RestClient.builder(HttpHost.create(httpHostAddress));

        if (user != null && pass != null && !user.isEmpty() && !pass.isEmpty()) {
            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(
                    AuthScope.ANY,
                    new UsernamePasswordCredentials(user, pass)
            );
            restClientBuilder.setHttpClientConfigCallback(httpClientBuilder ->
                    httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
        }

        return new RestHighLevelClient(restClientBuilder);
    }

    @Bean
    public ReactiveElasticsearchClient reactiveOpensearchClient() {

        String httpHostAddress = host + ":" + port;

        ClientConfiguration.MaybeSecureClientConfigurationBuilder clientConfigurationBuilder =
                ClientConfiguration.builder()
                        .connectedTo(httpHostAddress);

        if (user != null && pass != null && !user.isEmpty() && !pass.isEmpty()) {
            clientConfigurationBuilder = (ClientConfiguration.MaybeSecureClientConfigurationBuilder)
                    clientConfigurationBuilder.usingSsl().withBasicAuth(user, pass);
        }

        return ReactiveRestClients.create(clientConfigurationBuilder.withWebClientConfigurer(webClient -> {
            ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder()
                    .codecs(configurer -> configurer.defaultCodecs()
                            .maxInMemorySize(DEFAULT_CODEC_MAX_MEMORY_SIZE))
                    .build();
            return webClient.mutate().exchangeStrategies(exchangeStrategies).build();
        }).build());
    }


    @Bean
    public ElasticsearchConverter elasticsearchConverter() {
        return new MappingElasticsearchConverter(elasticsearchMappingContext());
    }

    @Bean
    public SimpleElasticsearchMappingContext elasticsearchMappingContext() {
        return new SimpleElasticsearchMappingContext();
    }

    @Bean
    public ReactiveElasticsearchOperations reactiveElasticsearchOperations() {
        return new ReactiveElasticsearchTemplate(reactiveOpensearchClient(), elasticsearchConverter());
    }
}

ReactiveElasticsearchOperations should be used in repos

naumenkovadym avatar Aug 08 '23 14:08 naumenkovadym

Would really appreciate if this issue could be worked on! It would help us a lot!

cunhap avatar Oct 04 '23 09:10 cunhap

@cunhap care to help?

dblock avatar Oct 13 '23 18:10 dblock

Any progress with the issue?

BarboraCigankova avatar Jan 16 '24 10:01 BarboraCigankova

@BarboraCigankova @cunhap I don't think anyone is working on this.

dblock avatar Jan 17 '24 22:01 dblock

Has there been any activity on this issue?

kalyashov avatar Apr 16 '24 13:04 kalyashov

@kalyashov I don't think anyone is working on this.

reta avatar Apr 16 '24 16:04 reta

Duplicate of https://github.com/opensearch-project/spring-data-opensearch/issues/23

reta avatar May 08 '24 19:05 reta