Srini February 2016

Spark HBase Join Error: object not serializable class: org.apache.hadoop.hbase.client.Result

I have data across two HBase tables and need to get the joined result from them.

What is the best way to get the joined result? I tried joining them as RDDs, but I am getting the following error:

object not serializable (class: org.apache.hadoop.hbase.client.Result

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.storage.StorageLevel

val hbaseConf = HBaseConfiguration.create()
hbaseConf.set("hbase.zookeeper.quorum", "localhost")
hbaseConf.set(TableInputFormat.INPUT_TABLE, "table1")

// Scan table1 as an RDD of (row key, Result)
val table1RDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
  classOf[ImmutableBytesWritable], classOf[Result]).persist(StorageLevel.MEMORY_AND_DISK)

// Re-key table1 by the value of column cf:col1
val table1Data = table1RDD.map { case (rowkey: ImmutableBytesWritable, values: Result) =>
  (Bytes.toString(values.getValue(Bytes.toBytes("cf"), Bytes.toBytes("col1"))), values)
}.persist(StorageLevel.MEMORY_AND_DISK)

//-------------//

hbaseConf.set(TableInputFormat.INPUT_TABLE, "interface")

// Scan table2 ("interface") the same way
val table2RDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
  classOf[ImmutableBytesWritable], classOf[Result]).persist(StorageLevel.MEMORY_AND_DISK)

// Re-key table2 by the value of column cf1:col1
val table2Data = table2RDD.map { case (rowkey: ImmutableBytesWritable, values: Result) =>
  (Bytes.toString(values.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("col1"))), values)
}.persist(StorageLevel.MEMORY_AND_DISK)

table2Data.foreach { case (key: String, values: Result) => println("---> key is " + key) }

// Got the table data //

// Join the two RDDs on the extracted key
val joinedRDD = table1Data.join(table2Data).persist(StorageLevel.MEMORY_AND_DISK)
joinedRDD.foreach { case (key: String, results: (Result, Result)) =>
  println(" key is " + key)
  println(" value is ")
}

StackTrace:

16/02/09 11:21:21 ERROR TaskSetManager: Task 0.0 in stage 6.0 (TID 6) had a not serializable result: org.apache.hadoop.hbase.client.Result
Serialization stack:
    - object not serializable (class: org.apache.hado        

Answers


Srini February 2016

I solved this problem by using Spark Kryo serialization.

I added the following code:

conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") conf.registerKryoClasses(Array(classOf[org.apache.hadoop.hbase.client.Result]))

That solved the problem.

This should be the solution for other similar "not serializable" problems as well.
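For reference, a minimal sketch of how this setting can be wired into the driver (assuming a SparkConf/SparkContext style setup; the app name is just a placeholder):

import org.apache.hadoop.hbase.client.Result
import org.apache.spark.{SparkConf, SparkContext}

// Sketch: register the HBase Result class with Kryo before the SparkContext is created
val sparkConf = new SparkConf()
  .setAppName("HBaseJoin") // placeholder app name
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[Result]))

val sc = new SparkContext(sparkConf)

If other non-serializable classes (for example ImmutableBytesWritable, when row keys end up in shuffled or collected data) trigger the same error, they can be added to the same registerKryoClasses array.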

