
ES: with SSL hostname verification skipped, the Elasticsearch source reads normally, but the bulk flow always fails with 401 Unauthorized

Open ZhiXingHeYiApple opened this issue 2 years ago • 3 comments

I use the Akka Streams Elasticsearch connector (Alpakka) to bulk-write to ES. After upgrading from 2.0.2 to 3.0.4, I found that the Elasticsearch source reads normally, but the bulk flow always fails with 401 Unauthorized. Note: my Elasticsearch cluster is AWS Open Distro for Elasticsearch, and I access it over SSL with hostname verification skipped.

Versions used

alpakka 3.0.4

Akka version: 2.6.19

Expected Behavior

My test code is below:

 implicit object AnyJsonFormat extends JsonFormat[Any] {
    def write(x: Any): JsValue = x match {
      case n: Long => JsNumber(n)
      case n: Int => JsNumber(n)
      case n: BigInt => JsNumber(n)
      case n: BigDecimal => JsNumber(n)
      case n: Double => JsNumber(n)
      case n: Float => JsNumber(n)
      case n: Short => JsNumber(n)
      case s: String => JsString(s)
      case x: Seq[_] => JsArray(x.map(write).toVector)
      //case l: List[_] => JsArray(l.map(write).toVector)
      case m: Map[_, _] => JsObject(m.asInstanceOf[Map[String, _]].view.mapValues(write).toMap)
      case b: Boolean if b => JsTrue
      case b: Boolean if !b => JsFalse
      case v: JsValue => v
      case Some(v) => write(v)
      case None => JsNull
      case null => JsNull
      case x =>
        serializationError("Do not understand object of type " + x.getClass.getName)
    }

    def read(value: JsValue): Any = value match {
      case JsNumber(n) =>
        if (n.isValidInt) {
          n.intValue
        } else if (n.isValidLong) {
          n.longValue
        } else if (n.isDecimalFloat) {
          n.floatValue
        } else if (n.isDecimalDouble) {
          n.doubleValue
        } else n.doubleValue
      case JsString(s) => s
      case a: JsArray => a.elements.toIterator.map(read).toList
      case o: JsObject => o.fields.map { field =>
        (field._1, read(field._2))
      }
      case JsTrue => true
      case JsFalse => false
      case JsNull => null
      case x =>
        deserializationError("Do not understand how to deserialize " + x)
    }
  }

 def constructElasticsearchParams(indexName: String, typeName: String, apiVersion: ApiVersion): ElasticsearchParams = {
    if (apiVersion == ApiVersion.V5) {
      ElasticsearchParams.V5(indexName, typeName)
    } else {
      ElasticsearchParams.V7(indexName)
    }
  }
  
  case class SSLStore(storeType: String, file: String, password: String, profile: String)

  case class SSLConfig(username: String, password: String, skipHostNameVerification: Boolean, keyStore: Option[SSLStore] = None, trustStore: Option[SSLStore] = None)

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("ES")
    implicit val materializer = ActorMaterializer()

    val sslConfig = Option(SSLConfig("admin", "Bestsign654321", true))

    // https://doc.akka.io/docs/akka-http/current/client-side/client-https-support.html
    def createInsecureSslEngine(host: String, port: Int): SSLEngine = { // Disabling hostname verification
      val trustfulSslContext: SSLContext = {
        object NoCheckX509TrustManager extends X509TrustManager {
          override def checkClientTrusted(chain: Array[X509Certificate], authType: String): Unit = ()
          override def checkServerTrusted(chain: Array[X509Certificate], authType: String): Unit = ()
          override def getAcceptedIssuers: Array[X509Certificate] = Array[X509Certificate]()
        }
        val context = SSLContext.getInstance("TLS")
        context.init(Array[KeyManager](), Array(NoCheckX509TrustManager), null)
        context
      }
      //val engine = SSLContext.getDefault.createSSLEngine(host, port)
      val engine = trustfulSslContext.createSSLEngine(host, port)
      engine.setUseClientMode(true)
      if (sslConfig.isDefined) {
        engine.setEnabledProtocols(Array("TLSv1.1", "TLSv1.2", "SSLv3"))
        // WARNING: this creates an SSL Engine without enabling endpoint identification/verification procedures
        // Disabling host name verification is a very bad idea, please don't unless you have a very good reason to.
        // When in doubt, use the `ConnectionContext.httpsClient` that takes an `SSLContext` instead, or enable with:
//        engine.setSSLParameters({
//          val params = engine.getSSLParameters
//          params.setEndpointIdentificationAlgorithm("https")
//          params
//        })
//        engine.setNeedClientAuth(false)
//        engine.setWantClientAuth(false)
      }
      engine
    }

    val clientSslContext: HttpsConnectionContext = ConnectionContext.httpsClient(createInsecureSslEngine _)
    val connectionSettings = ElasticsearchConnectionSettings(s"https://10.128.55.91:9200")
      //.withCredentials(sslConfig.get.username, sslConfig.get.password)
      .withHeaders(List(Authorization.create(BasicHttpCredentials("admin", "Bestsign654321"))))
      .withConnectionContext(clientSslContext)

    val baseSourceSettings = ElasticsearchSourceSettings(connectionSettings).withApiVersion(ApiVersion.V7)
    
    /** read normally **/
    ElasticsearchSource
      .create(
        constructElasticsearchParams("saas_contract_nested3", "_doc", ApiVersion.V7),
        query = """{"match_all": {}}""",
        settings = baseSourceSettings
      ).take(2).map { message: ReadResult[spray.json.JsObject] =>
      import spray.json._
      import DefaultJsonProtocol._
      implicit val mapAnyFormat = mapFormat[String, Any]
      val doc: Map[String, Any] = jsonReader[Map[String, Any]].read(message.source)
      doc
    }.runForeach(println)



    case class Book(id: Long, name: String, description: String)

    import spray.json._
    import spray.json.DefaultJsonProtocol._
    implicit val bookFormat: RootJsonFormat[Book] = jsonFormat3(Book)

    /** Uses the same configuration, but the bulk write fails with 401 Unauthorized **/
    val source = Source(List(Book(1L, "哈利波特", "...."))).map { book: Book =>
      WriteMessage.createIndexMessage(book.id.toString, book.toJson.toString()).withPassThrough("unit")
    }.groupedWithin(10, 1 second)
    val esFlow = ElasticsearchFlow.createBulk[String,String](
      elasticsearchParams = ElasticsearchParams.V7("test-test"),
      settings = ElasticsearchWriteSettings(connectionSettings)
    )
    source.via(esFlow).runWith(Sink.seq)


    Thread.sleep(300000000)

  }

Expected behavior: with the same connection settings, the source and the bulk flow should either both succeed or both fail.

Actual Behavior

The Elasticsearch source reads normally, but the bulk flow always fails with 401 Unauthorized.

Relevant Info

I looked at the source code. The cause is in ElasticsearchApi.scala:

@InternalApi private[impl] object ElasticsearchApi {
  def executeRequest(
      request: HttpRequest,
      connectionSettings: ElasticsearchConnectionSettings
  )(implicit http: HttpExt): Future[HttpResponse] = {
    if (connectionSettings.hasCredentialsDefined) {
      http.singleRequest(
        request.addCredentials(BasicHttpCredentials(connectionSettings.username.get, connectionSettings.password.get))
      )
    } else {
      http.singleRequest(request,
                         connectionContext =
                           connectionSettings.connectionContext.getOrElse(http.defaultClientHttpsContext))
    }
  }
}

The methods withCredentials and withConnectionContext of ElasticsearchConnectionSettings are effectively mutually exclusive: when credentials are defined, executeRequest does not pass the custom connection context to singleRequest. To use a username and password together with a custom HttpsConnectionContext, the only option is to set the HTTP header directly, for example .withHeaders(List(Authorization.create(BasicHttpCredentials("admin", "Bestsign123456")))).
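For reference, the header workaround above sends a standard HTTP Basic Authorization value. The following is a minimal sketch of how such a value is formed, using only the JDK. The object and method names (BasicAuthSketch, basicAuthHeaderValue) are hypothetical and are not part of Alpakka or Akka HTTP, and UTF-8 encoding of the credentials is an assumption.

```scala
import java.nio.charset.StandardCharsets
import java.util.Base64

object BasicAuthSketch {
  // Hypothetical helper, not part of Alpakka or Akka HTTP.
  // Builds the value of an "Authorization" header for HTTP Basic auth:
  // the literal "Basic " followed by base64("username:password").
  // Assumption: credentials are encoded as UTF-8 before base64 encoding.
  def basicAuthHeaderValue(username: String, password: String): String = {
    val raw = s"$username:$password".getBytes(StandardCharsets.UTF_8)
    "Basic " + Base64.getEncoder.encodeToString(raw)
  }
}
```

If a request reaches the cluster without a header of this form (and without credentials added some other way), a 401 response is the expected outcome on a secured cluster.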

However, ElasticsearchFlow.createBulk (through ElasticsearchSimpleFlowStage) ignores these headers, whereas ElasticsearchSourceStage.sendScrollScanRequest applies them.

ElasticsearchSimpleFlowStage source code

if (json.nonEmpty) {
        val uri = baseUri.withPath(Path(endpoint))
        val request = HttpRequest(HttpMethods.POST)
          .withUri(uri)
          .withEntity(HttpEntity(NDJsonProtocol.`application/x-ndjson`, json))

        ElasticsearchApi
          .executeRequest(
            request,
            connectionSettings = settings.connection
          )

ElasticsearchSourceStage.sendScrollScanRequest source code

val uri = prepareUri(Path("/_search/scroll"))

          val request = HttpRequest(HttpMethods.POST)
            .withUri(uri)
            .withEntity(
              HttpEntity(ContentTypes.`application/json`,
                         Map("scroll" -> settings.scroll, "scroll_id" -> actualScrollId).toJson.compactPrint)
            )
            .withHeaders(settings.connection.headers)

          ElasticsearchApi
            .executeRequest(
              request,
              settings.connection
            )

I think the bulk write request should not ignore the externally configured headers. We could add .withHeaders(settings.connection.headers) to the request built in ElasticsearchSimpleFlowStage.scala.
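The suggested change could look roughly like the sketch below. It is not the actual Alpakka patch: buildBulkRequest is a hypothetical standalone helper, ContentTypes.`application/json` stands in for Alpakka's internal NDJsonProtocol content type, and the akka-http dependency is assumed to be on the classpath.

```scala
import akka.http.scaladsl.model.{ContentTypes, HttpEntity, HttpHeader, HttpMethods, HttpRequest, Uri}
import akka.http.scaladsl.model.headers.{Authorization, BasicHttpCredentials}
import scala.collection.immutable

object BulkRequestSketch {
  // Hypothetical helper mirroring the request built in ElasticsearchSimpleFlowStage,
  // with the configured connection headers attached (the missing step).
  // Assumption: application/json is used here only as a stand-in content type.
  def buildBulkRequest(uri: Uri, json: String, headers: immutable.Seq[HttpHeader]): HttpRequest =
    HttpRequest(HttpMethods.POST)
      .withUri(uri)
      .withEntity(HttpEntity(ContentTypes.`application/json`, json))
      .withHeaders(headers) // same call the scroll request in the source stage uses

  // Example usage with placeholder host and credentials.
  val example: HttpRequest = buildBulkRequest(
    Uri("https://localhost:9200/_bulk"),
    "{}\n",
    List(Authorization(BasicHttpCredentials("admin", "secret")))
  )
}
```

With the headers attached at this point, the bulk request would carry the same Authorization header as the scroll request, so both paths authenticate the same way regardless of which connection context is used.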

ZhiXingHeYiApple, Mar 25 '22 10:03

Thank you for reporting this problem. Your suggestion sounds correct. Would you be in a position to try it and provide a Pull Request to update Alpakka?

ennru, Mar 25 '22 13:03

@ennru Thank you for your reply. When I went to prepare a Pull Request for Alpakka, I found that someone had already submitted this change to the master branch on January 3. So for now I am using the snapshot version (3.0.4+51-a4dacc6d-SNAPSHOT) in my application, and I look forward to the release of version 3.0.5.

ZhiXingHeYiApple, Mar 26 '22 15:03

Great, so that was fixed already. A release is long overdue, but it will be called 4.0.0, as many other updates have gone in already. I hope to get to it soon...

ennru, Mar 28 '22 07:03