
I have a data frame like the below:

+---------+----------+--------+-----+------+------+----------------------------------------------------------------------------------------------------+
|firstname|middlename|lastname|id   |gender|salary|meta                                                                                                |
+---------+----------+--------+-----+------+------+----------------------------------------------------------------------------------------------------+
|James    |          |Smith   |36636|M     |3000  |{"firstname":"James","middlename":"","lastname":"Smith","id":"36636","gender":"M","salary":3000}    |
|Michael  |Rose      |        |40288|M     |4000  |{"firstname":"Michael","middlename":"Rose","lastname":"","id":"40288","gender":"M","salary":4000}   |
|Robert   |          |Williams|42114|M     |4000  |{"firstname":"Robert","middlename":"","lastname":"Williams","id":"42114","gender":"M","salary":4000}|
|Maria    |Anne      |Jones   |39192|F     |4000  |{"firstname":"Maria","middlename":"Anne","lastname":"Jones","id":"39192","gender":"F","salary":4000}|
|Jen      |Mary      |Brown   |     |F     |-1    |{"firstname":"Jen","middlename":"Mary","lastname":"Brown","id":"","gender":"F","salary":-1}         |
+---------+----------+--------+-----+------+------+----------------------------------------------------------------------------------------------------+  

Now, I need to iterate over the meta column and pass each row's value to a UDF. However, only the first row is being passed.

Below is the code:

def parse_and_post(col):
    for i in col.collect():
        print(i)
        result = json.loads(i)
        print(result["firstname"])
        #Below is a sample check
        if result["firstname"] == "James":
            return 200
        else:
            return -1
        #Actual check is as follows
        #Format the record in the form of list
        #get token
        #response = send the record to the API
        #return the response


new_df_status = new_df.withColumn("status_of_each_record", lit(parse_and_post(new_df.rdd.map(lambda x: x["meta"]))))  

When I execute this code I get the output below. However, the status should be 200 only for the first record and -1 for all the rest:

{"firstname":"James","middlename":"","lastname":"Smith","id":"36636","gender":"M","salary":3000}
James
+---------+----------+--------+-----+------+------+----------------------------------------------------------------------------------------------------+---------------------+
|firstname|middlename|lastname|id   |gender|salary|meta                                                                                                |status_of_each_record|
+---------+----------+--------+-----+------+------+----------------------------------------------------------------------------------------------------+---------------------+
|James    |          |Smith   |36636|M     |3000  |{"firstname":"James","middlename":"","lastname":"Smith","id":"36636","gender":"M","salary":3000}    |200                  |
|Michael  |Rose      |        |40288|M     |4000  |{"firstname":"Michael","middlename":"Rose","lastname":"","id":"40288","gender":"M","salary":4000}   |200                  |
|Robert   |          |Williams|42114|M     |4000  |{"firstname":"Robert","middlename":"","lastname":"Williams","id":"42114","gender":"M","salary":4000}|200                  |
|Maria    |Anne      |Jones   |39192|F     |4000  |{"firstname":"Maria","middlename":"Anne","lastname":"Jones","id":"39192","gender":"F","salary":4000}|200                  |
|Jen      |Mary      |Brown   |     |F     |-1    |{"firstname":"Jen","middlename":"Mary","lastname":"Brown","id":"","gender":"F","salary":-1}         |200                  |
+---------+----------+--------+-----+------+------+----------------------------------------------------------------------------------------------------+---------------------+

How do I iterate over each row of the meta column? What exactly am I doing wrong here?

4 Comments

  • Does the API allow bulk POST?
  • I tried bulk POST but it didn't work. Therefore, I am trying to ingest one record at a time.
  • Do you want to solve for bulk POST? You can fix this syntax to ingest one by one, but bulk is probably the better approach.
  • If possible, I would like to explore both approaches. I will try to hit the client's API. If it accepts bulk then that's well and good, and if it doesn't I will ingest the records one by one.

2 Answers


I think the main issue here is that User Defined Functions expect to be called once per row, rather than passed the entire data frame. So for me the following works:

new_df = spark.createDataFrame((
    ["James", "", "Smith", 36636, "M", 3000, '{"firstname":"James","middlename":"","lastname":"Smith","id":"36636","gender":"M","salary":3000}'],
    ["Michael", "Rose", "", 40288, "M", 4000, '{"firstname":"Michael","middlename":"Rose","lastname":"","id":"40288","gender":"M","salary":4000}'],
    ["Robert", "", "Williams", 42114, "M", 4000, '{"firstname":"Robert","middlename":"","lastname":"Williams","id":"42114","gender":"M","salary":4000}'],
    ["Maria", "Anne", "Jones", 39192, "F", 4000, '{"firstname":"Maria","middlename":"Anne","lastname":"Jones","id":"39192","gender":"F","salary":4000}'],
    ["Jen", "Mary", "Brown", None, "F", -1, '{"firstname":"Jen","middlename":"Mary","lastname":"Brown","id":"","gender":"F","salary":-1}']
)).toDF("firstname", "middlename", "lastname", "id", "gender", "salary", "meta")


import json
from pyspark.sql.functions import udf


@udf("int")  # declare the return type; it defaults to string otherwise
def parse_and_post(meta):
    result = json.loads(meta)
    print(result["firstname"])
    if result["firstname"] == "James":
        return 200
    else:
        return -1


new_df_status = new_df.withColumn(
    "status_of_each_record", parse_and_post(new_df.meta)) 

In your example, you expect the entire data frame as input to parse_and_post, but here we only expect one row at a time. This also simplifies how we create the new column.
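Since the UDF now runs once per row, the real workflow sketched in the question's comments (format the record, get a token, send it to the API, return the response) can live inside it. Below is a minimal sketch using the requests library; the endpoint URLs, the auth call, and the payload shape are placeholders, not the client's actual API:

import json
import requests
from pyspark.sql.functions import udf


@udf("int")
def parse_and_post(meta):
    # Hypothetical token endpoint -- replace with the client's auth call.
    token = requests.post("https://example.com/auth").json()["token"]
    record = json.loads(meta)
    # Hypothetical ingest endpoint; the payload is assumed to be a list of records.
    response = requests.post(
        "https://example.com/records",
        headers={"Authorization": f"Bearer {token}"},
        json=[record],
    )
    return response.status_code

Note that this fetches a token for every row; in practice you would cache it (for example per partition with mapPartitions) rather than hitting the auth endpoint once per record.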

Do you need a UDF at all?

The second thing you might want to think about is whether you can get away without using a UDF at all. Using a UDF is a bit of a performance killer, and quite often you can do without. For example:

from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql import functions as f

# Let spark know what shape of json data to expect. We can ignore
# fields we don't care about without it being a problem
schema = StructType([StructField("firstname", StringType())])

new_df_status = new_df.withColumn(
    "status_of_each_record", 
    f.when(f.from_json(new_df.meta, schema).firstname == "James", 200)
    .otherwise(-1)
)  

new_df_status.show()

Even assuming you've provided a toy example, it's worth letting Spark do as much of the heavy lifting as you can (like JSON parsing), as that portion can happen at scale.
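As an illustration of pushing the JSON parsing entirely into Spark, here is a sketch that extracts every field of meta into a struct column; the schema simply mirrors the JSON shown in the question:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql import functions as f

# Full schema for the meta JSON as it appears in the question
full_schema = StructType([
    StructField("firstname", StringType()),
    StructField("middlename", StringType()),
    StructField("lastname", StringType()),
    StructField("id", StringType()),
    StructField("gender", StringType()),
    StructField("salary", IntegerType()),
])

parsed = new_df.withColumn("meta_parsed", f.from_json("meta", full_schema))
parsed.select("meta_parsed.firstname", "meta_parsed.salary").show()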


4 Comments

  • I was close with the first approach. I just missed the annotation, so it wasn't working as expected, which forced me to use the RDD way.
  • So basically your second approach will work fine, but what if I need to convert the records into the form [{},{},{}], then get a token from one API and hit the actual API with the records? Is that possible with your approach? I will edit the question for a better understanding.
  • I have edited the UDF in the post and accepted your answer. I was just curious whether I would be able to implement the steps mentioned in the UDF comments without a UDF, since there are multiple steps involved.
  • If you want to deal with lists of dicts you can use explode to turn one row into many. If you need to call an API in your UDF then that will have to be Python, but if you can leave as much manipulation to Java as possible it's probably still better. As others have mentioned, it's a small optimisation compared with bulking your requests. Also, you're quite likely to get rate throttled if Spark sends 1 million requests in a second.

For row by row ingestion, refer to @Jon Betts's approach.

In case you pursue a bulk POST and the API accepts an array of meta records, you can do the following. This reduces the number of API calls, which is generally more efficient.

You can first collect the meta JSON strings into lists.

If id is well distributed:

from pyspark.sql import functions as F

num_split = 10  # depends on how big your data is and how much the API can handle

df = (df.groupBy(F.col('id') % num_split)
      .agg(F.collect_list('meta').alias('meta')))

@F.udf
def _call_bulk_api(meta_list):
    # call bulk API (PATCH)
    # The returned status is highly dependent on the API.
    return 200

df = df.withColumn('status', _call_bulk_api(F.col('meta')))
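The body of _call_bulk_api above is only a stub. A minimal sketch of what it might look like with the requests library, assuming a hypothetical https://example.com/records/bulk endpoint (the URL, the HTTP method, and the payload shape are placeholders for the client's actual API):

import json
import requests
from pyspark.sql import functions as F


@F.udf("int")
def _call_bulk_api(meta_list):
    # meta_list is the list of JSON strings produced by collect_list
    records = [json.loads(m) for m in meta_list]
    # Hypothetical bulk endpoint -- swap in the client's PATCH/POST call.
    response = requests.patch("https://example.com/records/bulk", json=records)
    return response.status_code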

If id is not well distributed, create an incrementing id first:

from pyspark.sql import Window

df = (df.withColumn('sid', F.row_number().over(Window.orderBy('id')))
      .groupBy(F.col('sid') % num_split)
      .agg(F.collect_list('meta').alias('meta')))

4 Comments

  • Looks cool. However, what is the significance of `.groupBy(F.col('sid') % num_split)`? What does it actually do?
  • Unfortunately, the client's API doesn't accept bulk requests for now, but this could be worth implementing in the future.
  • That is to concatenate multiple rows into a single row. If you have 1000 records and num_split is 10, you end up with only 10 rows, where each meta holds about 100 records in a list. Thus, you call the bulk API 10 times with 100 records each.
  • Ohh that's cool actually. Thank you so much for your answer @Emma.
