
Error while using foreachBatch in writeStreaming

gbraes opened this issue 2 years ago · 5 comments

Hi, I am using raydp-nightly (0.5.0.dev0) with Ray 1.11.0 and PySpark 3.2.1 (Python 3.7.7). I have a cluster with a Ray head node and 2 Ray worker nodes. I have tested multiple PySpark pipelines and never had a problem. Now, however, I am reading some data from Kafka and trying to apply the function foreachBatch:

# DataFrame 'df' comes from a Kafka stream. I can output it to the console
# with no issues, but the following write fails:
...
def func(batch_df, batch_id):
    print(batch_id)

df.writeStream.foreachBatch(func).start().awaitTermination()

It works when using the session created directly by PySpark, but it does not work using RayDP. It gives the following error:

Traceback (most recent call last):
  File "test-kafka-ray.py", line 123, in <module>
    df.writeStream. foreachBatch(func).start().awaitTermination()
  File "/opt/conda/envs/py377/lib/python3.7/site-packages/pyspark/sql/streaming.py", line 1127, in foreachBatch
    gw = self._spark._sc._gateway
AttributeError: 'SparkSession' object has no attribute '_gateway'
(RayDPSparkMaster pid=1387, ip=172.20.114.20) 2022-04-22 10:55:50,815 INFO RayAppMaster [Thread-2]: Stopping RayAppMaster 

Any idea?

Thanks

gbraes avatar Apr 22 '22 18:04 gbraes

Hi @gbraes, we are very excited to see that you have tried many pipelines with RayDP! We have never tested RayDP with Kafka, though. Could you please give a minimal code snippet to reproduce the problem?

kira-lin avatar Apr 24 '22 01:04 kira-lin

Thank you, @kira-lin .

I have changed to Ray 1.8 and RayDP 0.4.1 and it is still failing.

This is the code that is failing:

import ray
import raydp
from pyspark import SparkContext, SparkConf, SQLContext

endpoint = "localhost:6379"

configs = {"spark.jars":"/jars/spark-sql-kafka-0-10_2.12-3.2.1.jar," \
         "/jars/spark-streaming-kafka-0-10_2.12-3.2.1.jar,/jars/kafka-clients-3.1.0.jar,/jars/spark-token-provider-kafka-0-10_2.12-3.2.1.jar,/jars/commons-pool2-2.11.1.jar"}

ray.init(address=endpoint,_redis_password="password") 
sc = raydp.init_spark(app_name='RayDP stream Example', num_executors=2, executor_cores=2, executor_memory='4GB',configs=configs)

sqlContext = SQLContext(sc)

KAFKA_BROKER="localhost:9092"
topic = "mytopic"

# Subscribe to 1 topic defaults to the earliest and latest offsets
df = sqlContext \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", KAFKA_BROKER) \
  .option("subscribe", topic).load() 

def func(batch_df, batch_id):
    batch_df.show()
    return batch_df

# This one gives the error
df.writeStream.foreachBatch(func).start().awaitTermination()
# This one works...
#df.writeStream.format("console").outputMode("append").start().awaitTermination()

And these are the logs:

2022-04-25 11:02:16,161 INFO worker.py:823 -- Connecting to existing Ray cluster at address: 127.0.0.1:6379
2022-04-25 11:02:21,710 WARN NativeCodeLoader [Thread-2]: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2022-04-25 11:02:21,851 INFO SecurityManager [Thread-2]: Changing view acls to: root
2022-04-25 11:02:21,852 INFO SecurityManager [Thread-2]: Changing modify acls to: root
2022-04-25 11:02:21,853 INFO SecurityManager [Thread-2]: Changing view acls groups to: 
2022-04-25 11:02:21,853 INFO SecurityManager [Thread-2]: Changing modify acls groups to: 
2022-04-25 11:02:21,854 INFO SecurityManager [Thread-2]: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
2022-04-25 11:02:22,337 INFO Utils [Thread-2]: Successfully started service 'RAY_RPC_ENV' on port 43361.
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/ray/anaconda3/lib/python3.7/site-packages/ray/jars/ray_dist.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/ray/anaconda3/lib/python3.7/site-packages/pyspark/jars/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
2022-04-25 11:02:26,218 INFO RayAppMaster$RayAppMasterEndpoint [dispatcher-event-loop-1]: Registering app RayDP stream Example
2022-04-25 11:02:26,222 INFO RayAppMaster$RayAppMasterEndpoint [dispatcher-event-loop-1]: Registered app RayDP stream Example with ID app-20220425110226-0000
/home/ray/anaconda3/lib/python3.7/site-packages/pyspark/sql/context.py:79: FutureWarning: Deprecated in 3.0.0. Use SparkSession.builder.getOrCreate() instead.
  FutureWarning
Traceback (most recent call last):
  File "test6.py", line 25, in <module>
    df.writeStream. foreachBatch(func).start().awaitTermination()
  File "/home/ray/anaconda3/lib/python3.7/site-packages/pyspark/sql/streaming.py", line 1127, in foreachBatch
    gw = self._spark._sc._gateway
AttributeError: 'SparkSession' object has no attribute '_gateway'
2022-04-25 11:02:33,093 INFO RayAppMaster [Thread-2]: Stopping RayAppMaster

Note: I have replaced the host IP address in the first log line with 127.0.0.1. There is a plain Kafka broker (localhost:9092) sending messages on the topic "mytopic".
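
In case it helps with a minimal snippet: reading the traceback, the failure happens inside foreachBatch itself, before any Kafka code runs, so the built-in rate source should (an untested assumption on my side) trigger the same AttributeError without a broker or the Kafka jars:

import ray
import raydp
from pyspark import SQLContext

ray.init(address="localhost:6379", _redis_password="password")
spark = raydp.init_spark(app_name="repro", num_executors=1,
                         executor_cores=1, executor_memory="1GB")

# Same pattern as above: the SparkSession is passed to SQLContext
sqlContext = SQLContext(spark)

# The built-in 'rate' source needs no external broker
df = sqlContext.readStream.format("rate").option("rowsPerSecond", 1).load()

def func(batch_df, batch_id):
    batch_df.show()

# Expected failure: AttributeError: 'SparkSession' object has no attribute '_gateway'
df.writeStream.foreachBatch(func).start().awaitTermination()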

gbraes avatar Apr 25 '22 18:04 gbraes

Hi, I finally made it work:

I changed to Ray 1.12 and RayDP 0.4.2. I also used the SparkSession directly, without the SQLContext:

import ray
import raydp
from pyspark import SparkContext, SparkConf, SQLContext

endpoint = "localhost:6379"

configs = {"spark.jars":"/jars/spark-sql-kafka-0-10_2.12-3.2.1.jar," \
         "/jars/spark-streaming-kafka-0-10_2.12-3.2.1.jar,/jars/kafka-clients-3.1.0.jar,/jars/spark-token-provider-kafka-0-10_2.12-3.2.1.jar,/jars/commons-pool2-2.11.1.jar"}

ray.init(address=endpoint,_redis_password="password") 
sc = raydp.init_spark(app_name='RayDP stream Example', num_executors=2, executor_cores=2, executor_memory='4GB',configs=configs)

#sqlContext = SQLContext(sc)

KAFKA_BROKER="localhost:9092"
topic = "mytopic"

# Subscribe to 1 topic defaults to the earliest and latest offsets
df = sc \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", KAFKA_BROKER) \
  .option("subscribe", topic).load() 

def func(batch_df, batch_id):
    batch_df.show()
    return batch_df

# This one gave the error before, but now works
df.writeStream.foreachBatch(func).start().awaitTermination()
# This one works...
#df.writeStream.format("console").outputMode("append").start().awaitTermination()

Apparently, when I use sqlContext = SQLContext(sc), I somehow lose the proper Spark context from RayDP. In the previous versions (Ray 1.8 / RayDP 0.4.1) I still got this error even after making this change.
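
If my reading of the PySpark 3.2 source is right (an assumption, not something I have verified inside RayDP), the cause is that SQLContext's first positional argument is a SparkContext, not a SparkSession: passing the session leaves the context's internal _sc pointing at the session, and foreachBatch then fails when it looks up self._spark._sc._gateway. A minimal sketch of the two workarounds, assuming raydp.init_spark returns a SparkSession:

import raydp
from pyspark import SQLContext

spark = raydp.init_spark(app_name='RayDP stream Example', num_executors=2,
                         executor_cores=2, executor_memory='4GB')

# Broken: hands the SparkSession where a SparkContext is expected
# sqlContext = SQLContext(spark)

# Workaround 1 (what I did above): use the SparkSession directly
df = spark.readStream.format("rate").load()

# Workaround 2: if an SQLContext is really needed, give it the real
# SparkContext (SQLContext is deprecated since Spark 3.0 anyway)
# sqlContext = SQLContext(spark.sparkContext, spark)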

By the way, in this Ray/RayDP version I am getting a lot of INFO logs, and I am trying to disable them. Does anyone know how to do that when using RayDP? sc.sparkContext.setLogLevel('WARN') does not work.

UPDATE: There is also a difference I found: I was using Java 8 before, and now I am using Java 11.

gbraes avatar Apr 26 '22 17:04 gbraes

Hi @gbraes, glad you made it work! This is quite strange, since 0.4.2 just added support for Ray 1.11 and 1.12; there are no major changes. We'll look into the SQLContext issue. Does it bother you in the meantime?

Due to some version conflicts, we did not include log4j from PySpark; we use the one from Ray. Setting logging_level='warn' when calling ray.init works. This is quite confusing, and we'll see how to improve it.
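
For example (a sketch; the address and password are placeholders for your cluster):

import logging
import ray

# Pass logging_level when connecting; the string form 'warn' should work
# too, since the logging module accepts level names
ray.init(address="localhost:6379", _redis_password="password",
         logging_level=logging.WARNING)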

Thanks

kira-lin avatar Apr 27 '22 06:04 kira-lin

Hi @kira-lin, no, the SQLContext issue is not a problem at the moment, as the SparkSession is the more modern, recommended way of creating pipelines. There is also a difference I found: I was using Java 8 before, and now I am using Java 11 (build 11.0.15+10-Ubuntu-0ubuntu0.20.02.1); probably this is why it works now. I have also seen that logging_level='warn' in ray.init does not work with Java 11: if you run the above code only up to the KAFKA_BROKER=... line (so no Kafka broker installation is needed), Ray keeps sending INFO messages. I will put this in a different issue that I have opened.

Thanks for this great work!

gbraes avatar Apr 27 '22 09:04 gbraes

close as stale

kira-lin avatar Apr 17 '23 01:04 kira-lin