Introduction
Over the past few weeks, I've been diving into the world of query plan optimization. It's a fascinating (but complex) rabbit hole that has led to several new features in the upcoming MatchFlow release. Here's what's new, why I built it, and how it can help you.
Query Plan Optimization (optional, but awesome!)
As data pipelines become more complex, I realized MatchFlow could benefit from smarter optimization. Enter the internal FlowOptimizer class. This is an optional feature you can enable to automatically rewrite your pipeline plans, pushing filters and limits closer to data sources, fusing steps, and generally improving efficiency.
You can opt in by passing optimize=True:
from penaltyblog.matchflow import Flow
flow = Flow.from_jsonl("events.jsonl", optimize=True)
Why make it optional? Correctness and safety are my top priorities. Since this is the initial release of the optimizer, I'm making it opt-in while it gets thoroughly battle-tested.
Once I'm confident it handles edge cases correctly, I'll make it the default. For now, though, enabling it is straightforward, but the choice remains yours.
Importantly, the optimizer is also transparent and explainable. Use .explain(compare=True) to see exactly what optimizations have taken place.
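For example, a quick way to inspect how a pipeline gets rewritten:
from penaltyblog.matchflow import Flow
flow = (
    Flow.from_jsonl("events.jsonl", optimize=True)
    .select("team", "score")
    .limit(5)
)
flow.explain(compare=True)  # shows the plan before and after optimization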
Improved Plan Visualization with .plot_plan
Understanding exactly what your pipeline does is crucial. With Flow.plot_plan, you can now generate visual representations of your pipelines. Quickly visualize steps, identify bottlenecks, and simplify debugging. You can also set compare=True to compare the plan before and after the optimizer has potentially rewritten it.
from penaltyblog.matchflow import Flow
data = [
{"team": "A", "score": 1},
{"team": "B", "score": 2},
{"team": "A", "score": 3},
{"team": "B", "score": 4},
]
flow = (
Flow.from_records(data, optimize=True)
.select("team", "score")
.assign(x=lambda r: r["score"] * 2)
.limit(2)
)
flow.plot_plan(compare=True)
Figure 1: Flow plan before and after optimizer
You can see from the example above how the optimizer has pushed the limit step closer to the data source, reducing the amount of data that needs to be processed. This can lead to significant performance improvements, especially for larger datasets.
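Conceptually (the exact step labels in the rendered plan may differ), the rewrite looks something like this:
Before: from_records -> select -> assign -> limit(2)
After: from_records -> limit(2) -> select -> assign
Because select and assign transform records one-to-one, truncating the stream early produces the same two output records while doing less work.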
Let's look at another example:
from penaltyblog.matchflow import Flow
data = [
{"team": "A", "score": 1},
{"team": "B", "score": 2},
{"team": "A", "score": 3},
{"team": "B", "score": 4},
]
flow = (
Flow.from_records(data, optimize=True)
.filter(lambda r: r["team"] == "A")
.filter(lambda r: r["score"] > 2)
.limit(2)
)
flow.plot_plan(compare=True)
Figure 2: Flow plan before and after optimizer
This time the optimizer has recognized that the two consecutive filters can be fused into a single filter step. This can lead to significant performance improvements by reducing the number of steps that need to be executed.
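Conceptually, the fused plan is equivalent to writing the pipeline with one combined predicate yourself:
from penaltyblog.matchflow import Flow
# Equivalent hand-fused version of the pipeline above (reusing `data`):
# one combined predicate instead of two separate filter steps
flow = (
    Flow.from_records(data)
    .filter(lambda r: r["team"] == "A" and r["score"] > 2)
    .limit(2)
)
Evaluating both conditions in a single pass means each record flows through one step instead of two.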
Rolling Summaries for Groups
Sports data often needs rolling summaries: think moving averages or cumulative sums. MatchFlow now supports this natively using a sliding window:
from penaltyblog.matchflow import Flow, where_equals
import datetime
flow = (
Flow.statsbomb.events(16023)
.filter(where_equals("type.name", "Shot"))
.assign(
timestamp=lambda r: datetime.timedelta(
minutes=r["minute"],
seconds=r["second"]
)
)
.group_by("team.name")
.sort_by("timestamp")
.rolling_summary(
window="5m",
aggregators={
"xg": ("sum", "shot.statsbomb_xg"),
"shot_count": ("count", "shot.statsbomb_xg")
},
time_field="timestamp",
)
.select("team.name", "timestamp", "xg", "shot_count")
)
flow.show()
What it does
- Groups your stream by one or more keys (e.g. team).
- Walks each group in timestamp order (the data must be pre-sorted, hence the .sort_by()).
- Slides a fixed-duration window ("5m" here, though it can also be a number of records) over each group; see the sketch below.
- Computes aggregations (moving average, running sum, count, etc.) within each window.
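For intuition, here's a minimal, self-contained sketch of a time-based sliding window. This is not MatchFlow's internal implementation, just the underlying idea:
import datetime
from collections import deque

def rolling_sum(records, window, value_field, time_field):
    # Assumes records are already sorted by time_field
    buf = deque()
    for rec in records:
        buf.append(rec)
        # Evict anything older than the window ending at the current record
        while rec[time_field] - buf[0][time_field] > window:
            buf.popleft()
        yield {**rec, "rolling_sum": sum(r[value_field] for r in buf)}

shots = [
    {"t": datetime.timedelta(minutes=m), "xg": x}
    for m, x in [(1, 0.1), (3, 0.3), (7, 0.2)]
]
for row in rolling_sum(shots, datetime.timedelta(minutes=5), "xg", "t"):
    print(row["t"], row["rolling_sum"])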
Why pre-sorting matters
To guarantee correct sliding-window results, you must call .sort_by() before .rolling_summary(). If records arrive at the window out of order, you risk skewed or invalid metrics.
Automatic safety checks
I debated with myself a lot about auto-sorting inside .rolling_summary(), but eventually decided against it because it would:
- Add overhead when data is already sorted
- Hide errors when upstream sources emit out-of-order events
Instead, I settled on MatchFlow’s new optimizer emitting a warning whenever it detects that .rolling_summary() follows a non-sorted stream. This reminds you to verify order without penalizing optimized pipelines.
Figure 3: Flow plan before and after optimizer
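For illustration, this is the shape of pipeline that should trigger the check (hypothetical data; the exact warning text may differ):
from penaltyblog.matchflow import Flow
import datetime
data = [
    {"team": "A", "t": datetime.timedelta(minutes=3), "xg": 0.2},
    {"team": "A", "t": datetime.timedelta(minutes=1), "xg": 0.1},  # out of order
]
flow = (
    Flow.from_records(data, optimize=True)
    .group_by("team")
    # No .sort_by("t") here, so the optimizer should warn about ordering
    .rolling_summary(
        window="5m",
        aggregators={"xg": ("sum", "xg")},
        time_field="t",
    )
)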
Time Buckets
If you’d rather have one summary per uniform time interval, say total xG every 5 minutes, you can use .time_bucket():
from penaltyblog.matchflow import Flow, where_equals
import datetime
flow = (
Flow.statsbomb.events(16023)
.filter(where_equals("type.name", "Shot"))
.assign(
timestamp=lambda r: datetime.timedelta(
minutes=r["minute"],
seconds=r["second"]
)
)
.group_by("team.name")
.time_bucket(
freq="5m",
aggregators={
"xg": ("sum", "shot.statsbomb_xg"),
"shot_count": ("count", "shot.statsbomb_xg")
},
time_field="timestamp",
label="left"
)
)
flow.show()
What it does
- Non-overlapping bins: divides time into [0–5), [5–10), … minute windows.
- Single row per bucket: each group emits exactly one record per interval, with your chosen aggregations.
- Flexible labeling: choose label="left" (interval start) or label="right" (interval end), and name that column with bucket_name.
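To make the labeling concrete, here's a minimal sketch of how a left-labeled 5-minute bucket can be derived from a timedelta timestamp (an illustration of the concept, not MatchFlow's internals):
import datetime

def bucket_left(ts, freq_minutes=5):
    # Floor the timestamp to the start of its interval, e.g. 7m30s -> 5m
    total = int(ts.total_seconds())
    start = (total // (freq_minutes * 60)) * (freq_minutes * 60)
    return datetime.timedelta(seconds=start)

print(bucket_left(datetime.timedelta(minutes=7, seconds=30)))  # 0:05:00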
Quick Tips
- Always .assign() your timestamp field as a proper datetime or timedelta when using time-based windows.
- Don't forget to use .sort_by() before .rolling_summary() to ensure correct results.
- Use .time_bucket() when you need uniform reporting intervals; use .rolling_summary() for sliding-window analytics.
Schema Validation
Messy data can be frustrating, so I've also introduced .with_schema() to help catch data issues early:
from datetime import datetime
def parse_datetime(dt_str):
return datetime.strptime(dt_str, "%Y-%m-%d %H:%M:%S")
flow.with_schema(
    {"team.name": str, "score": int, "timestamp": parse_datetime},
    strict=True,
    drop_extra=True,
)
You can set strict=True to raise an error if any data doesn't match the schema, and drop_extra=True to drop any fields that are not defined in the schema.
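Putting it together, a record with a stray field might be handled like this (a small sketch, assuming .with_schema() applies each type or callable to its field):
from penaltyblog.matchflow import Flow
data = [
    {"team": "A", "score": 1, "debug": "ignore-me"},
]
flow = (
    Flow.from_records(data)
    .with_schema(
        {"team": str, "score": int},
        strict=True,      # raise if a field doesn't match the schema
        drop_extra=True,  # drop "debug", which isn't defined in the schema
    )
)
flow.show()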
Quick Data Previews
Because I'm often working in notebooks or terminals, I've added a simple .show() method to quickly preview data in a nice, readable format:
flow.limit(10).show()
Quality-of-Life Improvements
I've also added some quality-of-life improvements to existing methods, including:
- Optional progress bars on .collect()
- Improved .explain() for grouped pipelines
- Documentation updates
Conclusions
These improvements reflect my ongoing plan to make MatchFlow as powerful as possible while keeping it simple.
MatchFlow is still new and I'm sure there are edge cases I haven't discovered yet, so if you find any issues or have any feedback, please do let me know. I'm always open to collaboration and excited about new possibilities.
What's Next?
As a quick teaser, the new optimizer opens the door to exciting future developments like predicate pushdown and parallel processing. I'm also currently experimenting with a custom binary data format designed for faster loading and even more efficient filtering. There's lots of great stuff on the roadmap, so stay tuned!