How to split single row into multiple rows in Spark DataFrame using Java
Make sure you have unique column names; then you can do:
import org.apache.spark.sql.functions._
table
.select(col("id"), col("movie"), explode(array("cast1", "cast2", "cast3", "cast4")).as("cast"))
.where(col("cast").isNotNull)
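To make the effect of explode followed by the null filter concrete, here is a minimal plain-Java sketch with no Spark dependency. The class name CastExplode and the String[] row layout {id, movie, cast1, cast2, cast3, cast4} are assumptions made for illustration only; they are not part of the answer above.

```java
import java.util.ArrayList;
import java.util.List;

public class CastExplode {
    // Hypothetical row layout: {id, movie, cast1, cast2, cast3, cast4}.
    // Emits one {id, movie, cast} row per non-null cast column,
    // which is what explode(array(...)) plus isNotNull produces.
    public static List<String[]> explodeCast(String[] row) {
        List<String[]> out = new ArrayList<>();
        for (int i = 2; i < row.length; i++) {
            if (row[i] != null) {
                out.add(new String[]{row[0], row[1], row[i]});
            }
        }
        return out;
    }

    public static void main(String[] args) {
        String[] row = {"1", "Movie", "Actor A", null, "Actor B", null};
        for (String[] r : explodeCast(row)) {
            System.out.println(String.join(",", r));
        }
        // prints:
        // 1,Movie,Actor A
        // 1,Movie,Actor B
    }
}
```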
How to split single row in to multiple rows in Java Spark
A new FlatMapFunction did the job:
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
/**
* id var_a var_b
* -- ----- -----
* 01 1 2
* 02 3 0
*
* becomes
*
* id var_name var_value
* -- -------- ---------
* 01 var_a 1
* 01 var_b 2
* 02 var_a 3
* 02 var_b 0
*
*/
public class OneToManyMapFunction implements FlatMapFunction<Row, Row> {
    // indexes of fields that won't change in the new rows (id)
    private int[] fixedFields = {0};
    // indexes of fields that will create new rows (var_a, var_b)
    private int[] dynamicFields = {1, 2};
    // names of the dynamic fields
    private String[] dynamicFieldsName = {"var_a", "var_b"};

    public OneToManyMapFunction() {}

    // Spark 1.x signature. From Spark 2.0 on, call() must return Iterator<Row>:
    // declare that return type and return rows.iterator() instead.
    @Override
    public Iterable<Row> call(Row row) throws Exception {
        List<Row> rows = new ArrayList<Row>();
        Object[] fixedValues = ArrayUtils.EMPTY_OBJECT_ARRAY;
        // add values that won't change in the new rows
        for (int i = 0; i < fixedFields.length; i++) {
            fixedValues = ArrayUtils.add(fixedValues, row.get(fixedFields[i]));
        }
        // create new rows
        for (int i = 0; i < dynamicFields.length; i++) {
            // copy fixed values (id)
            Object[] values = ArrayUtils.clone(fixedValues);
            // add dynamic value name (var_a or var_b)
            values = ArrayUtils.add(values, dynamicFieldsName[i]);
            // add dynamic value
            values = ArrayUtils.add(values, row.get(dynamicFields[i]));
            // create new row for dynamic val
            Row newRow = RowFactory.create(values);
            rows.add(newRow);
        }
        return rows;
    }
}
scala split single row to multiple rows based on time column
You can achieve this in Spark 2.4+ with the following transformations:
- Split the WatchTime into an array of 3600-second offsets with the sequence function
- Explode the array to generate the new rows
- Adjust the Hour and WatchTime values for each row
- Remove all rows with a zero WatchTime
val result = df
.withColumn("stamps", sequence(lit(0), 'WatchTime, lit(3600)))
.withColumn("offset", explode('stamps))
.withColumn("Hour", 'Hour + ('offset/3600).cast("int"))
.withColumn("WatchTime", 'WatchTime - 'offset)
.withColumn("WatchTime", when('WatchTime <= 3600, 'WatchTime).otherwise(3600))
.filter('WatchTime > 0)
.drop("stamps","offset")
result.show()
+------+-------------------+----+----------+---------+
|u_name| Date|Hour|Content_id|WatchTime|
+------+-------------------+----+----------+---------+
| user1|2019-07-28 00:00:00| 21| 100| 3600|
| user1|2019-07-28 00:00:00| 22| 100| 3600|
| user1|2019-07-28 00:00:00| 23| 100| 3600|
| user2|2019-07-28 00:00:00| 20| 101| 3600|
| user3|2019-07-28 00:00:00| 21| 202| 3600|
| user3|2019-07-28 00:00:00| 22| 202| 3400|
+------+-------------------+----+----------+---------+
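The arithmetic behind these transformations can be traced without Spark. The following is a minimal plain-Java sketch, assuming a single row with an integer start hour and a watch time in seconds; the class and method names (WatchTimeSplitter, splitByHour) are hypothetical. Each loop iteration corresponds to one exploded offset, and the loop bound plays the role of the zero-WatchTime filter.

```java
import java.util.ArrayList;
import java.util.List;

public class WatchTimeSplitter {
    // Returns {hour, seconds} pairs, one per started hour of watch time.
    public static List<int[]> splitByHour(int startHour, int watchTime) {
        List<int[]> chunks = new ArrayList<>();
        // offsets 0, 3600, 7200, ... strictly below watchTime,
        // so no chunk with zero seconds is ever produced
        for (int offset = 0; offset < watchTime; offset += 3600) {
            int remaining = watchTime - offset;
            int hour = startHour + offset / 3600;     // may exceed 23, as noted for the Spark version
            int seconds = Math.min(remaining, 3600);  // cap each chunk at one hour
            chunks.add(new int[]{hour, seconds});
        }
        return chunks;
    }

    public static void main(String[] args) {
        for (int[] c : splitByHour(21, 7000)) {
            System.out.println(c[0] + " -> " + c[1]);
        }
        // prints:
        // 21 -> 3600
        // 22 -> 3400
    }
}
```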
This algorithm may generate hours higher than 23.
If you need accurate Date and Hour information, I'd advise you to use a single Unix timestamp column that combines the start date and hour, since it lets you do time arithmetic and convert back to a proper date and hour when needed.
It would look like this:
val result = df
.withColumn("StartDateTime", unix_timestamp('Date) + ('Hour * 3600 ))
.withColumn("stamps", sequence(lit(0), 'WatchTime, lit(3600)))
.withColumn("offset", explode('stamps))
.withColumn("StartDateTime", from_unixtime('StartDateTime + 'offset))
.withColumn("WatchTime", when('WatchTime - 'offset>3600,3600).otherwise('WatchTime - 'offset))
.filter('WatchTime > 0)
.select('u_name, 'content_id, 'StartDateTime, 'WatchTime)
result.show
+------+----------+-------------------+---------+
|u_name|content_id| StartDateTime|WatchTime|
+------+----------+-------------------+---------+
| user1| 100|2019-07-28 21:00:00| 3600|
| user1| 100|2019-07-28 22:00:00| 3600|
| user1| 100|2019-07-28 23:00:00| 3600|
| user2| 101|2019-07-28 20:00:00| 3600|
| user3| 202|2019-07-28 21:00:00| 3600|
| user3| 202|2019-07-28 22:00:00| 3400|
+------+----------+-------------------+---------+
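To illustrate why a single timestamp avoids hours above 23, here is a small plain-Java sketch using java.time instead of Spark. It assumes a start date, a start hour and a watch time in seconds; the names WatchTimeTimeline and chunkStarts are hypothetical. Adding the offset to a full date-time rolls over into the next day automatically.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

public class WatchTimeTimeline {
    // Returns the start date-time of every hourly chunk of the watch time.
    public static List<LocalDateTime> chunkStarts(LocalDate date, int hour, int watchTime) {
        // combine date and hour into one point in time, like StartDateTime above
        LocalDateTime start = date.atStartOfDay().plusHours(hour);
        List<LocalDateTime> starts = new ArrayList<>();
        for (int offset = 0; offset < watchTime; offset += 3600) {
            starts.add(start.plusSeconds(offset)); // crosses midnight correctly
        }
        return starts;
    }

    public static void main(String[] args) {
        // 5000 seconds starting at 23:00 spill into the next day
        for (LocalDateTime t : chunkStarts(LocalDate.of(2019, 7, 28), 23, 5000)) {
            System.out.println(t);
        }
        // prints:
        // 2019-07-28T23:00
        // 2019-07-29T00:00
    }
}
```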
How to split column into multiple rows using Spark JavaRDD
The first part is quite simple to convert from Scala to Java: use map to split each line by comma, which gives a JavaRDD<String[]>. Then, in flatMap, split the last element of each array (the Name field) on ";" and use Java streams to turn each name into a new row that repeats the other fields.
Here is a complete example:
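// JSC is an existing JavaSparkContext; the snippet needs java.util.Arrays,
// java.util.List and org.apache.spark.api.java.JavaRDD to be imported.
JavaRDD<String> input = JSC.parallelize(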
Arrays.asList("1,23,50,Harry;Potter", "2,24,60,Hermione;Granger")
);
JavaRDD<String[]> result = input.map(line -> line.split(","))
.flatMap(r -> {
List<String> names = Arrays.asList(r[3].split(";"));
String[][] values = names.stream()
.map(name -> new String[]{r[0], r[1], r[2], name})
.toArray(String[][]::new);
return Arrays.asList(values).iterator();
});
// print the result RDD
for (String[] line : result.collect()) {
System.out.println(Arrays.toString(line));
}
// [1, 23, 50, Harry]
// [1, 23, 50, Potter]
// [2, 24, 60, Hermione]
// [2, 24, 60, Granger]
how to split row into multiple rows on the basis of date using spark scala?
I solved this with a UDF like the one below. Given pa_start_date and the number of months between pa_start_date and pa_end_date, it returns an array of dates, one for each month from the start month through the end month.
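import java.sql.Date
import org.joda.time.LocalDate
import org.apache.spark.sql.functions.udf

// (start date, number of months) => one "YYYY-MM-dd" string per month
def udfFunc: ((Date, Long) => Array[String]) = {
  (d, l) =>
    {
      // Joda-Time LocalDate built from the java.sql.Date that Spark passes in
      var t = LocalDate.fromDateFields(d)
      val dates: Array[String] = new Array[String](l.toInt)
      for (i <- 0 until l.toInt) {
        dates(i) = t.toString("YYYY-MM-dd")
        t = t.plusMonths(1) // advance by one month
      }
      dates
    }
}
val my_udf = udf(udfFunc)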
And the final dataframe is created as below.
val df = ss.read.format("csv").option("header", true).load(path)
.select($"p_id", $"pa_id", $"p_st_date", $"p_end_date", $"pa_start_date", $"pa_end_date",
my_udf(to_date(col("pa_start_date"), "dd-MMM-yy"), ceil(months_between(to_date(col("pa_end_date"), "dd-MMM-yy"), to_date(col("pa_start_date"), "dd-MMM-yy")))).alias("udf")) // gives array of dates from UDF
.withColumn("after_divide", explode($"udf")) // divide array of dates to individual rows
.withColumn("period_end_date", date_format(last_day($"after_divide"), "dd-MMM-yy")) // fetching the end_date for the particular date
.drop("udf")
.withColumn("row_number", row_number() over (Window.partitionBy("p_id", "pa_id", "p_st_date", "p_end_date", "pa_start_date", "pa_end_date").orderBy(col("after_divide").asc))) // just helper column for calculating `period_start_date` below
.withColumn("period_start_date", date_format(when(col("row_number").isin(1), $"after_divide").otherwise(trunc($"after_divide", "month")), "dd-MMM-yy"))
.drop("after_divide")
.drop("row_number") // drop the helper columns that are not needed in the output
And here is the output.
+----+-----+---------+----------+-------------+-----------+---------------+-----------------+
|p_id|pa_id|p_st_date|p_end_date|pa_start_date|pa_end_date|period_end_date|period_start_date|
+----+-----+---------+----------+-------------+-----------+---------------+-----------------+
| p1| pa3| 1-Jan-17| 1-Dec-17| 9-Feb-17| 20-Apr-17| 28-Feb-17| 09-Feb-17|
| p1| pa3| 1-Jan-17| 1-Dec-17| 9-Feb-17| 20-Apr-17| 31-Mar-17| 01-Mar-17|
| p1| pa3| 1-Jan-17| 1-Dec-17| 9-Feb-17| 20-Apr-17| 30-Apr-17| 01-Apr-17|
| p1| pa2| 3-Jan-18| 8-Dec-18| 6-Mar-18| 10-Nov-18| 31-Mar-18| 06-Mar-18|
| p1| pa2| 3-Jan-18| 8-Dec-18| 6-Mar-18| 10-Nov-18| 30-Apr-18| 01-Apr-18|
| p1| pa2| 3-Jan-18| 8-Dec-18| 6-Mar-18| 10-Nov-18| 31-May-18| 01-May-18|
| p1| pa2| 3-Jan-18| 8-Dec-18| 6-Mar-18| 10-Nov-18| 30-Jun-18| 01-Jun-18|
| p1| pa2| 3-Jan-18| 8-Dec-18| 6-Mar-18| 10-Nov-18| 31-Jul-18| 01-Jul-18|
| p1| pa2| 3-Jan-18| 8-Dec-18| 6-Mar-18| 10-Nov-18| 31-Aug-18| 01-Aug-18|
| p1| pa2| 3-Jan-18| 8-Dec-18| 6-Mar-18| 10-Nov-18| 30-Sep-18| 01-Sep-18|
| p1| pa2| 3-Jan-18| 8-Dec-18| 6-Mar-18| 10-Nov-18| 31-Oct-18| 01-Oct-18|
| p1| pa2| 3-Jan-18| 8-Dec-18| 6-Mar-18| 10-Nov-18| 30-Nov-18| 01-Nov-18|
| p1| pa1| 2-Jan-18| 5-Dec-18| 2-Mar-18| 8-Aug-18| 31-Mar-18| 02-Mar-18|
| p1| pa1| 2-Jan-18| 5-Dec-18| 2-Mar-18| 8-Aug-18| 30-Apr-18| 01-Apr-18|
| p1| pa1| 2-Jan-18| 5-Dec-18| 2-Mar-18| 8-Aug-18| 31-May-18| 01-May-18|
| p1| pa1| 2-Jan-18| 5-Dec-18| 2-Mar-18| 8-Aug-18| 30-Jun-18| 01-Jun-18|
| p1| pa1| 2-Jan-18| 5-Dec-18| 2-Mar-18| 8-Aug-18| 31-Jul-18| 01-Jul-18|
| p1| pa1| 2-Jan-18| 5-Dec-18| 2-Mar-18| 8-Aug-18| 31-Aug-18| 01-Aug-18|
+----+-----+---------+----------+-------------+-----------+---------------+-----------------+
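The period boundaries in this output can also be computed directly from the calendar. The sketch below is a different technique from the UDF above: instead of counting months with months_between, it walks java.time YearMonth values from the start month to the end month. It is plain Java with no Spark or Joda-Time dependency, and the names MonthlyPeriods and split are hypothetical. Like the output above, every period ends on the last day of its month, including the final one.

```java
import java.time.LocalDate;
import java.time.YearMonth;
import java.util.ArrayList;
import java.util.List;

public class MonthlyPeriods {
    // Returns {periodStart, periodEnd} pairs, one per calendar month touched by [start, end].
    public static List<LocalDate[]> split(LocalDate start, LocalDate end) {
        List<LocalDate[]> periods = new ArrayList<>();
        YearMonth first = YearMonth.from(start);
        YearMonth last = YearMonth.from(end);
        for (YearMonth ym = first; !ym.isAfter(last); ym = ym.plusMonths(1)) {
            // the first period starts on the start date, later ones on day 1
            LocalDate periodStart = ym.equals(first) ? start : ym.atDay(1);
            // mirrors last_day(...) in the Spark version
            LocalDate periodEnd = ym.atEndOfMonth();
            periods.add(new LocalDate[]{periodStart, periodEnd});
        }
        return periods;
    }

    public static void main(String[] args) {
        for (LocalDate[] p : split(LocalDate.of(2017, 2, 9), LocalDate.of(2017, 4, 20))) {
            System.out.println(p[0] + " .. " + p[1]);
        }
        // prints:
        // 2017-02-09 .. 2017-02-28
        // 2017-03-01 .. 2017-03-31
        // 2017-04-01 .. 2017-04-30
    }
}
```

If the last period should stop at pa_end_date rather than at the end of the month, the end of the final pair can be replaced by the end date itself.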
Spark split a column value into multiple rows
You can split the string on the pipe character and explode the resulting array using Spark's built-in functions:
import org.apache.spark.sql.functions._
import spark.implicits._
val df = Seq(("a1", "b1", "c1|c2|c3|c4")).toDF("A", "B", "C")
df.withColumn("C", explode(split($"C", "\\|"))).show
Output:
+---+---+---+
| A| B| C|
+---+---+---+
| a1| b1| c1|
| a1| b1| c2|
| a1| b1| c3|
| a1| b1| c4|
+---+---+---+
Hope this helps!
How to split pipe-separated column into multiple rows?
I'd use the split standard function.
scala> movies.show(truncate = false)
+-------+---------+-----------------------+
|movieId|movieName|genre |
+-------+---------+-----------------------+
|1 |example1 |action|thriller|romance|
|2 |example2 |fantastic|action |
+-------+---------+-----------------------+
scala> movies.withColumn("genre", explode(split($"genre", "[|]"))).show
+-------+---------+---------+
|movieId|movieName| genre|
+-------+---------+---------+
| 1| example1| action|
| 1| example1| thriller|
| 1| example1| romance|
| 2| example2|fantastic|
| 2| example2| action|
+-------+---------+---------+
// You can use \\| for split instead
scala> movies.withColumn("genre", explode(split($"genre", "\\|"))).show
+-------+---------+---------+
|movieId|movieName| genre|
+-------+---------+---------+
| 1| example1| action|
| 1| example1| thriller|
| 1| example1| romance|
| 2| example2|fantastic|
| 2| example2| action|
+-------+---------+---------+
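Both patterns work because Spark's split takes a Java regular expression, in which an unescaped | means alternation. The plain-Java sketch below shows the same behaviour with String.split; the class name PipeSplit and its method names are hypothetical, and Pattern.quote is included as a third way to get a literal pipe.

```java
import java.util.Arrays;
import java.util.regex.Pattern;

public class PipeSplit {
    // Literal pipe via backslash escape
    public static String[] withBackslash(String s) { return s.split("\\|"); }

    // Literal pipe via a character class
    public static String[] withCharClass(String s) { return s.split("[|]"); }

    // Literal pipe via Pattern.quote
    public static String[] withQuote(String s) { return s.split(Pattern.quote("|")); }

    // Unescaped pipe: "empty OR empty", which matches between every character
    public static String[] unescaped(String s) { return s.split("|"); }

    public static void main(String[] args) {
        String genre = "action|thriller|romance";
        System.out.println(Arrays.toString(withBackslash(genre))); // [action, thriller, romance]
        System.out.println(Arrays.toString(withCharClass(genre))); // [action, thriller, romance]
        System.out.println(Arrays.toString(withQuote(genre)));     // [action, thriller, romance]
        // On Java 8 and later this splits into single characters
        System.out.println(Arrays.toString(unescaped("a|b")));     // [a, |, b]
    }
}
```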
P.S. You could use Dataset.flatMap to achieve the same result, which is something Scala devs would enjoy more, I'm sure.