
I was able to write a small script using PySpark to retrieve and organize data from a large .xml file. Being new to PySpark, I am wondering if there is a better way to write the following code:

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id

### xml file from https://wit3.fbk.eu/
sc = SparkSession.builder.getOrCreate()
df = sc.read.format("com.databricks.spark.xml").option("rowTag","transcription").load('ted_en-20160408.xml')
df_values = df.select("seekvideo._VALUE")
df_id = df.select("seekvideo._id")
df_values = df_values.withColumn("id", monotonically_increasing_id())
df_id = df_id.withColumn("id", monotonically_increasing_id())
result = df_values.join(df_id, "id", "outer").drop("id")
answer = result.toPandas()

transcription = dict()
for talk in range(len(answer)):
    if not answer._id.iloc[talk]:
        continue
    transcription[talk] = zip(answer._id.iloc[talk], answer._VALUE.iloc[talk])

where df is of the form:

DataFrame[_corrupt_record: string, seekvideo: array<struct<_VALUE:string,_id:bigint>>]

and transcription is a dictionary of the transcriptions of each TED Talk keyed by position. For example, transcription[0] is of the form:

[(800, u'When I moved to Harare in 1985,'),
 (4120,
  u"social justice was at the core of Zimbabwe's national health policy."),
 (8920, u'The new government emerged from a long war of independence'),
 (12640, u'and immediately proclaimed a socialist agenda:'),
 (15480, u'health care services, primary education'),
...
]

1 Answer

Formatting

PySpark code can create some pretty long lines; wrapping chained calls in parentheses allows you to break them up for easier readability. Let's do that first:

### xml file from https://wit3.fbk.eu/
sc = (
    SparkSession
    .builder
    .getOrCreate()
)

df = (
    sc 
    .read
    .format("com.databricks.spark.xml")
    .option("rowTag","transcription")
    .load('ted_en-20160408.xml')
)

df_values = df.select("seekvideo._VALUE")
df_id = df.select("seekvideo._id")

df_values = df_values.withColumn("id", monotonically_increasing_id())
df_id = df_id.withColumn("id", monotonically_increasing_id())

result = df_values.join(df_id, "id", "outer").drop("id")
answer = result.toPandas()

withColumn

Sometimes you can swap out a withColumn for a select with an alias, which folds the projection into a single step:

df_values = df.select(
    "seekvideo._VALUE",
    monotonically_increasing_id().alias('id')
)

df_id = df.select(
    "seekvideo._id",
    monotonically_increasing_id().alias('id')
)

result = (
    df_values
    .join(df_id, "id", "outer")
    .drop("id")
)
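
If you want to check what either version compiles down to, explain() prints the physical plan (just a quick sanity check, nothing required):

# Optional: inspect the physical plan for either version.
df_values.explain()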

Do you need a join?

You already have these records tied together, so splitting them apart just to join them back is unnecessary (and monotonically_increasing_id gives no guarantee that the ids generated for two separate DataFrames line up, so the join is fragile as well). This really could be just a select and collect:

result = (
    df
    .select(
        'seekvideo.*', 
        monotonically_increasing_id().alias('id')
    )
)

values = {
    row.id: (row._id, row._VALUE)
    for row in result.collect()
}
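
And if you want the same shape as your original transcription dict (lists of (timestamp, line) pairs), a sketch along the same lines, assuming _id and _VALUE stay parallel arrays per row as in your schema; note this is keyed by the generated id rather than by row position:

transcription = {
    row.id: list(zip(row._id, row._VALUE))
    for row in result.collect()
    if row._id is not None  # skip talks with no seekvideo data
}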
