Check Type: How to Check If Something Is an RDD or a DataFrame

How to check if something is an RDD or a DataFrame?

isinstance will work just fine:

from pyspark.sql import DataFrame
from pyspark.rdd import RDD

def foo(x):
    if isinstance(x, RDD):
        return "RDD"
    if isinstance(x, DataFrame):
        return "DataFrame"

foo(sc.parallelize([]))
## 'RDD'
foo(sc.parallelize([("foo", 1)]).toDF())
## 'DataFrame'

but single dispatch is a much more elegant approach:

from functools import singledispatch

@singledispatch
def bar(x):
    pass

@bar.register(RDD)
def _(arg):
    return "RDD"

@bar.register(DataFrame)
def _(arg):
    return "DataFrame"

bar(sc.parallelize([]))
## 'RDD'

bar(sc.parallelize([("foo", 1)]).toDF())
## 'DataFrame'

If you don't mind additional dependencies, multipledispatch is also an interesting option:

from multipledispatch import dispatch

@dispatch(RDD)
def baz(x):
    return "RDD"

@dispatch(DataFrame)
def baz(x):
    return "DataFrame"

baz(sc.parallelize([]))
## 'RDD'

baz(sc.parallelize([("foo", 1)]).toDF())
## 'DataFrame'

Finally, the most Pythonic approach is to simply check the interface:

def foobar(x):
    if hasattr(x, "rdd"):
        ## It is a DataFrame
        return "DataFrame"
    else:
        ## It (probably) is an RDD
        return "RDD"

How can I check whether my RDD or DataFrame is cached or not?

You can call storageLevel.useMemory on the DataFrame and getStorageLevel.useMemory on the RDD to find out whether the dataset is in memory.

For the DataFrame, do this:

scala> val df = Seq(1, 2).toDF()
df: org.apache.spark.sql.DataFrame = [value: int]

scala> df.storageLevel.useMemory
res0: Boolean = false

scala> df.cache()
res1: df.type = [value: int]

scala> df.storageLevel.useMemory
res2: Boolean = true

For the RDD, do this:

scala> val rdd = sc.parallelize(Seq(1,2))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:21

scala> rdd.getStorageLevel.useMemory
res9: Boolean = false

scala> rdd.cache()
res10: rdd.type = ParallelCollectionRDD[1] at parallelize at <console>:21

scala> rdd.getStorageLevel.useMemory
res11: Boolean = true

How to check if a Spark RDD is in memory?

You want RDD.getStorageLevel. It will return StorageLevel.NONE if the RDD is not persisted. However, that only tells you whether the RDD is marked for caching, not whether it has actually been cached. If you want the actual status, you can use the developer APIs sc.getRDDStorageInfo or sc.getPersistentRDDs.
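
For example, here is a minimal Scala sketch of the difference (the RDDInfo fields used below, such as numCachedPartitions and memSize, are an assumption about your Spark version):

val rdd = sc.parallelize(Seq(1, 2, 3)).setName("myRdd")
rdd.getStorageLevel.useMemory   // false: no caching requested yet

rdd.cache()
rdd.getStorageLevel.useMemory   // true: marked for caching, but nothing is materialized yet

rdd.count()                     // run an action so the cache is actually populated

// DeveloperApi: reports what is really cached right now
sc.getRDDStorageInfo
  .filter(_.id == rdd.id)
  .foreach(info => println(s"${info.name}: ${info.numCachedPartitions} cached partitions, ${info.memSize} bytes in memory"))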

How to convert a Spark RDD containing np.array (or list) to a Spark DataFrame?

numpy.float64 is not a type Spark DataFrames know about, but you can convert the values in the RDD to Python floats with:

from pyspark.sql.types import FloatType
spark.createDataFrame(rdd.map(lambda x: float(x)), FloatType())

Demo:

rdd.collect()
# [1.1, 2.3, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]

# rdd datatypes are numpy.float64
rdd.map(lambda x: type(x)).collect()
# [numpy.float64,
# numpy.float64,
# . . .
# numpy.float64]

# create dataframe
spark.createDataFrame(rdd.map(lambda x: float(x)), FloatType()).show()
# +-----+
# |value|
# +-----+
# | 1.1|
# | 2.3|
# | 3.0|
# | 4.0|
# | 5.0|
# | 6.0|
# | 7.0|
# | 8.0|
# | 9.0|
# | 10.0|
# +-----+

How to see the type of a Scala variable? For instance, a Spark PairRDD

One method to obtain the inferred type is to declare the variable with a deliberately wrong type, for instance:

val x: Int = "foo"

The compiler will complain that x is a String instead of an Int, so you know that String is the type that would be inferred with a statement like:

val x = "foo"

If you're using an IDE, you will most likely have a built-in feature that allows you to see the inferred type. For instance, IntelliJ shows the type when hitting Alt + = or Ctrl + q.

In IntelliJ, this feature is called "Quick Documentation" or "Type Info". Go to Settings -> Keymap to see which shortcuts are configured on your machine for those two features.
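
As a concrete sketch for the pair RDD from the question (the variable names are made up and the compiler output shown is only approximate):

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2)))
val t: Int = pairs
// error: type mismatch;
//  found   : org.apache.spark.rdd.RDD[(String, Int)]
//  required: Int
// so the inferred type of pairs is RDD[(String, Int)]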

How to check if a value in a row is empty in Spark

df.map(row => {
  // here I want to see if the downloadUrl is null
  // do something

  // else if the title is null
  // do something

  // else
  // create a new dataframe df1 with a new column "allowed" with the value set to 1
  // push df1 to API
})

Not sure what you mean by "if title/downloadUrl is null, do something".

But if you want a new dataframe that only has rows where downloadUrl and title are not null, try using the Dataset API:

case class MyObject(id: Int, downloadUrl: String, title: String)

import spark.implicits._  // needed for .as[MyObject]
val df = spark.read.json("C:\\filepath\\file.json").as[MyObject]
val df1 = df.filter(o => o.downloadUrl != null && o.title != null)

Another way would be to use the filter function on the untyped DataFrame (i.e. read without .as[MyObject]), as below:

val df1 = df.filter(row => {
  val downloadUrl = row.getAs[String]("downloadUrl")
  val title = row.getAs[String]("title")
  // keep only the rows where both downloadUrl and title are non-null
  title != null && downloadUrl != null
})

Lastly, if you want to push each row to an external API, use a foreach instead, then use the predicate to determine whether the row should be pushed:

df.foreach(row => {
  val downloadUrl = row.getAs[String]("downloadUrl")
  val title = row.getAs[String]("title")
  // only push rows where both downloadUrl and title are non-null
  if (title != null && downloadUrl != null) {
    // call the API here
  }
})

But in this case we are not creating a new dataframe (df1).
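
If you do want the df1 described in the question, one possible sketch (assuming the typed df from the first example, and assuming "allowed" is simply the constant 1):

import org.apache.spark.sql.functions.lit

val df1 = df
  .filter(o => o.downloadUrl != null && o.title != null)
  .withColumn("allowed", lit(1))

// df1 now carries the extra "allowed" column and can be pushed row by row with df1.foreach(...)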

How to quickly check if a row exists in a PySpark DataFrame?

It would be better to create a Spark dataframe from the entries that you want to look up, and then do a semi join or an anti join against the lookup dataframe to get the rows that do or do not exist in it. This should be more efficient than checking the entries one by one.

import pyspark.sql.functions as F

df = spark.createDataFrame([[2,5],[2,10]],['A','B'])

# 'lookup' is the existing dataframe you are checking against; a small stand-in for this demo:
lookup = spark.createDataFrame([[2,5],[3,7]],['A','B'])

result1 = df.join(lookup, ['A','B'], 'semi').withColumn('exists', F.lit(True))

result2 = df.join(lookup, ['A','B'], 'anti').withColumn('exists', F.lit(False))

result = result1.unionAll(result2)

result.show()
+---+---+------+
| A| B|exists|
+---+---+------+
| 2| 5| true|
| 2| 10| false|
+---+---+------+

