When running the following piece of PySpark code:

from itertools import chain

from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import ArrayType, StringType

nlp = NLPFunctions()

def parse_ingredients(ingredient_lines):
    parsed_ingredients = nlp.getingredients_bulk(ingredient_lines)[0]
    return list(chain.from_iterable(parsed_ingredients))

udf_parse_ingredients = UserDefinedFunction(parse_ingredients, ArrayType(StringType()))

I get the following error: _pickle.PicklingError: Could not serialize object: TypeError: can't pickle _thread.lock objects

I imagine this is because PySpark cannot serialize this custom class. But how can I avoid the overhead of instantiating this expensive object on every run of the parse_ingredients function?
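For context, the error can be reproduced with any object that holds a lock; here is a minimal sketch, where ExpensiveModel is a hypothetical stand-in for a class like NLPFunctions that wraps a thread-safe resource:

import pickle
import threading

class ExpensiveModel(object):
    def __init__(self):
        # Any attribute backed by a lock (common in thread-safe NLP libraries)
        # makes the whole instance unpicklable.
        self._lock = threading.Lock()

pickle.dumps(ExpensiveModel())  # TypeError: can't pickle _thread.lock objects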

3 Answers

Let's say you want to use an Identity class defined like this (identity.py):

class Identity(object):
    def __getstate__(self):
        # Refuse pickling outright, to simulate an unpicklable object
        # like NLPFunctions above.
        raise NotImplementedError("Not serializable")

    def identity(self, x):
        return x

You can, for example, use a callable object (f.py) and store an Identity instance as a class attribute:

from identity import Identity

class F(object):
    identity = None

    def __call__(self, x):
        # Lazy initialization: the Identity instance is created at most once
        # per worker interpreter and is never pickled on the driver.
        if not F.identity:
            F.identity = Identity()
        return F.identity.identity(x)

and use these as shown below:

from pyspark.sql.functions import udf
import f

sc.addPyFile("identity.py")
sc.addPyFile("f.py")

f_ = udf(f.F())

spark.range(3).select(f_("id")).show()
+-----+
|F(id)|
+-----+
|    0|
|    1|
|    2|
+-----+

or use a standalone function and a closure:

from pyspark.sql.functions import udf
import identity

sc.addPyFile("identity.py")

def f():
    dict_ = {}  # mutable closure state, shared across calls in this interpreter
    @udf()
    def f_(x):
        if "identity" not in dict_:
            dict_["identity"] = identity.Identity()
        return dict_["identity"].identity(x)
    return f_


spark.range(3).select(f()("id")).show()
+------+
|f_(id)|
+------+
|     0|
|     1|
|     2|
+------+

5 Comments

I don't quite understand the example. Where do you show that you are able to keep state between executions of the udf?
@Vitaliy This is standard Python code - in both cases we keep the object of interest in the outer scope, so its lifetime is not limited to the scope itself. You could use nonlocal in place of the mutable dict if you prefer (see the sketch after these comments). Obviously it cannot outlive the parent interpreter, over which you have no control. Otherwise you can easily add logging, or use a debugger, to see that the initialization is applied only on the first call.
This works really well!! Super fast - this is why we use spark :)
user6910411 - Are you sure your code does not create 3 instances of Identity class? I checked your "standalone function and closure" sample code and that's what happened to me.
@PawełBatko This code will create as many instances of Identity as there are executor interpreters spawned by Spark (remember there is no shared memory here, and each executor "thread" is actually a process in PySpark). So the actual number will depend on the number of executors which are reused, with the upper bound being the total number of tasks (including the ones which were restarted). There are more sophisticated strategies, but these are rather out of scope for this specific answer.
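For reference, a minimal sketch of the nonlocal variant mentioned in the comments above (not from the original answer); it keeps state across calls the same way the dict-based closure does:

from pyspark.sql.functions import udf
import identity

def f():
    instance = None
    @udf()
    def f_(x):
        nonlocal instance
        if instance is None:
            instance = identity.Identity()  # runs once per worker interpreter
        return instance.identity(x)
    return f_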

I solved it, based on https://github.com/scikit-learn/scikit-learn/issues/6975, by making all dependencies of the NLPFunctions class serializable.
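The answer does not show code, but a common pattern for this is to drop the offending members in __getstate__ and rebuild them in __setstate__. A sketch, where load_model is a hypothetical loader for the unpicklable dependency:

class NLPFunctions(object):
    def __init__(self):
        self._model = load_model()  # hypothetical unpicklable dependency

    def __getstate__(self):
        # Exclude the unpicklable member from the pickled state.
        state = self.__dict__.copy()
        state["_model"] = None
        return state

    def __setstate__(self, state):
        # Restore state and rebuild the dependency on the worker.
        self.__dict__.update(state)
        self._model = load_model()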

Edit: this answer is wrong. The object is still serialized and then deserialized when it is broadcast, so serialization is not avoided. (See: Tips for properly using large broadcast variables?)

Try using a broadcast variable.

from itertools import chain

from pyspark import SparkContext

sc = SparkContext()
nlp_broadcast = sc.broadcast(nlp)  # pickles nlp once on the driver and ships it to each executor

def parse_ingredients(ingredient_lines):
    parsed_ingredients = nlp_broadcast.value.getingredients_bulk(ingredient_lines)[0]
    return list(chain.from_iterable(parsed_ingredients))
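This also means the broadcast approach hits the same wall as the UDF. A sketch, reusing the hypothetical ExpensiveModel from the question's repro, assuming broadcast pickles its value eagerly on the driver:

import threading

class ExpensiveModel(object):
    def __init__(self):
        self._lock = threading.Lock()

sc.broadcast(ExpensiveModel())  # fails here: the value is pickled at broadcast time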
