I am new to Python and Spark, and I am currently working through this
tutorial
on Spark's explode operation for array/map fields of a DataFrame.
Based on the very first section (1. PySpark explode array or map column to rows), it's very intuitive. The minimum working example DataFrame is created in the Annex below. Its schema and contents are:
>>> df.printSchema()
root
|-- name: string (nullable = true)
|-- knownLanguages: array (nullable = true)
| |-- element: string (containsNull = true)
|-- properties: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
>>> df.show(truncate=False)
+----------+-------------------+-----------------------------+
|name |knownLanguages |properties |
+----------+-------------------+-----------------------------+
|James |[Java, Scala] |{eye -> brown, hair -> black}|
|Michael |[Spark, Java, null]|{eye -> null, hair -> brown} |
|Robert |[CSharp, ] |{eye -> , hair -> red} |
|Washington|null |null |
|Jefferson |[1, 2] |{} |
+----------+-------------------+-----------------------------+
The explode function is illustrated as follows:
>>> df \
... .select(df.name,explode(df.knownLanguages)) \
... .show()
+---------+------+
|name |col |
+---------+------+
|James |Java |
|James |Scala |
|Michael |Spark |
|Michael |Java |
|Michael |null |
|Robert |CSharp|
|Robert | |
|Jefferson|1 |
|Jefferson|2 |
+---------+------+
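For completeness, the map column explodes analogously, into key and value columns (same pattern as the tutorial's section 1; output elided):

df.select(df.name, explode(df.properties)).show()
# Produces one row per map entry, with columns name, key, value;
# Washington (null map) and Jefferson (empty map) contribute no rows.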
The explode function is shown in the context of a SELECT query,
however, which I find very unintuitive. A SELECT query prunes away rows
and never increases the height of a data frame. Only joins can
potentially increase the height, but even there, the filtering of rows is
applied to a Cartesian join [1], so the result is still a potential reduction in
height rather than an increase. Correct me if I'm wrong, but the
above SELECT is not being applied to a join, since it is invoked as a
method of DataFrame df.
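Interestingly, the closest I could get to a join-like reading is Spark SQL's own syntax for the same query, which uses a LATERAL VIEW clause (my own experiment, using the df from the Annex; the view name people and the alias lang are mine):

>>> df.createOrReplaceTempView('people')
>>> spark.sql("""
...     SELECT name, lang
...     FROM people
...     LATERAL VIEW explode(knownLanguages) t AS lang
... """).show()

This produces the same nine rows as the tutorial's select above, which at least hints that explode is closer in spirit to a (lateral) join than to a plain column selection.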
I tried to better understand how explode fits into SELECT through the latter's
doc string: "Projects a set of expressions and returns a new
:class:DataFrame". Projection refers to choosing column
expressions. I unsuccessfully
tried to gain insight into how the above explode code fits the SELECTion
by examining its content:
explode(df.knownLanguages) # Shows no columnar data
Out[114]: Column<'explode(knownLanguages)'>
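For comparison (my own check, not from the tutorial), a plain column reference carries no data either; a Column only produces rows once it is used inside a DataFrame operation:

from pyspark.sql.functions import explode

df.name                       # a plain column reference
# Out: Column<'name'> -- also just an expression, with no data attached
df.select(explode(df.knownLanguages).alias('language')).show(3)
# Only here, inside select(), does the expression yield rows:
# Java, Scala, Spark (plus "only showing top 3 rows")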
Later, I found that it is not possible to examine the columnar data
content of a Column object, as described
here.
The prototype for explode returns a Column object, while the doc
string says that it "Returns a new row for each element in the given
array or map". It's difficult to picture such a column, as there is no
single "given array or map" -- there are as many heterogeneous arrays/maps as
there are records in DataFrame df.
Even if we accept that the Column object doesn't itself contain such
columnar data, it is still
necessary to picture how such a column would be conceptually
constructed in order to see how the SELECT makes sense.
I can't come up with any column of data that would make sense in
the SELECT query, because no matter how the exploded column is constructed, it will be
of a different height than DataFrame df.
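The height mismatch doesn't seem specific to select, either. As a small experiment of my own, the same expression also works in withColumn, where Spark replicates all the other columns so that the heights line up again:

df.withColumn('language', explode(df.knownLanguages)) \
  .select('name', 'language') \
  .show()
# Yields the same nine rows as the tutorial's select(), with the
# exploded column named 'language' instead of the default 'col'.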
Would it be correct to conclude that explode() can yield no column
expression that would fit SELECT's
projection/selection operation
as applied to DataFrame df, and that it is simply a signal to the
select() method to create a new DataFrame by replicating each
record $i$ by $n_i$ times, where $n_i$ is the number of items in the
record's array/map?
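To make that replicate-each-record reading concrete, here is roughly the computation I have in mind, spelled out at the RDD level with flatMap (my own sketch; like explode, it skips the null and empty arrays):

df.rdd \
  .flatMap(lambda r: [(r.name, lang) for lang in (r.knownLanguages or [])]) \
  .toDF(['name', 'col']) \
  .show()
# Reproduces the nine (name, col) rows shown above.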
I'm just starting to find my way around Spark. I anticipate, however, that if explode() breaks the projection/selection model of SELECT, it may be difficult to craft queries more complex than those in the tutorial based on knowledge of the designed-for behaviour.
Notes
[1] SELECT filters a Cartesian join in concept, though of course, not in execution. This is reflected by the fact that early SQL used WHERE in place of ON. All the WHERE clauses are (conceptually) applied to a Cartesian join.
Annex: Create minimum working example DataFrame table
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('pyspark-by-examples').getOrCreate()

# Rows deliberately mix normal values with None elements, empty strings,
# a null array/map (Washington), and an empty map (Jefferson)
arrayData = [
    ('James', ['Java', 'Scala'], {'hair': 'black', 'eye': 'brown'}),
    ('Michael', ['Spark', 'Java', None], {'hair': 'brown', 'eye': None}),
    ('Robert', ['CSharp', ''], {'hair': 'red', 'eye': ''}),
    ('Washington', None, None),
    ('Jefferson', ['1', '2'], {})
]
df = spark.createDataFrame(data=arrayData, schema=['name', 'knownLanguages', 'properties'])
Comments
"explode is not a select operation, but a column operation that returns a new column, as in the doc: pyspark.sql.functions.explode(col: ColumnOrName) → pyspark.sql.column.Column. You can use it in withColumn. All of this is a projection, because a new column is generated."
My reply: the new column would then belong to df, because it is being invoked as its method, but that can't be right, because the explode() column doesn't even match the height of df. A SELECT operation in relational algebra specifies columns from a single relational data table, and it only makes sense if the columns are all of the same height. Even when selecting from a JOIN, the SELECTion is being done from the single table that results from the JOIN. The DataFrame.select() method is described as a projection, so this concept of projection should still apply.
"The explain() method of DataFrame can help you somewhat. Try it."
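Following up on that last suggestion, explain() does show explode being compiled into a dedicated Generate node rather than an ordinary projection (plan abbreviated; the exact text and the #-numbered IDs vary by Spark version):

df.select(df.name, explode(df.knownLanguages)).explain()
# == Physical Plan ==
# Generate explode(knownLanguages#...), [name#...], false, [col#...]
# +- ... scan of the existing rows ...

The Generate node is what replicates each input row once per array/map element, which matches the row-replicating reading above.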