_corrupt_record error when reading a JSON file into Spark
You need one JSON object per line in your input file; see http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.json
If your JSON file looks like this, it will give you the expected DataFrame:
{ "a": 1, "b": 2 }
{ "a": 3, "b": 4 }
....
df.show()
+---+---+
|  a|  b|
+---+---+
|  1|  2|
|  3|  4|
+---+---+
Reading JSON with Apache Spark - `corrupt_record`
Spark cannot read a top-level JSON array as a set of records, so you have to pass one object per line:
{"toid":"osgb4000000031043205","point":[508180.748,195333.973],"index":1}
{"toid":"osgb4000000031043206","point":[508163.122,195316.627],"index":2}
{"toid":"osgb4000000031043207","point":[508172.075,195325.719],"index":3}
{"toid":"osgb4000000031043208","point":[508513,196023],"index":4}
As described in the tutorial you're referring to:
Let's begin by loading a JSON file, where each line is a JSON object
The reasoning is quite simple: Spark expects a file containing many JSON entities, one per line, so that it can distribute their processing (roughly speaking, per entity).
To shed more light on it, here is a quote from the official documentation:
Note that the file that is offered as a json file is not a typical
JSON file. Each line must contain a separate, self-contained valid
JSON object. As a consequence, a regular multi-line JSON file will
most often fail.
This format is called JSONL (JSON Lines). Basically, it's an alternative to CSV.
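If the data arrives as a single top-level JSON array, one way to get the one-object-per-line layout is to rewrite it before handing it to Spark. The following is a minimal sketch using only the standard library. The function name `array_to_jsonl` is made up for illustration, the sample records are taken from the example above, and the sketch assumes the whole array fits in memory:

```python
import json


def array_to_jsonl(array_text):
    """Turn the text of a top-level JSON array into JSON Lines text."""
    records = json.loads(array_text)
    if not isinstance(records, list):
        raise ValueError("expected a top-level JSON array")
    # One compact JSON object per line, without the enclosing brackets.
    return "\n".join(json.dumps(r, separators=(",", ":")) for r in records)


array_text = """
[
  {"toid": "osgb4000000031043205", "point": [508180.748, 195333.973], "index": 1},
  {"toid": "osgb4000000031043208", "point": [508513, 196023], "index": 4}
]
"""

jsonl_text = array_to_jsonl(array_text)
print(jsonl_text)
```

Writing `jsonl_text` to a file should then give input that spark.read.json can split line by line.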
pyspark corrupt_record while reading json file
Your input isn't valid JSON, so you can't read it using spark.read.json. Instead, you can load it as a text DataFrame with spark.read.text and convert each stringified dict into JSON using a UDF:
import ast
import json

from pyspark.sql import functions as F
from pyspark.sql.types import (
    BooleanType, IntegerType, StringType, StructField, StructType,
)

# Schema of the parsed records
schema = StructType([
    StructField("event_date_utc", StringType(), True),
    StructField("deleted", BooleanType(), True),
    StructField("cost", IntegerType(), True),
    StructField("name", StringType(), True),
])

# Evaluate each Python-dict string and re-serialize it as valid JSON
dict_to_json = F.udf(lambda x: json.dumps(ast.literal_eval(x)))

df = (
    spark.read.text("xxx")
    .withColumn("value", F.from_json(dict_to_json("value"), schema))
    .select("value.*")
)
df.show()
#+--------------+-------+----+----+
#|event_date_utc|deleted|cost|name|
#+--------------+-------+----+----+
#|null |false |1 |Mike|
#+--------------+-------+----+----+
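To see what the UDF does to a single row, the conversion can be tried outside Spark. This is only a sketch: the input line below is a hypothetical example of a stringified Python dict, chosen to match the output shown above, not the asker's actual data.

```python
import ast
import json

# Hypothetical input line: a Python dict repr, which is not valid JSON
# (single quotes, None instead of null, False instead of false).
raw_line = "{'event_date_utc': None, 'deleted': False, 'cost': 1, 'name': 'Mike'}"

# ast.literal_eval only evaluates Python literals, so it will not run
# arbitrary code found in the input.
as_dict = ast.literal_eval(raw_line)

# json.dumps re-serializes the dict as real JSON.
as_json = json.dumps(as_dict)
print(as_json)
# {"event_date_utc": null, "deleted": false, "cost": 1, "name": "Mike"}
```

The resulting string is what from_json receives in the snippet above.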
could not read data from json using pyspark
Your JSON works in my PySpark. I can get a similar error when the text of a record spans multiple lines. Please ensure that each record fits on one line.
Alternatively, tell it to support multi-line records:
spark.read.json(filename, multiLine=True)
What works:
{ "employees": [{ "firstName": "John", "lastName": "Doe" }, { "firstName": "Anna", "lastName": "Smith" }, { "firstName": "Peter", "lastName": "Jones" } ] }
That outputs:
spark.read.json('/home/ernest/Desktop/brokenjson.json').printSchema()
root
 |-- employees: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- firstName: string (nullable = true)
 |    |    |-- lastName: string (nullable = true)
When I try some input like this:
{
"employees": [{ "firstName": "John", "lastName": "Doe" }, { "firstName": "Anna", "lastName": "Smith" }, { "firstName": "Peter", "lastName": "Jones" } ] }
Then I get the corrupt record in the schema:
root
 |-- _corrupt_record: string (nullable = true)
But with the multiLine option enabled, the latter input works too.
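A quick way to find out whether a file will hit this problem is to check each line on its own before involving Spark. The helper below is a rough sketch, not part of any Spark API; the name `find_bad_lines` is made up, and the two inputs are shortened versions of the examples above.

```python
import json


def find_bad_lines(text):
    """Return (line_number, line) pairs that are not standalone JSON."""
    bad = []
    for number, line in enumerate(text.splitlines(), start=1):
        if not line.strip():
            continue  # this helper ignores blank lines
        try:
            json.loads(line)
        except json.JSONDecodeError:
            bad.append((number, line))
    return bad


single_line = '{ "employees": [{ "firstName": "John", "lastName": "Doe" }] }'
split_record = '{\n"employees": [{ "firstName": "John", "lastName": "Doe" }] }'

print(find_bad_lines(single_line))   # no offending lines
print(find_bad_lines(split_record))  # both lines fail on their own
```

If any lines are reported, either rewrite the file so each record sits on one line or read it with multiLine=True.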
Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the referenced columns only include the internal corrupt record column
The problem is with the JSON file. The file "D:/playground/input.json" looks, as you described, like this:
{
"a": {
"b": 1
}
}
This is not right. By default, Spark treats each line of the input as a complete JSON document, so parsing fails here.
Keep the complete JSON document on a single line, in compact form, by removing the whitespace and newlines between tokens, like this:
{"a":{"b":1}}
If you want multiple JSON documents in a single file, keep one per line, like this:
{"a":{"b":1}}
{"a":{"b":2}}
{"a":{"b":3}} ...
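Compacting by hand is error-prone for larger documents. A small sketch of doing it with the standard json module, assuming the pretty-printed document is small enough to parse in memory:

```python
import json

pretty = """
{
  "a": {
    "b": 1
  }
}
"""

# Parse the document, then re-serialize it with no whitespace
# between tokens so it fits on a single line.
compact = json.dumps(json.loads(pretty), separators=(",", ":"))
print(compact)  # {"a":{"b":1}}
```

Unlike stripping whitespace with a text editor, this leaves spaces inside string values untouched.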