Testing Postgres advisory locks with asyncio and asyncpg.
Recently, here on the Cloud team at Elastic we started working on building a new service in Python 3.7. This service
fetches data from a Postgres database, transforms it, and then submits that data to another service. Like many
cloud-based services, ours runs in an orchestrated container environment where N instances can be running at any time.
Often that's a good thing, but our service has a few critical sections where only one instance should be able to process
data. Since we are retrieving data from Postgres, we decided to go ahead and make use of
advisory locks to control
these critical sections. In this article I want to explain what advisory locks are, provide an implementation, and test
to verify functionality.
Postgres provides the ability to create locks that only have meaning within the context of your application. These are advisory locks. You use advisory locks to control an application’s ability to process data. Anytime your application is about to enter a critical path, you attempt to acquire the lock. When you acquire the lock, you can safely continue processing.
async with AdvisoryLock("gold_leader", dbconfig) as connection:
If it fails, then your application may retry, wait, or exit. Since this lock is external to the application, this allows for multiple instances of the application to run while providing safe critical path concurrency.
Building the lock
As part of our work, we wanted to make using advisory locks easy. To do this, we created the
context manager. Since this is meant to be used with
asyncpg, we control the acquisition and release of
the lock via
class AdvisoryLock: async def __aenter__(self) -> asyncpg.connection.Connection: self.locked_connection = await asyncpg.connect(...) await self._set_lock() if self.got_lock: return self.locked_connection else: if self.locked_connection: await self.locked_connection.close() raise AdvisoryLockException async def __aexit__(self, exc_type, exc_val, exc_tb): await self._release()
Now this can be called like any other async context manager.
async with AdvisoryLock(config, "appname") as connection: val = await connection.fetchrow("SELECT 1")
Testing the lock
Now that the
PostgresAdvisoryLock class is implemented, we need to test it. To start we verify the base functionality
by acquiring the lock, running a query, and validating we can't get the lock inside the same scope. I recommend using
asynctest library to help work with
async def test_get_results_with_lock(self): async with AdvisoryLock("gold_leader", dbconfig) as connection: val = await connection.fetchrow("SELECT 1;") self.assertEqual(val, 1) async def test_lock_prevents_second_lock(self): with self.assertRaises(AdvisoryLockException): async with AdvisoryLock("gold_leader", dbconfig) as connection: await connection.fetchrow("SELECT 1;") async with AdvisoryLock("gold_leader", dbconfig) as second_connection: await second_connection.fetchrow("SELECT 1;")
Since we are going to use this to control the execution of code across many processes, we also need to verify external
process behavior. To do this we use the
subprocess.create_subprocess_exec function to create a new process.
This process attempts to get the lock our main process already has, and it should fail.
async def test_advisory_lock_prevents_access_from_separate_process(self): with self.assertRaises(AdvisoryLockException): async with AdvisoryLock("gold_leader", dbconfig) as connection: proc = await asyncio.subprocess.create_subprocess_exec( sys.executable, "-c", executable, stderr=asyncio.subprocess.PIPE, )
When we started to build our new application, we knew we would be waiting on the network and database. Since we also had
work that could happen during the wait, we decided to use
asyncio. Additionally we identified a critical path where we
used Postgres to achieve concurrency control. To make critical path control easier we created a module and a series of tests.
Once finished we realized this might be helpful to others looking for the same control, or as a reference for those
learning to test with asyncio.
You can find the full implementation and Docker setup on Sourcehut.