Why Do Rapply and Lapply Handle Null Differently

Why do rapply and lapply handle NULL differently?

I think you answered your own question: because it's recursive.

You don't often see this, but NULL can actually be used to indicate an empty sequence, because it is the empty pairlist (similar to how () in Scheme terminates a list. Internally, R is very scheme like).

So, rapply recurses into the empty list, but doesn't bother turning it back into a pairlist when it's done; you get a regular empty list.

Actually, rapply and lapply don't really treat NULL that differently:

> lapply(NULL, identity)
list()

And you can see in the R source code (memory.c) that this is exactly how pairlists are meant to work:

SEXP allocList(int n)
{
int i;
SEXP result;
result = R_NilValue;
for (i = 0; i < n; i++)
result = CONS(R_NilValue, result);
return result;
}

How to manipulate NULL elements in a nested list?

I'm going to go with "use a version of rapply doesn't doesn't have weird behaviour with NULL". This is the simplest implementation I can think of:

simple_rapply <- function(x, fn)
{
if(is.list(x))
{
lapply(x, simple_rapply, fn)
} else
{
fn(x)
}
}

(rawr::rapply2, as mentioned in the comments by @rawr is a more sophisticated attempt.)

Now I can do the replacement using

simple_rapply(l, function(x) if(is.null(x)) NA else x)

Is there a way to handle the first lapply element differently? or prepend it to the lapply result?

I just worked out a solution that does what I wanted in a clean way without code duplicates:

# read the xml children
xml <- xml_children(read_xml(paste("<xml>", paste(xmlvec, collapse=""), "</xml>")))
xml
# process the first element separately
y <- xml_attrs(xml[[1]])[mapping]
if (any(is.na(names(y)))) {
y <- y[-which(is.na(names(y)))]
}
y[setdiff(mapping, names(y))] <- NA
y <- y[order(factor(names(y), levels=mapping))]
y
# process the remaining elements in lapply prepending the first
df <- as.data.frame(do.call(
rbind,
c(list(y),
lapply(xml[2:length(xml)],
function(x) {
xml_attrs(x)[mapping]
}
)
)
), stringsAsFactors = FALSE)
df

lapply() emptied list step by step while processing

The reason is that the assignment is taking place inside a function, and you've used the normal assignment operator <-, rather than the superassignment operator <<-. When inside a function scope, IOW when a function is executed, the normal assignment operator always assigns to a local variable in the evaluation environment that is created for that particular evaluation of that function (returned by a call to environment() from inside the function with fun=NULL). Thus, your global other.list variable, which is defined in the global environment (returned by globalenv()), will not be touched by such an assignment. The superassignment operator, on the other hand, will follow the closure environment chain (can be followed recursively via parent.env()) back until it finds a variable with the name on the LHS of the assignment, and then it assigns to that. The global environment is always at the base of the closure environment chain. If no such variable is found, the superassignment operator creates one in the global environment.

Thus, if you change <- to <<- in the assignment that takes place inside the function, you will be able to modify the global other.list variable.

See https://stat.ethz.ch/R-manual/R-devel/library/base/html/assignOps.html.

Here, I tried to make a little demo to demonstrate these concepts. In all my assignments, I'm assigning the actual environment that contains the variable being assigned to:

oldGlobal <- environment(); ## environment() is same as globalenv() in global scope
(function() {
newLocal1 <- environment(); ## creates a new local variable in this function evaluation's evaluation environment
print(newLocal1); ## <environment: 0x6014cbca8> (different for every evaluation)
oldGlobal <<- parent.env(environment()); ## target search hits oldGlobal in closure environment; RHS is same as globalenv()
newGlobal1 <<- globalenv(); ## target search fails; creates a new variable in the global environment
(function() {
newLocal2 <- environment(); ## creates a new local variable in this function evaluation's evaluation environment
print(newLocal2); ## <environment: 0x6014d2160> (different for every evaluation)
newLocal1 <<- parent.env(environment()); ## target search hits the existing newLocal1 in closure environment
print(newLocal1); ## same value that was already in newLocal1
oldGlobal <<- parent.env(parent.env(environment())); ## target search hits oldGlobal two closure environments up in the chain; RHS is same as globalenv()
newGlobal2 <<- globalenv(); ## target search fails; creates a new variable in the global environment
})();
})();
oldGlobal; ## <environment: R_GlobalEnv>
newGlobal1; ## <environment: R_GlobalEnv>
newGlobal2; ## <environment: R_GlobalEnv>

What is the difference between NULL and character(0) in R?

The R Language Definition has this on NULL:

There is a special object called NULL. It is used whenever there is a need to indicate or
specify that an object is absent. It should not be confused with a vector or list of zero
length. The NULL object has no type and no modifiable properties. There is only one NULL
object in R, to which all instances refer. To test for NULL use is.null. You cannot set
attributes on NULL.

So by definition NULL is very different to zero length vectors. A zero length vector very much isn't absent. NULL is really a catch-all for something that is absent or not set, but not missing-ness, which is the job of NA. There is an exception, the zero-length pairlist, as mentioned by @Owen. The Language Definition states:

A zero-length pairlist is NULL, as would be expected in Lisp but in contrast to a zero-length list.

which highlights the exception in this case.

To test for a zero-length vector use something like if(length(foo) == 0L) for example. And combine that with a class check (is.character(foo)) if you want a specific type of zero length vector.

How does Spark 2.0 handle column nullability?

Different DataFrame creation processes are handled differently with respect to null types. It's not really straightforward, because there are at least three different areas that nulls are being handled completely differently.

  1. First, SPARK-15192 is about RowEncoders. And in the case of RowEncoders, there are no nulls allowed, and the error messages have been improved. For example, with the two dozen or so overloading of SparkSession.createDataFrame(), there are quite a few implementations of createDataFrame() that are basically converting an RDD to a DataFrame.
    In my example below no nulls were accepted. So try something similar to converting an RDD to a DataFrame using createDateFrame() method like below and you will get same results...

    val nschema = StructType(Seq(StructField("colA", IntegerType, nullable = false), StructField("colB", IntegerType, nullable = true), StructField("colC", IntegerType, nullable = false), StructField("colD", IntegerType, nullable = true)))
    val intNullsRDD = sc.parallelize(List(org.apache.spark.sql.Row(null,null,null,null),org.apache.spark.sql.Row(2,null,null,null),org.apache.spark.sql.Row(null,3,null,null),org.apache.spark.sql.Row(null,null,null,4)))
    spark.createDataFrame(intNullsRDD, schema).show()

In Spark 2.1.1, the error message is pretty nice.

17/11/23 21:30:37 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 6)
java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: The 0th field 'colA' of input row cannot be null.
validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, colA), IntegerType) AS colA#73
+- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, colA), IntegerType)
+- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, colA)
+- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
+- input[0, org.apache.spark.sql.Row, true]

Stepping through the code, you can see where this happens. Way below in the doGenCode() method there is the validation. And immediately below, when the RowEncoder object is being created with val encoder = RowEncoder(schema), that logic begins.

     @DeveloperApi
@InterfaceStability.Evolving
def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame = {
createDataFrame(rowRDD, schema, needsConversion = true)
}

private[sql] def createDataFrame(
rowRDD: RDD[Row],
schema: StructType,
needsConversion: Boolean) = {
// TODO: use MutableProjection when rowRDD is another DataFrame and the applied
// schema differs from the existing schema on any field data type.
val catalystRows = if (needsConversion) {
val encoder = RowEncoder(schema)
rowRDD.map(encoder.toRow)
} else {
rowRDD.map{r: Row => InternalRow.fromSeq(r.toSeq)}
}
val logicalPlan = LogicalRDD(schema.toAttributes, catalystRows)(self)
Dataset.ofRows(self, logicalPlan)
}

After stepping through this logic more, here is that improved message in objects.scala and this is where the code handles null values. Actually the error message is passed into ctx.addReferenceObj(errMsg) but you get the idea.

 case class GetExternalRowField(
child: Expression,
index: Int,
fieldName: String) extends UnaryExpression with NonSQLExpression {

override def nullable: Boolean = false
override def dataType: DataType = ObjectType(classOf[Object])
override def eval(input: InternalRow): Any =
throw new UnsupportedOperationException("Only code-generated evaluation is supported")

private val errMsg = s"The ${index}th field '$fieldName' of input row cannot be null."

override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
// Use unnamed reference that doesn't create a local field here to reduce the number of fields
// because errMsgField is used only when the field is null.
val errMsgField = ctx.addReferenceObj(errMsg)
val row = child.genCode(ctx)
val code = s"""
${row.code}

if (${row.isNull}) {
throw new RuntimeException("The input external row cannot be null.");
}

if (${row.value}.isNullAt($index)) {
throw new RuntimeException($errMsgField);
}

final Object ${ev.value} = ${row.value}.get($index);
"""
ev.copy(code = code, isNull = "false")
}
}

  1. Something completely different happens when pulling from an HDFS data source. In this case there will be no error message when there is a non-nullable column and a null comes in. The column still accepts null values. Check out the quick testFile "testFile.csv" I created and then put it into hdfs hdfs dfs -put testFile.csv /data/nullTest

       |colA|colB|colC|colD| 
    | | | | |
    | | 2| 2| 2|
    | | 3| | |
    | 4| | | |

When I read from the file below with the same nschema schema, all of the blank values became null, even if the field was non-nullable. There are ways of how to handle blanks differently, but this is the default. Both csv and parquet had the same results.

val nschema = StructType(Seq(StructField("colA", IntegerType, nullable = true), StructField("colB", IntegerType, nullable = true), StructField("colC", IntegerType, nullable = true), StructField("colD", IntegerType, nullable = true)))
val jListNullsADF = spark.createDataFrame(List(org.apache.spark.sql.Row(null,null,null,null),org.apache.spark.sql.Row(2,null,null,null),org.apache.spark.sql.Row(null,3,null,null),org.apache.spark.sql.Row(null,null,null,4)).asJava,nschema)
jListNullsADF.write.format("parquet").save("/data/parquetnulltest")
spark.read.format("parquet").schema(schema).load("/data/parquetnulltest").show()

+----+----+----+----+
|colA|colB|colC|colD|
+----+----+----+----+
|null|null|null|null|
|null| 2| 2| 2|
|null|null| 3|null|
|null| 4|null| 4|
+----+----+----+----+

The cause of the nulls being allowed starts with the DataFrameReader creation where a call is made to baseRelationToDataFrame() in DataFramerReader.scala. baseRelationToDataFrame() in SparkSession.scala uses a QueryPlan class in the method and the QueryPlan is recreating the StructType. The method fromAttributes() which always has nullable fields is basically the same schema as the original one but forces nullability. Thus, by the time it gets back RowEncoder(), it is now a nullable version of the original schema.

Immediately below in DataFrameReader.scala you can see the baseRelationToDataFrame() call...

  @scala.annotation.varargs
def load(paths: String*): DataFrame = {
sparkSession.baseRelationToDataFrame(
DataSource.apply(
sparkSession,
paths = paths,
userSpecifiedSchema = userSpecifiedSchema,
className = source,
options = extraOptions.toMap).resolveRelation())
}

Immediately below in the file SparkSession.scala you can see the Dataset.ofRows(self: SparkSession, lr: LogicalRelation) method is being called, pay close attention to the LogicalRelation plan constructor.

  def baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame = {
Dataset.ofRows(self, LogicalRelation(baseRelation))
}

In Dataset.scala, the analyzed QueryPlan object's schema property is being passed as the third argument to create the Dataset in new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema)).

  def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = {
val qe = sparkSession.sessionState.executePlan(logicalPlan)
qe.assertAnalyzed()
new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))
}
}

In QueryPlan.scala the StructType.fromAttributes() method is being used

 lazy val schema: StructType = StructType.fromAttributes(output)

And finally in StructType.scala the nullable property is always nullable.

  private[sql] def fromAttributes(attributes: Seq[Attribute]): StructType =
StructType(attributes.map(a => StructField(a.name, a.dataType, a.nullable, a.metadata)))

About the query plan being different based on nullability, I think it is totally possible that the LogicalPlan was different based on whether a column was nullable or not. A lot of information is passed into that object and there is a lot of subsequent logic to creeate the plan. But it is not being kept nullable when it is actually writing the dataframe, as we saw a second ago.


  1. The third case is dependent on DataType. When you create a DataFrame using the method createDataFrame(rows: java.util.List[Row], schema: StructType) it will actually create zeros where there is a null passed into a non-nullable IntegerType field. You can see the example below...

      val schema = StructType(Seq(StructField("colA", IntegerType, nullable = false), StructField("colB", IntegerType, nullable = true), StructField("colC", IntegerType, nullable = false), StructField("colD", IntegerType, nullable = true))) 
    val jListNullsDF = spark.createDataFrame(List(org.apache.spark.sql.Row(null,null,null,null),org.apache.spark.sql.Row(2,null,null,null),org.apache.spark.sql.Row(null,3,null,null),org.apache.spark.sql.Row(null,null,null,4)).asJava,schema)
    jListNullsDF.show()

    +----+----+----+----+
    |colA|colB|colC|colD|
    +----+----+----+----+
    | 0|null| 0|null|
    | 2|null| 0|null|
    | 0| 3| 0|null|
    | 0|null| 0| 4|
    +----+----+----+----+

It looks like there is logic in org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getInt() that substitutes zeros for nulls. However, with non-nullable StringType fields, nulls are not handled as gracefully.

   val strschema = StructType(Seq(StructField("colA", StringType, nullable = false), StructField("colB", StringType, nullable = true), StructField("colC", StringType, nullable = false), StructField("colD", StringType, nullable = true)))
val strNullsRDD = sc.parallelize(List(org.apache.spark.sql.Row(null,null,null,null),org.apache.spark.sql.Row("r2colA",null,null,null),org.apache.spark.sql.Row(null,"r3colC",null,null),org.apache.spark.sql.Row(null,null,null,"r4colD")))
spark.createDataFrame(List(org.apache.spark.sql.Row(null,null,null,null),org.apache.spark.sql.Row("r2cA",null,null,null),org.apache.spark.sql.Row(null,"row3cB",null,null),org.apache.spark.sql.Row(null,null,null,"row4ColD")).asJava,strschema).show()

but below is the not very helpful error message that doesn't specify the ordinal position of the field...

java.lang.NullPointerException
at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(UnsafeRowWriter.java:210)

aggregate methods treat missing values (NA) differently

Good question, but in my opinion, this shouldn't have caused a major debugging headache because it is documented quite clearly in multiple places in the manual page for aggregate.

First, in the usage section:

## S3 method for class 'formula'
aggregate(formula, data, FUN, ...,
subset, na.action = na.omit)

Later, in the description:

na.action: a function which indicates what should happen when the data contain NA values. The default is to ignore missing values in the given variables.


I can't answer why the formula mode was written differently---that's something the function authors would have to answer---but using the above information, you can probably use the following:

aggregate(.~Name, M, FUN=sum, na.rm=TRUE, na.action=NULL)
# Name Col1 Col2
# 1 name 1 2


Related Topics



Leave a reply



Submit