SaltyCrane Blog — Notes on JavaScript and web development

Notes on parallel processing with Python and Twisted

Twisted is a networking engine written in Python that, among many other things, can be used to do parallel processing. It is very big, though, so I had a hard time finding what I needed. I browsed through the Twisted Documentation and the Twisted O'Reilly book. There is also a Recipe in the Python Cookbook. However, I found Bruce Eckel's article, Concurrency with Python, Twisted, and Flex, to be the most helpful. (See also Bruce Eckel's initial article on Twisted: Grokking Twisted.)

Here are my notes on running Bruce Eckel's example. I removed the Flex part because I didn't need it and don't know anything about it. This example runs a Controller which starts a number of separate parallel processes running Solvers (a.k.a. workers). It also allows for communication between the Controller and the Solvers. Though this example only runs on one machine, the article said extending it to multiple machines is not difficult. For a good explanation of how this works, please see the original article.
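
As a rough picture of the "controller starts separate worker processes" idea, here is a minimal sketch that uses only the standard library. It is not the approach from the article: the workers report back over stdout pipes rather than through Twisted's Perspective Broker, and the names WORKER_CODE and run_workers are made up for this illustration. It is written for Python 3, unlike the Python 2 listings in this post.

```python
import subprocess
import sys

# Hypothetical one-line worker program: it reports its id and exits.
WORKER_CODE = "import sys; print('worker %s done' % sys.argv[1])"


def run_workers(ids):
    """Start one child process per id, then collect each child's output."""
    # All children are started first, so they run at the same time.
    procs = [
        subprocess.Popen(
            [sys.executable, "-c", WORKER_CODE, str(worker_id)],
            stdout=subprocess.PIPE,
        )
        for worker_id in ids
    ]
    # communicate() waits for a child to finish and returns (stdout, stderr).
    return [proc.communicate()[0].decode().strip() for proc in procs]


print(run_workers([5566, 5567]))
```

The ids here reuse the port numbers from the example only as labels. Nothing listens on those ports in this sketch.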

Here is solver.py, which is copied from the original article. The actual "work" is done in the step method. I only added some debugging print statements for myself.

"""
solver.py
Original version by Bruce Eckel
Solves one portion of a problem, in a separate process on a separate CPU
"""
import sys, random, math
from twisted.spread import pb
from twisted.internet import reactor

class Solver(pb.Root):

    def __init__(self, id):
        print "solver.py %s: solver init" % id
        self.id = id

    def __str__(self): # String representation
        return "Solver %s" % self.id

    def remote_initialize(self, initArg):
        return "%s initialized" % self

    def step(self, arg):
        "Simulate work and return result"
        print "solver.py %s: solver step" % self.id
        result = 0
        for i in range(random.randint(1000000, 3000000)):
            angle = math.radians(random.randint(0, 45))
            result += math.tanh(angle)/math.cosh(angle)
        return "%s, %s, result: %.2f" % (self, str(arg), result)

    # Alias methods, for demonstration version:
    remote_step1 = step
    remote_step2 = step
    remote_step3 = step

    def remote_status(self):
        print "solver.py %s: remote_status" % self.id
        return "%s operational" % self

    def remote_terminate(self):
        print "solver.py %s: remote_terminate" % self.id
        reactor.callLater(0.5, reactor.stop)
        return "%s terminating..." % self

if __name__ == "__main__":
    port = int(sys.argv[1])
    reactor.listenTCP(port, pb.PBServerFactory(Solver(sys.argv[1])))
    reactor.run()
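
Perspective Broker only exposes methods whose names start with remote_, which is why step is aliased to remote_step1, remote_step2, and remote_step3 above. The sketch below imitates that naming rule in plain Python 3 so it can be tried without Twisted. MiniSolver and call_remote are hypothetical names for this illustration, not Twisted APIs, and the iteration count is reduced so it finishes quickly.

```python
import math
import random


class MiniSolver(object):
    """Stand-in for Solver with no Twisted dependency (hypothetical)."""

    def __init__(self, id):
        self.id = id

    def __str__(self):
        return "Solver %s" % self.id

    def step(self, arg, iterations=1000):
        # Same arithmetic as Solver.step, with a small fixed iteration count.
        result = 0.0
        for _ in range(iterations):
            angle = math.radians(random.randint(0, 45))
            result += math.tanh(angle) / math.cosh(angle)
        return "%s, %s, result: %.2f" % (self, arg, result)

    # Class-level aliases: two remote names bound to the same function.
    remote_step1 = step
    remote_step2 = step

    def remote_status(self):
        return "%s operational" % self


def call_remote(obj, name, *args):
    """Imitate the naming rule: "status" resolves to obj.remote_status."""
    method = getattr(obj, "remote_" + name, None)
    if method is None:
        raise AttributeError("no remote method named %r" % name)
    return method(*args)


solver = MiniSolver(5566)
print(call_remote(solver, "status"))
print(call_remote(solver, "step1", "step 1"))
```

Asking for "step" fails in this sketch because only the remote_ aliases are reachable by name, which mirrors why the aliases are needed in solver.py.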

Here is controller.py. This is also copied from the original article, but I removed the Flex interface and added calls to start and terminate in the Controller class. I'm not sure if this makes sense, but at least it allowed me to run the example. I also moved the terminate method from the FlexInterface to the Controller.

"""
controller.py
Original version by Bruce Eckel
Starts and manages solvers in separate processes for parallel processing.
"""
import sys
from subprocess import Popen
from twisted.spread import pb
from twisted.internet import reactor, defer

START_PORT = 5566
MAX_PROCESSES = 2

class Controller(object):

    def broadcastCommand(self, remoteMethodName, arguments, nextStep, failureMessage):
        print "controller.py: broadcasting..."
        deferreds = [solver.callRemote(remoteMethodName, arguments) 
                     for solver in self.solvers.values()]
        print "controller.py: broadcasted"
        reactor.callLater(3, self.checkStatus)

        # errbackArgs must be a tuple, so the trailing comma is required
        defer.DeferredList(deferreds, consumeErrors=True).addCallbacks(
            nextStep, self.failed, errbackArgs=(failureMessage,))
    
    def checkStatus(self):
        print "controller.py: checkStatus"
        for solver in self.solvers.values():
            solver.callRemote("status").addCallbacks(
                lambda r: sys.stdout.write(r + "\n"), self.failed,
                errbackArgs=("Status Check Failed",))
                                                     
    def failed(self, results, failureMessage="Call Failed"):
        print "controller.py: failed"
        for (success, returnValue), (address, port) in zip(results, self.solvers):
            if not success:
                raise Exception("address: %s port: %d %s" % (address, port, failureMessage))

    def __init__(self):
        print "controller.py: init"
        self.solvers = dict.fromkeys(
            [("localhost", i) for i in range(START_PORT, START_PORT+MAX_PROCESSES)])
        self.pids = [Popen(["python", "solver.py", str(port)]).pid
                     for ip, port in self.solvers]
        print "PIDS: ", self.pids
        self.connected = False
        reactor.callLater(1, self.connect)

    def connect(self):
        print "controller.py: connect"
        connections = []
        for address, port in self.solvers:
            factory = pb.PBClientFactory()
            reactor.connectTCP(address, port, factory)
            connections.append(factory.getRootObject())
        defer.DeferredList(connections, consumeErrors=True).addCallbacks(
            self.storeConnections, self.failed, errbackArgs=("Failed to Connect",))

        print "controller.py: starting parallel jobs"
        self.start()

    def storeConnections(self, results):
        print "controller.py: storeconnections"
        for (success, solver), (address, port) in zip(results, self.solvers):
            self.solvers[address, port] = solver
        print "controller.py: Connected; self.solvers:", self.solvers
        self.connected = True

    def start(self):
        "Begin the solving process"
        if not self.connected:
            return reactor.callLater(0.5, self.start)
        self.broadcastCommand("step1", ("step 1"), self.step2, "Failed Step 1")

    def step2(self, results):
        print "controller.py: step 1 results:", results
        self.broadcastCommand("step2", ("step 2"), self.step3, "Failed Step 2")

    def step3(self, results):
        print "controller.py: step 2 results:", results
        self.broadcastCommand("step3", ("step 3"), self.collectResults, "Failed Step 3")

    def collectResults(self, results):
        print "controller.py: step 3 results:", results
        self.terminate()
        
    def terminate(self):
        print "controller.py: terminate"
        for solver in self.solvers.values():
            solver.callRemote("terminate").addErrback(self.failed, "Termination Failed")
        reactor.callLater(1, reactor.stop)
        return "Terminating remote solvers"

if __name__ == "__main__":
    controller = Controller()
    reactor.run()
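
One Python detail is worth keeping in mind when reading the controller: parentheses alone do not make a tuple, so ("step 1") in the start method is just the string "step 1". That is harmless there because the value is passed on as a single argument, but it matters for errbackArgs, which Twisted unpacks as errback(failure, *errbackArgs). The sketch below shows the difference in plain Python 3. The failed function is a simplified stand-in for Controller.failed, not the real method.

```python
def failed(results, failureMessage="Call Failed"):
    """Simplified stand-in for Controller.failed: returns the message."""
    return failureMessage


not_a_tuple = ("Failed Step 1")   # parentheses only: still a str
one_tuple = ("Failed Step 1",)    # trailing comma: a 1-tuple

print(type(not_a_tuple).__name__)  # str
print(type(one_tuple).__name__)    # tuple

# Unpacking the tuple passes the message as one extra argument.
print(failed("some failure", *one_tuple))

# Unpacking the string passes one argument per character, which
# does not match the signature and raises TypeError.
try:
    failed("some failure", *not_a_tuple)
except TypeError as exc:
    print("TypeError:", exc)
```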

To run it, put the two files in the same directory and run python controller.py. You should see 2 CPUs (if you have 2) go up to 100% usage. And here is the screen output:

controller.py: init
PIDS:  [12173, 12174]
solver.py 5567: solver init
solver.py 5566: solver init
controller.py: connect
controller.py: starting parallel jobs
controller.py: storeconnections
controller.py: Connected; self.solvers: {('localhost', 5567): , ('localhost', 5566): }
controller.py: broadcasting...
controller.py: broadcasted
solver.py 5566: solver step
solver.py 5567: solver step
controller.py: checkStatus
solver.py 5566: remote_status
Solver 5566 operational
solver.py 5567: remote_status
controller.py: step 1 results: [(True, 'Solver 5567, step 1, result: 683825.75'), (True, 'Solver 5566, step 1, result: 543177.17')]
controller.py: broadcasting...
controller.py: broadcasted
Solver 5567 operational
solver.py 5566: solver step
solver.py 5567: solver step
controller.py: checkStatus
solver.py 5566: remote_status
Solver 5566 operational
solver.py 5567: remote_status
controller.py: step 2 results: [(True, 'Solver 5567, step 2, result: 636793.90'), (True, 'Solver 5566, step 2, result: 335358.16')]
controller.py: broadcasting...
controller.py: broadcasted
Solver 5567 operational
solver.py 5566: solver step
solver.py 5567: solver step
controller.py: checkStatus
solver.py 5566: remote_status
Solver 5566 operational
solver.py 5567: remote_status
controller.py: step 3 results: [(True, 'Solver 5567, step 3, result: 847386.43'), (True, 'Solver 5566, step 3, result: 512120.15')]
controller.py: terminate
Solver 5567 operational
solver.py 5566: remote_terminate
solver.py 5567: remote_terminate
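
The step results printed above are lists of (success, value) pairs, one per solver, which is the shape a DeferredList hands to its callback. As a small sketch of what collectResults could do with them, the Python 3 code below pulls the numeric result out of each string, using the step 1 values copied from the output above. The parse_results helper is hypothetical and assumes the exact "Solver N, step N, result: X" format produced by solver.py.

```python
# Step 1 results, copied from the screen output above.
step1_results = [
    (True, 'Solver 5567, step 1, result: 683825.75'),
    (True, 'Solver 5566, step 1, result: 543177.17'),
]


def parse_results(results):
    """Map each solver name to its numeric result, skipping failures."""
    values = {}
    for success, text in results:
        if not success:
            # On failure the second item is not a result string.
            continue
        name, _label, result = text.split(", ")
        values[name] = float(result.split(": ")[1])
    return values


print(parse_results(step1_results))
```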
