Thursday, March 26, 2015

How Celery Chord Synchronization Works

Celery is a powerful tool for managing asynchronous tasks in Python. The basic model is your synchronous Python code pushes a task (in the form of a serialized message) into a message queue (the Celery "broker", which can be a variety of technologies - Redis, RabbitMQ, Memcached, or even a database), and worker processes pull tasks off the queue and execute them. But Celery's 16,000 lines of application code certainly provide a lot more functionality than a simple task queue. Celery exposes a number of powerful synchronization (or "workflow" in Celery parlance) primitives - ways to execute groups of tasks together, chain async task results in a synchronous manner, or execute a callback after a group of tasks have finished executing.

At ePantry, we make extensive use of this final primitive, called a "chord" in Celery. Here's a trivial chord example, from the Celery docs:

>>> from celery import chord
>>> from tasks import add, tsum

>>> chord(add.s(i, i)
...       for i in xrange(100))(tsum.s()).get()

This queues up 100 'add' tasks, the results of which Celery aggregates into a list, which is passed into the 'tsum' callback function when all of the tasks have finished executing.
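Setting aside the distribution machinery, here's what that chord computes, modeled in plain Python (no Celery, no workers required):

```python
# What the chord above computes, modeled in plain Python: the 100
# add(i, i) results are gathered into a list, then the whole list is
# handed to the tsum callback.
header_results = [i + i for i in range(100)]   # the add.s(i, i) results
chord_result = sum(header_results)             # the tsum callback
```

The distributed version produces the same 9900; the chord machinery exists purely to coordinate *when* the callback may run.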

We use chords for all sorts of tasks. For example, every night we charge recurring shipments, and send out an internal email with the results of how many charges succeeded, and how many failed due to things like expired credit cards. The charge tasks are all asynchronous, distributed across many workers, and the final email is the chord's callback function.

Pretty cool! But how exactly does this coordination work? The Celery docs leave this somewhat cryptic comment:

"The synchronization step is costly, so you should avoid using chords as much as possible. "


Reading a bit further, we learn that:

"By default the synchronization step is implemented by having a recurring task poll the completion of the group every second, calling the signature when ready."

That does sound a bit costly. But wait! There's more!

"This is used by all result backends except Redis and Memcached, which increment a counter after each task in the header, then applying the callback when the counter exceeds the number of tasks in the set."

Ah-ha! The two most common backends for Celery are Redis and RabbitMQ, which will serve as exemplars for the two types of synchronization as we dig in.

Redis is often described as a "Swiss Army knife" for data. It works great as a message broker, but also as a general-purpose (non-relational) data store. This means that Celery can track in Redis how many of the chord's tasks have completed, updating the tally every time a task finishes and comparing it against the number of tasks originally in the chord. Here's the relevant code from the Celery source:

_, readycount, totaldiff, _, _ = client.pipeline()              \
    .rpush(jkey, self.encode([1, tid, state, result]))          \
    .llen(jkey)                                                 \
    .get(tkey)                                                  \
    .expire(jkey, 86400)                                        \
    .expire(tkey, 86400)                                        \
    .execute()

totaldiff = int(totaldiff or 0)

callback = maybe_signature(request.chord, app=app)
total = callback['chord_size'] + totaldiff
if readycount == total:
    decode, unpack = self.decode, self._unpack_chord_result
This code (which is part of a larger function) runs every time one of the chord's subtasks completes, via the backend's on_chord_part_return hook. Through some clever Redis pipelining, the result gets pushed onto a list and the list's new length comes back as 'readycount'. Then, if readycount equals the chord's total size, the chord callback gets executed. Cool!

Is it expensive, as the docs claim? Well...sort of. It causes a slew of Redis commands to fire after every subtask completes. But it uses connection pooling and the pipeline Redis primitive, so there is only one round-trip to the broker per completed task. Whether you consider that expensive depends on your use case; to me it seems like a small price to pay for the synchronization.
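To make the counter strategy concrete, here's a minimal plain-Python model of the same bookkeeping. This is a sketch only: the Python list stands in for the Redis list that RPUSH appends to and LLEN measures, and the class name is invented for illustration.

```python
# A plain-Python model of the Redis counter synchronization (no Redis).
class CounterChord:
    def __init__(self, chord_size, callback):
        self.chord_size = chord_size   # number of header tasks, known up front
        self.callback = callback
        self.results = []              # plays the role of the Redis list at jkey
        self.done = None

    def on_chord_part_return(self, result):
        self.results.append(result)    # RPUSH: record this subtask's result
        readycount = len(self.results) # LLEN: how many have finished?
        if readycount == self.chord_size:
            self.done = self.callback(self.results)

demo = CounterChord(3, sum)
for value in (1, 2, 3):
    demo.on_chord_part_return(value)   # the callback fires on the last call
```

Each completion does constant work, and nothing anywhere has to poll.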

Now that we understand Redis, let's see how RabbitMQ does it.

RabbitMQ is a message queue. It is not designed to persist arbitrary data; it's purpose-built as a broker. There isn't a place to store a counter, so Celery relies on a polling strategy to determine whether the chord is ready to complete. The code for this is spread out across a number of source files, but it all starts with apply_chord() in the base broker class (which RabbitMQ, referred to as 'amqp' in Celery, inherits from).

When the chord starts, apply_chord calls callback_chord_unlock, which in turn queues up the builtin celery.chord_unlock task. Here's the crucial bit of code in chord_unlock that polls for the completion of the subtasks:

try:
    ready = deps.ready()
except Exception as exc:
    raise self.retry(
        exc=exc, countdown=interval, max_retries=max_retries,
    )

If the subtasks are ready, the callback gets queued for execution. Otherwise, chord_unlock gets queued for a retry. Is this expensive? Well, maybe. Certainly more so than checking a counter. Check out how ResultSet.ready() works -- it checks the .ready() property of each subtask, which ultimately results in a call to _get_task_meta for each subtask. Celery does a good job of caching the task metadata, but nonetheless this means examining the metadata for every task, every time the chord_unlock polling task runs. I suspect this is what the docs refer to when they warn that the synchronization can be expensive.
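Here's a stripped-down, plain-Python sketch of that polling strategy. The names are invented for illustration, and a simple loop stands in for Celery's self.retry(countdown=interval):

```python
# A model of chord_unlock: poll every subtask, re-run until all are done.
class FakeResult:
    def __init__(self, checks_until_ready, value):
        self.checks = checks_until_ready
        self.value = value

    def ready(self):                  # like AsyncResult.ready()
        self.checks -= 1
        return self.checks <= 0

def unlock_chord(results, callback, max_retries=100):
    for _ in range(max_retries):      # each pass is one "retry" of chord_unlock
        if all(r.ready() for r in results):   # like ResultSet.ready()
            return callback([r.value for r in results])
    raise RuntimeError('chord_unlock exceeded max_retries')

# Two subtasks that only become ready after a few polls:
results = [FakeResult(3, 1), FakeResult(5, 2)]
total = unlock_chord(results, sum)
```

Note that every polling pass touches every subtask, which is exactly the cost the docs are warning about.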

So that's it -- an under-the-covers look at how Celery actually coordinates tasks. I was surprised to learn that the strategy varies so radically based on the result backend, but it certainly reaffirms my love for Redis and all its flexibility.

Saturday, April 19, 2014

Refactoring for Testability: A Real World Example in Python/Django

At ePantry, we strive to have enough automated test coverage that we can deploy with confidence, without being dogmatic about our approach. We certainly aren't a TDD shop, and we don't have rules about code coverage metrics ("no commit can reduce code coverage!"), but it's very unusual that a non-cosmetic change makes it to production without some sort of test coverage.

But it wasn't a straightforward journey - our team didn't have much experience with "good" testing practices. At a few previous companies, tests, if they were required at all, were viewed as a burden: "extra work" to tack on to your commit at the very end.

So building a culture around automated testing took time. To the extent we've succeeded, we did it by brute force: we just wrote the damn tests. After a couple of months of this, one test at a time, we built up enough experience to actually internalize two key lessons:

  1. It's actually faster to develop if you write the tests alongside your code. Clicking around in the browser reproducing bugs is slow and unreliable and not very fun.
  2. Writing tests actually catches a lot of bugs before they make it into production. I'm now physically uncomfortable deploying untested code.

But it took a bit of wandering through the testing wilderness to get there.

The other day at ePantry we refactored a bit of code for testability that I think provides a nice example of both the mechanics and the value of automated testing - something that I think can be tricky to understand from toy examples. The refactoring was simple, but yielded code that is easier to maintain, more composable, and (most importantly) allowed us to deploy a major feature with a high degree of confidence.

First, some background:

At ePantry we generate suggested shipments of household goods based on predicted consumption habits of the household. When a customer signs up, we generate roughly a year of suggested shipments. For instance, here's what a schedule looks like to a user on the front end (you can even try a sample dashboard here):

Over time, the household "consumes" those shipments and we need to generate more to keep the calendar populated with future shipments. We wrote a function to run as an overnight cron job that "tops up" the shipments of all of our customers, so each customer always has at least a year of shipments on the calendar.

Below is a code snippet as it first appeared in our codebase, sans-tests. The code itself is simple, but it was a bit scary to deploy into production because it runs asynchronously, touches a lot of customers, and would not be easy to unwind if it ran amok. I've removed some bits of logging for clarity, but this is otherwise just as it appeared in the original commit.

def create_shipments_async():
    search_date = add_months(date.today(), settings.SHIPMENT_MONTHS)
    customers = Customer.objects \
        .annotate(last_shipment_date=Max('pantry__shipments__arrival_date')) \
        .filter(last_shipment_date__lt=search_date)

    for customer in customers:
        cur_date = customer.last_shipment_arrival_date()
        while cur_date < search_date:
            shipment = customer.pantry.create_next_shipment()
            cur_date = shipment.arrival_date

Here's what it does:

  1. First we find the date in the future that we want all customers to have shipments until. This is set to 12 months in our settings file.
  2. Next we get all of the active customers (those who have a credit card on file) whose last shipment arrives before the cutoff date. These customers need more shipments generated.
  3. Finally, for each of those customers, we loop through and create shipments until the arrival date of the last shipment is past our target date.

Simple! But also untested and therefore scary. We tested it locally by cloning the production database and running the function through the Django shell. It seemed to work. But that's not good enough. This is too important and too complicated to be left to manual testing. So let's write some unit tests.

As is, this code is difficult to test. It's one big function doing a bunch of stuff and we can't test the components in isolation. If you look at the English-language explanation of the code above, there are really three distinct steps, each of which can be separately verified. We can refactor to represent each of those three steps:

def _shipments_until_date():
    return add_months(date.today(), settings.SHIPMENT_MONTHS)

def _get_customers_without_enough_shipments(search_date):
    return Customer.objects \
        .annotate(last_shipment_date=Max('pantry__shipments__arrival_date')) \
        .filter(last_shipment_date__lt=search_date)

def _create_until(customer, last_shipment_date, search_date):
    if last_shipment_date < search_date:
        _create_until(customer,
                      customer.pantry.create_next_shipment().arrival_date,
                      search_date)
Now what does our top-level function look like?

def create_shipments_async():
    map(lambda customer: _create_until(customer,
                                       customer.last_shipment_arrival_date(),
                                       _shipments_until_date()),
        _get_customers_without_enough_shipments(_shipments_until_date()))

Piece of cake! Literally one statement. We map our _create_until function onto the collection of customers without enough shipments. All the detail is in our three component functions (note that _create_until is just a recursive loop - it could certainly be implemented in a more imperative style if that's your thing).
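For instance, here's what that imperative version might look like, exercised with stub objects standing in for the Django models (all the Fake* names and the 30-day cadence are invented for illustration, not ePantry's actual code):

```python
# An imperative version of _create_until, plus minimal stubs so it can
# run outside Django.
from datetime import date, timedelta

def _create_until_loop(customer, last_shipment_date, search_date):
    cur_date = last_shipment_date
    while cur_date < search_date:
        cur_date = customer.pantry.create_next_shipment().arrival_date

class FakeShipment:
    def __init__(self, arrival_date):
        self.arrival_date = arrival_date

class FakePantry:
    def __init__(self, last_arrival):
        self.shipments = []
        self.last_arrival = last_arrival

    def create_next_shipment(self):
        # Pretend every shipment lands 30 days after the previous one.
        self.last_arrival += timedelta(days=30)
        self.shipments.append(FakeShipment(self.last_arrival))
        return self.shipments[-1]

class FakeCustomer:
    def __init__(self, pantry):
        self.pantry = pantry

pantry = FakePantry(date(2014, 4, 15))
_create_until_loop(FakeCustomer(pantry), date(2014, 4, 15), date(2014, 7, 1))
# Shipments get created until one lands on or past the search date.
```

Same behavior as the recursive version, and the stub pattern is essentially what the unit tests below do with factories.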

We just need a handful of tests on these functions, and one good test on the high level function, and we gain a tremendous amount of confidence and robustness in the code. I glossed over it here, but I also discovered a few small bugs in our Django ORM query while writing the tests. Here are a few example tests, using Django's test framework and Factory Boy to create mock objects.

from django.test import TestCase
from pypantry.tests.factories import PantryFactory
from schedule.tests.factories import ShipmentFactory
from pypantry.tasks import *
from datetime import timedelta, datetime
from utils.utils import add_months

class TestShipmentGenTask(TestCase):

    def setUp(self):
        self.p = PantryFactory()
        self.c = self.p.customer
        self.s1 = ShipmentFactory(pantry=self.c.pantry)

    def test_finds_customers(self):
        self.s1.arrival_date = _shipments_until_date() - timedelta(1)
        res = _get_customers_without_enough_shipments(_shipments_until_date())
        self.assertEqual(len(res), 1)

    def test_ignores_customers_with_enough_shipments(self):
        self.s1.arrival_date = _shipments_until_date()
        res = _get_customers_without_enough_shipments(_shipments_until_date())
        self.assertEqual(len(res), 0)

    def test_shipments_get_created(self):
        self.s1.arrival_date = datetime.strptime('15042014', "%d%m%Y").date()
        _create_until(self.c, self.s1.arrival_date, add_months(self.s1.arrival_date, 12))
        # one extra because it goes one shipment "past" the target date
        self.assertEqual(self.p.shipments.count(), 13) 

    def test_task(self):
        self.s1.arrival_date = _shipments_until_date() - timedelta(1)
        create_shipments_async()
        self.assertEqual(self.p.shipments.count(), 2)

Ta-da! Of course there are plenty of other tests you could write, but just these four basic tests exercise the code reasonably well. And it would have been virtually impossible to get this level of testing fidelity with the original function. Plus, if one of these tests fails due to a later commit, we can immediately hone in on the cause. The refactoring and tests took less than an hour, and it will save me at least that much worry.

I hope you found this a useful, real world example of refactoring for testability. Love to hear any additional thoughts in the comments!

P.S. This code made it into production yesterday and ran successfully for the first time last night. Phew! Also -- you should sign up for ePantry and never run out of soap or toilet paper ever again.

Saturday, November 30, 2013

The No-Frills Guide to PGP on OS X

Want to get started using PGP on your Mac, but confused by the morass of professor-doctor style sites with seemingly out-of-date software and plugins? Have no fear! I'll have you up and running in minutes with this handy guide. You'll learn how to use public key servers, how to send and receive encrypted email, and how to sign and verify messages using GPG. By the time you're done, you might actually find the GPG manual pages for simple tasks illuminating instead of totally incoherent.

Really explaining how public key encryption works or why you'd want to use it is not the goal of this post by a long shot - this is just a simple block-and-tackle tutorial on how to use the stuff. If you want an easy-to-read, non-technical intro to asymmetric cryptography, go read the first section of chapter 10 of Little Brother by Cory Doctorow. But with that said, I'd be remiss if I didn't at least provide...

Part 0 - A Two-Sentence Introduction to Public Key Encryption

Everyone who wants to communicate securely generates their own pair of keys, one of which they publicize, and one of which they keep private. To send someone a secure message, just encrypt your message with the recipient's public key, and that person (and only that person) will be able to decrypt it using their private key.
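If you'd like to see those two sentences in action, here's a toy RSA key pair built from textbook-sized primes. This is purely illustrative - real keys use numbers thousands of bits long (and the modular-inverse form of pow needs Python 3.8+):

```python
# Toy RSA: generate a key pair, encrypt with the public half, decrypt
# with the private half. Illustrative only -- the primes are tiny.
p, q = 61, 53
n = p * q                            # modulus, part of both keys
e = 17                               # public exponent:  publicize (n, e)
d = pow(e, -1, (p - 1) * (q - 1))    # private exponent: keep (n, d) secret

message = 65
ciphertext = pow(message, e, n)      # anyone can encrypt with the public key
decrypted = pow(ciphertext, d, n)    # only the private key recovers the message
```

Publishing (n, e) while guarding d is exactly the publicize/keep-private split described above.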

Got it?

Part 1 - A Quick Disambiguation of PGP, OpenPGP, and GPG

PGP stands for Pretty Good Privacy and is a piece of software that implements the OpenPGP public key (or asymmetric) encryption standard. The PGP implementation is owned by Symantec. GPG is another, free, implementation of OpenPGP that stands for Gnu Privacy Guard. GPG is very common on *nix systems and it's what we'll use here. So basically GPG and PGP are functionally equivalent.

Part 2 - Installing Stuff

We're going to use a Thunderbird (which is a great email client by Mozilla) extension called Enigmail to send and receive GPG emails. Let's get installing!

  1. Get Thunderbird and hook it up to an email account you use. Thunderbird is pretty great, and works well with Gmail (you only need to do the super short instructions in the "configuring your gmail" section - it's that easy). Ok done? Good.
  2. Now we need to install GPG. For OS X, you'll want GPG Suite. It's super easy to install, and will walk you through creating your first GPG key pair just after installation. It will default to an RSA key of 2048 bits, but I'd recommend using 4096. It's quite a bit more secure and really doesn't have any downsides - it's a bit more computationally expensive, but that doesn't matter unless you're using the key for something like SSL. We're just encrypting emails here.
  3. Finally, install Enigmail. This is a Thunderbird extension that provides simple integration with GPG so you don't have to muck about with a bunch of command line tools just to deal with sending and receiving email. There are a few configuration options - odds are, unless you are a character in Cryptonomicon, you don't want to encrypt or sign your messages by default. The official homepage of Enigmail makes it look a little long in the tooth but it works great with the latest version of Thunderbird.

Part 3 - Using Key Servers

Now that you have a key pair created, you need to share it with the world - so that anyone who wants to communicate with you securely can encrypt messages to you with your public key. There are a number of organizations that maintain public keyservers to do just this. Key servers are just searchable directories of public keys. I use MIT's because Phil Zimmermann, the inventor of PGP, is an MIT guy, and the server has been around a heck of a long time.

Go ahead and search for me. That's my key!

If you click on the key, you'll actually see the long block of gibberish that is my public RSA key. But you don't even have to interact with that - GPG Suite makes it super easy to publish and install public keys. In the GPG Keychain Access tool (under Applications on your Mac), open Preferences and point it at the MIT key server:

Now you can publish your public key to the server by right-clicking your key and hitting "Send public key to Keyserver":

Now go search for your email on MIT's servers and you can find your public key! Not only that, MIT's key server will propagate your key to other keyservers all around the world.

Installing keys is just as easy. You can install mine by going to Key > Retrieve From Keyserver and putting in my key ID from MIT's server (0x6f0eff6b2e0593ad). That's me! And now the world can find your public key just as easily.

Part 4 - Sending & Receiving Encrypted Messages

With Enigmail installed, this is really pretty easy. Open Thunderbird and compose a new message. You'll see the OpenPGP drop down menu at the top, and you can elect to encrypt the message.

When you go to send the message, you'll of course have to select the public key with which to encrypt the message. Enigmail will detect the key automatically if you have a key with a matching email address on file with your GPG Keychain already, but if not you'll have to select one (or, more likely, go retrieve and install the correct one from a keyserver).

So if you write me a message like this:

And choose to encrypt it with my public key, here is the email that actually gets sent (note that the subject line is NOT encrypted):

And if I open it in Thunderbird, Enigmail detects the encryption, prompts for my password, decrypts it, and lets me know that it's decrypted a message:

Couldn't be easier! The harder thing is actually finding someone who also knows how to send and receive encrypted emails. Hah hah.

Part 5 - Signing & Verifying Emails

Besides keeping communication private, public key cryptography has another superpower - identity verification. The properties of private/public key pairs mean that not only can someone else use your public key to encrypt a message that is only decryptable by you, but you can encrypt a message using your private key that can only be decrypted by your public key. Now, you may be asking yourself "why is that useful? Why bother encrypting a message that anyone can then read?". Well, if the message can be decrypted using your public key, then it must have been signed using your private key - which means you must have sent the message! Thus we can exploit public key cryptography for identity verification, as well as secret-message sending.

A signed message looks like this:

-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA512

[message text]
-----BEGIN PGP SIGNATURE-----
Version: GnuPG/MacGPG2 v2.0.22 (Darwin)
Comment: GPGTools -
Comment: Using GnuPG with Thunderbird -

[signature data]
-----END PGP SIGNATURE-----

Under the hood, the signature was generated by hashing the message contents (in this case you can see Enigmail inserted the hashing algorithm it used - SHA512), then using the sender's private key to encrypt the hash along with a timestamp. The recipient can then verify the message by decrypting the signature block, re-hashing the contents, and comparing. Not only is the sender's identity verified, but so are the message contents and the sending time. Cool!
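You can model that whole sign-and-verify round trip in a few lines of Python. This is a toy sketch, not real GPG: the RSA numbers are textbook-sized, and the SHA-512 digest is reduced mod n only to keep the arithmetic tiny.

```python
# Toy sign-then-verify: hash the message, "encrypt" the hash with the
# private key, verify by "decrypting" with the public key and re-hashing.
import hashlib

p, q, e = 61, 53, 17
n = p * q
d = pow(e, -1, (p - 1) * (q - 1))    # the sender's private exponent

def toy_sign(message):
    digest = int(hashlib.sha512(message).hexdigest(), 16) % n
    return pow(digest, d, n)         # private-key operation

def toy_verify(message, signature):
    digest = int(hashlib.sha512(message).hexdigest(), 16) % n
    return pow(signature, e, n) == digest   # public-key operation

signature = toy_sign(b'I mean it. Really!')
genuine = toy_verify(b'I mean it. Really!', signature)    # True
tampered = toy_verify(b'I mean it. Really.', signature)   # should fail
```

Changing even one character of the message changes the hash, so the signature no longer checks out - the same reason the "Really!" experiment below fails verification.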

Sending and receiving signed messages through Enigmail is just as easy as sending and receiving encrypted messages - just select the option when sending, and Enigmail handles everything for you. Signed messages are automatically verified. Here's what the above message looks like when viewed in Enigmail:

Sometimes you might want to verify blocks of text (or an email you received through a client other than Thunderbird). That's really simple with GPG as well, though you'll have to go to the command line. To try it out for yourself, save the above signed message as test.txt and then run the following from a terminal window:

gpg --verify test.txt

Ta-da! Piece of cake.

A Quick Digression About Trusted Signatures

You'll see an ominous WARNING in the gpg output above, letting you know that this is not a trusted signature. This simply means that you have not indicated that you trust the source of the public key you used to decrypt the message. For instance, what if I am not Chris Clark at all? But some nefarious impersonator who has broken into Chris' blog and provided a public key in this post that is not Chris' at all? What then?? Who can we trust?

Turns out this is actually a pretty hard problem to solve. The PGP community has a concept called the web of trust, which you can read about on your own. Another approach is key signing parties. Ultimately you will need to make the trust determination on your own. You can then set your trust level for each public key in the GPG Keychain tool. By default all keys except your own are "undefined", and this warning won't go away until you've indicated you have "ultimate" trust in the key.

Back to verification - for kicks, try modifying the message slightly - replace my "Really!" with "Really." and re-run the verification (also note that I've now set my own key back to "Ultimate" trust so the warning is gone):

And that's it! You know how to use the GPG tool to communicate securely. There are all sorts of intricacies and details that are loads of fun to learn about, and you should now have a bit of a foundation to go exploring. Maybe those manual pages aren't so cryptic now after all. Good luck, and leave a comment if you'd like to exchange some encrypted emails with yours truly. You know where to find me.