How to Split Single Row into Multiple Rows in Spark Dataframe Using Java

How to split single row into multiple rows in Spark DataFrame using Java

Make sure you have unique column names, then you can do:

import org.apache.spark.sql.functions._

table
  .select(col("id"), col("movie"), explode(array("cast1", "cast2", "cast3", "cast4")).as("cast"))
  .where(col("cast").isNotNull)
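
Since the question asks for Java, here is a minimal sketch of the same approach using the Java Dataset API (assumption: table is a Dataset<Row> holding the columns used above):

import static org.apache.spark.sql.functions.array;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.explode;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// assuming `table` has columns id, movie, cast1, cast2, cast3, cast4
Dataset<Row> exploded = table
    .select(col("id"), col("movie"),
        explode(array(col("cast1"), col("cast2"), col("cast3"), col("cast4"))).as("cast"))
    .where(col("cast").isNotNull());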

How to split single row into multiple rows in Java Spark

A new FlatMapFunction did the job:

import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang3.ArrayUtils;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;

/**
 * id  var_a  var_b
 * --  -----  -----
 * 01  1      2
 * 02  3      0
 *
 * becomes
 *
 * id  var_name  var_value
 * --  --------  ---------
 * 01  var_a     1
 * 01  var_b     2
 * 02  var_a     3
 * 02  var_b     0
 */
public class OneToManyMapFunction implements FlatMapFunction<Row, Row> {

    // indexes of fields that won't change in the new rows (id)
    private int[] fixedFields = {0};
    // indexes of fields that will create new rows (var_a, var_b)
    private int[] dynamicFields = {1, 2};
    // names of the dynamic fields
    private String[] dynamicFieldsName = {"var_a", "var_b"};

    public OneToManyMapFunction() {}

    // Note: this is the Spark 1.x signature; in Spark 2.x+, FlatMapFunction#call
    // returns Iterator<Row>, so change the return type and return rows.iterator() instead.
    @Override
    public Iterable<Row> call(Row row) throws Exception {

        List<Row> rows = new ArrayList<Row>();
        Object[] fixedValues = ArrayUtils.EMPTY_OBJECT_ARRAY;

        // add values that won't change in the new rows
        for (int i = 0; i < fixedFields.length; i++) {
            fixedValues = ArrayUtils.add(fixedValues, row.get(fixedFields[i]));
        }

        // create new rows
        for (int i = 0; i < dynamicFields.length; i++) {
            // copy fixed values (id)
            Object[] values = ArrayUtils.clone(fixedValues);

            // add dynamic value name (var_a or var_b)
            values = ArrayUtils.add(values, dynamicFieldsName[i]);
            // add dynamic value
            values = ArrayUtils.add(values, row.get(dynamicFields[i]));

            // create new row for the dynamic value
            Row newRow = RowFactory.create(values);
            rows.add(newRow);
        }

        return rows;
    }

}
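
A possible way to apply this function, as a minimal sketch only: it assumes the Spark 1.x API the class above targets (call returning Iterable), a source DataFrame named df with the id/var_a/var_b layout from the Javadoc, and a SQLContext named sqlContext; the column types are guesses and should match the real schema.

import java.util.Arrays;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// reshape the wide DataFrame into id / var_name / var_value rows
JavaRDD<Row> longRows = df.javaRDD().flatMap(new OneToManyMapFunction());

// assumed schema of the reshaped data; adjust the types to your source columns
StructType longSchema = DataTypes.createStructType(Arrays.asList(
    DataTypes.createStructField("id", DataTypes.StringType, false),
    DataTypes.createStructField("var_name", DataTypes.StringType, false),
    DataTypes.createStructField("var_value", DataTypes.IntegerType, true)));

DataFrame longDf = sqlContext.createDataFrame(longRows, longSchema);
longDf.show();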

Scala: split single row to multiple rows based on time column

You can achieve this in Spark 2.4+ with the following transformations:

  • Split the WatchTime into an array of 3600-second steps with the sequence higher-order function
  • Explode the array to generate the new rows
  • Adjust the Hour and WatchTime values for each row
  • Remove all rows with a zero WatchTime

val result = df
  .withColumn("stamps", sequence(lit(0), 'WatchTime, lit(3600)))
  .withColumn("offset", explode('stamps))
  .withColumn("Hour", 'Hour + ('offset / 3600).cast("int"))
  .withColumn("WatchTime", 'WatchTime - 'offset)
  .withColumn("WatchTime", when('WatchTime <= 3600, 'WatchTime).otherwise(3600))
  .filter('WatchTime > 0)
  .drop("stamps", "offset")

result.show()
+------+-------------------+----+----------+---------+
|u_name|               Date|Hour|Content_id|WatchTime|
+------+-------------------+----+----------+---------+
| user1|2019-07-28 00:00:00|  21|       100|     3600|
| user1|2019-07-28 00:00:00|  22|       100|     3600|
| user1|2019-07-28 00:00:00|  23|       100|     3600|
| user2|2019-07-28 00:00:00|  20|       101|     3600|
| user3|2019-07-28 00:00:00|  21|       202|     3600|
| user3|2019-07-28 00:00:00|  22|       202|     3400|
+------+-------------------+----+----------+---------+

This algorithm may generate hours higher than 23.
If you need accurate Date and Hour information, I'd advise you to use a single unix timestamp column combining the start date and hour, since it lets you do time manipulation and convert properly back to date and hour when needed.

It would look like this:

val result = df
  .withColumn("StartDateTime", unix_timestamp('Date) + ('Hour * 3600))
  .withColumn("stamps", sequence(lit(0), 'WatchTime, lit(3600)))
  .withColumn("offset", explode('stamps))
  .withColumn("StartDateTime", from_unixtime('StartDateTime + 'offset))
  .withColumn("WatchTime", when('WatchTime - 'offset > 3600, 3600).otherwise('WatchTime - 'offset))
  .filter('WatchTime > 0)
  .select('u_name, 'content_id, 'StartDateTime, 'WatchTime)

result.show
+------+----------+-------------------+---------+
|u_name|content_id|      StartDateTime|WatchTime|
+------+----------+-------------------+---------+
| user1|       100|2019-07-28 21:00:00|     3600|
| user1|       100|2019-07-28 22:00:00|     3600|
| user1|       100|2019-07-28 23:00:00|     3600|
| user2|       101|2019-07-28 20:00:00|     3600|
| user3|       202|2019-07-28 21:00:00|     3600|
| user3|       202|2019-07-28 22:00:00|     3400|
+------+----------+-------------------+---------+

How to split column into multiple rows using Spark JavaRDD

The first part is quite simple to convert from Scala to Java: use map to split each line by comma, which gives a JavaRDD<String[]>. Then, with flatMap, split the last element of each array (the Name part) and, using Java streams, turn each name into a new record.

Here is a complete example:

JavaRDD<String> input = JSC.parallelize(
    Arrays.asList("1,23,50,Harry;Potter", "2,24,60,Hermione;Granger")
);

JavaRDD<String[]> result = input.map(line -> line.split(","))
    .flatMap(r -> {
        List<String> names = Arrays.asList(r[3].split(";"));

        String[][] values = names.stream()
            .map(name -> new String[]{r[0], r[1], r[2], name})
            .toArray(String[][]::new);

        return Arrays.asList(values).iterator();
    });

// print the result RDD
for (String[] line : result.collect()) {
    System.out.println(Arrays.toString(line));
}
// [1, 23, 50, Harry]
// [1, 23, 50, Potter]
// [2, 24, 60, Hermione]
// [2, 24, 60, Granger]
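
If a DataFrame is needed afterwards, a possible conversion of the resulting JavaRDD<String[]> could look like the sketch below. The column names are hypothetical, since the original question does not name the fields, and a SparkSession called spark is assumed.

import java.util.Arrays;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// hypothetical column names -- rename to match your data
StructType schema = DataTypes.createStructType(Arrays.asList(
    DataTypes.createStructField("col1", DataTypes.StringType, false),
    DataTypes.createStructField("col2", DataTypes.StringType, false),
    DataTypes.createStructField("col3", DataTypes.StringType, false),
    DataTypes.createStructField("name", DataTypes.StringType, false)));

// each String[] becomes one Row with the same four values
JavaRDD<Row> rows = result.map(fields -> RowFactory.create((Object[]) fields));

Dataset<Row> df = spark.createDataFrame(rows, schema);
df.show();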

How to split row into multiple rows on the basis of date using Spark Scala?

I did this by creating a UDF like the one below.

This UDF creates an array of dates (one per month, inclusive of the start and end dates) when pa_start_date and the number of months between pa_start_date and pa_end_date are passed as parameters.

import java.sql.Date

import org.joda.time.LocalDate // fromDateFields / plusMonths come from Joda-Time
import org.apache.spark.sql.functions.udf

def udfFunc: ((Date, Long) => Array[String]) = {
  (d, l) =>
    {
      var t = LocalDate.fromDateFields(d)
      val dates: Array[String] = new Array[String](l.toInt)
      for (i <- 0 until l.toInt) {
        println(t)
        dates(i) = t.toString("YYYY-MM-dd")
        t = LocalDate.fromDateFields(t.toDate()).plusMonths(1)
      }
      dates
    }
}
val my_udf = udf(udfFunc)

And the final DataFrame is created as shown below.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import ss.implicits._

val df = ss.read.format("csv").option("header", true).load(path)
  .select($"p_id", $"pa_id", $"p_st_date", $"p_end_date", $"pa_start_date", $"pa_end_date",
    my_udf(
      to_date(col("pa_start_date"), "dd-MMM-yy"),
      ceil(months_between(to_date(col("pa_end_date"), "dd-MMM-yy"), to_date(col("pa_start_date"), "dd-MMM-yy")))
    ).alias("udf")) // gives the array of dates from the UDF
  .withColumn("after_divide", explode($"udf")) // one row per date in the array
  .withColumn("period_end_date", date_format(last_day($"after_divide"), "dd-MMM-yy")) // end date for that month
  .drop("udf")
  .withColumn("row_number", row_number() over (Window
    .partitionBy("p_id", "pa_id", "p_st_date", "p_end_date", "pa_start_date", "pa_end_date")
    .orderBy(col("after_divide").asc))) // helper column for calculating `period_start_date` below
  .withColumn("period_start_date", date_format(
    when(col("row_number").isin(1), $"after_divide").otherwise(trunc($"after_divide", "month")), "dd-MMM-yy"))
  .drop("after_divide")
  .drop("row_number") // drop the helper columns that are not needed in the output

And here is the output.

+----+-----+---------+----------+-------------+-----------+---------------+-----------------+
|p_id|pa_id|p_st_date|p_end_date|pa_start_date|pa_end_date|period_end_date|period_start_date|
+----+-----+---------+----------+-------------+-----------+---------------+-----------------+
|  p1|  pa3| 1-Jan-17|  1-Dec-17|     9-Feb-17|  20-Apr-17|      28-Feb-17|        09-Feb-17|
|  p1|  pa3| 1-Jan-17|  1-Dec-17|     9-Feb-17|  20-Apr-17|      31-Mar-17|        01-Mar-17|
|  p1|  pa3| 1-Jan-17|  1-Dec-17|     9-Feb-17|  20-Apr-17|      30-Apr-17|        01-Apr-17|
|  p1|  pa2| 3-Jan-18|  8-Dec-18|     6-Mar-18|  10-Nov-18|      31-Mar-18|        06-Mar-18|
|  p1|  pa2| 3-Jan-18|  8-Dec-18|     6-Mar-18|  10-Nov-18|      30-Apr-18|        01-Apr-18|
|  p1|  pa2| 3-Jan-18|  8-Dec-18|     6-Mar-18|  10-Nov-18|      31-May-18|        01-May-18|
|  p1|  pa2| 3-Jan-18|  8-Dec-18|     6-Mar-18|  10-Nov-18|      30-Jun-18|        01-Jun-18|
|  p1|  pa2| 3-Jan-18|  8-Dec-18|     6-Mar-18|  10-Nov-18|      31-Jul-18|        01-Jul-18|
|  p1|  pa2| 3-Jan-18|  8-Dec-18|     6-Mar-18|  10-Nov-18|      31-Aug-18|        01-Aug-18|
|  p1|  pa2| 3-Jan-18|  8-Dec-18|     6-Mar-18|  10-Nov-18|      30-Sep-18|        01-Sep-18|
|  p1|  pa2| 3-Jan-18|  8-Dec-18|     6-Mar-18|  10-Nov-18|      31-Oct-18|        01-Oct-18|
|  p1|  pa2| 3-Jan-18|  8-Dec-18|     6-Mar-18|  10-Nov-18|      30-Nov-18|        01-Nov-18|
|  p1|  pa1| 2-Jan-18|  5-Dec-18|     2-Mar-18|   8-Aug-18|      31-Mar-18|        02-Mar-18|
|  p1|  pa1| 2-Jan-18|  5-Dec-18|     2-Mar-18|   8-Aug-18|      30-Apr-18|        01-Apr-18|
|  p1|  pa1| 2-Jan-18|  5-Dec-18|     2-Mar-18|   8-Aug-18|      31-May-18|        01-May-18|
|  p1|  pa1| 2-Jan-18|  5-Dec-18|     2-Mar-18|   8-Aug-18|      30-Jun-18|        01-Jun-18|
|  p1|  pa1| 2-Jan-18|  5-Dec-18|     2-Mar-18|   8-Aug-18|      31-Jul-18|        01-Jul-18|
|  p1|  pa1| 2-Jan-18|  5-Dec-18|     2-Mar-18|   8-Aug-18|      31-Aug-18|        01-Aug-18|
+----+-----+---------+----------+-------------+-----------+---------------+-----------------+

Spark split a column value into multiple rows

This is what you could do: split the string on the pipe character and explode the result using Spark's built-in functions.

import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(("a1", "b1", "c1|c2|c3|c4")).toDF("A", "B", "C")

df.withColumn("C", explode(split($"C", "\\|"))).show

Output:

+---+---+---+
|  A|  B|  C|
+---+---+---+
| a1| b1| c1|
| a1| b1| c2|
| a1| b1| c3|
| a1| b1| c4|
+---+---+---+

Hope this helps!
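
Since most of this page is about Java, a rough Java equivalent of the same split-and-explode approach could look like this (a sketch only, assuming a SparkSession named spark):

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.explode;
import static org.apache.spark.sql.functions.split;

import java.util.Arrays;
import java.util.Collections;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

StructType schema = DataTypes.createStructType(Arrays.asList(
    DataTypes.createStructField("A", DataTypes.StringType, false),
    DataTypes.createStructField("B", DataTypes.StringType, false),
    DataTypes.createStructField("C", DataTypes.StringType, false)));

Dataset<Row> df = spark.createDataFrame(
    Collections.singletonList(RowFactory.create("a1", "b1", "c1|c2|c3|c4")), schema);

// split C on the pipe character and generate one row per element
df.withColumn("C", explode(split(col("C"), "\\|"))).show();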

How to split pipe-separated column into multiple rows?

I'd use the split standard function.

scala> movies.show(truncate = false)
+-------+---------+-----------------------+
|movieId|movieName|genre                  |
+-------+---------+-----------------------+
|1      |example1 |action|thriller|romance|
|2      |example2 |fantastic|action       |
+-------+---------+-----------------------+

scala> movies.withColumn("genre", explode(split($"genre", "[|]"))).show
+-------+---------+---------+
|movieId|movieName|    genre|
+-------+---------+---------+
|      1| example1|   action|
|      1| example1| thriller|
|      1| example1|  romance|
|      2| example2|fantastic|
|      2| example2|   action|
+-------+---------+---------+

// You can use \\| for split instead
scala> movies.withColumn("genre", explode(split($"genre", "\\|"))).show
+-------+---------+---------+
|movieId|movieName|    genre|
+-------+---------+---------+
|      1| example1|   action|
|      1| example1| thriller|
|      1| example1|  romance|
|      2| example2|fantastic|
|      2| example2|   action|
+-------+---------+---------+

P.S. You could use Dataset.flatMap to achieve the same result, which I'm sure Scala devs would enjoy more.
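
A rough sketch of that flatMap alternative, written here with the Java typed API to match the rest of this page (assumptions: Spark 2.x+, the movies Dataset shown above, and movieId being an Int):

import java.util.Arrays;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;

import scala.Tuple3;

// one (movieId, movieName, genre) tuple per genre entry; movieId assumed to be an Int
Dataset<Row> exploded = movies.flatMap(
    (FlatMapFunction<Row, Tuple3<Integer, String, String>>) row ->
        Arrays.stream(row.getString(2).split("\\|"))
            .map(g -> new Tuple3<Integer, String, String>(row.getInt(0), row.getString(1), g))
            .iterator(),
    Encoders.tuple(Encoders.INT(), Encoders.STRING(), Encoders.STRING()))
  .toDF("movieId", "movieName", "genre");

exploded.show();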


