spark-secure-kafka-app

How to pass commonParams in PySpark?

luisfsantana opened this issue 6 years ago • 1 comment

Hi, I am trying to do this with PySpark, but I don't know how to pass the commonParams variable that you use. I need to pass the security protocol SASL_PLAINTEXT and "sasl.kerberos.service.name" -> "kafka".

Thank you.

luisfsantana • Mar 13 '18 14:03

Hi @luisfsantana, if you use Structured Streaming in Python, the following code works:

lines = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVER) \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.kerberos.service.name", "kafka") \
    .option("kafka.ssl.truststore.location", "/usr/lib/jvm/jdk1.8.0_162/jre/lib/security/jssecacerts") \
    .option("kafka.ssl.truststore.password", "changeit") \
    .option("subscribe", KAFKA_TOPIC_NAME) \
    .load()

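Kafka client options must be prefixed with "kafka." (for example, security.protocol becomes kafka.security.protocol). Source options such as subscribe are passed without the prefix.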
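If you already have the Kafka properties in a dictionary (the Python counterpart of commonParams), a small helper can add the prefix for you. This is a minimal sketch: to_kafka_source_options and the common_params values below are hypothetical, and the helper should only be given Kafka client properties, not Spark source options like subscribe or startingOffsets.

```python
from typing import Dict


def to_kafka_source_options(common_params: Dict[str, str]) -> Dict[str, str]:
    """Prefix each Kafka client property with "kafka." for the Structured Streaming source.

    Keys that already carry the prefix are left unchanged.
    """
    return {
        (key if key.startswith("kafka.") else "kafka." + key): value
        for key, value in common_params.items()
    }


# Hypothetical Python equivalent of the Scala commonParams map.
common_params = {
    "bootstrap.servers": "broker1:9092",  # placeholder broker address
    "security.protocol": "SASL_PLAINTEXT",
    "sasl.kerberos.service.name": "kafka",
}

kafka_options = to_kafka_source_options(common_params)
```

The resulting dictionary can then be unpacked into the reader, e.g. spark.readStream.format("kafka").options(**kafka_options).option("subscribe", KAFKA_TOPIC_NAME).load(), since DataStreamReader.options accepts arbitrary keyword options.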

For direct DStreams (KafkaUtils.createDirectStream), the Kafka parameters are passed as a Python dictionary that looks exactly like the Scala map:

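from pyspark.streaming.kafka import KafkaUtils

# Topics are passed as a list; Kafka parameters as a dict of strings.
transactions_kafka_stream = KafkaUtils.createDirectStream(
    ssc,
    [self.config.KAFKA_SETTINGS.KAFKA_TOPIC_NAME],
    {"metadata.broker.list": self.config.KAFKA_SETTINGS.KAFKA_SERVER,
     "auto.commit.enable": "true",
     "auto.offset.reset": "largest"})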

osboo • Jun 05 '18 13:06