Check Type: How to check if something is an RDD or a DataFrame?
isinstance will work just fine:
from pyspark.sql import DataFrame
from pyspark.rdd import RDD

def foo(x):
    if isinstance(x, RDD):
        return "RDD"
    if isinstance(x, DataFrame):
        return "DataFrame"

foo(sc.parallelize([]))
## 'RDD'
foo(sc.parallelize([("foo", 1)]).toDF())
## 'DataFrame'
but single dispatch is a much more elegant approach:
from functools import singledispatch

@singledispatch
def bar(x):
    pass

@bar.register(RDD)
def _(arg):
    return "RDD"

@bar.register(DataFrame)
def _(arg):
    return "DataFrame"

bar(sc.parallelize([]))
## 'RDD'
bar(sc.parallelize([("foo", 1)]).toDF())
## 'DataFrame'
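The singledispatch mechanism itself has nothing Spark-specific, so it can be exercised without a Spark session; here is a minimal sketch using plain built-in types as stand-ins for RDD and DataFrame:

```python
from functools import singledispatch

@singledispatch
def kind(x):
    # fallback for any type that has no registered handler
    return "unknown"

@kind.register(list)
def _(x):
    return "list"

@kind.register(dict)
def _(x):
    return "dict"

print(kind([1, 2]))    # list
print(kind({"a": 1}))  # dict
print(kind(42))        # unknown
```

The dispatch happens on the runtime type of the first argument, which is exactly how the bar function above distinguishes RDDs from DataFrames.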
If you don't mind additional dependencies, multipledispatch is also an interesting option:
from multipledispatch import dispatch

@dispatch(RDD)
def baz(x):
    return "RDD"

@dispatch(DataFrame)
def baz(x):
    return "DataFrame"

baz(sc.parallelize([]))
## 'RDD'
baz(sc.parallelize([("foo", 1)]).toDF())
## 'DataFrame'
Finally, the most Pythonic approach is to simply check the interface (duck typing):
def foobar(x):
    if hasattr(x, "rdd"):
        ## It is a DataFrame
        pass
    else:
        ## It (probably) is an RDD
        pass
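Since the hasattr check is plain duck typing, it can also be tried without a Spark session; the classes below are hypothetical stand-ins that mimic the relevant part of the interface:

```python
class FakeDataFrame:
    # a real DataFrame exposes its underlying RDD via the .rdd attribute
    rdd = object()

class FakeRDD:
    # a plain RDD has no .rdd attribute
    pass

def foobar(x):
    if hasattr(x, "rdd"):
        return "DataFrame"
    return "RDD"

print(foobar(FakeDataFrame()))  # DataFrame
print(foobar(FakeRDD()))        # RDD
```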
How can I check whether my RDD or DataFrame is cached or not?
You can check storageLevel.useMemory on the DataFrame, or getStorageLevel.useMemory on the RDD, to find out if the dataset is in memory.
For the DataFrame do this:
scala> val df = Seq(1, 2).toDF()
df: org.apache.spark.sql.DataFrame = [value: int]
scala> df.storageLevel.useMemory
res0: Boolean = false
scala> df.cache()
res1: df.type = [value: int]
scala> df.storageLevel.useMemory
res2: Boolean = true
For the RDD do this:
scala> val rdd = sc.parallelize(Seq(1,2))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:21
scala> rdd.getStorageLevel.useMemory
res9: Boolean = false
scala> rdd.cache()
res10: rdd.type = ParallelCollectionRDD[1] at parallelize at <console>:21
scala> rdd.getStorageLevel.useMemory
res11: Boolean = true
How to check if Spark RDD is in memory?
You want RDD.getStorageLevel. It will return StorageLevel.NONE if the RDD is not persisted. However, this only tells you whether the RDD is marked for caching, not whether it has actually been materialized in memory. For the actual status you can use the developer APIs sc.getRDDStorageInfo or sc.getPersistentRDDs.
How to convert a Spark rdd containing np.array (or list) to a Spark DataFrame?
numpy.float64 is not a type Spark DataFrames understand, but you can convert the values in the RDD to Python floats with:
from pyspark.sql.types import FloatType

spark.createDataFrame(rdd.map(lambda x: float(x)), FloatType())
Demo:
rdd.collect()
# [1.1, 2.3, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]
# rdd datatypes are numpy.float64
rdd.map(lambda x: type(x)).collect()
# [numpy.float64,
# numpy.float64,
# . . .
# numpy.float64]
# create dataframe
spark.createDataFrame(rdd.map(lambda x: float(x)), FloatType()).show()
# +-----+
# |value|
# +-----+
# | 1.1|
# | 2.3|
# | 3.0|
# | 4.0|
# | 5.0|
# | 6.0|
# | 7.0|
# | 8.0|
# | 9.0|
# | 10.0|
# +-----+
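The conversion Spark needs here is just Python's built-in float() applied to each NumPy scalar; outside Spark the same step looks like this (assuming NumPy is installed):

```python
import numpy as np

# NumPy scalars, as produced by operations on np.array
values = [np.float64(1.1), np.float64(2.3), np.float64(3.0)]

# float() turns each np.float64 into a plain Python float
converted = [float(v) for v in values]
print([type(v).__name__ for v in converted])  # ['float', 'float', 'float']
```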
How to see the type of a scala variable? For instance a Spark PairRDD
One method to obtain the inferred type is to just declare the type, but with a wrong type, for instance:
val x: Int = "foo"
The compiler will complain that x is a String instead of an Int, so you know that String is the type inferred by a statement like:
val x = "foo"
If you're using an IDE, you will most likely have a built-in feature that allows you to see the inferred type. For instance, IntelliJ shows the type when hitting Alt + = or Ctrl + q.
In IntelliJ, this feature is called "Quick Documentation" or "Type Info". Go to Settings -> Keymap to see which shortcuts are configured on your machine for those two features.
How to check if a value in a row is empty in Spark
df.map(row => {
  // here I want to see if the downloadUrl is null
  // do something
  // else if the title is null
  // do something
  // else
  // create a new dataframe df1 with a new column "allowed" with the value set to 1
  // push df1 to API
})
It is not clear what "do something" should mean when title or downloadUrl is null, but if you want a new DataFrame that only has rows where downloadUrl and title are not null, try the Dataset API:
case class MyObject(id: Int, downloadUrl: String, title: String)

val df = spark.read.json("C:\\filepath\\file.json").as[MyObject]
val df1 = df.filter(o => o.downloadUrl != null && o.title != null)
Another way would be using the filter function as below
val df1 = df.filter(row => {
  val downloadUrl = row.getAs[String]("downloadUrl")
  val title = row.getAs[String]("title")
  // keep only the rows where both fields are non-null
  title != null && downloadUrl != null
})
Lastly, if you want to push each row to an external API, use foreach instead, and use the predicate to determine whether the row should be pushed:
df.foreach(row => {
  val downloadUrl = row.getAs[String]("downloadUrl")
  val title = row.getAs[String]("title")
  if (title != null && downloadUrl != null) {
    // call the API here
  }
})
Note that in this case we are not creating a new DataFrame df1.
How to quickly check if row exists in PySpark Dataframe?
It would be better to create a Spark DataFrame from the entries that you want to look up, and then do a semi join or an anti join to get the rows that do or do not exist in the lookup DataFrame. This should be more efficient than checking the entries one by one.
import pyspark.sql.functions as F

df = spark.createDataFrame([[2, 5], [2, 10]], ['A', 'B'])
# lookup is the DataFrame containing the rows to search for
result1 = df.join(lookup, ['A', 'B'], 'semi').withColumn('exists', F.lit(True))
result2 = df.join(lookup, ['A', 'B'], 'anti').withColumn('exists', F.lit(False))
result = result1.unionAll(result2)
result.show()
+---+---+------+
| A| B|exists|
+---+---+------+
| 2| 5| true|
| 2| 10| false|
+---+---+------+
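At row level, a semi join simply keeps rows whose join keys appear in the lookup set and an anti join keeps the rest; a Spark-free sketch of that semantics, using hypothetical data matching the example above:

```python
# rows of df and the lookup table, keyed on (A, B)
rows = [(2, 5), (2, 10)]
lookup = {(2, 5)}

# semi join -> key present (True); anti join -> key absent (False)
result = [(a, b, (a, b) in lookup) for a, b in rows]
print(result)  # [(2, 5, True), (2, 10, False)]
```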