How to List All Available Patterns in a Scdynamicstorage

Pyspark: get list of files/directories on HDFS path

I believe it's helpful to think of Spark only as a data processing tool, with a domain that begins at loading the data. It can read many formats, and it supports Hadoop glob expressions, which are terribly useful for reading from multiple paths in HDFS, but it doesn't have a builtin facility that I'm aware of for traversing directories or files, nor does it have utilities specific to interacting with Hadoop or HDFS.

There are a few available tools to do what you want, including esutil and hdfs. The hdfs lib supports both CLI and API, you can jump straight to 'how do I list HDFS files in Python' right here. It looks like this:

from hdfs import Config
client = Config().get_client('dev')
files = client.list('the_dir_path')

Find dynamic intervals per group with Sparklyr

Spark ML Bucketizer can be used only for global operations so it won't work for you. Instead you can create a reference table

ref <- purrr::map2(names(list.threshold), 
list.threshold,
function(name, brks) purrr::map2(
c("-Infinity", brks), c(brks, "Infinity"),
function(low, high) list(
name = name,
low = low,
high = high))) %>%
purrr::flatten() %>%
bind_rows() %>%
group_by(name) %>%
arrange(low, .by_group = TRUE) %>%
mutate(simple_grade = row_number() - 1) %>%
copy_to(sc, .) %>%
mutate_at(vars(one_of("low", "high")), as.numeric)
# Source: spark<?> [?? x 4]
name low high simple_grade
<chr> <dbl> <dbl> <dbl>
1 Jane -Inf 3 0
2 Jane 3 5 1
3 Jane 5 8 2
4 Jane 8 Inf 3
5 John -Inf 5 0
6 John 5 7 1
7 John 7 Inf 2
8 Steve -Inf 4 0
9 Steve 4 Inf 1

and then left_join it with the data table:

sdf <- copy_to(sc, data)

simplified <- left_join(sdf, ref, by=c("Person" = "name")) %>%
filter(Grade >= low & Grade < High) %>%
select(-low, -high)
simplified
# Source: spark<?> [?? x 4]
Person Year Grade simple_grade
<chr> <int> <dbl> <dbl>
1 John 1900 6 1
2 John 1901 3 0
3 John 1902 4 0
4 John 1903 4 0
5 John 1904 8 2
6 John 1905 5 1
7 John 1906 2 0
8 John 1907 9 2
9 John 1908 7 2
10 Steve 1902 4 1
# … with more rows
simplified %>% dbplyr::remote_query_plan()
== Physical Plan ==
*(2) Project [Person#132, Year#133, Grade#134, simple_grade#15]
+- *(2) BroadcastHashJoin [Person#132], [name#12], Inner, BuildRight, ((Grade#134 >= low#445) && (Grade#134 < high#446))
:- *(2) Filter (isnotnull(Grade#134) && isnotnull(Person#132))
: +- InMemoryTableScan [Person#132, Year#133, Grade#134], [isnotnull(Grade#134), isnotnull(Person#132)]
: +- InMemoryRelation [Person#132, Year#133, Grade#134], StorageLevel(disk, memory, deserialized, 1 replicas)
: +- Scan ExistingRDD[Person#132,Year#133,Grade#134]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
+- *(1) Project [name#12, cast(low#13 as double) AS low#445, cast(high#14 as double) AS high#446, simple_grade#15]
+- *(1) Filter ((isnotnull(name#12) && isnotnull(cast(high#14 as double))) && isnotnull(cast(low#13 as double)))
+- InMemoryTableScan [high#14, low#13, name#12, simple_grade#15], [isnotnull(name#12), isnotnull(cast(high#14 as double)), isnotnull(cast(low#13 as double))]
+- InMemoryRelation [name#12, low#13, high#14, simple_grade#15], StorageLevel(disk, memory, deserialized, 1 replicas)
+- Scan ExistingRDD[name#12,low#13,high#14,simple_grade#15]

Allocating memory for dynamic array - The block header has been corrupted (FastMM4)

Solved.
Today I spent the day manually inspecting each line of code. I made quite few changes and finally the problem went away. I haven't tried to see which specific line generated the problem.

Thanks a lot to every body for help !!!

How to perform union on two DataFrames with different amounts of columns in Spark?

In Scala you just have to append all missing columns as nulls.

import org.apache.spark.sql.functions._

// let df1 and df2 the Dataframes to merge
val df1 = sc.parallelize(List(
(50, 2),
(34, 4)
)).toDF("age", "children")

val df2 = sc.parallelize(List(
(26, true, 60000.00),
(32, false, 35000.00)
)).toDF("age", "education", "income")

val cols1 = df1.columns.toSet
val cols2 = df2.columns.toSet
val total = cols1 ++ cols2 // union

def expr(myCols: Set[String], allCols: Set[String]) = {
allCols.toList.map(x => x match {
case x if myCols.contains(x) => col(x)
case _ => lit(null).as(x)
})
}

df1.select(expr(cols1, total):_*).unionAll(df2.select(expr(cols2, total):_*)).show()

+---+--------+---------+-------+
|age|children|education| income|
+---+--------+---------+-------+
| 50| 2| null| null|
| 34| 4| null| null|
| 26| null| true|60000.0|
| 32| null| false|35000.0|
+---+--------+---------+-------+

Update

Both temporal DataFrames will have the same order of columns, because we are mapping through total in both cases.

df1.select(expr(cols1, total):_*).show()
df2.select(expr(cols2, total):_*).show()

+---+--------+---------+------+
|age|children|education|income|
+---+--------+---------+------+
| 50| 2| null| null|
| 34| 4| null| null|
+---+--------+---------+------+

+---+--------+---------+-------+
|age|children|education| income|
+---+--------+---------+-------+
| 26| null| true|60000.0|
| 32| null| false|35000.0|
+---+--------+---------+-------+

Java - Dynamic String replacement inside a Reader stream

You just want to subclass BufferedReader.

class MyBufferedReader extends BufferedReader {

MyBufferedReader(Reader r) {
super(r);
}

@Override
String readLine() {
String line = super.readLine();
// perform replacement here
return line;
}

}

Open your file as usual, but instead of wrapping it in a BufferedReader, wrap it in your subclass.

try ( Reader r = ...;
BufferedReader br = new MyBufferedReader(r)) {
String line;
while ((line = br.readLine()) != null) {
// use returned line
}
}

Update

The following is a Reader which will allow you to do line-by-line replacements of an input stream, while still presenting a Reader interface to the user of the stream.

Internally, the original stream is wrapped in a BufferedReader, and read one line at a time. Any desired transformation may be performed on the lines which have been read. The transformed line is then turned into a StringReader. When the user of the stream calls any of the read(...) operations, the request is directed to the buffered StringReader to satisfy. If the StringReader runs out of characters, the next line of the BufferedReader is loaded and transformed, to continue to provide input for the read(...).

abstract public class TranslatingReader extends Reader {

private BufferedReader input;
private StringReader output;

public TranslatingReader(Reader in) {
input = new BufferedReader(in);
output = new StringReader("");
}

abstract public String translate(String line);

@Override
public int read(char[] cbuf, int off, int len) throws IOException {
int read = 0;

while (len > 0) {
int nchars = output.read(cbuf, off, len);
if (nchars == -1) {
String line = input.readLine();
if (line == null) {
break;
}

line = tranlate(line);

line += "\n"; // Add the newline which was removed by readLine()
output = new StringReader(line);
} else {
read += nchars;
off += nchars;
len -= nchars;
}
}

if (read == 0)
read = -1;

return read;
}

@Override
public void close() throws IOException {
input.close();
output.close();
}
}

How to construct an RDD or DataFrame dynamically?

This is less of a spark problem than it is a scala problem. Is the data stored across multiple files?

I would recommend parallelizing by file and then parsing row by row.

For the parsing I would:

  1. Create a case class of what you want the rows to look like (This will allow the schema to be inferred using reflection when creating the DF)
  2. Create a list of name/regex tuples for the parsing like: ("Attribute", regex)
  3. Map over the list of regex and convert to a map: (Attribute -> Option[Value])
  4. Create the case class objects
  5. This should lead to a data structure of List[CaseClass] or RDD[CaseClass] which can be converted to a dataframe. You may need to do additional processing to filter out un-needed rows and to remove the Options.


Related Topics



Leave a reply



Submit