There are a lot of tutorials on multiprocessing, and at least a dozen techniques for managing worker processes. I found myself in a situation where I needed pool-like functionality where no worker process gets killed until all the tasks are finished. Kind of a “no man left behind” situation. I have a separate process which creates the tasks, and a group of workers destined to solve them all.

Workers must not terminate until all of them are out of jobs. Task results are optionally collected by the pool controller and returned at the end of execution. Technically I’m reinventing the wheel here, but to hell with it, it’s only 100+ lines of code.

# -*- coding: utf-8 -*-
"""
pmpi.py

Poor Man's Pool Implementation
"""
from Queue import Empty as QueueEmpty
from multiprocessing import Process, Queue, Manager, cpu_count


class BaseTaskScheduler(Process):
    """
    Destined to provide the tasks for the worker processes. The tasks are put
    into a queue, which is shared with the worker processes.
    """
    def __init__(self, task_queue, workers_stopped):
        Process.__init__(self)
        self.task_queue = task_queue
        self.workers_stopped = workers_stopped
        # initially mark the scheduler as not finished
        self.workers_stopped[self.name] = False

    def generate_tasks(self):
        raise NotImplementedError()

    def run(self):
        for task in self.generate_tasks():
            # block until there is space to put the task into the queue
            self.task_queue.put(task, block=True)
        # scheduler is finished
        self.workers_stopped[self.name] = True


class BaseWorker(Process):
    """
    A worker process, which does the heavy lifting. Tasks are retrieved through
    the shared task queue. A separate result queue is used to store the results
    of the tasks.
    """
    def __init__(self, task_queue, result_queue, workers_stopped):
        Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue
        self.workers_stopped = workers_stopped
        # initially mark the worker as not finished
        self.workers_stopped[self.name] = False

    def solve(self, task):
        raise NotImplementedError()

    def run(self):
        # a worker process won't terminate itself until all the workers report
        # that they have no tasks left, and the scheduler reports that it will
        # not send new tasks.
        while not all(self.workers_stopped.values()):
            try:
                # as tasks are generated in a separate process, there is a
                # possibility that the workers will complete certain tasks
                # a bit faster than the scheduler can provide new ones.
                # in that case, block for up to 3 secs waiting for a task.
                task = self.task_queue.get(block=True, timeout=3)
                self.workers_stopped[self.name] = False
            except QueueEmpty:
                # no new task has arrived for more than 3 secs, so this
                # worker reports to the others that it has nothing left to
                # do, but it won't terminate itself yet
                self.workers_stopped[self.name] = True
            else:
                # task retrieved successfully, now solve it
                result = self.solve(task)
                # put the result of the task into the result queue
                self.result_queue.put(result)


class BasePoolController(object):
    """
    In charge of creating the scheduler and worker processes, processing and
    optionally collecting the returned results.
    """
    def __init__(self, cls_scheduler, cls_worker, worker_count=cpu_count(),
                 task_queue_limit=0, collect_results=False):
        self.cls_scheduler = cls_scheduler
        self.cls_worker = cls_worker
        self.worker_count = worker_count
        self.task_queue_limit = task_queue_limit
        self.collect_results = collect_results

    def process_result(self, result):
        raise NotImplementedError()

    def start(self):
        # limit the size of the task queue for memory efficiency; a
        # maxsize of 0 means the queue size is unbounded
        task_queue = Queue(maxsize=self.task_queue_limit)
        result_queue = Queue()
        manager = Manager()
        workers_stopped = manager.dict()

        # create the worker processes
        workers = [self.cls_worker(task_queue, result_queue, workers_stopped)
                   for _ in range(self.worker_count)]
        # the scheduler is started before the workers, so ideally there will be
        # tasks available by the time the workers are started.
        scheduler = self.cls_scheduler(task_queue, workers_stopped)
        scheduler.start()
        # start the worker processes
        for w in workers:
            w.start()

        self.results = []
        # loop until the result queue is drained and the scheduler and
        # workers have all terminated
        while True:
            try:
                # when the loop starts, the scheduler and worker processes
                # need a little time to produce results, so block until a
                # result arrives (or the 3 sec timeout hits)
                result = result_queue.get(block=True, timeout=3)
            except QueueEmpty:
                # no result in the queue, check if the scheduler and the
                # workers terminated themselves. if that's the case, the
                # mission is completed.
                workers_dead = all(not w.is_alive() for w in workers)
                if workers_dead and not scheduler.is_alive():
                    break
            else:
                processed_result = self.process_result(result)
                # keep the result only when asked to; otherwise don't
                # waste the precious memory
                if self.collect_results:
                    self.results.append(processed_result)

        return self.results

The main process first starts a separate scheduler process which creates the tasks, and then the worker processes. Tasks are distributed through a shared queue among all the involved parties. To solve common synchronization problems, I had to set up the following rules:

  1. Workers can’t shut themselves down until the following conditions are met:
    • The task queue is empty
    • All workers finished their jobs
    • The scheduler reported that it won’t send new tasks to the queue
  2. The main process which starts both the scheduler and the worker group must not exit until:
    • The result queue is empty
    • All workers reported back as inactive
    • The scheduler reported back as inactive

These checks were necessary because we can easily run into a race condition where workers boot up faster than the scheduler can send tasks into the queue. Also, if tasks arrive slowly, we need to make sure that the workers won’t go home just because they have nothing to do at the moment.
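
The “don’t go home yet” part hinges on the timeout variant of Queue.get(): when nothing arrives in time it raises the Empty exception from the stdlib Queue module instead of returning, which lets a worker flag itself as idle without exiting. A minimal standalone sketch (not part of pmpi.py):

# -*- coding: utf-8 -*-
from Queue import Empty as QueueEmpty
from multiprocessing import Queue

if __name__ == '__main__':
    q = Queue()
    try:
        # nothing was ever put into the queue, so this times out
        q.get(block=True, timeout=1)
    except QueueEmpty:
        print 'no task within 1 sec -- report idle, but keep waiting'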

A shared dict is used by the scheduler and the workers to report when they are inactive.
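
If Manager dicts are new to you: the dict returned by multiprocessing.Manager lives in a separate server process, so a write made by one child is visible to every other process holding a proxy to it, which a plain dict passed to a child would not provide. A quick standalone sketch:

# -*- coding: utf-8 -*-
from multiprocessing import Manager, Process


def report(shared):
    # runs in a child process; the write travels through the manager
    shared['child_done'] = True


if __name__ == '__main__':
    manager = Manager()
    shared = manager.dict()
    shared['child_done'] = False

    p = Process(target=report, args=(shared,))
    p.start()
    p.join()

    # prints True; with a plain dict the parent would still see False
    print shared['child_done']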

There is also a good chance that tasks will arrive much faster than the workers can process them, and if those tasks carry large data, the app’s memory usage can grow considerably. To address that, the queue size is limited, and the scheduler process will wait to add new tasks until the queue can accept them.
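
The backpressure itself comes for free from multiprocessing.Queue: once maxsize is set, put() simply blocks while the queue is full. Here’s a minimal sketch of a fast producer being throttled by a deliberately slow consumer (the names are made up for illustration, only the Queue behaviour matters):

# -*- coding: utf-8 -*-
import time
from multiprocessing import Process, Queue


def producer(q):
    for i in range(10):
        # blocks whenever the queue already holds 3 items
        q.put(i, block=True)
    q.put(None)  # sentinel so the consumer knows when to stop


if __name__ == '__main__':
    q = Queue(maxsize=3)
    p = Process(target=producer, args=(q,))
    p.start()
    while True:
        item = q.get()
        if item is None:
            break
        time.sleep(0.1)  # simulate slow processing
        print 'consumed {0}'.format(item)
    p.join()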

As the previous code is an abstract implementation, the classes above must be subclassed and the required methods implemented before they can actually be used. Here’s an example:

# -*- coding: utf-8 -*-
import random
import time

from pmpi import BaseTaskScheduler, BaseWorker, BasePoolController


class TestTaskScheduler(BaseTaskScheduler):

    def generate_tasks(self):
        # randomly choose the number of tasks we intend to create
        task_count = random.randint(50, 100)
        for _ in range(task_count):
            # generating some dummy tasks
            task = {'op': random.choice(['add', 'sub', 'mul', 'div']),
                    'x': random.random() * 1000,
                    'y': random.random() * 1000}
            yield task


class TestWorker(BaseWorker):

    def solve(self, task):
        # a random sleep, just to simulate that a task can take some time
        time_took = random.randint(0, 3)
        time.sleep(time_took)

        if task['op'] == 'add':
            result = task['x'] + task['y']
        elif task['op'] == 'sub':
            result = task['x'] - task['y']
        elif task['op'] == 'mul':
            result = task['x'] * task['y']
        else:
            try:
                result = task['x'] / task['y']
            except ZeroDivisionError:
                result = 0

        return result, time_took


class TestPoolController(BasePoolController):

    def process_result(self, result):
        result, time_took = result
        print 'result: {0}, took ~{1} secs'.format(result, time_took)
        return result


if __name__ == '__main__':
    mission_control = TestPoolController(cls_scheduler=TestTaskScheduler,
                                         cls_worker=TestWorker,
                                         worker_count=4,
                                         task_queue_limit=256,
                                         collect_results=True)
    print mission_control.start()

Drop a line if you could use this…
