
Spark backend distributed prediction got an error

Open · Alxe1 opened this issue 1 year ago · 10 comments

My code:

res = est.predict(data=train_data.select("embed"), feature_cols=["embed"])
print(f"=====================res: {res}")
res.show()

The error message:

36/36 [==============================] - 0s 738us/step
=====================res: DataFrame[embed: vector, prediction: vector]
22/08/04 16:14:09 ERROR PythonRunner: Python worker exited unexpectedly (crashed)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 362, in main
    eval_type = read_int(infile)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 724, in read_int
    raise EOFError
EOFError

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at org.apache.spark.rdd.RDD$$anonfun$zip$1$$anonfun$apply$26$$anon$2.hasNext(RDD.scala:911)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.rdd.RDD$$anonfun$zip$1$$anonfun$apply$26$$anon$2.foreach(RDD.scala:910)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
	at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1990)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 352, in func
  File "/mnt/softwares/my_env/lib/python3.6/site-packages/bigdl/orca/learn/tf2/pyspark_estimator.py", line 363, in <lambda>
    lambda iter: transform_func(iter, init_params, params)))
  File "/mnt/softwares/my_env/lib/python3.6/site-packages/bigdl/orca/learn/tf2/pyspark_estimator.py", line 360, in transform_func
    return SparkRunner(**init_param).predict(**param)
  File "/mnt/softwares/my_env/lib/python3.6/site-packages/bigdl/orca/learn/tf2/spark_runner.py", line 438, in predict
    local_model = self.model_creator(self.config)
  File "/mnt1/liulei/mytest/test.py", line 233, in model_creator
    cross_num=4)
  File "/mnt1/liulei/mytest/test.py", line 113, in deepcross2
    user_embed_ = user_embed(tf.cast(inputs[:, 0], dtype=tf.int64))
  File "/mnt/softwares/my_env/lib/python3.6/site-packages/tensorflow/python/keras/engine/base_layer.py", line 926, in __call__
    input_list)
  File "/mnt/softwares/my_env/lib/python3.6/site-packages/tensorflow/python/keras/engine/base_layer.py", line 1084, in _functional_construction_call
    base_layer_utils.create_keras_history(inputs)
  File "/mnt/softwares/my_env/lib/python3.6/site-packages/tensorflow/python/keras/engine/base_layer_utils.py", line 191, in create_keras_history
    _, created_layers = _create_keras_history_helper(tensors, set(), [])
  File "/mnt/softwares/my_env/lib/python3.6/site-packages/tensorflow/python/keras/engine/base_layer_utils.py", line 256, in _create_keras_history_helper
    layer_inputs, processed_ops, created_layers)
  File "/mnt/softwares/my_env/lib/python3.6/site-packages/tensorflow/python/keras/engine/base_layer_utils.py", line 251, in _create_keras_history_helper
    constants[i] = backend.eval_in_eager_or_function(op_input)
  File "/mnt/softwares/my_env/lib/python3.6/site-packages/tensorflow/python/keras/backend.py", line 3898, in eval_in_eager_or_function
    outputs = graph_fn()
  File "/mnt/softwares/my_env/lib/python3.6/site-packages/tensorflow/python/eager/function.py", line 1655, in __call__
    return self._call_impl(args, kwargs)
  File "/mnt/softwares/my_env/lib/python3.6/site-packages/tensorflow/python/eager/function.py", line 1673, in _call_impl
    return self._call_with_flat_signature(args, kwargs, cancellation_manager)
  File "/mnt/softwares/my_env/lib/python3.6/site-packages/tensorflow/python/eager/function.py", line 1722, in _call_with_flat_signature
    return self._call_flat(args, self.captured_inputs, cancellation_manager)
  File "/mnt/softwares/my_env/lib/python3.6/site-packages/tensorflow/python/eager/function.py", line 1924, in _call_flat
    ctx, args, cancellation_manager=cancellation_manager))
  File "/mnt/softwares/my_env/lib/python3.6/site-packages/tensorflow/python/eager/function.py", line 550, in call
    ctx=ctx)
  File "/mnt/softwares/my_env/lib/python3.6/site-packages/tensorflow/python/eager/execute.py", line 60, in quick_execute
    inputs, attrs, num_outputs)
tensorflow.python.framework.errors_impl.InvalidArgumentError: Cannot assign a device for operation strided_slice_3_begin_RetVal: Could not satisfy explicit device specification '' because the node {{colocation_node strided_slice_3_begin_RetVal}} was colocated with a group of nodes that required incompatible device '/job:localhost/replica:0/task:0/device:CPU:0'. All available devices [/job:worker/replica:0/task:0/device:CPU:0, /job:worker/replica:0/task:0/device:XLA_CPU:0]. 
Colocation Debug Info:
Colocation group had the following types and supported devices: 
Root Member(assigned_device_name_index_=-1 requested_device_name_='/job:localhost/replica:0/task:0/device:CPU:0' assigned_device_name_='' resource_device_name_='' supported_device_types_=[CPU] possible_devices_=[]
_Retval: CPU XLA_CPU 
Const: CPU XLA_CPU 

Colocation members, user-requested devices, and framework assigned devices, if any:
  strided_slice_3/begin (Const) /job:localhost/replica:0/task:0/device:CPU:0
  strided_slice_3_begin_RetVal (_Retval) 

	 [[{{node strided_slice_3_begin_RetVal}}]] [Op:__inference_keras_scratch_graph_3697]

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	... 5 more
22/08/04 16:14:09 ERROR PythonRunner: This may have been caused by a prior exception:
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 352, in func
  File "/mnt/softwares/my_env/lib/python3.6/site-packages/bigdl/orca/learn/tf2/pyspark_estimator.py", line 363, in <lambda>
    lambda iter: transform_func(iter, init_params, params)))
  File "/mnt/softwares/my_env/lib/python3.6/site-packages/bigdl/orca/learn/tf2/pyspark_estimator.py", line 360, in transform_func
    return SparkRunner(**init_param).predict(**param)
  File "/mnt/softwares/my_env/lib/python3.6/site-packages/bigdl/orca/learn/tf2/spark_runner.py", line 438, in predict
    local_model = self.model_creator(self.config)
  File "/mnt1/liulei/mytest/test.py", line 233, in model_creator
    cross_num=4)
  File "/mnt1/liulei/mytest/test.py", line 113, in deepcross2
    user_embed_ = user_embed(tf.cast(inputs[:, 0], dtype=tf.int64))
  File "/mnt/softwares/my_env/lib/python3.6/site-packages/tensorflow/python/keras/engine/base_layer.py", line 926, in __call__
    input_list)
  File "/mnt/softwares/my_env/lib/python3.6/site-packages/tensorflow/python/keras/engine/base_layer.py", line 1084, in _functional_construction_call
    base_layer_utils.create_keras_history(inputs)
  File "/mnt/softwares/my_env/lib/python3.6/site-packages/tensorflow/python/keras/engine/base_layer_utils.py", line 191, in create_keras_history
    _, created_layers = _create_keras_history_helper(tensors, set(), [])
  File "/mnt/softwares/my_env/lib/python3.6/site-packages/tensorflow/python/keras/engine/base_layer_utils.py", line 256, in _create_keras_history_helper
    layer_inputs, processed_ops, created_layers)
  File "/mnt/softwares/my_env/lib/python3.6/site-packages/tensorflow/python/keras/engine/base_layer_utils.py", line 251, in _create_keras_history_helper
    constants[i] = backend.eval_in_eager_or_function(op_input)
  File "/mnt/softwares/my_env/lib/python3.6/site-packages/tensorflow/python/keras/backend.py", line 3898, in eval_in_eager_or_function
    outputs = graph_fn()
  File "/mnt/softwares/my_env/lib/python3.6/site-packages/tensorflow/python/eager/function.py", line 1655, in __call__
    return self._call_impl(args, kwargs)
  File "/mnt/softwares/my_env/lib/python3.6/site-packages/tensorflow/python/eager/function.py", line 1673, in _call_impl
    return self._call_with_flat_signature(args, kwargs, cancellation_manager)
  File "/mnt/softwares/my_env/lib/python3.6/site-packages/tensorflow/python/eager/function.py", line 1722, in _call_with_flat_signature
    return self._call_flat(args, self.captured_inputs, cancellation_manager)
  File "/mnt/softwares/my_env/lib/python3.6/site-packages/tensorflow/python/eager/function.py", line 1924, in _call_flat
    ctx, args, cancellation_manager=cancellation_manager))
  File "/mnt/softwares/my_env/lib/python3.6/site-packages/tensorflow/python/eager/function.py", line 550, in call
    ctx=ctx)
  File "/mnt/softwares/my_env/lib/python3.6/site-packages/tensorflow/python/eager/execute.py", line 60, in quick_execute
    inputs, attrs, num_outputs)
tensorflow.python.framework.errors_impl.InvalidArgumentError: Cannot assign a device for operation strided_slice_3_begin_RetVal: Could not satisfy explicit device specification '' because the node {{colocation_node strided_slice_3_begin_RetVal}} was colocated with a group of nodes that required incompatible device '/job:localhost/replica:0/task:0/device:CPU:0'. All available devices [/job:worker/replica:0/task:0/device:CPU:0, /job:worker/replica:0/task:0/device:XLA_CPU:0]. 
Colocation Debug Info:
Colocation group had the following types and supported devices: 
Root Member(assigned_device_name_index_=-1 requested_device_name_='/job:localhost/replica:0/task:0/device:CPU:0' assigned_device_name_='' resource_device_name_='' supported_device_types_=[CPU] possible_devices_=[]
_Retval: CPU XLA_CPU 
Const: CPU XLA_CPU 

Colocation members, user-requested devices, and framework assigned devices, if any:
  strided_slice_3/begin (Const) /job:localhost/replica:0/task:0/device:CPU:0
  strided_slice_3_begin_RetVal (_Retval) 

	 [[{{node strided_slice_3_begin_RetVal}}]] [Op:__inference_keras_scratch_graph_3697]

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
	at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1990)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)
22/08/04 16:14:09 ERROR PythonRunner: Python worker exited unexpectedly (crashed)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 362, in main
    eval_type = read_int(infile)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 724, in read_int
    raise EOFError
EOFError

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:585)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:297)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:289)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 352, in func
  File "/mnt/softwares/my_env/lib/python3.6/site-packages/bigdl/orca/learn/tf2/pyspark_estimator.py", line 363, in <lambda>
    lambda iter: transform_func(iter, init_params, params)))
  File "/mnt/softwares/my_env/lib/python3.6/site-packages/bigdl/orca/learn/tf2/pyspark_estimator.py", line 360, in transform_func
    return SparkRunner(**init_param).predict(**param)
  File "/mnt/softwares/my_env/lib/python3.6/site-packages/bigdl/orca/learn/tf2/spark_runner.py", line 438, in predict
    local_model = self.model_creator(self.config)
  File "/mnt1/liulei/mytest/test.py", line 233, in model_creator
    cross_num=4)
  File "/mnt1/liulei/mytest/test.py", line 113, in deepcross2
    user_embed_ = user_embed(tf.cast(inputs[:, 0], dtype=tf.int64))
  File "/mnt/softwares/my_env/lib/python3.6/site-packages/tensorflow/python/keras/engine/base_layer.py", line 926, in __call__
    input_list)
  File "/mnt/softwares/my_env/lib/python3.6/site-packages/tensorflow/python/keras/engine/base_layer.py", line 1084, in _functional_construction_call
    base_layer_utils.create_keras_history(inputs)
  File "/mnt/softwares/my_env/lib/python3.6/site-packages/tensorflow/python/keras/engine/base_layer_utils.py", line 191, in create_keras_history
    _, created_layers = _create_keras_history_helper(tensors, set(), [])
  File "/mnt/softwares/my_env/lib/python3.6/site-packages/tensorflow/python/keras/engine/base_layer_utils.py", line 256, in _create_keras_history_helper
    layer_inputs, processed_ops, created_layers)
  File "/mnt/softwares/my_env/lib/python3.6/site-packages/tensorflow/python/keras/engine/base_layer_utils.py", line 251, in _create_keras_history_helper
    constants[i] = backend.eval_in_eager_or_function(op_input)
  File "/mnt/softwares/my_env/lib/python3.6/site-packages/tensorflow/python/keras/backend.py", line 3898, in eval_in_eager_or_function
    outputs = graph_fn()
  File "/mnt/softwares/my_env/lib/python3.6/site-packages/tensorflow/python/eager/function.py", line 1655, in __call__
    return self._call_impl(args, kwargs)
  File "/mnt/softwares/my_env/lib/python3.6/site-packages/tensorflow/python/eager/function.py", line 1673, in _call_impl
    return self._call_with_flat_signature(args, kwargs, cancellation_manager)
  File "/mnt/softwares/my_env/lib/python3.6/site-packages/tensorflow/python/eager/function.py", line 1722, in _call_with_flat_signature
    return self._call_flat(args, self.captured_inputs, cancellation_manager)
  File "/mnt/softwares/my_env/lib/python3.6/site-packages/tensorflow/python/eager/function.py", line 1924, in _call_flat
    ctx, args, cancellation_manager=cancellation_manager))
  File "/mnt/softwares/my_env/lib/python3.6/site-packages/tensorflow/python/eager/function.py", line 550, in call
    ctx=ctx)
  File "/mnt/softwares/my_env/lib/python3.6/site-packages/tensorflow/python/eager/execute.py", line 60, in quick_execute
    inputs, attrs, num_outputs)
tensorflow.python.framework.errors_impl.InvalidArgumentError: Cannot assign a device for operation strided_slice_3_begin_RetVal: Could not satisfy explicit device specification '' because the node {{colocation_node strided_slice_3_begin_RetVal}} was colocated with a group of nodes that required incompatible device '/job:localhost/replica:0/task:0/device:CPU:0'. All available devices [/job:worker/replica:0/task:0/device:CPU:0, /job:worker/replica:0/task:0/device:XLA_CPU:0]. 
Colocation Debug Info:
Colocation group had the following types and supported devices: 
Root Member(assigned_device_name_index_=-1 requested_device_name_='/job:localhost/replica:0/task:0/device:CPU:0' assigned_device_name_='' resource_device_name_='' supported_device_types_=[CPU] possible_devices_=[]
_Retval: CPU XLA_CPU 
Const: CPU XLA_CPU 

Alxe1 · Aug 04 '22

Hi! Could you please provide your code so that we can reproduce the issue? It would help us locate the problem more quickly. By the way, where did you run the program, in Spark local mode?

sgwhat · Aug 05 '22

The operation requests device '/job:localhost/replica:0/task:0/device:CPU:0', but the available devices are only [/job:worker/replica:0/task:0/device:CPU:0, /job:worker/replica:0/task:0/device:XLA_CPU:0].
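
One thing that might be worth experimenting with (only a guess from the message above, not a confirmed fix) is enabling soft device placement inside model_creator, so TensorFlow is allowed to fall back to a device that actually exists on the worker:

import tensorflow as tf

# Guess, not a verified fix: relax the explicit '/job:localhost/...' device
# request so TF may place the offending constants on the worker's own CPU.
tf.config.set_soft_device_placement(True)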

glorysdj · Aug 05 '22

Hi! Could you please provide your code so that we can reproduce the issue? It would help us locate the problem more quickly. By the way, where did you run the program, in Spark local mode?

def train_test():
    from bigdl.orca.learn.tf2 import Estimator
    from bigdl.orca import init_orca_context
    from bigdl.orca import OrcaContext

    sc = init_orca_context(cluster_mode='local', cores=8, memory="10g", num_nodes=3)
    conf = SparkConf().setAppName("test")
    conf.set("spark.sql.execution.arrow.enabled", True)
    conf.set("spark.sql.execution.arrow.fallback.enabled", True)

    spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()

    MODEL_PATH = "/mytest/model"

    data_transform = DataTransform(MODEL_PATH, spark)
    uid_num, vid_num, sparse_num, data_count, sdf = data_transform.process()

    config = {"uid_num": int(uid_num), "vid_num": int(vid_num), "sparse_num": int(sparse_num)}

    est = Estimator.from_keras(model_creator=model_creator,
                               config=config,
                               backend="spark",
                               model_dir="hdfs://ip:port/ckpt")

    train_data, test_data = sdf.randomSplit([0.8, 0.2], 100)

    stats = est.fit(train_data,
                    epochs=2,
                    batch_size=512,
                    feature_cols=["embed"],
                    label_cols=["label"],
                    steps_per_epoch=data_count // 512)
    print("stats: {}".format(stats))

    est.save("/mytest/model.h5", save_format="h5")

    res = est.predict(data=sdf, feature_cols=["embed"])
    res.show()

Alxe1 · Aug 08 '22

Hi, I have reviewed the error report. I think it may be related to your model code or TensorFlow rather than being a bug in BigDL.

From the error report you provided:

  File "/mnt/softwares/my_env/lib/python3.6/site-packages/bigdl/orca/learn/tf2/pyspark_estimator.py", line 360, in transform_func
    return SparkRunner(**init_param).predict(**param)
  File "/mnt/softwares/my_env/lib/python3.6/site-packages/bigdl/orca/learn/tf2/spark_runner.py", line 438, in predict
    local_model = self.model_creator(self.config)
  File "/mnt1/liulei/mytest/test.py", line 233, in model_creator
    cross_num=4)
  File "/mnt1/liulei/mytest/test.py", line 113, in deepcross2
    user_embed_ = user_embed(tf.cast(inputs[:, 0], dtype=tf.int64))

It shows that the error happened while creating the model in the model_creator function (before prediction started). Could you provide more of your code, including the DeepCross model, so we can look into how this error was caused?
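
In case it helps while you prepare that, here is a minimal sketch (only a guess from the traceback; build_id_embeddings is a made-up helper name) of how the slicing and casting shown in the traceback (user_embed(tf.cast(inputs[:, 0], ...))) could be wrapped in Lambda layers, so the strided_slice/cast ops belong to a Keras layer instead of forcing create_keras_history to evaluate their constants on the worker:

import tensorflow as tf
from tensorflow.keras.layers import Embedding, Lambda

# Sketch only: keep the raw slice/cast ops inside Lambda layers so Keras does
# not have to rebuild their history eagerly when the model is created remotely.
def build_id_embeddings(inputs, user_num, item_num, user_item_dim):
    user_ids = Lambda(lambda x: tf.cast(x[:, 0], tf.int64), name="user_ids")(inputs)
    item_ids = Lambda(lambda x: tf.cast(x[:, 1], tf.int64), name="item_ids")(inputs)
    sparse_feature = Lambda(lambda x: x[:, 2:], name="sparse_features")(inputs)

    user_embed_ = Embedding(input_dim=user_num, output_dim=user_item_dim)(user_ids)
    item_embed_ = Embedding(input_dim=item_num, output_dim=user_item_dim)(item_ids)
    return user_embed_, item_embed_, sparse_feature

With the ops wrapped this way, building the model inside a Spark worker should not need to evaluate the slice constants eagerly, which is where the device colocation error is raised.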

sgwhat · Aug 08 '22

Hi, I have reviewed the error report. I think it may be related to your model code or TensorFlow rather than being a bug in BigDL.

From the error report you provided:

  File "/mnt/softwares/my_env/lib/python3.6/site-packages/bigdl/orca/learn/tf2/pyspark_estimator.py", line 360, in transform_func
    return SparkRunner(**init_param).predict(**param)
  File "/mnt/softwares/my_env/lib/python3.6/site-packages/bigdl/orca/learn/tf2/spark_runner.py", line 438, in predict
    local_model = self.model_creator(self.config)
  File "/mnt1/liulei/mytest/test.py", line 233, in model_creator
    cross_num=4)
  File "/mnt1/liulei/mytest/test.py", line 113, in deepcross2
    user_embed_ = user_embed(tf.cast(inputs[:, 0], dtype=tf.int64))

It shows that the error happened while creating the model in the model_creator function (before prediction started). Could you provide more of your code, including the DeepCross model, so we can look into how this error was caused?

This is the complete code:

import json
import os
import numpy as np
import tensorflow as tf
from pyspark import Row
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import IntegerType

from tensorflow.keras import Model
from tensorflow.keras.layers import Embedding, Dense

import datetime
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler, QuantileDiscretizer
from sklearn.metrics import accuracy_score, precision_score, recall_score
from tensorflow.python.keras.layers import Concatenate
from tensorflow.python.keras.regularizers import l2

from base_class.spark_base import SparkBase
from pyspark import SparkContext, SparkConf


def deepcross(user_num, item_num, user_item_dim, sparse_num,
               embed_norm, dnn_hidden_units, dnn_activation, dnn_dropout, cross_num):
    inputs = tf.keras.layers.Input(shape=(sparse_num,), name="input")
    user_embed = Embedding(input_dim=user_num,
                           output_dim=user_item_dim,
                           embeddings_initializer="random_normal",
                           embeddings_regularizer=l2(embed_norm),
                           input_length=1)
    user_embed_ = user_embed(tf.cast(inputs[:, 0], dtype=tf.int64))

    item_embed = Embedding(input_dim=item_num,
                           output_dim=user_item_dim,
                           embeddings_initializer="random_normal",
                           embeddings_regularizer=l2(embed_norm),
                           input_length=1)
    item_embed_ = item_embed(tf.cast(inputs[:, 1], dtype=tf.int64))

    sparse_feature = inputs[:, 2:]
    input_features = Concatenate(axis=1)([user_embed_, item_embed_, sparse_feature])

    dnn_output_1 = Dense(units=64, activation="relu")(input_features)
    dnn_output = Dense(units=64, activation="relu")(dnn_output_1)

    # cross_output = CrossLayer(cross_num=cross_num)(input_features)
    # output_1 = Concatenate(axis=-1)([dnn_output, cross_output])
    output_2 = Dense(1)(dnn_output)
    output = tf.nn.sigmoid(output_2)
    model = Model(inputs, output, name="my_model")
    return model


class DataTransform(object):

    def __init__(self, model_path, spark):
        self.MODEL_PATH = model_path
        self.spark = spark

    def read_data(self):
        """"""
        data_date = (datetime.datetime.now() - datetime.timedelta(days=1)).strftime("%Y%m%d")
        # spark_base = SparkBase()
        sdf = self.spark.sql(
            "select uid,vr_id,gender, age,is_click  from db.my_table "
            "where data_date >= {}".format(data_date))
        sampled_sdf = sdf.sampleBy("is_click", fractions={0: 0.1, 1: 0.2}, seed=42)
        return sampled_sdf

    def transform_data(self, sdf):
        sdf = sdf.fillna(0)

        @pandas_udf(IntegerType())
        def bucket_age(v):
            def _bucket_age(row):
                if row < 13:
                    return 0
                elif 13 <= row < 18:
                    return 1
                elif 18 <= row < 24:
                    return 2
                elif 24 <= row < 30:
                    return 3
                elif 30 <= row < 40:
                    return 4
                elif 40 <= row < 50:
                    return 5
                else:
                    return 6
            return v.map(_bucket_age)

        sdf = sdf.withColumn("age_index", bucket_age("age"))

        string_index_uid = StringIndexer(inputCol="uid", outputCol="uid_index", handleInvalid="keep")
        string_index_vid = StringIndexer(inputCol="vr_id", outputCol="vid_index", handleInvalid="keep")
        string_index_gender = StringIndexer(inputCol="gender", outputCol="gender_index", handleInvalid='keep')

        one_hot_encoder = OneHotEncoderEstimator(inputCols=["gender_index", "age_index"],
                                                 outputCols=["o_gender_index", "o_age_index"])
        pipeline = Pipeline(stages=[string_index_uid, string_index_vid, string_index_gender, one_hot_encoder])
        transform_model = pipeline.fit(sdf)
        transformed_df = transform_model.transform(sdf)

        vector_assembler = VectorAssembler(inputCols=["uid_index", "vid_index", "o_gender_index",  "o_age_index"],
                                           outputCol="embedded_vector")

        va_df = vector_assembler.transform(transformed_df)

        df = va_df.select("embedded_vector", "is_click").rdd. \
            map(lambda x: Row(embed=Vectors.dense(x.embedded_vector).toArray().tolist(), label=x.is_click)). \
            toDF()
        uid_num = int(va_df.select("uid_index").rdd.max()[0])
        vid_num = int(va_df.select("vid_index").rdd.max()[0])
        sparse_num = len(df.select("embed").rdd.take(1)[0].embed)

        data_count = df.count()

        return uid_num, vid_num, sparse_num, data_count, df

    def process(self):
        data = self.read_data()
        uid_num, vid_num, sparse_num, data_count, df = self.transform_data(data)
        print(f"uid_num: {uid_num}, vid_num: {vid_num}, sparse_num: {sparse_num}, data_count: {data_count}")
        df.show()

        return uid_num, vid_num, sparse_num, data_count, df


def model_creator(config):
    deep_cross = deepcross(user_num=config["uid_num"]+1,
                            item_num=config["vid_num"]+1,
                            user_item_dim=16,
                            sparse_num=config["sparse_num"],
                            embed_norm=0.001,
                            dnn_hidden_units=[int(e) for e in [128, 64, 32]],
                            dnn_activation="relu",
                            dnn_dropout=0.2,
                            cross_num=4)

    loss = tf.keras.losses.BinaryCrossentropy()
    optimizer = tf.keras.optimizers.Adam()
    deep_cross.compile(optimizer=optimizer, loss=loss, metrics=[tf.keras.metrics.AUC()])
    return deep_cross


def train_test():
    from bigdl.orca.learn.tf2 import Estimator
    from bigdl.orca import init_orca_context
    from bigdl.orca import OrcaContext

    sc = init_orca_context(cluster_mode='yarn-client', cores=8, memory="10g", num_nodes=3)
    conf = SparkConf().setAppName("test")
    conf.set("spark.sql.execution.arrow.enabled", True)
    conf.set("spark.sql.execution.arrow.fallback.enabled", True)

    spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()

    MODEL_PATH = "/mytest/model"

    data_transform = DataTransform(MODEL_PATH, spark)
    uid_num, vid_num, sparse_num, data_count, sdf = data_transform.process()

    config = {"uid_num": int(uid_num), "vid_num": int(vid_num), "sparse_num": int(sparse_num)}

    est = Estimator.from_keras(model_creator=model_creator,
                               config=config,
                               backend="spark",
                               model_dir="hdfs://ip:port/ckpt")

    train_data, test_data = sdf.randomSplit([0.8, 0.2], 100)

    stats = est.fit(train_data,
                    epochs=2,
                    batch_size=512,
                    feature_cols=["embed"],
                    label_cols=["label"],
                    steps_per_epoch=data_count // 512)
    print("stats: {}".format(stats))

    est.save("/mytest/model.h5", save_format="h5")

    # stats = est.evaluate(sdf,
    #                      feature_cols=["embed"],
    #                      label_cols=["label"],
    #                      num_steps=512)
    # print("stats: {}".format(stats))
    print(f"=========================data_count: {data_count}")

    res = est.predict(data=sdf, feature_cols=["embed"])
    res.show()

    # est.shutdown()


if __name__ == '__main__':
    train_test()

It trained on the data successfully, but prediction raised the error.

Alxe1 · Aug 08 '22

Got it! We will try to reproduce it and check it out.

sgwhat · Aug 08 '22

Hey, would you mind providing the dataset, or letting us know where we could access it?

sgwhat · Aug 10 '22

Hey, would you mind providing the dataset, or letting us know where we could access it?

Sorry, the dataset belongs to my company. There are only four int fields, so you can fabricate it.

Alxe1 · Aug 10 '22

Hey, would you mind providing the dataset, or letting us know where we could access it?

Sorry, the dataset belongs to my company. There are only four int fields, so you can fabricate it.

Hi! I just fabricated the dataset and ran est.predict in your program in yarn-client mode, and it passed. I got the prediction result below:

+--------------------+-----+--------------------+                               
|               embed|label|          prediction|
+--------------------+-----+--------------------+
|[0.0, 1.0, 1.0, 0...|    1|[0.5107923746109009]|
|[1.0, 2.0, 0.0, 1...|    0|[0.5187397003173828]|
|[2.0, 0.0, 1.0, 0...|    1|[0.5161656141281128]|
+--------------------+-----+--------------------+

I guess the error reported by TensorFlow may be caused by your environment. Maybe you could try another cluster or check the cluster configuration carefully 😀.
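
For anyone trying to reproduce this, a minimal sketch of one way such a dataset could be fabricated (the column names are assumed from the query in read_data, and the fabricated DataFrame would be passed straight to DataTransform.transform_data to skip the Hive read):

from pyspark.sql import SparkSession

# Fabricated stand-in for db.my_table: four integer feature fields plus the
# is_click label, just enough to drive the feature pipeline and the Estimator.
spark = SparkSession.builder.getOrCreate()
rows = [(i % 50, i % 200, i % 2, 15 + i % 45, i % 2) for i in range(2000)]
fake_sdf = spark.createDataFrame(rows, ["uid", "vr_id", "gender", "age", "is_click"])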

sgwhat · Aug 10 '22

OK, I'll check it.

Alxe1 · Aug 11 '22