Note: The solutions discussed in this post are based on GreptimeDB v0.15. We highly recommend upgrading to this version for optimal compatibility.
We have published and will continue to release articles in this series:
- GreptimeDB v0.15 Launch! Pipeline VRL Processing and Bulk Ingestion for High-Throughput Scenarios
- A Comprehensive Guide to Query Management in GreptimeDB v0.15
...
Stay tuned for more updates!
In today’s era of big data and the Internet of Things (IoT), real-time data processing and analytics form the backbone of modern applications. Apache Flink, as a leading industry-standard stream processing framework, provides robust real-time computation capabilities. GreptimeDB, meanwhile, is a high-performance, cloud-native database purpose-built for observability data workloads. By integrating the two, you can assemble a powerful and flexible platform for real-time data processing and analytics at scale.

This article will take an in-depth look at GreptimeDB’s unique capabilities for efficient data writing, and explain step-by-step how to leverage Flink to ingest data at scale into GreptimeDB—powered by a custom-built Flink Sink, GreptimeSink. This solution is designed for stability, scalability, and maximum throughput in demanding streaming environments.
Why Choose GreptimeDB as a Sink for Flink?
Before diving into the code, it’s essential to understand why GreptimeDB is particularly well-suited as a sink for Apache Flink:
1. High-Performance Ingestion with Columnar Storage
- Advanced Engine: GreptimeDB adopts an architecture similar to LSM-Tree coupled with columnar storage, dramatically improving write throughput and making it ideal for handling high-concurrency data streams.
- Optimized Write Path: Incoming data are first buffered in memory (MemTable), then merged and flushed in bulk to persistent storage—bypassing the overhead of costly random writes and maximizing throughput.
2. Cloud-Native, Distributed, and Elastic Scalability
- Separation of Compute & Storage: Compute nodes (Datanodes) and shared storage (e.g., Amazon S3) are fully decoupled. This enables resource-optimized, flexible node scaling—aligning perfectly with Flink’s elastic TaskManager parallelism.
- Kubernetes-Friendly: Both Flink and GreptimeDB support rapid, automated scaling via Kubernetes, eliminating the compute-bound ingestion bottlenecks found in traditional databases.
3. Native, Asynchronous gRPC API
- Modern Protocol: GreptimeDB exposes a native, non-blocking gRPC API, which is inherently compatible with Flink’s distributed streaming architecture.
- Bulk & Streaming Support: Supports both batch and streaming ingestion, maximizing network efficiency and sustaining high throughput for real-time data pipelines (a quick sketch of the client API follows below).
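To make this concrete, here is a minimal sketch of writing a single row through the Java ingester's asynchronous API. This is illustrative, not the demo's code: the endpoint, table, and column names are placeholders, and exact builder signatures may vary between ingester-java versions.

```java
import io.greptime.GreptimeDB;
import io.greptime.models.DataType;
import io.greptime.models.Table;
import io.greptime.models.TableSchema;
import io.greptime.options.GreptimeOptions;

public class AsyncWriteSketch {
    public static void main(String[] args) {
        // Placeholder endpoint and database; adjust for your deployment.
        GreptimeDB client = GreptimeDB.create(
                GreptimeOptions.newBuilder("127.0.0.1:4001", "public").build());

        TableSchema schema = TableSchema.newBuilder("demo_metrics")
                .addTimestamp("ts", DataType.TimestampMillisecond)
                .addField("value", DataType.Float64)
                .build();

        Table table = Table.from(schema);
        table.addRow(System.currentTimeMillis(), 42.0);
        // Note: some ingester-java versions require table.complete() before write().

        // write() returns a CompletableFuture, so the caller never blocks,
        // which is what makes it a natural fit for Flink's asynchronous runtime.
        client.write(table).whenComplete((result, err) -> {
            if (err != null) {
                err.printStackTrace();
            }
        });
    }
}
```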
Implementation Walkthrough: Building GreptimeSink for Efficient Streaming Ingestion
To ingest DataStreams from Flink efficiently into GreptimeDB, a custom Sink implementation is required. The core responsibilities include initializing the GreptimeDB client, defining the table schema, and implementing a `SinkWriter` for batch-optimized, high-throughput ingestion.
All code is open source:
- Main Repo
- End-to-end Demo (Flink parsing Nginx access logs + GreptimeDB, deployable via Docker Compose)
GreptimeSink: Key Implementation Points
Let's walk through the implementation of `GreptimeSink`:
```java
// Imports omitted for brevity; see the demo repository for the full source.
class GreptimeSink implements Sink<String> {

    @Override
    public SinkWriter<String> createWriter(WriterInitContext context) {
        // 1. Initialize the GreptimeDB client
        GreptimeDB greptimeDb = GreptimeDB.create(
                GreptimeOptions.newBuilder(endpoint, "public").build());

        // 2. Define the table schema
        TableSchema tableSchema = TableSchema.newBuilder("my-table")
                .addTimestamp("timestamp", DataType.TimestampMillisecond)
                .addField("f1", DataType.String)
                // ... add more fields as needed
                .addField("fn", DataType.String)
                .build();

        // 3. Create and return a GreptimeSinkWriter backed by a BulkStreamWriter
        //    for high-throughput streaming ingestion
        return new GreptimeSinkWriter(greptimeDb.bulkStreamWriter(tableSchema));
    }
}
```
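For context, here is how such a sink could be attached to a Flink job. This wiring is a hypothetical sketch (the job name and inline source are illustrative); the demo repository linked above contains the real end-to-end setup.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkToGreptimeJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // In production this would be a Kafka, file, or socket source.
        env.fromElements("log line 1", "log line 2")
           .sinkTo(new GreptimeSink());

        env.execute("flink-to-greptimedb");
    }
}
```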
Key takeaway: The `createWriter` method instantiates a `BulkStreamWriter`, optimized for bulk and streaming writes: a substantial performance upgrade over traditional row-by-row ingestion.
The `GreptimeSinkWriter`: Buffering, Batching, and High Throughput
`GreptimeSinkWriter` implements an intelligent buffering strategy, collecting incoming records in memory and flushing them in bulk to GreptimeDB for maximum efficiency:
```java
// Imports omitted for brevity; see the demo repository for the full source.
class GreptimeSinkWriter implements SinkWriter<String> {

    private static final int MAX_ACCUMULATED_ROWS = 1_000; // Adjustable batch size

    private final BulkStreamWriter writer;
    private Table.TableBufferRoot buffer;
    private int accumulatedRows = 0;

    GreptimeSinkWriter(BulkStreamWriter writer) {
        this.writer = writer;
        this.buffer = createRowBuffer();
    }

    Table.TableBufferRoot createRowBuffer() {
        return writer.tableBufferRoot(MAX_ACCUMULATED_ROWS);
    }

    @Override
    public void write(String input, Context context) throws IOException {
        Object[] row = parse(input); // application-specific parsing (see the sketch below)
        buffer.addRow(row);
        if (++accumulatedRows >= MAX_ACCUMULATED_ROWS) {
            bulkInsert();
        }
    }

    @Override
    public void flush(boolean endOfInput) throws IOException {
        // Called by Flink on checkpoints and at end of input:
        // push out whatever is buffered, even a partial batch.
        if (accumulatedRows == 0) {
            return;
        }
        bulkInsert();
    }

    @Override
    public void close() throws Exception {
        // Flush any remaining rows before the task shuts down.
        flush(true);
    }

    void bulkInsert() throws IOException {
        buffer.complete();
        try {
            CompletableFuture<Integer> future = writer.writeNext();
            Integer rows = future.get(); // Blocking wait, simplified for demo
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("interrupted while writing to GreptimeDB", e);
        } catch (ExecutionException e) {
            throw new IOException("bulk write to GreptimeDB failed", e);
        }
        buffer = createRowBuffer();
        accumulatedRows = 0;
    }
}
```
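The `parse()` call above is application-specific. For the Nginx access-log demo linked earlier, it might look roughly like the following hypothetical sketch (not the demo's actual code); the row's column order must match the `TableSchema` passed to `bulkStreamWriter()`.

```java
// Hypothetical parse(): turn one Nginx access-log line into a row whose
// column order matches the table schema (timestamp first, then fields).
Object[] parse(String line) {
    String[] parts = line.split(" ");
    // A real implementation would parse the log's own timestamp and
    // handle malformed lines; ingestion time is used here for brevity.
    long timestampMillis = System.currentTimeMillis();
    String clientIp = parts.length > 0 ? parts[0] : "";
    return new Object[] { timestampMillis, clientIp, line };
}
```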
Breakdown of Key Mechanisms
1. Buffering Mechanism
- The `MAX_ACCUMULATED_ROWS` parameter (tunable from hundreds to hundreds of thousands) sets the batch size for writes, striking a balance between latency and throughput.
- The `write()` method parses and buffers each incoming record, avoiding immediate writes and instead accumulating records for bulk insertion.
- Once the batch size is reached, `bulkInsert()` performs the write.
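As a rule of thumb, the batch size can be derived from the expected input rate and a target buffering latency. The numbers below are illustrative assumptions, not benchmarks:

```java
// Hypothetical sizing: at ~50,000 rows/s, a 5,000-row batch fills about
// every 100 ms, so worst-case buffering latency stays near 0.1 s while
// each gRPC call still carries a large payload.
private static final int EXPECTED_ROWS_PER_SECOND = 50_000;     // assumed workload
private static final double TARGET_BATCH_LATENCY_SECONDS = 0.1; // chosen latency budget
private static final int MAX_ACCUMULATED_ROWS =
        (int) (EXPECTED_ROWS_PER_SECOND * TARGET_BATCH_LATENCY_SECONDS); // = 5,000
```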
2. Efficient Bulk Insert
- `buffer.complete()` finalizes the buffer for writing.
- `writer.writeNext()` streams the entire batch to GreptimeDB asynchronously over gRPC, eliminating per-row network overhead.
- While this demo uses a blocking `future.get()` for simplicity, the API supports fully async handling via callbacks, as sketched below.
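As an illustration of that asynchronous path, `bulkInsert()` could hand the batch off and react in a callback instead of blocking. This is a sketch; the right failure policy (retrying versus failing the Flink task) is application-specific.

```java
// Hypothetical non-blocking variant of bulkInsert().
void bulkInsertAsync() {
    buffer.complete();
    writer.writeNext().whenComplete((rows, err) -> {
        if (err != null) {
            // In a real job, surface this so Flink can fail or retry the task.
            System.err.println("bulk write to GreptimeDB failed: " + err);
        }
    });
    buffer = createRowBuffer();
    accumulatedRows = 0;
}
```

Note that a fully asynchronous writer must also track its in-flight futures so that `flush()` can await them; otherwise a checkpoint could complete while writes are still in transit, weakening the zero-data-loss guarantee discussed below.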
3. Reliable Flushing
- The `flush()` method ensures all buffered data are written, even if the buffer is not full, providing robustness during Flink checkpoints or job termination and eliminating the risk of data loss.
Common Use Cases
This high-volume Flink-to-GreptimeDB data pipeline is perfectly suited for:
1. Log Analytics Pipeline

Process unstructured log streams at high concurrency and ingest them as structured records, empowering real-time operational intelligence.
2. Real-Time Metrics Collection & Analysis

Capture and aggregate events with sub-second latency; ideal for application monitoring, business dashboards, and alerting systems that demand data freshness and accuracy.
3. IoT Data Ingestion

Seamlessly handle high-frequency device data and rules-driven alerts at scale, enabling smart operations and efficient industrial monitoring.
Conclusion & Key Takeaways
Ingesting data from Flink into GreptimeDB can be both robust and lightning-fast. The custom `GreptimeSink` leverages the `BulkStreamWriter` from the GreptimeDB Java SDK for maximum efficiency.
To recap:
- Batch Processing is Key: Buffer and bulk-insert records for throughput; never fall back to row-by-row ingestion.
- `BulkStreamWriter` Unlocks Performance: Purpose-built for streaming and high concurrency, it abstracts away low-level gRPC stream management.
- Robustness Through Flushing: Seamless integration with Flink's checkpointing ensures zero data loss, even during failovers.
By combining Flink’s real-time stream computation with GreptimeDB’s high-performance time-series storage, you’re ready to build next-generation data platforms for monitoring, IoT, analytics, and beyond.
Explore the flink-demo repo, get hands-on with the code, and start building your own high-throughput Flink + GreptimeDB data pipeline today!