Dealing with race condition when de-queuing #7

Open
riyaz-ali opened this issue Dec 21, 2022 · 8 comments
Labels
bug Something isn't working

Comments

@riyaz-ali
Contributor

riyaz-ali commented Dec 21, 2022

The following write-up documents a race condition we ran into while testing the schema, the solutions we explored, and the trade-offs we made.

Below is a snapshot of the schema we were dealing with:

[schema diagram of sqlq.queues and sqlq.jobs]

The Race Condition

As is evident from the schema above, we have a concept of "queue concurrency". A queue can be assigned a concurrency, and the engine ensures that, at any given point in time, at most queues.concurrency jobs are running for that queue across all coordinating workers. For example, a queue with concurrency = 2 should never have more than two jobs in the running state.

The issue arises when two workers try to concurrently de-queue jobs from a given queue. Note that the problem we are dealing with is "workers exceeding a queue's concurrency limit" and not "workers pulling in the same job to process" (the latter can be solved using the FOR UPDATE SKIP LOCKED approach).

To de-queue a task, we use a query that resembles the following:

WITH queues AS (
    -- we start by pulling (and locking) all the queues we are interested in
    SELECT name, concurrency, priority FROM sqlq.queues WHERE name = 'a' FOR UPDATE
), running (name, count) AS (
    -- we compute the number of currently active jobs running across all the queues we are interested in
    SELECT queue.name, COUNT(*) FROM sqlq.jobs job INNER JOIN queues queue ON job.queue = queue.name
           WHERE job.status = 'running' GROUP BY queue.name
), queue_with_capacity AS (
    -- we filter out queues that have run out of capacity (ie. running >= concurrency)
    -- concurrency IS NULL is a special case for queues that do not have any concurrency set
    SELECT queues.name, queues.priority FROM queues LEFT OUTER JOIN running USING(name)
        WHERE (concurrency IS NULL OR (concurrency - COALESCE(running.count, 0) > 0))
), dequeued(id) AS (
    -- from the filtered list of queues, we line up the jobs and pick the first one, 
    -- skipping it if it's already picked by someone else
    SELECT job.id FROM sqlq.jobs job, queue_with_capacity q
        WHERE job.status = 'pending' AND job.queue = q.name
    ORDER BY q.priority, job.priority, job.created_at
    LIMIT 1 FOR UPDATE OF job SKIP LOCKED
)
-- and finally, we update the status of the job
UPDATE sqlq.jobs SET status = 'running'
    FROM dequeued dq
WHERE jobs.id = dq.id
RETURNING *;

The FOR UPDATE lock on sqlq.queues ensures that no one else de-queues from the same queue as us.

So where does it fail?

Let's assume that there are 2 workers and both of them open 1 transaction each, say, TX#1 and TX#2, respectively.

Now, in TX#1, we run the above query. It acquires a lock on sqlq.queues, checks for running tasks in the queues, picks a job, and updates its status to running. It then COMMITs the transaction and continues with processing the job.

Concurrently, in TX#2, when we try to acquire a lock on sqlq.queues, we have to wait until TX#1 releases its lock. After TX#1 commits (or rolls back for that matter), we get the lock. But when we try to calculate the number of running tasks, we DO NOT see the update made by TX#1 (where it de-queued and started execution of a task).

This becomes a problem because, if queue a had a concurrency value set, we won't see the newest job from a that started execution while we were blocked. Because of that, we might end up de-queuing more jobs from a than allowed by its concurrency setting.

[diagram: timeline of TX#1 and TX#2 illustrating the stale-snapshot race]

What can we do?

The problem here essentially boils down to a transaction's isolation / snapshot behaviour and its serialisability. If our transactions (and by extension the workers) could see changes made by other workers while they were blocked, this problem would be solved.

The problem is captured pretty concisely by this StackOverflow post.

Let's see how the different Transaction Isolation levels would work in this scenario.

Read Committed

READ COMMITTED is the default isolation level in PostgreSQL. It would seem like this level should be sufficient for us, as we are interested in fetching data from transactions that have recently committed. But when we ran the above tests, it still failed: we were not able to compute the correct number of running tasks. But why?

Quoting from the documentation,

When a transaction uses this isolation level, a SELECT query (without a FOR UPDATE/SHARE clause) sees only data committed before the query began; it never sees either uncommitted data or changes committed during query execution by concurrent transactions. In effect, a SELECT query sees a snapshot of the database as of the instant the query begins to run.

In our case, the query execution starts with the CTE and then blocks on the lock. When it acquires the lock, it resumes execution (with the SELECT COUNT(*) ... GROUP BY query, which doesn't have a FOR UPDATE) but doesn't see the new changes that were applied. This is in line with what the documentation says.

One way to work around this is to split the query and run the SELECT ... FOR UPDATE on sqlq.queues as a separate statement within the same transaction. That way, after we've acquired the lock, the main query executes with a fresh snapshot and sees data committed by other transactions in the meantime.
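A minimal sketch of that split, assuming the same schema as above:

BEGIN;

-- statement 1: take the queue lock; this may block until a concurrent
-- dequeuer commits or rolls back
SELECT name FROM sqlq.queues WHERE name = 'a' FOR UPDATE;

-- statement 2: under READ COMMITTED each statement gets a fresh snapshot,
-- so this now sees the jobs marked 'running' by whoever held the lock
SELECT queue, COUNT(*) FROM sqlq.jobs
    WHERE status = 'running' AND queue = 'a' GROUP BY queue;

-- ... rest of the dequeue logic, then:
COMMIT;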

Repeatable Read

Quoting from the documentation,

The Repeatable Read isolation level only sees data committed before the transaction began; it never sees either uncommitted data or changes committed during transaction execution by concurrent transactions.

At first, this doesn't seem particularly helpful in our case as we'd actually want to see changes made by any concurrent transaction, to make our dequeue logic work.

But if we drop the custom, explicit locks (FOR UPDATE on sqlq.queues and FOR UPDATE SKIP LOCKED on sqlq.jobs), a concurrent REPEATABLE READ transaction will block at the UPDATE sqlq.jobs SET status = 'running' stage, and the operation will fail with a serialisation anomaly. This is good, because our application can now retry dequeuing!
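What the losing worker sees is roughly this (a sketch; job id 42 is a made-up example, the error is PostgreSQL's serialization failure, SQLSTATE 40001):

-- worker 2, racing with worker 1 over the same job
BEGIN ISOLATION LEVEL REPEATABLE READ;

-- blocks while worker 1 holds the row lock on job 42, then errors
-- once worker 1 commits:
UPDATE sqlq.jobs SET status = 'running' WHERE id = 42;
-- ERROR:  could not serialize access due to concurrent update

ROLLBACK; -- roll back and retry the whole dequeue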

This approach is more seamless than having to run multiple queries and manage locks ourselves.

Serializable

SERIALIZABLE is the strictest of all. The actual behaviour is very well covered in the official documentation.

In our case, what SERIALIZABLE would allow us to do is emulate a serial execution of the dequeue logic, even in the presence of multiple concurrent workers. If two workers try to dequeue from the same queue or try to dequeue the same job, the transaction will fail with a serialisation anomaly error and can be retried.

It's not as bad as it sounds 🙃 If we think about it, we are already doing a serial dequeue by acquiring a lock on the queue (and blocking during that operation). With SERIALIZABLE we needn't worry about that anymore, as the database provides the consistency guarantees we are trying to get with the explicit locks.

SERIALIZABLE in PostgreSQL is implemented using predicate locking and non-blocking SIRead locks, which the official documentation suggests can perform better than an equivalent FOR UPDATE strategy.
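Adopting it is essentially just a change of isolation level; a sketch of the worker's transaction:

BEGIN ISOLATION LEVEL SERIALIZABLE;

-- run the dequeue query from above with the explicit FOR UPDATE /
-- FOR UPDATE SKIP LOCKED clauses dropped; a conflicting worker fails with
-- SQLSTATE 40001 ("could not serialize access due to read/write
-- dependencies among transactions") and simply retries

COMMIT;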


In our case, both REPEATABLE READ and SERIALIZABLE provide the level of guarantees we require, and ideally we should benchmark both solutions. Switching between isolation levels won't functionally change anything in the implementation either, as in both cases you run the same query for dequeuing. For SERIALIZABLE, we could maybe further optimise the query with better, more targeted predicates.

@riyaz-ali riyaz-ali self-assigned this Dec 21, 2022
@riyaz-ali
Contributor Author

This comment summarises the discussion between me and @amenowanna.


JLo (@amenowanna) suggested going with an approach based on PostgreSQL's Advisory Locks.

With this approach, we start a new transaction with the default (READ COMMITTED) isolation level. Next, we acquire an exclusive, transaction-level advisory lock using the pg_advisory_xact_lock() routine. The lock is global and ensures that only one worker can run the dequeue function at any given time.

This ensures that all the calculations and updates we make stay consistent throughout the dequeue operation, allowing us to maintain the concurrency guarantees. It also requires fewer locks than both SERIALIZABLE and REPEATABLE READ.
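A minimal sketch of that flow (the key 'sqlq-dequeue' is an arbitrary example; pg_advisory_xact_lock() takes a bigint key, which we derive from a name with hashtext() here):

BEGIN; -- default READ COMMITTED

-- serialise all dequeuers behind one global, transaction-scoped lock;
-- the lock is released automatically at COMMIT or ROLLBACK
SELECT pg_advisory_xact_lock(hashtext('sqlq-dequeue'));

-- ... run the dequeue query (the FOR UPDATE on sqlq.queues is no longer needed) ...

COMMIT;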

The main downside, though, is that it blocks all dequeue operations, even when the dequeues operate on different queues, in which case they (ideally) shouldn't block each other.

One way we found around this was to give each sqlq.queues row its own unique lock id, locking just the queues we are interested in (see the sketch below). In our experiments, though, we found that it is possible to run into a deadlock this way.
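Roughly, the per-queue variant (a sketch; deriving lock ids from queue names via hashtext() is just our illustration, not a fixed scheme):

BEGIN;

-- one advisory lock per queue of interest, keyed off the queue name;
-- the subquery pins the acquisition order, yet workers locking
-- overlapping queue sets can still end up waiting on each other
SELECT pg_advisory_xact_lock(hashtext(name))
    FROM (SELECT name FROM sqlq.queues WHERE name IN ('a', 'b') ORDER BY name) q;

COMMIT;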

Because of the above-mentioned limitations (global locking & the potential for deadlock), we started looking into other modes. We determined SERIALIZABLE to be too aggressive: it acquired relatively more locks, some of which were coarser than others. Although most of the locks it acquired were of type SIReadLock (and there's more detail in the documentation around the overall performance of SERIALIZABLE), it still had considerable overhead.

On the other hand, REPEATABLE READ feels like a good middle ground. It has reasonably low overhead (compared to SERIALIZABLE) while still providing all the consistency guarantees we need.


So, for now, we'll go with the REPEATABLE READ isolation level 🙌

@riyaz-ali riyaz-ali added the bug Something isn't working label Dec 26, 2022
@riyaz-ali
Contributor Author

@amenowanna I tried a use-case with additional filters applied when picking jobs, like a filter on jobs.typename to pick jobs of a specific type. REPEATABLE READ fails in those scenarios, as different workers might end up picking (and updating) different jobs, which then never triggers the serialization anomaly we've been relying on.

This, however, does not happen with SERIALIZABLE, as it also tracks read/write dependencies between transactions. A sketch of the failure mode is below.
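For illustration, a hypothetical variant of the dequeued CTE with such a filter ('github-scan' is a made-up typename):

-- hypothetical worker-specific filter inside the dequeued CTE
SELECT job.id FROM sqlq.jobs job, queue_with_capacity q
    WHERE job.status = 'pending' AND job.queue = q.name
          AND job.typename = 'github-scan'
    ORDER BY q.priority, job.priority, job.created_at
    LIMIT 1
-- two workers with different typename filters update *different* rows;
-- REPEATABLE READ sees no write-write conflict, so both commit and the
-- queue's concurrency can be exceeded. SERIALIZABLE still aborts one of
-- them, because their reads of the running count conflict.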

Any thoughts?

@amenowanna
Contributor

My gut says a global mutex with the advisory lock will perform better than serializable. Can we run some load testing to find out?

@riyaz-ali
Contributor Author

Moving this to a lower-priority ready item, as we have an interim fix (using SERIALIZABLE). But @amenowanna, we should definitely try out and benchmark the advisory-lock approach.

@amenowanna
Contributor

Spending the time to research the performance difference is not as important right now as having a solution. We know both solutions work. Let's move forward with SERIALIZABLE as the current solution and hold on to this ticket to test the performance of the two different options.

@amenowanna
Contributor

@riyaz-ali we seem to also have a race condition when enqueueing. We are seeing this now that we are enabling multiple workers to run at the same time. Thoughts on how we could address this?

@riyaz-ali
Contributor Author

There's no defined way to handle task de-duplication when calling Enqueue(), because sqlq cannot establish equivalence between tasks (is A the same as B?).

Ideally, this is something the caller would need to ensure on their end somehow; one common pattern is sketched below. Other platforms (Celery etc.) behave similarly, deferring this to the caller.
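Not part of sqlq itself, but for anyone reading along, a caller could enforce identity with a partial unique index over whatever they consider a task's key (the dedupe_key column and values here are entirely hypothetical):

-- hypothetical caller-side de-duplication: reject a second enqueue of
-- the "same" task while one is still pending or running
ALTER TABLE sqlq.jobs ADD COLUMN dedupe_key text;

CREATE UNIQUE INDEX jobs_dedupe_idx ON sqlq.jobs (queue, dedupe_key)
    WHERE status IN ('pending', 'running');

-- enqueueing the same task twice then becomes a no-op:
INSERT INTO sqlq.jobs (queue, typename, dedupe_key)
    VALUES ('a', 'github-scan', 'repo-1234')
    ON CONFLICT DO NOTHING;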

@amenowanna
Contributor

Totally agree with this. Enqueue de-duplication is a consumer problem. I will open an issue in the mergestat repo and we can address it there. But I want to have this conversation here for the community to see the discussion.
