
I am new to PySpark and I want to explode array values in such a way that each value gets assigned to a new column. I tried using explode but I couldn't get the desired output. Below is my output.

Output in hand:

Schema:

Required output:

This is the code:

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode

if __name__ == "__main__":
    spark = SparkSession.builder \
        .master("local[3]") \
        .appName("DataOps") \
        .getOrCreate()

    dataFrameJSON = spark.read \
        .option("multiLine", True) \
        .option("mode", "PERMISSIVE") \
        .json("data.json")

    dataFrameJSON.printSchema()

    sub_DF = dataFrameJSON.select(explode("values.line").alias("new_values"))
    sub_DF.printSchema()

    sub_DF2 = sub_DF.select("new_values.*")
    sub_DF2.printSchema()
    sub_DF.show(truncate=False)

    new_DF = sub_DF2.select("id", "period.*", "property")
    new_DF.show(truncate=False)
    new_DF.printSchema()

This is the data:

{
        "values" : {
            "line" : [
                {
                    "id" : 1,
                    "period" : {
                        "start_ts" : "2020-01-01T00:00:00",
                        "end_ts" : "2020-01-01T00:15:00"
                    },
                    "property" : [
                        {
                            "name" : "PID",
                            "val" : "P120E12345678"
                        },
                        {
                            "name" : "EngID",
                            "val" : "PANELID00000000"
                        },
                        {
                            "name" : "TownIstat",
                            "val" : "12058091"
                        },
                        {
                            "name" : "ActiveEng",
                            "val" : "5678.1"
                        }
                    ]
                }
            ]
        }
}
  • Please put code examples and error outputs in text instead of images so it is easier for the community to help you. Commented Sep 27, 2020 at 6:46
  • @Mikayel Saghyan, I hope the code and sample data are visible to you now? I am trying to produce the required output as given above. Commented Sep 27, 2020 at 14:48

2 Answers


Could you include the data instead of screenshots?

Meanwhile, assuming that df is the dataframe being used, what we need to do is create a new dataframe, extracting the vals from the property array into new columns, and finally dropping the property column:

from pyspark.sql.functions import col

output_df = df.withColumn("PID", col("property")[0].val) \
              .withColumn("EngID", col("property")[1].val) \
              .withColumn("TownIstat", col("property")[2].val) \
              .withColumn("ActiveEng", col("property")[3].val) \
              .drop("property")

In case the element was of type ArrayType, use the following:

from pyspark.sql.functions import col

output_df = df.withColumn("PID", col("property")[0][1]) \
              .withColumn("EngID", col("property")[1][1]) \
              .withColumn("TownIstat", col("property")[2][1]) \
              .withColumn("ActiveEng", col("property")[3][1]) \
              .drop("property")

Explode will turn the arrays into new rows, not columns; see this: pyspark explode
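For reference, here is a minimal sketch (the session setup and column values are illustrative, not taken from the question) showing that explode emits one row per array element rather than new columns:

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode

spark = SparkSession.builder.master("local[1]").appName("explode-demo").getOrCreate()

# Hypothetical one-row dataframe whose "property" column is an array of structs
demo_df = spark.createDataFrame(
    [(1, [("PID", "P120E12345678"), ("EngID", "PANELID00000000")])],
    "id INT, property ARRAY<STRUCT<name: STRING, val: STRING>>",
)

# explode produces one output row per array element, keeping the other columns
demo_df.select("id", explode("property").alias("prop")).show(truncate=False)
# -> two rows for id 1, one per struct in the array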


10 Comments

Thanks for your response. I tried your suggested code, but I am getting the following error: TypeError: col() missing 1 required positional argument: 'strg'
Could you include your code in the question above, with data samples? When you edit, try to edit your own question, not my answer. You're on the right path.
The code and sample data are available now; please help me produce the required output as I have provided in the screenshot. Thank you in advance.
You probably forgot to insert the "property" string inside of col(). Can you show all of your code after using my answer? I just used the same code on Databricks, and it works just fine, no errors.
sub_DF = dataFrameJSON.select("UrbanDataset.values.line")
sub_DF2 = dataFrameJSON.select(explode("UrbanDataset.values.line").alias("new_values"))
sub_DF3 = sub_DF2.select("new_values.*")
new_DF = sub_DF3.select("id", "period.*", "property")
new_DF.show(truncate=False)
output_df = new_DF.withColumn("PID", col("property")[0][1]) \
    .withColumn("EngID", col("property")[1][1]) \
    .withColumn("TownIstat", col("property")[2][1]) \
    .withColumn("ActiveEng", col("property")[3][1]).drop("property")
output_df.show(truncate=False)

This is a general solution and works even when the JSONs are messy (different ordering of elements or if some of the elements are missing)

You have to flatten first, use regexp_replace to split the 'property' column, and finally pivot. This also avoids hard-coding the new column names.

Constructing your dataframe:

from pyspark.sql.types import *
from pyspark.sql.functions import col, first, regexp_replace, split

schema = StructType([
    StructField("id", IntegerType()),
    StructField("start_ts", StringType()),
    StructField("end_ts", StringType()),
    StructField("property", ArrayType(StructType([
        StructField("name", StringType()),
        StructField("val", StringType())
    ])))
])

data = [[1, "2010", "2020", [["PID", "P123"], ["Eng", "PA111"], ["Town", "999"], ["Act", "123.1"]]],\
         [2, "2011", "2012", [["PID", "P456"], ["Eng", "PA222"], ["Town", "777"], ["Act", "234.1"]]]]

df = spark.createDataFrame(data,schema=schema)

df.show(truncate=False)
+---+--------+------+------------------------------------------------------+    
|id |start_ts|end_ts|property                                              |
+---+--------+------+------------------------------------------------------+
|1  |2010    |2020  |[[PID, P123], [Eng, PA111], [Town, 999], [Act, 123.1]]|
|2  |2011    |2012  |[[PID, P456], [Eng, PA222], [Town, 777], [Act, 234.1]]|
+---+--------+------+------------------------------------------------------+

Flattening and pivoting:

df_flatten = df.rdd.flatMap(lambda x: [(x[0],x[1], x[2], y) for y in x[3]]).toDF(['id', 'start_ts', 'end_ts', 'property'])\
            .select('id', 'start_ts', 'end_ts', col("property").cast("string"))

df_split = df_flatten.select('id', 'start_ts', 'end_ts', regexp_replace(df_flatten.property, r"[\[\]]", "").alias("replaced_col"))\
                .withColumn("arr", split(col("replaced_col"), ", "))\
                .select(col("arr")[0].alias("col1"), col("arr")[1].alias("col2"), 'id', 'start_ts', 'end_ts')

final_df = df_split.groupby(df_split.id)\
                        .pivot("col1")\
                        .agg(first("col2"))\
                        .join(df, 'id').drop("property")

Output:

final_df.show()
+---+-----+-----+----+----+--------+------+
| id|  Act|  Eng| PID|Town|start_ts|end_ts|
+---+-----+-----+----+----+--------+------+
|  1|123.1|PA111|P123| 999|    2010|  2020|
|  2|234.1|PA222|P456| 777|    2011|  2012|
+---+-----+-----+----+----+--------+------+

8 Comments

Many thanks for your response. By using your code I am getting the following error: TypeError: col() missing 1 required positional argument: 'strg'. I think I have an issue with the col import? Can you please share your complete code including all the imports? Thank you.
I have added the imports. Where are you getting the error?
Many thanks for your response, it's working now for me as well.
Why all the hassle instead of simply extracting the relevant data by accessing it and then creating the new columns? I don't get it; this complicates things.
It's because this is a general solution. I understand that it is simpler to just use indices for this question. But if the order of the elements in 'property' is changed or if one of them is missing, the indices method wouldn't work. I'm factoring in these cases of messy JSONs in my solution. (And no hard-coding of column names. Works for 'n' number of elements)
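To illustrate that point with a minimal sketch (not part of the original answer; it reuses the spark session and schema defined above and assumes the property entries may be reordered or missing), you can also explode the property array and pivot on the struct's name field instead of splitting strings:

from pyspark.sql.functions import explode, first

# Hypothetical rows where the 'property' entries are reordered or partially missing
messy_data = [[1, "2010", "2020", [["Eng", "PA111"], ["PID", "P123"]]],
              [2, "2011", "2012", [["PID", "P456"], ["Town", "777"], ["Act", "234.1"]]]]
messy_df = spark.createDataFrame(messy_data, schema=schema)

# One row per (name, val) pair, then pivot the names back into columns
exploded = messy_df.select("id", "start_ts", "end_ts", explode("property").alias("p")) \
                   .select("id", "start_ts", "end_ts", "p.name", "p.val")

exploded.groupby("id", "start_ts", "end_ts").pivot("name").agg(first("val")).show()
# entries that are missing from a row simply come out as null in their columns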