Tutorial
June 4, 2024

Real-time Data Processing with Continuous Aggregation in GreptimeDB

GreptimeDB v0.8 introduces a continuous aggregation feature that helps users compute and query data aggregation results in real time. This article details how to run continuous aggregation on GreptimeDB and the steps to create and manage Flow tasks.

Starting with v0.8, GreptimeDB includes the Flow engine to support continuous aggregation. Users can define a Flow task that calculates sums, averages, or other aggregates into a sink table and query the aggregated results in real time.

This article introduces the basic usage and features of continuous aggregations in GreptimeDB and provides examples to illustrate the process of creating, using, and deleting Flow tasks.

What is Continuous Aggregation

Continuous Aggregation is a query that continuously runs to provide incrementally updated, consistent, materialized views.
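To make "incrementally updated" concrete, here is a minimal Python sketch of an average that is refreshed row by row from a small piece of state instead of being recomputed from scratch. The class name `RunningAverage` is hypothetical and chosen only for illustration; it is not how the Flow engine is implemented.

```python
from dataclasses import dataclass


@dataclass
class RunningAverage:
    """Keeps just enough state (sum and count) to refresh an average per row."""
    total: float = 0.0
    count: int = 0

    def update(self, value: float) -> float:
        # Fold one new row into the state and return the refreshed result.
        self.total += value
        self.count += 1
        return self.total / self.count


view = RunningAverage()
results = [view.update(v) for v in (2.0, 4.0, 6.0)]
print(results)  # [2.0, 3.0, 4.0]
```

Each call touches only the stored sum and count, so the cost of an update does not grow with the number of rows already seen.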

Continuous aggregation has many practical applications, such as Streaming ETL, real-time analysis, monitoring, alerting, etc. One common use case is downsampling, where window functions can be used to downsample a signal with a millisecond-level output frequency to a second-level frequency (e.g., calculating the average value within one second), thus saving storage and computational costs.
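The downsampling idea can be sketched in a few lines of Python. This is only an illustration under assumed inputs: the function name `downsample_to_seconds` and the sample values are made up, and timestamps are taken to be integer milliseconds.

```python
from collections import defaultdict


def downsample_to_seconds(samples):
    """Average (timestamp_ms, value) samples into one value per second."""
    buckets = defaultdict(list)
    for ts_ms, value in samples:
        # Integer division gives the second the sample falls in.
        buckets[ts_ms // 1000].append(value)
    return {sec: sum(vals) / len(vals) for sec, vals in sorted(buckets.items())}


# Hypothetical millisecond-level readings.
samples = [(0, 1.0), (250, 3.0), (999, 2.0), (1000, 10.0), (1500, 20.0)]
per_second = downsample_to_seconds(samples)
print(per_second)  # {0: 2.0, 1: 15.0}
```

Five input points collapse into two output points, which is where the storage and compute savings come from.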

As another example, suppose a speed sensor produces high-frequency readings. Continuous aggregation can discard data points whose speed falls below or above given thresholds, calculate the average speed every five minutes, and write the results to a result table.

Continuous aggregations are supported by the Flow engine, a built-in lightweight stream processing engine that offers continuous aggregation and window calculation, among other functions. Users can create a Flow task for continuous aggregation directly with SQL statements, without writing additional business code. Flow tasks can be used for real-time data processing and calculations.

Application Example

Continuous aggregations can be defined using SQL. This article demonstrates the entire process: creating a Flow task, feeding it data for stream processing, and finally deleting the Flow task.

As an example, let's consider a speed sensor that reads the instantaneous speeds of the left and right wheels. We filter out abnormal values that are too high or too low, and then calculate the average speed over a five-second interval.

First, create a source data table as input:

sql
CREATE TABLE velocity (
    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    left_wheel FLOAT,
    right_wheel FLOAT,
    TIME INDEX(ts)
);

And create a result table for the Flow task output:

sql
CREATE TABLE avg_speed (
    avg_speed DOUBLE,
    start_window TIMESTAMP TIME INDEX,
    end_window TIMESTAMP,
    update_at TIMESTAMP
);

Now you can create a Flow task using the SQL extension we provide, as illustrated in the example below:

sql
CREATE FLOW calc_avg_speed
SINK TO avg_speed
AS
SELECT avg((left_wheel+right_wheel)/2)
FROM velocity
WHERE left_wheel > 0.5 AND right_wheel > 0.5 AND left_wheel < 60 AND right_wheel < 60
GROUP BY tumble(ts, '5 second');

The SQL statement above creates a Flow task named calc_avg_speed, and outputs results to the avg_speed table. The query it runs is defined by the SELECT statement following AS.

In this example, the specific query operates as follows:

  1. First, it filters out speed values for the left and right wheels that are too small or too large (less than or equal to 0.5, or greater than or equal to 60).

  2. Then, based on the input table velocity, it calculates the average speed within each five-second window, using the ts column as the time index.
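The two steps above can be mimicked in plain Python to see what the query computes. This is a rough sketch, not the Flow engine's implementation: the function name `avg_speed_per_window` is hypothetical, timestamps are simplified to seconds, the sample rows are made up, and windows are assumed to start at multiples of five seconds.

```python
from collections import defaultdict


def avg_speed_per_window(rows, width_s=5):
    """rows: (ts_seconds, left_wheel, right_wheel); returns {window_start: avg}."""
    windows = defaultdict(list)
    for ts, left, right in rows:
        # Step 1: same predicate as the WHERE clause.
        if not (0.5 < left < 60 and 0.5 < right < 60):
            continue
        # Step 2: assign the row to its tumbling window and keep its mean wheel speed.
        start = int(ts // width_s) * width_s
        windows[start].append((left + right) / 2)
    return {start: sum(v) / len(v) for start, v in sorted(windows.items())}


rows = [
    (0.2, 0.0, 0.7),   # dropped: left_wheel is not above 0.5
    (1.0, 0.0, 61.0),  # dropped: both wheels are out of range
    (2.5, 2.0, 1.0),   # kept, window starting at 0
    (7.0, 6.0, 4.0),   # kept, window starting at 5
]
result = avg_speed_per_window(rows)
print(result)  # {0: 1.5, 5: 5.0}
```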

The query in the Flow task is written entirely in SQL, with extensions added only where needed (e.g., the tumble window function).

Now that the Flow task is created, to observe the continuous aggregation results in avg_speed, simply insert data into the source data table velocity:

sql
INSERT INTO velocity 
VALUES
    ("2021-07-01 00:00:00.200", 0.0, 0.7),
    ("2021-07-01 00:00:00.200", 0.0, 61.0),
    ("2021-07-01 00:00:02.500", 2.0, 1.0);

Note that the first two rows are filtered out because they do not meet the conditions, leaving only the third row for calculation. Querying the output table will then yield the computed result.

sql
SELECT * FROM avg_speed;
sql
avg_speed |        start_window        |         end_window         |         update_at          
-----------+----------------------------+----------------------------+----------------------------
       1.5 | 2021-07-01 00:00:00.000000 | 2021-07-01 00:00:05.000000 | 2024-06-04 03:35:20.670000
(1 row)

Try inserting more data into the velocity table:

sql
INSERT INTO velocity 
VALUES
    ("2021-07-01 00:00:05.100", 5.0, 4.0),
    ("2021-07-01 00:00:09.600", 2.3, 2.1);

The avg_speed table now contains two rows, representing the average speeds for two 5-second windows, which are 1.5 and 3.35 (calculated as (4.5+2.2)/2), respectively.

sql
SELECT * FROM avg_speed;
sql
avg_speed |        start_window        |         end_window         |         update_at          
-----------+----------------------------+----------------------------+----------------------------
       1.5 | 2021-07-01 00:00:00.000000 | 2021-07-01 00:00:05.000000 | 2024-06-04 03:35:20.670000
      3.35 | 2021-07-01 00:00:05.000000 | 2021-07-01 00:00:10.000000 | 2024-06-04 03:35:34.693000

The columns in the avg_speed table are explained as follows:

  • avg_speed: The average speed within the window.

  • start_window: The start time of the window.

  • end_window: The end time of the window.

  • update_at: The time the row data was updated.

The start_window and end_window columns are automatically added by the Flow engine's window function tumble. The update_at column is automatically added by the Flow engine to the Flow task output table, marking the latest update time of this row of data to help understand the operation status of the Flow task.
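As an illustration of how the window bounds relate to a row's timestamp, the following Python sketch aligns a timestamp down to a fixed-width window. It assumes windows are aligned to the Unix epoch, which matches the output shown above but is an assumption rather than documented behavior; `tumble_bounds` is a hypothetical helper name.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)  # assumed alignment origin


def tumble_bounds(ts, width):
    """Return (start_window, end_window) of the fixed window containing ts."""
    start = ts - (ts - EPOCH) % width  # align down to a multiple of the width
    return start, start + width


five_seconds = timedelta(seconds=5)
start, end = tumble_bounds(datetime(2021, 7, 1, 0, 0, 2, 500000), five_seconds)
print(start, end)
```

For the row inserted at 00:00:02.500, this yields a window from 00:00:00 to 00:00:05, and a row at 00:00:09.600 lands in the window from 00:00:05 to 00:00:10.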

Lastly, use DROP FLOW to delete this Flow task:

sql
DROP FLOW calc_avg_speed;

Flow Management and Advanced Features

Create or update a Flow

sql
CREATE FLOW [ IF NOT EXISTS ] <flow-name>
SINK TO <sink-table-name>
[ EXPIRE AFTER <expr> ]
[ COMMENT = "<string>" ]
AS 
<SQL>;

The syntax for creating a Flow task can be explained as follows:

  • flow-name: A globally unique identifier.

  • sink-table-name: The name of the table where aggregated data is stored. It can be either an existing table or a new table. If the target table does not exist, Flow will automatically create one.

  • EXPIRE AFTER: An optional time interval (written with the SQL INTERVAL syntax) used to clear expired intermediate states from the Flow engine.

  • COMMENT: A descriptive comment for the Flow task.

  • <SQL>: Defines the specific continuous aggregation query. The Flow engine will extract the referenced tables as the source tables for the Flow task.

A simple example:

sql
CREATE FLOW IF NOT EXISTS my_flow
SINK TO my_sink_table
EXPIRE AFTER INTERVAL '1 hour'
COMMENT = "My first flow in GreptimeDB"
AS
SELECT count(item) from my_source_table GROUP BY tumble(time_index, '5 minutes');

A few more words on the EXPIRE AFTER clause. Like many modern stream processing systems, the Flow engine distinguishes two notions of time: system time and event time.

  • System time: Also called processing time, this refers to the system time of the machine executing the respective operation.

  • Event time: The time an event represented by a row of data happened, usually recorded in a column of that row of data. Flow considers the TIME INDEX column as the event time.

The EXPIRE AFTER expiration mechanism uses the difference between system time and event time to discard outdated rows in the Flow intermediate state. In the SQL example above, rows with event times more than one hour prior to the system time will be discarded and will not participate in calculations.
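The expiration rule described above boils down to a comparison between two timestamps. The Python sketch below shows that comparison only. The helper name `is_expired` and the sample times are hypothetical, and how a row that is exactly one hour old is treated is an assumption.

```python
from datetime import datetime, timedelta


def is_expired(event_time, system_time, expire_after):
    """True when the event time lies more than expire_after before system time."""
    return system_time - event_time > expire_after


now = datetime(2024, 6, 4, 12, 0, 0)  # hypothetical system time
one_hour = timedelta(hours=1)

print(is_expired(datetime(2024, 6, 4, 11, 30), now, one_hour))  # False: 30 minutes old
print(is_expired(datetime(2024, 6, 4, 10, 30), now, one_hour))  # True: 90 minutes old
```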

TIP

Note that EXPIRE AFTER only applies to newly arriving data. Results already in the output table therefore do not change simply because time passes; instead, the mechanism ensures that late-arriving data no longer updates the output table.

Additionally, the intermediate state of Flow is not yet persisted; it is held purely in memory. State persistence will be added in a future release to ensure data correctness even after a restart.

Delete Flow

To delete a Flow task, use the following statement:

sql
DROP FLOW [ IF EXISTS ] <flow-name>;

Supported Flow Functions

Currently, Flow supports:

  • Aggregation functions: count, sum, avg, min, and max;

  • Scalar functions: addition, subtraction, multiplication, division, comparison, and logical operations;

  • The fixed window tumble function.

In the future, more aggregation functions, scalar functions, and window functions will be supported for continuous aggregation.

Summary

This article introduces the basic usage and features of continuous aggregations in GreptimeDB. It provides examples to illustrate the process of creating, using, and deleting Flow tasks.

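Continuous aggregation gives users real-time, low-latency (second-level or sub-second) access to the information they care about. Because results are updated incrementally, queries do not have to re-aggregate the raw data each time, which avoids additional memory and computational overhead.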

In the future, in addition to supporting more functions, we will also support the persistence of intermediate states in stream processing and advanced features such as Temporal Filter. For more detailed information, please refer to the relevant documentation and development guides.


About Greptime

We help industries that generate large amounts of time-series data, such as Connected Vehicles (CV), IoT, and Observability, to efficiently uncover the hidden value of data in real-time.

Visit the latest version from any device to get started and get the most out of your data.

  • GreptimeDB, written in Rust, is a distributed, open-source, time-series database designed for scalability, efficiency, and powerful analytics.
  • GreptimeCloud is a fully-managed cloud database-as-a-service (DBaaS) solution built on GreptimeDB. It efficiently supports applications in fields such as observability, IoT, and finance. The built-in observability solution, GreptimeAI, helps users comprehensively monitor the cost, performance, traffic, and security of LLM applications.
  • Vehicle-Cloud Integrated TSDB solution is tailored for business scenarios of automotive enterprises. It addresses the practical business pain points that arise when enterprise vehicle data grows exponentially.

If anything above draws your attention, don't hesitate to star us on GitHub or join GreptimeDB Community on Slack. Also, you can go to our contribution page to find some interesting issues to start with.
