ORDER BY ... LIMIT 10 looks like it should be cheap. Often it isn't. When the table is large and the sort key isn't the Time Index, the database typically has to read a lot of rows before it knows which ten to return.
GreptimeDB v1.0 addresses this by handing the runtime bound that the TopK operator is building back to the scan side, as early as possible. Once a row group can't possibly contribute to the final result, the scan doesn't read it. On a real trace dataset, that change took ORDER BY end_time DESC LIMIT 10 from roughly half a minute down to sub-second.
This landed in two steps. GreptimeDB #7545 introduced the core dynamic-filter machinery, wiring DataFusion's upstream runtime TopK filtering into GreptimeDB's own Mito scan layer. #7912 then folded the time-index TopK case onto the same path. The rest of this post walks through how the mechanism works and what it actually buys you.
Why it was slow
SELECT start_time, end_time, run_type, status
FROM langchain_traces
ORDER BY end_time DESC
LIMIT 10;
On the old path, the scan node had no visibility into TopK's intermediate state. Even though the query only returns 10 rows, the scan kept reading until the query was done. By the time everything finished, most of what was scanned turned out to be unnecessary.
One detail matters here: start_time is the table's Time Index column; end_time is not. GreptimeDB had an existing optimization for time-indexed TopK called windowed sort (with PartSortExec doing the local sort inside). It depends on the data distribution of the Time Index and is fairly intricate. For a non-time-index sort key like end_time, there was no equivalent pruning. That's the gap #7545 was meant to close.
#7912 then pushed further. Benchmarks showed that for TopK specifically, plain SortExec: TopK + dynamic filter was faster than WindowedSortExec + PartSortExec. So GreptimeDB now disables the windowed-sort rewrite whenever the query has a LIMIT k, and routes both time-index and non-time-index TopK through the same dynamic-filter path. Windowed sort is kept only for true full sort (no LIMIT).
How the mechanism works
The idea is direct: have TopK feed its current bound back into the scan node as a runtime predicate.
- At query start, the filter is wide open and lets nearly everything through.
- As candidate rows arrive, the worst row in the current top-K becomes the new bound — and that bound keeps tightening.
- The bound is wrapped as a DynamicFilterPhysicalExpr and pushed down to the scan node via DataFusion's filter pushdown.
- The scan side combines this runtime condition with SST file and row-group min/max statistics to skip data that can no longer possibly qualify.
For ORDER BY end_time DESC LIMIT 10, the win isn't a smaller result — there were always only ten rows to return. The win is that the scan range shrinks while the query is running.
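The tightening bound described in the list above can be sketched with a small min-heap. This is an illustration only: TopKBound, offer, and bound are hypothetical names, values are plain i64, and NULL handling is left out. It is not DataFusion's TopK implementation.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Tracks the k largest values seen so far (the ORDER BY ... DESC LIMIT k case).
/// Hypothetical illustration type, not GreptimeDB's or DataFusion's TopK.
pub struct TopKBound {
    k: usize,
    // Min-heap via `Reverse`: the smallest retained value sits on top.
    heap: BinaryHeap<Reverse<i64>>,
}

impl TopKBound {
    pub fn new(k: usize) -> Self {
        Self { k, heap: BinaryHeap::new() }
    }

    /// Offers one value; keeps it only if it belongs in the current top k.
    pub fn offer(&mut self, value: i64) {
        if self.heap.len() < self.k {
            self.heap.push(Reverse(value));
            return;
        }
        // Copy the current worst retained value out so the heap can be mutated.
        let worst = self.heap.peek().map(|r| r.0);
        if let Some(worst) = worst {
            if value > worst {
                self.heap.pop();
                self.heap.push(Reverse(value));
            }
        }
    }

    /// The current bound: `None` while fewer than k rows have been seen
    /// (filter wide open), otherwise the worst value still in the top k.
    pub fn bound(&self) -> Option<i64> {
        if self.k > 0 && self.heap.len() == self.k {
            self.heap.peek().map(|r| r.0)
        } else {
            None
        }
    }
}
```

With k = 3, offering 10, 5, 7 produces a first bound of 5; offering 20 afterwards evicts 5 and moves the bound to 7. Every later value at or below 7 can be rejected without touching the heap, which is the condition the scan side gets to reuse.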
The animation below puts the two paths side by side. Top half is the old path: the scan layer doesn't know the TopK threshold, so all eight row groups get read. Bottom half is the new path: the threshold is pushed down as a dynamic condition, and the scanner uses each row group's max statistic to skip entire row groups that can't reach the top 10. (Labels in the animation are in Chinese — 旧路径 = old path, 新路径 = new path, 已扫描 row group = row groups scanned, 动态过滤条件 = dynamic filter condition, 剪枝规则 = pruning rule.)

At the execution-plan level, the mechanism looks like this:
┌────────────────────────────────┐
│ TopK keeps updating its bound  │
└──────────────┬─────────────────┘
               │
               ▼
┌────────────────────────────────┐
│ Dynamic filter tightens        │
└──────────────┬─────────────────┘
               │
               ▼
┌────────────────────────────────┐
│ Scan re-evaluates remaining    │
│ files / row groups using       │
│ row group stats                │
└──────────────┬─────────────────┘
               │
               ▼
┌────────────────────────────────┐
│ Row groups that can't qualify  │
│ are skipped entirely           │
└────────────────────────────────┘
What the bound looks like
Once TopK has built a meaningful bound, GreptimeDB wraps it as a NULL-aware predicate. For our example query it looks roughly like:
end_time IS NULL OR end_time > 1753660799999000000
The IS NULL branch is there because ORDER BY ... DESC NULLS FIRST puts NULLs first, so they have to stay candidates and can't be pruned.
The scan side doesn't need to read every row to benefit. As long as a row group's statistics say its value range can't possibly satisfy the latest condition, GreptimeDB can skip the whole row group before reading any of it.
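As a rough sketch of that statistics check, assume each row group exposes a max value and a NULL count for the sort column. RowGroupStats and can_skip are hypothetical names for illustration; the real pruning goes through GreptimeDB's own statistics types.

```rust
/// Per-row-group statistics for the sort column. Hypothetical shape for
/// illustration; real Parquet statistics carry more than this.
#[derive(Debug, Clone, Copy)]
pub struct RowGroupStats {
    /// Largest non-NULL value, or `None` when the statistic is unavailable.
    pub max: Option<i64>,
    /// Number of NULL values in the row group.
    pub null_count: u64,
}

/// Returns true if no row in the row group can satisfy
/// `col IS NULL OR col > bound`, so the whole row group can be skipped.
pub fn can_skip(stats: &RowGroupStats, bound: i64) -> bool {
    // Any NULL satisfies the `IS NULL` branch, so the group must be read.
    if stats.null_count > 0 {
        return false;
    }
    match stats.max {
        // Even the largest value fails `col > bound`.
        Some(max) => max <= bound,
        // Missing statistics: stay conservative and read the group.
        None => false,
    }
}
```

The check only ever answers "definitely nothing here" or "maybe something here". A row group that cannot be ruled out is read and filtered as usual, so a loose statistic costs time but never correctness.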
Key code paths
A few places in the codebase carry this end-to-end:
- The RegionScanner trait (src/store-api/src/region_engine.rs) gained a new method:
      fn add_dyn_filter_to_predicate(
          &mut self,
          filter_exprs: Vec<Arc<dyn PhysicalExpr>>,
      ) -> Vec<bool>;
  The return value tells DataFusion which filters were actually accepted for row-group pruning. Anything not accepted is still evaluated higher up.
- RegionScanExec::handle_child_pushdown_result (src/table/src/table/scan.rs) is where DataFusion meets GreptimeDB's scan layer. It hands the parent_filters from the parent operator (TopK or Hash Join) to the scanner, and reports support back to DataFusion's FilterPushdownPropagation.
- Predicate (src/table/src/predicate.rs) stores dynamic filters in a hot-swappable structure:
      dyn_filters: Arc<ArcSwap<Vec<Arc<DynamicFilterPhysicalExpr>>>>,
  With ArcSwap, the scan refreshes the bound lock-free. In-flight reads see at most a stale snapshot: they don't block, and they don't break consistency.
- All three Mito scan paths (seq_scan.rs, series_scan.rs, unordered_scan.rs) forward incoming filters into PredicateGroup::add_dyn_filters inside ScanInput, populating both predicate_all and predicate_without_region.
- The actual pruning happens in FileRange::in_dynamic_filter_range (src/mito2/src/sst/parquet/file_range.rs). Before reading each row group, it runs prune_with_stats against the latest dyn_filters together with RowGroupPruningStats, and skips the whole row group on a hit.
The key property of this chain: every row group is evaluated against the latest bound, not against a snapshot taken when the query started.
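To see why "latest bound" matters, here is a single-threaded simulation that compares a scan re-reading the bound before every row group with one that only has the snapshot from query start. It is a sketch under assumptions: RowGroup and scan_top_k are hypothetical, an AtomicI64 stands in for the ArcSwap-held filter list, and NULLs are not modeled.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::sync::atomic::{AtomicI64, Ordering};

/// One row group: its values, from which the max statistic is derived.
/// Hypothetical layout for illustration.
pub struct RowGroup {
    pub values: Vec<i64>,
}

impl RowGroup {
    fn max(&self) -> i64 {
        self.values.iter().copied().max().unwrap_or(i64::MIN)
    }
}

/// Runs `ORDER BY v DESC LIMIT k` over the row groups and returns
/// (top-k values in descending order, number of row groups actually read).
/// With `refresh == false` the scan only sees the bound as it was at
/// query start, which is "no bound yet".
pub fn scan_top_k(groups: &[RowGroup], k: usize, refresh: bool) -> (Vec<i64>, usize) {
    // Shared bound: written by the TopK side, read by the scan side.
    // i64::MIN means "wide open".
    let bound = AtomicI64::new(i64::MIN);
    let snapshot_at_start = bound.load(Ordering::Acquire);
    let mut heap: BinaryHeap<Reverse<i64>> = BinaryHeap::new();
    let mut groups_read = 0;

    for group in groups {
        let current = if refresh {
            bound.load(Ordering::Acquire)
        } else {
            snapshot_at_start
        };
        // Skip when even the group's max cannot beat the bound.
        if current != i64::MIN && group.max() <= current {
            continue;
        }
        groups_read += 1;
        for &v in &group.values {
            heap.push(Reverse(v));
            if heap.len() > k {
                heap.pop(); // drop the smallest, keeping the k largest
            }
        }
        // Publish the new bound once the heap holds k candidates.
        if k > 0 && heap.len() == k {
            if let Some(Reverse(worst)) = heap.peek() {
                bound.store(*worst, Ordering::Release);
            }
        }
    }

    let mut top: Vec<i64> = heap.into_iter().map(|Reverse(v)| v).collect();
    top.sort_unstable_by(|a, b| b.cmp(a));
    (top, groups_read)
}
```

On four row groups holding [100, 90, 95], [10, 20, 30], [50, 60, 70], and [99, 98, 1] with k = 2, both variants return [100, 99]. The refreshing scan reads two row groups; the snapshot scan reads all four.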
Confirming it's actually wired up
EXPLAIN ANALYZE VERBOSE shows everything you need.
The TopK node shows its current runtime filter:
SortExec: TopK(fetch=10), expr=[end_time@1 DESC], preserve_partitioning=[true],
  filter=[end_time@1 IS NULL OR end_time@1 > 1753660799999000000]
The scan node shows that the filter has been pushed down to it:
SeqScan: region=..., {"projection": [...],
"dyn_filters": ["DynamicFilter [ end_time@1 IS NULL OR end_time@1 > 1753660799999000000 ]"],
"files": [...]}
Both signals matter. SortExec: TopK carrying filter=[...] means TopK has produced a runtime threshold; dyn_filters on the scan node means the scan layer has received it and is pruning against it. When both show up, dynamic filtering is end-to-end. The repo has corresponding sqlness tests at tests/cases/standalone/common/filter/topk_dyn_filter.sql.
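If you want to check both signals mechanically, for example in a regression script, plain substring matching over the plan text is enough for a first pass. The helper below is hypothetical and not part of GreptimeDB; it assumes the plan is rendered the way the excerpts above show, which may differ between versions.

```rust
/// Which of the two signals appear in an EXPLAIN ANALYZE VERBOSE output.
#[derive(Debug, PartialEq, Eq)]
pub struct DynFilterSignals {
    pub topk_has_filter: bool,
    pub scan_has_dyn_filters: bool,
}

/// Plain substring checks over the plan text. A heuristic for illustration:
/// it assumes the rendering shown in this post.
pub fn check_plan(plan: &str) -> DynFilterSignals {
    // Look for `filter=[` anywhere after the first `SortExec: TopK` marker,
    // since the operator line may wrap across lines.
    let topk_has_filter = plan
        .find("SortExec: TopK")
        .map(|pos| plan[pos..].contains("filter=["))
        .unwrap_or(false);
    // An empty `"dyn_filters": []` would not count as a pushed-down filter.
    let scan_has_dyn_filters = plan.contains("\"dyn_filters\": [\"DynamicFilter");
    DynFilterSignals { topk_has_filter, scan_has_dyn_filters }
}
```

Treat a negative result as a prompt to read the plan by hand rather than as proof that the filter is missing; the sqlness cases in the repo remain the authoritative check.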
What it buys you
The dataset is a set of langchain traces. The most telling query has the same ORDER BY end_time DESC LIMIT 10 shape as the one at the top of the post:
SELECT *
FROM langchain_traces
ORDER BY end_time DESC
LIMIT 10;
End-to-end latency
| Query | Old path | dyn filter path | Notes |
|---|---|---|---|
| ORDER BY end_time DESC LIMIT 10 | ~28.9s | ~0.21s | end_time is not the Time Index; the old path essentially scans the full table. With dynamic filtering, the scan gets pruned much earlier |
| ORDER BY start_time DESC LIMIT 10 | ~0.33s (windowed sort) | ~0.23–0.24s (dyn filter) | start_time is the Time Index, originally handled by windowed sort. #7912 measured windowed sort to be slower here and moved TopK onto the unified dyn filter path |
Operator-level breakdown
| Metric | main | dyn_filter | Speedup |
|---|---|---|---|
| Total Query Time (User Time) | 28.70 s | 0.20 s | ~143× |
| Scan Node Cost (Total) | 28.81 s | 0.20 s | ~144× |
| Sort Exec Compute Time | 6.55 s | 0.009 s | ~720× |
| Scan Rows before Filter | High (Full Scan) | Near 0 (Pruned) | Significant |
The end_time case was slow because the old path scanned the entire table. With dynamic filtering, the TopK bound prunes the rest of the scan early — SortExec's own compute time drops from 6.55s to 9ms simply because it no longer has to compare anywhere near as many rows.
The start_time case is a different shape. It used to go through windowed sort (WindowedSortExec with PartSortExec inside), relying on partition-range and native SST time-range pruning. In the #7545 era this path didn't pick up the dyn filter and was already decent (~0.33s). #7912 ran another round of measurements and moved it onto dyn filter too, bringing p50 down further to ~0.23s.
One thing worth being explicit about: this optimization changes how much data is scanned, not operator complexity. The actual speedup depends on data distribution, row-group statistic selectivity, and the size of LIMIT.
Why time-index TopK now uses dyn filter too
In #7545 the deliberate choice was: don't enable dyn filter pushdown for time-index TopK. At the time, PartSortExec + dyn filter measured slower than either path alone, so windowed sort kept ownership of this case.
#7912 revisited that choice. Two things happened.
First, a lifetime bug in PartSortExec::with_new_children got fixed. The old code went through try_new, which constructed a brand-new PartSortExec and dropped the dynamic-filter handle that had been wired in from the outside. In practice, the dyn filter never saw TopK's current bound. After the fix, with_new_children clones the existing PartSortExec and only swaps the input, preserving the live dyn-filter reference:
// before: try_new rebuilt the node and gave it a fresh dyn filter that no one updated
// after: keep self's dyn filter, only update the input
let mut new_exec = self.as_ref().clone();
new_exec.input = new_input.clone();
new_exec.properties = new_input.properties().clone();
Second, benchmarks were rerun with the fix in place. With PartSortExec + dyn filter actually functional, ORDER BY start_time DESC LIMIT k measured:
| LIMIT | p50 |
|---|---|
| 5 | 0.586s |
| 10 | 0.587s |
| 100 | 0.569s |
| 1000 | 0.584s |
A variant that skips windowed sort entirely and falls back to plain SortExec: TopK + dyn filter lands in the 0.23–0.24s range. So even with the correctness fix, the windowed-sort TopK path is still roughly 2.5× slower than plain TopK + dyn filter.
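Returning briefly to the first of those two changes: the lifetime bug is easy to reproduce in miniature with the standard library alone. In this sketch SortNode, publish_bound, and scan_sees are hypothetical stand-ins, and an Arc<AtomicI64> plays the role of the shared dynamic-filter handle. None of it is the actual PartSortExec code.

```rust
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;

/// Stand-in for a sort node that holds a shared dynamic-filter handle.
/// Hypothetical type; the real PartSortExec has many more fields.
#[derive(Clone)]
pub struct SortNode {
    pub input: String,
    /// Shared with the scan side; i64::MIN means "no bound yet".
    pub dyn_filter: Arc<AtomicI64>,
}

impl SortNode {
    pub fn try_new(input: &str) -> Self {
        Self {
            input: input.to_string(),
            dyn_filter: Arc::new(AtomicI64::new(i64::MIN)),
        }
    }

    /// Buggy shape: rebuilding allocates a fresh handle the scan never sees.
    pub fn with_new_children_rebuild(&self, new_input: &str) -> Self {
        Self::try_new(new_input)
    }

    /// Fixed shape: the clone keeps the same Arc, only the input changes.
    pub fn with_new_children_clone(&self, new_input: &str) -> Self {
        let mut node = self.clone();
        node.input = new_input.to_string();
        node
    }

    /// TopK side: publish a new bound through this node's handle.
    pub fn publish_bound(&self, bound: i64) {
        self.dyn_filter.store(bound, Ordering::Release);
    }
}

/// Scan side: read whatever bound is visible through its copy of the handle.
pub fn scan_sees(handle: &Arc<AtomicI64>) -> i64 {
    handle.load(Ordering::Acquire)
}
```

If the scan keeps the handle from the original node, a bound published through the rebuilt node never reaches it, while a bound published through the cloned node does. That is the difference between a filter that shows up in the plan and one that actually prunes.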
The actual code change is one line plus a comment:
// src/query/src/optimizer/windowed_sort.rs
if /* ... matches the windowed-sort pattern ... */
&& sort_exec.fetch().is_none()
// skip if there is a limit, as dyn filter alone is good enough in this case
{
// do the rewrite
} else {
return Ok(Transformed::no(plan));
}
The behavior becomes:
- SortExec without fetch (i.e., full sort, no LIMIT): still goes through the windowed-sort rewrite, keeping partition-range pruning;
- SortExec with fetch (i.e., TopK): stays as SortExec: TopK and lets dyn filter pushdown take over. Time-index and non-time-index sort keys share the same path.
The PartSortExec exception preserved by #7545 is now obsolete: in today's execution plans, TopK never produces a PartSortExec at all.
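Reduced to its inputs, the routing rule is a two-variable decision. The sketch below restates it as a standalone function; SortStrategy and choose_sort_strategy are hypothetical names, and the real rule operates on physical plan nodes rather than a boolean and an Option.

```rust
/// Which physical sort strategy a plan ends up with.
/// Hypothetical enum for illustration.
#[derive(Debug, PartialEq, Eq)]
pub enum SortStrategy {
    /// Rewritten to the windowed-sort operators.
    WindowedSort,
    /// Left as `SortExec: TopK`, relying on dynamic filter pushdown.
    TopKWithDynFilter,
    /// Full sort that does not match the windowed-sort pattern; left unchanged.
    PlainSort,
}

/// `matches_windowed_pattern`: the plan has the shape the windowed-sort
/// rule looks for (for example, sorting on the Time Index).
/// `fetch`: the LIMIT attached to the sort, if any.
pub fn choose_sort_strategy(matches_windowed_pattern: bool, fetch: Option<usize>) -> SortStrategy {
    match (matches_windowed_pattern, fetch) {
        // Any LIMIT wins: the sort key no longer matters for routing.
        (_, Some(_)) => SortStrategy::TopKWithDynFilter,
        (true, None) => SortStrategy::WindowedSort,
        (false, None) => SortStrategy::PlainSort,
    }
}
```

The first match arm is the whole change in #7912: once fetch is present, the pattern match is irrelevant.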
When this helps most
The clearest wins happen when several conditions hold together:
- k is small, so the bound tightens quickly;
- row-group min/max statistics are selective enough;
- data distribution lets the TopK bound become meaningful early.
Whether the sort key is the Time Index used to be the decisive factor. Not anymore — TopK uses the same dyn filter path either way. The only difference is that time-indexed data is naturally ordered, so the bound converges a little faster.
Cases where the win is smaller:
- The query is already fast (e.g., Time Index with a small window);
- The filter stays close to true for most of the run;
- Row-group min/max ranges are wide, so little can be pruned;
- LIMIT is large enough that the bound never gets tight.
Short version: if a query is dominated by scan cost and the data shape cooperates, this path can change the cost profile substantially.
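"The data shape cooperates" can be made concrete by asking what fraction of row groups a given bound rules out. The sketch below does that for (min, max) ranges on the sort column; prunable_fraction and the Range alias are hypothetical, the numbers in the note are made up for illustration, and NULLs are ignored.

```rust
/// (min, max) of the sort column for one row group.
pub type Range = (i64, i64);

/// Fraction of row groups whose max cannot beat `bound`, i.e. the share
/// that is prunable for an ORDER BY ... DESC LIMIT k query once the TopK
/// bound has reached `bound`. NULLs are not modeled.
pub fn prunable_fraction(ranges: &[Range], bound: i64) -> f64 {
    if ranges.is_empty() {
        return 0.0;
    }
    let skipped = ranges.iter().filter(|(_, max)| *max <= bound).count();
    skipped as f64 / ranges.len() as f64
}
```

With tightly clustered row groups such as (0, 9), (10, 19), (20, 29), (30, 39) and a bound of 29, three of four groups are prunable. With wide, overlapping ranges such as (0, 39), (1, 38), (0, 37), (2, 39), the same bound prunes nothing, even though the bound itself is just as tight.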
Status and what's next
The dynamic filtering landed by #7545 and #7912 currently covers:
- Local TopK (SortExec: TopK) runtime filter pushdown: live, and now uniform across Time Index and non-Time-Index sort keys.
- Local Hash Join (HashJoin within the same Datanode) dyn filter: covered by sqlness cases including hash_join_dyn_filter and hash_join_topk_dyn_filter.
- Distributed Hash Join (a join on the Frontend pushing a dyn filter down to scans on Datanodes): not yet supported.
The distributed version needs a "remote dyn filter" mechanism: something that can carry a runtime condition produced mid-execution back across RPC to the scan operator on the Datanode. RFC #7931 and infrastructure work in #7979 are still being shaped.
Troubleshooting checklist
If you have a slow ORDER BY ... LIMIT k query, here's what to check:
- Does the TopK node in the plan carry filter=[...]?
  - If not, the query might not be in TopK shape at all: planned as a full sort, or split so the limit happens after an aggregation. It's also possible you're on a GreptimeDB version older than #7912, where the Time Index column still gets planned as WindowedSortExec.
- Does the scan node carry dyn_filters: [...]?
  - If not, the dyn filter isn't making it down. Look at the sort column, the projection, and whether anything in between is blocking it (complex expressions, UNION, etc.).
- Are the row-group statistics selective enough?
  - If the bound is being pushed down but the scan is still large, the row-group min/max ranges are probably too wide, or the data wasn't clustered on the relevant column at write time.
- Does the plan show WindowedSortExec or PartSortExec?
  - If you see it for a full sort (no LIMIT), that's by design: windowed sort keeps that case. If you see it for a TopK query, the sort_exec.fetch().is_none() branch is misfiring, and the rewrite rule itself is what to look at.
For this class of TopK query, the important shift isn't that only a few rows come back. It's that the scan after them is allowed to stop early.


