
In the project I am currently working on, I parse a JSON file and, according to the client list in it, I need to start processes in parallel. However, a single client list may contain several clients, in which case I have to start multiple processes for that list. In the example below, a total of four processes must be started in parallel.

[
  [Client11],
  [Client21, Client22],
  [Client31]
]
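One way to turn a nested client list like the one above into independent jobs is to flatten it and create one process per client. This is a minimal sketch, assuming each client is a plain string and using a hypothetical module-level worker, run_client. The Process objects are created here but not started.

```python
import multiprocessing as mp


def run_client(client_name):
    # Hypothetical per-client job; replace with the real work.
    print("running", client_name)


def build_processes(client_groups):
    # Flatten [[...], [...], ...] into a single ordered list of clients.
    clients = [client for group in client_groups for client in group]
    # One Process per client. The target is a module-level function and
    # the only argument is a string, so both pickle cleanly under spawn.
    return [mp.Process(target=run_client, args=(client,), name=client)
            for client in clients]


client_groups = [["Client11"], ["Client21", "Client22"], ["Client31"]]
processes = build_processes(client_groups)  # 4 Process objects, not started yet
```

To run them, start and then join each process inside an `if __name__ == '__main__':` block. The guard matters on Windows, where the spawn start method re-imports the main module in every child.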

For this purpose, I defined a class to encapsulate the related jobs. The class is named ClientHandler, and it takes a dictionary object, client_dict. If the 'Server' attribute is not None, a Process is created for it. In addition, one mp.Process object is created for every entry in 'ServerGroup'. All of these objects are appended to the self.process list.

import multiprocessing as mp

class ClientHandler:
    def __init__(self, client_dict):
        self.tests = client_dict['Tests']
        self.server = client_dict['Server']
        self.server_group = client_dict['ServerGroup']
        self.process = []
        # other necessary initializations from client_dict to members

        manager = mp.Manager()
        test_list = manager.list(self.tests)
        lock = manager.Lock()

        if self.server is not None:
            self.process.append(mp.Process(target=self.process_handle, args=(self.server, test_list, lock)))

        for server in self.server_group:
            self.process.append(mp.Process(target=self.process_handle, args=(server, test_list, lock)))

    def do_work(self, test, server):
        pass  # do work

    def start(self):
        for process in self.process:
            process.start()

    def process_handle(self, server, test_list, lock):
        while True:
            with lock:
                if not test_list:
                    break
                item = test_list.pop(0)

            self.do_work(item, server)

Currently the input to the program is "ServerGroup": ["Client11", "Client21"], so two processes are created and appended to the self.process list of a single ClientHandler object. After that ClientHandler object has been constructed, every process in self.process is started in the start() function. But at runtime, in the second iteration of the loop, the program raises a "Cannot pickle 'weakref' object" exception at the process.start() line.
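The failure can be reproduced without starting any process. Under the spawn start method (the only one available on Windows), the Process target and its args are pickled, and pickling a bound method such as self.process_handle pickles the whole instance it is bound to. This is a minimal sketch with a hypothetical Handler class; a weakref stored in its process list stands in for the internal state of an already started Process.

```python
import pickle
import weakref


class Handler:
    """Hypothetical stand-in for ClientHandler."""

    def __init__(self):
        self.process = []  # plays the role of ClientHandler.process

    def process_handle(self):
        pass


handler = Handler()

# Pickling a bound method pickles the instance it is bound to; this works
# while every attribute of the instance is picklable.
pickle.dumps(handler.process_handle)

# A weakref stands in for the internal state of an already started
# multiprocessing.Process stored in self.process.
handler.process.append(weakref.ref(handler))

try:
    pickle.dumps(handler.process_handle)
except TypeError as exc:
    # The exact message differs between Python versions.
    print("TypeError:", exc)
```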

I tried to solve the problem in several ways. First, I moved process_handle() out of ClientHandler to module level. Then, in the constructor, instead of the lines below:

for server in self.server_group:
    self.process.append(mp.Process(target=self.process_handle, args=(server, test_list, lock)))

I wrote the following:

for server in self.server_group:
    self.process.append(mp.Process(target=process_handle, args=(self, server, test_list, lock)))

Accordingly, I updated process_handle as well:

def process_handle(client_handler, server, test_list, lock):
    while True:
        with lock:
            if not test_list:
                break
            item = test_list.pop(0)

        client_handler.do_work(item, server)

Nothing changed. Following sakib11's suggestion in "python multiprocessing module updates a class attribute inside the called process but is not updated globally", I moved the manager, test_list and lock objects outside the class, so every ClientHandler object receives them as constructor arguments. The same error still occurs at the same place.

The post "Python how to do multiprocessing inside of a class?" suggests not starting processes inside a constructor. I am not starting processes in the constructor, though; I only create the Process objects there and start them later.

Where is my mistake? Why can't I start parallel processes from the same object? I appreciate any help.

  • Are you on Windows, Unix or another OS? Commented Aug 12 at 15:04
  • @vassiliev on Windows. Commented Aug 12 at 15:18
  • You said 'in the second iteration of the loop'. What is the code of the loop? Commented Aug 12 at 15:20
  • @vassiliev look at the third code block, the one that shows the start function in the question: the second iteration of process.start(). Commented Aug 13 at 3:53

1 Answer


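You should avoid passing methods of an instance to a subprocess. On Windows, multiprocessing uses the spawn start method, so the target and args of each Process are pickled and sent to the child. Passing a bound method such as self.process_handle (or passing self as an argument) therefore pickles the whole ClientHandler instance, including self.process. After the first start(), that list holds a started Process object whose internal state includes a weakref, which cannot be pickled; that is why the second start() fails. If you find you have to pass the instance, there may be a contradiction in the logic of your code.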
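If the bound method has to stay, a different technique from the one in this answer is to keep the Process objects out of the pickled state by defining __getstate__. This is a minimal sketch of a reduced, hypothetical ClientHandler, assuming self.process is the only unpicklable attribute; a weakref again stands in for a started Process.

```python
import pickle
import weakref


class ClientHandler:
    def __init__(self, server_group):
        self.server_group = server_group
        self.process = []  # would hold mp.Process objects

    def process_handle(self, server):
        print("handling", server)

    def __getstate__(self):
        # Copy the instance dict and drop the Process list, which holds
        # unpicklable state once a process has been started. The child
        # process does not need it.
        state = self.__dict__.copy()
        state["process"] = []
        return state


handler = ClientHandler(["Client11", "Client21"])
handler.process.append(weakref.ref(handler))  # stand-in for a started Process
data = pickle.dumps(handler.process_handle)   # no TypeError now
restored = pickle.loads(data)
print(restored.__self__.server_group)  # ['Client11', 'Client21']
```

With such a __getstate__, every process.start() in the loop pickles the instance without its Process list, so the already started processes no longer get in the way.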

Try this code:

import multiprocessing as mp
from itertools import product

servers = {
    "ServerGroup": ["Client11", "Client21"],
    "Tests": [1, 2]
}

def do_work(test, server):
    print('doing', test, server)


class ClientHandler:
    def __init__(self, client_dict):
        self.tests = client_dict['Tests']
        self.process = []
        self.server_group = client_dict['ServerGroup']

        test_list = self.tests

        for server, test in product(self.server_group, test_list):
            self.process.append((server, test))


    def start(self):
        for s, t in self.process:
            mp.Process(target=do_work, args=(t, s)).start()


if __name__ == '__main__':
    a = ClientHandler(servers)
    a.start()

It will output something like this (the order can vary, because the processes run in parallel):

doing 1 Client11
doing 2 Client11
doing 1 Client21
doing 2 Client21

Process finished with exit code 0