ipex-llm
Spark backend distributed predict raises an error
My code:
res = est.predict(data=train_data.select("embed"), feature_cols=["embed"])
print(f"=====================res: {res}")
res.show()
The error message:
36/36 [==============================] - 0s 738us/step
=====================res: DataFrame[embed: vector, prediction: vector]
22/08/04 16:14:09 ERROR PythonRunner: Python worker exited unexpectedly (crashed)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 362, in main
eval_type = read_int(infile)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 724, in read_int
raise EOFError
EOFError
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at org.apache.spark.rdd.RDD$$anonfun$zip$1$$anonfun$apply$26$$anon$2.hasNext(RDD.scala:911)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at org.apache.spark.rdd.RDD$$anonfun$zip$1$$anonfun$apply$26$$anon$2.foreach(RDD.scala:910)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561)
at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1990)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
process()
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 352, in func
File "/mnt/softwares/my_env/lib/python3.6/site-packages/bigdl/orca/learn/tf2/pyspark_estimator.py", line 363, in <lambda>
lambda iter: transform_func(iter, init_params, params)))
File "/mnt/softwares/my_env/lib/python3.6/site-packages/bigdl/orca/learn/tf2/pyspark_estimator.py", line 360, in transform_func
return SparkRunner(**init_param).predict(**param)
File "/mnt/softwares/my_env/lib/python3.6/site-packages/bigdl/orca/learn/tf2/spark_runner.py", line 438, in predict
local_model = self.model_creator(self.config)
File "/mnt1/liulei/mytest/test.py", line 233, in model_creator
cross_num=4)
File "/mnt1/liulei/mytest/test.py", line 113, in deepcross2
user_embed_ = user_embed(tf.cast(inputs[:, 0], dtype=tf.int64))
File "/mnt/softwares/my_env/lib/python3.6/site-packages/tensorflow/python/keras/engine/base_layer.py", line 926, in __call__
input_list)
File "/mnt/softwares/my_env/lib/python3.6/site-packages/tensorflow/python/keras/engine/base_layer.py", line 1084, in _functional_construction_call
base_layer_utils.create_keras_history(inputs)
File "/mnt/softwares/my_env/lib/python3.6/site-packages/tensorflow/python/keras/engine/base_layer_utils.py", line 191, in create_keras_history
_, created_layers = _create_keras_history_helper(tensors, set(), [])
File "/mnt/softwares/my_env/lib/python3.6/site-packages/tensorflow/python/keras/engine/base_layer_utils.py", line 256, in _create_keras_history_helper
layer_inputs, processed_ops, created_layers)
File "/mnt/softwares/my_env/lib/python3.6/site-packages/tensorflow/python/keras/engine/base_layer_utils.py", line 251, in _create_keras_history_helper
constants[i] = backend.eval_in_eager_or_function(op_input)
File "/mnt/softwares/my_env/lib/python3.6/site-packages/tensorflow/python/keras/backend.py", line 3898, in eval_in_eager_or_function
outputs = graph_fn()
File "/mnt/softwares/my_env/lib/python3.6/site-packages/tensorflow/python/eager/function.py", line 1655, in __call__
return self._call_impl(args, kwargs)
File "/mnt/softwares/my_env/lib/python3.6/site-packages/tensorflow/python/eager/function.py", line 1673, in _call_impl
return self._call_with_flat_signature(args, kwargs, cancellation_manager)
File "/mnt/softwares/my_env/lib/python3.6/site-packages/tensorflow/python/eager/function.py", line 1722, in _call_with_flat_signature
return self._call_flat(args, self.captured_inputs, cancellation_manager)
File "/mnt/softwares/my_env/lib/python3.6/site-packages/tensorflow/python/eager/function.py", line 1924, in _call_flat
ctx, args, cancellation_manager=cancellation_manager))
File "/mnt/softwares/my_env/lib/python3.6/site-packages/tensorflow/python/eager/function.py", line 550, in call
ctx=ctx)
File "/mnt/softwares/my_env/lib/python3.6/site-packages/tensorflow/python/eager/execute.py", line 60, in quick_execute
inputs, attrs, num_outputs)
tensorflow.python.framework.errors_impl.InvalidArgumentError: Cannot assign a device for operation strided_slice_3_begin_RetVal: Could not satisfy explicit device specification '' because the node {{colocation_node strided_slice_3_begin_RetVal}} was colocated with a group of nodes that required incompatible device '/job:localhost/replica:0/task:0/device:CPU:0'. All available devices [/job:worker/replica:0/task:0/device:CPU:0, /job:worker/replica:0/task:0/device:XLA_CPU:0].
Colocation Debug Info:
Colocation group had the following types and supported devices:
Root Member(assigned_device_name_index_=-1 requested_device_name_='/job:localhost/replica:0/task:0/device:CPU:0' assigned_device_name_='' resource_device_name_='' supported_device_types_=[CPU] possible_devices_=[]
_Retval: CPU XLA_CPU
Const: CPU XLA_CPU
Colocation members, user-requested devices, and framework assigned devices, if any:
strided_slice_3/begin (Const) /job:localhost/replica:0/task:0/device:CPU:0
strided_slice_3_begin_RetVal (_Retval)
[[{{node strided_slice_3_begin_RetVal}}]] [Op:__inference_keras_scratch_graph_3697]
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
... 5 more
Hi! Could you please provide your code so we can reproduce the issue? That would help us locate the problem sooner. By the way, where did you run the program, in Spark local mode?
It requests device '/job:localhost/replica:0/task:0/device:CPU:0', but the available devices are [/job:worker/replica:0/task:0/device:CPU:0, /job:worker/replica:0/task:0/device:XLA_CPU:0].
def train_test():
    from bigdl.orca.learn.tf2 import Estimator
    from bigdl.orca import init_orca_context
    from bigdl.orca import OrcaContext
    sc = init_orca_context(cluster_mode='local', cores=8, memory="10g", num_nodes=3)
    conf = SparkConf().setAppName("test")
    conf.set("spark.sql.execution.arrow.enabled", True)
    conf.set("spark.sql.execution.arrow.fallback.enabled", True)
    spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()
    MODEL_PATH = "/mytest/model"
    data_transform = DataTransform(MODEL_PATH, spark)
    uid_num, vid_num, sparse_num, data_count, sdf = data_transform.process()
    config = {"uid_num": int(uid_num), "vid_num": int(vid_num), "sparse_num": int(sparse_num)}
    est = Estimator.from_keras(model_creator=model_creator,
                               config=config,
                               backend="spark",
                               model_dir="hdfs://ip:port/ckpt")
    train_data, test_data = sdf.randomSplit([0.8, 0.2], 100)
    stats = est.fit(train_data,
                    epochs=2,
                    batch_size=512,
                    feature_cols=["embed"],
                    label_cols=["label"],
                    steps_per_epoch=data_count // 512)
    print("stats: {}".format(stats))
    est.save("/mytest/model.h5", save_format="h5")
    res = est.predict(data=sdf, feature_cols=["embed"])
    res.show()
Hi, I have reviewed the error report. I think it may be related to your model code or TensorFlow rather than a bug in BigDL.
From the error report you provided:
File "/mnt/softwares/my_env/lib/python3.6/site-packages/bigdl/orca/learn/tf2/pyspark_estimator.py", line 360, in transform_func
return SparkRunner(**init_param).predict(**param)
File "/mnt/softwares/my_env/lib/python3.6/site-packages/bigdl/orca/learn/tf2/spark_runner.py", line 438, in predict
local_model = self.model_creator(self.config)
File "/mnt1/liulei/mytest/test.py", line 233, in model_creator
cross_num=4)
File "/mnt1/liulei/mytest/test.py", line 113, in deepcross2
user_embed_ = user_embed(tf.cast(inputs[:, 0], dtype=tf.int64))
It shows that the error happened while creating the model from the model_creator function (before prediction starts). Could you provide more code, including the DeepCross model, so we can help look into how this error was caused?
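One known trigger for the create_keras_history code path visible in the traceback is applying raw tf ops (tf.cast and tensor slicing) directly to a symbolic Keras input, as deepcross does. A possible workaround, sketched below with hypothetical sizes rather than the real model, is to wrap those ops in a tf.keras.layers.Lambda layer so Keras tracks them as ordinary layers:

```python
import numpy as np
import tensorflow as tf
from tensorflow.keras import Model
from tensorflow.keras.layers import Embedding, Input, Lambda

def build_model(user_num=100, sparse_num=4, dim=8):
    inputs = Input(shape=(sparse_num,), name="input")
    # Wrapping the slice + cast in Lambda registers them as a Keras layer,
    # so Keras never has to reconstruct their history at call time.
    uid = Lambda(lambda x: tf.cast(x[:, 0], tf.int64))(inputs)
    user_embed_ = Embedding(input_dim=user_num, output_dim=dim)(uid)
    return Model(inputs, user_embed_)

model = build_model()
out = model.predict(np.zeros((2, 4), dtype="float32"))
print(out.shape)  # (2, 8)
```

If the Lambda variant builds and predicts cleanly on the workers, that would suggest the un-wrapped slicing/cast is what breaks model re-creation during predict.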
This is the complete code:
import json
import os
import numpy as np
import tensorflow as tf
from pyspark import Row
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import IntegerType
from tensorflow.keras import Model
from tensorflow.keras.layers import Embedding, Dense
import datetime
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler, QuantileDiscretizer
from sklearn.metrics import accuracy_score, precision_score, recall_score
from tensorflow.python.keras.layers import Concatenate
from tensorflow.python.keras.regularizers import l2
from base_class.spark_base import SparkBase
from pyspark import SparkContext, SparkConf
def deepcross(user_num, item_num, user_item_dim, sparse_num,
              embed_norm, dnn_hidden_units, dnn_activation, dnn_dropout, cross_num):
    inputs = tf.keras.layers.Input(shape=(sparse_num,), name="input")
    user_embed = Embedding(input_dim=user_num,
                           output_dim=user_item_dim,
                           embeddings_initializer="random_normal",
                           embeddings_regularizer=l2(embed_norm),
                           input_length=1)
    user_embed_ = user_embed(tf.cast(inputs[:, 0], dtype=tf.int64))
    item_embed = Embedding(input_dim=item_num,
                           output_dim=user_item_dim,
                           embeddings_initializer="random_normal",
                           embeddings_regularizer=l2(embed_norm),
                           input_length=1)
    item_embed_ = item_embed(tf.cast(inputs[:, 1], dtype=tf.int64))
    sparse_feature = inputs[:, 2:]
    input_features = Concatenate(axis=1)([user_embed_, item_embed_, sparse_feature])
    dnn_output_1 = Dense(units=64, activation="relu")(input_features)
    dnn_output = Dense(units=64, activation="relu")(dnn_output_1)
    # cross_output = CrossLayer(cross_num=cross_num)(input_features)
    # output_1 = Concatenate(axis=-1)([dnn_output, cross_output])
    output_2 = Dense(1)(dnn_output)
    output = tf.nn.sigmoid(output_2)
    model = Model(inputs, output, name="my_model")
    return model
class DataTransform(object):
    def __init__(self, model_path, spark):
        self.MODEL_PATH = model_path
        self.spark = spark

    def read_data(self):
        """"""
        data_date = (datetime.datetime.now() - datetime.timedelta(days=1)).strftime("%Y%m%d")
        # spark_base = SparkBase()
        sdf = self.spark.sql(
            "select uid,vr_id,gender, age,is_click from db.my_table "
            "where data_date >= {}".format(data_date))
        sampled_sdf = sdf.sampleBy("is_click", fractions={0: 0.1, 1: 0.2}, seed=42)
        return sampled_sdf

    def transform_data(self, sdf):
        sdf = sdf.fillna(0)

        @pandas_udf(IntegerType())
        def bucket_age(v):
            def _bucket_age(row):
                if row < 13:
                    return 0
                elif 13 <= row < 18:
                    return 1
                elif 18 <= row < 24:
                    return 2
                elif 24 <= row < 30:
                    return 3
                elif 30 <= row < 40:
                    return 4
                elif 40 <= row < 50:
                    return 5
                else:
                    return 6
            return v.map(_bucket_age)

        sdf = sdf.withColumn("age_index", bucket_age("age"))
        string_index_uid = StringIndexer(inputCol="uid", outputCol="uid_index", handleInvalid="keep")
        string_index_vid = StringIndexer(inputCol="vr_id", outputCol="vid_index", handleInvalid="keep")
        string_index_gender = StringIndexer(inputCol="gender", outputCol="gender_index", handleInvalid='keep')
        one_hot_encoder = OneHotEncoderEstimator(inputCols=["gender_index", "age_index"],
                                                 outputCols=["o_gender_index", "o_age_index"])
        pipeline = Pipeline(stages=[string_index_uid, string_index_vid, string_index_gender, one_hot_encoder])
        transform_model = pipeline.fit(sdf)
        transformed_df = transform_model.transform(sdf)
        vector_assembler = VectorAssembler(inputCols=["uid_index", "vid_index", "o_gender_index", "o_age_index"],
                                           outputCol="embedded_vector")
        va_df = vector_assembler.transform(transformed_df)
        df = va_df.select("embedded_vector", "is_click").rdd. \
            map(lambda x: Row(embed=Vectors.dense(x.embedded_vector).toArray().tolist(), label=x.is_click)). \
            toDF()
        uid_num = int(va_df.select("uid_index").rdd.max()[0])
        vid_num = int(va_df.select("vid_index").rdd.max()[0])
        sparse_num = len(df.select("embed").rdd.take(1)[0].embed)
        data_count = df.count()
        return uid_num, vid_num, sparse_num, data_count, df

    def process(self):
        data = self.read_data()
        uid_num, vid_num, sparse_num, data_count, df = self.transform_data(data)
        print(f"uid_num: {uid_num}, vid_num: {vid_num}, sparse_num: {sparse_num}, data_count: {data_count}")
        df.show()
        return uid_num, vid_num, sparse_num, data_count, df
def model_creator(config):
    deep_cross = deepcross(user_num=config["uid_num"] + 1,
                           item_num=config["vid_num"] + 1,
                           user_item_dim=16,
                           sparse_num=config["sparse_num"],
                           embed_norm=0.001,
                           dnn_hidden_units=[int(e) for e in [128, 64, 32]],
                           dnn_activation="relu",
                           dnn_dropout=0.2,
                           cross_num=4)
    loss = tf.keras.losses.BinaryCrossentropy()
    optimizer = tf.keras.optimizers.Adam()
    deep_cross.compile(optimizer=optimizer, loss=loss, metrics=[tf.keras.metrics.AUC()])
    return deep_cross
def train_test():
    from bigdl.orca.learn.tf2 import Estimator
    from bigdl.orca import init_orca_context
    from bigdl.orca import OrcaContext
    sc = init_orca_context(cluster_mode='yarn-client', cores=8, memory="10g", num_nodes=3)
    conf = SparkConf().setAppName("test")
    conf.set("spark.sql.execution.arrow.enabled", True)
    conf.set("spark.sql.execution.arrow.fallback.enabled", True)
    spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()
    MODEL_PATH = "/mytest/model"
    data_transform = DataTransform(MODEL_PATH, spark)
    uid_num, vid_num, sparse_num, data_count, sdf = data_transform.process()
    config = {"uid_num": int(uid_num), "vid_num": int(vid_num), "sparse_num": int(sparse_num)}
    est = Estimator.from_keras(model_creator=model_creator,
                               config=config,
                               backend="spark",
                               model_dir="hdfs://ip:port/ckpt")
    train_data, test_data = sdf.randomSplit([0.8, 0.2], 100)
    stats = est.fit(train_data,
                    epochs=2,
                    batch_size=512,
                    feature_cols=["embed"],
                    label_cols=["label"],
                    steps_per_epoch=data_count // 512)
    print("stats: {}".format(stats))
    est.save("/mytest/model.h5", save_format="h5")
    # stats = est.evaluate(sdf,
    #                      feature_cols=["embed"],
    #                      label_cols=["label"],
    #                      num_steps=512)
    # print("stats: {}".format(stats))
    print(f"=========================data_count: {data_count}")
    res = est.predict(data=sdf, feature_cols=["embed"])
    res.show()
    # est.shutdown()


if __name__ == '__main__':
    train_test()
It trained on the data successfully, but prediction raised the error above.
Training completed successfully, but prediction raised the error above.
Got it! We will try to reproduce it and check it out.
Hey, would you mind providing the dataset or where could I reach it?
Sorry, the dataset belongs to my company. There are only four int fields; you can fabricate them.
Hi! I just fabricated the dataset, and est.predict passed in your program in yarn-client mode. I got the prediction result below:
+--------------------+-----+--------------------+
| embed|label| prediction|
+--------------------+-----+--------------------+
|[0.0, 1.0, 1.0, 0...| 1|[0.5107923746109009]|
|[1.0, 2.0, 0.0, 1...| 0|[0.5187397003173828]|
|[2.0, 0.0, 1.0, 0...| 1|[0.5161656141281128]|
+--------------------+-----+--------------------+
I guess the error reported by TensorFlow may be caused by your environment. Maybe you could move to another cluster or check the cluster configuration carefully 😀.
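For anyone else trying to reproduce this, a minimal sketch of how the four int fields (plus the is_click label) could be fabricated; the column names are taken from the query in the script above, while the value ranges are arbitrary placeholders:

```python
import random

# Fabricate rows matching the schema queried in the script:
# uid, vr_id, gender, age (int features) plus the is_click label.
# The ranges below are made up for illustration.
def fabricate_rows(n, seed=42):
    rng = random.Random(seed)  # fixed seed -> reproducible data
    return [
        (rng.randrange(1000),    # uid
         rng.randrange(500),     # vr_id
         rng.randrange(2),       # gender
         rng.randrange(10, 60),  # age
         rng.randrange(2))       # is_click label
        for _ in range(n)
    ]

rows = fabricate_rows(100)
# In the script these rows could replace the Hive query, e.g.:
# sdf = spark.createDataFrame(rows, ["uid", "vr_id", "gender", "age", "is_click"])
```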
OK, I'll check it.