Error when running SparkApp from docker container against Spark running in another container

I have a problem that has been bothering me for a few days, and I’m pretty much out of ideas.

I built a Spark Docker container where Spark runs in standalone mode. Both the master and a worker are started there. The machine is running in Azure.

Now I tried to deploy my Spark Scala app in a separate container (on the same machine), where I pass the Spark master URL and the other settings I need to connect to Spark. The connection itself works fine.

The first problem I encountered was:

    Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 10.1.0.4): java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaRDDPartition
    

Then I collected my dependencies (everything except Spark itself) into a folder alongside my app JAR file and added them to the SparkConf using SparkConf.setJars.

Now the strange thing happens:

    Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 10.1.0.4): java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
    

On top of that, if I just run the Scala app from my local machine using java -cp <dependencies (including Spark jars) classpath> myApp.jar, it works perfectly and the jobs run fine.

I don’t have SPARK_HOME set locally, and setJars basically receives an empty list, as if I weren’t using it at all, yet it still works.
I guess it uses the jars provided on the classpath when I run my app, so I don’t need to provide anything else.
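
One way to sanity-check that guess is to print what the driver actually registers under spark.jars, which is the property setJars populates. A minimal sketch, with a made-up dependency path and master URL (both are assumptions, not values from my setup):

    import java.io.File
    import org.apache.spark.SparkConf

    // List the dependency jars and check what gets registered under spark.jars.
    // Locally this ends up empty; against the standalone master it has to
    // contain every non-Spark dependency the executors need.
    val jars = Option(new File("/opt/app/deps").listFiles()) // assumed path
      .toList.flatten.map(_.getAbsolutePath)

    val conf = new SparkConf()
      .setMaster("spark://spark-master:7077") // assumed master URL
      .setAppName("jar-check")
      .setJars(jars)

    val registered = conf.get("spark.jars", "<not set>")
    println(s"spark.jars = $registered")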

If any of you have any ideas I would be grateful; I really can’t explain why this doesn’t work. I haven’t done any Spark deployments until now, I’ve mostly run Spark embedded.

Spark is the same version in my app dependencies (2.0.0) as the one running in the Docker container.
I used:
Scala 2.11.7 for my app
Java 1.8 on both containers (app and Spark)
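
For context, a build definition for these versions would look roughly like the sketch below (the artifact choices and the Cassandra connector / Jackson module versions are illustrative, not copied from my actual build; they just need to be compatible with Spark 2.0.0):

    // build.sbt -- illustrative sketch, not the exact build file
    scalaVersion := "2.11.7"

    libraryDependencies ++= Seq(
      // Spark itself is provided by the standalone cluster in the container
      "org.apache.spark" %% "spark-core"      % "2.0.0" % "provided",
      "org.apache.spark" %% "spark-streaming" % "2.0.0" % "provided",
      // 0.8 direct-stream API (KafkaUtils.createDirectStream with StringDecoder)
      "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.0.0",
      // assumed versions -- pick releases that match Spark 2.0.0
      "com.datastax.spark" %% "spark-cassandra-connector" % "2.0.0",
      "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.6.5"
    )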

As requested, here is the code of my app:

      val jars = Option(new File(Properties.spark_jars_path).listFiles()).toList.flatten.map(_.getAbsolutePath)
      val conf = new SparkConf()
        .setMaster(RunUtils.determineMasterUrl(Properties.mode))
        .setAppName(RunUtils.SPARK_APP_NAME)
        .setJars(jars)
        .set("spark.cassandra.connection.host", Properties.cassandra_connection_host)
    
      val ssc = new StreamingContext(conf, Seconds(1))
    
      case class Result(buyDate: Timestamp, endDate: Timestamp, maxDate: Timestamp, buyAmount: Double, buyRate: Double)
    
      def main(args: Array[String]): Unit = {
    
        val DateFormatter = new java.text.SimpleDateFormat("yyyy-MM-dd")
    
        val kafkaParams = Map("metadata.broker.list" -> Properties.kafka_brokers, "auto.offset.reset" -> Properties.kafka_auto_reset_offset)
    
        //
        // BITSTAMP
        //
        val bitstampTopic = Set("bitstamp_trades")
        val bitstampStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, bitstampTopic)
        val bitstampTradeStream = bitstampStream.map(_._2).map { trade =>
          val jsonNode = JsonMapper.readTree(trade)
          Trade(
            "BITSTAMP",
            "BTC_USD",
            if(jsonNode.get("type").asInt() == 1) "SELL" else "BUY",
            DateFormatter.format(new Date(jsonNode.get("timestamp").asLong() * 1000)),
            new Date(jsonNode.get("timestamp").asLong() * 1000),
            jsonNode.get("amount").asDouble(),
            jsonNode.get("price").asDouble()
          )
        }
    
        bitstampTradeStream.saveToCassandra("coin_master", "trades", SomeColumns(
          "exchange_house",
          "exchange_currencies",
          "exchange_type",
          "date",
          "trade_time",
          "amount",
          "price")
        )
        ssc.start()
        ssc.awaitTermination()
      }
    

One solution:

OK, so the problem was my map function.

More specifically, this line was the problem:

    val jsonNode = JsonMapper.readTree(trade)
    

JsonMapper is actually a configured ObjectMapper from the Jackson library, and to make it work I should have used sparkContext.broadcast, because of some method that had to be called on each executor.

You can read more about why it didn’t work here:
Spark: broadcasting jackson ObjectMapper

So, after changing my code to something like this, it worked:

    val broadcastValue = ssc.sparkContext.broadcast(JsonMapper)
    
    val kafkaParams = Map("metadata.broker.list" -> Properties.kafka_brokers, "auto.offset.reset" -> Properties.kafka_auto_reset_offset)
    
    //
    // BITSTAMP
    //
    val bitstampTopic = Set("bitstamp_trades")
    val bitstampStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, bitstampTopic)
    val bitstampTradeStream = bitstampStream.map(_._2).map { trade =>
      broadcastValue.value.registerModule(new DefaultScalaModule with RequiredPropertiesSchemaModule)
      val jsonNode = broadcastValue.value.readTree(trade)
      Trade(
        "BITSTAMP",
        "BTC_USD",
        if(jsonNode.get("type").asInt() == 1) "SELL" else "BUY",
        DateFormatter.format(new Date(jsonNode.get("timestamp").asLong() * 1000)),
        new Date(jsonNode.get("timestamp").asLong() * 1000),
        jsonNode.get("amount").asDouble(),
        jsonNode.get("price").asDouble()
      )
    }
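
As an alternative to broadcasting, the mapper can also live in a singleton object, so each executor JVM lazily builds and configures its own instance once instead of registering the module on every record. A sketch of that approach (Mappers is a hypothetical helper name, not part of my actual code):

    import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
    import com.fasterxml.jackson.module.scala.DefaultScalaModule

    // Hypothetical helper: one lazily-built, pre-configured mapper per executor JVM.
    object Mappers {
      lazy val json: ObjectMapper = {
        val m = new ObjectMapper()
        m.registerModule(DefaultScalaModule)
        m
      }
    }

    // Inside the map over the Kafka stream:
    // val jsonNode: JsonNode = Mappers.json.readTree(trade)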
    