spring-data-opensearch
spring-data-opensearch copied to clipboard
[FEATURE] Reactive Client for OpenSearch
Hi OpenSearch Team,
I see that Reactive classes from Spring Data Elasticsearch are missing, like DefaultReactiveElasticsearchClient: https://github.com/spring-projects/spring-data-elasticsearch/tree/main/src/main/java/org/springframework/data/elasticsearch/client/erhlc
https://github.com/opensearch-project/spring-data-opensearch/tree/main/spring-data-opensearch/src/main/java/org/opensearch/data/client/orhlc
How can I use Spring WebFlux with OpenSearch?
Thanks.
This sounds like it's worth working on. Feel free to propose an architecture here or to submit a PR.
How can I use Spring WebFlux with OpenSearch?
You can use the open search async client with Mono.fromFuture but this is just a workaround, for example Mono.fromFuture(client.search(searchRequest, JsonNode.class))
Thanks, I managed to do it using the old spring-data-elasticsearch
@naumenkovadym Please can you help me how you were able to implement the same as I am stuck at same place and not able to integrate with Spring WebFlux
hi @waytoharish
Sure, here's an example
plugins {
id 'java'
id 'org.springframework.boot' version '2.5.14'
id 'io.spring.dependency-management' version '1.1.0'
}
group = 'com.example'
version = '1.0.0'
configurations {
compileOnly {
extendsFrom annotationProcessor
}
}
repositories {
mavenCentral()
}
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-webflux'
implementation 'org.springframework.boot:spring-boot-starter-validation'
implementation('org.springframework.boot:spring-boot-starter-data-elasticsearch') {
exclude group: 'org.elasticsearch'
exclude group: 'org.elasticsearch.client'
}
implementation "org.elasticsearch.client:elasticsearch-rest-high-level-client:7.13.4"
implementation 'io.netty:netty-all'
implementation("com.google.guava:guava:31.1-jre")
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
implementation 'ca.pjer:logback-awslogs-appender:1.6.0'
implementation 'ch.qos.logback:logback-classic'
implementation 'ch.qos.logback:logback-classic'
implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.12.0'
implementation('com.fasterxml.jackson.core:jackson-core:2.10.0')
implementation('com.fasterxml.jackson.core:jackson-annotations:2.10.0')
implementation('com.fasterxml.jackson.core:jackson-databind:2.10.0')
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'io.projectreactor:reactor-test'
testImplementation("org.testcontainers:elasticsearch:1.17.6")
testImplementation 'org.junit.vintage:junit-vintage-engine:5.9.2'
}
Configuration
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
import org.springframework.data.elasticsearch.client.reactive.ReactiveRestClients;
import org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations;
import org.springframework.data.elasticsearch.core.ReactiveElasticsearchTemplate;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.convert.MappingElasticsearchConverter;
import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext;
import org.springframework.web.reactive.function.client.ExchangeStrategies;
@Configuration
public class OpenSearchConfiguration {
private static final int DEFAULT_CODEC_MAX_MEMORY_SIZE = 16 * 1024 * 1024;
//vpc-...
@Value("${opensearch.host}")
private String host;
//443
@Value("${opensearch.port}")
private int port;
//https
@Value("${opensearch.scheme}")
private String scheme;
@Value("${opensearch.user}")
private String user;
@Value("${opensearch.pass}")
private String pass;
@Bean
public RestHighLevelClient restHighLevelClient() {
String httpHostAddress = scheme + "://" + host + ":" + port;
RestClientBuilder restClientBuilder = RestClient.builder(HttpHost.create(httpHostAddress));
if (user != null && pass != null && !user.isEmpty() && !pass.isEmpty()) {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(
AuthScope.ANY,
new UsernamePasswordCredentials(user, pass)
);
restClientBuilder.setHttpClientConfigCallback(httpClientBuilder ->
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
}
return new RestHighLevelClient(restClientBuilder);
}
@Bean
public ReactiveElasticsearchClient reactiveOpensearchClient() {
String httpHostAddress = host + ":" + port;
ClientConfiguration.MaybeSecureClientConfigurationBuilder clientConfigurationBuilder =
ClientConfiguration.builder()
.connectedTo(httpHostAddress);
if (user != null && pass != null && !user.isEmpty() && !pass.isEmpty()) {
clientConfigurationBuilder = (ClientConfiguration.MaybeSecureClientConfigurationBuilder)
clientConfigurationBuilder.usingSsl().withBasicAuth(user, pass);
}
return ReactiveRestClients.create(clientConfigurationBuilder.withWebClientConfigurer(webClient -> {
ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder()
.codecs(configurer -> configurer.defaultCodecs()
.maxInMemorySize(DEFAULT_CODEC_MAX_MEMORY_SIZE))
.build();
return webClient.mutate().exchangeStrategies(exchangeStrategies).build();
}).build());
}
@Bean
public ElasticsearchConverter elasticsearchConverter() {
return new MappingElasticsearchConverter(elasticsearchMappingContext());
}
@Bean
public SimpleElasticsearchMappingContext elasticsearchMappingContext() {
return new SimpleElasticsearchMappingContext();
}
@Bean
public ReactiveElasticsearchOperations reactiveElasticsearchOperations() {
return new ReactiveElasticsearchTemplate(reactiveOpensearchClient(), elasticsearchConverter());
}
}
ReactiveElasticsearchOperations should be used in repos
Would really appreciate if this issue could be worked on! It would help us a lot!
@cunhap care to help?
Any progress with the issue?
@BarboraCigankova @cunhap I don't think anyone is working on this.
Has there been any activity on this issue?
@kalyashov I don't think anyone is working on this.
Duplicate of https://github.com/opensearch-project/spring-data-opensearch/issues/23