How to Convert Spark SchemaRDD into RDD of My Case Class

How to convert a Spark SchemaRDD into an RDD of my case class?

The best solution I've come up with that requires the least amount of copying and pasting for new classes is as follows (I'd still like to see another solution, though).

First, define your case class and a (partially) reusable factory method:

import org.apache.spark.sql.catalyst.expressions

case class MyClass(fooBar: Long, fred: Long)

// Here you want to auto gen these functions using macros or something
object Factories extends java.io.Serializable {
  def longLong[T](fac: (Long, Long) => T)(row: expressions.Row): T =
    fac(row(0).asInstanceOf[Long], row(1).asInstanceOf[Long])
}

Some boilerplate which will already be available:

import scala.reflect.runtime.universe._
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.createSchemaRDD

The magic

import scala.reflect.ClassTag
import org.apache.spark.sql.SchemaRDD

def camelToUnderscores(name: String) =
  "[A-Z]".r.replaceAllIn(name, "_" + _.group(0).toLowerCase())

def getCaseMethods[T: TypeTag]: List[String] = typeOf[T].members.sorted.collect {
  case m: MethodSymbol if m.isCaseAccessor => m
}.toList.map(_.toString)

def caseClassToSQLCols[T: TypeTag]: List[String] =
  getCaseMethods[T].map(_.split(" ")(1)).map(camelToUnderscores)

def schemaRDDToRDD[T: TypeTag: ClassTag](schemaRDD: SchemaRDD, fac: expressions.Row => T) = {
  val tmpName = "tmpTableName" // Maybe should use a random string
  schemaRDD.registerAsTable(tmpName)
  sqlContext.sql("SELECT " + caseClassToSQLCols[T].mkString(", ") + " FROM " + tmpName)
    .map(fac)
}
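If you're worried about the fixed tmpTableName colliding with an existing table (per the comment above), one illustrative way to randomize it could be:

// purely illustrative: derive a unique temporary table name
val tmpName = "tmpTable_" + java.util.UUID.randomUUID().toString.replace("-", "")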

Example use

val parquetFile = sqlContext.parquetFile(path)

val normalRDD: RDD[MyClass] =
  schemaRDDToRDD[MyClass](parquetFile, Factories.longLong[MyClass](MyClass.apply))

See also:

http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Convert-SchemaRDD-back-to-RDD-td9071.html

Though I failed to find any example or documentation by following the JIRA link.

In Spark, is there a way to convert RDD objects into case objects?

Yes, you can definitely do that. The following code should help:

val array = Array((20254552,"ATM",-5100), (20174649,"ATM",5120))
val rdd = sparkContext.parallelize(array)
val transedRdd = rdd.map(x => (x._1, trans(x._2, x._3)))

You should define the case class outside your current class:

case class trans(atm : String, num: Int)
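To make the layout concrete, here is a minimal self-contained sketch (the object name and the local SparkContext setup are just for illustration):

// The case class lives at the top level, outside the class/object that uses it
case class trans(atm: String, num: Int)

object TransExample {
  def main(args: Array[String]): Unit = {
    // assumed local setup; in a real job you'd reuse the existing SparkContext
    val sparkContext = new org.apache.spark.SparkContext("local[*]", "trans-example")
    val array = Array((20254552, "ATM", -5100), (20174649, "ATM", 5120))
    val transedRdd = sparkContext.parallelize(array).map(x => (x._1, trans(x._2, x._3)))
    transedRdd.collect().foreach(println) // e.g. (20254552,trans(ATM,-5100))
    sparkContext.stop()
  }
}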

I hope it helps

Spark SchemaRDD to RDD

Found the answer; it's a three-step process to convert a DataFrame (or an RDD of spark.sql.Row) into a plain RDD:

collect() the result to the driver
sc.parallelize(List(...)) to turn it back into an RDD
map each Row to a String

val distinct_tweets = hiveCtx.sql("select distinct(text) from tweets_table where text <> ''")
val distinct_tweets_op = distinct_tweets.collect()
val distinct_tweets_list = sc.parallelize(List(distinct_tweets_op))
val distinct_tweets_string = distinct_tweets.map(x => x.toString)

val test_kali = distinct_tweets_string
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
  .sortBy { case (key, value) => -value }
  .map { case (key, value) => Array(key, value).mkString(",") }
test_kali.collect().foreach(println)

case class kali_test(text: String)
// note: the joined strings are comma-separated, so split on "," rather than " "
val test_kali_op = test_kali.map(_.split(",")).map(p => kali_test(p(0)))
test_kali_op.registerTempTable("kali_test")
hiveCtx.sql("select * from kali_test limit 10").collect().foreach(println)

This way I don't need to load a file; I can do my operations on the fly.
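As a side note, the collect/parallelize round trip can usually be skipped entirely; assuming distinct_tweets has a single string column, something like the following should yield a plain RDD of strings directly (a sketch, not part of the original answer):

val distinct_tweets_string = distinct_tweets.map(row => row.getString(0))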

Thanks
Sri

How to convert case class RDD to RDD[String]?

The simplest solution is to override the toString method in your case class:

case class iclass(Id1: Int, Id2: Int, SaleDate: String, Code: String) {
  override def toString(): String = {
    s"$Id1,$Id2,$SaleDate,$Code"
  }
}

If you have an RDD[iclass] and want to convert it to an RDD[String], you can then just map it like insureRDD1.map(_.toString)
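For example, a quick check (insureRDD1 here is just made-up sample data):

val insureRDD1 = sc.parallelize(Seq(iclass(1, 2, "2015-01-01", "A")))
val insureStrings: org.apache.spark.rdd.RDD[String] = insureRDD1.map(_.toString)
insureStrings.collect().foreach(println) // prints 1,2,2015-01-01,A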

How to convert a case-class-based RDD into a DataFrame?

All you need is:

val dogDF = sqlContext.createDataFrame(dogRDD)

The second parameter is part of the Java API and expects your class to follow the Java Beans convention (getters/setters). Your case class doesn't follow this convention, so no properties are detected, which leads to an empty DataFrame with no columns.
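For reference, a minimal end-to-end sketch, assuming sc and sqlContext are available as in the spark-shell (the Dog case class and its fields are hypothetical; only dogRDD and dogDF come from the question):

case class Dog(name: String, age: Int) // hypothetical fields

val dogRDD = sc.parallelize(Seq(Dog("Rex", 3), Dog("Fido", 5)))
val dogDF = sqlContext.createDataFrame(dogRDD)
dogDF.printSchema() // columns are inferred from the case class fields: name, age
dogDF.show()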

How to convert parquet data to case classes with spark?

Given

case class Browser(family: String,
                   major: Option[Int] = None,
                   language: String)

case class Tech(browsers: Seq[Browser],
                devices: Seq[String],
                oss: Seq[String])

case class Person(userId: String,
                  tech: Option[Tech] = None,
                  geographic: Option[String] = None)

and some convenience types/functions for org.apache.spark.sql.Row

import org.apache.spark.sql.Row

type A[E] = collection.mutable.WrappedArray[E]

implicit class RichRow(val r: Row) {
  def getOpt[T](n: String): Option[T] = {
    if (isNullAt(n)) {
      None
    } else {
      Some(r.getAs[T](n))
    }
  }

  def getStringOpt(n: String) = getOpt[String](n)
  def getString(n: String) = getStringOpt(n).get

  def getIntOpt(n: String) = getOpt[Int](n)
  def getInt(n: String) = getIntOpt(n).get

  def getArray[T](n: String) = r.getAs[A[T]](n)

  def getRow(n: String) = r.getAs[Row](n)
  def getRows(n: String) = r.getAs[A[Row]](n)

  def isNullAt(n: String) = r.isNullAt(r.fieldIndex(n))
}

then parsing can be organized in some functions:

def toBrowser(r: Row): Browser = {
  Browser(
    r.getString("family"),
    r.getIntOpt("major"),
    r.getString("language"))
}

def toBrowsers(rows: A[Row]): Seq[Browser] = {
  rows.map(toBrowser)
}

def toTech(r: Row): Tech = {
  Tech(
    toBrowsers(r.getRows("browsers")),
    r.getArray[String]("devices"),
    r.getArray[String]("oss"))
}

def toTechOpt(r: Row): Option[Tech] = {
  Option(r).map(toTech)
}

def toPerson(r: Row): Person = {
  Person(
    r.getString("userId"),
    toTechOpt(r.getRow("tech")),
    r.getStringOpt("geographic"))
}

so you can write

df.map(toPerson).collect().foreach(println)

  • I have organized the parse functions into "stand-alone" methods. I'd normally put these either as apply methods in the companion object of the case class, or as implicit value classes for Row. The reason for plain functions is that they are easier to paste into the spark-shell.

  • Each parse function handles plain columns and arrays directly, but delegates to another function when it encounters a collection (Seq and Option - these represent the next nesting level).

  • The implicit class should extend AnyVal, but again this cannot be pasted into the spark-shell.
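For completeness, df above would typically come from reading the parquet file; a minimal sketch assuming a Spark 1.x SQLContext (where DataFrame.map yields an RDD) and a hypothetical path:

val df = sqlContext.read.parquet("/path/to/people.parquet") // hypothetical path
val people: org.apache.spark.rdd.RDD[Person] = df.map(toPerson)
people.collect().foreach(println)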

How to convert an RDD object to a DataFrame in Spark

SparkSession has a number of createDataFrame methods that create a DataFrame given an RDD. I imagine one of these will work for your context.

For example:

def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame

Creates a DataFrame from an RDD containing Rows using the given schema.
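A minimal sketch of that overload (the column names and the local SparkSession setup are illustrative only):

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val spark = SparkSession.builder().master("local[*]").appName("rdd-to-df").getOrCreate()

// hypothetical rows and matching schema
val rowRDD = spark.sparkContext.parallelize(Seq(Row(1, "alice"), Row(2, "bob")))
val schema = StructType(Seq(
  StructField("id", IntegerType, nullable = false),
  StructField("name", StringType, nullable = true)))

val df = spark.createDataFrame(rowRDD, schema)
df.show()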

Mapping RDD to case(Schema) in Spark with Scala

Is this the thing you're looking for?

val input: RDD[((String, String), List[(Int, Timestamp, String, Int)])] = ...
val output: RDD[(Int, String, String, String, Timestamp, Int)] = input.flatMap { case ((pid, name), list) =>
  list.map { case (id, date, code, level) =>
    (id, name, code, pid, date, level)
  }
}

or using a for comprehension:

val output: RDD[(Int, String, String, String, Timestamp, Int)] = for {
  ((pid, name), list) <- input
  (id, date, code, level) <- list
} yield (id, name, code, pid, date, level)
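Since the question title mentions mapping to a case class ("schema") rather than a tuple, a hedged variant that yields case instances instead could look like this (the Record class is hypothetical, and the same imports as above are assumed):

import java.sql.Timestamp // assuming this is the Timestamp used in the input RDD

case class Record(id: Int, name: String, code: String, pid: String, date: Timestamp, level: Int)

val records: RDD[Record] = input.flatMap { case ((pid, name), list) =>
  list.map { case (id, date, code, level) =>
    Record(id, name, code, pid, date, level)
  }
}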

