spark-secure-kafka-app
How to pass commonParams in PySpark?
Hi, I am trying to do this with PySpark, but I don't know how to pass the commonParams variable that you use. I need to pass the SASL_PLAINTEXT protocol and "sasl.kerberos.service.name" -> "kafka".
Thank you.
hi @luisfsantana, if you use Structured Streaming in Python, the following code works:
lines = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVER) \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.kerberos.service.name", "kafka") \
    .option("kafka.ssl.truststore.location", "/usr/lib/jvm/jdk1.8.0_162/jre/lib/security/jssecacerts") \
    .option("kafka.ssl.truststore.password", "changeit") \
    .option("subscribe", KAFKA_TOPIC_NAME) \
    .load()
Kafka options should be prefixed with "kafka.".
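For the SASL_PLAINTEXT case from the question, the same pattern applies: set "kafka.security.protocol" to "SASL_PLAINTEXT" and drop the truststore options. A minimal sketch (broker and topic names are placeholders, and the options dict can be applied in one call instead of chaining .option(...)):

```python
# Kafka options for the SASL_PLAINTEXT case. All Kafka client
# properties carry the "kafka." prefix; "subscribe" is a Spark
# source option and is not prefixed.
kafka_options = {
    "kafka.bootstrap.servers": "broker1:9092",        # placeholder broker
    "kafka.security.protocol": "SASL_PLAINTEXT",      # no SSL truststore needed
    "kafka.sasl.kerberos.service.name": "kafka",
    "subscribe": "my_topic",                          # placeholder topic
}

# With a SparkSession in scope, the dict can be unpacked into the
# reader (sketch only, not executed here):
# lines = spark.readStream.format("kafka").options(**kafka_options).load()
```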
For direct streams there is a Python dictionary that looks exactly like the Scala version (note that the topics argument must be a list):

transactions_kafka_stream = KafkaUtils.createDirectStream(
    ssc,
    [self.config.KAFKA_SETTINGS.KAFKA_TOPIC_NAME],
    {"metadata.broker.list": self.config.KAFKA_SETTINGS.KAFKA_SERVER,
     "auto.commit.enable": "true",
     "auto.offset.reset": "largest"})
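As a minimal sketch of that dictionary-based configuration (broker and topic names are placeholders; unlike the Structured Streaming reader, the keys here carry no "kafka." prefix). One caveat: PySpark's KafkaUtils is backed by the 0.8 Kafka integration, which as far as I know does not support Kerberos/SASL, so for the secure setup the Structured Streaming code above is the route to take.

```python
# kafkaParams for the direct stream: a plain dict of Kafka consumer
# properties, passed without any "kafka." prefix.
kafka_params = {
    "metadata.broker.list": "broker1:9092",  # placeholder broker
    "auto.commit.enable": "true",
    "auto.offset.reset": "largest",
}

# With a StreamingContext ssc in scope (sketch only, not executed here):
# stream = KafkaUtils.createDirectStream(ssc, ["my_topic"], kafka_params)
```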