I am currently learning Python and want to use it with Spark. I have this very simple (and useless) script:

import sys
from pyspark import SparkContext

class MyClass:
    def __init__(self, value):
        self.v = str(value)

    def addValue(self, value):
        self.v += str(value)

    def getValue(self):
        return self.v

if __name__ == "__main__":
    if len(sys.argv) != 1:
        print("Usage CC")
        exit(-1)

    data = [1, 2, 3, 4, 5, 2, 5, 3, 2, 3, 7, 3, 4, 1, 4]
    sc = SparkContext(appName="WordCount")
    d = sc.parallelize(data)
    inClass = d.map(lambda input: (input, MyClass(input)))
    reduzed = inClass.reduceByKey(lambda a, b: a.addValue(b.getValue))
    print(reduzed.collect())

When executing it with

spark-submit CustomClass.py

the following error is thrown (output shortened):

Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 133, in dump_stream
    for obj in iterator:
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1728, in add_shuffle_key
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 415, in dumps
    return pickle.dumps(obj, protocol)
PicklingError: Can't pickle __main__.MyClass: attribute lookup __main__.MyClass failed
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)...

To me the statement

PicklingError: Can't pickle __main__.MyClass: attribute lookup __main__.MyClass failed

seems to be important. It means that the class instances can't be serialized, right? Do you know how to solve this issue?

Thanks and regards

1 Answer

There are a number of issues:

  • If you put MyClass in a separate file, it can be pickled. This is a common problem with pickle in Python, and it is simple to solve by moving MyClass into its own module (myclass.py) and then using from myclass import MyClass. Normally dill can fix these issues (as in import dill as pickle), but it didn't work for me here.
  • Once this is solved, your reduce doesn't work because addValue returns None (it has no return statement), not an instance of MyClass. You need to change addValue to return self.
  • Finally, the lambda needs to call getValue, so it should be a.addValue(b.getValue()).
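
To illustrate the first bullet: pickle stores a class by reference (module name plus class name) rather than copying its code, so whoever unpickles must be able to import that module. Here is a minimal sketch that runs without Spark. It uses the standard library's fractions.Fraction purely as a stand-in for a class that lives in an importable module; the variable names are made up for this illustration.

```python
import pickle
from fractions import Fraction

# Fraction stands in for a class defined in an importable module
# (like MyClass once it lives in myclass.py).
payload = pickle.dumps(Fraction(1, 2))

# The pickle records where to find the class, not its source code.
has_module_name = b"fractions" in payload
has_class_name = b"Fraction" in payload

# Unpickling imports the module by name and rebuilds the object.
restored = pickle.loads(payload)
print(has_module_name, has_class_name, restored)
```

A class defined in the script passed to spark-submit is recorded as __main__.MyClass, and on a worker __main__ is presumably not your script, which would match the "attribute lookup __main__.MyClass failed" message in the question.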

Together:

myclass.py

class MyClass:
    def __init__(self, value):
        self.v = str(value)

    def addValue(self, value):
        self.v += str(value)
        return self

    def getValue(self):
        return self.v

main.py

import sys
from pyspark import SparkContext
from myclass import MyClass

if __name__ == "__main__":
    if len(sys.argv) != 1:
        print("Usage CC")
        exit(-1)

    data = [1, 2, 3, 4, 5, 2, 5, 3, 2, 3, 7, 3, 4, 1, 4]
    sc = SparkContext(appName="WordCount")
    d = sc.parallelize(data)
    inClass = d.map(lambda input: (input, MyClass(input)))
    reduzed = inClass.reduceByKey(lambda a, b: a.addValue(b.getValue()))
    print(reduzed.collect())
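
The addValue and getValue fixes can also be checked locally without Spark. The sketch below uses functools.reduce as a stand-in for what reduceByKey does with the values of a single key. Broken, Fixed, and combine are names made up for this illustration and are not part of the code above.

```python
from functools import reduce


class Broken:
    """Same as the question's MyClass: addValue has no return statement."""

    def __init__(self, value):
        self.v = str(value)

    def addValue(self, value):
        self.v += str(value)  # implicitly returns None

    def getValue(self):
        return self.v


class Fixed(Broken):
    """addValue returns self, so the result can feed the next reduce step."""

    def addValue(self, value):
        self.v += str(value)
        return self


def combine(items):
    # Same lambda shape as in reduceByKey, with getValue actually called.
    return reduce(lambda a, b: a.addValue(b.getValue()), items)


# Two values: the single reduce step returns None instead of an object.
two_result = combine([Broken(1), Broken(2)])

# Three values: the second step calls addValue on None and fails.
try:
    combine([Broken(1), Broken(2), Broken(3)])
    three_error = None
except AttributeError as exc:
    three_error = exc

fixed_result = combine([Fixed(1), Fixed(2), Fixed(3)]).getValue()
print(two_result, type(three_error).__name__, fixed_result)
```

With only two values per key the missing return goes unnoticed until you look at the result; with three or more it fails as soon as the second merge runs.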

4 Comments

Thanks for the answer - it solved my problem! Maybe one additional question: is it advisable (e.g. performance wise) to use classes/objects or is it better to use tuples and primitives? Thanks!
Primitive types should give better performance, since Java objects need to be converted to Python objects to execute Python functions (e.g., lambdas). That is easy for primitive types but requires additional serialization for classes.
Hi @KevinS, just out of curiosity: I am in a similar situation with this issue: stackoverflow.com/questions/43042241/… I'll try to use dill. Alternatively, should I change the serialization mechanism?
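
As a rough illustration of the comment above about primitive types: the same concatenation can be written with plain (key, str) pairs, so no custom class has to be pickled at all. This is only a sketch. to_pair, merge, and reduce_by_key are hypothetical names, and reduce_by_key is a local stand-in so the example runs without Spark.

```python
data = [1, 2, 3, 4, 5, 2, 5, 3, 2, 3, 7, 3, 4, 1, 4]


def to_pair(x):
    # Key is the number, value is its string form (a built-in type).
    return (x, str(x))


def merge(a, b):
    # Plain string concatenation replaces MyClass.addValue.
    return a + b


def reduce_by_key(pairs, func):
    # Local stand-in for RDD.reduceByKey, for illustration only.
    result = {}
    for key, value in pairs:
        result[key] = func(result[key], value) if key in result else value
    return result


local_result = reduce_by_key(map(to_pair, data), merge)
print(local_result)

# With Spark, the equivalent pipeline would be:
#   sc.parallelize(data).map(to_pair).reduceByKey(merge).collect()
```

Whether this is measurably faster than the class-based version depends on the job, so treat it as something to benchmark rather than a guarantee.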