How to apply a window aggregate to a GROUP BY queryset in Django?

I'm working with time series data which are represented using this model:

class Price(models.Model):
    timestamp = models.IntegerField()
    price = models.FloatField()

Assuming the timestamps are at 1-minute intervals, this is how I resample the data to 1-hour buckets:

queryset = (
    Price.objects.annotate(timestamp_agg=Floor(F('timestamp') / 3600))
    .values('timestamp_agg')
    .annotate(
        timestamp=Min('timestamp'),
        high=Max('price'),
    )
    .values('timestamp', 'high')
    .order_by('timestamp')
)

which runs the following SQL under the hood:

select min(timestamp) timestamp, max(price) high
from core_price
group by floor((timestamp / 3600))
order by timestamp

Now I want to calculate a 4 hr moving average, usually calculated in the following way:

select *, avg(high) over (order by timestamp rows between 4 preceding and current row) ma
from (select min(timestamp) timestamp, max(price) high
      from core_price
      group by floor((timestamp / 3600))
      order by timestamp)

or

Window(expression=Avg('price'), frame=RowRange(start=-4, end=0))

How can I apply the window aggregation above to the first query? I can't simply annotate it, because the first query is already an aggregation:

>>> queryset.annotate(ma=Window(expression=Avg('high'), frame=RowRange(start=-4, end=0)))
django.core.exceptions.FieldError: Cannot compute Avg('high'): 'high' is an aggregate

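Using raw SQL, apply the window function in the outer SELECT, over the aggregated subquery:

from django.db import connection

query = """
    SELECT sub.timestamp,
           sub.high,
           avg(sub.high) OVER (
               ORDER BY sub.timestamp
               ROWS BETWEEN 4 PRECEDING AND CURRENT ROW
           ) AS ma
    FROM (
        SELECT min(timestamp) AS timestamp,
               max(price) AS high
        FROM core_price
        GROUP BY floor((timestamp / 3600))
    ) sub
    ORDER BY sub.timestamp
"""

# Price.objects.raw() requires the primary key in the result set, which a
# grouped query does not have, so run the query through a plain cursor.
with connection.cursor() as cursor:
    cursor.execute(query)
    results = cursor.fetchall()  # list of (timestamp, high, ma) tuples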
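
To check the shape of such a query without a Django project, here is a minimal sketch using Python's built-in sqlite3 module. The in-memory table and the sample data are made up for illustration, and it assumes the bundled SQLite is 3.25 or newer (window function support). The window function sits in the outer SELECT and runs over the grouped subquery:

```python
import sqlite3

# Hypothetical in-memory stand-in for the core_price table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE core_price (timestamp INTEGER, price REAL)")

# Six hours of 1-minute data. Within hour h the prices run from
# h*100 to h*100 + 59, so the hourly high is h*100 + 59.
rows = [(h * 3600 + m * 60, float(h * 100 + m))
        for h in range(6) for m in range(60)]
conn.executemany("INSERT INTO core_price VALUES (?, ?)", rows)

query = """
    SELECT sub.timestamp,
           sub.high,
           avg(sub.high) OVER (
               ORDER BY sub.timestamp
               ROWS BETWEEN 4 PRECEDING AND CURRENT ROW
           ) AS ma
    FROM (
        SELECT min(timestamp) AS timestamp,
               max(price) AS high
        FROM core_price
        -- integer / integer is integer division in SQLite,
        -- so floor() is not needed for integer timestamps
        GROUP BY core_price.timestamp / 3600
    ) AS sub
    ORDER BY sub.timestamp
"""

result = conn.execute(query).fetchall()
for timestamp, high, ma in result:
    print(timestamp, high, ma)
```

Each output row is one hourly bucket, and ma averages the current high with up to four preceding highs, so the first rows average over fewer values.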

Or, alternatively, wrap the aggregation in a common table expression. Django's ORM has no built-in CTE class, so this assumes the third-party django-cte package (1.x API) and that Price.objects is a django_cte.CTEManager:

from django.db.models import Avg, F, Max, Min, RowRange, Window
from django.db.models.functions import Floor
from django_cte import With  # third-party package, not part of Django

# Create the CTE for the hourly aggregation
hourly_cte = With(
    Price.objects
    .annotate(timestamp_agg=Floor(F('timestamp') / 3600))
    .values('timestamp_agg')
    .annotate(
        timestamp=Min('timestamp'),
        high=Max('price'),
    )
    .values('timestamp', 'high'),
    name='hourly_data',
)

# Select from the CTE and apply the window function to its columns
queryset = (
    hourly_cte.queryset()
    .with_cte(hourly_cte)
    .annotate(
        ma=Window(
            expression=Avg('high'),
            order_by=F('timestamp').asc(),
            frame=RowRange(start=-4, end=0),
        )
    )
    .order_by('timestamp')
)
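
If the hourly result set is small, another option is to let the database do only the grouping and compute the trailing average in Python. The sketch below is an illustration, not part of the approaches above: trailing_average is a hypothetical helper, and the hourly list stands in for list(queryset) from the first aggregation. It mirrors ROWS BETWEEN 4 PRECEDING AND CURRENT ROW:

```python
from collections import deque


def trailing_average(rows, key="high", preceding=4):
    """Yield each row with an extra 'ma' entry: the mean of `key` over the
    current row and up to `preceding` rows before it."""
    window = deque(maxlen=preceding + 1)  # old values fall out automatically
    for row in rows:
        window.append(row[key])
        yield {**row, "ma": sum(window) / len(window)}


# Hypothetical stand-in for list(queryset): one dict per hourly bucket,
# already ordered by timestamp.
hourly = [{"timestamp": h * 3600, "high": float(h * 100 + 59)} for h in range(6)]

result = list(trailing_average(hourly))
for row in result:
    print(row["timestamp"], row["high"], row["ma"])
```

This trades one SQL window pass for a linear loop in Python, which is usually acceptable for a few thousand hourly rows but moves work out of the database.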