Window Functions
Window functions perform calculations across a set of rows that are related to the current row. They operate on a group of rows (called a window frame) and return a result for each row based on the values in its window frame, rather than collapsing the rows into a single result as aggregate functions do. See the Window Functions Tutorial for a step-by-step walkthrough.
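To make the contrast with plain aggregation concrete, here is a minimal pure-Python sketch (it does not use Daft; the sample `rows` and the helper name `window_sum` are made up for illustration). It computes a sum over a window partitioned by `category` and keeps one output value per input row, where an aggregate would return one value per category.

```python
from collections import defaultdict

# Hypothetical sample data: one dict per row.
rows = [
    {"category": "a", "value": 1},
    {"category": "a", "value": 3},
    {"category": "b", "value": 10},
]


def window_sum(rows, partition_key, value_key):
    """Return the partition total for every row, preserving row count and order."""
    totals = defaultdict(int)
    for row in rows:
        totals[row[partition_key]] += row[value_key]
    # One result per input row: each row sees the total of its own partition.
    return [totals[row[partition_key]] for row in rows]


category_total = window_sum(rows, "category", "value")
# A window result can be combined with the row's own value, e.g. a share of the total.
pct_of_category = [row["value"] / total for row, total in zip(rows, category_total)]
```

The output has as many entries as the input, which is what allows a windowed value to be mixed with per-row columns in a single expression.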
Window
Describes how to partition data and in what order to apply the window function.
This class provides a way to specify window definitions for window functions. Window functions operate on a group of rows (called a window frame) and return a result for each row based on the values in its window frame.
Examples:
>>> from daft import Window, col
>>>
>>> # Basic window aggregation with a single partition column:
>>> window_spec = Window().partition_by("category")
>>> df = df.select(
...     col("value").sum().over(window_spec).alias("category_total"),
...     col("value").mean().over(window_spec).alias("category_avg"),
... )
>>>
>>> # Partitioning by multiple columns:
>>> window_spec = Window().partition_by(["department", "category"])
>>> df = df.select(col("sales").sum().over(window_spec).alias("dept_category_total"))
>>>
>>> # Using window aggregations in expressions:
>>> window_spec = Window().partition_by("category")
>>> df = df.select((col("value") / col("value").sum().over(window_spec)).alias("pct_of_category"))
Methods:
| Name | Description |
order_by | Orders rows within each partition by specified columns or expressions. |
partition_by | Partitions the dataset by one or more columns or expressions. |
range_between | Restricts each window to a range-based frame between start and end boundaries. |
rows_between | Restricts each window to a row-based frame between start and end boundaries. |
Attributes:
Source code in daft/window.py
def __init__(self) -> None:
    self._spec = _WindowSpec.new()
unbounded_following
unbounded_following = unbounded_following()
unbounded_preceding
unbounded_preceding = unbounded_preceding()
order_by
order_by(*cols: ManyColumnsInputType, desc: bool | list[bool] = False, nulls_first: bool | list[bool] | None = None) -> Window
Orders rows within each partition by specified columns or expressions.
Parameters:
| Name | Type | Description | Default |
*cols | ManyColumnsInputType | Columns or expressions to determine ordering within the partition. Can be column names as strings, Expression objects, or iterables of these. | () |
desc | bool | list[bool] | Sort descending (True) or ascending (False). Can be a single boolean value applied to all columns, or a list of boolean values corresponding to each column. Default is False (ascending). | False |
nulls_first | bool | list[bool] | None | Whether to position NULL values at the beginning (True) or end (False) of the partition. Can be a single boolean value applied to all columns, or a list of boolean values corresponding to each column. Default is None, which means NULL values are positioned at the end for ascending order (default) and at the beginning for descending order. | None |
Returns:
| Name | Type | Description |
Window | Window | A window specification with the given ordering. |
Examples:
>>> from daft import Window, col
>>> # Order by 'date' ascending (default)
>>> window_spec = Window().partition_by("category").order_by("date")
>>> # Order by 'sales' descending
>>> window_spec_desc = Window().partition_by("category").order_by("sales", desc=True)
>>> # Order by 'date' ascending and 'sales' descending
>>> window_spec_multi = Window().partition_by("category").order_by("date", "sales", desc=[False, True])
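The way `desc` and `nulls_first` expand into one flag per ordering column can be sketched in isolation. The function below is a standalone illustration that mirrors the flag handling described for `order_by`; the name `resolve_order_flags` and the `num_columns` argument are hypothetical and not part of Daft.

```python
def resolve_order_flags(num_columns, desc=False, nulls_first=None):
    """Expand desc / nulls_first into one boolean per order-by column."""
    if isinstance(desc, bool):
        desc_flags = [desc] * num_columns
    else:
        if len(desc) != num_columns:
            raise ValueError("Length of descending flags must match number of order by columns")
        desc_flags = list(desc)

    if nulls_first is None:
        # Default: NULLs go last for ascending columns and first for descending ones,
        # so the nulls-first flag simply follows the descending flag.
        nulls_first_flags = list(desc_flags)
    elif isinstance(nulls_first, bool):
        nulls_first_flags = [nulls_first] * num_columns
    else:
        if len(nulls_first) != num_columns:
            raise ValueError("Length of nulls first flags must match number of order by columns")
        nulls_first_flags = list(nulls_first)

    return desc_flags, nulls_first_flags


# Two ordering columns: first ascending, second descending, default NULL placement.
flags = resolve_order_flags(2, desc=[False, True])
```

Here `flags` is `([False, True], [False, True])`: the ascending column places NULLs last and the descending column places them first.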
Source code in daft/window.py
def order_by(
    self, *cols: ManyColumnsInputType, desc: bool | list[bool] = False, nulls_first: bool | list[bool] | None = None
) -> Window:
    """Orders rows within each partition by specified columns or expressions.

    Args:
        *cols: Columns or expressions to determine ordering within the partition.
            Can be column names as strings, Expression objects, or iterables of these.
        desc: Sort descending (True) or ascending (False). Can be a single boolean value applied to all columns,
            or a list of boolean values corresponding to each column. Default is False (ascending).
        nulls_first: Whether to position NULL values at the beginning (True) or end (False) of the partition.
            Can be a single boolean value applied to all columns, or a list of boolean values corresponding to each column.
            Default is None, which means NULL values are positioned at the end for ascending order (default) and at the beginning for descending order.

    Returns:
        Window: A window specification with the given ordering.

    Examples:
        >>> from daft import Window, col
        >>> # Order by 'date' ascending (default)
        >>> window_spec = Window().partition_by("category").order_by("date")
        >>> # Order by 'sales' descending
        >>> window_spec_desc = Window().partition_by("category").order_by("sales", desc=True)
        >>> # Order by 'date' ascending and 'sales' descending
        >>> window_spec_multi = Window().partition_by("category").order_by("date", "sales", desc=[False, True])
    """
    expressions = []
    for c in cols:
        expressions.extend(column_inputs_to_expressions(c))

    if isinstance(desc, bool):
        desc_flags = [desc] * len(expressions)
    else:
        if len(desc) != len(expressions):
            raise ValueError("Length of descending flags must match number of order by columns")
        desc_flags = desc

    if nulls_first is None:
        nulls_first_flags = desc_flags
    elif isinstance(nulls_first, bool):
        nulls_first_flags = [nulls_first] * len(expressions)
    else:
        if len(nulls_first) != len(expressions):
            raise ValueError("Length of nulls first flags must match number of order by columns")
        nulls_first_flags = nulls_first

    window = Window()
    window._spec = self._spec.with_order_by([expr._expr for expr in expressions], desc_flags, nulls_first_flags)
    return window
partition_by
partition_by(*cols: ManyColumnsInputType) -> Window
Partitions the dataset by one or more columns or expressions.
Parameters:
| Name | Type | Description | Default |
*cols | ManyColumnsInputType | Columns or expressions on which to partition data. Can be column names as strings, Expression objects, or iterables of these. | () |
Returns:
| Name | Type | Description |
Window | Window | A window specification with the given partitioning. |
Raises:
| Type | Description |
ValueError | If no partition columns are specified. |
Examples:
>>> from daft import Window, col
>>> # Partition by a single column 'category'
>>> window_spec = Window().partition_by("category")
>>> # Partition by multiple columns 'department' and 'region'
>>> window_spec_multi = Window().partition_by("department", "region")
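Since `*cols` accepts single columns as well as iterables of columns, the inputs are flattened into one list before use, and an empty result is rejected. The sketch below illustrates that idea with plain strings only; `flatten_column_inputs` is a hypothetical stand-in, not Daft's actual helper, and it treats anything that is not a list or tuple as a single column.

```python
def flatten_column_inputs(*cols):
    """Flatten a mix of single columns and lists/tuples of columns into one list."""
    flattened = []
    for c in cols:
        if isinstance(c, (list, tuple)):
            flattened.extend(c)
        else:
            # Strings (and, by assumption, expression objects) count as one column each.
            flattened.append(c)
    if not flattened:
        raise ValueError("At least one partition column must be specified")
    return flattened


# Positional arguments and a nested list end up in the same flat list.
partition_columns = flatten_column_inputs("department", ["region", "team"])
```

Both an empty call and a call with only an empty list raise `ValueError` in this sketch, matching the documented requirement of at least one partition column.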
Source code in daft/window.py
def partition_by(self, *cols: ManyColumnsInputType) -> Window:
    """Partitions the dataset by one or more columns or expressions.

    Args:
        *cols: Columns or expressions on which to partition data.
            Can be column names as strings, Expression objects, or iterables of these.

    Returns:
        Window: A window specification with the given partitioning.

    Raises:
        ValueError: If no partition columns are specified.

    Examples:
        >>> from daft import Window, col
        >>> # Partition by a single column 'category'
        >>> window_spec = Window().partition_by("category")
        >>> # Partition by multiple columns 'department' and 'region'
        >>> window_spec_multi = Window().partition_by("department", "region")
    """
    if not cols:
        raise ValueError("At least one partition column must be specified")

    expressions = []
    for c in cols:
        expressions.extend(column_inputs_to_expressions(c))

    if not expressions:
        raise ValueError("At least one partition column must be specified")

    window = Window()
    window._spec = self._spec.with_partition_by([expr._expr for expr in expressions])
    return window
range_between
range_between(start: Any, end: Any, min_periods: int = 1) -> Window
Restricts each window to a range-based frame between start and end boundaries.
This defines a window frame based on a range of values relative to the current row's value in the ordering column. It requires exactly one order_by column, which must be of a numeric or temporal type.
Parameters:
| Name | Type | Description | Default |
start | Any | Boundary definition for the start of the window's range. Can be: Window.unbounded_preceding (no lower bound; the frame starts at the first row of the partition); Window.current_row (start the range at the current row's order value); or an offset value (e.g., -10, datetime.timedelta(days=-1)), in which case the start of the range is current_row_order_value + start. The type of the offset must match the order-by column type. Negative values indicate a lower bound less than the current row's value. Positive values indicate a lower bound greater than the current row's value. | required |
end | Any | Boundary definition for the end of the window's range. Syntax is similar to start. Negative values indicate an upper bound less than the current row's value. Positive values indicate an upper bound more than the current row's value. | required |
min_periods | int | Minimum number of rows required in the window frame to compute a result (default = 1). If fewer rows exist in the frame, the function returns NULL. | 1 |
Returns:
| Name | Type | Description |
Window | Window | A window specification with the given range-based frame bounds. |
Raises:
| Type | Description |
NotImplementedError | This feature is not yet implemented. |
Examples:
>>> from daft import Window, col
>>> import datetime
>>> # Assume df has columns 'sensor_id', 'timestamp', 'reading'
>>> # Frame includes rows within 10 units *before* the current row's reading
>>> val_window = (
...     Window().partition_by("sensor_id").order_by("reading").range_between(-10, Window.current_row)
... )
>>> # Frame includes rows from 1 day before to 1 day after the current row's timestamp
>>> time_window = (
...     Window()
...     .partition_by("sensor_id")
...     .order_by("timestamp")
...     .range_between(datetime.timedelta(days=-1), datetime.timedelta(days=1))
... )
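A range-based frame selects rows by the value of the ordering column rather than by position. The pure-Python sketch below illustrates this for a single partition. It is not Daft's implementation: the helper `range_frame_sum` is hypothetical, both bounds are assumed to be inclusive, and the current-row boundary is represented by an offset of 0.

```python
def range_frame_sum(order_values, values, start, end, min_periods=1):
    """Sum `values` over rows whose order value lies in [current + start, current + end]."""
    results = []
    for current in order_values:
        lower, upper = current + start, current + end
        # Membership depends on the order value, not on how many rows away a row is.
        in_frame = [v for o, v in zip(order_values, values) if lower <= o <= upper]
        results.append(sum(in_frame) if len(in_frame) >= min_periods else None)
    return results


# Hypothetical sensor readings in one partition, used as both order column and value.
readings = [1, 5, 12, 20]

# Rows within 10 units before the current reading, up to the current reading itself.
within_ten = range_frame_sum(readings, readings, start=-10, end=0)

# Same frame, but a result is only produced when at least 2 rows fall inside it.
within_ten_min2 = range_frame_sum(readings, readings, start=-10, end=0, min_periods=2)
```

In this sketch `within_ten` is `[1, 6, 17, 32]`, and `within_ten_min2` is `[None, 6, 17, 32]` because the first frame contains only one row.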
Source code in daft/window.py
def range_between(
    self,
    start: Any,
    end: Any,
    min_periods: int = 1,
) -> Window:
    """Restricts each window to a range-based frame between start and end boundaries.

    This defines a window frame based on a range of values relative to the current row's
    value in the ordering column. Requires exactly one `order_by` column, which must be
    numeric or temporal type.

    Args:
        start: Boundary definition for the start of the window's range. Can be:

            *   ``Window.unbounded_preceding``: Include all rows with order value <= current row's order value + start offset.
            *   ``Window.current_row``: Start range at the current row's order value.
            *   Offset value (e.g., ``-10``, ``datetime.timedelta(days=-1)``): The start of the range is defined as
                `current_row_order_value + start`. The type of the offset must match the order-by column type.

                Negative values indicate a lower bound *less than* the current row's value. Positive values indicate a lower bound
                *more than* the current row's value.
        end: Boundary definition for the end of the window's range. Syntax is similar to `start`.
            Negative values indicate an upper bound *less than* the current row's value. Positive values indicate an upper bound
            *more than* the current row's value.
        min_periods: Minimum number of rows required in the window frame to compute a result (default = 1).
            If fewer rows exist in the frame, the function returns NULL.

    Returns:
        Window: A window specification with the given range-based frame bounds.

    Raises:
        NotImplementedError: This feature is not yet implemented.

    Examples:
        >>> from daft import Window, col
        >>> import datetime
        >>> # Assume df has columns 'sensor_id', 'timestamp', 'reading'
        >>> # Frame includes rows within 10 units *before* the current row's reading
        >>> val_window = (
        ...     Window().partition_by("sensor_id").order_by("reading").range_between(-10, Window.current_row)
        ... )
        >>> # Frame includes rows from 1 day before to 1 day after the current row's timestamp
        >>> time_window = (
        ...     Window()
        ...     .partition_by("sensor_id")
        ...     .order_by("timestamp")
        ...     .range_between(datetime.timedelta(days=-1), datetime.timedelta(days=1))
        ... )
    """
    if isinstance(start, _PyWindowBoundary):
        start_boundary = start
    else:
        start_expr = Expression._to_expression(start)
        start_boundary = _PyWindowBoundary.range_offset(start_expr._expr)

    if isinstance(end, _PyWindowBoundary):
        end_boundary = end
    else:
        end_expr = Expression._to_expression(end)
        end_boundary = _PyWindowBoundary.range_offset(end_expr._expr)

    frame = _WindowFrame(
        start=start_boundary,
        end=end_boundary,
    )

    new_window = Window()
    new_window._spec = self._spec.with_frame(frame).with_min_periods(min_periods)
    return new_window
rows_between
rows_between(start: int | PyWindowBoundary, end: int | PyWindowBoundary, min_periods: int = 1) -> Window
Restricts each window to a row-based frame between start and end boundaries.
This defines a sliding window based on row offsets relative to the current row.
Parameters:
| Name | Type | Description | Default |
start | int | PyWindowBoundary | Boundary definition for the start of the window. Can be: Window.unbounded_preceding (include all rows before the current row); Window.current_row (start at the current row); a negative integer (e.g., -3), indicating the number of rows preceding the current row; or a positive integer (e.g., 1), indicating the number of rows following the current row. | required |
end | int | PyWindowBoundary | Boundary definition for the end of the window. Can be: Window.unbounded_following (include all rows after the current row); Window.current_row (end at the current row); a positive integer (e.g., 1), indicating the number of rows following the current row; or a negative integer (e.g., -1), indicating the number of rows preceding the current row. | required |
min_periods | int | Minimum number of rows required in the window frame to compute a result (default = 1). If fewer rows exist in the frame, the function returns NULL. | 1 |
Returns:
| Name | Type | Description |
Window | Window | A window specification with the given row-based frame bounds. |
Examples:
>>> from daft import Window
>>> # Frame includes the current row and the 2 preceding rows
>>> window_spec = Window().partition_by("cat").order_by("val").rows_between(-2, Window.current_row)
>>> # Frame includes all rows from the beginning of the partition up to the current row
>>> cum_window = (
...     Window()
...     .partition_by("cat")
...     .order_by("val")
...     .rows_between(Window.unbounded_preceding, Window.current_row)
... )
>>> # Frame includes the preceding row, current row, and following row
>>> sliding_window = Window().partition_by("cat").order_by("val").rows_between(-1, 1)
>>> # Frame includes the current row and the 3 following rows, requiring at least 2 rows
>>> lookahead_window = (
...     Window().partition_by("cat").order_by("val").rows_between(Window.current_row, 3, min_periods=2)
... )
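The four frames in the examples can be traced by hand with a small pure-Python model of a row-based frame. This is a sketch for one already-ordered partition, not Daft's implementation: `rows_frame_sum` is a hypothetical helper, `None` stands in for an unbounded boundary, and 0 stands in for the current row.

```python
def rows_frame_sum(values, start, end, min_periods=1):
    """Sum `values` over the rows from offset `start` to offset `end` around each row."""
    n = len(values)
    results = []
    for i in range(n):
        # Clamp the frame to the partition; None means unbounded on that side.
        lo = 0 if start is None else max(0, i + start)
        hi = n - 1 if end is None else min(n - 1, i + end)
        frame = values[lo : hi + 1] if lo <= hi else []
        results.append(sum(frame) if len(frame) >= min_periods else None)
    return results


values = [1, 2, 3, 4]  # hypothetical partition, already in order_by order

trailing = rows_frame_sum(values, -2, 0)                 # [1, 3, 6, 9]
cumulative = rows_frame_sum(values, None, 0)             # [1, 3, 6, 10]
centered = rows_frame_sum(values, -1, 1)                 # [3, 6, 9, 7]
lookahead = rows_frame_sum(values, 0, 3, min_periods=2)  # [10, 9, 7, None]
```

The last result shows the effect of `min_periods`: the final row has only itself in the frame, which is fewer than the required 2 rows, so the sketch yields `None` in place of NULL.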
Source code in daft/window.py
def rows_between(
    self,
    start: int | _PyWindowBoundary,
    end: int | _PyWindowBoundary,
    min_periods: int = 1,
) -> Window:
    """Restricts each window to a row-based frame between start and end boundaries.

    This defines a sliding window based on row offsets relative to the current row.

    Args:
        start: Boundary definitions for the start of the window. Can be:

            *   ``Window.unbounded_preceding``: Include all rows before the current row.
            *   ``Window.current_row``: Start at the current row.
            *   Integer value (e.g., ``-3``): A negative integer indicates the number of rows *preceding* the current row.
            *   Integer value (e.g., ``1``): A positive integer indicates the number of rows *following* the current row.
        end: Boundary definitions for the end of the window. Can be:

            *   ``Window.unbounded_following``: Include all rows after the current row.
            *   ``Window.current_row``: End at the current row.
            *   Integer value (e.g., ``1``): A positive integer indicates the number of rows *following* the current row.
            *   Integer value (e.g., ``-1``): A negative integer indicates the number of rows *preceding* the current row.
        min_periods: Minimum number of rows required in the window frame to compute a result (default = 1).
            If fewer rows exist in the frame, the function returns NULL.

    Returns:
        Window: A window specification with the given row-based frame bounds.

    Examples:
        >>> from daft import Window
        >>> # Frame includes the current row and the 2 preceding rows
        >>> window_spec = Window().partition_by("cat").order_by("val").rows_between(-2, Window.current_row)
        >>> # Frame includes all rows from the beginning of the partition up to the current row
        >>> cum_window = (
        ...     Window()
        ...     .partition_by("cat")
        ...     .order_by("val")
        ...     .rows_between(Window.unbounded_preceding, Window.current_row)
        ... )
        >>> # Frame includes the preceding row, current row, and following row
        >>> sliding_window = Window().partition_by("cat").order_by("val").rows_between(-1, 1)
        >>> # Frame includes the current row and the 3 following rows, requiring at least 2 rows
        >>> lookahead_window = (
        ...     Window().partition_by("cat").order_by("val").rows_between(Window.current_row, 3, min_periods=2)
        ... )
    """
    if isinstance(start, int):
        start = _PyWindowBoundary.offset(start)
    if isinstance(end, int):
        end = _PyWindowBoundary.offset(end)

    frame = _WindowFrame(
        start=start,
        end=end,
    )

    new_window = Window()
    new_window._spec = self._spec.with_frame(frame).with_min_periods(min_periods)
    return new_window
Applying Window Functions
over
Apply the expression as a window function.
See Also
daft.functions.over
Source code in daft/expressions/expressions.py
def over(self, window: Window) -> Expression:
    """Apply the expression as a window function.

    Tip: See Also
        [`daft.functions.over`](https://docs.daft.ai/en/stable/api/functions/over/)
    """
    from daft.functions import over

    return over(self, window)
Aggregate Functions
Standard aggregate functions (e.g., sum, mean, count, min, max) can be used as window functions by applying them with .over. They work with all valid window specifications (partition by only, partition by + order by, partition by + order by + frame). Refer to the Expressions API for a full list of aggregate functions.
Note
When using aggregate functions with both partition by and order by, the default window frame includes all rows from the start of the partition up to the current row, which is equivalent to rows between unbounded preceding and current row.
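The practical effect of this default is that adding an ordering turns a partition-wide total into a running total. A minimal pure-Python sketch for one partition, with made-up values and without using Daft:

```python
from itertools import accumulate

# Hypothetical values of a single partition, already sorted by the order_by column.
ordered_values = [10, 20, 5]

# Partition by only: every row sees the whole partition.
whole_partition_sum = [sum(ordered_values)] * len(ordered_values)

# Partition by + order by: the default frame runs from the start of the
# partition up to the current row, so the sum accumulates row by row.
running_sum = list(accumulate(ordered_values))
```

Here `whole_partition_sum` is `[35, 35, 35]` and `running_sum` is `[10, 30, 35]`; the two agree only on the last row of the partition.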
Ranking Functions
These functions compute ranks within a window partition. They require an order_by clause without a rows_between or range_between clause in the window specification.
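A rank is determined by a row's position in the ordered partition, which is why an ordering is needed and a frame is not. This page does not list the individual ranking functions, so the sketch below assumes the conventional SQL-style definitions of row number, rank, and dense rank; the helper `ranking` is hypothetical, and the function names Daft actually exposes should be checked in its functions reference.

```python
def ranking(sorted_values):
    """Compute row number, rank, and dense rank for an already-ordered partition."""
    row_number, rank, dense_rank = [], [], []
    for i, value in enumerate(sorted_values):
        row_number.append(i + 1)  # always unique and consecutive
        if i > 0 and value == sorted_values[i - 1]:
            # Ties share the rank of the previous row.
            rank.append(rank[-1])
            dense_rank.append(dense_rank[-1])
        else:
            rank.append(i + 1)  # leaves gaps after ties
            dense_rank.append(dense_rank[-1] + 1 if dense_rank else 1)  # no gaps
    return row_number, rank, dense_rank


# Hypothetical scores in one partition, ordered descending.
row_number, rank, dense_rank = ranking([90, 90, 80, 70])
```

With the tie on 90, the sketch gives row numbers `[1, 2, 3, 4]`, ranks `[1, 1, 3, 4]`, and dense ranks `[1, 1, 2, 3]`.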
Lead/Lag Functions
These functions access data from preceding or succeeding rows within a window partition. They require an order_by clause without a rows_between or range_between clause in the window specification.
lag
lag(offset: int = 1, default: Any | None = None) -> Expression
Get the value from a previous row within a window partition.
See Also
daft.functions.lag
Source code in daft/expressions/expressions.py
def lag(self, offset: int = 1, default: Any | None = None) -> Expression:
    """Get the value from a previous row within a window partition.

    Tip: See Also
        [`daft.functions.lag`](https://docs.daft.ai/en/stable/api/functions/lag/)
    """
    from daft.functions import lag

    return lag(self, offset=offset, default=default)
lead
lead(offset: int = 1, default: Any | None = None) -> Expression
Get the value from a future row within a window partition.
See Also
daft.functions.lead
Source code in daft/expressions/expressions.py
def lead(self, offset: int = 1, default: Any | None = None) -> Expression:
    """Get the value from a future row within a window partition.

    Tip: See Also
        [`daft.functions.lead`](https://docs.daft.ai/en/stable/api/functions/lead/)
    """
    from daft.functions import lead

    return lead(self, offset=offset, default=default)
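The roles of `offset` and `default` in lag and lead can be illustrated with a pure-Python model of one ordered partition. This is a sketch of the semantics only, not Daft's implementation: the helpers `shift`, `lag_values`, and `lead_values` are hypothetical, and `None` stands in for NULL.

```python
def shift(values, offset, default=None):
    """Positive offset looks back `offset` rows; negative offset looks ahead."""
    n = len(values)
    # Rows whose source position falls outside the partition get the default.
    return [values[i - offset] if 0 <= i - offset < n else default for i in range(n)]


def lag_values(values, offset=1, default=None):
    """Value from `offset` rows before each row."""
    return shift(values, offset, default)


def lead_values(values, offset=1, default=None):
    """Value from `offset` rows after each row."""
    return shift(values, -offset, default)


values = [10, 20, 30]  # hypothetical partition, already in order_by order

previous = lag_values(values)                # [None, 10, 20]
following = lead_values(values, default=0)   # [20, 30, 0]
two_back = lag_values(values, offset=2)      # [None, None, 10]
```

Rows at the edge of the partition have no neighbour at the requested offset, which is where `default` applies.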