Error when running Spark app from a Docker container against Spark running in another container

I have a problem that has been bothering me for a few days, and I'm pretty much out of ideas.

I built a Spark Docker container where Spark runs in standalone mode; both the master and a worker are started there. The machine is running in Azure.

Now I tried to deploy my Spark Scala app in a separate container (same machine), passing the Spark master URL and the other settings I need to connect to Spark. The connection is seamless.

The first problem I encountered was:

    Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 10.1.0.4): java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaRDDPartition
    

Then I put all of my dependencies except Spark in a folder alongside my app's JAR file and added them to the SparkConf using SparkConf.setJars (the call is visible in the code below).

Now the strange thing happened:

    Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 10.1.0.4): java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
    

On top of that, if I just run the Scala app from my local machine using java -cp <dependencies (including Spark jars) classpath> myApp.jar, it works perfectly and the jobs run fine.

Locally I don't have any SPARK_HOME, and setJars basically receives an empty list, as if I weren't using it at all, yet it still works.
I guess it picks up the jars provided on the classpath when I run my app, so I don't need to provide anything else.
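
That empty list falls out of how the jars list is built in the code below: listFiles() returns null for a folder that does not exist, and Option(null).toList.flatten is just Nil. A minimal sketch of that behaviour (the path here is made up):

    import java.io.File

    // listFiles() returns null when the directory does not exist,
    // so the Option(...).toList.flatten chain collapses to an empty list.
    val jars = Option(new File("/no/such/folder").listFiles())
      .toList
      .flatten
      .map(_.getAbsolutePath)
    // jars == List(), so SparkConf.setJars(jars) effectively ships nothing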

If any of you have any ideas I would be grateful; I really can't explain why this doesn't work. I haven't done any Spark deployments until now, I've mostly run Spark embedded.

Spark is the same version in my app dependencies (2.0.0) as the one running in the Docker container.
I used:
Scala 2.11.7 for my app
Java 1.8 on both containers (app, Spark)
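
For reference, a build.sbt fragment matching these versions might look roughly like this (the exact artifact list, in particular the Kafka and Cassandra connector versions, is an assumption, not the project's actual build file):

    // Hypothetical build.sbt fragment matching the versions above;
    // the Kafka and Cassandra connector artifacts/versions are assumptions.
    scalaVersion := "2.11.7"

    libraryDependencies ++= Seq(
      "org.apache.spark"   %% "spark-core"                 % "2.0.0",
      "org.apache.spark"   %% "spark-streaming"            % "2.0.0",
      "org.apache.spark"   %% "spark-streaming-kafka-0-8"  % "2.0.0",
      "com.datastax.spark" %% "spark-cassandra-connector"  % "2.0.0"
    )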

As requested, here is the code of my app:

      // Imports needed by this snippet (JsonMapper, Trade, Properties and RunUtils are the app's own classes):
      import java.io.File
      import java.sql.Timestamp
      import java.util.Date

      import kafka.serializer.StringDecoder
      import org.apache.spark.SparkConf
      import org.apache.spark.streaming.{Seconds, StreamingContext}
      import org.apache.spark.streaming.kafka.KafkaUtils
      import com.datastax.spark.connector.SomeColumns
      import com.datastax.spark.connector.streaming._

      val jars = Option(new File(Properties.spark_jars_path).listFiles()).toList.flatten.map(_.getAbsolutePath)
      val conf = new SparkConf()
        .setMaster(RunUtils.determineMasterUrl(Properties.mode))
        .setAppName(RunUtils.SPARK_APP_NAME)
        .setJars(jars)
        .set("spark.cassandra.connection.host", Properties.cassandra_connection_host)
    
      val ssc = new StreamingContext(conf, Seconds(1))
    
      case class Result(buyDate: Timestamp, endDate: Timestamp, maxDate: Timestamp, buyAmount: Double, buyRate: Double)
    
      def main(args: Array[String]): Unit = {
    
        val DateFormatter = new java.text.SimpleDateFormat("yyyy-MM-dd")
    
        val kafkaParams = Map("metadata.broker.list" -> Properties.kafka_brokers, "auto.offset.reset" -> Properties.kafka_auto_reset_offset)
    
        //
        // BITSTAMP
        //
        val bitstampTopic = Set("bitstamp_trades")
        val bitstampStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, bitstampTopic)
        val bitstampTradeStream = bitstampStream.map(_._2).map { trade =>
          val jsonNode = JsonMapper.readTree(trade)
          Trade(
            "BITSTAMP",
            "BTC_USD",
            if(jsonNode.get("type").asInt() == 1) "SELL" else "BUY",
            DateFormatter.format(new Date(jsonNode.get("timestamp").asLong() * 1000)),
            new Date(jsonNode.get("timestamp").asLong() * 1000),
            jsonNode.get("amount").asDouble(),
            jsonNode.get("price").asDouble()
          )
        }
    
        bitstampTradeStream.saveToCassandra("coin_master", "trades", SomeColumns(
          "exchange_house",
          "exchange_currencies",
          "exchange_type",
          "date",
          "trade_time",
          "amount",
          "price")
        )
        ssc.start()
        ssc.awaitTermination()
      }
    

One solution:

OK, so the problem was my map function.

More specifically, this line was the problem:

    val jsonNode = JsonMapper.readTree(trade)
    

JsonMapper is actually a configured ObjectMapper from the Jackson library, and to make it work I had to use sparkContext.broadcast, because a method on it had to be called on each executor.

You can read more about why it didn't work here:
Spark: broadcasting jackson ObjectMapper
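
To isolate the pattern: build and configure the mapper once on the driver, broadcast it, and read it through the broadcast inside the closure that runs on the executors. Below is a minimal, self-contained sketch of that idea, using a plain RDD and made-up JSON rather than the streaming job:

    import com.fasterxml.jackson.databind.ObjectMapper
    import org.apache.spark.{SparkConf, SparkContext}

    object BroadcastMapperSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setMaster("local[2]").setAppName("broadcast-mapper-sketch"))

        // Configure the mapper once, on the driver ...
        val mapper = new ObjectMapper()
        // ... and broadcast it so every executor reuses the same instance
        // instead of capturing it directly in the task closure.
        val mapperBc = sc.broadcast(mapper)

        val amounts = sc
          .parallelize(Seq("""{"amount": 1.5}""", """{"amount": 2.0}"""))
          .map(json => mapperBc.value.readTree(json).get("amount").asDouble())

        println(amounts.sum()) // 3.5
        sc.stop()
      }
    }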

So, after changing my code to something like this, it worked:

    val broadcastValue = ssc.sparkContext.broadcast(JsonMapper)
    
    val kafkaParams = Map("metadata.broker.list" -> Properties.kafka_brokers, "auto.offset.reset" -> Properties.kafka_auto_reset_offset)
    
    //
    // BITSTAMP
    //
    val bitstampTopic = Set("bitstamp_trades")
    val bitstampStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, bitstampTopic)
    val bitstampTradeStream = bitstampStream.map(_._2).map { trade =>
      broadcastValue.value.registerModule(new DefaultScalaModule with RequiredPropertiesSchemaModule)
      val jsonNode = broadcastValue.value.readTree(trade)
      Trade(
        "BITSTAMP",
        "BTC_USD",
        if(jsonNode.get("type").asInt() == 1) "SELL" else "BUY",
        DateFormatter.format(new Date(jsonNode.get("timestamp").asLong() * 1000)),
        new Date(jsonNode.get("timestamp").asLong() * 1000),
        jsonNode.get("amount").asDouble(),
        jsonNode.get("price").asDouble()
      )
    }
    