Tomy February 2016

Adding data to a hashmap from an apache-spark RDD operation (Java)

I've used a map step to create a JavaRDD object containing some objects I need. Based on those objects, I want to create a global hashmap containing some stats, but I can't figure out which RDD operation to use. At first I thought reduce would be the solution, but then I saw that you have to return the same type of objects. I'm not interested in reducing the items, but in gathering all the stats from all the machines (they can be computed separately and then just added up).

For example: I have an RDD of objects containing an integer array (among other stuff), and I want to compute how many times each of the integers appears in the array by putting the counts into a hashtable. Each machine should compute its own hashtable, and then they should all be merged in one place on the driver.
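
Roughly, the kind of thing I have in mind is the sketch below (using aggregate, which I'm not sure is the right operation; MyObj with an Integer[] ints field is just a placeholder for my real class, and imports/SparkContext setup are omitted):

    // Sketch only: build one HashMap per partition, then merge them on the driver.
    // "rdd" is assumed to be a JavaRDD<MyObj>; Java 8 lambdas are used for brevity.
    Map<Integer, Integer> counts = rdd.aggregate(
            new HashMap<Integer, Integer>(),
            (map, obj) -> {                          // seqOp: runs inside each partition
                for (Integer i : obj.ints) {
                    map.merge(i, 1, Integer::sum);   // count each int locally
                }
                return map;
            },
            (m1, m2) -> {                            // combOp: merges the per-partition maps
                m2.forEach((k, v) -> m1.merge(k, v, Integer::sum));
                return m1;
            });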

Answers


Tzach Zohar February 2016

Often, when you think you want to end up with a Map, what you actually need is to transform the records in your RDD into key-value pairs and use reduceByKey.

Your specific example sounds exactly like the famous wordcount example (see first example here), only you want to count integers from an array within an object, instead of counting words from a sentence (String). In Scala, this would translate to:

import org.apache.spark.rdd.RDD
import scala.collection.Map

class Example {

  case class MyObj(ints: Array[Int], otherStuff: String)

  def countInts(input: RDD[MyObj]): Map[Int, Int] = {
    input
      .flatMap(_.ints)    // flatMap maps each record into several records - in this case, each int becomes a record 
      .map(i => (i, 1))   // turn each int into a key-value pair, with preliminary count 1
      .reduceByKey(_ + _) // aggregate values by key
      .collectAsMap()     // collects data into a Map
  }
}

Generally, you should let Spark perform as much of the operation as possible in a distributed manner, and delay the collection into memory as much as possible - if you collect the values before reducing, often you'll run out of memory, unless your dataset is small enough to begin with (in which case, you don't really need Spark).
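
For contrast, collecting before reducing would look roughly like the sketch below (Java, using the MyObj from the edit further down); all the counting then happens on a single machine:

    // Sketch of what NOT to do: collect first, then count on the driver.
    // Works only if all the records fit in driver memory.
    Map<Integer, Integer> counts = new HashMap<Integer, Integer>();
    for (MyObj obj : input.collect()) {                 // ships every record to the driver
        for (Integer i : obj.ints) {
            Integer old = counts.get(i);
            counts.put(i, old == null ? 1 : old + 1);   // all counting on one machine
        }
    }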

Edit: and here's the same countInts code in Java (much longer, but identical...):

    static class MyObj implements Serializable {
        Integer[] ints;
        String otherStuff;
    }

    Map<Integer, Integer> countInts(JavaRDD<MyObj> input) {
        return input
                .flatMap(new FlatMapFunction<MyObj, Integer>() {
                    @Override
                    public Iterable<Integer> call(MyObj myObj) throws Exception {
                        return Arrays.asList(myObj.ints); // each int becomes its own record
                    }
                })
                .mapToPair(new PairFunction<Integer, Integer, Integer>() {
                    @Override
                    public Tuple2<Integer, Integer> call(Integer i) throws Exception {
                        return new Tuple2<Integer, Integer>(i, 1); // pair with preliminary count 1
                    }
                })
                .reduceByKey(new Function2<Integer, Integer, Integer>() {
                    public Integer call(Integer a, Integer b) throws Exception {
                        return a + b;                     // aggregate counts by key
                    }
                })
                .collectAsMap();                          // collect the result into a Map on the driver
    }
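
Calling it could look something like this (just an illustration; sc is assumed to be an existing JavaSparkContext):

    MyObj a = new MyObj();
    a.ints = new Integer[]{1, 2, 2};
    MyObj b = new MyObj();
    b.ints = new Integer[]{2, 3};

    // parallelize the two objects, count the ints, and bring the result back as a Map
    Map<Integer, Integer> counts = countInts(sc.parallelize(Arrays.asList(a, b)));
    System.out.println(counts);   // something like {1=1, 2=3, 3=1} (map ordering is not guaranteed)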

Post Status

Asked in February 2016
Viewed 2,216 times
Voted 13
Answered 1 time
