spark-snowflake
Issue with Databricks Spark Streaming: Private key must be specified in Snowflake streaming
We are using Databricks Spark to load data into Snowflake. It works perfectly with batch jobs but fails with streaming. Here is the code:
val options = Map(
  "sfUrl" -> "********.snowflakecomputing.com",
  "sfUser" -> "*****",
  "sfPassword" -> "****",
  "sfDatabase" -> "TEST_DB",
  "sfSchema" -> "TEST_DOCUMENT",
  "sfWarehouse" -> "COMPUTE_WH"
)

val rawStream = spark.readStream.schema(schema).json(path)

rawStream.writeStream
  .format("snowflake")
  .options(options)
  .option("dbtable", "L_FEATURE_TEST")
  .option("checkpointLocation", checkpointRaw)
  .trigger(Trigger.Once())
  .start()
Error:
java.lang.IllegalArgumentException: requirement failed: Private key must be specified in Snowflake streaming
at scala.Predef$.require(Predef.scala:224)
at net.snowflake.spark.snowflake.SnowflakeSink.<init>(SnowflakeSink.scala:41)
at net.snowflake.spark.snowflake.DefaultSource.createSink(DefaultSource.scala:137)
at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:305)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:330)
... (Databricks notebook REPL wrapper frames elided)
Not sure whether this is a bug. Is it possible to load streaming data using just the username and password?
I am also having issues when using Spark Structured Streaming. I noticed the error @satendrakumar was experiencing above, so I modified my code to supply a private key via the privateKey option. It returned the following error:
ERROR IllegalArgumentException: "A snowflake passsword or private key path must be provided with 'sfpassword or pem_private_key' parameter, e.g. 'password'"
When I also include the pem_private_key option, I get the following exception, despite following the code examples in the Snowflake docs:
IllegalArgumentException: 'Input PEM private key is invalid'
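For reference, this is roughly how I am supplying the key (a PySpark sketch with connection values redacted; my guess is the connector rejects the raw PEM text, which would explain the error):

# Sketch of my failing attempt (values redacted, path hypothetical).
# The full PEM text, including the BEGIN/END delimiters, is passed as-is.
with open("/path/to/rsa_key.p8") as f:
    raw_pem = f.read()

options = {
    "sfUrl": "********.snowflakecomputing.com",
    "sfUser": "*****",
    "sfDatabase": "TEST_DB",
    "sfSchema": "TEST_DOCUMENT",
    "sfWarehouse": "COMPUTE_WH",
    "pem_private_key": raw_pem,  # this form raises 'Input PEM private key is invalid'
}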
The streaming mode does not currently support streaming data directly from Databricks or Qubole. However, the connector still works in non-streaming mode with both Qubole and Databricks.
To be clear @rkesh-singh, you are currently using the Spark-Snowflake connector for batch writes? I am as well... but looking to use the structured streaming SnowflakeSink published here for streaming jobs. No documentation exists 😩
@andregoode Streaming support is still in preview. You can contact Snowflake for enabling in your account.
@rkesh-singh Is there any update on this? Or is it still in preview mode?
I used the method below, and it worked. I tried it with PySpark; it should also work in Spark/Scala with minor modifications.
Prerequisite: the public key must be added to the user in Snowflake.
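For reference, the public key is assigned in Snowflake with a statement along these lines (Snowflake SQL; the user name and key value here are placeholders, and the key body is pasted without the BEGIN/END PUBLIC KEY delimiters):

ALTER USER my_user SET RSA_PUBLIC_KEY='MIIBIjANBgkqh...';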
Additional libraries imported:
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
Used the following code to obtain the decrypted private key without the header and trailer:

# Read the encrypted PEM private key from disk
with open(private_key_path, "r") as private_key_obj:
    private_key = private_key_obj.read()

key = bytes(private_key, "utf-8")

# Decrypt the key using its passphrase
p_key = serialization.load_pem_private_key(
    key, password=passphrase.encode(), backend=default_backend()
)

# Re-serialize as unencrypted PKCS#8 PEM, then strip the header and trailer
pkb = p_key.private_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption()
).replace(b"-----BEGIN PRIVATE KEY-----\n", b"") \
 .replace(b"\n-----END PRIVATE KEY-----", b"") \
 .decode("utf-8")
In the options map, I added 'pem_private_key' with the value pkb, as sketched below.
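A minimal sketch of the resulting options map (connection values are placeholders carried over from the original post; with key-pair auth, pem_private_key takes the place of sfPassword):

options = {
    "sfUrl": "********.snowflakecomputing.com",
    "sfUser": "*****",
    "sfDatabase": "TEST_DB",
    "sfSchema": "TEST_DOCUMENT",
    "sfWarehouse": "COMPUTE_WH",
    "pem_private_key": pkb,  # decrypted key body produced above
}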
Added some additional parameters in writeStream():
rawstream.writeStream \
    .outputMode("append") \
    .option("checkpointLocation", <checkpoint location>) \
    .option("dbtable", <target table name>) \
    .options(**options) \
    .option("streaming_stage", <temp stage name>) \
    .format("snowflake") \
    .start() \
    .awaitTermination()
Is streaming read still unsupported?