I'm quite new to Spark/PySpark and have been trying to stay consistent in following 'best practices', but unfortunately I sometimes revert to hacky solutions like the one below.
I am working on a tweet dataset which contains base tweets, retweeted content, and quoted content. If the tweet has no retweeted_status or quoted_status, each subfeature of that status is set to None, an empty list, or some other variant depending on the datatype.
What I am trying to do is to create new columns for each of these features, using the content from either the base feature IF it is not a retweet or quoted status, OR to use the retweeted content, OR to use the base feature + the quoted content.
from pyspark.sql.functions import col, concat, lit, when

def _shared_content(feature, df):
    # Prefer retweeted content, then base + quoted content, then the base feature.
    return df.withColumn(
        f'tweet_{feature.replace("entities.", "")}',
        when(col(f'`retweeted_status.{feature}`').isNotNull(),
             df[f'`retweeted_status.{feature}`'])
        .when(col(f'`quoted_status.{feature}`').isNotNull(),
              concat(
                  df[f'`{feature}`'],
                  lit(" "),
                  df[f'`quoted_status.{feature}`']))
        .otherwise(df[f'`{feature}`']))

common_features = [
    'text',
    'entities.hashtags',
    'entities.media',
    'entities.urls',
]

for f in common_features:
    df = df.transform(lambda df, f=f: _shared_content(f, df))
As you can see, this is a bit of a mess, so I've written some pseudo-code for legibility. Here I am carrying out the following steps:
- For each feature in common_features:
  - If retweeted_status.[FEATURE] is not None, set new col tweet_[FEATURE] to retweeted_status.[FEATURE]
  - Else if quoted_status.[FEATURE] is not None, set new col tweet_[FEATURE] to [FEATURE] + " " + quoted_status.[FEATURE]
  - Otherwise, set tweet_[FEATURE] to base [FEATURE].
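
To make the precedence of these rules explicit, here is a minimal plain-Python sketch of the same logic applied to a single row. It is only an illustration: the row is represented as a dict keyed by the flattened column names, and the helper name `resolve_shared_content` is hypothetical, not part of the actual code.

```python
def resolve_shared_content(row, feature):
    """Return the tweet_<feature> value for one row given as a plain dict.

    Mirrors the when/when/otherwise chain: the retweet rule is checked
    first, so it wins even if quoted content is also present.
    """
    base = row.get(feature)
    retweeted = row.get(f"retweeted_status.{feature}")
    quoted = row.get(f"quoted_status.{feature}")

    if retweeted is not None:
        return retweeted
    if quoted is not None:
        # Spark's concat() yields null when any input is null,
        # so a missing base value gives None here as well.
        return None if base is None else f"{base} {quoted}"
    return base


plain = {"text": "hello"}
retweet = {"text": "RT hi", "retweeted_status.text": "hi"}
quote = {"text": "look", "quoted_status.text": "original"}
both = {"text": "x", "retweeted_status.text": "rt", "quoted_status.text": "q"}

results = [resolve_shared_content(r, "text") for r in (plain, retweet, quote, both)]
```

Note that a row carrying both retweeted and quoted content resolves to the retweeted content, because that condition is tested first.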
This solution currently works but feels awfully hacky and frankly illegible. I was wondering if there was a more Spark-like approach to this, where I could eliminate a lot of the redundant code? I tried to apply some sort of mapping from the list to the function but got a little lost.
As a final clarification, the transformation is the same every time; the only thing that changes is the feature I'm working on.
EDIT: I've made this slightly more legible with:
def _shared_content(f, df):
    new_col = f'tweet_{f.replace("entities.", "")}'
    retweet_cond = (
        col(f'`retweeted_status.{f}`').isNotNull(),
        df[f'`retweeted_status.{f}`'])
    quoted_cond = (
        col(f'`quoted_status.{f}`').isNotNull(),
        concat(df[f'`{f}`'], lit(" "), df[f'`quoted_status.{f}`']))
    return df.withColumn(
        new_col,
        when(*retweet_cond)
        .when(*quoted_cond)
        .otherwise(df[f'`{f}`'])
    )