
I have a dataframe in pyspark. Here is what it looks like,

+---------+---------+
|timestamp| price   |
+---------+---------+
|670098928|  50     |
|670098930|  53     |
|670098934|  55     |
+---------+---------+

I want to fill in the gaps in timestamp with the previous state, so that I get a complete series for calculating time-weighted averages. Here is what the output should look like:

+---------+---------+
|timestamp| price   |
+---------+---------+
|670098928|  50     |
|670098929|  50     | 
|670098930|  53     |
|670098931|  53     |
|670098932|  53     |
|670098933|  53     |
|670098934|  55     |
+---------+---------+

Eventually, I want to persist this new dataframe on disk and visualize my analysis.

How do I do this in pyspark? (For simplicity's sake, I have kept just 2 columns here. My actual dataframe has 89 columns and ~670 million records before filling the gaps.)
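For reference, a minimal sketch that reproduces the sample dataframe above (assuming Spark 2.0+ with a SparkSession; the variable names are only illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# sample rows matching the table above
df = spark.createDataFrame(
    [(670098928, 50), (670098930, 53), (670098934, 55)],
    ['timestamp', 'price'])
df.show()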

  • You could do interpolation with scipy. I'm not too sure PySpark can do what you want. Commented Aug 18, 2016 at 0:30
  • @cricket_007 Spark cannot do that. Veenit, I'm not sure why you even want to do that? Commented Aug 18, 2016 at 6:12
  • @eliasah I'm trying to create a dataframe with a record for each timestamp (lowest level of granularity) so that if I want to do time-weighted averages, it's much more convenient. Commented Aug 18, 2016 at 14:07

1 Answer


You can generate a timestamp range for each row, explode (flatten) the ranges, and then group by timestamp, taking the max price:

import pyspark.sql.functions as func
from pyspark.sql.types import IntegerType, ArrayType

a = sc.parallelize([[670098928, 50], [670098930, 53], [670098934, 55]]) \
    .toDF(['timestamp', 'price'])

# for each timestamp, build the range [timestamp, timestamp + 5);
# list() makes the UDF return an array on Python 3 as well
f = func.udf(lambda x: list(range(x, x + 5)), ArrayType(IntegerType()))

a.withColumn('timestamp', f(a.timestamp)) \
    .withColumn('timestamp', func.explode(func.col('timestamp'))) \
    .groupBy('timestamp') \
    .agg(func.max(func.col('price'))) \
    .show()

+---------+----------+
|timestamp|max(price)|
+---------+----------+
|670098928|        50|
|670098929|        50|
|670098930|        53|
|670098931|        53|
|670098932|        53|
|670098933|        53|
|670098934|        55|
|670098935|        55|
|670098936|        55|
|670098937|        55|
|670098938|        55|
+---------+----------+
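Since the question also asks about persisting the result to disk, a minimal sketch, assuming the gap-filled dataframe above is assigned to a variable named result and using a placeholder output path:

# write the gap-filled dataframe out as Parquet (path is a placeholder)
result.write.mode('overwrite').parquet('/path/to/filled_prices')

Parquet keeps the schema, so the file can be read back later for the time-weighted average calculation or loaded into a plotting tool.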

5 Comments

I get AttributeError: 'JavaMember' object has no attribute 'parseDataType' when I execute f=func.udf(lambda x:range(x,x+5),ArrayType(IntegerType()))
No. It doesn't. Which version of Spark are you on? I'm on 2.0.0
I am on 1.6.0, but if you can't define a simple UDF there is something wrong in your environment.
You can remove the UDF and replace it with a map on the RDD: replace a.withColumn('timestamp',f(a.timestamp))\ with a.map(lambda row: (range(row[0], row[0] + 5), row[1])).toDF(['timestamp','price'])\
The above code works, but it doesn't inherently solve my problem: the UDF hardcodes x+5, so what if the gap between two numbers is more than 5? One answer would be to replace 5 with Integer.MAX_VALUE, but then there would need to be a constraint for the last number. Ultimately the timestamp is a time, so I would want to explode it on seconds or milliseconds.
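One way around the hardcoded x + 5 raised in the last comment is to derive each row's gap from the next observed timestamp. A minimal sketch, assuming Spark 2.4+ (for func.sequence) and the same dataframe a as in the answer:

import pyspark.sql.functions as func
from pyspark.sql.window import Window

# order all rows by timestamp; a window without partitionBy collects
# everything into one partition, so this is only a sketch at this scale
w = Window.orderBy('timestamp')

filled = (a
    # next observed timestamp; the last row falls back to its own timestamp
    .withColumn('next_ts', func.coalesce(func.lead('timestamp').over(w),
                                         func.col('timestamp') + 1))
    # one row per second from this observation up to, but not including, the next
    .withColumn('timestamp', func.explode(
        func.sequence(func.col('timestamp'), func.col('next_ts') - 1)))
    .drop('next_ts'))

filled.orderBy('timestamp').show()

Because every exploded row keeps the price of the observation it came from, no groupBy/max is needed, and each missing second carries the previous state regardless of how wide the gap is.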
