Convert a Standard Python Key Value Dictionary List to Pyspark Data Frame

Convert a standard python key value dictionary list to pyspark data frame

The other answers work, but here's one more one-liner that works well with nested data. It may not be the most efficient, but if you're making a DataFrame from an in-memory dictionary, you're either working with small data sets like test data or using Spark wrong, so efficiency really shouldn't be a concern:

import json

d = {...}  # any JSON-compatible dict
spark.read.json(sc.parallelize([json.dumps(d)]))
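
For instance, with a nested dictionary (a minimal sketch; the dict contents here are hypothetical, and spark/sc are assumed to be an active SparkSession and SparkContext):

import json

d = {"id": 1, "info": {"name": "a", "tags": ["x", "y"]}}  # hypothetical nested dict
df = spark.read.json(sc.parallelize([json.dumps(d)]))
df.printSchema()  # info is inferred as a struct, tags as an array of strings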

How to convert a dictionary to dataframe in PySpark?

You can use data_dict.items() to list key/value pairs:

data_dict = {'t1': '1', 't2': '2', 't3': '3'}  # example data matching the output below

spark.createDataFrame(data_dict.items()).show()

Which prints

+---+---+
| _1| _2|
+---+---+
| t1| 1|
| t2| 2|
| t3| 3|
+---+---+

Of course, you can specify your schema:

from pyspark.sql.types import StructType, StructField, StringType

spark.createDataFrame(data_dict.items(),
                      schema=StructType(fields=[
                          StructField("key", StringType()),
                          StructField("value", StringType())])).show()

Resulting in

+---+-----+
|key|value|
+---+-----+
| t1| 1|
| t2| 2|
| t3| 3|
+---+-----+
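
In Spark 2.3+, createDataFrame also accepts a DDL-formatted schema string, so an equivalent sketch is:

spark.createDataFrame(data_dict.items(), schema="key string, value string").show()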

how to convert dictionary to data frame in PySpark

I found an easy way using the JSON parser.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('abc').enableHiveSupport().getOrCreate()
sc = spark.sparkContext

dict_lst = {'A': '1', 'B': '2'}

# read.json stringifies each non-string element; the single-quoted result still
# parses because Spark's JSON reader has allowSingleQuotes enabled by default
rdd = sc.parallelize([dict_lst])
df = spark.read.json(rdd)

df.show()
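
To avoid relying on that single-quote leniency, you can serialize explicitly with json.dumps first (a minimal sketch):

import json

rdd = sc.parallelize([json.dumps(dict_lst)])  # strict, double-quoted JSON
df = spark.read.json(rdd)
df.show()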

convert python dictionary into pyspark dataframe

Interesting problem! The main difficulty I ran into is that when reading from JSON, your schema likely has struct types, which makes this harder to solve, because a1 basically has a different type than a2.

My idea is to somehow convert the struct types to map types, stack them together, then apply a few explodes:

This is your df
+----------------------------------+
|data |
+----------------------------------+
|{{[c1, c2], [c4, c3]}, {[c1, c4]}}|
+----------------------------------+

root
|-- data: struct (nullable = true)
| |-- a1: struct (nullable = true)
| | |-- b1: array (nullable = true)
| | | |-- element: string (containsNull = true)
| | |-- b2: array (nullable = true)
| | | |-- element: string (containsNull = true)
| |-- a2: struct (nullable = true)
| | |-- b3: array (nullable = true)
| | | |-- element: string (containsNull = true)
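
For reference, a df with this shape can be built from the original dictionary like so (a sketch; the dict below is reconstructed from the schema above, and spark/sc are assumed to be available):

import json

data = {"data": {"a1": {"b1": ["c1", "c2"], "b2": ["c4", "c3"]},
                 "a2": {"b3": ["c1", "c4"]}}}
df = spark.read.json(sc.parallelize([json.dumps(data)]))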
Create a temporary df to handle JSON's first level
first_level_df = df.select('data.*')
first_level_df.show()
first_level_cols = first_level_df.columns # ['a1', 'a2']

+--------------------+----------+
| a1| a2|
+--------------------+----------+
|{[c1, c2], [c4, c3]}|{[c1, c4]}|
+--------------------+----------+
Some helper variables
from pyspark.sql import functions as F, types as T

map_cols = [F.from_json(F.to_json(c), T.MapType(T.StringType(), T.StringType())).alias(c) for c in first_level_cols]
# [Column<'entries AS a1'>, Column<'entries AS a2'>]

stack_cols = ', '.join([f"'{c}', {c}" for c in first_level_cols])
# 'a1', a1, 'a2', a2
Main transformation
(first_level_df
.select(map_cols)
.select(F.expr(f'stack(2, {stack_cols})').alias('AA', 'temp'))
.select('AA', F.explode('temp').alias('BB', 'temp'))
.select('AA', 'BB', F.explode(F.from_json('temp', T.ArrayType(T.StringType()))).alias('CC'))
.show(10, False)
)

+---+---+---+
|AA |BB |CC |
+---+---+---+
|a1 |b1 |c1 |
|a1 |b1 |c2 |
|a1 |b2 |c4 |
|a1 |b2 |c3 |
|a2 |b3 |c1 |
|a2 |b3 |c4 |
+---+---+---+
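
Note that the literal 2 in stack(2, ...) is just the number of first-level columns; reusing the variables defined above, a sketch of the general form:

stacked = F.expr(f'stack({len(first_level_cols)}, {stack_cols})').alias('AA', 'temp')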

Creating a pyspark dataframe from a python dictionary

Let's start with converting your python dictionary into a list of lists, with values in the correct positions (i.e. one of the expected data structures for initializing a spark dataframe).

You may try the following, assuming all list values in the dictionary are of the same length.

column_names = []
dataset = None
for column_name in dict_stable_feature:
    column_names.append(column_name)
    column_values = dict_stable_feature[column_name]
    # initialize dataset rows from the first column's values
    if dataset is None:
        dataset = []
        for i in range(0, len(column_values)):
            dataset.append([column_values[i]])
    else:
        for ind, val in enumerate(column_values):
            dataset[ind].append(val)

my_df = sparkSession.createDataFrame(dataset, schema=column_names)
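
For the equal-length case, an equivalent and more compact construction uses zip (a sketch with a hypothetical input dict; relies on Python 3.7+ dicts preserving insertion order):

dict_stable_feature = {"f1": [1, 2, 3], "f2": ["a", "b", "c"]}  # hypothetical input

rows = [list(t) for t in zip(*dict_stable_feature.values())]
# rows == [[1, 'a'], [2, 'b'], [3, 'c']]
my_df = sparkSession.createDataFrame(rows, schema=list(dict_stable_feature.keys()))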

If the list values are not all of the same length, then you may try the following:

max_list_length = max([len(dict_stable_feature[k]) for k in dict_stable_feature])
column_names = []
dataset = [[] for i in range(0, max_list_length)]
default_data_value = None  # feel free to change

for column_name in dict_stable_feature:
    column_names.append(column_name)
    column_values = dict_stable_feature[column_name]

    for ind, val in enumerate(column_values):
        dataset[ind].append(val)

    # ensure all columns have the same number of rows
    no_of_values = len(column_values)
    if no_of_values < max_list_length:
        for i in range(no_of_values, max_list_length):
            dataset[i].append(default_data_value)

my_df = sparkSession.createDataFrame(dataset, schema=column_names)
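
itertools.zip_longest gives the same padding behavior more compactly (a sketch, with a hypothetical unequal-length input):

from itertools import zip_longest

dict_stable_feature = {"f1": [1, 2, 3], "f2": ["a"]}  # hypothetical input

rows = [list(t) for t in zip_longest(*dict_stable_feature.values(), fillvalue=None)]
# rows == [[1, 'a'], [2, None], [3, None]]
my_df = sparkSession.createDataFrame(rows, schema=list(dict_stable_feature.keys()))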

Let me know if this works for you.

How can I convert Dataframe Column1:Column2 (key:value) in Dictionary in Pyspark?

You can just use a simple collectAsMap():

df.select("Atr1", "Atr2").rdd.collectAsMap()

How to create pyspark dataframe from a dict with tuple as value?

The simplest way I can think of is to merge the string and tuple within each list.

This can be accomplished with a list comprehension where you take element 0 (the string) and unpack element 1 (the tuple) using * into a new list, for each inner list in your list of lists.

l = [['HNN', (0.5083874458874459, 56)], ['KGB', (0.7378654301578141, 35)], ['KHB', (0.6676891615541922, 18)]]

df = spark.createDataFrame([[x[0], *x[1]] for x in l], ['col_1', 'col_2', 'col_3'])

Output

+-----+------------------+-----+
|col_1| col_2|col_3|
+-----+------------------+-----+
| HNN|0.5083874458874459| 56|
| KGB|0.7378654301578141| 35|
| KHB|0.6676891615541922| 18|
+-----+------------------+-----+
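
Since the question title mentions a dict with tuples as values, the same unpacking works directly on items() (a sketch using the same hypothetical values):

d = {'HNN': (0.5083874458874459, 56), 'KGB': (0.7378654301578141, 35), 'KHB': (0.6676891615541922, 18)}

df = spark.createDataFrame([[k, *v] for k, v in d.items()], ['col_1', 'col_2', 'col_3'])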

