How to aggregate a group by queryset in django?
I'm working with time series data which are represented using this model:
class Price:
timestamp = models.IntegerField()
price = models.FloatField()
Assuming timestamp has 1 min interval data, this is how I would resample it to 1 hr:
queryset = (
Price.objects.annotate(timestamp_agg=Floor(F('timestamp') / 3600))
.values('timestamp_agg')
.annotate(
timestamp=Min('timestamp'),
high=Max('price'),
)
.values('timestamp', 'high')
.order_by('timestamp')
)
which runs the following sql under the hood:
select min(timestamp) timestamp, max(price) high
from core_price
group by floor((timestamp / 3600))
order by timestamp
Now I want to calculate a 4 hr moving average, usually calculated in the following way:
select *, avg(high) over (order by timestamp rows between 4 preceding and current row) ma
from (select min(timestamp) timestamp, max(price) high
from core_price
group by floor((timestamp / 3600))
order by timestamp)
or
Window(expression=Avg('price'), frame=RowRange(start=-4, end=0))
How to apply the window aggregation above to the first query? Obviously I can't do something like this since the first query is already an aggregation:
>>> queryset.annotate(ma=Window(expression=Avg('high'), frame=RowRange(start=-4, end=0)))
django.core.exceptions.FieldError: Cannot compute Avg('high'): 'high' is an aggregate
Using SQL:
query = """
SELECT * FROM (
SELECT min(timestamp) as timestamp,
max(price) as high
FROM core_price
GROUP BY floor((timestamp / 3600))
) subquery
CROSS JOIN LATERAL (
SELECT avg(high) OVER (
ORDER BY timestamp
ROWS BETWEEN 4 PRECEDING AND CURRENT ROW
) as ma
) window_calc
ORDER BY timestamp
"""
results = Price.objects.raw(query)
Or, alternatively, for Django 4.2+:
from django.db.models import CTE, Window, F, FloatField
from django.db.models.functions import Floor, Min, Max, Avg
from django.db.models.expressions import RowRange
#Create the CTE for hourly aggregation
hourly_cte = CTE(
Price.objects
.annotate(timestamp_agg=Floor(F('timestamp') / 3600))
.values('timestamp_agg')
.annotate(
timestamp=Min('timestamp'),
high=Max('price'),
)
.values('timestamp', 'high'),
name='hourly_data'
)
#Query the CTE with window function
queryset = (
hourly_cte.queryset()
.with_cte(hourly_cte)
.annotate(
ma=Window(
expression=Avg('high'),
order_by=F('timestamp').asc(),
frame=RowRange(start=-4, end=0)
)
)
.order_by('timestamp')
)