A = sc.parallelize(xrange(1, 100))   # sc is an existing SparkContext
t = 50
B = A.filter(lambda x: x < t)        # keep numbers below 50
print B.collect()
t = 10
C = B.filter(lambda x: x > t)        # keep numbers above 10
print C.collect()

I want to get all the numbers below 50 from A and put them into B, and then get all the numbers above 10 from B and put them into C. But the result of C.collect() is an empty array.

However, if I change the last two lines to

m = 10
C = B.filter(lambda x: x > m)

it works fine. I can't understand why the first version seems to take the previous value of t.

1 Answer


"I can't understand why the first version seems to take the previous value of t"

It is actually not referencing the previous t = 50 at all; it is referencing t = 10 itself.

This is more evident if you write a custom function that prints what is happening:

A = sc.parallelize(xrange(1, 100))
t = 50
B = A.filter(lambda x: x < t)
B.collect()
t = 10

def filters(x):
    print x, t          # show which value of t the closure actually sees
    return x > t

C = B.filter(lambda x: filters(x))
print C.collect()

The filters function prints the following:

1 10
2 10
3 10
4 10
5 10
6 10
7 10
8 10
9 10

This shows that t is 10 when the lambdas are actually evaluated: B = A.filter(lambda x: x < t) is recomputed with t = 10, so B now contains only 1 to 9, and when .filter(lambda x: x > t) is applied on top of it, the result is empty because none of those numbers is greater than 10.
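One robust fix (a sketch, not part of the original answer) is to freeze the current value of t into each lambda at definition time, for example with a default argument, so that rebinding t later has no effect on filters that already exist:

A = sc.parallelize(xrange(1, 100))
t = 50
# The default argument t=t copies the current value of t into the lambda,
# so rebinding the name t afterwards does not change this filter.
B = A.filter(lambda x, t=t: x < t)
t = 10
C = B.filter(lambda x, t=t: x > t)
print C.collect()   # [11, 12, ..., 49], as intended

The m = 10 variant from the question works for the same reason: nothing rebinds m between defining the filter and running the action.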

As Spark's official documentation explains:

By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also persist an RDD in memory using the persist (or cache) method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it. There is also support for persisting RDDs on disk, or replicated across multiple nodes.
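Following that suggestion, caching B after the first collect() would also change the outcome here: the persisted partitions, computed while t was still 50, are reused instead of re-running B's filter with the new t. A minimal sketch using the standard RDD cache() method:

A = sc.parallelize(xrange(1, 100))
t = 50
B = A.filter(lambda x: x < t).cache()   # mark B for in-memory persistence
B.collect()                             # first action materializes B with t = 50
t = 10
C = B.filter(lambda x: x > t)           # reuses the cached 1..49
print C.collect()                       # [11, 12, ..., 49]

Note that caching is best-effort: if a cached partition is evicted, Spark recomputes it from the lineage with the current value of t, so the default-argument binding shown above is the more reliable fix.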
