Pyspark: get list of files/directories on HDFS path
I find it helpful to think of Spark purely as a data processing tool whose domain begins at loading the data. It can read many formats, and it supports Hadoop glob expressions, which are very useful for reading from multiple paths in HDFS. As far as I know, though, it has no built-in facility for traversing directories or files, nor any utilities specific to interacting with Hadoop or HDFS.
There are a few tools available to do what you want, including esutil and hdfs. The hdfs library offers both a CLI and a Python API. Listing HDFS files from Python looks like this:
from hdfs import Config
client = Config().get_client('dev')
files = client.list('the_dir_path')
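Once the names are back in Python, ordinary string tools can narrow them down. A minimal sketch, assuming the call returned a plain list of names; the `files` list below is a hypothetical stand-in, not real cluster output, and `matching` is a helper introduced only for illustration:

```python
from fnmatch import fnmatchcase

# Hypothetical stand-in for the result of client.list('the_dir_path').
files = ["part-00000.parquet", "part-00001.parquet", "_SUCCESS", "archive"]


def matching(names, pattern):
    """Return the names that match a shell-style pattern, keeping their order."""
    return [name for name in names if fnmatchcase(name, pattern)]


parquet_files = matching(files, "part-*.parquet")
print(parquet_files)
```

The filtered names can then be joined onto the directory path and handed to Spark's reader.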
Find dynamic intervals per group with Sparklyr
Spark ML Bucketizer can be used only for global operations, so it won't work for you. Instead you can create a reference table:
ref <- purrr::map2(names(list.threshold),
                   list.threshold,
                   function(name, brks) purrr::map2(
                     c("-Infinity", brks), c(brks, "Infinity"),
                     function(low, high) list(
                       name = name,
                       low = low,
                       high = high))) %>%
  purrr::flatten() %>%
  bind_rows() %>%
  group_by(name) %>%
  arrange(low, .by_group = TRUE) %>%
  mutate(simple_grade = row_number() - 1) %>%
  copy_to(sc, .) %>%
  mutate_at(vars(one_of("low", "high")), as.numeric)
# Source: spark<?> [?? x 4]
  name    low  high simple_grade
  <chr> <dbl> <dbl>        <dbl>
1 Jane   -Inf     3            0
2 Jane      3     5            1
3 Jane      5     8            2
4 Jane      8   Inf            3
5 John   -Inf     5            0
6 John      5     7            1
7 John      7   Inf            2
8 Steve  -Inf     4            0
9 Steve     4   Inf            1
and then left_join it with the data table:
sdf <- copy_to(sc, data)
simplified <- left_join(sdf, ref, by = c("Person" = "name")) %>%
  filter(Grade >= low & Grade < high) %>%
  select(-low, -high)
simplified
# Source: spark<?> [?? x 4]
   Person  Year Grade simple_grade
   <chr>  <int> <dbl>        <dbl>
 1 John    1900     6            1
 2 John    1901     3            0
 3 John    1902     4            0
 4 John    1903     4            0
 5 John    1904     8            2
 6 John    1905     5            1
 7 John    1906     2            0
 8 John    1907     9            2
 9 John    1908     7            2
10 Steve   1902     4            1
# … with more rows
simplified %>% dbplyr::remote_query_plan()
== Physical Plan ==
*(2) Project [Person#132, Year#133, Grade#134, simple_grade#15]
+- *(2) BroadcastHashJoin [Person#132], [name#12], Inner, BuildRight, ((Grade#134 >= low#445) && (Grade#134 < high#446))
:- *(2) Filter (isnotnull(Grade#134) && isnotnull(Person#132))
: +- InMemoryTableScan [Person#132, Year#133, Grade#134], [isnotnull(Grade#134), isnotnull(Person#132)]
: +- InMemoryRelation [Person#132, Year#133, Grade#134], StorageLevel(disk, memory, deserialized, 1 replicas)
: +- Scan ExistingRDD[Person#132,Year#133,Grade#134]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
+- *(1) Project [name#12, cast(low#13 as double) AS low#445, cast(high#14 as double) AS high#446, simple_grade#15]
+- *(1) Filter ((isnotnull(name#12) && isnotnull(cast(high#14 as double))) && isnotnull(cast(low#13 as double)))
+- InMemoryTableScan [high#14, low#13, name#12, simple_grade#15], [isnotnull(name#12), isnotnull(cast(high#14 as double)), isnotnull(cast(low#13 as double))]
+- InMemoryRelation [name#12, low#13, high#14, simple_grade#15], StorageLevel(disk, memory, deserialized, 1 replicas)
+- Scan ExistingRDD[name#12,low#13,high#14,simple_grade#15]
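The join condition in this answer (`Grade >= low` and `Grade < high`) places each grade in a half-open interval per person. As a plain-Python illustration of that rule, outside Spark, here is a small sketch. The `thresholds` dictionary is an assumption read off the reference table printed earlier, and `simple_grade` is a hypothetical helper, not part of sparklyr:

```python
from bisect import bisect_right

# Break points per person, as implied by the reference table in this answer.
thresholds = {"Jane": [3, 5, 8], "John": [5, 7], "Steve": [4]}


def simple_grade(person, grade):
    """Index of the interval [low, high) that contains grade for this person."""
    # bisect_right counts the break points that are <= grade, which is
    # exactly the zero-based position of the matching interval.
    return bisect_right(thresholds[person], grade)


print(simple_grade("John", 6))
print(simple_grade("Steve", 4))
```

A grade equal to a break point falls into the upper interval, matching the `>= low` side of the join filter.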
Allocating memory for dynamic array - The block header has been corrupted (FastMM4)
Solved.
I spent today manually inspecting each line of code. I made quite a few changes and the problem finally went away. I haven't checked which specific line caused it.
Thanks a lot to everybody for the help!
How to perform union on two DataFrames with different amounts of columns in Spark?
In Scala you just have to append all missing columns as nulls.
import org.apache.spark.sql.functions._

// let df1 and df2 be the DataFrames to merge
val df1 = sc.parallelize(List(
  (50, 2),
  (34, 4)
)).toDF("age", "children")

val df2 = sc.parallelize(List(
  (26, true, 60000.00),
  (32, false, 35000.00)
)).toDF("age", "education", "income")

val cols1 = df1.columns.toSet
val cols2 = df2.columns.toSet
val total = cols1 ++ cols2 // union

// keep a column the DataFrame already has; otherwise add it as a null literal
def expr(myCols: Set[String], allCols: Set[String]) = {
  allCols.toList.map(x => x match {
    case x if myCols.contains(x) => col(x)
    case _ => lit(null).as(x)
  })
}

df1.select(expr(cols1, total):_*).unionAll(df2.select(expr(cols2, total):_*)).show()
+---+--------+---------+-------+
|age|children|education| income|
+---+--------+---------+-------+
| 50|       2|     null|   null|
| 34|       4|     null|   null|
| 26|    null|     true|60000.0|
| 32|    null|    false|35000.0|
+---+--------+---------+-------+
Update
Both intermediate DataFrames will have the same column order, because we map over total in both cases.
df1.select(expr(cols1, total):_*).show()
df2.select(expr(cols2, total):_*).show()
+---+--------+---------+------+
|age|children|education|income|
+---+--------+---------+------+
| 50|       2|     null|  null|
| 34|       4|     null|  null|
+---+--------+---------+------+
+---+--------+---------+-------+
|age|children|education| income|
+---+--------+---------+-------+
| 26|    null|     true|60000.0|
| 32|    null|    false|35000.0|
+---+--------+---------+-------+
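The same padding idea can be shown without Spark. This is only a plain-Python sketch of what `expr` does, using lists of dictionaries as stand-ins for the two DataFrames; `union_rows` is a hypothetical helper, and `None` plays the role of `null`:

```python
def union_rows(rows1, rows2):
    """Union two lists of row dicts, padding missing columns with None."""
    rows = rows1 + rows2
    # Collect every column name once; sorting gives both inputs the same order.
    all_cols = sorted(set().union(*(row.keys() for row in rows)))
    return [{col: row.get(col) for col in all_cols} for row in rows]


rows1 = [{"age": 50, "children": 2}, {"age": 34, "children": 4}]
rows2 = [
    {"age": 26, "education": True, "income": 60000.0},
    {"age": 32, "education": False, "income": 35000.0},
]

merged = union_rows(rows1, rows2)
for row in merged:
    print(row)
```

As far as I know, Spark 3.1 and later also accept `unionByName` with `allowMissingColumns` enabled, which performs this padding for you.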
Java - Dynamic String replacement inside a Reader stream
You just want to subclass BufferedReader.
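import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;

class MyBufferedReader extends BufferedReader {
    MyBufferedReader(Reader r) {
        super(r);
    }

    // Must stay public and declare IOException to override BufferedReader.readLine()
    @Override
    public String readLine() throws IOException {
        String line = super.readLine();
        // perform replacement here; line is null at end of stream
        return line;
    }
}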
Open your file as usual, but instead of wrapping it in a BufferedReader, wrap it in your subclass.
try (Reader r = ...;
     BufferedReader br = new MyBufferedReader(r)) {
    String line;
    while ((line = br.readLine()) != null) {
        // use returned line
    }
}
Update
The following is a Reader which will allow you to do line-by-line replacements of an input stream, while still presenting a Reader interface to the user of the stream.
Internally, the original stream is wrapped in a BufferedReader and read one line at a time. Any desired transformation may be performed on the lines which have been read. The transformed line is then turned into a StringReader. When the user of the stream calls any of the read(...) operations, the request is directed to the buffered StringReader to satisfy. If the StringReader runs out of characters, the next line of the BufferedReader is loaded and transformed, to continue to provide input for the read(...).
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;

public abstract class TranslatingReader extends Reader {
    private final BufferedReader input;
    private StringReader output;

    public TranslatingReader(Reader in) {
        input = new BufferedReader(in);
        output = new StringReader("");
    }

    // Transform one line (without its line terminator) and return the result
    public abstract String translate(String line);

    @Override
    public int read(char[] cbuf, int off, int len) throws IOException {
        int read = 0;
        while (len > 0) {
            int nchars = output.read(cbuf, off, len);
            if (nchars == -1) {
                // Current line is used up; load and transform the next one
                String line = input.readLine();
                if (line == null) {
                    break;
                }
                line = translate(line);
                line += "\n"; // Add the newline which was removed by readLine()
                output = new StringReader(line);
            } else {
                read += nchars;
                off += nchars;
                len -= nchars;
            }
        }
        if (read == 0)
            read = -1;
        return read;
    }

    @Override
    public void close() throws IOException {
        input.close();
        output.close();
    }
}
How to construct an RDD or DataFrame dynamically?
This is less of a Spark problem than a Scala problem. Is the data stored across multiple files?
I would recommend parallelizing by file and then parsing row by row.
For the parsing I would:
- Create a case class of what you want the rows to look like (This will allow the schema to be inferred using reflection when creating the DF)
- Create a list of name/regex tuples for the parsing like: ("Attribute", regex)
- Map over the list of regex and convert to a map: (Attribute -> Option[Value])
- Create the case class objects
- This should lead to a data structure of List[CaseClass] or RDD[CaseClass] which can be converted to a dataframe. You may need to do additional processing to filter out un-needed rows and to remove the Options.
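The parsing steps can be sketched in Python, with a dataclass standing in for the Scala case class and `None` standing in for an empty Option. The `Record` fields, the regexes, and the sample lines are all hypothetical, since the question's actual data format is not shown here:

```python
import re
from dataclasses import dataclass
from typing import Optional


@dataclass
class Record:
    """Shape of one parsed row (the role the case class plays in Scala)."""
    name: Optional[str]
    age: Optional[int]


# Hypothetical (attribute, regex) pairs; each regex captures one value.
PATTERNS = [
    ("name", re.compile(r"name=(\w+)")),
    ("age", re.compile(r"age=(\d+)")),
]


def parse_line(line):
    """Apply every regex to the line and build a Record from what was found."""
    found = {}
    for attribute, pattern in PATTERNS:
        match = pattern.search(line)
        found[attribute] = match.group(1) if match else None
    age = int(found["age"]) if found["age"] is not None else None
    return Record(name=found["name"], age=age)


lines = ["name=Ann age=31", "age=7", "garbage"]
records = [parse_line(line) for line in lines]
# Keep only rows where every attribute was found.
complete = [r for r in records if r.name is not None and r.age is not None]
print(complete)
```

In Spark the same function would be mapped over the lines of each file, and the filtered records converted to a DataFrame.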