Stateless UDFs with @daft.func
When Daft's built-in functions aren't sufficient for your needs, the @daft.func and @daft.cls decorators let you run your own Python code over each row of data. Simply decorate a Python function or class, and it becomes usable in Daft DataFrame operations.
Quick Example
Function Variants
Daft supports multiple function variants to optimize for different use cases:
- Row-wise (default): Regular Python functions process one row at a time
- Async row-wise: Async Python functions process rows concurrently
- Generator: Generator functions produce multiple output rows per input row
- Batch (@daft.func.batch): Process entire batches of data with daft.Series for high performance
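For @daft.func, Daft detects the variant from how the function is defined: a plain def is row-wise, an async def is async row-wise, and a function that uses yield is a generator. Batch functions are not detected automatically; you must use the @daft.func.batch decorator.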
Row-wise Functions
Row-wise functions are the default variant. They process one row at a time and return one value per row.
Type Inference
Daft automatically infers the return type from your function's type hint:
If you need to override the inferred type, use the return_dtype parameter:
Mixing Expressions and Literals
You can mix DataFrame expressions with literal values:
Keyword Arguments
Functions with default arguments work as expected: callers can omit those parameters or override them by keyword, as in plain Python:
Eager Evaluation
When called without any expressions, functions execute immediately:
Async Row-wise Functions
Decorate async functions to enable concurrent execution across rows:
Use max_concurrency to limit the number of concurrent coroutines, for example to rate-limit API calls:
Generator Functions
Generator functions use yield to produce multiple output rows per input row. Other columns in the DataFrame are automatically broadcast to match the number of generated values. You may only use one generator function per DataFrame operation.
Notice how the id column values are repeated to match the number of generated values.
Type Hints for Generators
Use Iterator[T] or Generator[T, None, None] type hints to indicate the yielded type:
Alternatively, specify the return type explicitly:
Handling Empty Generators
If a generator yields no values for a particular input row, a null value is inserted:
Batch UDFs with @daft.func.batch
For performance-critical operations, batch UDFs process entire batches of data at once using daft.Series objects instead of individual values. This allows you to leverage optimized libraries like PyArrow or NumPy for efficient vectorized operations.
When to Use Batch UDFs
Use batch UDFs when:
- Performance is critical: Vectorized operations may be significantly faster than row-wise processing
- Working with optimized libraries: You want to use PyArrow compute functions, NumPy operations, or other libraries that support vectorized operations on batch data
- Running batch inference: You are running a model that supports batch inference
Basic Batch UDF
Batch UDFs receive daft.Series objects as arguments and return a daft.Series, list, numpy.ndarray, or pyarrow.Array:
Mixing Series and Scalar Arguments
Batch UDFs can accept both Series and scalar arguments. Scalar arguments are passed through without modification:
Eager Evaluation
Like regular functions, batch UDFs execute immediately when called with scalars:
Resources, Concurrency, and Error Handling
@daft.func and @daft.func.batch accept a common set of keyword arguments for concurrency control, scheduling, and error handling:
cpus and gpus
cpus and gpus are used for concurrency control and scheduling — not placement. Daft uses them to decide how many invocations of your function can run in parallel on a given machine.
If you annotate a function with cpus=2 and it runs on a machine with 4 CPUs, Daft runs at most 2 invocations in parallel on that machine. gpus works the same way: @daft.func(gpus=1) on a machine with 2 GPUs means at most 2 concurrent invocations.
Both accept fractional values (e.g. cpus=0.5, gpus=0.5). gpus values above 1.0 must be integers.
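The scheduling rule above is plain division. The helper below is hypothetical, not part of Daft's API, and only restates the worked examples from this section as code:

```python
import math


def max_parallel_invocations(available: float, per_invocation: float) -> int:
    """Hypothetical helper (not a Daft API): upper bound on how many invocations of a
    UDF annotated with `per_invocation` CPUs or GPUs can run at once on a machine
    with `available` CPUs or GPUs."""
    if per_invocation <= 0:
        raise ValueError("per_invocation must be positive")
    return math.floor(available / per_invocation)


cpu_bound = max_parallel_invocations(4, 2)     # cpus=2 on a 4-CPU machine
gpu_bound = max_parallel_invocations(2, 1)     # gpus=1 on a 2-GPU machine
fractional = max_parallel_invocations(4, 0.5)  # cpus=0.5 on a 4-CPU machine
```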
max_concurrency
max_concurrency controls the maximum number of concurrent invocations of an async @daft.func or @daft.func.batch. It does not apply to synchronous (non-async) UDFs; setting it on a sync function raises an error.
use_process
use_process=True runs the function in a subprocess instead of in a thread of the main process. Use this when your function is not thread-safe or holds the GIL heavily. The default (None) lets Daft decide at runtime based on observed performance.
max_retries and on_error
Control what happens when a function invocation raises an exception.
max_retries=N retries a failing invocation up to N times with exponential backoff: the first delay is 100 ms, each subsequent delay doubles, delays are capped at 60 s, and ±25% jitter is applied. If the raised exception is a daft.ai.utils.RetryAfterError, the retry-after delay it specifies is honored instead of the default backoff.
on_error decides what to do after retries are exhausted:
- "raise" (default): fail the query.
- "log": log the exception and emit None for that invocation.
- "ignore": silently emit None for that invocation.
Advanced Features
Unnesting Struct Returns
When your function returns a struct (dictionary), you can use unnest=True to automatically expand the struct fields into separate columns:
Without unnest=True, you would get a single column containing struct values.
Combining Generators with Unnest
You can combine generator functions with unnest=True to yield multiple structs that get expanded into columns: