Kafka Vs. PostgreSQL: How We Implemented Our Queuing System Using PostgreSQL

Overview

Introduction to Queuing Systems

Queuing System — CS101 Implementation

For using a queuing system in practice, though, especially in a complex system like RudderStack, it must address several challenges.

Challenge 1: Persistence in Queuing System

Furthermore, it is often desirable for performance reasons to read & write in units of multiple blocks, often as large as a few MBs. It is not practical to store individual events in the disk as shown in basic CS101 implementation in the previous section.

You can address this problem by batching events in the memory and flushing the whole batch to the disk, as shown below. In our case, we batch in units of ~128 events. We found the ideal batch size to provide a good balance between I/O time and the client acknowledgment (ACK) latency through testing. We don’t send an ACK to the client unless the event persists to disk. Furthermore, we can’t have the clients hanging too long for an ACK.

On the consumer side, we can process much bigger batches to avoid the overhead of doing small I/O. In this case, we create on-disk segments of 100K events, which the system processes together. Once the system processes a segment of batches, one can delete it (see the diagram below), in most cases:

Persistence

Challenge 2: Processing Order of Queues

For example, if we fail to deliver an event, we mark it as failed and block further events from the user. In the meantime, the other unrelated events present in the queue after this failed event can be processed. This process could lead to a scenario (shown in the following diagram) where some events have been processed (marked with a green tick) while others (marked with a red cross) are waiting to be processed:

Processing Order — Partial Processing

This dynamic of inconsistent event delivery makes reclaiming space complicated. While we could wait for all the events in a block to be processed before deleting it, this is not ideal. A single unprocessed event can prevent the entire block from being reclaimed, leading to space overage.

Processing Order — Compaction

During the compaction process, the system removes already-processed events, while it retains the unprocessed events. As you can see in the diagram below, the process has compacted the two leading processed blocks into a single block:

Processing Order — Compaction

You can run the compaction algorithm periodically or by triggering. For example, once there are enough uncompacted blocks or both, as in our case — we run compaction every 30 minutes. However, we do this only if there are at least two blocks with at least 80 percent processed jobs. This logic ensures that we can reclaim enough space during compaction. At the same time, it also ensures we don’t run it too often (as compaction slows down the system).

In a normal steady-state, all jobs succeed, and the compaction job handles blocks that don’t have any pending jobs, while the main processing jobs handle unprocessed blocks. As a result, there is little to no contention between the compaction job and the main processing job.

Challenge 3: In-place Status Update with Status Queue

It would require us to read the block from the disk (incurring a random I/O), update in memory, and sync it back (incurring another random I/O). The two random I/Os incurred in this process can take anywhere from a few milliseconds to hundreds of milliseconds. These I/Os would be a bottleneck in our high throughput event processing pipeline.

We address this problem by not updating in-place and keeping the event status in a separate queue instead. As the system processes events, it appends the success/failure status and other event metadata (like event ID, failure reason, etc.) to a separate queue.

It’s important to note that the system may process an event multiple times. For example, if the event fails the first time, the system will retry it, resulting in two separate entries in the status queue.

Status Queue

Marking a status incurs sequential I/O into the status queue, which can be orders of magnitude faster than a random I/O for an in-place update. However, this approach’s downside is that we need to access two separate blocks to get the job status and identify the next job that the system needs to process.

In practice, though, the system would process current blocks (the status block and the event block) as well as cache them in memory, so these operations don’t require disk I/O.

Conclusion

The statuses are stored in separate tables <queue_name>_job_status_1, <queue_name>_job_status_2, etc. Once a particular table or a sequence of tables has been processed, they are compacted into a single block (or no block if all jobs are successful). This compaction runs in a separate thread.

The queries to find unprocessed jobs in the main thread also find processed jobs during compaction, and they are all implemented in SQL. The SQL interface has also been a great debugging tool for inspecting and updating the jobs. For example, we sometimes have to force retry jobs which have aborted, or vice-versa, which can be easily done in SQL.

Implementing these concepts is certainly interesting, but we will let you dig into the actual code yourself in our GitHub repository.

The Customer Data Platform for Developers, Written in Go and React- https://rudderstack.com

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store