
Cherry Wu February 2016

How to convert Spark Streaming data into Spark DataFrame

So far, Spark does not provide a DataFrame API for streaming data, but for the anomaly detection I am doing, it is more convenient and faster to use DataFrames for the analysis. I have finished the batch part, but when I try to do real-time anomaly detection on streaming data, the problems appear. I have tried several approaches and still cannot convert a DStream to a DataFrame, nor can I convert the RDDs inside the DStream into DataFrames.

Here's part of my latest version of the code:

import sys
import re

from pyspark import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql import Row
from pyspark.streaming import StreamingContext
from pyspark.mllib.clustering import KMeans, KMeansModel, StreamingKMeans
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import udf
import operator


sc = SparkContext(appName="test")
ssc = StreamingContext(sc, 5)
sqlContext = SQLContext(sc)

model_inputs = sys.argv[1]

def streamrdd_to_df(srdd):
    sdf = sqlContext.createDataFrame(srdd)
    sdf.show(n=2, truncate=False)
    return sdf

def main():
    indata = ssc.socketTextStream(sys.argv[2], int(sys.argv[3]))
    # get_tuple() parses a raw input line; it is defined elsewhere in the full script
    inrdd = indata.map(lambda r: get_tuple(r))
    Features = Row('rawFeatures')
    features_rdd = inrdd.map(lambda r: Features(r))
    features_rdd.pprint(num=3)
    streaming_df = features_rdd.flatMap(streamrdd_to_df)

    ssc.start()
    ssc.awaitTermination()

if __name__ == "__main__":
    main()

As you can see in the main() function, when I read the input stream with the ssc.socketTextStream() method, it returns a DStream. I then tried to convert each record in the DStream into a Row, hoping I could convert the data into a DataFrame later.

If I use pprint() to print out features_rdd here, it works, which makes me think each batch in features_rdd is an RDD, while features_rdd as a whole is a DStream.

Then I created the streamrdd_to_df() method shown above and applied it with flatMap(), but the conversion still does not work.

Answers


Sumit February 2016

Read the error carefully: it says there are no output operations registered. Spark is lazy and executes a job only when it has something to produce as a result. Your program has no "output operation", and that is exactly what Spark is complaining about.

Define a foreach() over the DStream, or run a raw SQL query over the DataFrame and then print the results. It will work fine.
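
As a sketch of the fix the answer describes: foreachRDD() registers an output operation on the DStream and hands each micro-batch to a function as a plain RDD, which can then be turned into a DataFrame. The host/port values and the empty-batch guard below are assumptions for illustration, not part of the original code.

from pyspark import SparkContext
from pyspark.sql import Row, SQLContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="test")
ssc = StreamingContext(sc, 5)
sqlContext = SQLContext(sc)

def process_batch(rdd):
    # foreachRDD passes each micro-batch in as an ordinary RDD.
    # Skip empty batches: createDataFrame() cannot infer a schema from zero rows.
    if rdd.isEmpty():
        return
    df = sqlContext.createDataFrame(rdd)
    df.show(n=2, truncate=False)
    # ... run the DataFrame-based anomaly detection on this batch here ...

indata = ssc.socketTextStream("localhost", 9999)  # placeholder host/port
features_rdd = indata.map(lambda line: Row(rawFeatures=line))
features_rdd.foreachRDD(process_batch)  # this is the output operation Spark wants

ssc.start()
ssc.awaitTermination()

Unlike flatMap(), which transforms individual records and is itself lazy, foreachRDD() is an action on the DStream, so Spark now has a registered output operation and actually runs the job every batch interval.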

Post Status

Asked: February 2016
Viewed: 3,965 times
Votes: 12
Answers: 1
