Convert Spark Dataframe Column to Python List

Convert spark DataFrame column to python list

Let's see why the approach you are using does not work. First, you are trying to get an integer out of a Row type; the output of your collect() looks like this:

>>> mvv_list = mvv_count_df.select('mvv').collect()
>>> mvv_list[0]
Out: Row(mvv=1)

If you take something like this:

>>> firstvalue = mvv_list[0].mvv
>>> firstvalue
Out: 1

you will get the mvv value. If you want the whole column as a list, you can do something like this:

>>> mvv_array = [int(row.mvv) for row in mvv_list]
>>> mvv_array
Out: [1,2,3,4]

But if you try the same for the other column, you get:

>>> mvv_count = [int(row.count) for row in mvv_count_df.collect()]
Out: TypeError: int() argument must be a string or a number, not 'builtin_function_or_method'

This happens because count is a built-in method of Row, and the column has the same name. A workaround is to rename the count column to _count:

>>> mvv_list = mvv_count_df.selectExpr("mvv as mvv", "count as _count")
>>> mvv_count = [int(row._count) for row in mvv_list.collect()]

But this workaround is not needed, as you can access the column using the dictionary syntax:

>>> mvv_array = [int(row['mvv']) for row in mvv_count_df.collect()]
>>> mvv_count = [int(row['count']) for row in mvv_count_df.collect()]

And it will finally work!
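
For reference, here is a minimal, self-contained sketch of the same idea; the contents of mvv_count_df below are made up purely for illustration:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical toy data standing in for mvv_count_df
mvv_count_df = spark.createDataFrame([(1, 5), (2, 9), (3, 3), (4, 1)],
                                     ["mvv", "count"])

rows = mvv_count_df.collect()
mvv_array = [int(row['mvv']) for row in rows]    # [1, 2, 3, 4]
mvv_count = [int(row['count']) for row in rows]  # [5, 9, 3, 1]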

Pyspark - Convert column to list

Use collect_list, then get just the list by indexing into the collected result and assigning it to a variable.

Example:

df.show()
#+-------+
#| Name|
#+-------+
#| Andy|
#|Brandon|
#| Carl|
#+-------+

from pyspark.sql.functions import col, collect_list

output=df.agg(collect_list(col("name"))).collect()[0][0]

output
#['Andy', 'Brandon', 'Carl']

Another way would be using list comprehension:

ss=df.select("Name").collect()

output=[i[0] for i in ss]

output
#['Andy', 'Brandon', 'Carl']

Pyspark dataframe column to list

It is pretty easy: first collect the DataFrame, which will return a list of Row objects:

row_list = df.select('sno_id').collect()

Then iterate over the Row objects to turn the column into a list:

sno_id_array = [ row.sno_id for row in row_list]

sno_id_array
['123','234','512','111']

A more concise and optimized solution uses flatMap:

sno_id_array = df.select("sno_id ").rdd.flatMap(lambda x: x).collect()

spark - Converting dataframe to list improving performance

If you really need a local list there is not much you can do here, but one improvement is to collect only a single column, not the whole DataFrame:

df.select(col_name).rdd.flatMap(lambda x: x).collect()
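
To make the difference concrete, a small sketch (df and col_name are hypothetical stand-ins):

# Collects every column of every row, then discards most of the data
values = [row[col_name] for row in df.collect()]

# Collects only the needed column, so far less data is shipped to the driver
values = df.select(col_name).rdd.flatMap(lambda x: x).collect()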

Convert Spark Data frame to multiple list with one column as key

If you're trying to collect your data anyway, the easiest way IMO to get the data in your desired format is via pandas.

You can call toPandas(), set the index to bin, and then call to_dict():

output = df.toPandas().set_index("bin").to_dict()
print(output)
#{'end': {1: 0.5, 2: 1.7, 3: 2.5, 4: 4.7, 5: 6.3},
# 'median': {1: 0.0, 2: 1.0, 3: 2.0, 4: 4.0, 5: 6.0},
# 'min': {1: 0.0, 2: 0.8, 3: 1.6, 4: 3.7, 5: 5.7}}

This will create a dictionary of dictionaries, where the outer key is the column name and the inner key is the bin. If you want separate variables, you can just extract them from output, but don't use min as a variable name, since it would shadow the built-in min.

median, min_, end = output['median'], output['min'], output['end']
print(median[1])
#0.0
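
If you would rather have plain Python lists ordered by bin instead of dictionaries, here is a short follow-up sketch using the output dictionary from above:

bins = sorted(output['median'])                    # [1, 2, 3, 4, 5]
median_list = [output['median'][b] for b in bins]  # [0.0, 1.0, 2.0, 4.0, 6.0]
min_list = [output['min'][b] for b in bins]        # [0.0, 0.8, 1.6, 3.7, 5.7]
end_list = [output['end'][b] for b in bins]        # [0.5, 1.7, 2.5, 4.7, 6.3]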

Convert Column value in Dataframe to list

If you want to do it in Python:

nameList = [c  for x in df.rdd.collect() for c in x['name']]

Or if you want to do it in Spark:

from pyspark.sql import functions as F

df.withColumn('name', F.split(F.col('name'), '')).show()

Result:

+---+--------------+-----+----------+--------+
| id| name|class|start_data|end_date|
+---+--------------+-----+----------+--------+
| 1|[j, o, h, n, ]| xii| 20170909|20210909|
+---+--------------+-----+----------+--------+

How to convert spark Streaming dataframe column into a Python list

In terms of your first point, you're not asking quite the right question. Since Spark 2.0 the APIs mostly overlap, so a Spark Streaming DataFrame is essentially the same thing as a Spark (SQL) DataFrame, except that a streaming DataFrame is unbounded.

Since Spark 2.0, DataFrames and Datasets can represent static, bounded data,
as well as streaming, unbounded data.

Therefore, you should be able to perform the majority of your necessary manipulations on your (streaming) DataFrame.

In terms of your second point, try to have a look at aggregation functions such as collect_list() and collect_set(). Try this code:

>>> from pyspark.sql import SparkSession
>>> from pyspark.sql import functions as f

>>> spark = SparkSession.builder.getOrCreate()
>>> df = spark.sparkContext.parallelize([
...     ["test", "24", "12392234", "Activation"],
...     ["test", "24", "12392234", "Load"]]
... ).toDF(["application_name", "id", "syntheticid", "journey"])

>>> df.show()
+----------------+---+-----------+----------+
|application_name| id|syntheticid| journey|
+----------------+---+-----------+----------+
| test| 24| 12392234|Activation|
| test| 24| 12392234| Load|
+----------------+---+-----------+----------+


>>> grouped_df = df.groupBy('application_name').agg(f.collect_list('journey').alias('collection'))
>>> grouped_df.show()
+----------------+------------------+
|application_name| collection|
+----------------+------------------+
| test|[Activation, Load]|
+----------------+------------------+


>>> python_list = [item for sublist in [row.collection for row in grouped_df.collect()] for item in sublist]

>>> python_list
['Activation', 'Load']
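
Note that collect() cannot be called directly on an actual streaming DataFrame. One rough sketch of a workaround (not part of the original answer; stream_df below is a hypothetical streaming DataFrame with the same columns) is to write the aggregation to an in-memory sink and collect from the resulting table:

query = (stream_df
         .groupBy('application_name')
         .agg(f.collect_list('journey').alias('collection'))
         .writeStream
         .outputMode('complete')   # streaming aggregations need complete/update mode
         .format('memory')         # results are kept in an in-memory table
         .queryName('journeys')
         .start())

# Once some data has been processed, query the sink as a static table
rows = spark.sql("SELECT collection FROM journeys").collect()
python_list = [item for row in rows for item in row.collection]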

