
Time-series Data and Databases with Rust

I became a software developer in the mid-to-late 2000s. Back then, whenever we needed to store data, it ended up in a SQL database without question. No matter what use case we had to solve, the SQL database was the store of choice, even if we had to implement sophisticated search functionality.

It was a revelation for me when I learned about other data stores, such as CouchDB, MongoDB, and Elasticsearch. Suddenly, it was so much easier to sync data, for example, or search across large data sets.

It became clear that each database has its valid use cases, and it’s legitimate to use more than one data store in a project.

Building a database from scratch

Over the years, I came into contact with Prometheus a few times, but I never fully understood how to use it until recently.

At work, a customer needed a specialized Prometheus exporter, so I set out to build one. As I got to know Prometheus better, I learned that it is, at its core, a time-series database.

While I was developing this custom exporter in Rust, an idea struck me: What if I put my skills to the test and reimplemented Prometheus in Rust, just to see how far I would get? My goal wasn’t to deliver a drop-in replacement for Prometheus. I wanted to understand what it takes to create database software.

Over time, I had encountered several blog posts outlining the complexities of relational databases. While a relational database would be a demanding challenge, I was more intrigued by time-series data stores. For instance: How does a time-series database organize its data? How does querying that data work?

I wanted answers to all these different questions. And I got to work.

Managing time-series data

Before writing a single line of code, I spent time researching deep dives and walkthroughs of time-series databases in general and Prometheus in particular.

Reading a deep dive blog post seemed more convenient than digging through thousands of lines of source code. Luckily, I found an article that answered many questions I had. The following section includes information from that article.

The difference between relational and time-series data

A relational database focuses on saving data that conforms to a pre-defined structure. Its strength lies in the user’s ability to ask arbitrary questions via SQL queries. Queries are dynamic, meaning the database does not know them in advance.

A time-series database, on the other hand, focuses on points in time (hence the name). While a single record in a relational database can have many fields, a record stored in a time-series database like Prometheus consists of a name (the metric), a value (a counter or gauge), and, optionally, a few attached labels.

The main focus is analyzing trends and changes in data over time, answering questions such as “How many error codes did the API return over the last week?”

Simplified, Prometheus stores data internally that conforms to this format:

identifier -> (t0, v0), (t1, v1), (t2, v2), (t3, v3), ....

The identifier represents a metric name, such as failed_http_requests. Each data point consists of a tuple. The first value in the tuple is the timestamp (t0, t1, t2, …), while the second one represents a numerical value, such as a counter or gauge (v0, v1, v2, …).
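
To make this mapping concrete, here is a minimal sketch of how it could be modeled in Rust. This is my own illustration, not Prometheus’ actual storage layout, which uses far more sophisticated structures:

use std::collections::HashMap;

/// One data point: (timestamp, value).
type DataPoint = (i64, f64);

/// Maps an identifier such as "failed_http_requests" to its data points.
struct TimeSeriesStore {
    series: HashMap<String, Vec<DataPoint>>,
}

impl TimeSeriesStore {
    fn new() -> Self {
        TimeSeriesStore {
            series: HashMap::new(),
        }
    }

    /// Appends a data point. Scrapes happen in chronological order,
    /// so each series stays sorted by timestamp automatically.
    fn append(&mut self, metric: &str, timestamp: i64, value: f64) {
        self.series
            .entry(metric.to_string())
            .or_default()
            .push((timestamp, value));
    }
}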

Querying data

While a relational database does not know beforehand what queries to expect, Prometheus does. Prometheus’ main focus lies on metrics and their corresponding time series.

A query always starts with the metric name and includes a time frame. Since this is known beforehand, we can structure the data internally to allow for efficient queries.

Retrieving data from the store therefore starts with the metric name, followed by the time frame. Once we narrow down the time frame, we automatically get all the attached data points.
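
Continuing the sketch from the previous section, a query could first look up the series by its metric name and then narrow the data points down by time frame:

impl TimeSeriesStore {
    /// Returns every data point for `metric` with from <= t <= to.
    /// Since each series is sorted by time, a real implementation could
    /// binary-search the boundaries instead of scanning linearly.
    fn query(&self, metric: &str, from: i64, to: i64) -> Vec<DataPoint> {
        self.series
            .get(metric)
            .map(|points| {
                points
                    .iter()
                    .copied()
                    .filter(|&(t, _)| from <= t && t <= to)
                    .collect()
            })
            .unwrap_or_default()
    }
}

fn main() {
    let mut store = TimeSeriesStore::new();
    store.append("failed_http_requests", 1000, 3.0);
    store.append("failed_http_requests", 1005, 4.0);
    // Only the second data point falls into the requested time frame.
    println!("{:?}", store.query("failed_http_requests", 1001, 1010));
}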

Implementation

Next, let’s get into the implementation details. My implementation is nowhere near feature-complete, and it deviates a lot from Prometheus.

You can find the entire source code here.

Fetching data from Prometheus Exporters

Before we can store any data or query it, we need to retrieve it somehow. Prometheus retrieves data from so-called scrape targets. A scrape target is a service that provides metrics via an HTTP endpoint.

To keep matters simple, I chose the Node Exporter as a scrape target. The Node Exporter provides metrics from the machine it runs on, such as memory usage or CPU utilization.

It exposes these metrics at http://localhost:9100/metrics.

Prometheus’ scrape data format is text-based and consists of key-value pairs and comments. Let’s have a look at some sample output below:

# HELP go_gc_duration_seconds A summary of the GC invocation durations.
# TYPE go_gc_duration_seconds summary
go_gc_duration_seconds{quantile="0"} 3.8996e-05
go_gc_duration_seconds{quantile="0.25"} 4.5926e-05
go_gc_duration_seconds{quantile="0.5"} 5.846e-05

(Example output from Node Exporter, source https://prometheus.io/docs/guides/node-exporter/)

Given its text-based nature, parsing this format is straightforward. To make matters easier in the beginning, I skipped all lines starting with a hash symbol (#), as well as lines containing curly braces, focusing only on counters and gauges without any attached labels.

My first, naive implementation of a parser looks like this:

#[derive(Debug)]
pub struct Metric {
    pub name: String,
    pub value: f64,
}

impl Metric {
    pub fn from_str(metric_str: &str) -> Option<Metric> {
        // Skip comments (# HELP / # TYPE) and, for now, any metric
        // that carries labels in curly braces.
        if metric_str.starts_with('#') || metric_str.contains('{') || metric_str.contains('}') {
            return None;
        }

        // A plain metric line has the shape "<name> <value>".
        let mut iter = metric_str.split_whitespace();
        let name = iter.next()?;
        let value = iter.next()?.parse::<f64>().ok()?;

        Some(Metric {
            name: name.to_string(),
            value,
        })
    }
}
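
A quick sanity check of the parser, with Metric in scope (the example lines are taken from the sample output above; the assertions are mine):

fn main() {
    // Plain counters and gauges parse into name/value pairs...
    let metric = Metric::from_str("node_load1 0.52").unwrap();
    assert_eq!(metric.name, "node_load1");
    assert_eq!(metric.value, 0.52);

    // ...while comments and labeled metrics are skipped for now.
    assert!(Metric::from_str("# TYPE go_gc_duration_seconds summary").is_none());
    assert!(Metric::from_str(r#"go_gc_duration_seconds{quantile="0"} 3.8996e-05"#).is_none());
}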

Next was fetching data from the endpoint and parsing it with the code shown above:

mod metrics;

fn scrape_endpoint() -> &'static str {
    "http://localhost:9100/metrics"
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let metric_payload = reqwest::get(scrape_endpoint())
        .await?
        .text()
        .await?;
    let metrics = metric_payload
        .lines()
        .filter_map(metrics::Metric::from_str)
        .collect::<Vec<_>>();
    println!("{:?}", metrics);
    Ok(())
}

This feature was straightforward to implement using reqwest’s async methods. You may have noticed that the application is already set up for async code (the tokio::main attribute, for instance). Tokio will play a more prominent role in the following sections.

Currently, this code performs a single HTTP GET request, parses the raw metrics it retrieves, and prints them to stdout.

Periodic tasks with Tokio

When you configure a Prometheus instance to scrape data from targets such as Node Exporters, you need to configure a few things, such as scrape_interval, which determines how long Prometheus waits between scrapes.

To support this use case, I needed code that performs a scrape every n seconds. I purposely left out any configuration code, so the interval is hard-coded.

After some googling, I found out about tokio::time::interval. With it, I can run a piece of code every n seconds.

Let’s have a look at the source code:

mod metrics;

fn scrape_endpoint() -> &'static str {
    "http://localhost:9100/metrics"
}

async fn fetch_metrics() -> Result<(), Box<dyn std::error::Error>> {
    let metric_payload = reqwest::get(scrape_endpoint())
        .await?
        .text()
        .await?;
    let metrics = metric_payload
        .lines()
        .filter_map(metrics::Metric::from_str)
        .collect::<Vec<_>>();
    println!("{:?}", metrics);
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut interval_timer = tokio::time::interval(chrono::Duration::seconds(5).to_std().unwrap());
    loop {
        // Wait for the next interval tick
        interval_timer.tick().await;
        // Spawn a separate task so a slow scrape cannot delay the timer
        tokio::spawn(async {
            if let Err(err) = fetch_metrics().await {
                eprintln!("Scrape failed: {}", err);
            }
        });
    }
}

You can see that the HTTP scraping code moved into its own function, making it easier to call periodically.

First of all, in main, we create an interval_timer that gets triggered every five seconds.

In an endless loop, we wait for the five seconds to elapse via interval_timer.tick().await. Then, we spawn a new task that performs the scrape via tokio::spawn.

It’s helpful to have the endless loop scraping the target every five seconds, but data is not stored anywhere yet.

See the complete change here.

Storing time-series data

Effectively storing and retrieving data could provide enough material for an entire series of blog posts. If you’re interested in learning more about these topics, I can recommend this book. For starters, I wanted to figure out how I could store the fetched data in memory in a simple struct.

Passing data across thread boundaries requires some thought. For starters, we could create a new struct that holds raw metrics and transforms them into time-series data. But who would own this struct? The async task that gets kicked off periodically seems like a candidate. But what happens when the task has performed its job? It dies. And so would our data store.

Therefore, we need to keep time-series data somewhere else and figure out how to pass data from the periodically spawned tasks to the long-living main thread.

In other languages such as Java or C#, we would use something like a Mutex to synchronize access to this object. A Mutex ensures that only one thread at a time can access the shared resource. That would solve our problem, but it could create issues down the line, because locking a mutex is a blocking, exclusive operation. If one thread locks the mutex and never releases it (because it dies unexpectedly or takes a long time to finish up), all other threads wait indefinitely for their turn to write, eventually causing the program to fail.
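
To illustrate, this is roughly what the shared-state approach looks like (my own sketch, not code from the project):

use std::sync::{Arc, Mutex};

// A stand-in for the storage struct introduced later in this post.
struct Storage {
    raw_metrics: Vec<String>,
}

#[tokio::main]
async fn main() {
    // One shared instance; the Arc lets many tasks hold a handle to it.
    let storage = Arc::new(Mutex::new(Storage { raw_metrics: Vec::new() }));

    let handle = Arc::clone(&storage);
    let writer = tokio::spawn(async move {
        // lock() is exclusive: every other task waits here until the guard
        // is dropped. A task that never releases it starves all the others.
        let mut guard = handle.lock().unwrap();
        guard.raw_metrics.push("node_load1 0.52".to_string());
    });

    writer.await.unwrap();
    println!("{} metric(s) stored", storage.lock().unwrap().raw_metrics.len());
}

Note that the guard is dropped before the task reaches another await point; holding a std Mutex across an await is yet another classic source of trouble in async code.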

“Hello, this is Thread A speaking” — Message Passing

While Rust would allow a Mutex, this is not the best practice for our case. So I wondered: What other options do I have to solve this problem?

Luckily, Rust and Tokio offer a solution for these use cases: message passing. Instead of allowing all threads to access a globally shared resource, we spawn another task that owns the storage struct. This task guards the resource and receives new data via channels and message passing. For that, Tokio offers several channel primitives (quoting from the documentation):

  • oneshot: single-producer, single-consumer channel. A single value can be sent.
  • mpsc: multi-producer, single-consumer channel. Multiple values can be sent.
  • broadcast: multi-producer, multi-consumer. Many values can be sent. Each receiver sees every value.
  • watch: single-producer, multi-consumer. Many values can be sent, but no history is kept. Receivers only see the most recent value.

From the Tokio documentation

In our case, we have two tasks:

  • Fetching metrics. This task is our producer, sending raw metrics as a string.
  • Storing data. This task is the receiver, receiving raw metrics and storing them as time-series data.

mpsc (multi-producer, single-consumer) looks like the perfect candidate for this use case.
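
As a minimal, standalone illustration of the mpsc pattern (the names and values are mine): each producer gets its own clone of the sender, while a single receiver drains the channel:

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<String>(32);

    // Multi-producer: the sender can be cloned freely...
    for id in 0..3 {
        let tx = tx.clone();
        tokio::spawn(async move {
            tx.send(format!("metric from producer {}", id)).await.unwrap();
        });
    }
    // Drop the original sender so the channel closes once all clones are done.
    drop(tx);

    // ...single consumer: exactly one receiver reads the messages.
    while let Some(msg) = rx.recv().await {
        println!("{}", msg);
    }
}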

How to pass messages between tasks

Once we have two tasks set up and picked the appropriate channel primitive, how do they communicate with each other?

With web services, we solve this problem by clearly defining message formats. An HTTP endpoint might accept JSON data via an HTTP POST request. An incoming request’s payload must conform to the expected message format. Otherwise, it gets rejected.

We’re using the same paradigm for two Tokio tasks communicating with each other. Whenever tasks communicate, the format is defined in a Rust struct or enum. I solved it with an enum outlining all available operations:

#[derive(Debug, Clone)]
pub enum Command {
    /// Store a single raw metric line, e.g. "node_load1 0.52".
    Store(String),
    /// Ask for all data points recorded under a metric name.
    Query(String),
    /// The answer to a Query: a list of (timestamp, value) tuples.
    QueryResults(Vec<(i64, f64)>),
}

See Tokio Docs for more details.

Passing messages

Now that we established a message format and which channel to use, how do we pass messages between tasks?

Let’s look at this commit. Here, we’re establishing the initial communication between the fetch_metrics task and the storage task:

//…

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Bounded channel: at most 32 commands may be in flight at once.
    let (tx, mut rx) = mpsc::channel(32);

    // The manager task owns the storage; no other task touches it directly.
    let manager = tokio::spawn(async move {
        let mut storage = Storage::new();
        while let Some(cmd) = rx.recv().await {
            match storage.store(cmd) {
                Ok(_) => {
                    println!("Stored metric");
                }
                Err(err) => {
                    eprintln!("ERR: {}", err.to_string());
                }
            }
        }
    });

Part one: We set up a manager task that is long-running and receives storage commands. This task owns storage, our struct that will handle everything related to storing and retrieving time-series data.

Using a while loop, we wait for incoming messages. What’s important to point out: we use Tokio’s mpsc channel type. tx will be used further down in main to send data; rx is the receiving end.

Let’s also examine the fetch_metrics part. Using a combination of Tokio and chrono, we’re spinning off a new task every five seconds to fetch metrics from the Node exporter:

    let forever = task::spawn(async move {
        let mut interval_timer =
            tokio::time::interval(chrono::Duration::seconds(5).to_std().unwrap());
        loop {
            // Wait for the next interval tick
            interval_timer.tick().await;
            // Each spawned task gets its own clone of the sender.
            let tx = tx.clone();
            tokio::spawn(async move {
                // A failed scrape should not crash the task; log and bail instead.
                let results = match fetch_metrics().await {
                    Ok(results) => results,
                    Err(err) => {
                        eprintln!("Scrape failed: {:?}", err);
                        return;
                    }
                };
                for result in results {
                    if let Err(err) = tx.send(result).await {
                        eprintln!("Encountered Error {:?}", err);
                    }
                }
            });
        }
    });
    manager.await.unwrap();
    forever.await.unwrap();

    Ok(())
}

We receive a list of raw metrics from fetch_metrics. Each of these raw metrics gets sent to our storage task via tx.send.

Since we’re sending each metric individually, the queue fills up quickly, and sends would eventually block. To mitigate this, we could send metrics in batches.
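
Here is a small, self-contained sketch of what batching could look like. Note that StoreBatch is a hypothetical variant I invented for this illustration; the Command enum above sends one metric at a time:

use tokio::sync::mpsc;

#[derive(Debug)]
enum Command {
    // Hypothetical batched variant: one message per scrape, not per metric.
    StoreBatch(Vec<String>),
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(32);

    tokio::spawn(async move {
        // Collect a whole scrape's worth of metrics into a single message.
        let batch = vec![
            "node_load1 0.52".to_string(),
            "node_load5 0.48".to_string(),
        ];
        tx.send(Command::StoreBatch(batch)).await.unwrap();
    });

    while let Some(Command::StoreBatch(batch)) = rx.recv().await {
        println!("Received {} metrics in one message", batch.len());
    }
}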

What’s missing?

With this foundation in place, many more use cases open up:

  • Handling metric labels
  • Querying data
  • Persisting data on disk
  • Support for more than one scrape target

Conclusion

Before starting this project, I always treated database development as something unattainable. Granted, writing a small proof of concept does not compare at all to a production-grade product.

But the barrier to entry, to get started, is lower than expected.

Historically, most major relational databases were written in C or C++. Mastering either of these languages is a challenge in itself and takes time, to say nothing of asynchronous programming.

Choosing Rust and Tokio as a basis for the implementation allows for natively compiled, safe, and secure code.

Focusing on implementing a rudimentary database becomes much more attainable with that out of the way.

This project is far from being done. I feel like I barely scratched the surface of what’s possible.

I have already learned one thing from it: With the right tools, even writing a database is possible.

If you found this article insightful, comment below with your thoughts.

Find the source code here. How did this blog post help you? Share your thoughts with me on Twitter.
