
I'm trying to process a file (every line is a JSON document). The size of the file can range from hundreds of MBs to GBs, so I wrote generator code to fetch each document from the file line by line.

import codecs
import json

def jl_file_iterator(file):
    # lazily yield one parsed JSON document per line
    with codecs.open(file, 'r', 'utf-8') as f:
        for line in f:
            document = json.loads(line)
            yield document

My system has 4 cores, so I would like to process 4 lines of the file in parallel. Currently I have this code, which takes 4 lines at a time and calls the code for parallel processing:

threads = 4
files, i = [], 1
for jl in jl_file_iterator(input_path):
    files.append(jl)
    if i % threads == 0:
        # process the current batch of `threads` documents in parallel
        # pool.map(processFile, files)
        parallelProcess(files, o)
        files = []
    i += 1

if files:
    parallelProcess(files, o)
    files = []
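
The same fixed-size batching can also be written with itertools.islice; a minimal sketch, reusing the jl_file_iterator, parallelProcess, threads, input_path and o defined above:

from itertools import islice

it = jl_file_iterator(input_path)
while True:
    batch = list(islice(it, threads))   # take up to `threads` documents at a time
    if not batch:
        break
    parallelProcess(batch, o)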

This is my code where the actual processing happens:

from multiprocessing import Process

def parallelProcess(files, outfile):
    processes = []
    for i in range(len(files)):
        p = Process(target=processFile, args=(files[i],))
        processes.append(p)
        p.start()
    for i in range(len(files)):
        processes[i].join()

def processFile(doc):
    extractors = {}
    # ... do some processing on doc ...
    o.write(json.dumps(doc) + '\n')  # `o` is the globally opened output file

As you can see, I wait for all 4 lines to finish processing before I send the next batch of 4. But what I would like is that as soon as one process finishes, the next line is assigned to the released processor. How do I do that?

PS: The problem is that since it's a generator, I cannot load all the lines and use something like map to run the processes.

Thanks for your help

  • Easiest is with a queue. But you're probably not going to get all that much from parallelizing this. But if you want to, since your file is structured one document per line, just split your file into N and run N copies of your script (a small sketch of such a split is included below). Commented Mar 28, 2017 at 20:15
  • Ah, I think in my case splitting the file into N chunks will help. I'll try that. Thanks! Commented Mar 28, 2017 at 21:04
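
A small sketch of that split-into-N idea from the first comment (the file name and part naming are illustrative assumptions; round-robin keeps the parts roughly equal):

def split_jsonl(path, n=4):
    # distribute lines round-robin into n part files: path.part0 .. path.part(n-1)
    outs = [open('%s.part%d' % (path, i), 'w') for i in range(n)]
    try:
        with open(path) as f:
            for i, line in enumerate(f):
                outs[i % n].write(line)
    finally:
        for out in outs:
            out.close()

if __name__ == '__main__':
    split_jsonl('input.jl')   # then run one copy of the processing script per part file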

5 Answers


As @pvg said in a comment, a (bounded) queue is the natural way to mediate among a producer and consumers with different speeds, ensuring they all stay as busy as possible but without letting the producer get way ahead.

Here's a self-contained, executable example. The queue is restricted to a maximum size equal to the number of worker processes. If the consumers run much faster than the producer, it could make good sense to let the queue get bigger than that.

In your specific case, it would probably make sense to pass lines to the consumers and let them do the document = json.loads(line) part in parallel.

import multiprocessing as mp

NCORE = 4

def process(q, iolock):
    from time import sleep
    while True:
        stuff = q.get()
        if stuff is None:
            break
        with iolock:
            print("processing", stuff)
        sleep(stuff)

if __name__ == '__main__':
    q = mp.Queue(maxsize=NCORE)
    iolock = mp.Lock()
    pool = mp.Pool(NCORE, initializer=process, initargs=(q, iolock))
    for stuff in range(20):
        q.put(stuff)  # blocks until q below its max size
        with iolock:
            print("queued", stuff)
    for _ in range(NCORE):  # tell workers we're done
        q.put(None)
    pool.close()
    pool.join()
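
Applied to the question's JSON-lines case, with the workers doing the json.loads part in parallel as suggested above, the same pattern might look roughly like this (file names and the output handling here are illustrative assumptions):

import json
import multiprocessing as mp

NCORE = 4

def worker(q, iolock, outpath):
    # each worker parses and processes lines pulled from the shared queue
    with open(outpath, 'a') as out:
        while True:
            line = q.get()
            if line is None:          # sentinel: no more work
                break
            doc = json.loads(line)    # the parsing happens in parallel
            # ... do some processing on doc ...
            with iolock:
                out.write(json.dumps(doc) + '\n')
                out.flush()           # keep whole lines together in the output

if __name__ == '__main__':
    q = mp.Queue(maxsize=NCORE)
    iolock = mp.Lock()
    pool = mp.Pool(NCORE, initializer=worker, initargs=(q, iolock, 'outfile.jl'))
    with open('infile.jl', 'r') as f:
        for line in f:
            q.put(line)               # blocks while the queue is full
    for _ in range(NCORE):            # one sentinel per worker
        q.put(None)
    pool.close()
    pool.join()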

3 Comments

Great example, thanks! One question - what's the purpose of the iolock? Is it just to keep the prints un-garbled?
Yup! Just to keep the output sane, no matter how screwy timing is on the platform. More generally, of course, the same pattern can be used - when desired - to ensure mutual exclusion among workers and the main process regardless of the nature of the work being done.
Nice answer! I have a further complication: I need the returned result of the process function. To add a second complication, I'm processing a huge number of documents and need to keep this result within constant memory usage (it's been generators up to this final point). Is it possible?

So I ended up running this successfully, by creating chunks of lines from my file and processing the chunks in parallel. Posting it here so it can be useful to somebody in the future.

import multiprocessing as mp
import os
import json

def run_parallel(self, processes=4):
    processes = int(processes)
    pool = mp.Pool(processes)
    try:
        jobs = []
        # submit one job per chunk of the file
        for chunkStart, chunkSize in self.chunkify(self.input_file):
            jobs.append(pool.apply_async(self.process_wrapper, (chunkStart, chunkSize)))
        for job in jobs:
            job.get()
        pool.close()
        pool.join()
    except Exception as e:
        print(e)

def process_wrapper(self, chunkStart, chunkSize):
    # read only this chunk of the file and process each line in it
    with open(self.input_file, 'rb') as f:
        f.seek(chunkStart)
        lines = f.read(chunkSize).decode('utf-8').splitlines()
        for line in lines:
            document = json.loads(line)
            self.process_file(document)

# Splitting the file into byte-range chunks for parallel processing
def chunkify(self, filename, size=1024*1024):
    fileEnd = os.path.getsize(filename)
    with open(filename, 'rb') as f:   # binary mode: relative seeks work and offsets are bytes
        chunkEnd = f.tell()
        while True:
            chunkStart = chunkEnd
            f.seek(size, 1)           # jump roughly `size` bytes ahead
            f.readline()              # then advance to the next line boundary
            chunkEnd = f.tell()
            yield chunkStart, chunkEnd - chunkStart
            if chunkEnd > fileEnd:
                break
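
For reference, here is a condensed, self-contained sketch of the same chunking idea outside the class (the input path and the per-document work are illustrative placeholders):

import json
import multiprocessing as mp
import os

def chunkify(filename, size=1024 * 1024):
    # yield (start, length) byte ranges that always end on a line boundary
    file_end = os.path.getsize(filename)
    with open(filename, 'rb') as f:
        chunk_end = f.tell()
        while True:
            chunk_start = chunk_end
            f.seek(size, 1)
            f.readline()
            chunk_end = f.tell()
            yield chunk_start, chunk_end - chunk_start
            if chunk_end >= file_end:
                break

def process_chunk(filename, start, length):
    with open(filename, 'rb') as f:
        f.seek(start)
        for line in f.read(length).decode('utf-8').splitlines():
            document = json.loads(line)
            # ... per-document work goes here ...

if __name__ == '__main__':
    input_path = 'input.jl'   # illustrative path
    pool = mp.Pool(4)
    jobs = [pool.apply_async(process_chunk, (input_path, start, length))
            for start, length in chunkify(input_path)]
    for job in jobs:
        job.get()
    pool.close()
    pool.join()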



Tim Peters' answer is great.
But my specific case was slightly different, and I had to modify his answer to fit my needs, so I'm referencing it here.
This also answers @CpILL's question in the comments.


In my case, I used a chain of generators (to create a pipeline).
Among this chain of generators, one of them was doing heavy computations, slowing down the whole pipeline.

Something like this:

def fast_generator1():
    for line in file:
        yield line

def slow_generator(lines):
    for line in lines:
        yield heavy_processing(line)

def fast_generator2(lines):
    for line in lines:
        yield fast_func(line)

if __name__ == "__main__":
    lines = fast_generator1()
    lines = slow_generator(lines)
    lines = fast_generator2(lines)
    for line in lines:
        print(line)

To make it faster, we have to execute the slow generator with multiple processes.
The modified code looks like this:

import multiprocessing as mp

NCORE = 4

def fast_generator1():
    for line in file:
        yield line

def slow_generator(lines):
    def gen_to_queue(input_q, lines):
        # This function simply consume our generator and write it to the input queue
        for line in lines:
            input_q.put(line)
        for _ in range(NCORE):    # Once generator is consumed, send end-signal
            input_q.put(None)

    def process(input_q, output_q):
        while True:
            line = input_q.get()
            if line is None:
                output_q.put(None)
                break
            output_q.put(heavy_processing(line))


    input_q = mp.Queue(maxsize=NCORE * 2)
    output_q = mp.Queue(maxsize=NCORE * 2)

    # Here we need 3 groups of workers:
    # * One that consumes the input generator and puts it into a queue. It will be `gen_pool`. It's ok to have only 1 process doing this, since it is a very light task.
    # * One group that does the main processing. It will be `pool`.
    # * One that reads the results and yields them back, to keep it a generator. The main thread will do this.
    gen_pool = mp.Pool(1, initializer=gen_to_queue, initargs=(input_q, lines))
    pool = mp.Pool(NCORE, initializer=process, initargs=(input_q, output_q))

    finished_workers = 0
    while True:
        line = output_q.get()
        if line is None:
            finished_workers += 1
            if finished_workers == NCORE:
                break
        else:
            yield line

def fast_generator2(lines):
    for line in lines:
        yield fast_func(line)

if __name__ == "__main__":
    lines = fast_generator1()
    lines = slow_generator(lines)
    lines = fast_generator2(lines)
    for line in lines:
        print(line)

With this implementation, we have a multiprocess generator: it is used exactly like any other generator (as in the first example of this answer), but all the heavy computation is done using multiprocessing, accelerating it!

5 Comments

Your example works on Linux, but on Windows it has some issues pickling the gen_to_queue and process functions, because they aren't at the top level of the module. Also, on Windows it is not possible to pickle a generator, so I think this approach cannot work on Windows.
@user1814720 just move those two functions to the global namespace and it works
I have a very similar use case (a functional pipeline for processing content) and am now at the stage of trying to get it to execute beyond a single process. How has this progressed for you? Is it still a viable option? My problem is that the pipeline is in the form of a generator, so I get TypeError: can't pickle generator objects when going down this route.
@ThomasKimber did you try the solution mentioned by sluki ?
I didn't - but I'm not sure how I'd go about doing that, is there an example of doing that in practice? My generators are defined within a class, potentially, I could wrap them in something defined at module level, but it would only be superficially moving them to a global namespace, as I can't practically redefine the underlying code-chains from where they're currently defined.

You can also use the higher-level concurrent.futures module like so:

import concurrent.futures as cf
import time
from concurrent.futures import ProcessPoolExecutor

def reader(infile):
    with open(infile, "r") as f:
        for line in f:
            yield line.strip()


def process(line):
    # Simulate doing some heavy processing on `line`
    time.sleep(3)
    return line.upper()


def run_parallel(num_workers, lines):
    with ProcessPoolExecutor(max_workers=num_workers) as p:
        futures = {p.submit(process, line) for line in lines}  # note: submits all lines up front
        for future in cf.as_completed(futures):
            yield future.result()


def write(outfile, lines):
    with open(outfile, "w") as f:
        for line in lines:
            f.write(line + "\n")


NUM_WORKERS = 4

if __name__ == "__main__":
    start = time.time()
    lines = reader("infile.txt")
    lines = run_parallel(NUM_WORKERS, lines)
    write("outfile.txt", lines)
    print(time.time() - start)

Input file:

a
b
c
d
e
f
g
h
i
j

Output file:

A
F
B
G
E
D
C
H
I
J

Stdout:

9.016341924667358

1 Comment

Does not appear to work under Windows, error is: concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.. It mentions return _ForkingPickler.loads(res), so it appears as if this might be a Linux-specific technique.

Late to the party. I had a similar problem: producers and consumers, basically. As a few have pointed out, a queue is best suited to this problem.

You can create an executor pool (thread- or process-based) and use it in conjunction with a semaphore to ensure that only n tasks are in flight at the same time. Any further task submitted by your generator is blocked until the semaphore counter decreases. A sketch of this pattern is below.
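
A minimal sketch of that semaphore-gated executor pattern (worker count, file name and the per-line function are illustrative assumptions, not the code from the Gist):

import time
from concurrent.futures import ProcessPoolExecutor
from threading import BoundedSemaphore

NWORKERS = 4

def process_line(line):
    # placeholder for the real heavy per-line work
    time.sleep(1)
    return line.upper()

def run(path):
    sem = BoundedSemaphore(NWORKERS * 2)          # at most 2*NWORKERS tasks in flight
    with ProcessPoolExecutor(max_workers=NWORKERS) as ex, open(path) as f:
        futures = []
        for line in f:
            sem.acquire()                         # blocks while too many tasks are pending
            fut = ex.submit(process_line, line.strip())
            fut.add_done_callback(lambda _: sem.release())
            futures.append(fut)
        for fut in futures:
            print(fut.result())

if __name__ == '__main__':
    run('infile.txt')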

Found a ready-made solution. Check out this Gist.

