User-Defined Functions

User-Defined Functions (UDFs) are a mechanism to run Python code on the data that lives in a DataFrame. A UDF can be used just like Expressions, allowing users to express computation that should be executed by Daft lazily.

To write a UDF, you should use the @udf decorator, which can decorate either a Python function or a Python class, producing a UDF.

Learn more about UDFs in the Daft User Guide.

Creating UDFs

udf

udf(*, return_dtype: DataTypeLike, num_cpus: float | None = None, num_gpus: float | None = None, memory_bytes: int | None = None, ray_options: dict[str, Any] | None = None, batch_size: int | None = None, concurrency: int | None = None, use_process: bool | None = None) -> Callable[[UserDefinedPyFuncLike], UDF]

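(DEPRECATED) The @udf decorator converts a Python function or class into a UDF. It is deprecated as of Daft 0.7.0 and scheduled for removal in 0.8.0; use @daft.func and @daft.cls instead.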

UDFs allow users to run arbitrary Python code on the outputs of Expressions.

Parameters:

  - return_dtype (DataType, required): Returned type of the UDF.
  - num_cpus (float | None, default None): Number of CPUs to allocate to each running instance of your UDF. This is used purely for placement (e.g. if your machine has 8 CPUs and you specify num_cpus=4, then Daft can run at most 2 instances of your UDF at a time). The default None indicates that Daft is free to allocate as many instances of the UDF as it wants to.
  - num_gpus (float | None, default None): Number of GPUs to allocate to each running instance of your UDF. This is used for placement and also for assigning the appropriate GPU to each UDF using CUDA_VISIBLE_DEVICES.
  - memory_bytes (int | None, default None): Amount of memory, in bytes, to allocate to each running instance of your UDF. If your UDF is experiencing out-of-memory errors, this parameter hints to Daft that each UDF requires a certain amount of heap memory for execution.
  - ray_options (dict[str, Any] | None, default None): Extra Ray options, e.g. {"label_selector": {...}}. See https://docs.ray.io/en/latest/ray-core/api/doc/ray.actor.ActorClass.options.html#ray.actor.ActorClass.options for details.
  - batch_size (int | None, default None): Splits the input into batches of at most this size. Results from the batches are concatenated.
  - concurrency (int | None, default None): Spin up N persistent replicas of the UDF to process all partitions. Defaults to None, which spins up one UDF per partition. This is especially useful for expensive initializations that need to be amortized across partitions, such as loading model weights for batch inference.
  - use_process (bool | None, default None): Run the UDF in a separate process. This is useful for UDFs that run a lot of Python-only code, since it avoids GIL overhead. It is not necessary for UDFs that run C-extension code, like NumPy or PyTorch. Defaults to None, in which case Daft chooses automatically based on runtime performance. Note: users should generally never set this flag manually.
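
To make the batch_size semantics concrete, here is a minimal pure-Python sketch of "split the input into batches of at most N items, run the function on each batch, concatenate the results". It only models the behaviour described for batch_size and is not Daft's implementation; run_in_batches and add_ten are hypothetical names introduced for illustration.

```python
from typing import Callable, List, Optional, Sequence


def run_in_batches(
    fn: Callable[[Sequence[int]], List[int]],
    values: Sequence[int],
    batch_size: Optional[int] = None,
) -> List[int]:
    """Apply fn to slices of at most batch_size items and concatenate the outputs."""
    if batch_size is None:
        # No batching requested: hand the whole input to fn in one call.
        return list(fn(values))
    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")
    out: List[int] = []
    for start in range(0, len(values), batch_size):
        out.extend(fn(values[start : start + batch_size]))
    return out


calls = []  # records the size of every batch that fn received


def add_ten(batch):
    calls.append(len(batch))
    return [v + 10 for v in batch]


print(run_in_batches(add_ten, [1, 2, 3, 4, 5], batch_size=2))  # [11, 12, 13, 14, 15]
print(calls)  # [2, 2, 1]
```

The last batch may be smaller than batch_size, which is why the parameter is described as an upper bound rather than an exact size.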

Returns:

  - Callable[[UserDefinedPyFuncLike], UDF]: A UDF decorator that converts a user-provided Python function or class into a UDF that can be called on Expressions.

Note

In most cases, UDFs will be slower than a native kernel/expression because of the required Rust and Python overheads. If your computation can be expressed using Daft expressions, you should do so instead of writing a UDF. If your UDF expresses a common use-case that isn't already covered by Daft, you should file a ticket or contribute this functionality back to Daft as a kernel!

Examples:

In the example below, we create a UDF that:

  1. Receives data under the argument name x
  2. Iterates over the x Daft Series
  3. Adds a Python constant value c to every element in x
  4. Returns a new list of Python values which will be coerced to the specified return type: return_dtype=DataType.int64().
  5. We can call our UDF on a dataframe using any of the dataframe projection operations (df.with_column(), df.select(), etc.)
>>> import daft
>>> @daft.udf(return_dtype=daft.DataType.int64())
... def add_constant(x: daft.Series, c=10):
...     return [v + c for v in x]
>>>
>>> df = daft.from_pydict({"x": [1, 2, 3]})
>>> df = df.with_column("new_x", add_constant(df["x"], c=20))
>>> df.show()
╭───────┬───────╮
│ x     ┆ new_x │
│ ---   ┆ ---   │
│ Int64 ┆ Int64 │
╞═══════╪═══════╡
│ 1     ┆ 21    │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 2     ┆ 22    │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 3     ┆ 23    │
╰───────┴───────╯
(Showing first 3 of 3 rows)

Resource Requests:

You can also hint Daft about the resources that your UDF will require to run. For example, the following UDF requires 2 CPUs to run. On a machine/cluster with 8 CPUs, Daft will be able to run up to 4 instances of this UDF at once!

>>> import daft
>>> @daft.udf(return_dtype=daft.DataType.int64(), num_cpus=2)
... def udf_needs_2_cpus(x: daft.Series):
...     return x
>>>
>>> df = daft.from_pydict({"x": [1, 2, 3]})
>>> df = df.with_column("new_x", udf_needs_2_cpus(df["x"]))
>>> df.show()
╭───────┬───────╮
│ x     ┆ new_x │
│ ---   ┆ ---   │
│ Int64 ┆ Int64 │
╞═══════╪═══════╡
│ 1     ┆ 1     │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 2     ┆ 2     │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 3     ┆ 3     │
╰───────┴───────╯
(Showing first 3 of 3 rows)
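
The placement arithmetic behind num_cpus can be sketched in a few lines. This is a simplified model under the assumption that CPUs are the only constraint; real placement also depends on GPUs, memory, and the runner in use. max_concurrent_instances is a hypothetical helper, not a Daft function.

```python
import math
from typing import Optional


def max_concurrent_instances(total_cpus: float, num_cpus: Optional[float]) -> Optional[int]:
    """Upper bound on UDF instances that fit at once; None means no CPU-imposed bound."""
    if num_cpus is None:
        # No request was made, so CPUs do not limit how many instances are placed.
        return None
    if num_cpus <= 0:
        raise ValueError("num_cpus must be positive")
    return math.floor(total_cpus / num_cpus)


print(max_concurrent_instances(8, 2))     # 4
print(max_concurrent_instances(8, 4))     # 2
print(max_concurrent_instances(8, None))  # None
```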

Your UDF's resources can also be overridden before you call it like so:

>>> import daft
>>> @daft.udf(return_dtype=daft.DataType.int64(), num_cpus=4)
... def udf_needs_4_cpus(x: daft.Series):
...     return x
>>>
>>> # Override the num_cpus to 2 instead
>>> udf_needs_2_cpus = udf_needs_4_cpus.override_options(num_cpus=2)
>>>
>>> df = daft.from_pydict({"x": [1, 2, 3]})
>>> df = df.with_column("new_x", udf_needs_2_cpus(df["x"]))
>>> df.show()
╭───────┬───────╮
│ x     ┆ new_x │
│ ---   ┆ ---   │
│ Int64 ┆ Int64 │
╞═══════╪═══════╡
│ 1     ┆ 1     │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 2     ┆ 2     │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 3     ┆ 3     │
╰───────┴───────╯
(Showing first 3 of 3 rows)

Concurrency:

With the concurrency parameter, you can tell Daft how many instances of your UDF you want to run at the same time. If concurrency is set with a class UDF, only that many instances of the class will be run at a time, and each instance will be reused for different batches.

This is especially useful if your UDF has a costly initialization step, for example, if you are loading an ML model into memory.

>>> import daft
>>> @daft.udf(
...     return_dtype=daft.DataType.string(),
...     concurrency=4,  # only create 4 instances of this UDF
... )
... class MLModelUDF:
...     def __init__(self):
...         self.model = some_slow_initialization_step()
...
...     def __call__(self, data):
...         return self.model(data.to_pylist())
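
Why a fixed number of reused instances helps can be seen in a small stand-alone model. The sketch below is pure Python and runs sequentially, so it only illustrates how the cost of __init__ is amortized, not actual parallel execution; SlowModel and run_with_concurrency are hypothetical names, and the round-robin assignment is an arbitrary choice for the sketch.

```python
import itertools


class SlowModel:
    """Stand-in for a class UDF whose __init__ is expensive."""

    init_count = 0  # how many times __init__ has run across all instances

    def __init__(self):
        SlowModel.init_count += 1

    def __call__(self, batch):
        return [f"pred({v})" for v in batch]


def run_with_concurrency(cls, batches, concurrency):
    """Create `concurrency` instances once, then reuse them for every batch."""
    instances = [cls() for _ in range(concurrency)]
    results = []
    # Hand batches to the instances in turn; each instance serves several batches.
    for instance, batch in zip(itertools.cycle(instances), batches):
        results.extend(instance(batch))
    return results


batches = [[1, 2], [3, 4], [5, 6], [7, 8], [9, 10], [11, 12]]
out = run_with_concurrency(SlowModel, batches, concurrency=2)
print(len(out))              # 12
print(SlowModel.init_count)  # 2
```

Six batches were processed but the expensive constructor ran only twice, which is the effect the concurrency parameter is meant to provide for model loading.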
Source code in daft/udf/legacy.py
def udf(
    *,
    return_dtype: DataTypeLike,
    num_cpus: float | None = None,
    num_gpus: float | None = None,
    memory_bytes: int | None = None,
    ray_options: dict[str, Any] | None = None,
    batch_size: int | None = None,
    concurrency: int | None = None,
    use_process: bool | None = None,
) -> Callable[[UserDefinedPyFuncLike], UDF]:
    """(DEPRECATED) `@udf` Decorator to convert a Python function/class into a `UDF`.

    UDFs allow users to run arbitrary Python code on the outputs of Expressions.

    Args:
        return_dtype (DataType): Returned type of the UDF
        num_cpus: Number of CPUs to allocate each running instance of your UDF. Note that this is purely used for placement (e.g. if your
            machine has 8 CPUs and you specify num_cpus=4, then Daft can run at most 2 instances of your UDF at a time). The default `None`
            indicates that Daft is free to allocate as many instances of the UDF as it wants to.
        num_gpus: Number of GPUs to allocate each running instance of your UDF. This is used for placement and also for allocating
            the appropriate GPU to each UDF using `CUDA_VISIBLE_DEVICES`.
        memory_bytes: Amount of memory to allocate each running instance of your UDF in bytes. If your UDF is experiencing out-of-memory errors,
            this parameter can help hint Daft that each UDF requires a certain amount of heap memory for execution.
        ray_options: Extra Ray options, e.g. {"label_selector": {...}}. see more https://docs.ray.io/en/latest/ray-core/api/doc/ray.actor.ActorClass.options.html#ray.actor.ActorClass.options
        batch_size: Enables batching of the input into batches of at most this size. Results between batches are concatenated.
        concurrency: Spin up `N` number of persistent replicas of the UDF to process all partitions. Defaults to `None` which will spin up one
            UDF per partition. This is especially useful for expensive initializations that need to be amortized across partitions such as
            loading model weights for model batch inference.
        use_process: Run the UDF on a separate process.
            This is useful for UDFs that run a lot of Python-only code, since it avoids GIL overhead.
            This is not necessary for UDFs that run C-extension code, like NumPy or PyTorch.
            Defaults to `None` where Daft will automatically choose based on runtime performance.
            Note: Users should generally never set this flag manually.

    Returns:
        Callable[[UserDefinedPyFuncLike], UDF]: UDF decorator - converts a user-provided Python function as a UDF that can be called on Expressions

    Note:
        In most cases, UDFs will be slower than a native kernel/expression because of the required Rust and Python overheads. If
        your computation can be expressed using Daft expressions, you should do so instead of writing a UDF. If your UDF expresses a
        common use-case that isn't already covered by Daft, you should file a ticket or contribute this functionality back to Daft
        as a kernel!

    Examples:
        In the example below, we create a UDF that:

        1. Receives data under the argument name ``x``
        2. Iterates over the ``x`` Daft Series
        3. Adds a Python constant value ``c`` to every element in ``x``
        4. Returns a new list of Python values which will be coerced to the specified return type: ``return_dtype=DataType.int64()``.
        5. We can call our UDF on a dataframe using any of the dataframe projection operations ([df.with_column()](https://docs.daft.ai/en/latest/api/dataframe/#daft.DataFrame.with_column),
        [df.select()](https://docs.daft.ai/en/latest/api/dataframe/#daft.DataFrame.select), etc.)

        >>> import daft
        >>> @daft.udf(return_dtype=daft.DataType.int64())
        ... def add_constant(x: daft.Series, c=10):
        ...     return [v + c for v in x]
        >>>
        >>> df = daft.from_pydict({"x": [1, 2, 3]})
        >>> df = df.with_column("new_x", add_constant(df["x"], c=20))
        >>> df.show()
        ╭───────┬───────╮
        │ x     ┆ new_x │
        │ ---   ┆ ---   │
        │ Int64 ┆ Int64 │
        ╞═══════╪═══════╡
        │ 1     ┆ 21    │
        ├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
        │ 2     ┆ 22    │
        ├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
        │ 3     ┆ 23    │
        ╰───────┴───────╯
        <BLANKLINE>
        (Showing first 3 of 3 rows)

        **Resource Requests:**

        You can also hint Daft about the resources that your UDF will require to run. For example, the following UDF requires 2 CPUs to run. On a
        machine/cluster with 8 CPUs, Daft will be able to run up to 4 instances of this UDF at once!

        >>> import daft
        >>> @daft.udf(return_dtype=daft.DataType.int64(), num_cpus=2)
        ... def udf_needs_2_cpus(x: daft.Series):
        ...     return x
        >>>
        >>> df = daft.from_pydict({"x": [1, 2, 3]})
        >>> df = df.with_column("new_x", udf_needs_2_cpus(df["x"]))
        >>> df.show()
        ╭───────┬───────╮
        │ x     ┆ new_x │
        │ ---   ┆ ---   │
        │ Int64 ┆ Int64 │
        ╞═══════╪═══════╡
        │ 1     ┆ 1     │
        ├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
        │ 2     ┆ 2     │
        ├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
        │ 3     ┆ 3     │
        ╰───────┴───────╯
        <BLANKLINE>
        (Showing first 3 of 3 rows)

        Your UDF's resources can also be overridden before you call it like so:

        >>> import daft
        >>> @daft.udf(return_dtype=daft.DataType.int64(), num_cpus=4)
        ... def udf_needs_4_cpus(x: daft.Series):
        ...     return x
        >>>
        >>> # Override the num_cpus to 2 instead
        >>> udf_needs_2_cpus = udf_needs_4_cpus.override_options(num_cpus=2)
        >>>
        >>> df = daft.from_pydict({"x": [1, 2, 3]})
        >>> df = df.with_column("new_x", udf_needs_2_cpus(df["x"]))
        >>> df.show()
        ╭───────┬───────╮
        │ x     ┆ new_x │
        │ ---   ┆ ---   │
        │ Int64 ┆ Int64 │
        ╞═══════╪═══════╡
        │ 1     ┆ 1     │
        ├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
        │ 2     ┆ 2     │
        ├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
        │ 3     ┆ 3     │
        ╰───────┴───────╯
        <BLANKLINE>
        (Showing first 3 of 3 rows)

        **Concurrency:**

        With the `concurrency` parameter, you can tell Daft how many instances of your UDF you want to run at the same time.
        If `concurrency` is set with a class UDF, only that many instances of the class will be run at a time, and each instance will be reused for different batches.

        This is especially useful if your UDF has a costly initialization step, for example, if you are loading an ML model into memory.

        >>> import daft
        >>> @daft.udf(
        ...     return_dtype=daft.DataType.string(),
        ...     concurrency=4,  # only create 4 instances of this UDF
        ... )
        ... class MLModelUDF:
        ...     def __init__(self):
        ...         self.model = some_slow_initialization_step()
        ...
        ...     def __call__(self, data):
        ...         return self.model(data.to_pylist())

    """
    warnings.warn(
        "The `@daft.udf` decorator is deprecated since Daft version >= 0.7.0 and will be removed in >= 0.8.0. Please use `@daft.func` and `@daft.cls` instead.\nSee the migration guide for more details: https://docs.daft.ai/en/stable/custom-code/migration/",
        category=DeprecationWarning,
        stacklevel=2,
    )

    inferred_return_dtype = DataType._infer(return_dtype)

    def _udf(f: UserDefinedPyFuncLike) -> UDF:
        # Grab a name for the UDF. It **should** be unique.
        module_name = getattr(f, "__module__", "")
        qual_name = getattr(f, "__qualname__")

        if module_name:
            name = f"{module_name}.{qual_name}"
        else:
            name = qual_name

        resource_request = (
            None
            if num_cpus is None and num_gpus is None and memory_bytes is None
            else ResourceRequest(
                num_cpus=num_cpus,
                num_gpus=num_gpus,
                memory_bytes=memory_bytes,
            )
        )
        udf = UDF(
            inner=f,
            name=name,
            return_dtype=inferred_return_dtype,
            resource_request=resource_request,
            batch_size=batch_size,
            concurrency=concurrency,
            use_process=use_process,
            ray_options=ray_options,
        )

        daft.attach_function(udf)
        return udf

    return _udf
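
The listing above follows the common keyword-only decorator-factory pattern: the outer call captures the options, and the returned inner function wraps the user's callable and derives a qualified name for it. A stripped-down sketch of that pattern follows; ToyUDF and toy_udf are hypothetical stand-ins and carry none of Daft's real behaviour.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass(frozen=True)
class ToyUDF:
    inner: Callable[..., Any]
    name: str
    num_cpus: Optional[float] = None


def toy_udf(*, num_cpus: Optional[float] = None) -> Callable[[Callable[..., Any]], ToyUDF]:
    """Keyword-only decorator factory: returns the decorator that wraps f."""

    def _wrap(f: Callable[..., Any]) -> ToyUDF:
        # Prefer "module.qualname" so that names are unlikely to collide.
        module_name = getattr(f, "__module__", "")
        qual_name = f.__qualname__
        name = f"{module_name}.{qual_name}" if module_name else qual_name
        return ToyUDF(inner=f, name=name, num_cpus=num_cpus)

    return _wrap


@toy_udf(num_cpus=2)
def add_one(values):
    return [v + 1 for v in values]


print(add_one.name.split(".")[-1])  # add_one
print(add_one.inner([1, 2, 3]))     # [2, 3, 4]
print(add_one.num_cpus)             # 2
```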

Using UDFs

UDF

UDF(inner: UserDefinedPyFuncLike, name: str, return_dtype: DataType, init_args: InitArgsType = None, concurrency: int | None = None, resource_request: ResourceRequest | None = None, batch_size: int | None = None, use_process: bool | None = None, ray_options: dict[str, Any] | None = None)

The type of object produced by applying the @daft.udf decorator to a Python function or class.

Calling a UDF object produces a daft.Expression that can be used in DataFrame operations.

Examples:

>>> import daft
>>> @daft.udf(return_dtype=daft.DataType.float64())
... def multiply_and_add(x: daft.Series, y: float, z: float):
...     return x.to_arrow().to_numpy() * y + z
>>>
>>> df = daft.from_pydict({"x": [1, 2, 3]})
>>> df = df.with_column("result", multiply_and_add(df["x"], 2.0, z=1.5))
>>> df.show()
╭───────┬─────────╮
│ x     ┆ result  │
│ ---   ┆ ---     │
│ Int64 ┆ Float64 │
╞═══════╪═════════╡
│ 1     ┆ 3.5     │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
│ 2     ┆ 5.5     │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
│ 3     ┆ 7.5     │
╰───────┴─────────╯
(Showing first 3 of 3 rows)

Methods:

  - __call__
  - override_options: Replace the resource requests for running each instance of your UDF.
  - run_on_process: Override whether this UDF should run on a separate process or not.
  - with_concurrency: Override the concurrency of this UDF, which tells Daft how many instances of your UDF to run concurrently.
  - with_init_args: Replace initialization arguments for a class UDF when calling __init__ at runtime on each instance of the UDF.

Attributes:

Name Type Description
batch_size int | None
concurrency int | None
init_args InitArgsType
inner UserDefinedPyFuncLike
name str
ray_options dict[str, Any] | None
resource_request ResourceRequest | None
return_dtype DataType
use_process bool | None

batch_size

batch_size: int | None = None

concurrency

concurrency: int | None = None

init_args

init_args: InitArgsType = None

inner

inner: UserDefinedPyFuncLike

name

name: str

ray_options

ray_options: dict[str, Any] | None = None

resource_request

resource_request: ResourceRequest | None = None

return_dtype

return_dtype: DataType

use_process

use_process: bool | None = None

__call__

__call__(*args: Any, **kwargs: Any) -> Expression
Source code in daft/udf/legacy.py
def __call__(self, *args: Any, **kwargs: Any) -> Expression:
    self._validate_init_args()

    check_serializable(
        self.inner,
        "`@daft.udf` requires that the UDF is serializable. Please double-check that the function does not use any global variables.\n\nIf it does, please use the legacy `@daft.udf` with a class UDF instead and initialize the global in the `__init__` method.",
    )

    bound_args = self._bind_args(*args, **kwargs)
    expressions = list(bound_args.expressions().values())

    ray_options = self.ray_options.copy() if self.ray_options is not None else {}
    if "num_cpus" in ray_options:
        raise ValueError(
            "Cannot set 'num_cpus' in `ray_options`. Please use the 'num_cpus' argument in @udf instead."
        )
    if "num_gpus" in ray_options:
        raise ValueError(
            "Cannot set 'num_gpus' in `ray_options`. Please use the 'num_gpus' argument in @udf instead."
        )
    if "memory" in ray_options:
        raise ValueError(
            "Cannot set 'memory' in `ray_options`. Please use the 'memory_bytes' argument in @udf instead."
        )

    return Expression.udf(
        name=self.name,
        inner=self.wrapped_inner,
        bound_args=bound_args,
        expressions=expressions,
        return_dtype=self.return_dtype,
        init_args=self.init_args,
        resource_request=self.resource_request,
        batch_size=self.batch_size,
        concurrency=self.concurrency,
        use_process=self.use_process,
        ray_options=ray_options,
    )
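
The ray_options checks in the listing above reject keys that already have a dedicated decorator argument. The same idea can be written as a small table-driven helper. This is a sketch that mirrors those three checks, not the library's code; checked_ray_options and the example label value are hypothetical.

```python
from typing import Any, Dict, Optional

# ray_options keys that must be set through a dedicated argument instead.
RESERVED_KEYS = {"num_cpus": "num_cpus", "num_gpus": "num_gpus", "memory": "memory_bytes"}


def checked_ray_options(ray_options: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    """Return a copy of ray_options, rejecting keys that have a dedicated argument."""
    options = dict(ray_options) if ray_options is not None else {}
    for key, replacement in RESERVED_KEYS.items():
        if key in options:
            raise ValueError(
                f"Cannot set {key!r} in ray_options; use the {replacement!r} argument instead."
            )
    return options


# Placeholder label selector, used only to show a key that passes the check.
print(checked_ray_options({"label_selector": {"zone": "a"}}))
try:
    checked_ray_options({"memory": 1024})
except ValueError as err:
    print(err)
```

Copying the dictionary before validating keeps the caller's object untouched, which matches the `.copy()` call in the listing.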

override_options

override_options(*, num_cpus: float | None = _UnsetMarker, num_gpus: float | None = _UnsetMarker, memory_bytes: int | None = _UnsetMarker, ray_options: dict[str, Any] | None = None, batch_size: int | None = _UnsetMarker) -> UDF

Replace the resource requests for running each instance of your UDF.

Parameters:

  - num_cpus (float | None, default _UnsetMarker): Number of CPUs to allocate to each running instance of your UDF. This is used purely for placement (e.g. if your machine has 8 CPUs and you specify num_cpus=4, then Daft can run at most 2 instances of your UDF at a time).
  - num_gpus (float | None, default _UnsetMarker): Number of GPUs to allocate to each running instance of your UDF. This is used for placement and also for assigning the appropriate GPU to each UDF using CUDA_VISIBLE_DEVICES.
  - memory_bytes (int | None, default _UnsetMarker): Amount of memory, in bytes, to allocate to each running instance of your UDF. If your UDF is experiencing out-of-memory errors, this parameter hints to Daft that each UDF requires a certain amount of heap memory for execution.
  - ray_options (dict[str, Any] | None, default None): Ray options to pass to the UDF. See https://docs.ray.io/en/latest/ray-core/api/doc/ray.actor.ActorClass.options.html#ray.actor.ActorClass.options for details.
  - batch_size (int | None, default _UnsetMarker): Splits the input into batches of at most this size. Results from the batches are concatenated.

Examples:

For instance, if your UDF requires 4 CPUs to run, you can configure it like so:

>>> import daft
>>>
>>> @daft.udf(return_dtype=daft.DataType.string())
... def example_udf(inputs):
...     # You will have access to 4 CPUs here if you configure your UDF correctly!
...     return inputs
>>>
>>> # Parametrize the UDF to run with 4 CPUs
>>> example_udf_4CPU = example_udf.override_options(num_cpus=4)
Source code in daft/udf/legacy.py
def override_options(
    self,
    *,
    num_cpus: float | None = _UnsetMarker,
    num_gpus: float | None = _UnsetMarker,
    memory_bytes: int | None = _UnsetMarker,
    ray_options: dict[str, Any] | None = None,
    batch_size: int | None = _UnsetMarker,
) -> UDF:
    """Replace the resource requests for running each instance of your UDF.

    Args:
        num_cpus: Number of CPUs to allocate each running instance of your UDF. Note that this is purely used for placement (e.g. if your
            machine has 8 CPUs and you specify num_cpus=4, then Daft can run at most 2 instances of your UDF at a time).
        num_gpus: Number of GPUs to allocate each running instance of your UDF. This is used for placement and also for allocating
            the appropriate GPU to each UDF using `CUDA_VISIBLE_DEVICES`.
        memory_bytes: Amount of memory to allocate each running instance of your UDF in bytes. If your UDF is experiencing out-of-memory errors,
            this parameter can help hint Daft that each UDF requires a certain amount of heap memory for execution.
        ray_options: Ray options to pass to the UDF. see more  https://docs.ray.io/en/latest/ray-core/api/doc/ray.actor.ActorClass.options.html#ray.actor.ActorClass.options
        batch_size: Enables batching of the input into batches of at most this size. Results between batches are concatenated.

    Examples:
        For instance, if your UDF requires 4 CPUs to run, you can configure it like so:

        >>> import daft
        >>>
        >>> @daft.udf(return_dtype=daft.DataType.string())
        ... def example_udf(inputs):
        ...     # You will have access to 4 CPUs here if you configure your UDF correctly!
        ...     return inputs
        >>>
        >>> # Parametrize the UDF to run with 4 CPUs
        >>> example_udf_4CPU = example_udf.override_options(num_cpus=4)

    """
    new_resource_request = ResourceRequest() if self.resource_request is None else self.resource_request
    if num_cpus is not _UnsetMarker:
        new_resource_request = new_resource_request.with_num_cpus(num_cpus)
    if num_gpus is not _UnsetMarker:
        new_resource_request = new_resource_request.with_num_gpus(num_gpus)
    if memory_bytes is not _UnsetMarker:
        new_resource_request = new_resource_request.with_memory_bytes(memory_bytes)

    new_ray_options = ray_options if ray_options is not None else self.ray_options
    new_batch_size = self.batch_size if batch_size is _UnsetMarker else batch_size

    return dataclasses.replace(
        self, resource_request=new_resource_request, batch_size=new_batch_size, ray_options=new_ray_options
    )
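
The _UnsetMarker defaults let the method tell "argument not supplied" apart from an explicit None, which a plain None default cannot do. Below is a minimal sketch of that sentinel pattern combined with dataclasses.replace; ToyOptions and _UNSET are hypothetical names, and the class is a stand-in rather than Daft's UDF.

```python
import dataclasses
from typing import Any, Optional

_UNSET: Any = object()  # sentinel meaning "the caller did not pass this argument"


@dataclasses.dataclass(frozen=True)
class ToyOptions:
    num_cpus: Optional[float] = None
    batch_size: Optional[int] = None

    def override(self, *, num_cpus: Any = _UNSET, batch_size: Any = _UNSET) -> "ToyOptions":
        """Return a copy with only the explicitly passed fields changed."""
        changes = {}
        if num_cpus is not _UNSET:
            changes["num_cpus"] = num_cpus
        if batch_size is not _UNSET:
            changes["batch_size"] = batch_size
        return dataclasses.replace(self, **changes)


base = ToyOptions(num_cpus=4, batch_size=128)
print(base.override(num_cpus=2))       # ToyOptions(num_cpus=2, batch_size=128)
print(base.override(batch_size=None))  # ToyOptions(num_cpus=4, batch_size=None)
print(base)                            # ToyOptions(num_cpus=4, batch_size=128)
```

Note that ray_options in the listing above uses None as its default, so passing ray_options=None keeps the existing value instead of clearing it.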

run_on_process

run_on_process(use_process: bool) -> UDF

Override whether this UDF should run on a separate process or not.

Examples:

>>> import daft
>>>
>>> @daft.udf(return_dtype=daft.DataType.string(), num_gpus=1)
... class MyGpuUdf:
...     def __init__(self, text=" world"):
...         self.text = text
...
...     def __call__(self, data):
...         return [x + self.text for x in data]
>>>
>>> # New UDF that will run on a separate process
>>> MyGpuUdf_separate_process = MyGpuUdf.run_on_process(True)
Source code in daft/udf/legacy.py
def run_on_process(self, use_process: bool) -> UDF:
    """Override whether this UDF should run on a separate process or not.

    Examples:
        >>> import daft
        >>>
        >>> @daft.udf(return_dtype=daft.DataType.string(), num_gpus=1)
        ... class MyGpuUdf:
        ...     def __init__(self, text=" world"):
        ...         self.text = text
        ...
        ...     def __call__(self, data):
        ...         return [x + self.text for x in data]
        >>>
        >>> # New UDF that will run on a separate process
        >>> MyGpuUdf_separate_process = MyGpuUdf.run_on_process(True)
    """
    return dataclasses.replace(self, use_process=use_process)

with_concurrency

with_concurrency(concurrency: int) -> UDF

Override the concurrency of this UDF, which tells Daft how many instances of your UDF to run concurrently.

Examples:

>>> import daft
>>>
>>> @daft.udf(return_dtype=daft.DataType.string(), num_gpus=1)
... class MyGpuUdf:
...     def __init__(self, text=" world"):
...         self.text = text
...
...     def __call__(self, data):
...         return [x + self.text for x in data]
>>>
>>> # New UDF that will have 8 concurrent running instances (will require 8 total GPUs)
>>> MyGpuUdf_8_concurrency = MyGpuUdf.with_concurrency(8)
Source code in daft/udf/legacy.py
def with_concurrency(self, concurrency: int) -> UDF:
    """Override the concurrency of this UDF, which tells Daft how many instances of your UDF to run concurrently.

    Examples:
        >>> import daft
        >>>
        >>> @daft.udf(return_dtype=daft.DataType.string(), num_gpus=1)
        ... class MyGpuUdf:
        ...     def __init__(self, text=" world"):
        ...         self.text = text
        ...
        ...     def __call__(self, data):
        ...         return [x + self.text for x in data]
        >>>
        >>> # New UDF that will have 8 concurrent running instances (will require 8 total GPUs)
        >>> MyGpuUdf_8_concurrency = MyGpuUdf.with_concurrency(8)
    """
    return dataclasses.replace(self, concurrency=concurrency)

with_init_args

with_init_args(*args: Any, **kwargs: Any) -> UDF

Replace initialization arguments for a class UDF when calling __init__ at runtime on each instance of the UDF.

Examples:

>>> import daft
>>>
>>> @daft.udf(return_dtype=daft.DataType.string())
... class MyUdfWithInit:
...     def __init__(self, text=" world"):
...         self.text = text
...
...     def __call__(self, data):
...         return [x + self.text for x in data]
>>>
>>> # Create a customized version of MyUdfWithInit by overriding the init args
>>> MyUdfWithInit_CustomInitArgs = MyUdfWithInit.with_init_args(text=" my old friend")
>>>
>>> df = daft.from_pydict({"foo": ["hello", "hello", "hello"]})
>>> df = df.with_column("bar_world", MyUdfWithInit(df["foo"]))
>>> df = df.with_column("bar_custom", MyUdfWithInit_CustomInitArgs(df["foo"]))
>>> df.show()
╭────────┬─────────────┬─────────────────────╮
│ foo    ┆ bar_world   ┆ bar_custom          │
│ ---    ┆ ---         ┆ ---                 │
│ String ┆ String      ┆ String              │
╞════════╪═════════════╪═════════════════════╡
│ hello  ┆ hello world ┆ hello my old friend │
├╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ hello  ┆ hello world ┆ hello my old friend │
├╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ hello  ┆ hello world ┆ hello my old friend │
╰────────┴─────────────┴─────────────────────╯
(Showing first 3 of 3 rows)
Source code in daft/udf/legacy.py
def with_init_args(self, *args: Any, **kwargs: Any) -> UDF:
    """Replace initialization arguments for a class UDF when calling `__init__` at runtime on each instance of the UDF.

    Examples:
        >>> import daft
        >>>
        >>> @daft.udf(return_dtype=daft.DataType.string())
        ... class MyUdfWithInit:
        ...     def __init__(self, text=" world"):
        ...         self.text = text
        ...
        ...     def __call__(self, data):
        ...         return [x + self.text for x in data]
        >>>
        >>> # Create a customized version of MyUdfWithInit by overriding the init args
        >>> MyUdfWithInit_CustomInitArgs = MyUdfWithInit.with_init_args(text=" my old friend")
        >>>
        >>> df = daft.from_pydict({"foo": ["hello", "hello", "hello"]})
        >>> df = df.with_column("bar_world", MyUdfWithInit(df["foo"]))
        >>> df = df.with_column("bar_custom", MyUdfWithInit_CustomInitArgs(df["foo"]))
        >>> df.show()
        ╭────────┬─────────────┬─────────────────────╮
        │ foo    ┆ bar_world   ┆ bar_custom          │
        │ ---    ┆ ---         ┆ ---                 │
        │ String ┆ String      ┆ String              │
        ╞════════╪═════════════╪═════════════════════╡
        │ hello  ┆ hello world ┆ hello my old friend │
        ├╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
        │ hello  ┆ hello world ┆ hello my old friend │
        ├╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
        │ hello  ┆ hello world ┆ hello my old friend │
        ╰────────┴─────────────┴─────────────────────╯
        <BLANKLINE>
        (Showing first 3 of 3 rows)
    """
    if not isinstance(self.inner, type):
        raise ValueError("Function UDFs cannot have init args.")

    init_sig = inspect.signature(self.inner.__init__)  # type: ignore
    init_sig.bind(
        # Placeholder for `self`
        None,
        *args,
        **kwargs,
    )
    return dataclasses.replace(self, init_args=(args, kwargs))
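
The listing above validates init arguments eagerly by binding them against the signature of __init__, so a mismatch is reported when with_init_args is called rather than later at runtime. The technique can be tried in isolation with the standard library; Greeter and check_init_args are hypothetical names used only for this sketch.

```python
import inspect


class Greeter:
    def __init__(self, text=" world"):
        self.text = text


def check_init_args(cls, *args, **kwargs):
    """Raise TypeError now if cls.__init__ could not accept these arguments."""
    if not isinstance(cls, type):
        raise ValueError("Only classes have init args.")
    # None stands in for `self`; bind() checks the call shape without calling __init__.
    inspect.signature(cls.__init__).bind(None, *args, **kwargs)
    return args, kwargs


print(check_init_args(Greeter, text=" my old friend"))  # ((), {'text': ' my old friend'})
try:
    check_init_args(Greeter, greeting="hi")
except TypeError as err:
    print("rejected:", err)
```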

New UDFs

@daft.func and @daft.cls are the new interface for creating user-defined functions in Daft. They provide a streamlined way to turn Python functions into Daft operations that work seamlessly with DataFrame expressions.

Learn more in the User Guide.

func

func = _FuncDecorator()

_FuncDecorator

Methods:

  - __call__: Decorator to convert a Python function into a Daft user-defined function.
  - batch: Decorator to convert a Python function into a Daft user-defined batch function.

__call__

__call__(*, return_dtype: DataTypeLike | None = None, unnest: bool = False, cpus: float | None = None, gpus: float = 0, use_process: bool | None = None, max_concurrency: int | None = None, max_retries: int | None = None, on_error: Literal['raise', 'log', 'ignore'] | None = None, ray_options: dict[str, Any] | None = None) -> Callable[[Callable[P, T]], Func[P, T, None]]
__call__(fn: Callable[P, T], *, return_dtype: DataTypeLike | None = None, unnest: bool = False, gpus: float = 0, use_process: bool | None = None, max_concurrency: int | None = None, max_retries: int | None = None, on_error: Literal['raise', 'log', 'ignore'] | None = None) -> Func[P, T, None]
__call__(fn: Callable[P, T] | None = None, *, return_dtype: DataTypeLike | None = None, unnest: bool = False, cpus: float | None = None, gpus: float = 0, use_process: bool | None = None, max_concurrency: int | None = None, max_retries: int | None = None, on_error: Literal['raise', 'log', 'ignore'] | None = None, ray_options: dict[str, Any] | None = None) -> Callable[[Callable[P, T]], Func[P, T, None]] | Func[P, T, None]

Decorator to convert a Python function into a Daft user-defined function.

Parameters:

Name Type Description Default
return_dtype DataTypeLike | None

The data type that this function should return or yield. If not specified, it is derived from the function's return type hint.

None
unnest bool

Whether to unnest/flatten out return type fields into columns. Return dtype must be DataType.struct(..) when this is set to true.

False
use_process bool | None

Whether to run each instance of the function in a separate process. If unset, Daft will automatically choose based on runtime performance.

None
max_concurrency int | None

The maximum number of concurrent coroutines for async functions. Only valid for async functions; raises an error if used with synchronous functions.

None

Daft function variants:

- Row-wise (1 row in, 1 row out) - the default variant
- Async row-wise (1 row in, 1 row out) - created by decorating a Python async function
- Generator (1 row in, N rows out) - created by decorating a Python generator function
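
The three variants correspond to three kinds of Python function. The sketch below shows how they can be told apart with the standard `inspect` module. This page does not show how Daft itself detects the variant, so `classify_variant` is a hypothetical helper for illustration only.

```python
import asyncio
import inspect
from typing import Callable, Iterator


def classify_variant(fn: Callable) -> str:
    """Name the variant that a plain Python function would correspond to."""
    if inspect.iscoroutinefunction(fn):
        return "async row-wise"
    if inspect.isgeneratorfunction(fn):
        return "generator"
    return "row-wise"


def add(a: int, b: int) -> int:
    return a + b


async def add_later(a: int, b: int) -> int:
    await asyncio.sleep(0)
    return a + b


def repeat(value: str, n: int) -> Iterator[str]:
    for _ in range(n):
        yield value


variants = [classify_variant(f) for f in (add, add_later, repeat)]
```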

Decorated functions accept both their original argument types and Daft Expressions. When any arguments are Expressions, they return a Daft Expression that can be used in DataFrame operations. When called without Expression arguments, they execute immediately and the behavior is the same as if the function was not decorated.
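
The dual behaviour described above (eager for plain values, lazy for Expressions) can be pictured with a small pure-Python sketch. `Col`, `LazyCall`, and `sketch_func` are hypothetical stand-ins; this is not how Daft is implemented, only an illustration of the dispatch rule.

```python
from dataclasses import dataclass
from functools import wraps
from typing import Any, Callable


@dataclass(frozen=True)
class Col:
    """Hypothetical stand-in for a column expression."""

    name: str


@dataclass(frozen=True)
class LazyCall:
    """Hypothetical deferred call; nothing is computed when it is built."""

    fn: Callable[..., Any]
    args: tuple


def sketch_func(fn: Callable[..., Any]) -> Callable[..., Any]:
    """Run fn immediately unless any argument is an expression."""

    @wraps(fn)
    def wrapper(*args: Any) -> Any:
        if any(isinstance(a, (Col, LazyCall)) for a in args):
            return LazyCall(fn, args)  # defer: record the call for later
        return fn(*args)  # plain values: behave like the undecorated function

    return wrapper


@sketch_func
def my_sum(a: int, b: int) -> int:
    return a + b


eager = my_sum(1, 2)        # plain ints, evaluated immediately
lazy = my_sum(Col("x"), 2)  # an expression argument, so evaluation is deferred
```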

Examples:

Basic Example

>>> import daft
>>> @daft.func
... def my_sum(a: int, b: int) -> int:
...     return a + b
>>>
>>> df = daft.from_pydict({"x": [1, 2, 3], "y": [4, 5, 6]})
>>> df.select(my_sum(df["x"], df["y"])).collect()
╭───────╮
│ x     │
│ ---   │
│ Int64 │
╞═══════╡
│ 5     │
├╌╌╌╌╌╌╌┤
│ 7     │
├╌╌╌╌╌╌╌┤
│ 9     │
╰───────╯
(Showing first 3 of 3 rows)

Calling the decorator directly on an existing function

>>> import daft
>>> def tokenize(text: str) -> list[int]:
...     vocab = {char: i for i, char in enumerate(text)}
...     return [vocab[char] for char in text]
>>>
>>> daft_tokenize = daft.func(tokenize)  # creates a new function rather than modifying `tokenize`
>>> df = daft.from_pydict({"text": ["hello", "world", "daft"]})
>>> df.select(daft_tokenize(df["text"])).collect()
╭─────────────────╮
│ text            │
│ ---             │
│ List[Int64]     │
╞═════════════════╡
│ [0, 1, 3, 3, 4] │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ [0, 1, 2, 3, 4] │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ [0, 1, 2, 3]    │
╰─────────────────╯
(Showing first 3 of 3 rows)

Manually specifying the return type

>>> import daft
>>> @daft.func(return_dtype=daft.DataType.int32())
... def my_sum(a: int, b: int):
...     return a + b
>>>
>>> df = daft.from_pydict({"x": [1, 2, 3], "y": [4, 5, 6]})
>>> df.select(my_sum(df["x"], df["y"])).collect()
╭───────╮
│ x     │
│ ---   │
│ Int32 │
╞═══════╡
│ 5     │
├╌╌╌╌╌╌╌┤
│ 7     │
├╌╌╌╌╌╌╌┤
│ 9     │
╰───────╯
(Showing first 3 of 3 rows)

Decorating an async function

>>> import daft
>>> import asyncio
>>> @daft.func
... async def my_sum(a: int, b: int) -> int:
...     await asyncio.sleep(1)
...     return a + b
>>>
>>> df = daft.from_pydict({"x": [1], "y": [2]})
>>> df.select(my_sum(df["x"], df["y"])).collect()
╭───────╮
│ x     │
│ ---   │
│ Int64 │
╞═══════╡
│ 3     │
╰───────╯
(Showing first 1 of 1 rows)
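
For async functions, `max_concurrency` is described above as the maximum number of concurrent coroutines. One common way to enforce such a cap in plain asyncio is a semaphore, sketched below. `run_bounded` is a hypothetical helper, not Daft's scheduler.

```python
import asyncio


async def my_sum(a: int, b: int) -> int:
    await asyncio.sleep(0.01)  # stands in for real I/O
    return a + b


async def run_bounded(fn, rows, max_concurrency: int):
    """Call fn once per row with at most max_concurrency calls in flight."""
    sem = asyncio.Semaphore(max_concurrency)
    in_flight = 0
    peak = 0

    async def one(row):
        nonlocal in_flight, peak
        async with sem:  # waits here while the cap is reached
            in_flight += 1
            peak = max(peak, in_flight)
            try:
                return await fn(*row)
            finally:
                in_flight -= 1

    # gather preserves input order in its result list.
    results = await asyncio.gather(*(one(r) for r in rows))
    return results, peak


rows = [(1, 4), (2, 5), (3, 6), (4, 7)]
results, peak = asyncio.run(run_bounded(my_sum, rows, max_concurrency=2))
```

Here `peak` records the largest number of calls that were running at the same time, which never exceeds the cap.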

Decorating a generator function

>>> import daft
>>> from typing import Iterator
>>> @daft.func
... def my_gen_func(to_repeat: str, n: int) -> Iterator[str]:
...     for _ in range(n):
...         yield to_repeat
>>>
>>> df = daft.from_pydict({"id": [0, 1, 2], "value": ["pip", "install", "daft"], "occurrences": [0, 2, 4]})
>>> df = df.select("id", my_gen_func(df["value"], df["occurrences"]))
>>> df.collect()  # other output columns are repeated to match generator output length
╭───────┬─────────╮
│ id    ┆ value   │
│ ---   ┆ ---     │
│ Int64 ┆ String  │
╞═══════╪═════════╡
│ 0     ┆ None    │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
│ 1     ┆ install │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
│ 1     ┆ install │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
│ 2     ┆ daft    │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
│ 2     ┆ daft    │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
│ 2     ┆ daft    │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
│ 2     ┆ daft    │
╰───────┴─────────╯
(Showing first 7 of 7 rows)
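
The row expansion in the output above can be reproduced in plain Python. The sketch below is an emulation of the displayed result, not Daft's implementation; `explode_rows` is a hypothetical helper.

```python
from typing import Iterator


def my_gen_func(to_repeat: str, n: int) -> Iterator[str]:
    for _ in range(n):
        yield to_repeat


def explode_rows(ids, values, occurrences):
    """Emit one (id, item) row per yielded item, repeating id as needed."""
    out = []
    for id_, value, n in zip(ids, values, occurrences):
        produced = list(my_gen_func(value, n))
        # A generator that yields nothing still contributes one row holding
        # None, matching the first row of the output above.
        for item in produced or [None]:
            out.append((id_, item))
    return out


rows = explode_rows([0, 1, 2], ["pip", "install", "daft"], [0, 2, 4])
```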

Unnesting multiple return fields

>>> import daft
>>> from daft import DataType
>>> @daft.func(
...     return_dtype=DataType.struct({"int": DataType.int64(), "str": DataType.string()}), unnest=True
... )
... def my_multi_return(val: int):
...     return {"int": val * 2, "str": str(val) * 2}
>>> df = daft.from_pydict({"x": [1, 2, 3]})
>>> df.select(my_multi_return(df["x"])).collect()
╭───────┬────────╮
│ int   ┆ str    │
│ ---   ┆ ---    │
│ Int64 ┆ String │
╞═══════╪════════╡
│ 2     ┆ 11     │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ 4     ┆ 22     │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ 6     ┆ 33     │
╰───────┴────────╯
(Showing first 3 of 3 rows)
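
Conceptually, unnesting turns one struct per row into one column per struct field. A plain-Python sketch of that reshaping, using dictionaries in place of Daft structs (`unnest_structs` is a hypothetical helper):

```python
def my_multi_return(val: int) -> dict:
    return {"int": val * 2, "str": str(val) * 2}


def unnest_structs(structs: list) -> dict:
    """Turn a list of same-keyed dicts (one per row) into one list per field."""
    columns: dict = {}
    for struct in structs:
        for field, value in struct.items():
            columns.setdefault(field, []).append(value)
    return columns


columns = unnest_structs([my_multi_return(x) for x in [1, 2, 3]])
```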
Source code in daft/udf/__init__.py
def __call__(
    self,
    fn: Callable[P, T] | None = None,
    *,
    return_dtype: DataTypeLike | None = None,
    unnest: bool = False,
    cpus: float | None = None,
    gpus: float = 0,
    use_process: bool | None = None,
    max_concurrency: int | None = None,
    max_retries: int | None = None,
    on_error: Literal["raise", "log", "ignore"] | None = None,
    ray_options: dict[str, Any] | None = None,
) -> Callable[[Callable[P, T]], Func[P, T, None]] | Func[P, T, None]:
    """Decorator to convert a Python function into a Daft user-defined function.

    Args:
        return_dtype: The data type that this function should return or yield. If not specified, it is derived from the function's return type hint.
        unnest: Whether to unnest/flatten out return type fields into columns. Return dtype must be `DataType.struct(..)` when this is set to true.
        use_process: Whether to run each instance of the function in a separate process. If unset, Daft will automatically choose based on runtime performance.
        max_concurrency: The maximum number of concurrent coroutines for async functions. Only valid for async functions; raises an error if used with synchronous functions.

    Daft function variants:
    - **Row-wise** (1 row in, 1 row out) - the default variant
    - **Async row-wise** (1 row in, 1 row out) - created by decorating a Python async function
    - **Generator** (1 row in, N rows out) - created by decorating a Python generator function

    Decorated functions accept both their original argument types and Daft Expressions.
    When any arguments are Expressions, they return a Daft Expression that can be used in DataFrame operations.
    When called without Expression arguments, they execute immediately and the behavior is the same as if the function was not decorated.

    Examples:
        Basic Example

        >>> import daft
        >>> @daft.func
        ... def my_sum(a: int, b: int) -> int:
        ...     return a + b
        >>>
        >>> df = daft.from_pydict({"x": [1, 2, 3], "y": [4, 5, 6]})
        >>> df.select(my_sum(df["x"], df["y"])).collect()
        ╭───────╮
        │ x     │
        │ ---   │
        │ Int64 │
        ╞═══════╡
        │ 5     │
        ├╌╌╌╌╌╌╌┤
        │ 7     │
        ├╌╌╌╌╌╌╌┤
        │ 9     │
        ╰───────╯
        <BLANKLINE>
        (Showing first 3 of 3 rows)

        Calling the decorator directly on an existing function

        >>> import daft
        >>> def tokenize(text: str) -> list[int]:
        ...     vocab = {char: i for i, char in enumerate(text)}
        ...     return [vocab[char] for char in text]
        >>>
        >>> daft_tokenize = daft.func(tokenize)  # creates a new function rather than modifying `tokenize`
        >>> df = daft.from_pydict({"text": ["hello", "world", "daft"]})
        >>> df.select(daft_tokenize(df["text"])).collect()
        ╭─────────────────╮
        │ text            │
        │ ---             │
        │ List[Int64]     │
        ╞═════════════════╡
        │ [0, 1, 3, 3, 4] │
        ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
        │ [0, 1, 2, 3, 4] │
        ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
        │ [0, 1, 2, 3]    │
        ╰─────────────────╯
        <BLANKLINE>
        (Showing first 3 of 3 rows)

        Manually specifying the return type

        >>> import daft
        >>> @daft.func(return_dtype=daft.DataType.int32())
        ... def my_sum(a: int, b: int):
        ...     return a + b
        >>>
        >>> df = daft.from_pydict({"x": [1, 2, 3], "y": [4, 5, 6]})
        >>> df.select(my_sum(df["x"], df["y"])).collect()
        ╭───────╮
        │ x     │
        │ ---   │
        │ Int32 │
        ╞═══════╡
        │ 5     │
        ├╌╌╌╌╌╌╌┤
        │ 7     │
        ├╌╌╌╌╌╌╌┤
        │ 9     │
        ╰───────╯
        <BLANKLINE>
        (Showing first 3 of 3 rows)

        Decorating an async function

        >>> import daft
        >>> import asyncio
        >>> @daft.func
        ... async def my_sum(a: int, b: int) -> int:
        ...     await asyncio.sleep(1)
        ...     return a + b
        >>>
        >>> df = daft.from_pydict({"x": [1], "y": [2]})
        >>> df.select(my_sum(df["x"], df["y"])).collect()
        ╭───────╮
        │ x     │
        │ ---   │
        │ Int64 │
        ╞═══════╡
        │ 3     │
        ╰───────╯
        <BLANKLINE>
        (Showing first 1 of 1 rows)

        Decorating a generator function

        >>> import daft
        >>> from typing import Iterator
        >>> @daft.func
        ... def my_gen_func(to_repeat: str, n: int) -> Iterator[str]:
        ...     for _ in range(n):
        ...         yield to_repeat
        >>>
        >>> df = daft.from_pydict({"id": [0, 1, 2], "value": ["pip", "install", "daft"], "occurrences": [0, 2, 4]})
        >>> df = df.select("id", my_gen_func(df["value"], df["occurrences"]))
        >>> df.collect()  # other output columns are repeated to match generator output length
        ╭───────┬─────────╮
        │ id    ┆ value   │
        │ ---   ┆ ---     │
        │ Int64 ┆ String  │
        ╞═══════╪═════════╡
        │ 0     ┆ None    │
        ├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
        │ 1     ┆ install │
        ├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
        │ 1     ┆ install │
        ├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
        │ 2     ┆ daft    │
        ├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
        │ 2     ┆ daft    │
        ├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
        │ 2     ┆ daft    │
        ├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
        │ 2     ┆ daft    │
        ╰───────┴─────────╯
        <BLANKLINE>
        (Showing first 7 of 7 rows)

        Unnesting multiple return fields

        >>> import daft
        >>> from daft import DataType
        >>> @daft.func(
        ...     return_dtype=DataType.struct({"int": DataType.int64(), "str": DataType.string()}), unnest=True
        ... )
        ... def my_multi_return(val: int):
        ...     return {"int": val * 2, "str": str(val) * 2}
        >>> df = daft.from_pydict({"x": [1, 2, 3]})
        >>> df.select(my_multi_return(df["x"])).collect()
        ╭───────┬────────╮
        │ int   ┆ str    │
        │ ---   ┆ ---    │
        │ Int64 ┆ String │
        ╞═══════╪════════╡
        │ 2     ┆ 11     │
        ├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
        │ 4     ┆ 22     │
        ├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
        │ 6     ┆ 33     │
        ╰───────┴────────╯
        <BLANKLINE>
        (Showing first 3 of 3 rows)
    """

    def partial_func(fn: Callable[P, T]) -> Func[P, T, None]:
        return Func._from_func(
            fn,
            return_dtype,
            unnest,
            cpus,
            gpus,
            use_process,
            False,
            None,
            max_concurrency=max_concurrency,
            max_retries=max_retries,
            on_error=on_error,
            ray_options=ray_options,
        )

    return partial_func if fn is None else partial_func(fn)
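
The final `return` above is what lets one decorator be used both bare and with keyword arguments. A minimal standalone sketch of the same pattern follows; `sketch_decorator` and its `label` option are hypothetical and exist only to make the effect observable.

```python
from typing import Callable, Optional


def sketch_decorator(fn: Optional[Callable] = None, *, label: str = "default"):
    """Usable as @sketch_decorator or as @sketch_decorator(label=...)."""

    def partial(f: Callable) -> Callable:
        f.label = label  # attach the option so the result can be inspected
        return f

    # Bare use passes the function directly. Parenthesised use passes
    # fn=None and expects a decorator back.
    return partial if fn is None else partial(fn)


@sketch_decorator
def bare(x):
    return x


@sketch_decorator(label="custom")
def configured(x):
    return x


# Calling the decorator directly on an existing function also works.
direct = sketch_decorator(lambda x: x, label="direct")
```

Making every option keyword-only (the `*` in the signature) is what keeps the two call forms unambiguous: the only possible positional argument is the function itself.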

batch #

batch(*, return_dtype: DataTypeLike, unnest: bool = False, cpus: float | None = None, gpus: float = 0, use_process: bool | None = None, max_concurrency: int | None = None, batch_size: int | None = None, max_retries: int | None = None, on_error: Literal['raise', 'log', 'ignore'] | None = None, ray_options: dict[str, Any] | None = None) -> Callable[[Callable[P, T]], Func[P, T, None]]

Decorator to convert a Python function into a Daft user-defined batch function.

Parameters:

Name Type Description Default
return_dtype DataTypeLike

The data type that this function should return.

required
unnest bool

Whether to unnest/flatten out return type fields into columns. Return dtype must be DataType.struct(..) when this is set to true.

False
use_process bool | None

Whether to run each instance of the function in a separate process. If unset, Daft will automatically choose based on runtime performance.

None
max_concurrency int | None

The maximum number of concurrent coroutines for async functions. Only valid for async functions; raises an error if used with synchronous functions.

None
batch_size int | None

The max number of rows in each input batch.

None

Batch functions receive daft.Series arguments, and return a daft.Series, list, numpy.ndarray, or pyarrow.Array. You can also call them with scalar arguments, which will be passed in without modification. When called without Expression arguments, they execute immediately and the behavior is the same as if the function was not decorated.
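
To make the role of `batch_size` concrete, the sketch below splits a column into chunks of at most `batch_size` rows and calls a batch function once per chunk, passing the scalar argument through unchanged. It uses plain lists instead of `daft.Series`. How Daft chooses batches when `batch_size` is None is not stated here, so treating None as a single batch is an assumption made only for illustration. All helper names are hypothetical.

```python
from typing import Callable, Iterator, Optional


def iter_batches(column: list, batch_size: Optional[int]) -> Iterator[list]:
    """Yield consecutive slices holding at most batch_size rows each."""
    if batch_size is None:
        yield column  # assumption for this sketch: no limit means one batch
        return
    for start in range(0, len(column), batch_size):
        yield column[start:start + batch_size]


def add_scalar(a: list, b: int) -> list:
    """Hypothetical batch function: one call handles a whole batch."""
    return [x + b for x in a]


def run_batched(fn: Callable, column: list, scalar: int, batch_size: Optional[int]):
    out, sizes = [], []
    for batch in iter_batches(column, batch_size):
        sizes.append(len(batch))
        out.extend(fn(batch, scalar))  # the scalar is passed as-is to every call
    return out, sizes


result, sizes = run_batched(add_scalar, [1, 2, 3, 4, 5], 4, batch_size=2)
```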

Examples:

Basic Usage

>>> import daft
>>> from daft import DataType, Series
>>>
>>> @daft.func.batch(return_dtype=DataType.int64())
... def my_sum(a: Series, b: Series) -> Series:
...     import pyarrow.compute as pc
...
...     a = a.to_arrow()
...     b = b.to_arrow()
...     result = pc.add(a, b)
...     return result
>>>
>>> df = daft.from_pydict({"x": [1, 2, 3], "y": [4, 5, 6]})
>>> df.select(my_sum(df["x"], df["y"])).collect()
╭───────╮
│ x     │
│ ---   │
│ Int64 │
╞═══════╡
│ 5     │
├╌╌╌╌╌╌╌┤
│ 7     │
├╌╌╌╌╌╌╌┤
│ 9     │
╰───────╯
(Showing first 3 of 3 rows)

Mixing Series and Scalar Arguments

>>> @daft.func.batch(return_dtype=daft.DataType.int64())
... def my_sum_with_scalar(a: daft.Series, b: int) -> daft.Series:
...     import pyarrow.compute as pc
...
...     a = a.to_arrow()
...     result = pc.add(a, b)
...     return result
>>>
>>> df = daft.from_pydict({"x": [1, 2, 3]})
>>> df.select(my_sum_with_scalar(df["x"], 4)).collect()
╭───────╮
│ x     │
│ ---   │
│ Int64 │
╞═══════╡
│ 5     │
├╌╌╌╌╌╌╌┤
│ 6     │
├╌╌╌╌╌╌╌┤
│ 7     │
╰───────╯
(Showing first 3 of 3 rows)
Source code in daft/udf/__init__.py
def batch(
    self,
    *,
    return_dtype: DataTypeLike,
    unnest: bool = False,
    cpus: float | None = None,
    gpus: float = 0,
    use_process: bool | None = None,
    max_concurrency: int | None = None,
    batch_size: int | None = None,
    max_retries: int | None = None,
    on_error: Literal["raise", "log", "ignore"] | None = None,
    ray_options: dict[str, Any] | None = None,
) -> Callable[[Callable[P, T]], Func[P, T, None]]:
    """Decorator to convert a Python function into a Daft user-defined batch function.

    Args:
        return_dtype: The data type that this function should return.
        unnest: Whether to unnest/flatten out return type fields into columns. Return dtype must be `DataType.struct(..)` when this is set to true.
        use_process: Whether to run each instance of the function in a separate process. If unset, Daft will automatically choose based on runtime performance.
        max_concurrency: The maximum number of concurrent coroutines for async functions. Only valid for async functions; raises an error if used with synchronous functions.
        batch_size: The max number of rows in each input batch.

    Batch functions receive `daft.Series` arguments, and return a `daft.Series`, `list`, `numpy.ndarray`, or `pyarrow.Array`.
    You can also call them with scalar arguments, which will be passed in without modification.
    When called without Expression arguments, they execute immediately and the behavior is the same as if the function was not decorated.

    Examples:
        Basic Usage

        >>> import daft
        >>> from daft import DataType, Series
        >>>
        >>> @daft.func.batch(return_dtype=DataType.int64())
        ... def my_sum(a: Series, b: Series) -> Series:
        ...     import pyarrow.compute as pc
        ...
        ...     a = a.to_arrow()
        ...     b = b.to_arrow()
        ...     result = pc.add(a, b)
        ...     return result
        >>>
        >>> df = daft.from_pydict({"x": [1, 2, 3], "y": [4, 5, 6]})
        >>> df.select(my_sum(df["x"], df["y"])).collect()
        ╭───────╮
        │ x     │
        │ ---   │
        │ Int64 │
        ╞═══════╡
        │ 5     │
        ├╌╌╌╌╌╌╌┤
        │ 7     │
        ├╌╌╌╌╌╌╌┤
        │ 9     │
        ╰───────╯
        <BLANKLINE>
        (Showing first 3 of 3 rows)

        Mixing Series and Scalar Arguments

        >>> @daft.func.batch(return_dtype=daft.DataType.int64())
        ... def my_sum_with_scalar(a: daft.Series, b: int) -> daft.Series:
        ...     import pyarrow.compute as pc
        ...
        ...     a = a.to_arrow()
        ...     result = pc.add(a, b)
        ...     return result
        >>>
        >>> df = daft.from_pydict({"x": [1, 2, 3]})
        >>> df.select(my_sum_with_scalar(df["x"], 4)).collect()
        ╭───────╮
        │ x     │
        │ ---   │
        │ Int64 │
        ╞═══════╡
        │ 5     │
        ├╌╌╌╌╌╌╌┤
        │ 6     │
        ├╌╌╌╌╌╌╌┤
        │ 7     │
        ╰───────╯
        <BLANKLINE>
        (Showing first 3 of 3 rows)
    """

    def partial_func(fn: Callable[P, T]) -> Func[P, T, None]:
        return Func._from_func(
            fn,
            return_dtype,
            unnest,
            cpus,
            gpus,
            use_process,
            True,
            batch_size,
            max_concurrency=max_concurrency,
            max_retries=max_retries,
            on_error=on_error,
            ray_options=ray_options,
        )

    return partial_func

cls #

cls(*, cpus: float | None = None, gpus: float = 0, use_process: bool | None = None, max_concurrency: int | None = None, max_retries: int | None = None, on_error: Literal['raise', 'log', 'ignore'] | None = None, name_override: str | None = None, ray_options: dict[str, Any] | None = None) -> Callable[[type], type]
cls(class_: type, *, cpus: float | None = None, gpus: float = 0, use_process: bool | None = None, max_concurrency: int | None = None, max_retries: int | None = None, on_error: Literal['raise', 'log', 'ignore'] | None = None, name_override: str | None = None, ray_options: dict[str, Any] | None = None) -> type
cls(class_: type | None = None, *, cpus: float | None = None, gpus: float = 0, use_process: bool | None = None, max_concurrency: int | None = None, max_retries: int | None = None, on_error: Literal['raise', 'log', 'ignore'] | None = None, name_override: str | None = None, ray_options: dict[str, Any] | None = None) -> type | Callable[[type], type]

Decorator to convert a Python class into a Daft user-defined class.

Parameters:

Name Type Description Default
cpus float | None

The number of CPUs each instance of the class requires. Defaults to None (let the engine decide).

None
gpus float

The number of GPUs each instance of the class requires. Defaults to 0. Fractional values between 0 and 1.0, such as 0.5, are supported, which can be useful when running multiple small models on the same GPU. Values greater than 1.0 must be whole numbers; fractional values such as 1.5 or 2.5 are not supported.

0
use_process bool | None

Whether to run each instance of the class in a separate process. If unset, Daft will automatically choose based on runtime performance.

None
max_concurrency int | None

The maximum number of concurrent invocations. For sync methods, this controls the number of actor pool processes. For async methods, this controls the number of concurrent coroutines.

None
name_override str | None

The name to display for the UDF class in the plan and progress bars.

None
ray_options dict[str, Any] | None

Options to pass to the Ray executor (e.g. {"num_cpus": 1, "num_gpus": 1}).

None

Daft classes allow you to initialize a class instance once, and then reuse it for multiple rows of data. This is useful for expensive initializations that need to be amortized across multiple rows of data, such as loading a model or establishing a network connection.

Daft classes are initialized lazily. This means that when you create a Daft class, the arguments are saved and only passed into the __init__ method of each instance once a query is executed. Methods can also be called with scalar arguments to run locally, in which case __init__ will be called locally first.

Methods in a Daft class can be used as Daft functions. Use the @daft.method decorator to override default arguments.
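
The lazy initialization described above can be pictured as a wrapper that stores the constructor arguments and builds the real instance only on first use. The sketch below is a plain-Python illustration of that idea, not Daft's implementation; `LazyInstance` is a hypothetical name.

```python
class LazyInstance:
    """Remember constructor arguments; build the instance on first call."""

    def __init__(self, cls, *args, **kwargs):
        self._cls = cls
        self._args = args
        self._kwargs = kwargs
        self._instance = None

    def _get(self):
        if self._instance is None:
            # The expensive __init__ runs here, once, and is then reused.
            self._instance = self._cls(*self._args, **self._kwargs)
        return self._instance

    def __call__(self, *args, **kwargs):
        return self._get()(*args, **kwargs)


init_calls = []


class MyModel:
    def __init__(self, model_path: str):
        init_calls.append(model_path)  # stands in for a slow model load
        self.prefix = model_path

    def __call__(self, prompt: str) -> str:
        return f"{self.prefix}:{prompt}"


my_model = LazyInstance(MyModel, "path/to/model")
calls_before = len(init_calls)  # nothing has been initialized yet
outputs = [my_model(p) for p in ["hello", "world", "daft"]]
calls_after = len(init_calls)   # initialized exactly once, reused for all rows
```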

Examples:

Basic Usage

>>> import daft
>>> from daft import DataType
>>> @daft.cls
... class MyModel:
...     def __init__(self, model_path: str):
...         self.model = some_slow_initialization_step(model_path)
...
...     def __call__(self, prompt: str) -> str:
...         return self.model(prompt)
>>>
>>> my_model = MyModel("path/to/model")
>>>
>>> df = daft.from_pydict({"prompt": ["hello", "world", "daft"]})
>>> df = df.with_columns(
...     {
...         "generated": my_model(df["prompt"]),
...     }
... )

Multiple Methods

>>> import daft
>>> from daft import DataType
>>> @daft.cls
... class MyModel:
...     def __init__(self, model_path: str):
...         self.model = some_slow_initialization_step(model_path)
...
...     # no decoration is equivalent to `@daft.method` (default arguments)
...     def generate(self, prompt: str) -> str:
...         return self.model(prompt)
...
...     # decorate with `@daft.method` to override default arguments
...     @daft.method(return_dtype=DataType.list(DataType.string()))
...     def classify(self, value: str):
...         return self.model.classify(value)
...
...     # batch method
...     @daft.method.batch(return_dtype=DataType.list(DataType.string()))
...     def batch_classify(self, value: daft.Series):
...         return self.model.batch_classify(value)
>>> # Specify the initialization arguments for the class. `__init__` will not be called yet.
>>> my_model = MyModel("path/to/model")
>>> df = daft.from_pydict({"prompt": ["hello", "world", "daft"]})
>>> # Use class methods as Daft functions.
>>> df = df.with_columns(
...     {
...         "generated": my_model.generate(df["prompt"]),
...         "classified": my_model.classify(df["prompt"]),
...         "batch_classified": my_model.batch_classify(df["prompt"]),
...     }
... )
Source code in daft/udf/__init__.py
def cls(
    class_: type | None = None,
    *,
    cpus: float | None = None,
    gpus: float = 0,
    use_process: bool | None = None,
    max_concurrency: int | None = None,
    max_retries: int | None = None,
    on_error: Literal["raise", "log", "ignore"] | None = None,
    name_override: str | None = None,
    ray_options: dict[str, Any] | None = None,
) -> type | Callable[[type], type]:
    """Decorator to convert a Python class into a Daft user-defined class.

    Args:
        cpus: The number of CPUs each instance of the class requires. Defaults to None (let the engine decide).
        gpus: The number of GPUs each instance of the class requires. Defaults to 0.
              Fractional values between 0 and 1.0, such as 0.5, are supported. This can be useful when running multiple small models on the same GPU.
              However, fractional values greater than 1.0, such as 1.5 or 2.5, are not supported.
        use_process: Whether to run each instance of the class in a separate process. If unset, Daft will automatically choose based on runtime performance.
        max_concurrency: The maximum number of concurrent invocations. For sync methods, this controls the number of actor pool processes. For async methods, this controls the number of concurrent coroutines.
        name_override: The name to display for the UDF class in the plan and progress bars.
        ray_options: Options to pass to the Ray executor (e.g. {"num_cpus": 1, "num_gpus": 1}).

    Daft classes allow you to initialize a class instance once, and then reuse it for multiple rows of data.
    This is useful for expensive initializations that need to be amortized across multiple rows of data, such as loading a model or establishing a network connection.

    Daft classes are initialized lazily. This means that when you create a Daft class, the arguments are saved and only passed into the `__init__` method of each instance once a query is executed.
    Methods can also be called with scalar arguments to run locally, in which case `__init__` will be called locally first.

    Methods in a Daft class can be used as Daft functions. Use the `@daft.method` decorator to override default arguments.

    Examples:
        Basic Usage

        >>> import daft
        >>> from daft import DataType
        >>> @daft.cls
        ... class MyModel:
        ...     def __init__(self, model_path: str):
        ...         self.model = some_slow_initialization_step(model_path)
        ...
        ...     def __call__(self, prompt: str) -> str:
        ...         return self.model(prompt)
        >>>
        >>> my_model = MyModel("path/to/model")
        >>>
        >>> df = daft.from_pydict({"prompt": ["hello", "world", "daft"]})
        >>> df = df.with_columns(
        ...     {
        ...         "generated": my_model(df["prompt"]),
        ...     }
        ... )

        Multiple Methods

        >>> import daft
        >>> from daft import DataType
        >>> @daft.cls # doctest: +SKIP
        ... class MyModel:  # doctest: +SKIP
        ...     def __init__(self, model_path: str):  # doctest: +SKIP
        ...         self.model = some_slow_initialization_step(model_path)  # doctest: +SKIP
        ...
        ...     # no decoration is equivalent to `@daft.method` (default arguments)
        ...     def generate(self, prompt: str) -> str:  # doctest: +SKIP
        ...         return self.model(prompt)  # doctest: +SKIP
        ...
        ...     # decorate with `@daft.method` to override default arguments
        ...     @daft.method(return_dtype=DataType.list(DataType.string()))  # doctest: +SKIP
        ...     def classify(self, value: str):  # doctest: +SKIP
        ...         return self.model.classify(value)  # doctest: +SKIP
        ...
        ...     # batch method
        ...     @daft.method.batch(return_dtype=DataType.list(DataType.string()))  # doctest: +SKIP
        ...     def batch_classify(self, value: daft.Series):  # doctest: +SKIP
        ...         return self.model.batch_classify(value)  # doctest: +SKIP
        >>> # Specify the initialization arguments for the class. `__init__` will not be called yet.
        >>> my_model = MyModel("path/to/model")  # doctest: +SKIP
        >>> df = daft.from_pydict({"prompt": ["hello", "world", "daft"]})  # doctest: +SKIP
        >>> # Use class methods as Daft functions.
        >>> df = df.with_columns(  # doctest: +SKIP
        ...     {
        ...         "generated": my_model.generate(df["prompt"]),
        ...         "classified": my_model.classify(df["prompt"]),
        ...         "batch_classified": my_model.batch_classify(df["prompt"]),
        ...     }
        ... )
    """
    if cpus is not None and cpus < 0:
        raise ValueError(f"num_cpus must be non-negative, got {cpus}")

    # Validate GPU resource request early: allow fractional values up to 1.0; values > 1.0 must be integers.
    if gpus < 0:
        raise ValueError(f"num_gpus must be non-negative, got {gpus}")
    if gpus > 1 and not float(gpus).is_integer():
        raise ValueError(f"ResourceRequest num_gpus greater than 1 must be an integer, got {gpus}")

    def partial_cls(c: type) -> type:
        return wrap_cls(
            c, cpus, gpus, use_process, max_concurrency, max_retries, on_error, name_override, ray_options=ray_options
        )

    return partial_cls if class_ is None else partial_cls(class_)
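
To see which `gpus` values pass the checks above, the same two conditions can be applied to a handful of inputs. `check_gpus` and `is_accepted` are hypothetical helpers that repeat the conditions from the source for illustration; the error messages are simplified.

```python
def check_gpus(gpus: float) -> float:
    """Repeat the two GPU checks shown in the source above."""
    if gpus < 0:
        raise ValueError(f"gpus must be non-negative, got {gpus}")
    if gpus > 1 and not float(gpus).is_integer():
        raise ValueError(f"gpus greater than 1 must be an integer, got {gpus}")
    return gpus


def is_accepted(gpus: float) -> bool:
    try:
        check_gpus(gpus)
        return True
    except ValueError:
        return False


# Fractions up to 1.0 and whole numbers pass; negatives and fractions
# above 1.0 are rejected.
accepted = [(g, is_accepted(g)) for g in (0, 0.25, 0.5, 1, 2, 1.5, 2.5, -1)]
```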

method #

_MethodDecorator #

Methods:

Name Description
__call__

Decorator to convert a Python method into a Daft user-defined function. This should be used in a class that is decorated with @daft.cls.

batch

Decorator to convert a Python method into a Daft user-defined batch function. This should be used in a class that is decorated with @daft.cls.

__call__ #

__call__(*, return_dtype: DataTypeLike | None = None, unnest: bool = False) -> Callable[[Callable[P, T]], Callable[P, T]]
__call__(method: Callable[P, T], *, return_dtype: DataTypeLike | None = None) -> Callable[P, T]
__call__(method: Callable[P, T]) -> Callable[P, T]
__call__(method: Callable[P, T] | None = None, *, return_dtype: DataTypeLike | None = None, unnest: bool = False, max_retries: int | None = None, on_error: Literal['raise', 'log', 'ignore'] | None = None) -> Callable[P, T] | Callable[[Callable[P, T]], Callable[P, T]]

Decorator to convert a Python method into a Daft user-defined function. This should be used in a class that is decorated with @daft.cls.

Parameters:

Name Type Description Default
return_dtype DataTypeLike | None

The data type that this function should return or yield. If not specified, it is derived from the method's return type hint.

None
unnest bool

Whether to unnest/flatten out return type fields into columns. Return dtype must be DataType.struct(..) when this is set to true. Defaults to false.

False

Similar to @daft.func, @daft.method supports three variants: row-wise, async row-wise, and generator. See @daft.func for more details.

Decorated methods accept both their original argument types and Daft Expressions. When any arguments are Expressions, they return a Daft Expression that can be used in DataFrame operations. When called without Expression arguments, methods execute immediately, first initializing a local instance of the class if it does not already exist.

See @daft.func and @daft.cls for more details.
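The eager-versus-lazy behaviour described above can be modelled without Daft. The following is a standalone sketch, not Daft's implementation: `FakeExpr` and `dual_mode` are hypothetical names that stand in for `daft.Expression` and the decorator machinery, and the lazy branch only records that a call was deferred.

```python
from typing import Any, Callable


class FakeExpr:
    """Hypothetical stand-in for a lazy expression; it records a call instead of running it."""

    def __init__(self, description: str) -> None:
        self.description = description


def dual_mode(fn: Callable[..., Any]) -> Callable[..., Any]:
    """Run `fn` immediately unless any argument is a FakeExpr."""

    def wrapper(*args: Any, **kwargs: Any) -> Any:
        values = list(args) + list(kwargs.values())
        if any(isinstance(v, FakeExpr) for v in values):
            # Lazy path: return a new expression describing the deferred call.
            return FakeExpr(f"{fn.__name__}(...)")
        # Eager path: no expression arguments, so call the function right away.
        return fn(*args, **kwargs)

    return wrapper


@dual_mode
def add_one(x: int) -> int:
    return x + 1


eager_result = add_one(41)                 # a plain int, computed immediately
lazy_result = add_one(FakeExpr("col(x)"))  # a FakeExpr, nothing computed yet
```

The same function object serves both interactive testing with plain values and DataFrame pipelines with expressions, which is the design choice the paragraph above describes.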

Source code in daft/udf/__init__.py
def __call__(
    self,
    method: Callable[P, T] | None = None,
    *,
    return_dtype: DataTypeLike | None = None,
    unnest: bool = False,
    max_retries: int | None = None,
    on_error: Literal["raise", "log", "ignore"] | None = None,
) -> Callable[P, T] | Callable[[Callable[P, T]], Callable[P, T]]:
    """Decorator to convert a Python method into a Daft user-defined function. This should be used in a class that is decorated with `@daft.cls`.

    Args:
        return_dtype: The data type that this function should return or yield. If not specified, it is derived from the method's return type hint.
        unnest: Whether to unnest/flatten out return type fields into columns. Return dtype must be `DataType.struct(..)` when this is set to true. Defaults to false.

    Similar to `@daft.func`, `@daft.method` supports three variants: row-wise, async row-wise, and generator. See `@daft.func` for more details.

    Decorated methods accept both their original argument types and Daft Expressions.
    When any arguments are Expressions, they return a Daft Expression that can be used in DataFrame operations.
    When called without Expression arguments, methods execute immediately, first initializing a local instance of the class if it does not already exist.

    See `@daft.func` and `@daft.cls` for more details.
    """

    def partial_method(m: Callable[P, T]) -> Callable[P, T]:
        return mark_cls_method(m, return_dtype, unnest, False, None, max_retries, on_error)

    return partial_method if method is None else partial_method(method)
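The final line of the listing above is what lets the decorator be written either bare or with keyword arguments, matching the overloads shown earlier. A minimal standalone sketch of that pattern follows; `tagged` and its `label` parameter are hypothetical and unrelated to Daft.

```python
from typing import Any, Callable, Optional


def tagged(fn: Optional[Callable[..., Any]] = None, *, label: str = "default") -> Any:
    """Hypothetical decorator usable as `@tagged` or `@tagged(label=...)`."""

    def apply(f: Callable[..., Any]) -> Callable[..., Any]:
        # Attach metadata to the function, loosely analogous to marking a method.
        f.label = label  # type: ignore[attr-defined]
        return f

    # Bare use passes the function directly; parenthesised use passes None first
    # and expects a decorator back.
    return apply if fn is None else apply(fn)


@tagged
def first() -> str:
    return "a"


@tagged(label="custom")
def second() -> str:
    return "b"
```

Making every option keyword-only (the `*` in the signature) is what keeps the two call forms unambiguous: a positional argument can only ever be the decorated function.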

batch #

batch(*, return_dtype: DataTypeLike | None = None, unnest: bool = False, batch_size: int | None = None, max_retries: int | None = None, on_error: Literal['raise', 'log', 'ignore'] | None = None) -> Callable[[Callable[P, T]], Callable[P, T]]
batch(method: Callable[P, T], *, return_dtype: DataTypeLike | None = None, unnest: bool = False, batch_size: int | None = None, max_retries: int | None = None, on_error: Literal['raise', 'log', 'ignore'] | None = None) -> Callable[P, T]
batch(method: Callable[P, T] | None = None, *, return_dtype: DataTypeLike | None = None, unnest: bool = False, batch_size: int | None = None, max_retries: int | None = None, on_error: Literal['raise', 'log', 'ignore'] | None = None) -> Callable[P, T] | Callable[[Callable[P, T]], Callable[P, T]]

Decorator to convert a Python method into a Daft user-defined batch function. This should be used in a class that is decorated with @daft.cls.

Parameters:

Name Type Description Default
return_dtype DataTypeLike | None

The data type that this function should return.

None
unnest bool

Whether to unnest (flatten) the fields of the returned struct into separate columns. return_dtype must be DataType.struct(..) when this is True.

False
batch_size int | None

The max number of rows in each input batch.

None

Batch methods receive daft.Series arguments, and return a daft.Series, list, numpy.ndarray, or pyarrow.Array. You can also call them with scalar arguments, which will be passed in without modification. When called without Expression arguments, they execute immediately, first initializing a local instance of the class if it does not already exist.

See @daft.func.batch and @daft.cls for more details.
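To make the role of `batch_size` concrete, the sketch below splits rows into chunks of at most `batch_size` and calls a batch function once per chunk. It is a plain-Python illustration under stated assumptions: lists stand in for `daft.Series`, `run_in_batches` is a hypothetical helper, and Daft's engine performs the real batching in ways this page does not describe.

```python
from typing import Callable, List, Optional


def run_in_batches(
    rows: List[int],
    batch_fn: Callable[[List[int]], List[int]],
    batch_size: Optional[int] = None,
) -> List[int]:
    """Apply `batch_fn` to chunks of at most `batch_size` rows and concatenate the results."""
    if batch_size is None:
        # No limit requested: hand the whole input over as a single batch.
        return batch_fn(rows)
    out: List[int] = []
    for start in range(0, len(rows), batch_size):
        chunk = rows[start : start + batch_size]
        result = batch_fn(chunk)
        # Assumption of this sketch: one output value per input row.
        assert len(result) == len(chunk)
        out.extend(result)
    return out


seen_sizes: List[int] = []


def double(batch: List[int]) -> List[int]:
    seen_sizes.append(len(batch))  # record how many rows each call received
    return [2 * v for v in batch]


doubled = run_in_batches([1, 2, 3, 4, 5], double, batch_size=2)
```

Because `batch_size` is a maximum, the last call may receive fewer rows, so a batch function should not assume a fixed input length.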

Source code in daft/udf/__init__.py
def batch(
    self,
    method: Callable[P, T] | None = None,
    *,
    return_dtype: DataTypeLike | None = None,
    unnest: bool = False,
    batch_size: int | None = None,
    max_retries: int | None = None,
    on_error: Literal["raise", "log", "ignore"] | None = None,
) -> Callable[P, T] | Callable[[Callable[P, T]], Callable[P, T]]:
    """Decorator to convert a Python method into a Daft user-defined batch function. This should be used in a class that is decorated with `@daft.cls`.

    Args:
        return_dtype: The data type that this function should return.
        unnest: Whether to unnest/flatten out return type fields into columns. Return dtype must be `DataType.struct(..)` when this is set to true.
        batch_size: The max number of rows in each input batch.

    Batch methods receive `daft.Series` arguments, and return a `daft.Series`, `list`, `numpy.ndarray`, or `pyarrow.Array`.
    You can also call them with scalar arguments, which will be passed in without modification.
    When called without Expression arguments, they execute immediately, first initializing a local instance of the class if it does not already exist.

    See `@daft.func.batch` and `@daft.cls` for more details.
    """

    def partial_method(m: Callable[P, T]) -> Callable[P, T]:
        return mark_cls_method(m, return_dtype, unnest, True, batch_size, max_retries, on_error)

    return partial_method if method is None else partial_method(method)

Func #

Func(_cls: ClsBase[C], _method: Callable[Concatenate[C, P], T], is_generator: bool, is_async: bool, is_batch: bool, batch_size: int | None, unnest: bool, cpus: float | None, gpus: float, use_process: bool | None, max_concurrency: int | None, max_retries: int | None, on_error: str | None, return_dtype: DataType, name_override: str | None = None, ray_options: dict[str, Any] | None = None)

Methods:

Name Description
__call__
__post_init__

Post-init checks and setup.

with_concurrency

Create a new Daft function with the specified maximum concurrency.

with_ray_options

Create a new Daft function with the specified Ray options.

Attributes:

Name Type Description
batch_size int | None
cpus float | None
func_id str
gpus float
is_async bool
is_batch bool
is_generator bool
max_concurrency int | None
max_retries int | None
name str
name_override str | None
on_error str | None
ray_options dict[str, Any] | None
return_dtype DataType
unnest bool
use_process bool | None

batch_size #

batch_size: int | None

cpus #

cpus: float | None

func_id #

func_id: str = field(init=False)

gpus #

gpus: float

is_async #

is_async: bool

is_batch #

is_batch: bool

is_generator #

is_generator: bool

max_concurrency #

max_concurrency: int | None

max_retries #

max_retries: int | None

name #

name: str = field(init=False)

name_override #

name_override: str | None = None

on_error #

on_error: str | None

ray_options #

ray_options: dict[str, Any] | None = None

return_dtype #

return_dtype: DataType

unnest #

unnest: bool

use_process #

use_process: bool | None

__call__ #

__call__(*args: P.args, **kwargs: P.kwargs) -> T
__call__(*args: Expression, **kwargs: Expression) -> Expression
__call__(*args: Any, **kwargs: Any) -> Expression | T

Source code in daft/udf/udf_v2.py
def __call__(self, *args: Any, **kwargs: Any) -> Expression | T:
    expr_args = []
    for arg in args:
        if isinstance(arg, Expression):
            expr_args.append(arg._expr)
    for arg in kwargs.values():
        if isinstance(arg, Expression):
            expr_args.append(arg._expr)

    # evaluate the function eagerly if there are no expression arguments
    if len(expr_args) == 0:
        bound_method = self._cls._daft_bind_method(self._method)
        return bound_method(*args, **kwargs)

    # When building expression-based UDFs, we must avoid incorrectly sharing call-site state across multiple uses of the same function.
    call_seq = getattr(self, "_daft_call_seq", 0)
    setattr(self, "_daft_call_seq", call_seq + 1)
    call_id = f"{self.func_id}-{call_seq}"

    check_serializable(
        self._method,
        "Daft functions must be serializable. If your function accesses a non-serializable global or nonlocal variable to avoid reinitialization, use `@daft.cls` with a setup method instead.",
    )
    check_serializable(
        self._cls,
        "Daft classes must be serializable. If your class accesses a non-serializable global or nonlocal variable, initialize it in the setup method instead.",
    )

    # Extract resource requests from ray_options if present
    ray_options = self.ray_options.copy() if self.ray_options is not None else {}
    if "num_cpus" in ray_options:
        raise ValueError(
            "Cannot set 'num_cpus' in `ray_options`. Please use the 'cpus' argument in @daft.func or @daft.cls instead."
        )
    if "num_gpus" in ray_options:
        raise ValueError(
            "Cannot set 'num_gpus' in `ray_options`. Please use the 'gpus' argument in @daft.func or @daft.cls instead."
        )
    if "memory" in ray_options:
        raise ValueError(
            "Cannot set 'memory' in `ray_options`. Please use the 'memory_bytes' argument in @daft.func or @daft.cls instead."
        )

    # If there are any ray options (other than the banned resource ones), we must use an actor pool
    max_concurrency = self.max_concurrency or (1 if ray_options else None)

    # TODO: implement generator UDFs on the engine side
    if self.is_generator:

        def method(s: C, *args: P.args, **kwargs: P.kwargs) -> list[Any]:
            return list(self._method(s, *args, **kwargs))  # type: ignore[call-overload]

        expr = Expression._from_pyexpr(
            row_wise_udf(
                call_id,
                self.name,
                self._cls,
                method,
                self.name_override is not None,
                self.is_async,
                DataType.list(self.return_dtype)._dtype,
                self.cpus,
                self.gpus,
                self.use_process,
                max_concurrency,
                self.max_retries,
                self.on_error,
                (args, kwargs),
                expr_args,
                ray_options if ray_options else None,
            )
        ).explode()
    elif self.is_batch:
        expr = Expression._from_pyexpr(
            batch_udf(
                call_id,
                self.name,
                self._cls,
                self._method,
                self.name_override is not None,
                self.is_async,
                self.return_dtype._dtype,
                self.cpus,
                self.gpus,
                self.use_process,
                max_concurrency,
                self.batch_size,
                self.max_retries,
                self.on_error,
                (args, kwargs),
                expr_args,
                ray_options if ray_options else None,
            )
        )
    else:
        expr = Expression._from_pyexpr(
            row_wise_udf(
                call_id,
                self.name,
                self._cls,
                self._method,
                self.name_override is not None,
                self.is_async,
                self.return_dtype._dtype,
                self.cpus,
                self.gpus,
                self.use_process,
                max_concurrency,
                self.max_retries,
                self.on_error,
                (args, kwargs),
                expr_args,
                ray_options if ray_options else None,
            )
        )

    if self.unnest:
        expr = expr.unnest()

    return expr
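In the generator branch of the listing above, the method is wrapped so that each row yields a list, the return type becomes a list of `return_dtype`, and the resulting expression is exploded. The sketch below mimics that shape in plain Python. `as_list_fn` and `explode` are hypothetical helpers, and how Daft's `explode` treats empty lists or nulls is deliberately not modelled here.

```python
from typing import Any, Callable, Iterator, List, Tuple


def repeat_word(word: str, times: int) -> Iterator[str]:
    """A generator-style function: yields several outputs for one input row."""
    for _ in range(times):
        yield word


def as_list_fn(gen_fn: Callable[..., Iterator[str]]) -> Callable[..., List[str]]:
    """Wrap a generator so each call returns a materialised list (one list per row)."""

    def wrapped(*args: Any, **kwargs: Any) -> List[str]:
        return list(gen_fn(*args, **kwargs))

    return wrapped


def explode(list_column: List[List[str]]) -> List[str]:
    """Flatten a column of non-empty lists into one value per output row."""
    return [item for row in list_column for item in row]


rows: List[Tuple[str, int]] = [("a", 2), ("b", 1)]
list_column = [as_list_fn(repeat_word)(word, n) for word, n in rows]
flat = explode(list_column)
```

Materialising the generator with `list(...)` means each row's outputs are held in memory at once, which is worth keeping in mind for generators that yield many values per row.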

__post_init__ #

__post_init__() -> None

Post-init checks and setup.

Source code in daft/udf/udf_v2.py
def __post_init__(self) -> None:
    """Post-init checks and setup."""
    functools.update_wrapper(self, self._method)
    self.func_id, self.name = self._derive_function_names()

    if self.unnest and not self.return_dtype.is_struct():
        raise ValueError(
            f"Expected Daft function `return_dtype` to be `DataType.struct(..)` when `unnest=True`, instead found: {self.return_dtype}"
        )

    if not self.is_batch and self.batch_size is not None:
        raise ValueError("Non-batch Daft functions cannot have a batch size.")

    if self.is_async and self.is_generator:
        raise ValueError("Daft functions do not yet support both async and generator functions.")

    if self.max_concurrency is not None and self.max_concurrency == 0:
        raise ValueError("max_concurrency for udf must be non-zero")

    if self.cpus is not None and self.cpus < 0:
        raise ValueError(f"num_cpus must be non-negative, got {self.cpus}")

    # Validate GPU resource request: allow fractional values up to 1.0; values > 1.0 must be integers.
    if self.gpus < 0:
        raise ValueError(f"num_gpus must be non-negative, got {self.gpus}")
    if self.gpus > 1 and not float(self.gpus).is_integer():
        raise ValueError(f"ResourceRequest num_gpus greater than 1 must be an integer, got {self.gpus}")
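The GPU rule in the listing above (fractions are allowed up to 1, and anything above 1 must be a whole number) can be exercised in isolation. This sketch copies the two conditions into a hypothetical standalone function, `validate_gpus`, so that the accepted and rejected values are easy to see.

```python
def validate_gpus(gpus: float) -> float:
    """Standalone copy of the two GPU checks shown in the listing above."""
    if gpus < 0:
        raise ValueError(f"num_gpus must be non-negative, got {gpus}")
    if gpus > 1 and not float(gpus).is_integer():
        raise ValueError(f"num_gpus greater than 1 must be an integer, got {gpus}")
    return gpus


def is_valid(gpus: float) -> bool:
    """Return True when `validate_gpus` accepts the value."""
    try:
        validate_gpus(gpus)
        return True
    except ValueError:
        return False


# Map each candidate request to whether it passes validation.
results = {g: is_valid(g) for g in (0, 0.25, 1, 1.5, 2, -1)}
```

Under these checks a request such as `0.25` (several instances sharing one GPU) or `2` (two whole GPUs) passes, while `1.5` is rejected.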

with_concurrency #

with_concurrency(max_concurrency: int) -> Func[P, T, C]

Create a new Daft function with the specified maximum concurrency.

Parameters:

Name Type Description Default
max_concurrency int

The maximum concurrency to use for this function.

required

Returns:

Type Description
Func[P, T, C]

A new Daft function with the specified maximum concurrency.

Source code in daft/udf/udf_v2.py
def with_concurrency(self, max_concurrency: int) -> Func[P, T, C]:
    """Create a new Daft function with the specified maximum concurrency.

    Args:
        max_concurrency: The maximum concurrency to use for this function.

    Returns:
        A new Daft function with the specified maximum concurrency.
    """
    return replace(self, max_concurrency=max_concurrency)
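`with_concurrency` relies on `dataclasses.replace`, which constructs a new instance through `__init__` and therefore runs `__post_init__` again. The sketch below shows the consequences on a hypothetical miniature class, `Fn`, which is not Daft's `Func`: the original object is left untouched, fields declared with `field(init=False)` are re-derived, and validation is re-applied to the new value.

```python
from dataclasses import dataclass, field, replace
from typing import Optional


@dataclass
class Fn:
    """Hypothetical miniature of a function wrapper with a derived field."""

    name: str
    max_concurrency: Optional[int] = None
    label: str = field(init=False)  # derived in __post_init__, not passed to __init__

    def __post_init__(self) -> None:
        if self.max_concurrency is not None and self.max_concurrency == 0:
            raise ValueError("max_concurrency must be non-zero")
        self.label = f"{self.name}[{self.max_concurrency}]"

    def with_concurrency(self, max_concurrency: int) -> "Fn":
        # replace() builds a new instance via __init__, so __post_init__ runs again.
        return replace(self, max_concurrency=max_concurrency)


base = Fn("embed")
scaled = base.with_concurrency(4)

try:
    base.with_concurrency(0)
    rejected = False
except ValueError:
    rejected = True
```

Returning a copy rather than mutating in place lets one decorated function be reused at several call sites with different settings.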

with_ray_options #

with_ray_options(**ray_options: Any) -> Func[P, T, C]

Create a new Daft function with the specified Ray options.

Parameters:

Name Type Description Default
**ray_options Any

Ray options to use for this function.

{}

Returns:

Type Description
Func[P, T, C]

A new Daft function with the specified Ray options.

Source code in daft/udf/udf_v2.py
def with_ray_options(self, **ray_options: Any) -> Func[P, T, C]:
    """Create a new Daft function with the specified Ray options.

    Args:
        **ray_options: Ray options to use for this function.

    Returns:
        A new Daft function with the specified Ray options.
    """
    new_ray_options = self.ray_options.copy() if self.ray_options is not None else {}
    new_ray_options.update(ray_options)
    return replace(self, ray_options=new_ray_options)
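The merge performed by `with_ray_options` layers the new keyword arguments over a copy of any existing options. In the `__call__` source shown earlier, the keys `num_cpus`, `num_gpus`, and `memory` are rejected when an expression is built, not when the options are set. The sketch below reproduces both steps with hypothetical helper names and an illustrative `label_selector` value; it does not import Ray or Daft.

```python
from typing import Any, Dict, Optional

# Keys that the __call__ source shown earlier rejects when found in ray_options.
RESERVED_KEYS = ("num_cpus", "num_gpus", "memory")


def merge_ray_options(current: Optional[Dict[str, Any]], **updates: Any) -> Dict[str, Any]:
    """Return a new dict with `updates` layered over `current`; `current` is not mutated."""
    merged = dict(current) if current is not None else {}
    merged.update(updates)
    return merged


def check_ray_options(options: Dict[str, Any]) -> None:
    """Raise if a resource key that has a dedicated argument is present."""
    for key in RESERVED_KEYS:
        if key in options:
            raise ValueError(f"Cannot set '{key}' in ray_options.")


original = {"label_selector": {"zone": "a"}}  # illustrative value only
merged = merge_ray_options(original, max_restarts=2)
```

The copy is shallow, so nested values such as the `label_selector` dict are shared between the original and merged options.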

Aggregate UDFs#

@daft.udaf lets you define custom aggregation functions with a three-stage pipeline (aggregate, combine, finalize) that plugs into Daft's distributed aggregation engine.

Learn more in the User Guide.

udaf #

Functions:

Name Description
udaf

Decorator to create a user-defined aggregate function (UDAF) from a class.

udaf #

udaf(cls: type | None = None, *, return_dtype: DataTypeLike, state: DataTypeLike | dict[str, DataTypeLike]) -> type | Callable[[type], type]

Decorator to create a user-defined aggregate function (UDAF) from a class.

The execution pipeline follows three stages:

Aggregation:   aggregate(inputs)  -> partial state
Combination:   combine(states)    -> merged state   (associative & commutative)
Finalization:  finalize(state)    -> final output

The class must define exactly three methods:

  • aggregate(*inputs): Consume all rows of the input columns for one group and produce the initial partial state. Receives one Series per input column. Returns either a single scalar (single-state mode) or a dict[str, scalar] (multi-state mode).

  • combine(states): Merge multiple partial states into one. Must be associative and commutative, because the framework does not guarantee the order in which partial states arrive. In single-state mode receives a Series of scalars; in multi-state mode receives a dict[str, Series].

  • finalize(state): Produce the final output value from the fully merged state. Called exactly once per group. In single-state mode receives a single Python scalar; in multi-state mode receives a dict[str, scalar].

State is typed: the state parameter declares one data type per state component. The framework carries state between stages using these types, which lets Arrow and the query planner reason about intermediate results.
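The three stages can be traced by hand with a plain-Python mean aggregator. The sketch below is an illustration under stated assumptions, not Daft's execution engine: lists stand in for Series, `MeanAgg` and `run_pipeline` are hypothetical names, and the "partitions" are ordinary lists chosen by hand.

```python
from typing import Dict, List


class MeanAgg:
    """Plain-Python multi-state aggregator that computes a mean (no Daft types)."""

    def aggregate(self, values: List[float]) -> Dict[str, float]:
        return {"sum": float(sum(values)), "count": len(values)}

    def combine(self, states: Dict[str, List[float]]) -> Dict[str, float]:
        # Receives one list per state field, holding that field from every partial state.
        return {"sum": float(sum(states["sum"])), "count": int(sum(states["count"]))}

    def finalize(self, state: Dict[str, float]) -> float:
        return state["sum"] / state["count"]


def run_pipeline(agg: MeanAgg, partitions: List[List[float]]) -> float:
    """Aggregate each partition, merge the partial states, then finalize once."""
    partials = [agg.aggregate(p) for p in partitions]
    columns = {key: [s[key] for s in partials] for key in ("sum", "count")}
    return agg.finalize(agg.combine(columns))


agg = MeanAgg()
forward = run_pipeline(agg, [[1.0, 2.0], [3.0], [4.0, 5.0, 6.0]])
reordered = run_pipeline(agg, [[4.0, 5.0, 6.0], [1.0, 2.0], [3.0]])
```

Both orderings give the same result because summing the `sum` and `count` fields is associative and commutative, which is the property `combine` is required to have.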

Parameters:

Name Type Description Default
return_dtype DataTypeLike

The output data type of the aggregate function.

required
state DataTypeLike | dict[str, DataTypeLike]

The intermediate state type(s). Either a single DataType for simple accumulators, or a dict of {name: DataType} for multi-field state.

required

Examples:

Single-state UDAF:

>>> import daft
>>> from daft import DataType, Series
>>> @daft.udaf(return_dtype=DataType.float64(), state=DataType.float64())
... class MySum:
...     def aggregate(self, values: Series) -> float:
...         return float(values.sum())
...
...     def combine(self, states: Series) -> float:
...         return float(states.sum())
...
...     def finalize(self, state: float) -> float:
...         return state

Multi-state UDAF:

>>> @daft.udaf(
...     return_dtype=DataType.float64(),
...     state={"sum": DataType.float64(), "count": DataType.int64()},
... )
... class MyMean:
...     def aggregate(self, values: Series) -> dict:
...         return {"sum": float(values.sum()), "count": int(values.count())}
...
...     def combine(self, states: dict) -> dict:
...         return {"sum": float(states["sum"].sum()), "count": int(states["count"].sum())}
...
...     def finalize(self, state: dict) -> float:
...         return state["sum"] / state["count"]

Usage:

>>> my_sum = MySum()
>>> # df.groupby("category").agg(my_sum(df["value"]))
Source code in daft/udf/udaf.py
def udaf(
    cls: type | None = None,
    *,
    return_dtype: DataTypeLike,
    state: DataTypeLike | dict[str, DataTypeLike],
) -> type | Callable[[type], type]:
    """Decorator to create a user-defined aggregate function (UDAF) from a class.

    The execution pipeline follows three stages:

    .. code-block:: text

        Aggregation:   aggregate(inputs)  -> partial state
        Combination:   combine(states)    -> merged state   (associative & commutative)
        Finalization:  finalize(state)    -> final output

    The class must define exactly three methods:

    - ``aggregate(*inputs)``: Consume all rows of the input columns for one
      group and produce the initial partial state.  Receives one :class:`Series`
      per input column.  Returns either a single scalar (single-state mode) or
      a ``dict[str, scalar]`` (multi-state mode).

    - ``combine(states)``: Merge multiple partial states into one.  Must be
      **associative and commutative** — the framework does not guarantee the
      order in which partial states arrive.  In single-state mode receives a
      :class:`Series` of scalars; in multi-state mode receives a
      ``dict[str, Series]``.

    - ``finalize(state)``: Produce the final output value from the fully-merged
      state.  Called exactly once per group.  In single-state mode receives a
      single Python scalar; in multi-state mode receives a
      ``dict[str, scalar]``.

    State is typed: the ``state`` parameter declares one data type per state
    component. The framework carries state between stages using these types,
    which lets Arrow and the query planner reason about intermediate results.

    Args:
        return_dtype: The output data type of the aggregate function.
        state: The intermediate state type(s). Either a single DataType for
            simple accumulators, or a dict of ``{name: DataType}`` for
            multi-field state.

    Examples:
        Single-state UDAF:

        >>> import daft
        >>> from daft import DataType, Series
        >>> @daft.udaf(return_dtype=DataType.float64(), state=DataType.float64())
        ... class MySum:
        ...     def aggregate(self, values: Series) -> float:
        ...         return float(values.sum())
        ...
        ...     def combine(self, states: Series) -> float:
        ...         return float(states.sum())
        ...
        ...     def finalize(self, state: float) -> float:
        ...         return state

        Multi-state UDAF:

        >>> @daft.udaf(
        ...     return_dtype=DataType.float64(),
        ...     state={"sum": DataType.float64(), "count": DataType.int64()},
        ... )
        ... class MyMean:
        ...     def aggregate(self, values: Series) -> dict:
        ...         return {"sum": float(values.sum()), "count": int(values.count())}
        ...
        ...     def combine(self, states: dict) -> dict:
        ...         return {"sum": float(states["sum"].sum()), "count": int(states["count"].sum())}
        ...
        ...     def finalize(self, state: dict) -> float:
        ...         return state["sum"] / state["count"]

        Usage:

        >>> my_sum = MySum()
        >>> # df.groupby("category").agg(my_sum(df["value"]))
    """
    resolved_return_dtype = DataType._infer(return_dtype)

    if isinstance(state, dict):
        state_field_names = list(state.keys())
        state_field_dtypes = [DataType._infer(v) for v in state.values()]
    else:
        state_field_names = ["_state"]
        state_field_dtypes = [DataType._infer(state)]

    def _wrap(klass: type) -> type:
        for method_name in ("aggregate", "combine", "finalize"):
            attr = inspect.getattr_static(klass, method_name, None)
            if attr is None or not inspect.isfunction(attr):
                raise ValueError(f"UDAF class `{klass.__name__}` must define a `{method_name}` method.")

        module_name = getattr(klass, "__module__", "")
        qual_name = getattr(klass, "__qualname__", klass.__name__)
        func_name = f"{module_name}.{qual_name}" if module_name else qual_name

        class WrappedUDAF:
            def __init__(self, *args: Any, **kwargs: Any) -> None:
                self._daft_cls_factory = klass
                self._daft_init_args: tuple[tuple[Any, ...], dict[str, Any]] = (args, kwargs)
                self._daft_func_name = func_name
                self._daft_func_id = f"{func_name}-{random.randint(0, 1_000_000)}"
                self._daft_call_seq = 0
                check_serializable(
                    self._daft_cls_factory,
                    "UDAF class must be serializable. Ensure it can be pickled.",
                )

            def __call__(self, *args: Any, **kwargs: Any) -> Expression:
                expr_args = []
                for arg in args:
                    if isinstance(arg, Expression):
                        expr_args.append(arg._expr)
                for arg in kwargs.values():
                    if isinstance(arg, Expression):
                        expr_args.append(arg._expr)

                if len(expr_args) == 0:
                    raise ValueError(
                        "UDAF must be called with at least one Expression argument. "
                        "Use it in a groupby().agg() context, e.g.: df.groupby('col').agg(my_udaf(df['value']))"
                    )

                call_id = f"{self._daft_func_id}-{self._daft_call_seq}"
                self._daft_call_seq += 1

                result = udaf_expr(
                    call_id,
                    self._daft_func_name,
                    self._daft_cls_factory,
                    self._daft_init_args,
                    (args, kwargs),
                    resolved_return_dtype._dtype,
                    state_field_names,
                    [dt._dtype for dt in state_field_dtypes],
                    expr_args,
                )
                return Expression._from_pyexpr(result)

            def __repr__(self) -> str:
                return f"AggFunc({func_name})"

        WrappedUDAF.__name__ = klass.__name__
        WrappedUDAF.__qualname__ = klass.__qualname__
        WrappedUDAF.__module__ = klass.__module__
        return WrappedUDAF

    if cls is not None:
        return _wrap(cls)
    return _wrap
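Two details of the listing above are easy to miss: a single `state` type is stored under the field name `_state`, and the method check uses `inspect.getattr_static` together with `inspect.isfunction`, so a required method declared as a `staticmethod` or `classmethod` does not pass. The sketch below reproduces both pieces in isolation. `normalize_state` and `missing_methods` are hypothetical helpers, and strings stand in for `DataType` objects.

```python
import inspect
from typing import Dict, List, Tuple, Union


def normalize_state(state: Union[str, Dict[str, str]]) -> Tuple[List[str], List[str]]:
    """Return (field names, field types); a single type gets the name "_state"."""
    if isinstance(state, dict):
        return list(state.keys()), list(state.values())
    return ["_state"], [state]


def missing_methods(klass: type) -> List[str]:
    """List required UDAF methods that are absent or not plain functions."""
    missing = []
    for name in ("aggregate", "combine", "finalize"):
        # getattr_static returns the raw class attribute without invoking descriptors.
        attr = inspect.getattr_static(klass, name, None)
        if attr is None or not inspect.isfunction(attr):
            missing.append(name)
    return missing


class Incomplete:
    def aggregate(self, values):
        return sum(values)

    @staticmethod
    def combine(states):  # a staticmethod object, not a plain function
        return sum(states)


single = normalize_state("float64")
multi = normalize_state({"sum": "float64", "count": "int64"})
problems = missing_methods(Incomplete)
```

Here `combine` is reported alongside the absent `finalize` because `getattr_static` returns the `staticmethod` wrapper itself, which `inspect.isfunction` does not accept.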