Yankee February 2016

How to filter rows from RDDs and DataFrames in Spark?

I have a .tsv file pageviews_by_second consisting of the timestamp, site and requests fields:

"timestamp"              "site"   "requests"
"2015-03-16T00:09:55"   "mobile"    1595
"2015-03-16T00:10:39"   "mobile"    1544
"2015-03-16T00:19:39"   "desktop"   2460

I want to remove the first (header) row, because it leads to errors in the operations I have to perform on the data.

I tried doing it in the following ways:

1. Filtering the RDD before splitting it

val RDD1 = sc.textFile("pageviews_by_second")       
val top_row = RDD1.first() 
//returns: top_row: String = "timestamp"    "site"  "requests"    
val RDD2 = RDD1.filter(x => x!= top_row)
RDD2.first() 
//returns: "2015-03-16T00:09:55"    "mobile"    1595

2. Filtering the RDD after splitting it

val RDD1 = sc.textFile("pageviews_by_second").map(_.split("\t"))
RDD1.first()  //returns res0: Array[String] = Array("timestamp", "site", "requests")
val top_row = RDD1.first()
val RDD2 = RDD1.filter(x => x!= top_row)
RDD2.first() //returns: res1: Array[String] = Array("timestamp", "site", "requests")
val RDD2 = RDD1.filter(x => x(0)!="timestamp" && x(1)!="site" && x(2)!="requests")
RDD2.first() //returns: res1: Array[String] = Array("timestamp", "site", "requests")

3. Converting into a DataFrame using a case class and then filtering it

case class Wiki(timestamp: String, site: String, requests: String)
val DF = sc.textFile("pageviews_by_second").map(_.split("\t")).map(w => Wiki(w(0), w(1), w(2))).toDF()
val top_row = DF.first()
//returns: top_row: org.apache.spark.sql.Row = ["timestamp","site","requests"]
val DF2 = DF.filter(_ => _ != top_row)
//returns: error: missing parameter type

Why is only the 1st method able to filter out the first row while the others are not?

Answers


nareshbabral February 2016

You first need to understand the data types that you are comparing while removing the top row.

In method 1 you are comparing two Strings, which yields a proper true or false result, hence it filters out the top row.

In method 2 you are comparing two Arrays, and Array comparison with != checks references rather than contents. Use the deep method of Array to do a deep (element-wise) comparison of arrays in Scala.

// Method 2
val RDD1 = sc.textFile("D:\\trial.txt").map(_.split("\t"))
val top_row = RDD1.first()
val RDD2 = RDD1.filter(x => x.deep != top_row.deep)
RDD2.first().foreach(println(_))
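To see why the plain != in method 2 never matches, note that Scala Arrays compare by reference, so two arrays with identical contents are still "not equal", while deep gives a structural view. A quick REPL-style illustration (not from the original answer):

val a = Array("timestamp", "site", "requests")
val b = Array("timestamp", "site", "requests")
a != b              // true: reference comparison, so even the header row passes the filter
a.deep != b.deep    // false: deep compares element by element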

In method 3 you are comparing two Row objects of the DataFrame. It would be better to convert each Row with toSeq followed by toArray and then use the deep method to filter out the first row of the DataFrame.

// Method 3
DF.filter(row => row.toSeq.toArray.deep != top_row.toSeq.toArray.deep)

Let me know if it helps. Thanks!


Glennie Helles Sindholt February 2016

First of all, you really should use the spark-csv package - it can automatically filter out headers when creating the DataFrame (or RDD). You simply have to specify that :)
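For example, assuming Spark 1.x with the spark-csv package on the classpath and a sqlContext in scope (the path and option values here are illustrative), loading the tab-separated file with the header option looks roughly like this:

// spark-csv reads the first line as column names instead of data
val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")       // skip the header row and use it for column names
  .option("delimiter", "\t")      // the file is tab-separated
  .option("inferSchema", "true")  // optional: infer types, e.g. requests as an integer
  .load("pageviews_by_second")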

Secondly, RDDs are not ordered in the way that you seem to think they are. Calling first is not guaranteed to return the first row of your file. In your first scenario you apparently did get the first row, but it's better to just consider yourself lucky in that case. Also, removing a header like this from a potentially VERY large data set is very inefficient, as Spark will need to search through all the rows to filter out just a single one.

If ordering is important to you for further calculations, you can always do a zipWithIndex. That way, you can then sort the RDD to preserve ordering.
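A sketch of that approach (the variable names are mine, not from the answer): zipWithIndex pairs each line with its position, so the header can be dropped and the original order recovered later with sortBy.

val indexed = sc.textFile("pageviews_by_second").zipWithIndex()
val noHeader = indexed.filter { case (_, idx) => idx != 0 }        // drop the header line
val ordered = noHeader.sortBy { case (_, idx) => idx }.map(_._1)   // restore original order, keep just the lines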


purplebee February 2016

There is a way to remove the header without the deep comparison:

data = sc.textFile('path_to_data')
header = data.first()  # extract header
data = data.filter(lambda x: x != header)  # filter out header

Might be relevant to you: How to skip header from csv files in Spark?


Yankee February 2016

I found another method which would be more efficient than the filter method that I've used. Posting it as an answer as someone else may find it helpful:

rdd.mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1) else iter }

Source: http://stackoverflow.com/a/27857878/3546389
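Applied to the file in the question, that might look like this (a sketch, reusing the same path as above):

// Drop the first element of partition 0 only (the header sits in the first partition),
// then split the remaining lines into fields as before.
val rows = sc.textFile("pageviews_by_second")
  .mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1) else iter }
  .map(_.split("\t"))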

Post Status

Asked in February 2016
Viewed 1,998 times
Voted 4
Answered 4 times
