K8s Operator Prometheus Metrics Viewer

As you hopefully know from reading some of my recent articles, I really enjoy writing Kubernetes operators! I've written a bit about them on this blog, and also just recently presented a talk about how to build them at the inaugural FOSSY conference in Portland, Oregon. So here goes another operator-related article, although this one is about one of the later stages of operator development: performance tuning.

Since I've been working on an operator with multiple controllers, each running 4 concurrent reconcilers that make API calls to resources outside of the cluster, I wanted to start profiling my reconciliation loops to make sure they could handle real-world throughput requirements.

Prometheus Metrics

Kubebuilder offers the ability to export operator-specific Prometheus metrics as part of its default scaffolding. This provides a great way to profile and monitor your operator performance under both testing and live conditions.

Some of these metrics (many on a per-controller basis) include:

  • A histogram of reconcile times
  • Number of active workers
  • Workqueue depth
  • Total number of reconciles
  • Total reconcile errors
  • API server request method & response counts
  • Process memory usage & GC performance

I wanted a way to monitor these metrics for controllers under development on my local machine. These controllers run as plain processes outside of any cluster, which makes it difficult to scrape their exposed metrics with a standard Prometheus/Grafana setup.

operator-prom-metrics-viewer

So I thought, why not build a little UI to display this information? Inspired by the absolutely incredible k9s project, I decided to write a terminal UI using the tview library.

Since I'm not (yet) storing this information for further analysis, I decided to only display the latest scraped data point for each metric. But I also didn't want to implement my own Prometheus metric parser, so I imported pieces of the Prometheus scrape library itself to handle the parsing. All I had to do was implement a few interfaces for storing and querying my metrics so that they could interoperate with the scrape library.

Prometheus in-memory, transient storage

The Prometheus storage and query interfaces are relatively simple, and it helps that we don't actually need to implement all of their methods!

Storage

The underlying storage mechanism of my InMemoryAppender struct is a simple map, along with a mutex that locks it during concurrent reads and writes. This map only stores the latest value for each series, so there's no problem overwriting a key's value if it already exists, and we don't need to worry about out-of-order writes or any other time-series database problems.

type InMemoryAppender struct {
	data map[uint64]DataPoint
	mu   *sync.Mutex
}

The DataPoint struct maps directly to a Prometheus sample's structure. If you've worked with Prometheus metrics before, this should look pretty familiar to you.

type DataPoint struct {
	Labels    labels.Labels
	Timestamp int64
	Value     float64
}

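To populate each DataPoint's uint64 key in the map, we can simply use the Hash() method of Prometheus's labels.Labels type, which hashes the full label set (including the metric name, which is stored in the special __name__ label). Hash collisions are theoretically possible but vanishingly unlikely for the handful of series an operator exposes, so we get a practically unique key for each series without any extra work on our end.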
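Here's a stdlib-only sketch of the idea behind keying the map by a label-set hash. It's an illustration rather than Prometheus's implementation: it uses FNV-1a from Go's hash/fnv, and a plain map[string]string stands in for labels.Labels. Sorting the label names first makes the key independent of label order, and a 0xff separator keeps the boundaries between names and values unambiguous.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// seriesKey derives a map key from a label set, in the spirit of
// labels.Labels.Hash(). This is a stand-in using FNV-1a, not the
// Prometheus implementation.
func seriesKey(lbls map[string]string) uint64 {
	// Sort label names so the key does not depend on map iteration order.
	names := make([]string, 0, len(lbls))
	for name := range lbls {
		names = append(names, name)
	}
	sort.Strings(names)

	h := fnv.New64a()
	for _, name := range names {
		// The 0xff separator keeps ("ab", "c") distinct from ("a", "bc").
		h.Write([]byte(name))
		h.Write([]byte{0xff})
		h.Write([]byte(lbls[name]))
		h.Write([]byte{0xff})
	}
	return h.Sum64()
}

func main() {
	a := seriesKey(map[string]string{"__name__": "http_requests_total", "api": "add_product"})
	b := seriesKey(map[string]string{"api": "add_product", "__name__": "http_requests_total"})
	fmt.Println(a == b) // same labels in a different order give the same key
}
```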

To demonstrate, here's an example of a Prometheus metric:

# HELP http_requests_total Total number of http api requests
# TYPE http_requests_total counter
http_requests_total{api="add_product"} 4633433

And how it would be represented by the DataPoint struct:

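d := DataPoint{
    // The metric name is stored in the special __name__ label
    Labels: labels.FromStrings(
        "__name__", "http_requests_total",
        "api", "add_product",
    ),
    Value: 4633433,
    // Prometheus timestamps are milliseconds since the Unix epoch
    Timestamp: time.Now().UnixMilli(),
}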

Now that we have our data model, we need a way to store DataPoints for further retrieval.

Appender

Here's Prometheus's storage.Appender interface, which the scraper uses to write samples:

type Appender interface {
	Append(ref SeriesRef, l labels.Labels, t int64, v float64) (SeriesRef, error)
	Commit() error
	Rollback() error

	ExemplarAppender
	HistogramAppender
	MetadataUpdater
}

Because we're building a simple in-memory tool (with no persistence or availability guarantees), we can turn the Commit() and Rollback() methods into no-ops. The methods of the three embedded interfaces can be stubbed out the same way, since we don't need to store exemplars, metadata, or native histograms: kubebuilder's histograms are classic Prometheus histograms, which are exposed as plain _bucket, _sum, and _count series that the scraper passes to Append() like any other sample (more on this below!). That leaves only the Append() method to implement, which just writes to the InMemoryAppender's underlying map in a threadsafe manner.

func (a *InMemoryAppender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
	a.mu.Lock()
	defer a.mu.Unlock()
	a.data[l.Hash()] = DataPoint{Labels: l, Timestamp: t, Value: v}
	return ref, nil
}
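To see the "latest value wins" behavior and the no-op Commit()/Rollback() in isolation, here's a minimal stdlib-only model of the appender. It assumes a map[string]string in place of labels.Labels and a canonical label string in place of Hash(), and it does not satisfy Prometheus's storage.Appender interface; the method names only mirror it.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
	"sync"
)

// DataPoint mirrors the article's struct, with a plain map standing in
// for Prometheus's labels.Labels (an assumption to keep this stdlib-only).
type DataPoint struct {
	Labels    map[string]string
	Timestamp int64
	Value     float64
}

// memStore keeps only the latest sample per series.
type memStore struct {
	mu   sync.Mutex
	data map[string]DataPoint
}

func newMemStore() *memStore {
	return &memStore{data: map[string]DataPoint{}}
}

// key builds a canonical `name="value",` string from sorted label names.
func key(lbls map[string]string) string {
	names := make([]string, 0, len(lbls))
	for n := range lbls {
		names = append(names, n)
	}
	sort.Strings(names)
	var b strings.Builder
	for _, n := range names {
		fmt.Fprintf(&b, "%s=%q,", n, lbls[n])
	}
	return b.String()
}

// Append overwrites any previous sample for the same series.
func (s *memStore) Append(lbls map[string]string, t int64, v float64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[key(lbls)] = DataPoint{Labels: lbls, Timestamp: t, Value: v}
}

// Commit and Rollback are no-ops: there is nothing to flush or undo.
func (s *memStore) Commit() error   { return nil }
func (s *memStore) Rollback() error { return nil }

func main() {
	s := newMemStore()
	series := map[string]string{"__name__": "workqueue_depth", "name": "questdb"}
	s.Append(series, 1000, 3)
	s.Append(series, 2000, 0) // a later scrape replaces the earlier value
	fmt.Println(len(s.data), s.data[key(series)].Value)
}
```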

Querier

The querier implementation is also fairly straightforward. We only need a single Query method that returns every stored data point with the given metric name whose labels match the given label set.

My solution is far from an optimized inner loop: every call scans all of the stored data points, and each label comparison is itself a linear scan over that point's labels. But for looping through a few metrics at most once per second, I'm not overly concerned with the time complexity of this function.

func (a *InMemoryAppender) Query(metric string, l labels.Labels) []DataPoint {
	a.mu.Lock()
	defer a.mu.Unlock()

	dataToReturn := []DataPoint{}

	for _, d := range a.data {
		if d.Labels.Get("__name__") != metric {
			continue
		}

		isMatch := true
		for _, label := range l {
			if d.Labels.Get(label.Name) != label.Value {
				isMatch = false
				break // no need to check the remaining labels
			}
		}

		if isMatch {
			dataToReturn = append(dataToReturn, d)
		}

	}

	return dataToReturn
}

Histogram implementation

Since the controller histogram data arrives as a set of separate _bucket series (one per bucket boundary) rather than as a single histogram object, I did need to write some additional code to transform it into a workable schema for rendering a histogram. To aid in the construction of the histogram buckets, I created 2 structs: one to represent the histogram itself, and the other to represent each of its buckets.

type HistogramData struct {
	buckets []Bucket
	curIdx  int
}

type Bucket struct {
	Label string
	Value int
}

For each controller, we can run a query against our storage backend for the workqueue_queue_duration_seconds_bucket metric with the name=<controller-name> label. This returns all of the histogram's buckets for that controller, each distinguished by an le ("less than or equal") label that holds the bucket's upper bound. We can iterate through these data points to create Bucket objects, append them to the HistogramData.buckets slice, and sort them before rendering so that the histogram is always displayed in a consistent, correct order.
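Two details matter when turning these data points into bars. Prometheus histogram buckets are cumulative (each bucket counts every observation less than or equal to its le bound, so le="+Inf" equals the total count), and the le values have to be compared as numbers, since sorting the raw strings would put "10" before "2". Here's a small sketch of both steps using the Bucket struct above; sortBuckets and perBucketCounts are hypothetical helper names, and the viewer doesn't do the second step yet.

```go
package main

import (
	"fmt"
	"math"
	"sort"
	"strconv"
)

// Bucket mirrors the article's struct: Label holds the raw "le" value.
type Bucket struct {
	Label string
	Value int
}

// leValue parses a bucket's "le" label; ParseFloat accepts "+Inf".
func leValue(b Bucket) float64 {
	v, err := strconv.ParseFloat(b.Label, 64)
	if err != nil {
		return math.Inf(1) // treat an unparseable bound as the last bucket
	}
	return v
}

// sortBuckets orders buckets by numeric upper bound. Sorting the raw
// label strings would place "10" before "2".
func sortBuckets(buckets []Bucket) {
	sort.Slice(buckets, func(i, j int) bool {
		return leValue(buckets[i]) < leValue(buckets[j])
	})
}

// perBucketCounts converts cumulative counts into the number of
// observations that fall between each bound and the previous one.
func perBucketCounts(sorted []Bucket) []Bucket {
	out := make([]Bucket, len(sorted))
	prev := 0
	for i, b := range sorted {
		out[i] = Bucket{Label: b.Label, Value: b.Value - prev}
		prev = b.Value
	}
	return out
}

func main() {
	buckets := []Bucket{{"+Inf", 12}, {"10", 12}, {"0.5", 9}, {"2", 11}}
	sortBuckets(buckets)
	fmt.Println(buckets)
	fmt.Println(perBucketCounts(buckets))
}
```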

The curIdx field on the HistogramData struct is only used internally when rendering the histogram, to keep track of the current index of the buckets slice that we're on. This way, we can consume buckets using a for loop and exit when we return the final bucket. It's a bit clunky, but it works!
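As a rough idea of what consuming buckets through curIdx can look like, here's a hypothetical Next() method along with a Reset() helper. Both method names are assumptions for illustration, not the viewer's actual code.

```go
package main

import "fmt"

type Bucket struct {
	Label string
	Value int
}

type HistogramData struct {
	buckets []Bucket
	curIdx  int
}

// Next returns the bucket at curIdx and advances the cursor. The boolean
// is false once every bucket has been consumed.
func (h *HistogramData) Next() (Bucket, bool) {
	if h.curIdx >= len(h.buckets) {
		return Bucket{}, false
	}
	b := h.buckets[h.curIdx]
	h.curIdx++
	return b, true
}

// Reset rewinds the cursor so the histogram can be rendered again.
func (h *HistogramData) Reset() { h.curIdx = 0 }

func main() {
	h := &HistogramData{buckets: []Bucket{{"0.5", 9}, {"+Inf", 12}}}
	for b, ok := h.Next(); ok; b, ok = h.Next() {
		fmt.Printf("le=%s count=%d\n", b.Label, b.Value)
	}
}
```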

v1.0

After all of this hacking, I ended up with something that looks like this:

Screenshot

It's far from perfect: since I don't manipulate the histogram values (yet) or support any display scrolling, it's very likely that the histogram will spill off the right edge of the screen. The UX could also be improved, since the only way to switch the controller being displayed is with the Up and Down arrow keys.

But it's been a fantastic tool for me when debugging slow reconciles due to API rate limiting, and it has also surfaced errors that I missed in the logs. Give it a whirl and let me know what you think!

Link to the GitHub repo: https://github.com/sklarsa/operator-prom-metrics-viewer

How are Kubernetes VolumeAttachments Named?

This is a short article with a little k8s persistent storage nugget that I recently uncovered. FWIW, I've only checked this while using the AWS EBS Container Storage Interface (CSI) driver, but the article should apply to any cluster using CSI drivers to mount volumes.

Background

After a PersistentVolume (PV) is created and bound to a PersistentVolumeClaim (PVC), a Pod can mount the claim as a volume to access its data. Once the Pod is scheduled, the attach/detach controller in kube-controller-manager creates a VolumeAttachment, which declares the intent to attach the volume to the Pod's Node. The CSI driver's external-attacher sidecar is notified when this VolumeAttachment is created and asks the storage backend to attach the volume to the Node (for EBS, attaching the disk to the EC2 instance). After the attachment succeeds, the kubelet on the Node calls the CSI driver's node plugin to mount the volume, making it available for the Pod to use.

Let's say that you want to check if a particular volume is actually attached to a Node. For example, maybe you want to make sure that a high-throughput database's volume is detached before destroying its Node, to protect the integrity of its data. How can you find a PV's corresponding VolumeAttachment to determine whether its backing volume is still attached?

Querying VolumeAttachments

The VolumeAttachment spec contains nodeName and source.persistentVolumeName fields, which, as expected, hold the Node's name and the PV's name respectively. But these are not supported field selectors, so you cannot ask the API server for the VolumeAttachment with a matching Node or PV name. Instead, you need to LIST all VolumeAttachments in the cluster and iterate over them to find the one that you want. A large cluster could contain hundreds (or even thousands) of VolumeAttachments, so this operation could take quite a while inside of a controller reconcile loop.
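For comparison, here's what the LIST-and-filter approach boils down to once the LIST call has returned. To keep the sketch runnable without the Kubernetes client libraries, the structs below are simplified, hypothetical stand-ins that mirror only the spec.nodeName and spec.source.persistentVolumeName fields of a real VolumeAttachment.

```go
package main

import "fmt"

// Simplified stand-ins for the storage.k8s.io/v1 VolumeAttachment fields
// used here. These are not the real API structs.
type VolumeAttachmentSource struct {
	PersistentVolumeName *string
}

type VolumeAttachmentSpec struct {
	NodeName string
	Source   VolumeAttachmentSource
}

type VolumeAttachment struct {
	Name string
	Spec VolumeAttachmentSpec
}

// findAttachment scans every VolumeAttachment returned by a LIST call,
// so its cost grows linearly with the number of attachments in the cluster.
func findAttachment(items []VolumeAttachment, pvName, nodeName string) (VolumeAttachment, bool) {
	for _, va := range items {
		src := va.Spec.Source.PersistentVolumeName
		if src != nil && *src == pvName && va.Spec.NodeName == nodeName {
			return va, true
		}
	}
	return VolumeAttachment{}, false
}

func main() {
	pv := "pv-data"
	items := []VolumeAttachment{
		{Name: "csi-example", Spec: VolumeAttachmentSpec{NodeName: "node-1", Source: VolumeAttachmentSource{PersistentVolumeName: &pv}}},
	}
	if va, ok := findAttachment(items, "pv-data", "node-1"); ok {
		fmt.Println("found", va.Name)
	}
}
```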

Maybe we could somehow craft a GET request to find a PV's matching VolumeAttachment? Unfortunately, the VolumeAttachment's name, at least when using a supported k8s CSI driver, is an opaque string that looks something like csi-e52f481e06b9cf4dc9d95a56d1788026b4707cdce2e105d4444f0be9d6206a09. Where does this name come from? Can we use it to check if a PV is attached to a particular Node?

VolumeAttachment Naming

After a bit of digging, I found the answer in the k8s CSI source code:

// getAttachmentName returns csi-<sha256(volName,csiDriverName,NodeName)>
func getAttachmentName(volName, csiDriverName, nodeName string) string {
	result := sha256.Sum256([]byte(fmt.Sprintf("%s%s%s", volName, csiDriverName, nodeName)))
	return fmt.Sprintf("csi-%x", result)
}

The name's suffix is the hex-encoded SHA-256 hash of the following values, concatenated with no separator:

  1. The volume handle (spec.csi.volumeHandle on the PV)
  2. The name of the CSI driver attaching the volume (spec.csi.driver), and
  3. The name of the target Node
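Here's the naming scheme reproduced with only the standard library, using made-up EBS-style values. One consequence of concatenating the three inputs without a separator is that different inputs with the same concatenation, such as ("ab", "c") and ("a", "bc"), produce the same name. With real volume handles, driver names, and Node names this is unlikely to matter in practice, but it's worth knowing that the hash only covers the joined string.

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

// attachmentName reproduces the naming scheme shown above:
// "csi-" + hex(sha256(volumeHandle + driverName + nodeName)).
func attachmentName(volumeHandle, driverName, nodeName string) string {
	sum := sha256.Sum256([]byte(volumeHandle + driverName + nodeName))
	return fmt.Sprintf("csi-%x", sum)
}

func main() {
	// Hypothetical EBS-style values, for illustration only.
	name := attachmentName("vol-0123456789abcdef0", "ebs.csi.aws.com", "ip-10-0-1-23.ec2.internal")
	// "csi-" plus 64 hex characters for the 32-byte digest.
	fmt.Println(name, len(name))
}
```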

With this discovery, we can now determine if a specific PV is attached to a Node with a single GET request, using the computed VolumeAttachment name.

Here's a small function that I wrote to generate a VolumeAttachment name based on a PV and Node:

package csi

import (
	"crypto/sha256"
	"fmt"

	v1core "k8s.io/api/core/v1"
)

func GetVolumeAttachmentName(pv *v1core.PersistentVolume, n *v1core.Node) string {
	if pv.Spec.CSI == nil {
		return ""
	}

	var (
		volName       = pv.Spec.CSI.VolumeHandle
		csiDriverName = pv.Spec.CSI.Driver
		nodeName      = n.Name
	)

	result := sha256.Sum256([]byte(fmt.Sprintf("%s%s%s", volName, csiDriverName, nodeName)))
	return fmt.Sprintf("csi-%x", result)
}

This function accepts a PV and a Node, and hashes the PV's volume handle and driver name along with the Node's name to construct the matching VolumeAttachment name. You can now use this function to check if a PV is attached to a Node, all in a single API request!


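// IsPvAttached reports whether a VolumeAttachment exists for the PV on the Node.
// Alongside the imports above, this needs "context", "k8s.io/apimachinery/pkg/types",
// apierrors "k8s.io/apimachinery/pkg/api/errors", v1storage "k8s.io/api/storage/v1",
// and "sigs.k8s.io/controller-runtime/pkg/client".
func IsPvAttached(ctx context.Context, c client.Client, pv *v1core.PersistentVolume, n *v1core.Node) (bool, error) {
	name := GetVolumeAttachmentName(pv, n)
	if name == "" {
		return false, fmt.Errorf("PV %s is not backed by a CSI driver", pv.Name)
	}

	// VolumeAttachments are cluster-scoped, so only the name is set
	va := &v1storage.VolumeAttachment{}
	err := c.Get(ctx, types.NamespacedName{Name: name}, va)
	if apierrors.IsNotFound(err) {
		return false, nil
	}
	if err != nil {
		return false, err
	}
	return true, nil
}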

The VolumeAttachment is deleted once the volume has been detached from the Node, so we can simply check for the existence of a VolumeAttachment whose name is computed from the PV's volume handle and driver and the Node's name, using our GetVolumeAttachmentName function from above. If the VolumeAttachment does not exist, we can say that the volume is detached from the Node (and therefore no longer mounted there), so the Node can be safely deleted without corrupting data on the volume.

Hopefully this demystifies those strange VolumeAttachment names, and unlocks some new ideas when writing your next k8s controller.

Annotations in Kubernetes Operator Design

It seems that annotations are everywhere in the Kubernetes (k8s) ecosystem. Ingress controllers, cloud providers, and operators of all kinds use the metadata stored in annotations to perform targeted actions inside of a cluster. So how can we leverage these when developing a new k8s operator?

To the Docs

Despite their widespread use, the official documentation of annotations is actually quite brief. In fact, it only takes two short sentences at the top of the page to define an annotation:

You can use Kubernetes annotations to attach arbitrary non-identifying metadata to objects. Clients such as tools and libraries can retrieve this metadata.

While technically accurate, this definition is still pretty vague and not entirely helpful.

The docs expand on this by providing a few examples of the types of metadata that can be stored in an annotation. But these samples range from build information all the way to individuals' "phone or pager numbers" (who still carries a pager these days anyway?).

Somewhere within their ambiguity lies the true power of k8s annotations: they let you tag any cluster resource with arbitrary data. An annotation's value is always a string, but that string can hold structured data in almost any format. It's like having a dedicated key-value store attached to every resource in your cluster! So how can we harness this power in an operator?

In this post, I will detail a way in which I recently used annotations while writing an operator for my company's product, QuestDB. Hopefully this will give you an idea of how you can incorporate annotations into your own operators to harness their full potential.

Background

The operator that I've been working on is designed to manage the full lifecycle of a QuestDB database instance, including version and hardware upgrades, config changes, backups, and (eventually) recovery from node failure. I used the Operator SDK and kubebuilder frameworks to provide scaffolding and API support.

It always comes back to a JWK

In order to take advantage of the database's many performance optimizations (such as importing over 300k rows/sec with io_uring), we recommend that users ingest data over InfluxDB Line Protocol (ILP). One of the features that we offer, which is not part of the original protocol, is authentication over TCP using a JSON Web Key (JWK).

This feature is configured in a file that is referenced by the main server config on launch. You just need to add your JWK's key ID and public key coordinates to the file in this format:

testUser1 ec-p-256-sha256 fLKYEaoEb9lrn3nkwLDA-M_xnuFOdSt9y0Z7_vWSHLU Dt5tbS1dEDMSYfym3fgMv0B99szno-dFc1rYF9t0aac
# [key/user id] [key type] {keyX keyY}

Let's say that you have your private key stored elsewhere in a k8s cluster as a Secret, so your client application can securely push data to your QuestDB instance. The JWK secret data would look something like this:

{
  "kty": "EC",
  "d": "5UjEMuA0Pj5pjK8a-fa24dyIf-Es5mYny3oE_Wmus48",
  "crv": "P-256",
  "kid": "testUser1",
  "x": "fLKYEaoEb9lrn3nkwLDA-M_xnuFOdSt9y0Z7_vWSHLU",
  "y": "Dt5tbS1dEDMSYfym3fgMv0B99szno-dFc1rYF9t0aac"
}

When a user creates a QuestDB Custom Resource (CR) in the cluster, we want to be able to point our operator to this Secret and reformat its public values ("kid", "x", and "y") so that it can create a valid auth.conf ConfigMap value to mount into the Pod running our QuestDB instance. The operator can then add line.tcp.auth.db.path=auth.conf to the main server config to make it aware of the new authentication file, and the client application can communicate with QuestDB securely over ILP using the private key.
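As a sketch of that reformatting step, assuming the Secret stores the whole JWK JSON document shown above under a single key, the public fields can be decoded and joined into an auth.conf line. The authConfLine name is hypothetical, and the sketch only handles EC P-256 keys, mapping them to the ec-p-256-sha256 key type from the example config line. The private "d" value is never read.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// jwk holds only the public JWK fields needed for auth.conf. The private
// "d" field is deliberately absent so it is never decoded.
type jwk struct {
	Kty string `json:"kty"`
	Crv string `json:"crv"`
	Kid string `json:"kid"`
	X   string `json:"x"`
	Y   string `json:"y"`
}

// authConfLine formats one "<kid> <key type> <x> <y>" line. This sketch
// only accepts EC P-256 keys; anything else is rejected.
func authConfLine(raw []byte) (string, error) {
	var k jwk
	if err := json.Unmarshal(raw, &k); err != nil {
		return "", fmt.Errorf("parsing JWK: %w", err)
	}
	if k.Kty != "EC" || k.Crv != "P-256" {
		return "", fmt.Errorf("unsupported key type %q/%q", k.Kty, k.Crv)
	}
	if k.Kid == "" || k.X == "" || k.Y == "" {
		return "", fmt.Errorf("JWK is missing kid, x, or y")
	}
	return fmt.Sprintf("%s ec-p-256-sha256 %s %s", k.Kid, k.X, k.Y), nil
}

func main() {
	secretData := []byte(`{
	  "kty": "EC",
	  "d": "5UjEMuA0Pj5pjK8a-fa24dyIf-Es5mYny3oE_Wmus48",
	  "crv": "P-256",
	  "kid": "testUser1",
	  "x": "fLKYEaoEb9lrn3nkwLDA-M_xnuFOdSt9y0Z7_vWSHLU",
	  "y": "Dt5tbS1dEDMSYfym3fgMv0B99szno-dFc1rYF9t0aac"
	}`)
	line, err := authConfLine(secretData)
	if err != nil {
		panic(err)
	}
	fmt.Println(line)
}
```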

How can we let the operator know which Secret to use?

Using the Spec

One approach is to simply add fields to the QuestDB Custom Resource spec:

type QuestDBSpec struct {
    ...
    IlpSecretName      string `json:"ilpSecretName,omitempty"`
    IlpSecretNamespace string `json:"ilpSecretNamespace,omitempty"`
    ...
}

With these fields, a user can now set their values to the name and namespace of the secret that contains the JWK's private key, like so:

apiVersion: crd.questdb/v1
kind: QuestDB
...
spec:
  ilpSecretName: my-private-key
  ilpSecretNamespace: default

After applying the above YAML to the cluster, the operator will kick off a reconciliation loop of the newly created (or updated) QuestDB CR. Inside this loop, the operator will query the k8s API for the Secret default/my-private-key, obtain the "kid", "x", and "y" values from the Secret's data, modify the ConfigMap that holds the QuestDB configuration, and continue the process as described above.

Even though this technically works, the approach is fairly naive and can lead to some issues down the line. For example, if you want to rotate your JWK, how will the operator know to update the public key in the QuestDB auth ConfigMap? Or, what will happen if the secret does not even exist? Let's use some kubebuilder primitives to help answer these questions and improve the solution.

Kubebuilder Watches

Kubebuilder has built-in support for watching both resources that the operator manages and resources that are managed externally by another component. A watch registers the controller with the k8s API server so that the controller is notified when a "watched" resource changes. This allows the operator to kick off a reconciliation loop for the changed object, to ensure that the resource's actual state matches its desired state (as defined by the operator's custom logic).

Using kubebuilder, resource watches are configured in the reconciler's SetupWithManager function:

func (r *QuestDBReconciler) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&questdbv1.QuestDB{}).
        Owns(&corev1.ConfigMap{}).
        Watches(
            &source.Kind{Type: &corev1.Secret{}},
            handler.EnqueueRequestsFromMapFunc(r.secretToQuestDB),
            builder.WithPredicates(predicate.ResourceVersionChangedPredicate{}),
        ).
        Complete(r)
}

In this function, we register our reconciler with a controller manager and set up 3 different types of watches:

  • For(&questdbv1.QuestDB{}) tells the manager that the controller's primary resource is a questdbv1.QuestDB. This watch registers the manager with the k8s API so that it is notified about any changes to a QuestDB CR. When a change is detected, the manager kicks off a reconcile of that object by calling the QuestDBReconciler.Reconcile() function, which drives the resource toward its desired state. Only one For clause can be used when registering a new controller, which goes hand-in-hand with the recommendation that a controller should be responsible for a single CR.

  • Owns(&corev1.ConfigMap{}) kicks off a reconcile of the owning QuestDB CR whenever a ConfigMap owned by a QuestDB changes. To own an object, you can use the controllerutil.SetControllerReference function to create a parent-child relationship between the QuestDB parent and the ConfigMap child. After that, changes to the ConfigMap trigger a reconcile of its parent QuestDB.

  • Based on the function signature alone, the Watches block is clearly very different from the previous two. In this case, we are listening for changes to any corev1.Secret in the entire cluster, regardless of ownership. The watch is also configured with a predicate to filter events (predicate.ResourceVersionChangedPredicate), which only lets an update event through when the Secret's resourceVersion has changed; this filters out periodic resyncs, while create and delete events pass through unfiltered. Secrets have no spec or status, but any write to the object, such as a change to its data or annotations, produces a new resourceVersion. So when a Secret changes anywhere in the cluster, the manager runs the secretToQuestDB function to map that Secret to zero or more QuestDB NamespacedName references, based on its characteristics.

Below, we will use this function to update a QuestDB's config if a JWK value has changed. To do this, we need to map from a Secret to any QuestDBs that are using that Secret's value for ILP authentication.

Let's take a deeper look at this mapper function to see how to accomplish this.

EnqueueRequestsFromMapFunc

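The handler package of sigs.k8s.io/controller-runtime defines the MapFunc type, which EnqueueRequestsFromMapFunc wraps into an event handler for the Watches function. In controller-runtime releases before v0.15 (which the snippets in this article use), it looks like this; newer releases also pass a context.Context as the first argument: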

type MapFunc func(client.Object) []reconcile.Request

This function accepts a generic API object and returns a list of reconcile requests, which are simple wrappers on top of namespaced names (usually seen in the form "namespace/name"):

type Request struct {
  // NamespacedName is the name and namespace of the object to reconcile.
  types.NamespacedName
}

So how can we turn a generic client.Object (in this case, an interface value wrapping a Secret) into the name and namespace of a QuestDB object that we want to reconcile?

There are many possible answers to this question!

One idea is to create a naming convention that encodes the name and namespace of the target QuestDB into the Secret's name, so we could use client.Object.GetName() and client.Object.GetNamespace() to build a NamespacedName to reconcile. Perhaps something like questdb-${DB_NAME}-ilp. But this would limit how Secrets can be named, which might not interoperate well with a controller like External Secrets Operator that syncs the Secret from an external source like Vault. It would also trip up a developer who forgets the naming convention and then needs to debug why their QuestDB's ILP auth isn't working.

Maybe we could reuse the IlpSecretName and IlpSecretNamespace spec fields from the previous section? We could query for a QuestDB that has a Spec.IlpSecretName == client.Object.GetName() (and likewise for namespace) inside our mapper function. But this doesn't work for a few reasons.

The first is that, at the time of writing, the API server did not support field selectors on custom resources, so this query was literally impossible! (Kubernetes 1.30 later introduced selectable fields for CRDs as an alpha feature.)

Secondly, let's say you try to work around this restriction by storing the Secret name on the QuestDB object in something that can be queried against, like resource labels. Since a MapFunc only accepts a client.Object and does not return an error along with its []reconcile.Request, there's no clean place to make API calls inside of it. To do that, you would need a cancelable context and a standardized way to handle API errors. You can create all of this inside a MapFunc, but you wouldn't be able to use kubebuilder's built-in error handling or the context that is attached to every other API request in the system. So based on the signature of MapFunc, it's clear that its designers don't want you making any queries inside of it!

Then how can we create a list of QuestDBs to reconcile using only the data found in the client.Object?

Annotations to the rescue!

To solve this issue, I decided to create a new annotation: "crd.questdb.io/name". This annotation is attached to a Secret, and its value is the name of the QuestDB CR that should use the Secret's data to construct an ILP auth config file. For simplicity, I will assume that the Secret is only used by a single QuestDB, and that the Secret and the QuestDB reside in the same namespace.

This allows us to create a very simple mapper function that looks something like this:

func (r *QuestDBReconciler) secretToQuestDB(obj client.Object) []reconcile.Request {
  // Ignore any object that is not a Secret
  if _, ok := obj.(*v1core.Secret); !ok {
    return nil
  }

  // Extract the target QuestDB's name from the annotation
  qdbName, ok := obj.GetAnnotations()["crd.questdb.io/name"]
  if !ok {
    return nil
  }

  return []reconcile.Request{{
    NamespacedName: client.ObjectKey{
      Name: qdbName,
      // The Secret and QuestDB must reside in
      // the same namespace for this to work
      Namespace: obj.GetNamespace(),
    },
  }}
}
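If a Secret ever needed to feed more than one QuestDB, one possible extension (an assumption, not something the operator above does) is to allow a comma-separated list of names in the annotation value. Here's a stdlib-only sketch of that mapping, with a small hypothetical request struct standing in for reconcile.Request.

```go
package main

import (
	"fmt"
	"strings"
)

// request is a stand-in for reconcile.Request's namespace/name pair.
type request struct {
	Namespace, Name string
}

const qdbAnnotation = "crd.questdb.io/name"

// requestsFor maps a Secret's namespace and annotations to reconcile
// requests, treating the annotation value as a comma-separated name list.
func requestsFor(namespace string, annotations map[string]string) []request {
	value, ok := annotations[qdbAnnotation]
	if !ok {
		return nil
	}
	var reqs []request
	for _, name := range strings.Split(value, ",") {
		name = strings.TrimSpace(name)
		if name == "" {
			continue // tolerate empty entries such as a trailing comma
		}
		reqs = append(reqs, request{Namespace: namespace, Name: name})
	}
	return reqs
}

func main() {
	fmt.Println(requestsFor("default", map[string]string{qdbAnnotation: "qdb-a, qdb-b"}))
	fmt.Println(requestsFor("default", map[string]string{"unrelated": "x"}))
}
```

The Secret lookup in Reconcile would then need to split the annotation value the same way, instead of comparing it to the QuestDB's name directly.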

Reconciliation logic

But we're not done yet! The controller still needs to find this Secret and use its data to construct the auth config.

Inside our QuestDB reconciliation loop, we can query for all Secrets in a QuestDB's namespace and iterate over them until we find the one we're looking for, based on our new annotation. Here's a small code sample of that, without any additional error-checking.

func (r *QuestDBReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
  q := &questdbv1.QuestDB{}

  // Assumes that the QuestDB exists (for simplicity)
  if err := r.Get(ctx, req.NamespacedName, q); err != nil {
    return ctrl.Result{}, err
  }

  allSecrets := &v1core.SecretList{}
  authSecret := v1core.Secret{}

  // Get a list of all secrets in the QuestDB's namespace
  if err := r.List(ctx, allSecrets, client.InNamespace(q.Namespace)); err != nil {
    return ctrl.Result{}, err
  }

  // Iterate over them to find the secret with the desired annotation
  for _, secret := range allSecrets.Items {
    if secret.Annotations["crd.questdb.io/name"] == q.Name {
      authSecret = secret
      break
    }
  }

  if authSecret.Name == "" {
    return ctrl.Result{}, errors.New("auth secret not found")
  }

  // Secret data values are []byte; this assumes the public JWK
  // values are stored under the "x", "y", and "kid" keys
  var (
    x   = string(authSecret.Data["x"])
    y   = string(authSecret.Data["y"])
    kid = string(authSecret.Data["kid"])
  )

  // Construct the ILP auth string to add to the QuestDB config
  auth := constructIlpAuthConfig(x, y, kid)

  // Add this auth string to a ConfigMap value and update...

  return ctrl.Result{}, nil
}

As you can see, the new annotation fully decouples the Secret from the QuestDB operator, since there are no domain-specific naming requirements for the Secret. You don't even need to change the QuestDB CR spec to update the config. All you need to do is add the annotation to any Secret in the QuestDB's namespace and set its value to the name of the QuestDB resource; the operator will automatically be notified of the change and update your QuestDB's config to use the Secret's public key data.

Note that this is a happy-path solution; we still need to handle cases where more than one Secret has the annotation, or where a matching Secret does not have the keys required to generate the JWK public key entry.
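Here's one way those two gaps could be closed, sketched with a minimal, hypothetical secret struct in place of corev1.Secret so it runs without Kubernetes dependencies. Like the Reconcile snippet above, it assumes the public values live under the "kid", "x", and "y" keys of the Secret's data, and it returns an error rather than silently picking one Secret when several carry the annotation.

```go
package main

import (
	"errors"
	"fmt"
)

// secret is a minimal stand-in for the parts of a corev1.Secret used here.
type secret struct {
	Name        string
	Annotations map[string]string
	Data        map[string][]byte
}

const qdbAnnotation = "crd.questdb.io/name"

var requiredKeys = []string{"kid", "x", "y"}

// findAuthSecret returns the single Secret annotated for qdbName, or an
// error if none, several, or an incomplete Secret is found.
func findAuthSecret(secrets []secret, qdbName string) (secret, error) {
	var matches []secret
	for _, s := range secrets {
		if s.Annotations[qdbAnnotation] == qdbName {
			matches = append(matches, s)
		}
	}

	switch len(matches) {
	case 0:
		return secret{}, errors.New("auth secret not found")
	case 1:
		// exactly one match: validate it below
	default:
		return secret{}, fmt.Errorf("%d secrets are annotated for %q; expected exactly one", len(matches), qdbName)
	}

	s := matches[0]
	for _, k := range requiredKeys {
		if len(s.Data[k]) == 0 {
			return secret{}, fmt.Errorf("secret %q is missing key %q", s.Name, k)
		}
	}
	return s, nil
}

func main() {
	s := secret{
		Name:        "questdb-jwk",
		Annotations: map[string]string{qdbAnnotation: "my-questdb"},
		Data:        map[string][]byte{"kid": []byte("testUser1"), "x": []byte("x-coord"), "y": []byte("y-coord")},
	}
	found, err := findAuthSecret([]secret{s}, "my-questdb")
	fmt.Println(found.Name, err)
}
```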

No limits

The beauty of annotations is that you can store anything in them, and with a custom operator, use that data to perform any cluster automation that you can dream of! K8s doesn't even prescribe the format of an annotation's value, as long as it's a string. This means you can use simple strings, JSON, or even base64-encoded binary blobs as annotation values for an operator to use! Still, since k8s is a young-ish and constantly evolving system, I would probably stick with simple annotation values to abide by KISS as much as possible.

After using annotations in my operator code, I've started to gain more of an appreciation for why the k8s annotation docs are so vague: because annotations can be used for any custom action, it's not really possible to define all of their capabilities. It's up to each operator developer to use annotations in their own way.

I hope this example has sparked some of your own ideas about how to use annotations in your own operators. Let me know if it has!