
[Bug] [spark.source.FakeStream] runtime error->ConfigObject is immutable, you can't call Map.put

Open tmljob opened this issue 2 years ago • 4 comments

Search before asking

  • [X] I had searched in the issues and found no similar issues.

What happened

When running the Spark Streaming template configuration (spark.streaming.conf.template), the following error occurs.

SeaTunnel Version

dev

SeaTunnel Config

env {
  # You can set spark configuration here
  # see available properties defined by spark: https://spark.apache.org/docs/latest/configuration.html#available-properties
  spark.app.name = "SeaTunnel"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
  spark.streaming.batchDuration = 5
}

source {
  # This is an example input plugin, **only for testing and demonstrating the input plugin feature**
  FakeStream {
    content = ["Hello World, SeaTunnel"]
  }

  # You can also use other input plugins, such as file
  # file {
  #   result_table_name = "accesslog"
  #   path = "hdfs://hadoop-cluster-01/nginx/accesslog"
  #   format = "json"
  # }

  # For more information about how to configure seatunnel and the full list of input plugins,
  # please go to https://seatunnel.apache.org/docs/spark/configuration/source-plugins/FakeStream
}

transform {

  split {
    fields = ["msg", "name"]
    delimiter = ","
  }

  # you can also use other filter plugins, such as sql
  # Sql {
  #   sql = "select * from accesslog where request_time > 1000"
  # }

  # For more information about how to configure seatunnel and the full list of filter plugins,
  # please go to https://seatunnel.apache.org/docs/spark/configuration/transform-plugins/Split
}

sink {
  # choose stdout output plugin to output data to console
  Console {}

  # you can also use other output plugins, such as hdfs
  # hdfs {
  #   path = "hdfs://hadoop-cluster-01/nginx/accesslog_processed"
  #   save_mode = "append"
  # }

  # For more information about how to configure seatunnel and the full list of output plugins,
  # please go to https://seatunnel.apache.org/docs/spark/configuration/sink-plugins/Console
}

Running Command

./bin/start-seatunnel-spark.sh --master local --deploy-mode client --config ./config/spark.streaming.conf.template

Error Exception

22/05/11 20:22:53 ERROR util.Utils: Exception encountered
com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException: ConfigObject is immutable, you can't call Map.put
Serialization trace:
object (org.apache.seatunnel.shade.com.typesafe.config.impl.SimpleConfig)
config (org.apache.seatunnel.spark.fake.source.FakeReceiver)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:144)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:391)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:302)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
        at com.twitter.chill.WrappedArraySerializer.read(WrappedArraySerializer.scala:36)
        at com.twitter.chill.WrappedArraySerializer.read(WrappedArraySerializer.scala:23)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
        at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:279)
        at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1$$anonfun$apply$mcV$sp$2.apply(ParallelCollectionRDD.scala:80)
        at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1$$anonfun$apply$mcV$sp$2.apply(ParallelCollectionRDD.scala:80)
        at org.apache.spark.util.Utils$.deserializeViaNestedStream(Utils.scala:211)
        at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:80)
        at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:70)
        at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:70)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1374)
        at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70)
        at sun.reflect.GeneratedMethodAccessor26.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:375)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.UnsupportedOperationException: ConfigObject is immutable, you can't call Map.put
        at org.apache.seatunnel.shade.com.typesafe.config.impl.AbstractConfigObject.weAreImmutable(AbstractConfigObject.java:193)
        at org.apache.seatunnel.shade.com.typesafe.config.impl.AbstractConfigObject.put(AbstractConfigObject.java:204)
        at org.apache.seatunnel.shade.com.typesafe.config.impl.AbstractConfigObject.put(AbstractConfigObject.java:20)
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:162)
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:39)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
        ... 38 more
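
The serialization trace above points at the root cause: the config field of FakeReceiver holds a shaded typesafe-config SimpleConfig, whose backing ConfigObject implements java.util.Map. Kryo therefore picks its generic MapSerializer for it, and MapSerializer rebuilds maps on deserialization via Map.put — which the immutable config object rejects on the executor side. Below is a minimal standalone sketch of the failure (an assumption-laden repro, not SeaTunnel code: it assumes Kryo 4 with objenesis on the classpath, as bundled by Spark 2.4, and an instantiator strategy similar to what Spark's chill-backed Kryo setup uses):

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.objenesis.strategy.StdInstantiatorStrategy;

public class ConfigKryoRepro {
    public static void main(String[] args) {
        // Same shape as the FakeStream source block in the config above.
        Config config = ConfigFactory.parseString("content = [\"Hello World, SeaTunnel\"]");

        Kryo kryo = new Kryo();
        // Allow Kryo to instantiate classes without a no-arg constructor,
        // roughly mirroring Spark's Kryo configuration.
        kryo.setInstantiatorStrategy(
                new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));

        Output out = new Output(1024, -1);
        kryo.writeClassAndObject(out, config); // writing succeeds

        // Reading fails: MapSerializer rebuilds the backing ConfigObject
        // with Map.put, which throws UnsupportedOperationException.
        kryo.readClassAndObject(new Input(out.toBytes()));
    }
}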

Flink or Spark Version

spark 2.4

Java or Scala Version

java 8

Screenshots

No response

Are you willing to submit PR?

  • [X] Yes I am willing to submit a PR!

Code of Conduct

  • [X] I agree to follow this project's Code of Conduct

tmljob avatar May 11 '22 12:05 tmljob

Is this issue the same as https://github.com/apache/incubator-seatunnel/issues/1512?

EricJoy2048 avatar May 14 '22 06:05 EricJoy2048

Is this issue the same as #1512?

These two issues have similar error messages, but they occur in different versions and different execution scenarios: #1512 is on seatunnel 1.5.7, while this issue (#1853) is on dev.

tmljob avatar May 15 '22 14:05 tmljob

Same error with apache-seatunnel-incubating-2.1.1 and spark-2.4.7:

Caused by: java.lang.UnsupportedOperationException: ConfigObject is immutable, you can't call Map.put

einvince avatar Jun 03 '22 06:06 einvince

@tmljob I have the same problem. Did you solve it?

liuq2008202 avatar Nov 21 '22 02:11 liuq2008202

@tmljob I have the same problem. Did you solve it?

Me too, on SeaTunnel 2.1.3 with Spark 2.4.8.

pdlovedy avatar Mar 22 '23 06:03 pdlovedy
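
For anyone still hitting this, one possible workaround is to stop shipping the live Config object to executors at all: keep only the rendered HOCON string in the receiver and re-parse it lazily on the executor. This is a sketch only, not the project's actual fix; the class name SerializableConfigHolder is hypothetical. It relies on the fact that both Java serialization and Kryo's FieldSerializer skip transient fields by default, so only a plain String travels over the wire and Kryo never touches the immutable ConfigObject.

import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;

// Hypothetical holder: carries the config as a rendered HOCON string and
// rebuilds the Config on demand after deserialization.
public class SerializableConfigHolder implements java.io.Serializable {
    private final String rendered;            // a plain String serializes fine
    private transient volatile Config config; // skipped by serializers, rebuilt lazily

    public SerializableConfigHolder(Config config) {
        // concise() renders parseable JSON-style HOCON
        this.rendered = config.root().render(ConfigRenderOptions.concise());
        this.config = config;
    }

    public Config getConfig() {
        Config local = config;
        if (local == null) {
            synchronized (this) {
                if (config == null) {
                    config = ConfigFactory.parseString(rendered);
                }
                local = config;
            }
        }
        return local;
    }
}

A receiver could then hold a SerializableConfigHolder instead of the Config itself and call getConfig() from onStart()/receive() on the executor side.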