ES: Elasticsearch SSL with skipped hostname verification — Elasticsearch source reads normally, but Elasticsearch bulk flow always fails with 401 Authorization
I use Akka Streams with Alpakka Elasticsearch to bulk write to ES. After upgrading from 2.0.2 to 3.0.4, I found that the Elasticsearch source reads normally, but the Elasticsearch bulk flow always fails with 401 Authorization. Note: my Elasticsearch cluster is AWS Open Distro for Elasticsearch, and I access ES over SSL with hostname verification skipped.
Versions used
Alpakka version: 3.0.4
Akka version: 2.6.19
Expected Behavior
My test code is below:
import spray.json._

implicit object AnyJsonFormat extends JsonFormat[Any] {
  def write(x: Any): JsValue = x match {
    case n: Long => JsNumber(n)
    case n: Int => JsNumber(n)
    case n: BigInt => JsNumber(n)
    case n: BigDecimal => JsNumber(n)
    case n: Double => JsNumber(n)
    case n: Float => JsNumber(n)
    case n: Short => JsNumber(n)
    case s: String => JsString(s)
    case x: Seq[_] => JsArray(x.map(write).toVector)
    //case l: List[_] => JsArray(l.map(write).toVector)
    case m: Map[_, _] => JsObject(m.asInstanceOf[Map[String, _]].view.mapValues(write).toMap)
    case b: Boolean if b => JsTrue
    case b: Boolean if !b => JsFalse
    case v: JsValue => v
    case Some(v) => write(v)
    case None => JsNull
    case null => JsNull
    case x =>
      serializationError("Do not understand object of type " + x.getClass.getName)
  }

  def read(value: JsValue): Any = value match {
    case JsNumber(n) =>
      if (n.isValidInt) {
        n.intValue
      } else if (n.isValidLong) {
        n.longValue
      } else if (n.isDecimalFloat) {
        n.floatValue
      } else {
        n.doubleValue
      }
    case JsString(s) => s
    case a: JsArray => a.elements.iterator.map(read).toList
    case o: JsObject =>
      o.fields.map { field =>
        (field._1, read(field._2))
      }
    case JsTrue => true
    case JsFalse => false
    case JsNull => null
    case x =>
      deserializationError("Do not understand how to deserialize " + x)
  }
}
def constructElasticsearchParams(indexName: String, typeName: String, apiVersion: ApiVersion): ElasticsearchParams = {
  if (apiVersion == ApiVersion.V5) {
    ElasticsearchParams.V5(indexName, typeName)
  } else {
    ElasticsearchParams.V7(indexName)
  }
}
case class SSLStore(storeType: String, file: String, password: String, profile: String)
case class SSLConfig(username: String, password: String, skipHostNameVerification: Boolean, keyStore: Option[SSLStore] = None, trustStore: Option[SSLStore] = None)
import java.security.cert.X509Certificate
import javax.net.ssl.{KeyManager, SSLContext, SSLEngine, X509TrustManager}

import akka.actor.ActorSystem
import akka.http.javadsl.model.headers.Authorization
import akka.http.scaladsl.model.headers.BasicHttpCredentials
import akka.http.scaladsl.{ConnectionContext, HttpsConnectionContext}
import akka.stream.alpakka.elasticsearch._
import akka.stream.alpakka.elasticsearch.scaladsl.{ElasticsearchFlow, ElasticsearchSource}
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.duration._

def main(args: Array[String]): Unit = {
  implicit val system = ActorSystem("ES")
  // Akka 2.6 derives an implicit Materializer from the ActorSystem; no ActorMaterializer is needed.

  val sslConfig = Option(SSLConfig("admin", "Bestsign654321", true))

  // https://doc.akka.io/docs/akka-http/current/client-side/client-https-support.html
  def createInsecureSslEngine(host: String, port: Int): SSLEngine = { // Disabling hostname verification
    val trustfulSslContext: SSLContext = {
      object NoCheckX509TrustManager extends X509TrustManager {
        override def checkClientTrusted(chain: Array[X509Certificate], authType: String): Unit = ()
        override def checkServerTrusted(chain: Array[X509Certificate], authType: String): Unit = ()
        override def getAcceptedIssuers: Array[X509Certificate] = Array[X509Certificate]()
      }
      val context = SSLContext.getInstance("TLS")
      context.init(Array[KeyManager](), Array(NoCheckX509TrustManager), null)
      context
    }
    //val engine = SSLContext.getDefault.createSSLEngine(host, port)
    val engine = trustfulSslContext.createSSLEngine(host, port)
    engine.setUseClientMode(true)
    if (sslConfig.isDefined) {
      engine.setEnabledProtocols(Array("TLSv1.1", "TLSv1.2", "SSLv3"))
      // WARNING: this creates an SSLEngine without enabling endpoint identification/verification procedures.
      // Disabling hostname verification is a very bad idea; please don't unless you have a very good reason to.
      // When in doubt, use the `ConnectionContext.httpsClient` that takes an `SSLContext` instead, or enable it with:
      // engine.setSSLParameters({
      //   val params = engine.getSSLParameters
      //   params.setEndpointIdentificationAlgorithm("https")
      //   params
      // })
      // engine.setNeedClientAuth(false)
      // engine.setWantClientAuth(false)
    }
    engine
  }

  val clientSslContext: HttpsConnectionContext = ConnectionContext.httpsClient(createInsecureSslEngine _)

  val connectionSettings = ElasticsearchConnectionSettings("https://10.128.55.91:9200")
    //.withCredentials(sslConfig.get.username, sslConfig.get.password)
    .withHeaders(List(Authorization.create(BasicHttpCredentials("admin", "Bestsign654321"))))
    .withConnectionContext(clientSslContext)

  val baseSourceSettings = ElasticsearchSourceSettings(connectionSettings).withApiVersion(ApiVersion.V7)

  /** reads normally **/
  ElasticsearchSource
    .create(
      constructElasticsearchParams("saas_contract_nested3", "_doc", ApiVersion.V7),
      query = """{"match_all": {}}""",
      settings = baseSourceSettings
    )
    .take(2)
    .map { message: ReadResult[spray.json.JsObject] =>
      import spray.json._
      import DefaultJsonProtocol._
      implicit val mapAnyFormat = mapFormat[String, Any]
      val doc: Map[String, Any] = jsonReader[Map[String, Any]].read(message.source)
      doc
    }
    .runForeach(println)

  case class Book(id: Long, name: String, description: String)
  import spray.json._
  import spray.json.DefaultJsonProtocol._
  implicit val bookFormat: RootJsonFormat[Book] = jsonFormat3(Book)

  /** uses the same configuration, but the bulk write always fails with 401 Authorization **/
  val source = Source(List(Book(1L, "Harry Potter", "...."))).map { book: Book =>
    WriteMessage.createIndexMessage(book.id.toString, book.toJson.toString()).withPassThrough("unit")
  }.groupedWithin(10, 1.second)

  val esFlow = ElasticsearchFlow.createBulk[String, String](
    elasticsearchParams = ElasticsearchParams.V7("test-test"),
    settings = ElasticsearchWriteSettings(connectionSettings)
  )
  source.via(esFlow).runWith(Sink.seq)

  Thread.sleep(300000000)
}
Expected behavior: with the same connection settings, the read and the bulk write behave consistently, i.e. both succeed or both fail.
Actual Behavior
The Elasticsearch source reads normally, but the Elasticsearch bulk flow always fails with 401 Authorization.
Relevant Info
I looked into the source code. The cause is in ElasticsearchApi.scala:
@InternalApi private[impl] object ElasticsearchApi {
  def executeRequest(
      request: HttpRequest,
      connectionSettings: ElasticsearchConnectionSettings
  )(implicit http: HttpExt): Future[HttpResponse] = {
    if (connectionSettings.hasCredentialsDefined) {
      http.singleRequest(
        request.addCredentials(BasicHttpCredentials(connectionSettings.username.get, connectionSettings.password.get))
      )
    } else {
      http.singleRequest(request,
                         connectionContext =
                           connectionSettings.connectionContext.getOrElse(http.defaultClientHttpsContext))
    }
  }
}
As the quoted code shows, when credentials are defined the configured connection context is never passed to singleRequest, so the withCredentials and withConnectionContext methods of ElasticsearchConnectionSettings are effectively mutually exclusive. If I want to use a username and password and, at the same time, a custom HttpsConnectionContext, my only option is to set the HTTP header directly, e.g. .withHeaders(List(Authorization.create(BasicHttpCredentials("admin", "Bestsign123456")))).
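For illustration, honoring both settings could look roughly like this (a sketch only, not the library's code and not necessarily the fix that was merged):

// Illustration only: executeRequest could apply the credentials and still
// pass the configured connection context on every request.
def executeRequest(
    request: HttpRequest,
    connectionSettings: ElasticsearchConnectionSettings
)(implicit http: HttpExt): Future[HttpResponse] = {
  val authedRequest =
    if (connectionSettings.hasCredentialsDefined)
      request.addCredentials(
        BasicHttpCredentials(connectionSettings.username.get, connectionSettings.password.get)
      )
    else request
  http.singleRequest(
    authedRequest,
    connectionContext = connectionSettings.connectionContext.getOrElse(http.defaultClientHttpsContext)
  )
}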
Even with the header workaround, ElasticsearchFlow.createBulk (which goes through ElasticsearchSimpleFlowStage) ignores the headers, whereas ElasticsearchSourceStage.sendScrollScanRequest uses them. ElasticsearchSimpleFlowStage source code:
if (json.nonEmpty) {
  val uri = baseUri.withPath(Path(endpoint))
  val request = HttpRequest(HttpMethods.POST)
    .withUri(uri)
    .withEntity(HttpEntity(NDJsonProtocol.`application/x-ndjson`, json))
  ElasticsearchApi
    .executeRequest(
      request,
      connectionSettings = settings.connection
    )
ElasticsearchSourceStage.sendScrollScanRequest source code:
val uri = prepareUri(Path("/_search/scroll"))
val request = HttpRequest(HttpMethods.POST)
  .withUri(uri)
  .withEntity(
    HttpEntity(ContentTypes.`application/json`,
               Map("scroll" -> settings.scroll, "scroll_id" -> actualScrollId).toJson.compactPrint)
  )
  .withHeaders(settings.connection.headers)
ElasticsearchApi
  .executeRequest(
    request,
    settings.connection
  )
I think the bulk write request should not ignore the externally configured headers. We can add .withHeaders(settings.connection.headers) in the ElasticsearchSimpleFlowStage.scala file.
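Applied to the flow stage excerpt above, the change would look roughly like this (a sketch mirroring what sendScrollScanRequest already does):

// Sketch of the proposed fix: propagate the externally configured headers
// in ElasticsearchSimpleFlowStage, as sendScrollScanRequest already does.
if (json.nonEmpty) {
  val uri = baseUri.withPath(Path(endpoint))
  val request = HttpRequest(HttpMethods.POST)
    .withUri(uri)
    .withEntity(HttpEntity(NDJsonProtocol.`application/x-ndjson`, json))
    .withHeaders(settings.connection.headers) // <- the added line
  ElasticsearchApi
    .executeRequest(
      request,
      connectionSettings = settings.connection
    )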
Thank you for reporting this problem. Your suggestion sounds correct. Would you be in a position to try it and provide a Pull Request to update Alpakka?
@ennru Thank you for your reply.
When I tried to prepare a Pull Request to update Alpakka, I found that someone had already submitted the change to the master branch on January 3. So I am temporarily using the snapshot version (3.0.4+51-a4dacc6d-SNAPSHOT) in my application and look forward to the release of version 3.0.5.
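For reference, depending on the snapshot in sbt looks roughly like this (the resolver URL is an assumption; check the Alpakka documentation for the repository where snapshots are currently published):

// Sketch: pulling in the Alpakka snapshot build via build.sbt.
// The resolver below is an assumption and may differ for your setup.
resolvers += "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots"
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-elasticsearch" % "3.0.4+51-a4dacc6d-SNAPSHOT"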
Great, that was fixed already. A release is long overdue, but it will be called 4.0.0, as many other updates have gone in already. I hope to get to it soon...