Join us for a virtual meetup on Zoom at 8 PM, July 31 (PDT) about using One Time Series Database for Both Metrics and Logs 👉🏻 Register Now

Skip to content
On this page
Engineering
December 22, 2023

Time-series data ingestion from Rust WebAssembly application, leveraging GreptimeDB and WasmEdge

WebAssembly (Wasm) is becoming a popular compilation target, aiding developers in creating portable applications for different platforms. Greptime and WasmEdge have collaborated to support the execution of Wasm applications on the WasmEdge platform, enabling the read and write of GreptimeDB's time-series data through the MySQL protocol.

What is WebAssembly

WebAssembly is a new instruction format that offers cross-platform compatibility and execution speeds close to native machine code. By compiling C/C++ or Rust code into WebAssembly, program performance can be enhanced within browsers.

Additionally, in environments outside of browsers, particularly at the edge of CDNs or IoT, WebAssembly can be used to implement advanced functionalities like sandboxing and dynamic loading of plugins.

What is WasmEdge

WasmEdge is a sandbox project of the Cloud Native Computing Foundation (CNCF), offering the sandboxing capabilities mentioned earlier. It allows developers to extend the resources and interfaces they can access on top of the standard WebAssembly. For instance, WasmEdge provides additional capabilities such as TLS, networking, and AI abilities for Wasm, significantly enriching its range of applications.

WasmEdge GitHub address: https://github.com/WasmEdge/WasmEdge.

Installing GreptimeDB and WasmEdge

If you have already installed GreptimeDB, you can skip this step.

Download and run GreptimeDB

rust
curl -L https://github.com/GreptimeTeam/greptimedb/raw/develop/scripts/install.sh | sh
./greptime standalone start

Install WasmEdge

rust
curl -sSf https://raw.githubusercontent.com/WasmEdge/WasmEdge/master/utils/install.sh | bash -s

Writing a GreptimeDB WASM Application

In WasmEdge, we can use the MySQL protocol to connect Rust-written applications to GreptimeDB.

First, create a new Rust project using cargo new. Our compilation target will be wasm32-wasi. You can create a .cargo/config.toml file in the project root directory to specify the default compilation target. This way, you won't need to specify --target after every cargo build command.

python
# .cargo/config.toml
[build]
target = "wasm32-wasi"

Edit the Cargo.toml to add dependencies. The application of mysql_async requires the tokio runtime and WasmEdge maintains the modified versions of these two libraries, enabling them to be compiled into WebAssembly code and run in the WasmEdge environment.

python
[package]
name = "greptimedb"
version = "0.1.0"
edition = "2021"

[dependencies]
mysql_async_wasi = "0.31"
time = "0.3"
tokio_wasi = { version = "1", features = [ "io-util", "fs", "net", "time", "rt", "macros"] }

To further edit the src/main.rs file and incorporate database access logic, you should follow these steps. The following code will demonstrate:

  1. Reading the database address from environment variables and creating a connection pool.
  2. Executing SQL statements to create a data table.
  3. Inserting data.
  4. Querying data.

Define data structure

rust
#[derive(Debug)]
struct CpuMetric {
    hostname: String,
    environment: String,
    usage_user: f64,
    usage_system: f64,
    usage_idle: f64,
    ts: i64,
}

impl CpuMetric {
    fn new(
        hostname: String,
        environment: String,
        usage_user: f64,
        usage_system: f64,
        usage_idle: f64,
        ts: i64,
    ) -> Self {
        Self {
            hostname,
            environment,
            usage_user,
            usage_system,
            usage_idle,
            ts,
        }
    }
}

Initializing a database connection pool

rust
use mysql_async::{
    prelude::*, Opts, OptsBuilder, Pool, PoolConstraints, PoolOpts, Result,
};
use time::PrimitiveDateTime;

fn get_url() -> String {
    if let Ok(url) = std::env::var("DATABASE_URL") {
        let opts = Opts::from_url(&url).expect("DATABASE_URL invalid");
        if opts
            .db_name()
            .expect("a database name is required")
            .is_empty()
        {
            panic!("database name is empty");
        }
        url
    } else {
        "mysql://root:[email protected]:3306/mysql".into()
    }
}


#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
    // Alternative: The "easy" way with a default connection pool
    // let pool = Pool::new(Opts::from_url(&*get_url()).unwrap());
    // let mut conn = pool.get_conn().await.unwrap();

    // Below we create a customized connection pool
    let opts = Opts::from_url(&*get_url()).unwrap();
    let builder = OptsBuilder::from_opts(opts);
    // The connection pool will have a min of 1 and max of 2 connections.
    let constraints = PoolConstraints::new(1, 2).unwrap();
    let pool_opts = PoolOpts::default().with_constraints(constraints);

    let pool = Pool::new(builder.pool_opts(pool_opts));
    let mut conn = pool.get_conn().await.unwrap();
    
    
    
    Ok(())
}

Creating a data table

rust
// Create table if not exists
    r"CREATE TABLE IF NOT EXISTS wasmedge_example_cpu_metrics (
    hostname STRING,
    environment STRING,
    usage_user DOUBLE,
    usage_system DOUBLE,
    usage_idle DOUBLE,
    ts TIMESTAMP,
    TIME INDEX(ts),
    PRIMARY KEY(hostname, environment)
);"
    .ignore(&mut conn)
    .await?;

Inserting data

rust
let metrics = vec![
        CpuMetric::new(
            "host0".into(),
            "test".into(),
            32f64,
            3f64,
            4f64,
            1680307200050,
        ),
        CpuMetric::new(
            "host1".into(),
            "test".into(),
            29f64,
            32f64,
            50f64,
            1680307200050,
        ),
        CpuMetric::new(
            "host0".into(),
            "test".into(),
            32f64,
            3f64,
            4f64,
            1680307260050,
        ),
        CpuMetric::new(
            "host1".into(),
            "test".into(),
            29f64,
            32f64,
            50f64,
            1680307260050,
        ),
        CpuMetric::new(
            "host0".into(),
            "test".into(),
            32f64,
            3f64,
            4f64,
            1680307320050,
        ),
        CpuMetric::new(
            "host1".into(),
            "test".into(),
            29f64,
            32f64,
            50f64,
            1680307320050,
        ),
    ];

    r"INSERT INTO wasmedge_example_cpu_metrics (hostname, environment, usage_user, usage_system, usage_idle, ts)
      VALUES (:hostname, :environment, :usage_user, :usage_system, :usage_idle, :ts)"
        .with(metrics.iter().map(|metric| {
            params! {
                "hostname" => &metric.hostname,
                "environment" => &metric.environment,
                "usage_user" => metric.usage_user,
                "usage_system" => metric.usage_system,
                "usage_idle" => metric.usage_idle,
                "ts" => metric.ts,
            }
        }))
        .batch(&mut conn)
        .await?;

Querying data

rust
let loaded_metrics = "SELECT * FROM wasmedge_example_cpu_metrics"
        .with(())
        .map(
            &mut conn,
            |(hostname, environment, usage_user, usage_system, usage_idle, raw_ts): (
                String,
                String,
                f64,
                f64,
                f64,
                PrimitiveDateTime,
            )| {
                let ts = raw_ts.assume_utc().unix_timestamp() * 1000;
                CpuMetric::new(
                    hostname,
                    environment,
                    usage_user,
                    usage_system,
                    usage_idle,
                    ts,
                )
            },
        )
        .await?;
    println!("{:?}", loaded_metrics);

The tokio and mysql_async libraries provided by the WasmEdge team are fully compatible with the original version's programming interface, allowing for a seamless transition of Rust applications to the WebAssembly platform.

By compiling this project, we can get the greptimedb.wasm file.

rust
cargo build
ls -lh target/wasm32-wasi/debug/greptimedb.wasm

Run our application through WasmEdge:

rust
wasmedge --env "DATABASE_URL=mysql://localhost:4002/public" target/wasm32-wasi/debug/greptimedb.wasm

The above sample program has been added to the WasmEdge database usage demonstration. You can find the full source code in the GitHub repository at https://github.com/WasmEdge/wasmedge-db-examples/tree/main/greptimedb.

Conclusion

WasmEdge offers expanded capabilities for WebAssembly applications. If you deploy your application in a WebAssembly environment, you can also use the OpenTelemetry SDK in the future to collect metric data and store it directly in GreptimeDB. Download GreptimeDB now or activate a GreptimeCloud instance to run the above example and experience the convenience of time-series data ingestion for Rust WebAssembly applications.

Join our community

Get the latest updates and discuss with other users.