16

I'm starting with PySpark and I'm having trouble creating DataFrames with nested objects.

This is my example.

I have users.

$ cat user.json
{"id":1,"name":"UserA"}
{"id":2,"name":"UserB"}

Users have orders.

$ cat order.json
{"id":1,"price":202.30,"userid":1}
{"id":2,"price":343.99,"userid":1}
{"id":3,"price":399.99,"userid":2}

I would like to join them to get a structure where the orders are an array nested in each user.

$ cat join.json
{"id":1, "name":"UserA", "orders":[{"id":1,"price":202.30,"userid":1},{"id":2,"price":343.99,"userid":1}]}
{"id":2,"name":"UserB","orders":[{"id":3,"price":399.99,"userid":2}]}

How can I do that? Is there any kind of nested join or something similar?

>>> user = sqlContext.read.json("user.json")
>>> user.printSchema();
root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)

>>> order =  sqlContext.read.json("order.json")
>>> order.printSchema();
root
 |-- id: long (nullable = true)
 |-- price: double (nullable = true)
 |-- userid: long (nullable = true)

>>> joined = sqlContext.read.json("join.json")
>>> joined.printSchema();
root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- orders: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- price: double (nullable = true)
 |    |    |-- userid: long (nullable = true)

EDIT: I know it is possible to do this using join and foldByKey, but is there a simpler way?

EDIT2: I'm using the solution by @zero323:

def joinTable(tableLeft, tableRight, columnLeft, columnRight, columnNested, joinType="left_outer"):
    # Group the right-hand rows by the join column, then expose each group as an array column.
    tmpTable = sqlContext.createDataFrame(tableRight.rdd.groupBy(lambda r: r.asDict()[columnRight]))
    tmpTable = tmpTable.select(tmpTable._1.alias("joinColumn"), tmpTable._2.data.alias(columnNested))
    # Attach the nested column to the left table and drop the helper key.
    return tableLeft.join(tmpTable, tableLeft[columnLeft] == tmpTable["joinColumn"], joinType).drop("joinColumn")

I added a second nested structure, 'lines':

>>> lines =  sqlContext.read.json(path + "lines.json")
>>> lines.printSchema();
root
 |-- id: long (nullable = true)
 |-- orderid: long (nullable = true)
 |-- product: string (nullable = true)

orders = joinTable(order, lines, "id", "orderid", "lines")
joined = joinTable(user, orders, "id", "userid", "orders")
joined.printSchema()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- orders: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- price: double (nullable = true)
 |    |    |-- userid: long (nullable = true)
 |    |    |-- lines: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- _1: long (nullable = true)
 |    |    |    |    |-- _2: long (nullable = true)
 |    |    |    |    |-- _3: string (nullable = true)

After this, the column names from lines are lost. Any ideas?

EDIT 3: I tried to specify the schema manually.

from pyspark.sql.types import ArrayType, LongType, StructField, StructType

# _1 holds the grouping key (orderid); _2 holds the array of line rows.
fields = [
    StructField("_1", LongType(), True),
    StructField("_2", ArrayType(lines.schema)),
]
new_schema = StructType(fields)
print(new_schema)

grouped = lines.rdd.groupBy(lambda r: r.orderid)
grouped = grouped.map(lambda x: (x[0], list(x[1])))
g = sqlContext.createDataFrame(grouped, new_schema)

Error:

TypeError: StructType(List(StructField(id,LongType,true),StructField(orderid,LongType,true),StructField(product,StringType,true))) can not accept object in type <class 'pyspark.sql.types.Row'>

3 Answers

32
+100

This will work only in Spark 2.0 or later.

First we'll need a couple of imports:

from pyspark.sql.functions import struct, collect_list

The rest is a simple aggregation and join:

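orders = spark.read.json("/path/to/order.json")
users = spark.read.json("/path/to/user.json")

# Collect each user's orders into an array of structs, keyed by the user id.
orders_by_user = (
    orders
    .groupBy("userid")
    .agg(collect_list(struct(*orders.columns)).alias("orders"))
    .withColumnRenamed("userid", "id")
)

# Join the nested orders back onto the users.
combined = users.join(orders_by_user, ["id"])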

For the example data the result is:

combined.show(2, False)
+---+-----+---------------------------+
|id |name |orders                     |
+---+-----+---------------------------+
|1  |UserA|[[1,202.3,1], [2,343.99,1]]|
|2  |UserB|[[3,399.99,2]]             |
+---+-----+---------------------------+

with schema:

combined.printSchema()
root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- orders: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- price: double (nullable = true)
 |    |    |-- userid: long (nullable = true)

and JSON representation:

for x in combined.toJSON().collect():
    print(x)     
{"id":1,"name":"UserA","orders":[{"id":1,"price":202.3,"userid":1},{"id":2,"price":343.99,"userid":1}]}
{"id":2,"name":"UserB","orders":[{"id":3,"price":399.99,"userid":2}]}
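The question's EDIT2 adds a second level, with lines nested inside orders. The same group-then-join step can be applied once per level, innermost first. Below is a plain-Python sketch of that logic only. It is not Spark code: `nest` is a hypothetical helper, the `lines` rows are invented sample data following the schema in the question, and unlike the inner join above it keeps parents that have no children (with an empty list). In Spark, the analogous step would presumably be a `groupBy("orderid")` with `collect_list(struct(...))` on lines before the orders step; since `struct` carries the column names, the field names should survive.

```python
def nest(parents, children, parent_key, child_key, nested_name):
    """Attach each parent's matching children as a list under nested_name."""
    # Aggregate: group the child records by their foreign key.
    grouped = {}
    for child in children:
        grouped.setdefault(child[child_key], []).append(child)
    # Join: every parent keeps its fields and gains the grouped children.
    return [
        {**parent, nested_name: grouped.get(parent[parent_key], [])}
        for parent in parents
    ]


users = [{"id": 1, "name": "UserA"}, {"id": 2, "name": "UserB"}]
orders = [
    {"id": 1, "price": 202.30, "userid": 1},
    {"id": 2, "price": 343.99, "userid": 1},
    {"id": 3, "price": 399.99, "userid": 2},
]
# Invented sample rows that follow the lines schema from the question.
lines = [
    {"id": 1, "orderid": 1, "product": "A"},
    {"id": 2, "orderid": 1, "product": "B"},
    {"id": 3, "orderid": 3, "product": "C"},
]

# Nest the innermost level first, then nest the result into users.
orders_with_lines = nest(orders, lines, "id", "orderid", "lines")
joined = nest(users, orders_with_lines, "id", "userid", "orders")
```

Each nested line is still a record with named fields (`id`, `orderid`, `product`), which is the property that was lost in the `_1`, `_2`, `_3` schema from the question.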

3 Comments

Note that this works because, even though users is large (since it's in a DataFrame), the number of orders for a particular user is small enough to be held in a collection. What if that were not the case? What if the grouping were something else, say, for the sake of argument, users with the same hair color, which can't be held in a collection? Would you be forced to collect the hair colors and go through them serially, or could you possibly use a cartesian join?
@oneirois In short: the whole idea won't be viable. A Row is the minimal unit of parallelism and cannot be divided or partially spilled. You could just use DataFrameWriter to write a separate file for each grouping factor without creating a nested structure; this would scale much better.
This is exactly what I ended up doing :)
-1

First, you need to use the userid as the join key for the second DataFrame:

user.join(order, user.id == order.userid)

Then you can use a map step to transform the resulting records to your desired format.

1 Comment

Not really. Map is not enough. If I join users and orders, I will have 3 records (and I want only 2), so I also need some kind of aggregation (foldByKey, for example).
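The row counts in this comment can be checked with a small plain-Python sketch that uses the sample data from the question. It only models the logic and is not Spark code; the variable names `flat` and `nested` are made up for illustration.

```python
from itertools import groupby
from operator import itemgetter

users = [{"id": 1, "name": "UserA"}, {"id": 2, "name": "UserB"}]
orders = [
    {"id": 1, "price": 202.30, "userid": 1},
    {"id": 2, "price": 343.99, "userid": 1},
    {"id": 3, "price": 399.99, "userid": 2},
]

# Flat join: one output row per matching (user, order) pair.
flat = [
    {"id": u["id"], "name": u["name"], "order": o}
    for u in users
    for o in orders
    if u["id"] == o["userid"]
]

# Aggregation: fold the rows that share a user id into a single record.
flat.sort(key=itemgetter("id"))  # groupby only merges adjacent rows
nested = []
for _, rows in groupby(flat, key=itemgetter("id")):
    rows = list(rows)
    nested.append({
        "id": rows[0]["id"],
        "name": rows[0]["name"],
        "orders": [r["order"] for r in rows],
    })

print(len(flat), len(nested))  # prints: 3 2
```

A per-row map over `flat` would still yield three records; only the grouping step brings it down to one record per user.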
-1

To flatten your DataFrame from nested to flat columns, use:

dff = df.select("column with multiple columns.*")

1 Comment

Please don't use Pandas! This calls a Spark collect()! It's very slow and not distributed, because all the data is brought back to a single point, the Spark driver.
