By default, operators built using Kubebuilder and controller-runtime process a single reconcile request at a time. This is a sensible setting, since it's easier for operator developers to reason about and debug the logic in their applications. It also constrains throughput from the controller to core Kubernetes resources like ectd and the API server.

But what if your work queue starts backing up and average reconciliation times increase due to requests that are left sitting in the queue, waiting to be processed? Luckily for us, a controller-runtime Controller struct includes a MaxConcurrentReconciles field (as I previously mentioned in my Kubebuilder Tips article). This option allows you to set the number of concurrent reconcile loops that are running in a single controller. So with a value above 1, you can reconcile multiple Kubernetes resources simultaneously.

Early in my operator journey, one question that I had was how could we guarantee that the same resource isn't being reconciled at the same time by 2 or more goroutines? With MaxConcurrentReconciles set above 1, this could lead to all sorts of race conditions and undesireable behavior, as the state of an object inside a reconciliation loop could change via a side-effect from an external source (a reconciliation loop running in a different thread).

I thought about this for a while, and even implemented a sync.Map-based approach that would allow a goroutine to acquire a lock for a given resource (based on its namespace/name).

It turns out that all of this effort was for naught, since I recently learned (in a k8s slack channel) that the controller workqueue already includes this feature! Albeit with a simpler implementation.

This is a quick story about how a k8s controller's workqueue guarantees that unique resources are reconciled sequentially. So even if MaxConcurrentReconciles is set above 1, you can be confident that only a single reconciliation function is acting on any given resource at a time.

client-go/util

Controller-runtime uses the client-go/util/workqueue library to implement its underlying reconciliation queue. In the package's doc.go file, a comment states that the workqueue supports these properties:

  • Fair: items processed in the order in which they are added.
  • Stingy: a single item will not be processed multiple times concurrently, and if an item is added multiple times before it can be processed, it will only be processed once.
  • Multiple consumers and producers. In particular, it is allowed for an item to be reenqueued while it is being processed.
  • Shutdown notifications.

Wait a second... My answer is right here in the second bullet, the "Stingy" property! According to these docs, the queue will automatically handle this concurrency issue for me, without having to write a single line of code. Let's run through the implementation.

How does the workqueue work?

The workqueue struct has 3 main methods, Add, Get, and Done. Inside a controller, an informer would Add reconcile requests (namespaced-names of generic k8s resources) to the workqueue. A reconcile loop running in a separate goroutine would then Get the next request from the queue (blocking if it is empty). The loop would perform whatever custom logic is written in the controller, and then the controller would call Done on the queue, passing in the reconcile request as an argument. This would start the process over again, and the reconcile loop would call Get to retrieve the next work item.

This is similar to processing messages in RabbitMQ, where a worker pops an item off the queue, processes it, and then sends an "Ack" back to the message broker indicating that processing has completed and it's safe to remove the item from the queue.

Still, I have an operator running in production that powers QuestDB Cloud's infrastructure, and wanted to be sure that the workqueue works as advertised. So a wrote a quick test to validate its behavior.

A little test

Here is a simple test that validates the "Stingy" property:

package main_test

import (
    "testing"

    "github.com/stretchr/testify/assert"

    "k8s.io/client-go/util/workqueue"
)

func TestWorkqueueStingyProperty(t *testing.T) {

    type Request int

    // Create a new workqueue and add a request
    wq := workqueue.New()
    wq.Add(Request(1))
    assert.Equal(t, wq.Len(), 1)

    // Subsequent adds of an identical object
    // should still result in a single queued one
    wq.Add(Request(1))
    wq.Add(Request(1))
    assert.Equal(t, wq.Len(), 1)

    // Getting the object should remove it from the queue
    // At this point, the controller is processing the request
    obj, _ := wq.Get()
    req := obj.(Request)
    assert.Equal(t, wq.Len(), 0)

    // But re-adding an identical request before it is marked as "Done"
    // should be a no-op, since we don't want to process it simultaneously
    // with the first one
    wq.Add(Request(1))
    assert.Equal(t, wq.Len(), 0)

    // Once the original request is marked as Done, the second
    // instance of the object will be now available for processing
    wq.Done(req)
    assert.Equal(t, wq.Len(), 1)

    // And since it is available for processing, it will be
    // returned by a Get call
    wq.Get()
    assert.Equal(t, wq.Len(), 0)
}

Since the workqueue uses a mutex under the hood, this behavior is threadsafe. So even if I wrote more tests that used multiple goroutines simultaneously reading and writing from the queue at high speeds in an attempt to break it, the workqueue's actual behavior would be the same as that of our single-threaded test.

All is not lost

Kubernetes did it

There are a lot of little gems like this hiding in the Kubernetes standard libraries, some of which are in not-so-obvious places (like a controller-runtime workqueue found in the go client package). Despite this discovery, and others like it that I've made in the past, I still feel that my previous attempts at solving these issues are not complete time-wasters. They force you to think critically about fundamental problems in distributed systems computing, and help you to understand more of what is going on under the hood. So that by the time I've discovered that "Kubernetes did it", I'm relieved that I can simplify my codebase and perhaps remove some unnecessary unit tests.