
I have a Spark dataframe with two columns, and I am trying to add a new column that flags the correct value for these columns. I take the correct values from a dictionary that holds the right value for each column.

+--------------+--------------------+
|       country|                 zip|
+--------------+--------------------+
|        Brazil|                7541|
|United Kingdom|                5678|
|         Japan|                1234|
|       Denmark|                2345|
|        Canada|                4567|
|         Italy|                6031|
|        Sweden|                4205|
|        France|                6111|
|         Spain|                8555|
|         India|                2552|
+--------------+--------------------+

The correct value for country should be India and zip should be 1234, and that is stored in a dictionary:

column_dict = {'country': 'India', 'zip': 1234}

I am trying to make the new column value "Brazil: India, Zip: 1234" wherever the column value differs from these dictionary values.

I have tried it the following way, but the new column comes back empty even though the function itself returns the desired value:

from pyspark.sql.functions import udf, lit

cols = list(df.columns)
col_list = list(column_dict.keys())

def update(df, cols=cols, col_list=col_list):
    z = []
    for col1, col2 in zip(cols, col_list):
        if col1 == col2:
            if df.col1 != column_dict[col2]:
                z.append("{'col':" + col2 + ", 'reco': " + str(column_dict[col2]) + "}")
            else:
                z.append("{'col':" + col2 + ", 'reco': }")
    return z

my_udf = udf(lambda x: update(x, cols, col_list))
z = y.withColumn("NewValue", lit(my_udf(y, cols, col_list)))

If I export the output dataframe to CSV, the value comes through with its parts escaped with '\'. How can I get the function's value onto the column exactly as it is returned?
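For reference, a UDF normally has to be applied to column expressions inside withColumn rather than to the dataframe object itself; a minimal sketch of that pattern, assuming the column_dict above (flag_value and flag_udf are illustrative names only), would be:

from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

# The UDF receives plain per-row values, not the dataframe itself
def flag_value(country, zip_code):
    parts = []
    if country != column_dict['country']:
        parts.append("country: " + column_dict['country'])
    if zip_code != column_dict['zip']:
        parts.append("zip: " + str(column_dict['zip']))
    return ", ".join(parts)

flag_udf = udf(flag_value, StringType())
df = df.withColumn("NewValue", flag_udf(col("country"), col("zip")))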

  • What kind of dataframe are you expecting? Your question is not very clear... Can you tell us the outcome you are interested in? Commented Dec 17, 2018 at 12:15

1 Answer


A simple way is to make a dataframe from your dictionary, union() it with your main dataframe, and then group by and take the last value. Here is how you can do it:

from pyspark import SparkContext

sc = SparkContext.getOrCreate()

newDF = sc.parallelize([
    {'country': 'India', 'zip': 1234}
]).toDF()

newDF.show()

newDF:

+-------+----+
|country| zip|
+-------+----+
|  India|1234|
+-------+----+

and unionDF:

unionDF = df.union(newDF)

unionDF.show()
+--------------+--------------------+
|       country|                 zip|
+--------------+--------------------+
|        Brazil|                7541|
|United Kingdom|                5678|
|         Japan|                1234|
|       Denmark|                2345|
|        Canada|                4567|
|         Italy|                6031|
|        Sweden|                4205|
|        France|                6111|
|         Spain|                8555|
|         India|                2552|
|         India|                1234|
+--------------+--------------------+

and at the end, group by and take the last value:

import pyspark.sql.functions as f

finalDF = unionDF.groupBy('country').agg(f.last('zip').alias('zip'))

finalDF.show()

+--------------+--------------------+
|       country|                 zip|
+--------------+--------------------+
|        Brazil|                7541|
|United Kingdom|                5678|
|         Japan|                1234|
|       Denmark|                2345|
|        Canada|                4567|
|         Italy|                6031|
|        Sweden|                4205|
|        France|                6111|
|         Spain|                8555|
|         India|                1234|
+--------------+--------------------+
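If the goal is the annotation column from the question rather than corrected values, a rough sketch using the built-in when() and concat() functions (no UDF involved; column_dict is assumed from the question) could look like this:

import pyspark.sql.functions as f

# Flag rows whose country or zip differs from the dictionary values
df_flagged = df.withColumn(
    "NewValue",
    f.when(
        (f.col("country") != column_dict["country"]) | (f.col("zip") != column_dict["zip"]),
        f.concat(f.col("country"),
                 f.lit(": " + column_dict["country"] + ", Zip: " + str(column_dict["zip"])))
    )
)

df_flagged.show(truncate=False)

Rows that already match the dictionary come out as null in NewValue; for the Brazil row the value is "Brazil: India, Zip: 1234".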