A @coroutine decorator for Twisted

August 17, 2012

UPDATE: the approach described in this post is now packaged as http://pypi.python.org/pypi/txcoroutine. In addition, it provides memory-efficient tail recursion.

Twisted has a nice and useful @inlineCallbacks decorator, which lets you write asynchronous code without ending up in a callback jungle. If you don’t already know it, I suggest you check it out. However, while @inlineCallbacks solves many problems and effectively provides a simple coroutine implementation on top of Python generators (see the PDF at http://www.dabeaz.com/generators/ and then the one at http://www.dabeaz.com/coroutines/), it doesn’t really provide the full coroutine feature set. I’ll explain.

Coroutines are meant to resemble lightweight (green) threads with fully deterministic (cooperative) multitasking; I won’t delve further into that here. The (current) implementation of @inlineCallbacks, however, only provides very primitive control over how your coroutines behave. It basically assumes that you never want to control the execution of your coroutines from the outside, so it’s impossible to (generically) stop, suspend or resume a coroutine once you’ve created it. A generator-producing function wrapped with @inlineCallbacks simply returns a Deferred instance that is fired when the coroutine exits, one way or another.
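For readers who haven’t seen how a generator can be driven as a coroutine, here is a minimal sketch of the idea, using only the standard library. ToyDeferred and drive are hypothetical names made up for this illustration; they are not Twisted APIs and only loosely mimic what Deferred and @inlineCallbacks do.

```python
class ToyDeferred(object):
    """Hypothetical stand-in for a Deferred: a list of callbacks fired once.

    Toy limitation: callbacks added after fire() are never called.
    """

    def __init__(self):
        self._callbacks = []

    def add_callback(self, fn):
        self._callbacks.append(fn)

    def fire(self, value):
        for fn in self._callbacks:
            fn(value)


def drive(gen):
    """Resume `gen` each time the ToyDeferred it yielded fires."""
    done = ToyDeferred()

    def step(value=None):
        try:
            waiting_on = gen.send(value)  # run until the next yield
        except StopIteration:
            done.fire(None)  # the generator has exited
            return
        waiting_on.add_callback(step)  # resume with the eventual result

    step()
    return done


log = []
pending = []


def get_message():
    d = ToyDeferred()
    pending.append(d)
    return d


def process():
    first = yield get_message()
    log.append(first)
    second = yield get_message()
    log.append(second)


finished = drive(process())
finished.add_callback(lambda _: log.append("done"))
pending[0].fire("hello")
pending[1].fire("world")
```

Note that the caller of drive only ever gets the "finished" object back. The generator itself stays private to the driver, which is why there is nothing to stop, suspend or resume it with.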

On closer inspection, though, you will notice that a Deferred instance has the methods cancel, pause and unpause, so one might think these correspond to stopping, suspending and resuming the coroutine, respectively. However, calling those methods only affects how the Deferred itself behaves and has no effect whatsoever on what is happening inside the coroutine. For example, calling .cancel() on the Deferred does not stop the coroutine from executing; it simply causes the outcome of the coroutine to be ignored. Likewise, .pause() and .unpause() temporarily ignore and unignore the result of the coroutine, but still, nothing inside it is affected.

Then I realised how easy it would be to change that by writing a modified version of inlineCallbacks (and calling it coroutine, for both compatibility and semantic correctness, because it’s no longer just about callbacks). Deferred’s constructor takes an optional canceller argument, which is invoked when the Deferred is .cancel()-ed, so changing the following line in inlineCallbacks:

return _inlineCallbacks(None, gen, Deferred())

to

return _inlineCallbacks(None, gen, Deferred(canceller=lambda _: gen.close()))

adds stopping support for coroutines. Now, one can produce code such as:

@coroutine
def some_process():
    try:
        while True:
            msg = yield get_message()
            process_message(msg)
    finally:  # could use `except GeneratorExit` but `finally` is more illustrative
        print("coroutine stopped, cleaning up")

proc = some_process()
proc.cancel()
# ==> coroutine stopped, cleaning up
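The clean-up message comes from ordinary generator behaviour rather than anything Twisted-specific: close() raises GeneratorExit inside the generator at the yield where it is suspended, so any finally block around that yield runs. A minimal standard-library sketch, with worker and log as hypothetical names:

```python
log = []


def worker():
    try:
        while True:
            msg = yield
            log.append("processed %s" % (msg,))
    finally:
        # Runs when close() raises GeneratorExit at the suspended yield.
        log.append("cleaning up")


gen = worker()
next(gen)      # advance to the first yield
gen.send("a")  # resume with a message; suspends at the yield again
gen.close()    # stop the generator from the outside
```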

Moving on. Adding suspending and resuming support, and support for automatically cancelling the inner Deferred (the one the coroutine is currently waiting on), was trickier. There is no way to hook into the pausing and unpausing of a Deferred, so I got creative and subclassed Deferred.

from twisted.internet.defer import CancelledError, Deferred

class Coroutine(Deferred):
    # this is something like chaining, but firing of the other deferred does not cause this deferred to fire.
    # also, we manually unchain and rechain as the coroutine yields new Deferreds.
    cancelling = False
    depends_on = None

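    def pause(self):
        # depends_on is still None if the coroutine has not yielded a Deferred
        if self.depends_on is not None:
            self.depends_on.pause()
        return Deferred.pause(self)

    def unpause(self):
        if self.depends_on is not None:
            self.depends_on.unpause()
        return Deferred.unpause(self)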

    def cancel(self):
        # to signal _inlineCallbacks to not fire self.errback with CancelledError;
        # otherwise we'd have to call `Deferred.cancel(self)` immediately, but it
        # would be semantically unnice if, by the time the coroutine is told to do
        # its clean-up routine, the inner Deferred hadn't yet actually been cancelled.
        self.cancelling = True

        # this errback is added as the last one, so anybody else who is already listening for CancelledError
        # will still get it.
        swallow_cancelled_error = lambda f: f.trap(CancelledError)

        if self.depends_on is not None:  # nothing to cancel if no Deferred was yielded
            self.depends_on.addErrback(swallow_cancelled_error)
            self.depends_on.cancel()

        self.addErrback(swallow_cancelled_error)
        Deferred.cancel(self)

Then it was only a matter of changing:

return _inlineCallbacks(None, gen, Deferred(canceller=lambda _: gen.close()))

to:

return _inlineCallbacks(None, gen, Coroutine(canceller=lambda _: gen.close()))

And keeping the Deferred’s depends_on field up to date in _inlineCallbacks where the generator magic is happening:

 if isinstance(result, Deferred):
+    deferred.depends_on = result

There was one more trick to cancellation support, though. We don’t want a CancelledError to be sent to our main Deferred when we’re cancelling the inner one, not least because we would then be unable to cancel the main one: its canceller would already have been called. So right at the start of the while loop inside _inlineCallbacks, we make the following amendment:

 if isFailure:
+    if deferred.cancelling:  # must be that CancelledError that we want to ignore
+        return

That’s it. Now you have stopping, suspending and resuming support for coroutines in Twisted. Here’s a full code example:

from __future__ import print_function

from twisted.internet import reactor
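from twisted.internet.defer import Deferred

# `coroutine` is the decorator developed above; it is assumed to be defined
# or imported before this point.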

def get_message():
    d = Deferred(canceller=lambda _: (
        print("cancelled getting a message"),
        heavylifting.cancel(),
    ))
    print("getting a message...")
    heavylifting = reactor.callLater(1.0, d.callback, 'dummy-message')
    return d

@coroutine
def some_process():
    try:
        while True:
            msg = yield get_message()
            print("processing message: %s" % (msg,))
    finally:  # could use `except GeneratorExit` but `finally` is more illustrative
        print("coroutine stopped, cleaning up")

def main():
    proc = some_process()
    reactor.callLater(3, proc.cancel)

reactor.callWhenRunning(main)
reactor.run()
# ==> getting a message...
# ==> processing message: dummy-message
# ==> getting a message...
# ==> processing message: dummy-message
# ...
# ==> cancelled getting a message
# ==> coroutine stopped, cleaning up