Skip to content

I/O#

Daft offers a variety of approaches to creating a DataFrame by reading from various data sources (in-memory data, files, data catalogs, and integrations), as well as to writing DataFrames out to data sources. See the Daft User Guide for more about other connectors.

Input#

from_arrow #

from_arrow(data: Union[Table, list[Table], Iterable[Table], ArrowStreamExportable]) -> DataFrame

Creates a DataFrame from Arrow data.

Accepts pyarrow Tables, lists/iterables of pyarrow Tables, or any object implementing the Arrow PyCapsule Interface (https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html), i.e. any object with an __arrow_c_stream__ method. This includes pyarrow RecordBatchReaders, pandas DataFrames (2.2+), nanoarrow arrays, and other Arrow-compatible libraries.

Parameters:

Name Type Description Default
data Union[Table, list[Table], Iterable[Table], ArrowStreamExportable]

Arrow data to convert into a Daft DataFrame.

required

Returns:

Name Type Description
DataFrame DataFrame

DataFrame created from the provided Arrow data.

Examples:

1
2
3
4
5
>>> import pyarrow as pa
>>> import daft
>>> t = pa.table({"a": [1, 2, 3], "b": ["foo", "bar", "baz"]})
>>> df = daft.from_arrow(t)
>>> df.show()
╭───────┬────────╮
│ a     ┆ b      │
│ ---   ┆ ---    │
│ Int64 ┆ String │
╞═══════╪════════╡
│ 1     ┆ foo    │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ 2     ┆ bar    │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ 3     ┆ baz    │
╰───────┴────────╯
(Showing first 3 of 3 rows)
Source code in daft/convert.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
@PublicAPI
def from_arrow(
    data: Union["pa.Table", list["pa.Table"], Iterable["pa.Table"], ArrowStreamExportable],
) -> "DataFrame":
    """Creates a DataFrame from Arrow data.

    Accepts pyarrow Tables, lists/iterables of pyarrow Tables, or any object implementing the
    [Arrow PyCapsule Interface](https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html)
    (i.e. has an ``__arrow_c_stream__`` method). This includes pyarrow RecordBatchReaders,
    pandas DataFrames (2.2+), nanoarrow arrays, and other Arrow-compatible libraries.

    Args:
        data: Arrow data to convert into a Daft DataFrame. A ``pa.Table`` is always converted through the
            pyarrow-aware path. Any other object exposing ``__arrow_c_stream__`` is imported as an Arrow
            C stream. Everything else (e.g. a list or iterable of tables) goes through the same path as
            ``pa.Table``.

    Returns:
        DataFrame: DataFrame created from the provided Arrow data.

    Examples:
        >>> import pyarrow as pa
        >>> import daft
        >>> t = pa.table({"a": [1, 2, 3], "b": ["foo", "bar", "baz"]})
        >>> df = daft.from_arrow(t)
        >>> df.show()
        ╭───────┬────────╮
        │ a     ┆ b      │
        │ ---   ┆ ---    │
        │ Int64 ┆ String │
        ╞═══════╪════════╡
        │ 1     ┆ foo    │
        ├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
        │ 2     ┆ bar    │
        ├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
        │ 3     ┆ baz    │
        ╰───────┴────────╯
        <BLANKLINE>
        (Showing first 3 of 3 rows)
    """
    from daft import DataFrame

    # pa.Table implements __arrow_c_stream__ but we prefer the pyarrow-aware path
    # because it handles types the Rust FFI stream cannot (e.g. extension types, Decimal256).
    if not isinstance(data, pa.Table) and isinstance(data, ArrowStreamExportable):
        return DataFrame._from_arrow_stream(data)

    return DataFrame._from_arrow(data)

from_dask_dataframe #

from_dask_dataframe(ddf: DataFrame) -> DataFrame

Creates a Daft DataFrame from a Dask DataFrame.

The provided Dask DataFrame must have been created using Dask-on-Ray.

Parameters:

Name Type Description Default
ddf DataFrame

The Dask DataFrame to create a Daft DataFrame from.

required

Returns:

Name Type Description
DataFrame DataFrame

Daft DataFrame created from the provided Dask DataFrame.

Note

This function only works when Daft is running on the Ray runner (RayRunner).

Examples:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
>>> import dask.dataframe as dd
>>> import pandas as pd
>>> import daft
>>> import ray
>>>
>>> daft.set_runner_ray()
>>>
>>> ddf = dd.from_pandas(pd.DataFrame({"a": [1, 2], "b": ["foo", "bar"]}), npartitions=2)
>>> df = daft.from_dask_dataframe(ddf)
>>> df.show()
╭───────┬────────╮
│ a     ┆ b      │
│ ---   ┆ ---    │
│ Int64 ┆ String │
╞═══════╪════════╡
│ 1     ┆ foo    │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ 2     ┆ bar    │
╰───────┴────────╯
(Showing first 2 of 2 rows)
Source code in daft/convert.py
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
@PublicAPI
def from_dask_dataframe(ddf: "dask.DataFrame") -> "DataFrame":
    """Creates a Daft DataFrame from a Dask DataFrame.

    The Dask DataFrame must have been built with [Dask-on-Ray](https://docs.ray.io/en/latest/ray-more-libs/dask-on-ray.html).

    Args:
        ddf: Dask DataFrame whose contents become the new Daft DataFrame.

    Returns:
        DataFrame: A Daft DataFrame holding the data of ``ddf``.

    Note:
        Only supported when Daft is running on the Ray runner (RayRunner).

    Examples:
        >>> import dask.dataframe as dd
        >>> import pandas as pd
        >>> import daft
        >>> import ray
        >>>
        >>> daft.set_runner_ray()  # doctest: +SKIP
        >>>
        >>> ddf = dd.from_pandas(pd.DataFrame({"a": [1, 2], "b": ["foo", "bar"]}), npartitions=2)  # doctest: +SKIP
        >>> df = daft.from_dask_dataframe(ddf)  # doctest: +SKIP
        >>> df.show()  # doctest: +SKIP
        ╭───────┬────────╮
        │ a     ┆ b      │
        │ ---   ┆ ---    │
        │ Int64 ┆ String │
        ╞═══════╪════════╡
        │ 1     ┆ foo    │
        ├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
        │ 2     ┆ bar    │
        ╰───────┴────────╯
        <BLANKLINE>
        (Showing first 2 of 2 rows)
    """
    import daft

    return daft.DataFrame._from_dask_dataframe(ddf)

from_glob_path #

from_glob_path(path: str | list[str], io_config: IOConfig | None = None) -> DataFrame

Creates a DataFrame of file paths and other metadata from a glob path.

This method supports wildcards:

  1. * matches any number of any characters including none
  2. ? matches any single character
  3. [...] matches any single character in the brackets
  4. ** recursively matches any number of layers of directories

The returned DataFrame will have the following columns:

  1. path: the path to the file/directory
  2. size: size of the object in bytes
  3. rows: the total number of rows in a Parquet object; None for other formats.

Parameters:

Name Type Description Default
path str | list

Path(s) to files, local or on a remote object store (allows wildcards).

required
io_config IOConfig

Configuration to use when running IO with remote services

None

Returns:

Name Type Description
DataFrame DataFrame

DataFrame containing the path to each file as a row, along with other metadata parsed from the provided filesystem.

Note

If no files match the glob pattern(s), an empty DataFrame is returned instead of raising an error.

Examples:

1
2
3
4
>>> df = daft.from_glob_path("/path/to/files/*.jpeg")
>>> df = daft.from_glob_path("/path/to/files/**/*.jpeg")
>>> df = daft.from_glob_path("/path/to/files/**/image-?.jpeg")
>>> df = daft.from_glob_path(["/path/to/files/*.jpeg", "/path/to/others/*.jpeg"])
Source code in daft/io/file_path.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
@PublicAPI
def from_glob_path(path: str | list[str], io_config: IOConfig | None = None) -> DataFrame:
    """Creates a DataFrame of file paths and other metadata from a glob path.

    Supported wildcards:

    1. `*` matches any number of any characters, including none
    2. `?` matches exactly one character
    3. `[...]` matches any single character listed in the brackets
    4. `**` recursively matches any number of directory levels

    Columns of the returned DataFrame:

    1. path: the path to the file/directory
    2. size: size of the object in bytes
    3. rows: the total number of rows of a Parquet object; None for other formats.

    Args:
        path (str|list): A glob path, or a list of glob paths (allows wildcards).
        io_config (IOConfig): Configuration to use when running IO with remote services. When None, the
            default IOConfig of the current planning config is used.

    Returns:
        DataFrame: DataFrame containing the path to each file as a row, along with other metadata parsed from the provided filesystem.

    Raises:
        ValueError: If ``path`` is an empty list.

    Note:
        If no files match the glob pattern(s), an empty DataFrame is returned instead of raising an error.

    Examples:
        >>> df = daft.from_glob_path("/path/to/files/*.jpeg")
        >>> df = daft.from_glob_path("/path/to/files/**/*.jpeg")
        >>> df = daft.from_glob_path("/path/to/files/**/image-?.jpeg")
        >>> df = daft.from_glob_path(["/path/to/files/*.jpeg", "/path/to/others/*.jpeg"])
    """
    glob_paths = [path] if isinstance(path, str) else path
    if len(glob_paths) == 0:
        raise ValueError("Must specify at least one glob path")

    # The context is fetched unconditionally; its planning config is only consulted
    # when the caller did not supply an io_config.
    ctx = get_context()
    if io_config is None:
        io_config = ctx.daft_planning_config.default_io_config

    plan = LogicalPlanBuilder.from_glob_scan(glob_paths=glob_paths, io_config=io_config)
    return DataFrame(plan)

from_pandas #

from_pandas(data: Union[DataFrame, list[DataFrame]]) -> DataFrame

Creates a Daft DataFrame from a pandas DataFrame.

Parameters:

Name Type Description Default
data Union[DataFrame, list[DataFrame]]

pandas DataFrame(s) that we wish to convert into a Daft DataFrame.

required

Returns:

Name Type Description
DataFrame DataFrame

Daft DataFrame created from the provided pandas DataFrame.

Examples:

1
2
3
4
5
>>> import pandas as pd
>>> import daft
>>> pd_df = pd.DataFrame({"a": [1, 2, 3], "b": ["foo", "bar", "baz"]})
>>> df = daft.from_pandas(pd_df)
>>> df.show()
╭───────┬────────╮
│ a     ┆ b      │
│ ---   ┆ ---    │
│ Int64 ┆ String │
╞═══════╪════════╡
│ 1     ┆ foo    │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ 2     ┆ bar    │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ 3     ┆ baz    │
╰───────┴────────╯
(Showing first 3 of 3 rows)
Source code in daft/convert.py
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
@PublicAPI
def from_pandas(data: Union["pd.DataFrame", list["pd.DataFrame"]]) -> "DataFrame":
    """Creates a Daft DataFrame from a pandas DataFrame.

    Args:
        data: A single pandas DataFrame, or a list of them, to turn into one Daft DataFrame.

    Returns:
        DataFrame: A Daft DataFrame containing the data from ``data``.

    Examples:
        >>> import pandas as pd
        >>> import daft
        >>> pd_df = pd.DataFrame({"a": [1, 2, 3], "b": ["foo", "bar", "baz"]})
        >>> df = daft.from_pandas(pd_df)
        >>> df.show()
        ╭───────┬────────╮
        │ a     ┆ b      │
        │ ---   ┆ ---    │
        │ Int64 ┆ String │
        ╞═══════╪════════╡
        │ 1     ┆ foo    │
        ├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
        │ 2     ┆ bar    │
        ├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
        │ 3     ┆ baz    │
        ╰───────┴────────╯
        <BLANKLINE>
        (Showing first 3 of 3 rows)
    """
    from daft import DataFrame as DaftDataFrame

    converted = DaftDataFrame._from_pandas(data)
    return converted

from_pydict #

from_pydict(data: dict[str, InputListType]) -> DataFrame

Creates a DataFrame from a Python dictionary.

Parameters:

Name Type Description Default
data dict[str, InputListType]

Key -> Sequence[item] of data. Each key is created as a column, and must have a value that is a Python list, NumPy array or PyArrow array. Values must be equal in length across all keys.

required

Returns:

Name Type Description
DataFrame DataFrame

DataFrame created from dictionary of columns

Examples:

1
2
3
>>> import daft
>>> df = daft.from_pydict({"foo": [1, 2]})
>>> df.show()
╭───────╮
│ foo   │
│ ---   │
│ Int64 │
╞═══════╡
│ 1     │
├╌╌╌╌╌╌╌┤
│ 2     │
╰───────╯
(Showing first 2 of 2 rows)
Source code in daft/convert.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
@PublicAPI
def from_pydict(data: dict[str, InputListType]) -> "DataFrame":
    """Creates a DataFrame from a Python dictionary.

    Args:
        data: Mapping of column name -> column values. Every value must be a Python list, NumPy array
            or PyArrow array, and all values must have the same length.

    Returns:
        DataFrame: A DataFrame with one column per key of ``data``.

    Examples:
        >>> import daft
        >>> df = daft.from_pydict({"foo": [1, 2]})
        >>> df.show()
        ╭───────╮
        │ foo   │
        │ ---   │
        │ Int64 │
        ╞═══════╡
        │ 1     │
        ├╌╌╌╌╌╌╌┤
        │ 2     │
        ╰───────╯
        <BLANKLINE>
        (Showing first 2 of 2 rows)
    """
    from daft import DataFrame

    build = DataFrame._from_pydict
    return build(data)

from_pylist #

from_pylist(data: list[dict[str, Any]]) -> DataFrame

Creates a DataFrame from a list of dictionaries.

Parameters:

Name Type Description Default
data list[dict[str, Any]]

List of dictionaries, where each key is a column name.

required

Returns:

Name Type Description
DataFrame DataFrame

DataFrame created from list of dictionaries.

Examples:

1
2
3
>>> import daft
>>> df = daft.from_pylist([{"foo": 1}, {"foo": 2}])
>>> df.show()
╭───────╮
│ foo   │
│ ---   │
│ Int64 │
╞═══════╡
│ 1     │
├╌╌╌╌╌╌╌┤
│ 2     │
╰───────╯
(Showing first 2 of 2 rows)
Source code in daft/convert.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
@PublicAPI
def from_pylist(data: list[dict[str, Any]]) -> "DataFrame":
    """Creates a DataFrame from a list of dictionaries.

    Args:
        data: Rows of the DataFrame, each given as a dictionary keyed by column name.

    Returns:
        DataFrame: A DataFrame built from the rows in ``data``.

    Examples:
        >>> import daft
        >>> df = daft.from_pylist([{"foo": 1}, {"foo": 2}])
        >>> df.show()
        ╭───────╮
        │ foo   │
        │ ---   │
        │ Int64 │
        ╞═══════╡
        │ 1     │
        ├╌╌╌╌╌╌╌┤
        │ 2     │
        ╰───────╯
        <BLANKLINE>
        (Showing first 2 of 2 rows)
    """
    import daft

    result = daft.DataFrame._from_pylist(data)
    return result

from_ray_dataset #

from_ray_dataset(ds: Dataset) -> DataFrame

Creates a DataFrame from a Ray Dataset.

Parameters:

Name Type Description Default
ds Dataset

The Ray Dataset to create a Daft DataFrame from.

required

Returns:

Name Type Description
DataFrame DataFrame

Daft DataFrame created from the provided Ray dataset.

Note

This function only works when Daft is running on the Ray runner (RayRunner).

Examples:

1
2
3
4
5
6
7
8
>>> import ray
>>> import daft
>>>
>>> daft.set_runner_ray()
>>>
>>> ds = ray.data.from_items([{"a": 1, "b": "foo"}, {"a": 2, "b": "bar"}])
>>> df = daft.from_ray_dataset(ds)
>>> df.show()
╭───────┬────────╮
│ a     ┆ b      │
│ ---   ┆ ---    │
│ Int64 ┆ String │
╞═══════╪════════╡
│ 1     ┆ foo    │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ 2     ┆ bar    │
╰───────┴────────╯
(Showing first 2 of 2 rows)
Source code in daft/convert.py
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
@PublicAPI
def from_ray_dataset(ds: "RayDataset") -> "DataFrame":
    """Creates a DataFrame from a Ray Dataset.

    Args:
        ds: Ray Dataset whose contents become the new Daft DataFrame.

    Returns:
        DataFrame: A Daft DataFrame holding the data of ``ds``.

    Note:
        Only supported when Daft is running on the Ray runner (RayRunner).

    Examples:
        >>> import ray
        >>> import daft
        >>>
        >>> daft.set_runner_ray()  # doctest: +SKIP
        >>>
        >>> ds = ray.data.from_items([{"a": 1, "b": "foo"}, {"a": 2, "b": "bar"}])  # doctest: +SKIP
        >>> df = daft.from_ray_dataset(ds)  # doctest: +SKIP
        >>> df.show()  # doctest: +SKIP
        ╭───────┬────────╮
        │ a     ┆ b      │
        │ ---   ┆ ---    │
        │ Int64 ┆ String │
        ╞═══════╪════════╡
        │ 1     ┆ foo    │
        ├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
        │ 2     ┆ bar    │
        ╰───────┴────────╯
        <BLANKLINE>
        (Showing first 2 of 2 rows)
    """
    from daft import DataFrame as _DaftDataFrame

    return _DaftDataFrame._from_ray_dataset(ds)

read_csv #

read_csv(path: str | list[str], infer_schema: bool = True, schema: dict[str, DataType] | None = None, has_headers: bool = True, delimiter: str | None = None, double_quote: bool = True, quote: str | None = None, escape_char: str | None = None, comment: str | None = None, allow_variable_columns: bool = False, io_config: IOConfig | None = None, file_path_column: str | None = None, hive_partitioning: bool = False, _buffer_size: int | None = None, _chunk_size: int | None = None, checkpoint: CheckpointConfig | None = None) -> DataFrame

Creates a DataFrame from CSV file(s).

Parameters:

Name Type Description Default
path str | list[str]

Path to CSV (allows for wildcards; supports remote URLs to object stores such as s3:// or gs://)

required
infer_schema bool

Whether to infer the schema of the CSV, defaults to True.

True
schema dict[str, DataType]

A schema that is used as the definitive schema for the CSV if infer_schema is False, otherwise it is used as a schema hint that is applied after the schema is inferred (overriding the types of inferred columns, and appending any new columns not found during inference).

None
has_headers bool

Whether the CSV has a header or not, defaults to True

True
delimiter str

Delimiter used in the CSV, defaults to ","

None
double_quote bool

Whether to support double quote escapes, defaults to True

True
escape_char str

Character to use as the escape character for double quotes; defaults to '"'.

None
comment str

Character to treat as the start of a comment line, or None to not support comments

None
allow_variable_columns bool

Whether to allow for a variable number of columns in the CSV, defaults to False. If set to True, Daft will append nulls to rows with fewer columns than the schema, and ignore extra columns in rows with more columns.

False
io_config IOConfig

Config to be used with the native downloader

None
file_path_column str | None

Include the source path(s) as a column with this name. Defaults to None.

None
hive_partitioning bool

Whether to infer Hive-style partitions from file paths and include them as columns in the DataFrame. Defaults to False.

False
checkpoint CheckpointConfig | None

Optional daft.CheckpointConfig for progress tracking across runs. Bundles the checkpoint store, the source key column (the on= argument), and optional anti-join tuning. Rows whose key already exists in the store are skipped on re-run. Requires the Ray runner.

None

Returns:

Name Type Description
DataFrame DataFrame

parsed DataFrame

Examples:

Read a CSV file from a local path:

1
2
3
>>> df = daft.read_csv("/path/to/file.csv")
>>> df = daft.read_csv("/path/to/directory")
>>> df = daft.read_csv("/path/to/files-*.csv")

Read a CSV file from a public S3 bucket:

1
2
3
4
>>> from daft.io import S3Config, IOConfig
>>> io_config = IOConfig(s3=S3Config(region="us-west-2", anonymous=True))
>>> df = daft.read_csv("s3://path/to/files-*.csv", io_config=io_config)
>>> df.show()
Source code in daft/io/_csv.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
@PublicAPI
def read_csv(
    path: str | list[str],
    infer_schema: bool = True,
    schema: dict[str, DataType] | None = None,
    has_headers: bool = True,
    delimiter: str | None = None,
    double_quote: bool = True,
    quote: str | None = None,
    escape_char: str | None = None,
    comment: str | None = None,
    allow_variable_columns: bool = False,
    io_config: IOConfig | None = None,
    file_path_column: str | None = None,
    hive_partitioning: bool = False,
    _buffer_size: int | None = None,
    _chunk_size: int | None = None,
    checkpoint: "CheckpointConfig | None" = None,
) -> DataFrame:
    """Creates a DataFrame from CSV file(s).

    Args:
        path (str | list[str]): Path, or list of paths, to CSV files (allows for wildcards; supports remote URLs
            to object stores such as ``s3://`` or ``gs://``)
        infer_schema (bool): Whether to infer the schema of the CSV, defaults to True.
        schema (dict[str, DataType]): A schema that is used as the definitive schema for the CSV if infer_schema is
            False, otherwise it is used as a schema hint that is applied after the schema is inferred (overriding
            the types of inferred columns, and appending any new columns not found during inference).
        has_headers (bool): Whether the CSV has a header or not, defaults to True
        delimiter (str): Delimiter used in the CSV, defaults to ","
        double_quote (bool): Whether to support double quote escapes, defaults to True
        quote (str): Character used to quote fields. Defaults to None, which leaves the choice to the native
            CSV reader's default.
        escape_char (str): Character to use as the escape character for double quotes; defaults to `"`
        comment (str): Character to treat as the start of a comment line, or None to not support comments
        allow_variable_columns (bool): Whether to allow for a variable number of columns in the CSV, defaults to
            False. If set to True, Daft will append nulls to rows with fewer columns than the schema, and ignore
            extra columns in rows with more columns
        io_config (IOConfig): Config to be used with the native downloader. If None, the default IOConfig of the
            current planning config is used.
        file_path_column (str | None): Include the source path(s) as a column with this name. Defaults to None.
        hive_partitioning (bool): Whether to infer Hive-style partitions from file paths and include them as
            columns in the DataFrame. Defaults to False.
        checkpoint (CheckpointConfig | None): Optional `daft.CheckpointConfig` for progress tracking across runs.
            Bundles the checkpoint store, the source key column (``on=``), and optional anti-join tuning. Rows
            whose key already exists in the store are skipped on re-run. Requires the Ray runner.

    Returns:
        DataFrame: parsed DataFrame

    Raises:
        ValueError: If ``path`` is an empty list, or if ``infer_schema`` is False and no ``schema`` is provided.

    Examples:
        Read a CSV file from a local path:
        >>> df = daft.read_csv("/path/to/file.csv")
        >>> df = daft.read_csv("/path/to/directory")
        >>> df = daft.read_csv("/path/to/files-*.csv")

        Read a CSV file from a public S3 bucket:
        >>> from daft.io import S3Config, IOConfig
        >>> io_config = IOConfig(s3=S3Config(region="us-west-2", anonymous=True))
        >>> df = daft.read_csv("s3://path/to/files-*.csv", io_config=io_config)
        >>> df.show()
    """
    if isinstance(path, list) and len(path) == 0:
        raise ValueError("Cannot read DataFrame from empty list of CSV filepaths")

    # Without inference, the user-provided schema is the only source of column types.
    if not infer_schema and schema is None:
        raise ValueError(
            "Cannot read DataFrame with infer_schema=False and schema=None, please provide a schema or set infer_schema=True"
        )

    io_config = context.get_context().daft_planning_config.default_io_config if io_config is None else io_config

    csv_config = CsvSourceConfig(
        delimiter=delimiter,
        has_headers=has_headers,
        double_quote=double_quote,
        quote=quote,
        escape_char=escape_char,
        comment=comment,
        allow_variable_columns=allow_variable_columns,
        buffer_size=_buffer_size,
        chunk_size=_chunk_size,
    )
    file_format_config = FileFormatConfig.from_csv_config(csv_config)
    # First StorageConfig argument is the multithreaded-IO flag (cf. read_deltalake); CSV always enables it.
    storage_config = StorageConfig(True, io_config)

    builder = get_tabular_files_scan(
        path=path,
        infer_schema=infer_schema,
        schema=schema,
        file_format_config=file_format_config,
        storage_config=storage_config,
        file_path_column=file_path_column,
        hive_partitioning=hive_partitioning,
    )
    builder = attach_checkpoint(builder, checkpoint)
    return DataFrame(builder)

read_deltalake #

read_deltalake(table: Union[str, UnityCatalogTable], version: Union[int, str, datetime] | None = None, io_config: IOConfig | None = None, ignore_deletion_vectors: bool = False, _multithreaded_io: bool | None = None) -> DataFrame

Create a DataFrame from a Delta Lake table.

Parameters:

Name Type Description Default
table Union[str, UnityCatalogTable]

Either a URI for the Delta Lake table (supports remote URLs to object stores such as s3:// or gs://) or a UnityCatalogTable instance from a Unity Catalog client.

required
version optional

If int is passed, read the table with specified version number. Otherwise if string or datetime, read the timestamp version of the table. Strings must be RFC 3339 and ISO 8601 date and time format. Datetimes are assumed to be UTC timezone unless specified. By default, read the latest version of the table.

None
io_config optional

A custom IOConfig (daft.daft.IOConfig) to use when accessing Delta Lake object storage data. Defaults to None.

None
ignore_deletion_vectors optional

Whether to skip checking for deletion vectors when reading the table. Defaults to False.

False
_multithreaded_io optional

Whether to use multithreading for IO threads. Setting this to False can be helpful in reducing the amount of system resources (number of connections and thread contention) when running in the Ray runner. Defaults to None, which will let Daft decide based on the runner it is currently using.

None

Returns:

Name Type Description
DataFrame DataFrame

A DataFrame with the schema converted from the specified Delta Lake table.

Note

This function requires the use of deltalake, a Python library for interacting with Delta Lake.

Examples:

Read a Delta Lake table from a local path:

1
2
3
4
5
>>> df = daft.read_deltalake("some-table-uri")
>>>
>>> # Filters on this dataframe can now be pushed into the read operation from Delta Lake.
>>> df = df.where(df["foo"] > 5)
>>> df.show()

Read a Delta Lake table from a public S3 bucket:

1
2
3
4
>>> from daft.io import S3Config, IOConfig
>>> io_config = IOConfig(s3=S3Config(region="us-west-2", anonymous=True))
>>> df = daft.read_deltalake("s3://daft-oss-public-data/test_fixtures/delta_table/", io_config=io_config)
>>> df.show()
Source code in daft/io/delta_lake/_deltalake.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
@PublicAPI
def read_deltalake(
    table: Union[str, "UnityCatalogTable"],
    version: Union[int, str, "datetime"] | None = None,
    io_config: IOConfig | None = None,
    ignore_deletion_vectors: bool = False,
    _multithreaded_io: bool | None = None,
) -> DataFrame:
    """Create a DataFrame from a Delta Lake table.

    Args:
        table: Either a URI for the Delta Lake table (supports remote URLs to object stores such as ``s3://`` or
            ``gs://``) or a ``UnityCatalogTable`` instance from a Unity Catalog client. A leading ``~`` in a string
            URI is expanded to the user's home directory.
        version (optional): If int is passed, read the table with specified version number. Otherwise if string or
            datetime, read the timestamp version of the table. Strings must be RFC 3339 and ISO 8601 date and time
            format. Datetimes are assumed to be UTC timezone unless specified. By default, read the latest version
            of the table.
        io_config (optional): A custom `IOConfig` to use when accessing Delta Lake object storage data. Defaults to
            None, in which case the default IOConfig of the current planning config is used. If ``table`` is a
            ``UnityCatalogTable`` that carries its own IOConfig, that one is used instead.
        ignore_deletion_vectors (optional): Whether to skip checking for deletion vectors when reading the table.
            Defaults to False.
        _multithreaded_io (optional): Whether to use multithreading for IO threads. Setting this to False can be
            helpful in reducing the amount of system resources (number of connections and thread contention) when
            running in the Ray runner. Defaults to None, which will let Daft decide based on the runner it is
            currently using.

    Returns:
        DataFrame: A DataFrame with the schema converted from the specified Delta Lake table.

    Raises:
        ValueError: If ``table`` is neither a string URI nor a ``UnityCatalogTable``.

    Note:
        This function requires the use of [deltalake](https://delta-io.github.io/delta-rs/), a Python library for interacting with Delta Lake.

    Examples:
        Read a Delta Lake table from a local path:
        >>> df = daft.read_deltalake("some-table-uri")
        >>>
        >>> # Filters on this dataframe can now be pushed into the read operation from Delta Lake.
        >>> df = df.where(df["foo"] > 5)
        >>> df.show()

        Read a Delta Lake table from a public S3 bucket:
        >>> from daft.io import S3Config, IOConfig
        >>> io_config = IOConfig(s3=S3Config(region="us-west-2", anonymous=True))
        >>> df = daft.read_deltalake("s3://daft-oss-public-data/test_fixtures/delta_table/", io_config=io_config)
        >>> df.show()
    """
    from daft.io.delta_lake.delta_lake_scan import DeltaLakeScanOperator

    # Every Ray worker process owns its own pool of IO threads and connections, so by default
    # multithreaded IO is turned off on Ray to keep concurrency and request volume bounded.
    if _multithreaded_io is None:
        multithreaded_io = runners.get_or_create_runner().name != "ray"
    else:
        multithreaded_io = _multithreaded_io

    if io_config is None:
        io_config = context.get_context().daft_planning_config.default_io_config
    storage_config = StorageConfig(multithreaded_io, io_config)

    if isinstance(table, str):
        table_uri = os.path.expanduser(table)
    elif unity_catalog.module_available() and isinstance(table, unity_catalog.UnityCatalogTable):
        table_uri = table.table_uri
        # An IOConfig attached to the Unity Catalog table takes precedence over the one resolved above.
        uc_io_config = table.io_config
        if uc_io_config is not None:
            storage_config = StorageConfig(multithreaded_io, uc_io_config)
    else:
        raise ValueError(
            f"table argument must be a table URI string or UnityCatalogTable instance, but got: {type(table)}, {table}"
        )

    scan_operator = DeltaLakeScanOperator(
        table_uri,
        storage_config=storage_config,
        version=version,
        ignore_deletion_vectors=ignore_deletion_vectors,
    )
    scan_handle = ScanOperatorHandle.from_python_scan_operator(scan_operator)
    return DataFrame(LogicalPlanBuilder.from_tabular_scan(scan_operator=scan_handle))

read_hudi #

read_hudi(table_uri: str, io_config: IOConfig | None = None, checkpoint: CheckpointConfig | None = None) -> DataFrame

Create a DataFrame from a Hudi table.

Parameters:

Name Type Description Default
table_uri str

URI to the Hudi table (supports remote URLs to object stores such as s3:// or gs://).

required
io_config IOConfig | None

A custom IOConfig to use when accessing Hudi table object storage data. Defaults to None.

None
checkpoint CheckpointConfig | None

Optional daft.CheckpointConfig for progress tracking across runs. Bundles the checkpoint store, the source key column (the on= argument), and optional anti-join tuning. Rows whose key already exists in the store are skipped on re-run. Requires the Ray runner.

None

Returns:

Name Type Description
DataFrame DataFrame

A DataFrame with the schema converted from the specified Hudi table.

Note

This function requires the use of Apache Hudi. To ensure that this is installed with Daft, you may install: pip install -U daft[hudi]

Examples:

Read a Hudi table from a local path:

1
2
3
>>> df = daft.read_hudi("some-table-uri")
>>> df = df.where(df["foo"] > 5)
>>> df.show()

Read a Hudi table from a public S3 bucket:

1
2
3
4
>>> from daft.io import S3Config, IOConfig
>>> io_config = IOConfig(s3=S3Config(region="us-west-2", anonymous=True))
>>> df = daft.read_hudi("s3://bucket/path/to/hudi_table/", io_config=io_config)
>>> df.show()
Source code in daft/io/hudi/_hudi.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
@PublicAPI
def read_hudi(
    table_uri: str,
    io_config: IOConfig | None = None,
    checkpoint: "CheckpointConfig | None" = None,
) -> DataFrame:
    """Create a DataFrame from a Hudi table.

    Args:
        table_uri: URI of the Hudi table. Remote object-store URLs such as ``s3://`` or ``gs://`` are supported.
        io_config: Custom IOConfig used when accessing the Hudi table's object storage data. When None (the
            default), the default IOConfig from Daft's planning config is used.
        checkpoint: Optional :class:`daft.CheckpointConfig` for progress tracking across runs. Bundles the
            checkpoint store, the source key column (``on=``), and optional anti-join tuning. Rows whose key
            already exists in the store are skipped on re-run. Requires the Ray runner.

    Returns:
        DataFrame: A DataFrame with the schema converted from the specified Hudi table.

    Note:
        This function requires Apache Hudi. To install it together with Daft, run: ``pip install -U daft[hudi]``

    Examples:
        Read a Hudi table from a local path:
        >>> df = daft.read_hudi("some-table-uri")
        >>> df = df.where(df["foo"] > 5)
        >>> df.show()

        Read a Hudi table from a public S3 bucket:
        >>> from daft.io import S3Config, IOConfig
        >>> io_config = IOConfig(s3=S3Config(region="us-west-2", anonymous=True))
        >>> df = daft.read_hudi("s3://bucket/path/to/hudi_table/", io_config=io_config)
        >>> df.show()
    """
    from daft.io.hudi.hudi_scan import HudiScanOperator

    # Fall back to the planning-config default when the caller gives no IOConfig.
    if io_config is None:
        io_config = context.get_context().daft_planning_config.default_io_config

    # Multithreaded I/O is enabled for every runner except Ray.
    use_multithreaded_io = runners.get_or_create_runner().name != "ray"
    scan_operator = HudiScanOperator(
        table_uri,
        storage_config=StorageConfig(use_multithreaded_io, io_config),
    )

    plan = LogicalPlanBuilder.from_tabular_scan(
        scan_operator=ScanOperatorHandle.from_python_scan_operator(scan_operator)
    )
    return DataFrame(attach_checkpoint(plan, checkpoint))

read_iceberg #

read_iceberg(table: Union[str, Table], snapshot_id: int | None = None, io_config: IOConfig | None = None, checkpoint: CheckpointConfig | None = None) -> DataFrame

Create a DataFrame from an Iceberg table.

Parameters:

Name Type Description Default
table str or Table

A path to an Iceberg metadata file (supports remote URLs to object stores such as s3:// or gs://) or a PyIceberg Table created using the PyIceberg library.

required
snapshot_id int

Snapshot ID of the table to query

None
io_config IOConfig

A custom IOConfig to use when accessing Iceberg object storage data. If provided, configurations set in table are ignored.

None
checkpoint CheckpointConfig | None

Optional daft.CheckpointConfig for progress tracking across runs. Bundles the checkpoint store, the source key column (on=), and optional anti-join tuning. Rows whose key already exists in the store are skipped on re-run. Requires the Ray runner.

None

Returns:

Name Type Description
DataFrame DataFrame

a DataFrame with the schema converted from the specified Iceberg table

Note

This function requires PyIceberg, the official Python library of the Apache Iceberg project.

Examples:

Read an Iceberg table from a PyIceberg table:

1
2
3
4
5
6
7
8
>>> import pyiceberg
>>>
>>> table = pyiceberg.Table(...)
>>> df = daft.read_iceberg(table)
>>>
>>> # Filters on this dataframe can now be pushed into the read operation from Iceberg
>>> df = df.where(df["foo"] > 5)
>>> df.show()

Read an Iceberg table from S3 using IOConfig:

1
2
3
4
>>> from daft.io import S3Config, IOConfig
>>> io_config = IOConfig(s3=S3Config(region="us-west-2", anonymous=True))
>>> df = daft.read_iceberg("s3://bucket/path/to/iceberg/metadata.json", io_config=io_config)
>>> df.show()
Source code in daft/io/iceberg/_iceberg.py
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
@PublicAPI
def read_iceberg(
    table: Union[str, "PyIcebergTable"],
    snapshot_id: int | None = None,
    io_config: IOConfig | None = None,
    checkpoint: "CheckpointConfig | None" = None,
) -> DataFrame:
    """Create a DataFrame from an Iceberg table.

    Args:
        table (str or pyiceberg.table.Table): Either a path to an Iceberg metadata file (remote object-store URLs
            such as ``s3://`` or ``gs://`` are supported) or a
            [PyIceberg Table](https://py.iceberg.apache.org/reference/pyiceberg/table/#pyiceberg.table.Table)
            created with the PyIceberg library.
        snapshot_id (int, optional): Snapshot ID of the table to query.
        io_config (IOConfig, optional): A custom IOConfig to use when accessing Iceberg object storage data. If
            provided, configurations set in `table` are ignored. Otherwise the IOConfig is derived from the
            table's FileIO properties. If that yields nothing, the default IOConfig from Daft's planning config
            is used.
        checkpoint: Optional :class:`daft.CheckpointConfig` for progress tracking across runs. Bundles the
            checkpoint store, the source key column (``on=``), and optional anti-join tuning. Rows whose key
            already exists in the store are skipped on re-run. Requires the Ray runner.

    Returns:
        DataFrame: a DataFrame with the schema converted from the specified Iceberg table

    Note:
        This function requires [PyIceberg](https://py.iceberg.apache.org/), the official Python library of the
        Apache Iceberg project.

    Examples:
        Read an Iceberg table from a PyIceberg table:
        >>> import pyiceberg
        >>>
        >>> table = pyiceberg.Table(...)
        >>> df = daft.read_iceberg(table)
        >>>
        >>> # Filters on this dataframe can now be pushed into the read operation from Iceberg
        >>> df = df.where(df["foo"] > 5)
        >>> df.show()

        Read an Iceberg table from S3 using IOConfig:
        >>> from daft.io import S3Config, IOConfig
        >>> io_config = IOConfig(s3=S3Config(region="us-west-2", anonymous=True))
        >>> df = daft.read_iceberg("s3://bucket/path/to/iceberg/metadata.json", io_config=io_config)
        >>> df.show()
    """
    from pyiceberg.table import StaticTable

    from daft.io.iceberg.iceberg_scan import IcebergScanOperator

    # A plain string is treated as the location of an Iceberg metadata file.
    if isinstance(table, str):
        table = StaticTable.from_metadata(metadata_location=table)

    # IOConfig resolution order: explicit argument, then the table's FileIO
    # properties, then the planning-config default.
    if io_config is None:
        io_config = _convert_iceberg_file_io_properties_to_io_config(table.io.properties)
    if io_config is None:
        io_config = context.get_context().daft_planning_config.default_io_config

    # Multithreaded I/O is enabled for every runner except Ray.
    storage_config = StorageConfig(runners.get_or_create_runner().name != "ray", io_config)
    scan_handle = ScanOperatorHandle.from_python_scan_operator(
        IcebergScanOperator(table, snapshot_id=snapshot_id, storage_config=storage_config)
    )

    plan = LogicalPlanBuilder.from_tabular_scan(scan_operator=scan_handle)
    return DataFrame(attach_checkpoint(plan, checkpoint))

read_json #

read_json(path: str | list[str], infer_schema: bool = True, schema: dict[str, DataType] | None = None, io_config: IOConfig | None = None, file_path_column: str | None = None, hive_partitioning: bool = False, skip_empty_files: bool = False, _buffer_size: int | None = None, _chunk_size: int | None = None, checkpoint: CheckpointConfig | None = None) -> DataFrame

Creates a DataFrame from line-delimited JSON file(s).

Parameters:

Name Type Description Default
path str | list[str]

Path to JSON files (allows for wildcards; supports remote URLs to object stores such as s3:// or gs://)

required
infer_schema bool

Whether to infer the schema of the JSON, defaults to True.

True
schema dict[str, DataType]

A schema that is used as the definitive schema for the JSON if infer_schema is False, otherwise it is used as a schema hint that is applied after the schema is inferred (overriding the types of inferred columns, and appending any new columns not found during inference).

None
io_config IOConfig

Config to be used with the native downloader

None
file_path_column str | None

Include the source path(s) as a column with this name. Defaults to None.

None
hive_partitioning bool

Whether to infer Hive-style partitions from file paths and include them as columns in the DataFrame. Defaults to False.

False
skip_empty_files bool

Whether to skip empty files when reading. Defaults to False.

False
checkpoint CheckpointConfig | None

Optional daft.CheckpointConfig for progress tracking across runs. Bundles the checkpoint store, the source key column (on=), and optional anti-join tuning. Rows whose key already exists in the store are skipped on re-run. Requires the Ray runner.

None

Returns:

Name Type Description
DataFrame DataFrame

parsed DataFrame

Examples:

Read a JSON file from a local path:

1
2
3
>>> df = daft.read_json("/path/to/file.json")
>>> df = daft.read_json("/path/to/directory")
>>> df = daft.read_json("/path/to/files-*.json")

Read a JSON file from a public S3 bucket:

1
2
3
4
>>> from daft.io import S3Config, IOConfig
>>> io_config = IOConfig(s3=S3Config(region="us-west-2", anonymous=True))
>>> df = daft.read_json("s3://path/to/files-*.json", io_config=io_config)
>>> df.show()
Source code in daft/io/_json.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
@PublicAPI
def read_json(
    path: str | list[str],
    infer_schema: bool = True,
    schema: dict[str, DataType] | None = None,
    io_config: IOConfig | None = None,
    file_path_column: str | None = None,
    hive_partitioning: bool = False,
    skip_empty_files: bool = False,
    _buffer_size: int | None = None,
    _chunk_size: int | None = None,
    checkpoint: "CheckpointConfig | None" = None,
) -> DataFrame:
    """Creates a DataFrame from line-delimited JSON file(s).

    Args:
        path (str | list[str]): Path(s) to JSON files. Wildcards are allowed, and remote object-store URLs such
            as ``s3://`` or ``gs://`` are supported.
        infer_schema (bool): Whether to infer the schema of the JSON. Defaults to True.
        schema (dict[str, DataType]): When ``infer_schema`` is False, the definitive schema of the JSON.
            Otherwise it is a schema hint applied after inference. The hint overrides the types of inferred
            columns and appends any columns not found during inference.
        io_config (IOConfig): Config to be used with the native downloader. When None, the default IOConfig
            from Daft's planning config is used.
        file_path_column: Include the source path(s) as a column with this name. Defaults to None.
        hive_partitioning: Whether to infer Hive-style partitions from file paths and include them as columns in
            the DataFrame. Defaults to False.
        skip_empty_files: Whether to skip empty files when reading. Defaults to False.
        _buffer_size: Internal tuning option, forwarded to the JSON source config as ``buffer_size``.
        _chunk_size: Internal tuning option, forwarded to the JSON source config as ``chunk_size``.
        checkpoint: Optional :class:`daft.CheckpointConfig` for progress tracking across runs. Bundles the
            checkpoint store, the source key column (``on=``), and optional anti-join tuning. Rows whose key
            already exists in the store are skipped on re-run. Requires the Ray runner.

    Returns:
        DataFrame: parsed DataFrame

    Raises:
        ValueError: If ``path`` is an empty list, or if ``infer_schema`` is False and no ``schema`` is given.

    Examples:
        Read a JSON file from a local path:
        >>> df = daft.read_json("/path/to/file.json")
        >>> df = daft.read_json("/path/to/directory")
        >>> df = daft.read_json("/path/to/files-*.json")

        Read a JSON file from a public S3 bucket:
        >>> from daft.io import S3Config, IOConfig
        >>> io_config = IOConfig(s3=S3Config(region="us-west-2", anonymous=True))
        >>> df = daft.read_json("s3://path/to/files-*.json", io_config=io_config)
        >>> df.show()
    """
    # Validate arguments before touching any configuration.
    if isinstance(path, list) and not path:
        raise ValueError("Cannot read DataFrame from empty list of JSON filepaths")

    if schema is None and not infer_schema:
        raise ValueError(
            "Cannot read DataFrame with infer_schema=False and schema=None, please provide a schema or set infer_schema=True"
        )

    if io_config is None:
        io_config = context.get_context().daft_planning_config.default_io_config

    json_source = JsonSourceConfig(
        buffer_size=_buffer_size,
        chunk_size=_chunk_size,
        skip_empty_files=skip_empty_files,
    )

    # Multithreaded I/O is always requested for JSON scans (the flag is fixed to True).
    builder = get_tabular_files_scan(
        path=path,
        infer_schema=infer_schema,
        schema=schema,
        file_format_config=FileFormatConfig.from_json_config(json_source),
        storage_config=StorageConfig(True, io_config),
        file_path_column=file_path_column,
        hive_partitioning=hive_partitioning,
    )
    return DataFrame(attach_checkpoint(builder, checkpoint))

read_kafka #

read_kafka(bootstrap_servers: str | Sequence[str], topics: str | Sequence[str], *, start: object = _KIND_EARLIEST, end: object = _KIND_LATEST, group_id: str = 'daft-bounded-kafka-reader', partitions: Sequence[int] | None = None, chunk_size: int = 1024, kafka_client_config: Mapping[str, object] | None = None, timeout_ms: int = 10000) -> DataFrame

Creates a DataFrame by reading messages from Kafka topic(s).

Warning

This API is experimental and may change in future releases. Currently only bounded batch reads are supported — there is no streaming/unbounded mode and no offset commit management.

This function reads bounded ranges of messages from one or more Kafka topics. It supports multiple ways to specify the start and end bounds: earliest/latest, timestamp, or explicit partition offsets.

Parameters:

Name Type Description Default
bootstrap_servers str | Sequence[str]

Kafka bootstrap server(s) to connect to. Can be a single server string (e.g., "localhost:9092") or a sequence of servers.

required
topics str | Sequence[str]

Kafka topic(s) to read from. Can be a single topic string or a sequence of topics.

required
start object

The start bound for reading messages. Defaults to "earliest". Supported values:

- "earliest": Start from the earliest available offset for each partition.
- "latest": Start from the latest offset for each partition.
- int: Timestamp in milliseconds since epoch.
- datetime: A timezone-aware or naive datetime (naive datetimes are assumed UTC).
- str: An ISO-8601 timestamp string (e.g., "2024-01-01T00:00:00Z").
- dict: For a single topic: {partition: offset}. For multiple topics: {topic: {partition: offset}}.

_KIND_EARLIEST
end object

The end bound for reading messages. Defaults to "latest". Supports the same value types as start. The end offset is exclusive.

_KIND_LATEST
group_id str

Consumer group ID used for the Kafka consumer. Defaults to "daft-bounded-kafka-reader".

'daft-bounded-kafka-reader'
partitions Sequence[int] | None

Optional sequence of partition IDs to read from. If None, reads from all partitions of the specified topic(s). Defaults to None.

None
chunk_size int

Maximum number of messages per RecordBatch. Defaults to 1024.

1024
kafka_client_config Mapping[str, object] | None

Optional additional configuration options passed directly to the underlying Kafka consumer. These are merged with the default configuration. Defaults to None.

None
timeout_ms int

Timeout in milliseconds for Kafka operations (metadata queries, message consumption, etc.). Defaults to 10_000 (10 seconds).

10000

Returns:

Name Type Description
DataFrame DataFrame

A DataFrame with the following schema:

- topic (string): The topic the message was read from.
- partition (int32): The partition ID within the topic.
- offset (int64): The offset of the message within the partition.
- timestamp_ms (int64): The timestamp of the message in milliseconds since epoch, or null if not available.
- key (binary): The message key as raw bytes, or null if not present.
- value (binary): The message value as raw bytes.

Examples:

Read from a single topic with default bounds (earliest to latest):

1
>>> df = daft.read_kafka("localhost:9092", "my-topic")

Read from multiple topics:

1
>>> df = daft.read_kafka("localhost:9092", ["topic-a", "topic-b"])

Read from specific partitions:

1
>>> df = daft.read_kafka("localhost:9092", "my-topic", partitions=[0, 1])

Read from a timestamp range:

1
2
3
4
>>> from datetime import datetime, timezone
>>> start_dt = datetime(2024, 1, 1, tzinfo=timezone.utc)
>>> end_dt = datetime(2024, 1, 2, tzinfo=timezone.utc)
>>> df = daft.read_kafka("localhost:9092", "my-topic", start=start_dt, end=end_dt)

Read from specific partition offsets:

1
>>> df = daft.read_kafka("localhost:9092", "my-topic", start={0: 100, 1: 200})

Read from multiple topics with per-topic offsets:

1
2
3
4
5
>>> df = daft.read_kafka(
...     "localhost:9092",
...     ["topic-a", "topic-b"],
...     start={"topic-a": {0: 100}, "topic-b": {0: 50}},
... )

Configure Kafka client options:

1
2
3
4
5
>>> df = daft.read_kafka(
...     "localhost:9092",
...     "my-topic",
...     kafka_client_config={"enable.partition.eof": True, "session.timeout.ms": 30000},
... )
Note

This function requires the confluent-kafka package. Install it with: pip install daft[kafka] or pip install confluent-kafka

Timestamp bounds use Kafka message timestamps. If your cluster uses CreateTime, producers can publish late/out-of-order timestamps, so timestamp bounds are not a safe exactly-once checkpoint. Prefer offset-based checkpoints (e.g., partition offset maps or committed consumer offsets).

Source code in daft/io/_kafka.py
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
@PublicAPI
def read_kafka(
    bootstrap_servers: str | Sequence[str],
    topics: str | Sequence[str],
    *,
    start: object = _KIND_EARLIEST,
    end: object = _KIND_LATEST,
    group_id: str = "daft-bounded-kafka-reader",
    partitions: Sequence[int] | None = None,
    chunk_size: int = 1024,
    kafka_client_config: Mapping[str, object] | None = None,
    timeout_ms: int = 10_000,
) -> DataFrame:
    """Creates a DataFrame by reading messages from Kafka topic(s).

    .. warning::

        This API is **experimental** and may change in future releases. Currently only bounded
        batch reads are supported — there is no streaming/unbounded mode and no offset commit
        management.

    This function reads bounded ranges of messages from one or more Kafka topics. It supports
    multiple ways to specify the start and end bounds: earliest/latest, timestamp, or explicit
    partition offsets.

    Args:
        bootstrap_servers (str | Sequence[str]): Kafka bootstrap server(s) to connect to.
            Can be a single server string (e.g., "localhost:9092") or a sequence of servers.
            Sequences are joined into one comma-separated string.
        topics (str | Sequence[str]): Kafka topic(s) to read from. Can be a single topic string
            or a sequence of topics. Duplicate topics are dropped, keeping first-seen order.
        start (object): The start bound for reading messages. Defaults to "earliest".
            Supported values:
            - "earliest": Start from the earliest available offset for each partition.
            - "latest": Start from the latest offset for each partition.
            - int: Timestamp in milliseconds since epoch.
            - datetime: A timezone-aware or naive datetime (naive datetimes are assumed UTC).
            - str: An ISO-8601 timestamp string (e.g., "2024-01-01T00:00:00Z").
            - dict: For single topic: ``{partition: offset}``. For multiple topics: ``{topic: {partition: offset}}``.
        end (object): The end bound for reading messages. Defaults to "latest".
            Supports the same value types as ``start``. The end offset is exclusive.
        group_id (str): Consumer group ID used for the Kafka consumer. Defaults to
            "daft-bounded-kafka-reader".
        partitions (Sequence[int] | None): Optional sequence of partition IDs to read from.
            If None, reads from all partitions of the specified topic(s). Defaults to None.
        chunk_size (int): Maximum number of messages per RecordBatch. Defaults to 1024.
        kafka_client_config (Mapping[str, object] | None): Optional additional configuration
            options passed directly to the underlying Kafka consumer. These are merged with
            the default configuration. Defaults to None.
        timeout_ms (int): Timeout in milliseconds for Kafka operations (metadata queries,
            message consumption, etc.). Defaults to 10_000 (10 seconds).

    Returns:
        DataFrame: A DataFrame with the following schema:
            - topic (string): The topic the message was read from.
            - partition (int32): The partition ID within the topic.
            - offset (int64): The offset of the message within the partition.
            - timestamp_ms (int64): The timestamp of the message in milliseconds since epoch,
              or null if not available.
            - key (binary): The message key as raw bytes, or null if not present.
            - value (binary): The message value as raw bytes.

    Raises:
        ValueError: If ``bootstrap_servers`` contains no server address, if ``topics`` is empty,
            or if ``timeout_ms`` or ``chunk_size`` is not positive.

    Examples:
        Read from a single topic with default bounds (earliest to latest):
        >>> df = daft.read_kafka("localhost:9092", "my-topic")

        Read from multiple topics:
        >>> df = daft.read_kafka("localhost:9092", ["topic-a", "topic-b"])

        Read from specific partitions:
        >>> df = daft.read_kafka("localhost:9092", "my-topic", partitions=[0, 1])

        Read from a timestamp range:
        >>> from datetime import datetime, timezone
        >>> start_dt = datetime(2024, 1, 1, tzinfo=timezone.utc)
        >>> end_dt = datetime(2024, 1, 2, tzinfo=timezone.utc)
        >>> df = daft.read_kafka("localhost:9092", "my-topic", start=start_dt, end=end_dt)

        Read from specific partition offsets:
        >>> df = daft.read_kafka("localhost:9092", "my-topic", start={0: 100, 1: 200})

        Read from multiple topics with per-topic offsets:
        >>> df = daft.read_kafka(
        ...     "localhost:9092",
        ...     ["topic-a", "topic-b"],
        ...     start={"topic-a": {0: 100}, "topic-b": {0: 50}},
        ... )

        Configure Kafka client options:
        >>> df = daft.read_kafka(
        ...     "localhost:9092",
        ...     "my-topic",
        ...     kafka_client_config={"enable.partition.eof": True, "session.timeout.ms": 30000},
        ... )

    Note:
        This function requires the ``confluent-kafka`` package. Install it with:
        ``pip install daft[kafka]`` or ``pip install confluent-kafka``

        Timestamp bounds use Kafka message timestamps. If your cluster uses CreateTime, producers can publish
        late/out-of-order timestamps, so timestamp bounds are not a safe exactly-once checkpoint. Prefer
        offset-based checkpoints (e.g., partition offset maps or committed consumer offsets).
    """
    # Collapse the server(s) into the single comma-separated string passed to KafkaSource.
    if isinstance(bootstrap_servers, str):
        bootstrap_servers_str = bootstrap_servers
    else:
        bootstrap_servers_str = ",".join(str(s) for s in bootstrap_servers)

    # Reject inputs such as "", [] or ["", ""] up front. After stripping separators they
    # contain no server address. This mirrors the non-empty check on `topics` below.
    if not bootstrap_servers_str.strip(" ,"):
        raise ValueError("[read_kafka] bootstrap_servers must be non-empty")

    # Normalize to a list and drop duplicate topics while preserving first-seen order.
    if isinstance(topics, str):
        topics = [topics]
    else:
        topics = list(topics)
    topics = list(dict.fromkeys(topics))

    if not topics:
        raise ValueError("[read_kafka] topics must be non-empty")

    if timeout_ms <= 0:
        raise ValueError("[read_kafka] timeout_ms must be > 0")

    if chunk_size <= 0:
        raise ValueError("[read_kafka] chunk_size must be > 0")

    start_kind, start_timestamp_ms, start_topic_partition_offsets = _normalize_bound(start, topics)
    end_kind, end_timestamp_ms, end_topic_partition_offsets = _normalize_bound(end, topics)

    return KafkaSource(
        bootstrap_servers=bootstrap_servers_str,
        group_id=group_id,
        topics=topics,
        start_kind=start_kind,
        start_timestamp_ms=start_timestamp_ms,
        start_topic_partition_offsets=start_topic_partition_offsets,
        end_kind=end_kind,
        end_timestamp_ms=end_timestamp_ms,
        end_topic_partition_offsets=end_topic_partition_offsets,
        partitions=partitions,
        kafka_client_config=kafka_client_config,
        timeout_ms=timeout_ms,
        chunk_size=chunk_size,
    ).read()

read_lance #

read_lance(uri: str | PathLike[str], io_config: Any = None, version: Any = None, asof: Any = None, block_size: Any = None, commit_lock: Any = None, index_cache_size: Any = None, default_scan_options: Any = None, metadata_cache_size_bytes: Any = None, fragment_group_size: Any = None, include_fragment_id: Any = None, checkpoint: Any = None) -> Any

Create a DataFrame from a LanceDB table.

Parameters:

Name Type Description Default
uri str | PathLike[str]

The URI of the Lance table to read from. Accepts a local path or an object-store URI like "s3://bucket/path".

required
io_config Any

A custom IOConfig to use when accessing LanceDB data. Defaults to None.

None
version int | str | None

If specified, load a specific version of the Lance dataset. Otherwise, the latest version is loaded. A version number (int) or a tag (str) can be provided.

None
asof datetime | str | None

If specified, find the latest version created on or earlier than the given argument value. If a version is already specified, this argument is ignored.

None
block_size int | None

Block size in bytes. Provides a hint for the size of the minimal I/O request.

None
commit_lock lance.commit.CommitLock | None

A custom commit lock. Only needed if your object store does not support atomic commits. See the user guide for more details.

None
index_cache_size int | None

Index cache size. The index cache is an LRU cache with TTL. This number specifies the number of index pages (for example, IVF partitions) to be cached in host memory. Default value is 256.

Roughly, for an IVF_PQ partition with n rows, the size of each index page equals the combination of the pq code (np.array([n,pq], dtype=uint8)). Approximately, n = Total Rows / number of IVF partitions, and pq = number of PQ sub-vectors.

None
default_scan_options dict | None

Default scan options that are used when scanning the dataset. This accepts the same arguments described in lance.LanceDataset.scanner. The arguments will be applied to any scan operation.

This can be useful to supply defaults for common parameters such as batch_size.

It can also be used to create a view of the dataset that includes meta fields such as _rowid or _rowaddr. If default_scan_options is provided, the schema returned by lance.LanceDataset.schema will include these fields when the appropriate scan options are set, for example: default_scan_options = {"with_row_address": True, "with_row_id": True, "batch_size": 1024}. For more details, see https://lance-format.github.io/lance-python-doc/dataset.html

None
metadata_cache_size_bytes int | None

Size of the metadata cache in bytes. This cache is used to store metadata information about the dataset, such as schema and statistics. If not specified, a default size will be used.

None
fragment_group_size int | None

Number of fragments to group together in a single scan task. If None or <= 1, each fragment is processed individually (default behavior).

None
include_fragment_id bool | None

Whether to include fragment_id in the output. If you use merge_columns_df or write_lance(mode="merge"), include_fragment_id must be set to True.

None
checkpoint Any

Optional daft.CheckpointConfig for progress tracking across runs. Bundles the checkpoint store, the source key column (on=), and optional anti-join tuning. Rows whose key already exists in the store are skipped on re-run. Requires the Ray runner.

None

Returns:

Name Type Description
DataFrame Any

a DataFrame with the schema converted from the specified LanceDB table

Note

This function requires the use of LanceDB, which is the Python library for the LanceDB project. To ensure that this is installed with Daft, you may install: pip install daft[lance]

Examples:

Read a local LanceDB table:

1
2
>>> df = daft.read_lance("/path/to/lance/data/")
>>> df.show()

Read a LanceDB table and specify a version:

1
2
>>> df = daft.read_lance("/path/to/lance/data/", version=1)
>>> df.show()

Read a LanceDB table with fragment grouping:

1
2
>>> df = daft.read_lance("/path/to/lance/data/", fragment_group_size=5)
>>> df.show()

Read a LanceDB table from a public S3 bucket:

1
2
3
4
>>> from daft.io import S3Config, IOConfig
>>> io_config = IOConfig(s3=S3Config(region="us-west-2", anonymous=True))
>>> df = daft.read_lance("s3://daft-oss-public-data/lance/words-test-dataset", io_config=io_config)
>>> df.show()
Source code in daft/io/lance/_lance.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
@PublicAPI
def read_lance(
    uri: str | os.PathLike[str],
    io_config: Any = None,
    version: Any = None,
    asof: Any = None,
    block_size: Any = None,
    commit_lock: Any = None,
    index_cache_size: Any = None,
    default_scan_options: Any = None,
    metadata_cache_size_bytes: Any = None,
    fragment_group_size: Any = None,
    include_fragment_id: Any = None,
    checkpoint: Any = None,
) -> Any:
    """Create a DataFrame from a LanceDB table.

    Args:
        uri: The URI of the Lance table to read from. Accepts a local path or an
            object-store URI like ``"s3://bucket/path"``.
        io_config (IOConfig | None): A custom IOConfig to use when accessing LanceDB data.
            Defaults to None.
        version (int | str | None): If specified, load a specific version of the Lance
            dataset; otherwise the latest version is loaded. A version number (``int``) or
            a tag (``str``) can be provided.
        asof (datetime | str | None): If specified, find the latest version created on or
            earlier than the given value. If ``version`` is also specified, this argument
            is ignored.
        block_size (int | None): Block size in bytes. Provides a hint for the size of the
            minimal I/O request.
        commit_lock (lance.commit.CommitLock | None): A custom commit lock. Only needed if
            your object store does not support atomic commits. See the Lance user guide
            for more details.
        index_cache_size (int | None): Index cache size. The index cache is an LRU cache
            with TTL. This number specifies the number of index pages (for example, IVF
            partitions) to be cached in host memory. Default value is ``256``.

            Roughly, for an ``IVF_PQ`` partition with ``n`` rows, the size of each index
            page equals the size of its PQ codes (``np.array([n, pq], dtype=uint8)``),
            where ``n = total rows / number of IVF partitions`` and
            ``pq = number of PQ sub-vectors``.
        default_scan_options (dict | None): Default scan options that are used when
            scanning the dataset. This accepts the same arguments described in
            ``lance.LanceDataset.scanner``, and they are applied to every scan operation.

            This can be useful to supply defaults for common parameters such as
            ``batch_size``. It can also be used to create a view of the dataset that
            includes meta fields such as ``_rowid`` or ``_rowaddr``: when the appropriate
            scan options are set, the schema returned by ``lance.LanceDataset.schema``
            includes these fields. For example:
            ``{"with_row_address": True, "with_row_id": True, "batch_size": 1024}``.
            For more details, see https://lance-format.github.io/lance-python-doc/dataset.html
        metadata_cache_size_bytes (int | None): Size of the metadata cache in bytes. This
            cache stores metadata about the dataset, such as schema and statistics. If not
            specified, a default size will be used.
        fragment_group_size (int | None): Number of fragments to group together in a single
            scan task. If None or <= 1, each fragment will be processed individually
            (default behavior).
        include_fragment_id (bool | None): Whether to include ``fragment_id`` in the output.
            If you intend to use ``merge_columns_df`` or ``write_lance(mode="merge")``,
            ``include_fragment_id`` must be set to True.
        checkpoint: Optional ``daft.CheckpointConfig`` for progress tracking across runs.
            Bundles the checkpoint store, the source key column (``on=``), and optional
            anti-join tuning. Rows whose key already exists in the store are skipped on
            re-run. Requires the Ray runner.

    Returns:
        DataFrame: a DataFrame with the schema converted from the specified LanceDB table

    Note:
        This function requires the use of [LanceDB](https://lancedb.github.io/lancedb/), which is
        the Python library for the LanceDB project. To ensure that this is installed with Daft,
        you may install: `pip install daft[lance]`

    Examples:
        Read a local LanceDB table:
        >>> df = daft.read_lance("/path/to/lance/data/")
        >>> df.show()

        Read a LanceDB table and specify a version:
        >>> df = daft.read_lance("/path/to/lance/data/", version=1)
        >>> df.show()

        Read a LanceDB table with fragment grouping:
        >>> df = daft.read_lance("/path/to/lance/data/", fragment_group_size=5)
        >>> df.show()

        Read a LanceDB table from a public S3 bucket:
        >>> from daft.io import S3Config, IOConfig
        >>> io_config = IOConfig(s3=S3Config(region="us-west-2", anonymous=True))
        >>> df = daft.read_lance("s3://daft-oss-public-data/lance/words-test-dataset", io_config=io_config)
        >>> df.show()
    """
    # Thin public wrapper: every option is forwarded unchanged to the Lance integration.
    return _daft_lance.read_lance(
        uri,
        io_config=io_config,
        version=version,
        asof=asof,
        block_size=block_size,
        commit_lock=commit_lock,
        index_cache_size=index_cache_size,
        default_scan_options=default_scan_options,
        metadata_cache_size_bytes=metadata_cache_size_bytes,
        fragment_group_size=fragment_group_size,
        include_fragment_id=include_fragment_id,
        checkpoint=checkpoint,
    )

read_parquet #

read_parquet(path: str | list[str], row_groups: list[list[int]] | None = None, infer_schema: bool = True, schema: dict[str, DataType] | None = None, io_config: IOConfig | None = None, file_path_column: str | None = None, hive_partitioning: bool = False, coerce_int96_timestamp_unit: str | TimeUnit | None = None, _multithreaded_io: bool | None = None, _chunk_size: int | None = None, checkpoint: CheckpointConfig | None = None) -> DataFrame

Creates a DataFrame from Parquet file(s).

Parameters:

Name Type Description Default
path str

Path to Parquet file (allows for wildcards; supports remote URLs to object stores such as s3:// or gs://)

required
row_groups List[int] or List[List[int]]

List of row groups to read corresponding to each file.

None
infer_schema bool

Whether to infer the schema of the Parquet, defaults to True.

True
schema dict[str, DataType]

A schema that is used as the definitive schema for the Parquet file if infer_schema is False, otherwise it is used as a schema hint that is applied after the schema is inferred (overriding the types of inferred columns, and appending any new columns not found during inference).

None
io_config IOConfig

Config to be used with the native downloader

None
file_path_column str | None

Include the source path(s) as a column with this name. Defaults to None.

None
hive_partitioning bool

Whether to infer hive_style partitions from file paths and include them as columns in the Dataframe. Defaults to False.

False
coerce_int96_timestamp_unit str | TimeUnit | None

TimeUnit to coerce Int96 TimeStamps to. e.g.: [ns, us, ms], Defaults to None.

None
_multithreaded_io bool | None

Whether to use multithreading for IO threads. Setting this to False can be helpful in reducing the amount of system resources (number of connections and thread contention) when running in the Ray runner. Defaults to None, which will let Daft decide based on the runner it is currently using.

None
checkpoint CheckpointConfig | None

Optional daft.CheckpointConfig for progress tracking across runs. Bundles the checkpoint store, the source key column (on=), and optional anti-join tuning. Rows whose key already exists in the store are skipped on re-run. Requires the Ray runner.

None

Returns:

Name Type Description
DataFrame DataFrame

parsed DataFrame

Examples:

Read a Parquet file from a local path:

1
2
3
>>> df = daft.read_parquet("/path/to/file.parquet")
>>> df = daft.read_parquet("/path/to/directory")
>>> df = daft.read_parquet("/path/to/files-*.parquet")

Read a Parquet file from a public S3 bucket:

1
2
3
4
>>> from daft.io import S3Config, IOConfig
>>> io_config = IOConfig(s3=S3Config(region="us-west-2", anonymous=True))
>>> df = daft.read_parquet("s3://path/to/files-*.parquet", io_config=io_config)
>>> df.show()
Source code in daft/io/_parquet.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
@PublicAPI
def read_parquet(
    path: str | list[str],
    row_groups: list[list[int]] | None = None,
    infer_schema: bool = True,
    schema: dict[str, DataType] | None = None,
    io_config: IOConfig | None = None,
    file_path_column: str | None = None,
    hive_partitioning: bool = False,
    coerce_int96_timestamp_unit: str | TimeUnit | None = None,
    _multithreaded_io: bool | None = None,
    _chunk_size: int | None = None,  # A hidden parameter for testing purposes.
    checkpoint: "CheckpointConfig | None" = None,
) -> DataFrame:
    """Creates a DataFrame from Parquet file(s).

    Args:
        path (str | list[str]): Path(s) to Parquet file(s) (allows for wildcards; supports remote URLs to object stores such as ``s3://`` or ``gs://``)
        row_groups (list[list[int]] | None): List of row groups to read corresponding to each file. Only supported when
            ``path`` is a list of non-globbed files, and must have the same length as ``path``.
        infer_schema (bool): Whether to infer the schema of the Parquet, defaults to True.
        schema (dict[str, DataType]): A schema that is used as the definitive schema for the Parquet file if infer_schema is False, otherwise it is used as a schema hint that is applied after the schema is inferred (overriding the types of inferred columns, and appending any new columns not found during inference).
        io_config (IOConfig): Config to be used with the native downloader. Defaults to the planning config's default IOConfig.
        file_path_column: Include the source path(s) as a column with this name. Defaults to None.
        hive_partitioning: Whether to infer hive-style partitions from file paths and include them as columns in the DataFrame. Defaults to False.
        coerce_int96_timestamp_unit: TimeUnit to coerce Int96 TimeStamps to. e.g.: [ns, us, ms], Defaults to None.
        _multithreaded_io: Whether to use multithreading for IO threads. Setting this to False can be helpful in reducing
            the amount of system resources (number of connections and thread contention) when running in the Ray runner.
            Defaults to None, which will let Daft decide based on the runner it is currently using.
        checkpoint: Optional ``daft.CheckpointConfig`` for progress tracking across runs. Bundles the
            checkpoint store, the source key column (``on=``), and optional anti-join tuning. Rows whose key
            already exists in the store are skipped on re-run. Requires the Ray runner.

    Returns:
        DataFrame: parsed DataFrame

    Raises:
        ValueError: If ``path`` is an empty list, if ``row_groups`` does not have the same length as a
            list ``path``, or if ``row_groups`` is given as a list while ``path`` is not a list.

    Examples:
        Read a Parquet file from a local path:
        >>> df = daft.read_parquet("/path/to/file.parquet")
        >>> df = daft.read_parquet("/path/to/directory")
        >>> df = daft.read_parquet("/path/to/files-*.parquet")

        Read a Parquet file from a public S3 bucket:
        >>> from daft.io import S3Config, IOConfig
        >>> io_config = IOConfig(s3=S3Config(region="us-west-2", anonymous=True))
        >>> df = daft.read_parquet("s3://path/to/files-*.parquet", io_config=io_config)
        >>> df.show()
    """
    # Validate arguments before touching the global context or creating a runner, so that
    # invalid calls fail fast without side effects.
    if isinstance(path, list) and len(path) == 0:
        raise ValueError("Cannot read DataFrame from empty list of Parquet filepaths")
    if isinstance(path, list) and row_groups is not None and len(path) != len(row_groups):
        raise ValueError("row_groups must be the same length as the list of paths provided.")
    if isinstance(row_groups, list) and not isinstance(path, list):
        raise ValueError("row_groups are only supported when reading multiple non-globbed/wildcarded files")

    if io_config is None:
        io_config = context.get_context().daft_planning_config.default_io_config

    if isinstance(coerce_int96_timestamp_unit, str):
        coerce_int96_timestamp_unit = TimeUnit.from_str(coerce_int96_timestamp_unit)
    pytimeunit = coerce_int96_timestamp_unit._timeunit if coerce_int96_timestamp_unit is not None else None

    # If running on Ray, we want to limit the amount of concurrency and requests being made.
    # This is because each Ray worker process receives its own pool of thread workers and connections
    if _multithreaded_io is None:
        multithreaded_io = runners.get_or_create_runner().name != "ray"
    else:
        multithreaded_io = _multithreaded_io

    file_format_config = FileFormatConfig.from_parquet_config(
        ParquetSourceConfig(coerce_int96_timestamp_unit=pytimeunit, row_groups=row_groups, chunk_size=_chunk_size)
    )
    storage_config = StorageConfig(multithreaded_io, io_config)

    builder = get_tabular_files_scan(
        path=path,
        infer_schema=infer_schema,
        schema=schema,
        file_format_config=file_format_config,
        storage_config=storage_config,
        file_path_column=file_path_column,
        hive_partitioning=hive_partitioning,
    )

    # Wrap the scan with checkpoint-based skipping when a CheckpointConfig is supplied.
    builder = attach_checkpoint(builder, checkpoint)

    return DataFrame(builder)

read_sql #

read_sql(sql: str, conn: Callable[[], Connection] | str, partition_col: str | None = None, num_partitions: int | None = None, partition_bound_strategy: str = 'min-max', disable_pushdowns_to_sql: bool = False, infer_schema: bool = True, infer_schema_length: int = 10, schema: dict[str, DataType] | None = None) -> DataFrame

Create a DataFrame from the results of a SQL query.

Parameters:

Name Type Description Default
sql str

SQL query to execute

required
conn Union[Callable[[], Connection], str]

SQLAlchemy connection factory or database URL

required
partition_col Optional[str]

Column to partition the data by, defaults to None

None
num_partitions Optional[int]

Number of partitions to read the data into, defaults to None, which lets Daft determine the number of partitions. If specified, partition_col must also be specified.

None
partition_bound_strategy str

Strategy to determine partition bounds, either "min-max" or "percentile", defaults to "min-max"

'min-max'
disable_pushdowns_to_sql bool

Whether to disable pushdowns to the SQL query, defaults to False

False
infer_schema bool

Whether to turn on schema inference, defaults to True. If set to False, the schema parameter must be provided.

True
infer_schema_length int

The number of rows to scan when inferring the schema, defaults to 10. If infer_schema is False, this parameter is ignored. Note that if Daft is able to use ConnectorX to infer the schema, this parameter is ignored as ConnectorX is an Arrow backed driver.

10
schema Optional[Dict[str, DataType]]

A mapping of column names to datatypes. If infer_schema is False, this schema is used as the definitive schema for the data, otherwise it is used as a schema hint that is applied after the schema is inferred (overriding the types of inferred columns, and appending any new columns not found during inference). This can be useful if the types can be more precisely determined than what the inference can provide (e.g., if a column can be declared as a fixed-sized list rather than a list).

None

Returns:

Name Type Description
DataFrame DataFrame

Dataframe containing the results of the query

Note
  1. Supported dialects: Daft uses SQLGlot to build and translate SQL queries between dialects. For a list of supported dialects, see SQLGlot's dialect documentation.

  2. Partitioning: When partition_col is specified, the function partitions the query based on that column. You can define num_partitions or leave it to Daft to decide. Daft uses the partition_bound_strategy parameter to determine the partitioning strategy:

    • min-max: Daft calculates the minimum and maximum values of the specified column, then partitions the query using equal ranges between the minimum and maximum values.
    • percentile: Daft calculates the specified column's percentiles via a PERCENTILE_DISC function to determine partitions (e.g., for num_partitions=3, it uses the 33rd and 66th percentiles).
  3. Execution: Daft executes SQL queries using ConnectorX or SQLAlchemy, preferring ConnectorX unless a SQLAlchemy connection factory is specified or the database dialect is unsupported by ConnectorX.

  4. Pushdowns: Daft pushes down operations such as filtering, projections, and limits into the SQL query when possible. You can disable pushdowns by setting disable_pushdowns_to_sql=True, which will execute the SQL query as is.

Examples:

Read data from a SQL query and a database URL:

1
>>> df = daft.read_sql("SELECT * FROM my_table", "sqlite:///my_database.db")

Read data from a SQL query and a SQLAlchemy connection factory:

1
2
3
>>> def create_conn():
...     return sqlalchemy.create_engine("sqlite:///my_database.db").connect()
>>> df = daft.read_sql("SELECT * FROM my_table", create_conn)

Read data from a SQL query and partition the data by a column:

1
>>> df = daft.read_sql("SELECT * FROM my_table", "sqlite:///my_database.db", partition_col="id")

Read data from a SQL query and partition the data into 3 partitions:

1
2
3
>>> df = daft.read_sql(
...     "SELECT * FROM my_table", "sqlite:///my_database.db", partition_col="id", num_partitions=3
... )
Source code in daft/io/_sql.py
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
@PublicAPI
def read_sql(
    sql: str,
    conn: Callable[[], "Connection"] | str,
    partition_col: str | None = None,
    num_partitions: int | None = None,
    partition_bound_strategy: str = "min-max",
    disable_pushdowns_to_sql: bool = False,
    infer_schema: bool = True,
    infer_schema_length: int = 10,
    schema: dict[str, DataType] | None = None,
) -> DataFrame:
    """Create a DataFrame from the results of a SQL query.

    Args:
        sql (str): SQL query to execute
        conn (Union[Callable[[], Connection], str]): SQLAlchemy connection factory or database URL
        partition_col (Optional[str]): Column to partition the data by, defaults to None
        num_partitions (Optional[int]): Number of partitions to read the data into,
            defaults to None, which lets Daft determine the number of partitions.
            If specified, `partition_col` must also be specified.
        partition_bound_strategy (str): Strategy to determine partition bounds, either "min-max" or "percentile", defaults to "min-max"
        disable_pushdowns_to_sql (bool): Whether to disable pushdowns to the SQL query, defaults to False
        infer_schema (bool): Whether to turn on schema inference, defaults to True. If set to False, the schema parameter must be provided.
        infer_schema_length (int): The number of rows to scan when inferring the schema, defaults to 10. If infer_schema is False, this parameter is ignored. Note that if Daft is able to use ConnectorX to infer the schema, this parameter is ignored as ConnectorX is an Arrow backed driver.
        schema (Optional[Dict[str, DataType]]): A mapping of column names to datatypes. If infer_schema is False, this schema is used as the definitive schema for the data, otherwise it is used as a schema hint that is applied after the schema is inferred (overriding the types of inferred columns, and appending any new columns not found during inference).
            This can be useful if the types can be more precisely determined than what the inference can provide (e.g., if a column can be declared as a fixed-sized list rather than a list).

    Returns:
        DataFrame: Dataframe containing the results of the query

    Raises:
        ValueError: If `num_partitions` is given without `partition_col`, or if `infer_schema`
            is False and no `schema` is provided.

    Note:
        1. **Supported dialects**:
            Daft uses [SQLGlot](https://sqlglot.com/sqlglot.html) to build and translate SQL queries between dialects. For a list of supported dialects, see [SQLGlot's dialect documentation](https://sqlglot.com/sqlglot/dialects.html).

        2. **Partitioning**:
            When `partition_col` is specified, the function partitions the query based on that column.
            You can define `num_partitions` or leave it to Daft to decide.
            Daft uses the `partition_bound_strategy` parameter to determine the partitioning strategy:
            - `min-max`: Daft calculates the minimum and maximum values of the specified column, then partitions the query using equal ranges between the minimum and maximum values.
            - `percentile`: Daft calculates the specified column's percentiles via a `PERCENTILE_DISC` function to determine partitions (e.g., for `num_partitions=3`, it uses the 33rd and 66th percentiles).

        3. **Execution**:
            Daft executes SQL queries using [ConnectorX](https://sfu-db.github.io/connector-x/intro.html) or [SQLAlchemy](https://docs.sqlalchemy.org/en/20/orm/quickstart.html#create-an-engine),
            preferring ConnectorX unless a SQLAlchemy connection factory is specified or the database dialect is unsupported by ConnectorX.

        4. **Pushdowns**:
            Daft pushes down operations such as filtering, projections, and limits into the SQL query when possible.
            You can disable pushdowns by setting `disable_pushdowns_to_sql=True`, which will execute the SQL query as is.

    Examples:
        Read data from a SQL query and a database URL:

        >>> df = daft.read_sql("SELECT * FROM my_table", "sqlite:///my_database.db")

        Read data from a SQL query and a SQLAlchemy connection factory:

        >>> def create_conn():
        ...     return sqlalchemy.create_engine("sqlite:///my_database.db").connect()
        >>> df = daft.read_sql("SELECT * FROM my_table", create_conn)

        Read data from a SQL query and partition the data by a column:

        >>> df = daft.read_sql("SELECT * FROM my_table", "sqlite:///my_database.db", partition_col="id")

        Read data from a SQL query and partition the data into 3 partitions:

        >>> df = daft.read_sql(
        ...     "SELECT * FROM my_table", "sqlite:///my_database.db", partition_col="id", num_partitions=3
        ... )
    """
    if num_partitions is not None and partition_col is None:
        raise ValueError("Failed to execute sql: partition_col must be specified when num_partitions is specified")

    if not infer_schema and schema is None:
        raise ValueError(
            "Cannot read DataFrame with infer_schema=False and schema=None, please provide a schema or set infer_schema=True"
        )

    # Parse the strategy before building the connection so an invalid value fails fast,
    # without any connection setup.
    bound_strategy = PartitionBoundStrategy.from_str(partition_bound_strategy)

    io_config = context.get_context().daft_planning_config.default_io_config
    storage_config = StorageConfig(True, io_config)

    # A string is treated as a database URL; anything else as a connection factory.
    sql_conn = SQLConnection.from_url(conn) if isinstance(conn, str) else SQLConnection.from_connection_factory(conn)
    sql_operator = SQLScanOperator(
        sql,
        sql_conn,
        storage_config,
        disable_pushdowns_to_sql,
        infer_schema,
        infer_schema_length,
        schema,
        partition_col=partition_col,
        num_partitions=num_partitions,
        partition_bound_strategy=bound_strategy,
    )
    handle = ScanOperatorHandle.from_python_scan_operator(sql_operator)
    builder = LogicalPlanBuilder.from_tabular_scan(scan_operator=handle)

    return DataFrame(builder)

read_video_frames #

read_video_frames(path: str | list[str], image_height: int, image_width: int, is_key_frame: bool | None = None, *, sample_interval_seconds: float | None = None, io_config: IOConfig | None = None) -> DataFrame

Creates a DataFrame by reading the frames of one or more video files.

This produces a DataFrame with the following fields
  • path (string): path to the video file that produced this frame.
  • frame_index (int): frame index in the video.
  • frame_time (float): frame time in fractional seconds as a floating point.
  • frame_time_base (str): fractional unit of seconds in which timestamps are expressed.
  • frame_pts (int): frame presentation timestamp in time_base units.
  • frame_dts (int): frame decoding timestamp in time_base units.
  • frame_duration (int): frame duration in time_base units.
  • is_key_frame (bool): true iff this is a key frame.
Warning

This requires PyAV which can be installed with pip install av.

Note

This function will stream the frames from all videos as a DataFrame of images. If you wish to load an entire video into a single row, this can be done with DataFrame.from_glob_path and url_download.

Parameters:

Name Type Description Default
path str | list[str]

Path(s) to the video file(s) which allows wildcards.

required
image_height int

Height to which each frame will be resized.

required
image_width int

Width to which each frame will be resized.

required
is_key_frame bool | None

If True, only include key frames; if False, only non-key frames; if None, include all frames.

None
sample_interval_seconds float | None

If provided and > 0, sample frames at approximately this time interval in seconds based on frame_time. The algorithm selects the first frame whose timestamp is >= target time (0, interval, 2*interval, ...). This is an approximate sampling strategy; actual sampling times depend on the video's frame timestamps. Frames without valid timestamps (frame_time=None) are skipped.

None
io_config IOConfig | None

Optional IOConfig.

None

Returns:

Name Type Description
DataFrame DataFrame

dataframe of images.

Examples:

1
2
3
4
>>> df = daft.read_video_frames("/path/to/file.mp4", image_height=480, image_width=640)
>>> df = daft.read_video_frames("/path/to/directory", image_height=480, image_width=640)
>>> df = daft.read_video_frames("/path/to/files-*.mp4", image_height=480, image_width=640)
>>> df = daft.read_video_frames("s3://path/to/files-*.mp4", image_height=480, image_width=640)

Sample approximately one frame per second:

1
2
3
>>> df = daft.read_video_frames(
...     "/path/to/file.mp4", image_height=480, image_width=640, sample_interval_seconds=1.0
... )

Sample approximately one frame every 5 seconds:

1
2
3
>>> df = daft.read_video_frames(
...     "/path/to/file.mp4", image_height=480, image_width=640, sample_interval_seconds=5.0
... )

Combine with key frame filtering:

1
2
3
>>> df = daft.read_video_frames(
...     "/path/to/file.mp4", image_height=480, image_width=640, is_key_frame=True, sample_interval_seconds=1.0
... )
Source code in daft/io/av/__init__.py
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def read_video_frames(
    path: str | list[str],
    image_height: int,
    image_width: int,
    is_key_frame: bool | None = None,
    *,
    sample_interval_seconds: float | None = None,
    io_config: IOConfig | None = None,
) -> DataFrame:
    """Creates a DataFrame by reading the frames of one or more video files.

    Each row of the resulting DataFrame describes one decoded frame, with these fields:
        * path (string): the video file the frame was read from.
        * frame_index (int): index of the frame within its video.
        * frame_time (float): timestamp of the frame in fractional seconds, as a floating point.
        * frame_time_base (str): fractional unit of seconds in which the timestamp fields below are expressed.
        * frame_pts (int): presentation timestamp, in time_base units.
        * frame_dts (int): decoding timestamp, in time_base units.
        * frame_duration (int): frame duration, in time_base units.
        * is_key_frame (bool): true iff the frame is a key frame.

    Warning:
        PyAV is required; install it with `pip install av`.

    Note:
        Frames from all matched videos are streamed out as a DataFrame of images.
        To load an entire video into a single row instead, combine
        DataFrame.from_glob_path with url_download.

    Args:
        path (str|list[str]): One path or a list of paths to video file(s); wildcards are allowed.
        image_height (int): Height every frame is resized to.
        image_width (int): Width every frame is resized to.
        is_key_frame (bool|None): True keeps only key frames, False keeps only non-key frames, None keeps every frame.
        sample_interval_seconds (float|None): When set to a value > 0, frames are sampled at approximately this
            interval in seconds, based on ``frame_time``. For each target time (0, interval, 2*interval, ...) the
            first frame whose timestamp is >= the target is selected. Because this is approximate, the actual
            sampling times depend on the video's own frame timestamps. Frames without a valid timestamp
            (frame_time=None) are skipped.
        io_config (IOConfig|None): Optional IOConfig. When None, the planning config's default IO config is used.

    Returns:
        DataFrame: dataframe of images.

    Raises:
        ImportError: If the video frame source cannot be imported (e.g. PyAV is not installed).

    Examples:
        >>> df = daft.read_video_frames("/path/to/file.mp4", image_height=480, image_width=640)
        >>> df = daft.read_video_frames("/path/to/directory", image_height=480, image_width=640)
        >>> df = daft.read_video_frames("/path/to/files-*.mp4", image_height=480, image_width=640)
        >>> df = daft.read_video_frames("s3://path/to/files-*.mp4", image_height=480, image_width=640)

        Sample approximately one frame per second:
        >>> df = daft.read_video_frames(
        ...     "/path/to/file.mp4", image_height=480, image_width=640, sample_interval_seconds=1.0
        ... )

        Sample approximately one frame every 5 seconds:
        >>> df = daft.read_video_frames(
        ...     "/path/to/file.mp4", image_height=480, image_width=640, sample_interval_seconds=5.0
        ... )

        Combine with key frame filtering:
        >>> df = daft.read_video_frames(
        ...     "/path/to/file.mp4", image_height=480, image_width=640, is_key_frame=True, sample_interval_seconds=1.0
        ... )
    """
    # Imported lazily so that PyAV is only needed when this reader is actually used.
    try:
        from daft.io.av._read_video_frames import _VideoFramesSource
    except ImportError as import_err:
        raise ImportError("read_video_frames requires PyAV. Please install it with `pip install av`.") from import_err

    if io_config is None:
        io_config = get_context().daft_planning_config.default_io_config

    # A lone path string is normalized into a one-element list.
    video_paths = path if not isinstance(path, str) else [path]

    frames_source = _VideoFramesSource(
        paths=video_paths,
        image_height=image_height,
        image_width=image_width,
        is_key_frame=is_key_frame,
        sample_interval_seconds=sample_interval_seconds,
        io_config=io_config,
    )
    return frames_source.read()

read_warc #

read_warc(path: str | list[str], io_config: IOConfig | None = None, file_path_column: str | None = None, _multithreaded_io: bool | None = None, checkpoint: CheckpointConfig | None = None) -> DataFrame

Creates a DataFrame from WARC or gzipped WARC file(s). This is an experimental feature and the API may change in the future.

Parameters:

Name Type Description Default
path Union[str, List[str]]

Path to WARC file (allows for wildcards; supports remote URLs to object stores such as s3:// or gs://)

required
io_config Optional[IOConfig]

Config to be used with the native downloader

None
file_path_column Optional[str]

Include the source path(s) as a column with this name. Defaults to None.

None
_multithreaded_io Optional[bool]

Whether to use multithreading for IO threads. Setting this to False can be helpful in reducing the amount of system resources (number of connections and thread contention) when running in the Ray runner. Defaults to None, which will let Daft decide based on the runner it is currently using.

None
checkpoint CheckpointConfig | None

Optional daft.CheckpointConfig for progress tracking across runs. Bundles the checkpoint store, the source key column (on=), and optional anti-join tuning. Rows whose key already exists in the store are skipped on re-run. Requires the Ray runner.

None

Returns:

Name Type Description
DataFrame DataFrame

parsed DataFrame with mandatory metadata columns ("WARC-Record-ID", "WARC-Type", "WARC-Date", "Content-Length"), optional metadata columns ("WARC-Target-URI", "WARC-Identified-Payload-Type"), one column "warc_content" with the raw byte content of the WARC record, and one column "warc_headers" with the remaining headers of the WARC record stored as a JSON string.

Examples:

Read a WARC file from a local path:

1
2
3
>>> df = daft.read_warc("/path/to/file.warc")
>>> df = daft.read_warc("/path/to/directory")
>>> df = daft.read_warc("/path/to/files-*.warc")

Read a WARC file from a public S3 bucket:

1
2
3
4
>>> from daft.io import S3Config, IOConfig
>>> io_config = IOConfig(s3=S3Config(region="us-west-2", anonymous=True))
>>> df = daft.read_warc("s3://path/to/files-*.warc", io_config=io_config)
>>> df.show()
Source code in daft/io/_warc.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
@PublicAPI
def read_warc(
    path: str | list[str],
    io_config: IOConfig | None = None,
    file_path_column: str | None = None,
    _multithreaded_io: bool | None = None,
    checkpoint: "CheckpointConfig | None" = None,
) -> DataFrame:
    """Creates a DataFrame from WARC or gzipped WARC file(s). This is an experimental feature and the API may change in the future.

    Args:
        path (Union[str, List[str]]): Path to WARC file (allows for wildcards; supports remote URLs to object stores such as ``s3://`` or ``gs://``)
        io_config (Optional[IOConfig]): Config to be used with the native downloader. When None, the planning
            config's default IO config is used.
        file_path_column (Optional[str]): Include the source path(s) as a column with this name. Defaults to None.
        _multithreaded_io (Optional[bool]): Whether to use multithreading for IO threads. Setting this to False can be helpful in reducing
            the amount of system resources (number of connections and thread contention) when running in the Ray runner.
            Defaults to None, which will let Daft decide based on the runner it is currently using.
        checkpoint (Optional[CheckpointConfig]): Optional :class:`daft.CheckpointConfig` for progress tracking across runs. Bundles the
            checkpoint store, the source key column (``on=``), and optional anti-join tuning. Rows whose key
            already exists in the store are skipped on re-run. Requires the Ray runner.

    Returns:
        DataFrame: parsed DataFrame with mandatory metadata columns ("WARC-Record-ID", "WARC-Type", "WARC-Date", "Content-Length"),
            optional metadata columns ("WARC-Target-URI", "WARC-Identified-Payload-Type"), one column "warc_content" with the raw
            byte content of the WARC record, and one column "warc_headers" with the remaining headers of the WARC record stored
            as a JSON string.

    Raises:
        ValueError: If ``path`` is an empty list.

    Examples:
        Read a WARC file from a local path:
        >>> df = daft.read_warc("/path/to/file.warc")
        >>> df = daft.read_warc("/path/to/directory")
        >>> df = daft.read_warc("/path/to/files-*.warc")

        Read a WARC file from a public S3 bucket:
        >>> from daft.io import S3Config, IOConfig
        >>> io_config = IOConfig(s3=S3Config(region="us-west-2", anonymous=True))
        >>> df = daft.read_warc("s3://path/to/files-*.warc", io_config=io_config)
        >>> df.show()
    """
    # Validate cheap preconditions before touching the global context/runner.
    if isinstance(path, list) and len(path) == 0:
        raise ValueError("Cannot read DataFrame from empty list of Warc filepaths")

    io_config = context.get_context().daft_planning_config.default_io_config if io_config is None else io_config

    # If running on Ray, we want to limit the amount of concurrency and requests being made.
    # This is because each Ray worker process receives its own pool of thread workers and connections.
    multithreaded_io = (
        (runners.get_or_create_runner().name != "ray") if _multithreaded_io is None else _multithreaded_io
    )
    storage_config = StorageConfig(multithreaded_io, io_config)

    # Fixed output schema: WARC files are not schema-inferred (infer_schema=False below).
    schema = {
        "WARC-Record-ID": DataType.uuid(),
        "WARC-Target-URI": DataType.string(),
        "WARC-Type": DataType.string(),
        "WARC-Date": DataType.timestamp(TimeUnit.ns(), timezone="Etc/UTC"),
        "Content-Length": DataType.int64(),
        "WARC-Identified-Payload-Type": DataType.string(),
        "warc_content": DataType.binary(),
        "warc_headers": DataType.string(),
    }

    warc_config = WarcSourceConfig()
    file_format_config = FileFormatConfig.from_warc_config(warc_config)

    builder = get_tabular_files_scan(
        path=path,
        infer_schema=False,
        schema=schema,
        file_format_config=file_format_config,
        storage_config=storage_config,
        file_path_column=file_path_column,
        hive_partitioning=False,
    )
    builder = attach_checkpoint(builder, checkpoint)
    return DataFrame(builder)

read_huggingface #

read_huggingface(repo: str, io_config: IOConfig | None = None) -> DataFrame

Create a DataFrame from a Hugging Face dataset.

Currently supports all public datasets and all private Parquet datasets. See the Hugging Face docs for more details.

Parameters:

Name Type Description Default
repo str

repository to read in the form username/dataset_name

required
io_config IOConfig

Config to use when reading data

None
Source code in daft/io/huggingface/__init__.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
@PublicAPI
def read_huggingface(repo: str, io_config: IOConfig | None = None) -> DataFrame:
    """Create a DataFrame from a Hugging Face dataset.

    Currently supports all public datasets and all private Parquet datasets. See [the Hugging Face docs](https://huggingface.co/docs/dataset-viewer/en/parquet) for more details.

    The Parquet files under ``hf://datasets/<repo>`` are read directly when available. When no
    Parquet files are found, or the request fails with a 400 status (Parquet files not yet
    available), reading is handed off to the ``datasets`` library instead.

    Args:
        repo (str): repository to read in the form `username/dataset_name`
        io_config (IOConfig): Config to use when reading data

    Returns:
        DataFrame: the dataset's contents.
    """
    try:
        # Fast path: scan the repository's Parquet files directly.
        return read_parquet(f"hf://datasets/{repo}", io_config=io_config)
    except FileNotFoundError as missing_parquet:
        # The Parquet glob matched nothing, so fall back to the datasets library.
        return _fallback_to_datasets_library(repo, missing_parquet)
    except DaftCoreException as core_err:
        # Only a 400 status justifies the fallback; every other core error propagates unchanged.
        if "Status(400" not in str(core_err):
            raise
        return _fallback_to_datasets_library(repo, core_err)

sql #

sql(sql: str, register_globals: bool = True, **bindings: DataFrame) -> DataFrame

Run a SQL query, returning the results as a DataFrame.

Parameters:

Name Type Description Default
sql str

SQL query to execute

required
register_globals bool

Whether to incorporate global variables into the supplied catalog, in which case a copy of the catalog will be made and the original not modified. Defaults to True.

True
**bindings DataFrame

Additional DataFrame bindings (CTEs) to use for this query.

{}

Returns:

Name Type Description
DataFrame DataFrame

Dataframe containing the results of the query

Warning

This feature is early in development and will likely experience API changes.

Examples:

A simple example joining 2 dataframes together using a SQL statement, relying on Daft to detect the names of SQL tables using their corresponding Python variable names.

1
2
3
4
5
6
7
8
>>> import daft
>>>
>>> df1 = daft.from_pydict({"a": [1, 2, 3], "b": ["foo", "bar", "baz"]})
>>> df2 = daft.from_pydict({"a": [1, 2, 3], "c": ["daft", None, None]})
>>>
>>> # Daft automatically detects `df1` and `df2` from your Python global namespace
>>> result_df = daft.sql("SELECT * FROM df1 JOIN df2 ON df1.a = df2.a")
>>> result_df.show()
╭───────┬────────┬────────╮
│ a     ┆ b      ┆ c      │
│ ---   ┆ ---    ┆ ---    │
│ Int64 ┆ String ┆ String │
╞═══════╪════════╪════════╡
│ 1     ┆ foo    ┆ daft   │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ 2     ┆ bar    ┆ None   │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ 3     ┆ baz    ┆ None   │
╰───────┴────────┴────────╯
(Showing first 3 of 3 rows)

A more complex example using CTE bindings to create a named subquery (DataFrame) called "my_df", which can then be referenced from inside your SQL statement.

1
2
3
4
5
6
7
8
>>> import daft
>>>
>>> df = daft.from_pydict({"a": [1, 2, 3], "b": ["foo", "bar", "baz"]})
>>>
>>> # Register dataframes as table expressions using a python dictionary.
>>> bindings = {"my_df": df}
>>>
>>> daft.sql("SELECT a FROM my_df", **bindings).show()
╭───────╮
│ a     │
│ ---   │
│ Int64 │
╞═══════╡
│ 1     │
├╌╌╌╌╌╌╌┤
│ 2     │
├╌╌╌╌╌╌╌┤
│ 3     │
╰───────╯
(Showing first 3 of 3 rows)
Source code in daft/sql/sql.py
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
@PublicAPI
def sql(
    sql: str,
    register_globals: bool = True,
    **bindings: DataFrame,
) -> DataFrame:
    """Run a SQL query, returning the results as a DataFrame.

    Table names in the query may refer to DataFrames bound in two ways. First, when
    ``register_globals`` is True, DataFrame variables visible in the caller's scope are bound
    under their variable names. Second, the explicit ``**bindings`` are bound, and they take
    precedence over captured variables of the same name.

    Args:
        sql (str): SQL query to execute
        register_globals (bool, optional): Whether to register DataFrame variables from the caller's
            global and local namespaces as tables, using their variable names. Local variables
            shadow globals of the same name. Defaults to True.
        **bindings (DataFrame): Additional DataFrame bindings (CTEs) to use for this query.

    Returns:
        DataFrame: Dataframe containing the results of the query. If the executed statement
            produces no result, an empty DataFrame is returned.

    Raises:
        DaftCoreException: If ``register_globals`` is True but the caller's environment cannot be inspected.
        ValueError: If the SQL executor returns an unsupported result type.

    Warning:
        This feature is early in development and will likely experience API changes.

    Examples:
        A simple example joining 2 dataframes together using a SQL statement, relying on Daft to detect the names of
        SQL tables using their corresponding Python variable names.

        >>> import daft
        >>>
        >>> df1 = daft.from_pydict({"a": [1, 2, 3], "b": ["foo", "bar", "baz"]})
        >>> df2 = daft.from_pydict({"a": [1, 2, 3], "c": ["daft", None, None]})
        >>>
        >>> # Daft automatically detects `df1` and `df2` from your Python global namespace
        >>> result_df = daft.sql("SELECT * FROM df1 JOIN df2 ON df1.a = df2.a")
        >>> result_df.show()
        ╭───────┬────────┬────────╮
        │ a     ┆ b      ┆ c      │
        │ ---   ┆ ---    ┆ ---    │
        │ Int64 ┆ String ┆ String │
        ╞═══════╪════════╪════════╡
        │ 1     ┆ foo    ┆ daft   │
        ├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
        │ 2     ┆ bar    ┆ None   │
        ├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
        │ 3     ┆ baz    ┆ None   │
        ╰───────┴────────┴────────╯
        <BLANKLINE>
        (Showing first 3 of 3 rows)

        A more complex example using CTE bindings to create a named subquery (DataFrame) called `"my_df"`, which can then be referenced from inside your SQL statement.

        >>> import daft
        >>>
        >>> df = daft.from_pydict({"a": [1, 2, 3], "b": ["foo", "bar", "baz"]})
        >>>
        >>> # Register dataframes as table expressions using a python dictionary.
        >>> bindings = {"my_df": df}
        >>>
        >>> daft.sql("SELECT a FROM my_df", **bindings).show()
        ╭───────╮
        │ a     │
        │ ---   │
        │ Int64 │
        ╞═══════╡
        │ 1     │
        ├╌╌╌╌╌╌╌┤
        │ 2     │
        ├╌╌╌╌╌╌╌┤
        │ 3     │
        ╰───────╯
        <BLANKLINE>
        (Showing first 3 of 3 rows)
    """
    # Map of table alias -> logical plan. Later writes overwrite earlier ones, so entries are added
    # from lowest to highest precedence: captured Python variables first, explicit bindings last.
    # NOTE(review): names not found here are presumably resolved by the session in `_sql_exec` — confirm.
    py_ctes: dict[str, _PyLogicalPlanBuilder] = {}

    # 1. Add all python DataFrame variables which are in scope.
    if register_globals:
        try:
            # Step back two frames, past the @PublicAPI wrapper frame, to reach the user's calling frame.
            caller_frame = inspect.currentframe().f_back.f_back  # type: ignore
            # Locals are unpacked last so they shadow globals of the same name.
            caller_vars = {**caller_frame.f_globals, **caller_frame.f_locals}  # type: ignore
        except AttributeError as exc:
            # some interpreters might not implement currentframe; all reasonable
            # errors above should be AttributeError
            raise DaftCoreException(
                "Cannot get caller environment, please provide CTEs and set `register_globals=False`."
            ) from exc
        for alias, variable in caller_vars.items():
            if isinstance(variable, DataFrame):
                py_ctes[alias] = variable._builder._builder

    # 2. Add explicit CTEs last so these can't be shadowed.
    for alias, df in bindings.items():
        py_ctes[alias] = df._builder._builder

    py_sess = daft.current_session()._session
    py_config = get_context().daft_planning_config
    py_object = _sql_exec(sql, py_sess, py_ctes, py_config)

    if py_object is None:
        # for backwards compatibility on the return type i.e. don't introduce nullability
        return DataFrame._from_pydict({})
    elif isinstance(py_object, _PyLogicalPlanBuilder):
        return DataFrame(LogicalPlanBuilder(py_object))
    else:
        raise ValueError(f"Unsupported return type from sql exec: {type(py_object)}")

Output#

write_csv #

write_csv(root_dir: str | Path, write_mode: Literal['append', 'overwrite', 'overwrite-partitions'] = 'append', partition_cols: list[ColumnInputType] | None = None, io_config: IOConfig | None = None, delimiter: str | None = None, quote: str | None = None, escape: str | None = None, header: bool | None = True, date_format: str | None = None, timestamp_format: str | None = None) -> DataFrame

Writes the DataFrame as CSV files, returning a new DataFrame with paths to the files that were written.

Files will be written to <root_dir>/* with randomly generated UUIDs as the file names.

Parameters:

Name Type Description Default
root_dir str

root file path to write CSV files to.

required
write_mode str

Operation mode of the write. append will add new data, overwrite will replace the contents of the root directory with new data. overwrite-partitions will replace only the contents in the partitions that are being written to. Defaults to "append".

'append'
partition_cols Optional[List[ColumnInputType]]

How to subpartition each partition further. Defaults to None.

None
io_config Optional[IOConfig]

configurations to use when interacting with remote storage.

None
delimiter Optional[str]

Single-character field delimiter (default ,).

None
quote Optional[str]

Single-character quote used around fields containing delimiters (default ").

None
escape Optional[str]

Single-character escape for special characters (default \\).

None
header Optional[bool]

Whether to write a header row with column names, default True.

True
date_format Optional[str]

Format string for date columns. Uses chrono strftime format (e.g., "%Y-%m-%d", "%d/%m/%Y"). Defaults to None (ISO 8601 format).

None
timestamp_format Optional[str]

Format string for timestamp columns. Uses chrono strftime format (e.g., "%Y-%m-%d %H:%M:%S", "%+"). Defaults to None (ISO 8601 format).

None

Returns:

Name Type Description
DataFrame DataFrame

The filenames that were written out as strings.

Note

This call is blocking and will execute the DataFrame when called

Timezone handling: For timezone-aware timestamp columns, the timestamps are converted to the target timezone before formatting. For example, a timestamp stored as UTC but with timezone "America/New_York" will be formatted in Eastern Time, not UTC. If the timezone string is invalid, an error will be raised.

Examples:

Basic usage:

1
2
3
>>> import daft
>>> df = daft.from_pydict({"x": [1, 2, 3], "y": ["a", "b", "c"]})
>>> df.write_csv("output_dir", write_mode="overwrite")

Custom date format (e.g., DD/MM/YYYY):

1
2
3
>>> import datetime
>>> df = daft.from_pydict({"date": [datetime.date(2024, 1, 15)]})
>>> df.write_csv("output_dir", date_format="%d/%m/%Y")
# Output: 15/01/2024

Custom timestamp format:

1
2
>>> df = daft.from_pydict({"ts": [datetime.datetime(2024, 1, 15, 10, 30, 45)]})
>>> df.write_csv("output_dir", timestamp_format="%Y-%m-%d %H:%M:%S")
# Output: 2024-01-15 10:30:45

ISO 8601 / RFC 3339 timestamp format:

1
>>> df.write_csv("output_dir", timestamp_format="%+")
# Output: 2024-01-15T10:30:45+00:00
Tip

See also df.write_parquet() and df.write_json() for writing DataFrames in other formats.

Source code in daft/dataframe/dataframe.py
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
@DataframePublicAPI
def write_csv(
    self,
    root_dir: str | pathlib.Path,
    write_mode: Literal["append", "overwrite", "overwrite-partitions"] = "append",
    partition_cols: list[ColumnInputType] | None = None,
    io_config: IOConfig | None = None,
    delimiter: str | None = None,
    quote: str | None = None,
    escape: str | None = None,
    header: bool | None = True,
    date_format: str | None = None,
    timestamp_format: str | None = None,
) -> "DataFrame":
    r"""Writes the DataFrame as CSV files, returning a new DataFrame with paths to the files that were written.

    Files will be written to `<root_dir>/*` with randomly generated UUIDs as the file names.

    Args:
        root_dir (str | pathlib.Path): root file path to write CSV files to.
        write_mode (str, optional): Operation mode of the write. `append` will add new data, `overwrite` will replace the contents of the root directory with new data. `overwrite-partitions` will replace only the contents in the partitions that are being written to. Defaults to "append".
        partition_cols (Optional[List[ColumnInputType]], optional): How to subpartition each partition further. Must be a non-empty list when `write_mode` is `overwrite-partitions`. Defaults to None.
        io_config (Optional[IOConfig], optional): configurations to use when interacting with remote storage. When None, the planning config's default IO config is used.
        delimiter (Optional[str], optional): Single-character field delimiter (default `,`).
        quote (Optional[str], optional): Single-character quote used around fields containing delimiters (default `"`).
        escape (Optional[str], optional): Single-character escape for special characters (default `\\`).
        header (Optional[bool], optional): Whether to write a header row with column names, default True.
        date_format (Optional[str], optional): Format string for date columns. Uses chrono strftime format (e.g., "%Y-%m-%d", "%d/%m/%Y"). Defaults to None (ISO 8601 format).
        timestamp_format (Optional[str], optional): Format string for timestamp columns. Uses chrono strftime format (e.g., "%Y-%m-%d %H:%M:%S", "%+"). Defaults to None (ISO 8601 format).

    Returns:
        DataFrame: The filenames that were written out as strings.

    Raises:
        ValueError: If `write_mode` is not one of the supported modes, or if `write_mode` is
            `overwrite-partitions` and no partition columns are given.

    Note:
        This call is **blocking** and will execute the DataFrame when called

        **Timezone handling**: For timezone-aware timestamp columns, the timestamps are converted
        to the target timezone before formatting. For example, a timestamp stored as UTC but with
        timezone "America/New_York" will be formatted in Eastern Time, not UTC. If the timezone
        string is invalid, an error will be raised.

    Examples:
        Basic usage:

        >>> import daft
        >>> df = daft.from_pydict({"x": [1, 2, 3], "y": ["a", "b", "c"]})
        >>> df.write_csv("output_dir", write_mode="overwrite")  # doctest: +SKIP

        Custom date format (e.g., DD/MM/YYYY):

        >>> import datetime
        >>> df = daft.from_pydict({"date": [datetime.date(2024, 1, 15)]})
        >>> df.write_csv("output_dir", date_format="%d/%m/%Y")  # doctest: +SKIP
        # Output: 15/01/2024

        Custom timestamp format:

        >>> df = daft.from_pydict({"ts": [datetime.datetime(2024, 1, 15, 10, 30, 45)]})
        >>> df.write_csv("output_dir", timestamp_format="%Y-%m-%d %H:%M:%S")  # doctest: +SKIP
        # Output: 2024-01-15 10:30:45

        ISO 8601 / RFC 3339 timestamp format:

        >>> df.write_csv("output_dir", timestamp_format="%+")  # doctest: +SKIP
        # Output: 2024-01-15T10:30:45+00:00

    Tip:
        See also [`df.write_parquet()`][daft.DataFrame.write_parquet] and [`df.write_json()`][daft.DataFrame.write_json]
        for writing DataFrames in other formats.

    """
    if write_mode not in ["append", "overwrite", "overwrite-partitions"]:
        raise ValueError(
            f"Only support `append`, `overwrite`, or `overwrite-partitions` mode. {write_mode} is unsupported"
        )
    # An empty list names no partitions either, so it is rejected just like None.
    if write_mode == "overwrite-partitions" and not partition_cols:
        raise ValueError("Partition columns must be specified to use `overwrite-partitions` mode.")

    io_config = get_context().daft_planning_config.default_io_config if io_config is None else io_config

    cols: list[Expression] | None = None
    if partition_cols is not None:
        cols = column_inputs_to_expressions(tuple(partition_cols))

    file_format_option = PyFormatSinkOption.csv(
        delimiter=delimiter,
        quote=quote,
        escape=escape,
        header=header,
        date_format=date_format,
        timestamp_format=timestamp_format,
    )
    builder = self._builder.write_tabular(
        root_dir=root_dir,
        partition_cols=cols,
        write_mode=WriteMode.from_str(write_mode),
        file_format=FileFormat.Csv,
        file_format_option=file_format_option,
        io_config=io_config,
    )

    # Block and write, then retrieve data
    write_df = DataFrame(builder)
    write_df.collect()
    assert write_df._result is not None

    # Populate and return a new disconnected DataFrame
    # Keep the original logical plan so explain() can still show upstream operators
    # (e.g. filters/projections before the write), instead of collapsing to an
    # in-memory source after collect() caches the result.
    result_df = DataFrame(write_df._get_current_builder())
    result_df._result_cache = write_df._result_cache
    result_df._preview = write_df._preview
    result_df._metadata = write_df._metadata
    return result_df

write_json #

write_json(root_dir: str | Path, write_mode: Literal['append', 'overwrite', 'overwrite-partitions'] = 'append', partition_cols: list[ColumnInputType] | None = None, io_config: IOConfig | None = None, ignore_null_fields: bool | None = False, date_format: str | None = None, timestamp_format: str | None = None) -> DataFrame

Writes the DataFrame as JSON files, returning a new DataFrame with paths to the files that were written.

Files will be written to <root_dir>/* with randomly generated UUIDs as the file names.

Parameters:

Name Type Description Default
root_dir str

root file path to write JSON files to.

required
write_mode str

Operation mode of the write. append will add new data, overwrite will replace the contents of the root directory with new data. overwrite-partitions will replace only the contents in the partitions that are being written to. Defaults to "append".

'append'
partition_cols Optional[List[ColumnInputType]]

How to subpartition each partition further. Defaults to None.

None
io_config Optional[IOConfig]

configurations to use when interacting with remote storage.

None
ignore_null_fields Optional[bool]

Whether to ignore fields with null values when writing JSON. Defaults to False.

False
date_format Optional[str]

Format string for date columns. Uses chrono strftime format (e.g., "%Y-%m-%d", "%d/%m/%Y"). Defaults to None (ISO 8601 format).

None
timestamp_format Optional[str]

Format string for timestamp columns. Uses chrono strftime format (e.g., "%Y-%m-%d %H:%M:%S", "%+"). Defaults to None (ISO 8601 format).

None

Returns:

Name Type Description
DataFrame DataFrame

The filenames that were written out as strings.

Note

This call is blocking and will execute the DataFrame when called

Timezone handling: For timezone-aware timestamp columns, the timestamps are converted to the target timezone before formatting. For example, a timestamp stored as UTC but with timezone "America/New_York" will be formatted in Eastern Time, not UTC. If the timezone string is invalid, an error will be raised.

Examples:

Basic usage:

1
2
3
>>> import daft
>>> df = daft.from_pydict({"x": [1, 2, 3], "y": ["a", "b", "c"]})
>>> df.write_json("output_dir", write_mode="overwrite")

Custom date format (e.g., DD/MM/YYYY):

1
2
3
>>> import datetime
>>> df = daft.from_pydict({"date": [datetime.date(2024, 1, 15)]})
>>> df.write_json("output_dir", date_format="%d/%m/%Y")
# Output: "15/01/2024"

Custom timestamp format:

1
2
>>> df = daft.from_pydict({"ts": [datetime.datetime(2024, 1, 15, 10, 30, 45)]})
>>> df.write_json("output_dir", timestamp_format="%Y-%m-%d %H:%M:%S")
# Output: "2024-01-15 10:30:45"

ISO 8601 / RFC 3339 timestamp format:

1
>>> df.write_json("output_dir", timestamp_format="%+")
# Output: "2024-01-15T10:30:45+00:00"
Source code in daft/dataframe/dataframe.py
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
@DataframePublicAPI
def write_json(
    self,
    root_dir: str | pathlib.Path,
    write_mode: Literal["append", "overwrite", "overwrite-partitions"] = "append",
    partition_cols: list[ColumnInputType] | None = None,
    io_config: IOConfig | None = None,
    ignore_null_fields: bool | None = False,
    date_format: str | None = None,
    timestamp_format: str | None = None,
) -> "DataFrame":
    """Writes the DataFrame as JSON files, returning a new DataFrame with paths to the files that were written.

    Files will be written to `<root_dir>/*` with randomly generated UUIDs as the file names.

    Args:
        root_dir (str | pathlib.Path): root file path to write JSON files to.
        write_mode (str, optional): Operation mode of the write. `append` will add new data, `overwrite` will replace the contents of the root directory with new data. `overwrite-partitions` will replace only the contents in the partitions that are being written to. Defaults to "append".
        partition_cols (Optional[List[ColumnInputType]], optional): How to subpartition each partition further. Must be non-empty when `write_mode` is `overwrite-partitions`. Defaults to None.
        io_config (Optional[IOConfig], optional): configurations to use when interacting with remote storage. Defaults to None, in which case the planning config's default IOConfig is used.
        ignore_null_fields (Optional[bool], optional): Whether to ignore fields with null values when writing JSON. Defaults to False.
        date_format (Optional[str], optional): Format string for date columns. Uses chrono strftime format (e.g., "%Y-%m-%d", "%d/%m/%Y"). Defaults to None (ISO 8601 format).
        timestamp_format (Optional[str], optional): Format string for timestamp columns. Uses chrono strftime format (e.g., "%Y-%m-%d %H:%M:%S", "%+"). Defaults to None (ISO 8601 format).

    Returns:
        DataFrame: The filenames that were written out as strings.

    Raises:
        ValueError: If `write_mode` is not one of the supported modes, or if
            `overwrite-partitions` is requested without any partition columns.

    Note:
        This call is **blocking** and will execute the DataFrame when called.

        **Timezone handling**: For timezone-aware timestamp columns, the timestamps are converted
        to the target timezone before formatting. For example, a timestamp stored as UTC but with
        timezone "America/New_York" will be formatted in Eastern Time, not UTC. If the timezone
        string is invalid, an error will be raised.

    Examples:
        Basic usage:

        >>> import daft
        >>> df = daft.from_pydict({"x": [1, 2, 3], "y": ["a", "b", "c"]})
        >>> df.write_json("output_dir", write_mode="overwrite")  # doctest: +SKIP

        Custom date format (e.g., DD/MM/YYYY):

        >>> import datetime
        >>> df = daft.from_pydict({"date": [datetime.date(2024, 1, 15)]})
        >>> df.write_json("output_dir", date_format="%d/%m/%Y")  # doctest: +SKIP
        # Output: "15/01/2024"

        Custom timestamp format:

        >>> df = daft.from_pydict({"ts": [datetime.datetime(2024, 1, 15, 10, 30, 45)]})
        >>> df.write_json("output_dir", timestamp_format="%Y-%m-%d %H:%M:%S")  # doctest: +SKIP
        # Output: "2024-01-15 10:30:45"

        ISO 8601 / RFC 3339 timestamp format:

        >>> df.write_json("output_dir", timestamp_format="%+")  # doctest: +SKIP
        # Output: "2024-01-15T10:30:45+00:00"
    """
    if write_mode not in ["append", "overwrite", "overwrite-partitions"]:
        raise ValueError(
            f"Only support `append`, `overwrite`, or `overwrite-partitions` mode. {write_mode} is unsupported"
        )
    # Reject an empty list as well as None: overwriting "only the partitions being
    # written to" has no meaning without at least one partition column.
    if write_mode == "overwrite-partitions" and not partition_cols:
        raise ValueError("Partition columns must be specified to use `overwrite-partitions` mode.")

    io_config = get_context().daft_planning_config.default_io_config if io_config is None else io_config

    cols: list[Expression] | None = None
    if partition_cols is not None:
        cols = column_inputs_to_expressions(tuple(partition_cols))

    file_format_option = PyFormatSinkOption.json(
        ignore_null_fields=ignore_null_fields,
        date_format=date_format,
        timestamp_format=timestamp_format,
    )
    builder = self._builder.write_tabular(
        root_dir=root_dir,
        partition_cols=cols,
        write_mode=WriteMode.from_str(write_mode),
        file_format=FileFormat.Json,
        file_format_option=file_format_option,
        io_config=io_config,
    )
    # Block and write, then retrieve data
    write_df = DataFrame(builder)
    write_df.collect()
    assert write_df._result is not None

    # Populate and return a new disconnected DataFrame
    # Keep the original logical plan so explain() can still show upstream operators
    # (e.g. filters/projections before the write), instead of collapsing to an
    # in-memory source after collect() caches the result.
    result_df = DataFrame(write_df._get_current_builder())
    result_df._result_cache = write_df._result_cache
    result_df._preview = write_df._preview
    result_df._metadata = write_df._metadata
    return result_df

write_parquet #

write_parquet(root_dir: str | Path, compression: str = 'snappy', write_mode: Literal['append', 'overwrite', 'overwrite-partitions'] = 'append', write_success_file: bool = False, partition_cols: list[ColumnInputType] | None = None, io_config: IOConfig | None = None) -> DataFrame

Writes the DataFrame as parquet files, returning a new DataFrame with paths to the files that were written.

Files will be written to <root_dir>/* with randomly generated UUIDs as the file names.

Parameters:

Name Type Description Default
root_dir str

root file path to write parquet files to.

required
compression str

compression algorithm. Defaults to "snappy".

'snappy'
write_mode str

Operation mode of the write. append will add new data, overwrite will replace the contents of the root directory with new data. overwrite-partitions will replace only the contents in the partitions that are being written to. Defaults to "append".

'append'
write_success_file bool

Whether to write a _SUCCESS file upon successful completion. Defaults to False.

False
partition_cols Optional[List[ColumnInputType]]

How to subpartition each partition further. Defaults to None.

None
io_config Optional[IOConfig]

configurations to use when interacting with remote storage.

None

Returns:

Name Type Description
DataFrame DataFrame

The filenames that were written out as strings.

Note

This call is blocking and will execute the DataFrame when called.

Examples:

1
2
3
>>> import daft
>>> df = daft.from_pydict({"x": [1, 2, 3], "y": ["a", "b", "c"]})
>>> df.write_parquet("output_dir", write_mode="overwrite")
Tip

See also df.write_csv() and df.write_json() for writing DataFrames in other formats.

Source code in daft/dataframe/dataframe.py
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
@DataframePublicAPI
def write_parquet(
    self,
    root_dir: str | pathlib.Path,
    compression: str = "snappy",
    write_mode: Literal["append", "overwrite", "overwrite-partitions"] = "append",
    write_success_file: bool = False,
    partition_cols: list[ColumnInputType] | None = None,
    io_config: IOConfig | None = None,
) -> "DataFrame":
    """Writes the DataFrame as parquet files, returning a new DataFrame with paths to the files that were written.

    Files will be written to `<root_dir>/*` with randomly generated UUIDs as the file names.

    Args:
        root_dir (str | pathlib.Path): root file path to write parquet files to.
        compression (str, optional): compression algorithm. Defaults to "snappy".
        write_mode (str, optional): Operation mode of the write. `append` will add new data, `overwrite` will replace the contents of the root directory with new data. `overwrite-partitions` will replace only the contents in the partitions that are being written to. Defaults to "append".
        write_success_file (bool, optional): Whether to write a `_SUCCESS` file upon successful completion. Defaults to False.
        partition_cols (Optional[List[ColumnInputType]], optional): How to subpartition each partition further. Must be non-empty when `write_mode` is `overwrite-partitions`. Defaults to None.
        io_config (Optional[IOConfig], optional): configurations to use when interacting with remote storage. Defaults to None, in which case the planning config's default IOConfig is used.

    Returns:
        DataFrame: The filenames that were written out as strings.

    Raises:
        ValueError: If `write_mode` is not one of the supported modes, or if
            `overwrite-partitions` is requested without any partition columns.

    Note:
        This call is **blocking** and will execute the DataFrame when called.

    Examples:
        >>> import daft
        >>> df = daft.from_pydict({"x": [1, 2, 3], "y": ["a", "b", "c"]})
        >>> df.write_parquet("output_dir", write_mode="overwrite")  # doctest: +SKIP

    Tip:
        See also [`df.write_csv()`][daft.DataFrame.write_csv] and [`df.write_json()`][daft.DataFrame.write_json]
        for writing DataFrames in other formats.
    """
    if write_mode not in ["append", "overwrite", "overwrite-partitions"]:
        raise ValueError(
            f"Only support `append`, `overwrite`, or `overwrite-partitions` mode. {write_mode} is unsupported"
        )
    # Reject an empty list as well as None: overwriting "only the partitions being
    # written to" has no meaning without at least one partition column.
    if write_mode == "overwrite-partitions" and not partition_cols:
        raise ValueError("Partition columns must be specified to use `overwrite-partitions` mode.")

    io_config = get_context().daft_planning_config.default_io_config if io_config is None else io_config

    cols: list[Expression] | None = None
    if partition_cols is not None:
        cols = column_inputs_to_expressions(tuple(partition_cols))

    builder = self._builder.write_tabular(
        root_dir=root_dir,
        partition_cols=cols,
        write_mode=WriteMode.from_str(write_mode),
        write_success_file=write_success_file,
        file_format=FileFormat.Parquet,
        compression=compression,
        io_config=io_config,
    )
    # Block and write, then retrieve data
    write_df = DataFrame(builder)
    write_df.collect()
    assert write_df._result is not None

    # Populate and return a new disconnected DataFrame
    # Keep the original logical plan so explain() can still show upstream operators
    # (e.g. filters/projections before the write), instead of collapsing to an
    # in-memory source after collect() caches the result.
    result_df = DataFrame(write_df._get_current_builder())
    result_df._result_cache = write_df._result_cache
    result_df._preview = write_df._preview
    result_df._metadata = write_df._metadata
    return result_df

write_deltalake #

write_deltalake(table: Union[str, Path, DeltaTable, UnityCatalogTable], partition_cols: list[str] | None = None, mode: Literal['append', 'overwrite', 'error', 'ignore'] = 'append', schema_mode: Literal['merge', 'overwrite'] | None = None, name: str | None = None, description: str | None = None, configuration: Mapping[str, str | None] | None = None, custom_metadata: dict[str, str] | None = None, dynamo_table_name: str | None = None, allow_unsafe_rename: bool = False, io_config: IOConfig | None = None, checkpoint: CheckpointStore | None = None) -> DataFrame

Writes the DataFrame to a Delta Lake table, returning a new DataFrame with the operations that occurred.

Parameters:

Name Type Description Default
table Union[str, Path, DeltaTable, UnityCatalogTable]

Destination Delta Lake Table or table URI to write dataframe to.

required
partition_cols List[str]

How to subpartition each partition further. If table exists, expected to match table's existing partitioning scheme, otherwise creates the table with specified partition columns. Defaults to None.

None
mode str

Operation mode of the write. append will add new data, overwrite will replace table with new data, error will raise an error if table already exists, and ignore will not write anything if table already exists. Defaults to append.

'append'
schema_mode str

Schema mode of the write. If set to overwrite, allows replacing the schema of the table when doing mode=overwrite. Schema mode merge is currently not supported.

None
name str

User-provided identifier for this table.

None
description str

User-provided description for this table.

None
configuration Mapping[str, Optional[str]]

A map containing configuration options for the metadata action.

None
custom_metadata Dict[str, str]

Custom metadata to add to the commit info.

None
dynamo_table_name str

Name of the DynamoDB table to be used as the locking provider if writing to S3.

None
allow_unsafe_rename bool

Whether to allow unsafe rename when writing to S3 or local disk. Defaults to False.

False
io_config IOConfig

configurations to use when interacting with remote storage.

None

Returns:

Name Type Description
DataFrame DataFrame

The operations that occurred with this write.

Note

This call is blocking and will execute the DataFrame when called.

Examples:

1
2
3
4
>>> import daft
>>> import deltalake
>>> df = daft.from_pydict({"x": [1, 2, 3], "y": ["a", "b", "c"]})
>>> df.write_deltalake("s3://my-bucket/my-deltalake-table")
Source code in daft/dataframe/dataframe.py
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847
1848
1849
1850
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
1897
1898
1899
1900
1901
1902
1903
1904
1905
@DataframePublicAPI
def write_deltalake(
    self,
    table: Union[str, pathlib.Path, "deltalake.DeltaTable", "UnityCatalogTable"],
    partition_cols: list[str] | None = None,
    mode: Literal["append", "overwrite", "error", "ignore"] = "append",
    schema_mode: Literal["merge", "overwrite"] | None = None,
    name: str | None = None,
    description: str | None = None,
    configuration: Mapping[str, str | None] | None = None,
    custom_metadata: dict[str, str] | None = None,
    dynamo_table_name: str | None = None,
    allow_unsafe_rename: bool = False,
    io_config: IOConfig | None = None,
    checkpoint: "CheckpointStore | None" = None,
) -> "DataFrame":
    """Writes the DataFrame to a [Delta Lake](https://docs.delta.io/latest/index.html) table, returning a new DataFrame with the operations that occurred.

    Args:
        table (Union[str, pathlib.Path, deltalake.DeltaTable, UnityCatalogTable]): Destination [Delta Lake Table](https://delta-io.github.io/delta-rs/api/delta_table/) or table URI to write dataframe to. When a `UnityCatalogTable` is given, its own `io_config` is used instead of the `io_config` argument.
        partition_cols (List[str], optional): How to subpartition each partition further. If table exists, expected to match table's existing partitioning scheme, otherwise creates the table with specified partition columns. Defaults to None.
        mode (str, optional): Operation mode of the write. `append` will add new data, `overwrite` will replace table with new data, `error` will raise an error if table already exists, and `ignore` will not write anything if table already exists. Defaults to `append`.
        schema_mode (str, optional): Schema mode of the write. If set to `overwrite`, allows replacing the schema of the table when doing `mode=overwrite`. Schema mode `merge` is currently not supported.
        name (str, optional): User-provided identifier for this table.
        description (str, optional): User-provided description for this table.
        configuration (Mapping[str, Optional[str]], optional): A map containing configuration options for the metadata action.
        custom_metadata (Dict[str, str], optional): Custom metadata to add to the commit info.
        dynamo_table_name (str, optional): Name of the DynamoDB table to be used as the locking provider if writing to S3. If omitted for an `s3`/`s3a` table, unsafe renames are enabled (with a warning unless `allow_unsafe_rename` is True).
        allow_unsafe_rename (bool, optional): Whether to allow unsafe rename when writing to S3 or local disk. Defaults to False.
        io_config (IOConfig, optional): configurations to use when interacting with remote storage. Defaults to the planning config's default IOConfig.
        checkpoint (CheckpointStore, optional): If provided, once the Delta commit succeeds, every checkpoint in the store whose status is `Checkpointed` is marked as committed.

    Returns:
        DataFrame: The operations that occurred with this write, with columns `operation`
            ("ADD", plus "DELETE" for files replaced in `overwrite` mode), `rows`, `file_size` and `file_name`.

    Raises:
        ValueError: If `schema_mode` is "merge", the installed deltalake is older than 0.14.0,
            `table` has an unsupported type, no IOConfig is available, the partition columns
            do not match an existing table's, or the data schema does not match an existing
            table's schema (unless both `mode` and `schema_mode` are "overwrite").
        AssertionError: If `mode` is "error" and the table already exists.
        NotImplementedError: If a partition column has a binary dtype.

    Note:
        This call is **blocking** and will execute the DataFrame when called.

    Examples:
        >>> import daft
        >>> import deltalake
        >>> df = daft.from_pydict({"x": [1, 2, 3], "y": ["a", "b", "c"]})
        >>> df.write_deltalake("s3://my-bucket/my-deltalake-table")  # doctest: +SKIP
    """
    import json

    import deltalake
    import pyarrow as pa
    from deltalake.exceptions import TableNotFoundError
    from packaging.version import parse

    from daft import from_pydict
    from daft.dependencies import unity_catalog
    from daft.filesystem import get_protocol_from_path
    from daft.io.delta_lake._deltalake import delta_schema_to_pyarrow
    from daft.io.delta_lake.delta_lake_write import (
        AddAction,
        convert_pa_schema_to_delta,
        create_table_with_add_actions,
    )
    from daft.io.object_store_options import io_config_to_storage_options

    def _create_metadata_param(metadata: dict[str, str] | None) -> Any:
        """From deltalake>=0.20.0 onwards, custom_metadata has to be passed as CommitProperties.

        Args:
            metadata: Custom commit metadata supplied by the caller, or None.

        Returns:
            The raw ``metadata`` dict for deltalake<0.20.0, otherwise a ``CommitProperties``
            wrapping it as ``custom_metadata``.
        """
        if parse(deltalake.__version__) < parse("0.20.0"):
            return metadata
        else:
            from deltalake import CommitProperties

            return CommitProperties(custom_metadata=metadata)

    if schema_mode == "merge":
        raise ValueError("Schema mode 'merge' is not currently supported for write_deltalake.")

    if parse(deltalake.__version__) < parse("0.14.0"):
        raise ValueError(f"Write delta lake is only supported on deltalake>=0.14.0, found {deltalake.__version__}")

    io_config = get_context().daft_planning_config.default_io_config if io_config is None else io_config

    # Retrieve table_uri and storage_options from various backends
    table_uri: str
    storage_options: dict[str, str]

    if isinstance(table, deltalake.DeltaTable):
        table_uri = table.table_uri
        # Copy so the locking/unsafe-rename keys added below do not leak into the
        # caller's DeltaTable object.
        storage_options = dict(table._storage_options or {})
        new_storage_options = io_config_to_storage_options(io_config, table_uri)
        storage_options.update(new_storage_options or {})
    else:
        if isinstance(table, str):
            table_uri = os.path.expanduser(table)
        elif isinstance(table, pathlib.Path):
            table_uri = str(table)
        elif unity_catalog.module_available() and isinstance(table, unity_catalog.UnityCatalogTable):
            table_uri = table.table_uri
            io_config = table.io_config
        else:
            raise ValueError(f"Expected table to be a path or a DeltaTable, received: {type(table)}")

        if io_config is None:
            raise ValueError(
                "io_config was not provided to write_deltalake and could not be retrieved from defaults."
            )

        storage_options = io_config_to_storage_options(io_config, table_uri) or {}
        try:
            table = deltalake.DeltaTable(table_uri, storage_options=storage_options)
        except TableNotFoundError:
            # No table at this URI yet; it is created after the data files are written.
            table = None

    # see: https://delta-io.github.io/delta-rs/usage/writing/writing-to-s3-with-locking-provider/
    scheme = get_protocol_from_path(table_uri)
    if scheme == "s3" or scheme == "s3a":
        if dynamo_table_name is not None:
            storage_options["AWS_S3_LOCKING_PROVIDER"] = "dynamodb"
            storage_options["DELTA_DYNAMO_TABLE_NAME"] = dynamo_table_name
        else:
            storage_options["AWS_S3_ALLOW_UNSAFE_RENAME"] = "true"

            if not allow_unsafe_rename:
                warnings.warn("No DynamoDB table specified for Delta Lake locking. Defaulting to unsafe writes.")
    elif scheme == "file" and allow_unsafe_rename:
        storage_options["MOUNT_ALLOW_UNSAFE_RENAME"] = "true"

    pyarrow_schema = pa.schema((f.name, f.dtype.to_arrow_dtype()) for f in self.schema())

    large_dtypes = True
    delta_schema = convert_pa_schema_to_delta(pyarrow_schema, large_dtypes=large_dtypes)

    if table is not None:
        if partition_cols and partition_cols != table.metadata().partition_columns:
            raise ValueError(
                f"Expected partition columns to match that of the existing table ({table.metadata().partition_columns}), but received: {partition_cols}"
            )
        else:
            partition_cols = table.metadata().partition_columns

        table.update_incremental()

        table_schema = delta_schema_to_pyarrow(table.schema())
        if Schema.from_pyarrow_schema(delta_schema) != Schema.from_pyarrow_schema(table_schema) and not (
            mode == "overwrite" and schema_mode == "overwrite"
        ):
            raise ValueError(
                "Schema of data does not match table schema\n"
                f"Data schema:\n{delta_schema}\nTable Schema:\n{table_schema}"
            )
        if mode == "error":
            raise AssertionError("Delta table already exists, write mode set to error.")
        elif mode == "ignore":
            # Nothing is written; return an empty operations frame with the usual schema.
            return from_pydict(
                {
                    "operation": pa.array([], type=pa.string()),
                    "rows": pa.array([], type=pa.int64()),
                    "file_size": pa.array([], type=pa.int64()),
                    "file_name": pa.array([], type=pa.string()),
                }
            )
        version = table.version() + 1
    else:
        version = 0

    if partition_cols is not None:
        for c in partition_cols:
            if self.schema()[c].dtype == DataType.binary():
                raise NotImplementedError("Binary partition columns are not yet supported for Delta Lake writes")

    builder = self._builder.write_deltalake(
        table_uri,
        mode,
        version,
        large_dtypes,
        io_config=io_config,
        partition_cols=partition_cols,
    )
    write_df = DataFrame(builder)
    write_df.collect()

    write_result = write_df.to_pydict()
    assert "add_action" in write_result
    add_actions: list[AddAction] = write_result["add_action"]

    operations = []
    paths = []
    rows = []
    sizes = []

    for add_action in add_actions:
        stats = json.loads(add_action.stats)
        operations.append("ADD")
        paths.append(add_action.path)
        rows.append(stats["numRecords"])
        sizes.append(add_action.size)

    if table is None:
        create_table_with_add_actions(
            table_uri,
            delta_schema,
            add_actions,
            mode,
            partition_cols or [],
            name,
            description,
            configuration,
            storage_options,
            custom_metadata,
        )
    else:
        if mode == "overwrite":
            # Report the files being replaced as DELETE operations.
            old_actions = pa.table(table.get_add_actions())
            old_actions_dict = old_actions.to_pydict()
            for i in range(old_actions.num_rows):
                operations.append("DELETE")
                paths.append(old_actions_dict["path"][i])
                rows.append(old_actions_dict["num_records"][i])
                sizes.append(old_actions_dict["size_bytes"][i])

        metadata_param = _create_metadata_param(custom_metadata)
        if parse(deltalake.__version__) < parse("1.0.0"):
            table._table.create_write_transaction(
                add_actions, mode, partition_cols or [], delta_schema, None, metadata_param
            )
        else:
            # deltalake>=1.0 expects a deltalake.Schema rather than a pyarrow schema.
            table._table.create_write_transaction(
                add_actions,
                mode,
                partition_cols or [],
                deltalake.Schema.from_arrow(delta_schema),
                None,
                metadata_param,
            )
        table.update_incremental()

    # Mark all checkpointed entries as committed after successful catalog commit.
    if checkpoint is not None:
        ckpts = checkpoint.list_checkpoints()
        ids = [c.id for c in ckpts if c.status == CheckpointStatus.Checkpointed]
        if ids:
            checkpoint.mark_committed(ids)

    with_operations = from_pydict(
        {
            "operation": pa.array(operations, type=pa.string()),
            "rows": pa.array(rows, type=pa.int64()),
            "file_size": pa.array(sizes, type=pa.int64()),
            "file_name": pa.array([os.path.basename(fp) for fp in paths], type=pa.string()),
        }
    )
    with_operations._metadata = write_df._metadata
    return with_operations

write_iceberg #

write_iceberg(table: Table, mode: str = 'append', io_config: IOConfig | None = None, snapshot_properties: dict[str, str] | None = None, checkpoint: IdempotentCommit | None = None) -> DataFrame

Writes the DataFrame to an Iceberg table, returning a new DataFrame with the operations that occurred.

Can be run in either append or overwrite mode, which will either append the rows in the DataFrame or delete the existing rows and then append the DataFrame rows, respectively.

Parameters:

Name Type Description Default
table Table

Destination PyIceberg Table to write dataframe to.

required
mode str

Operation mode of the write. append or overwrite Iceberg Table. Defaults to append.

'append'
io_config IOConfig

A custom IOConfig to use when accessing Iceberg object storage data. If provided, configurations set in table are ignored.

None
snapshot_properties dict[str, str]

Optional snapshot properties to set while writing to the table. Keys with prefix daft.idempotence- are reserved.

None
checkpoint IdempotentCommit

Bundled checkpoint store + idempotence key for an idempotent commit. When provided, the snapshot summary is tagged with daft.idempotence-key and retries with the same key recognize the prior attempt without producing a duplicate snapshot. Only mode='append' is supported. Requires the Ray runner.

None

Returns:

Name Type Description
DataFrame DataFrame

The operations that occurred with this write.

Note

This call is blocking and will execute the DataFrame when called.

When checkpoint is provided and write_iceberg raises after the catalog commit landed (e.g. a transient failure during the post-commit mark_committed bookkeeping), the user data is already durable in Iceberg. The next call with the same IdempotentCommit (same idempotence key) will detect the snapshot via its marker, finish the bookkeeping, and exit cleanly without producing a duplicate snapshot.

Idempotence-key contract — read carefully:

  • Same key + different inputs → silent no-op (data loss). The destination already has a snapshot tagged with the key, so nothing new is written.
  • Different key + same retry → duplicate snapshot. The destination won't recognize the prior attempt and will commit again. Idempotence is broken.

The orchestrator pattern (run-id supplied from upstream DAG context) avoids both naturally.

Examples:

1
2
3
4
5
6
>>> import pyiceberg
>>> import daft
>>>
>>> table = pyiceberg.Table(...)
>>> df = daft.from_pydict({"user_id": [1, 2, 3], "name": ["Alice", "Bob", "Charlie"]})
>>> df = df.write_iceberg(table, mode="overwrite")
Source code in daft/dataframe/dataframe.py
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
@DataframePublicAPI
def write_iceberg(
    self,
    table: "pyiceberg.table.Table",
    mode: str = "append",
    io_config: IOConfig | None = None,
    snapshot_properties: dict[str, str] | None = None,
    checkpoint: "IdempotentCommit | None" = None,
) -> "DataFrame":
    """Writes the DataFrame to an [Iceberg](https://iceberg.apache.org/docs/nightly/) table, returning a new DataFrame with the operations that occurred.

    Can be run in either `append` or `overwrite` mode, which will either append the rows in the DataFrame or delete the existing rows and then append the DataFrame rows, respectively.

    Args:
        table (pyiceberg.table.Table): Destination [PyIceberg Table](https://py.iceberg.apache.org/reference/pyiceberg/table/#pyiceberg.table.Table) to write dataframe to.
        mode (str, optional): Operation mode of the write. `append` or `overwrite` Iceberg Table. Defaults to `append`.
        io_config (IOConfig, optional): A custom IOConfig to use when accessing Iceberg object storage data. If provided, configurations set in `table` are ignored.
        snapshot_properties (dict[str, str], optional): Optional snapshot properties to set while writing to the table. Keys with prefix ``daft.idempotence-`` are reserved.
        checkpoint (IdempotentCommit, optional): Bundled checkpoint store + idempotence key for an idempotent commit. When provided, the snapshot summary is tagged with ``daft.idempotence-key`` and retries with the same key recognize the prior attempt without producing a duplicate snapshot. Only ``mode='append'`` is supported. Requires the Ray runner.

    Returns:
        DataFrame: The operations that occurred with this write. Without ``checkpoint``, it
            has one row per data file added (``operation == "ADD"``) or, in ``overwrite``
            mode, removed (``operation == "DELETE"``), with ``rows``, ``file_size`` and
            ``file_name`` columns, plus a ``partitioning`` struct column when the table is
            partitioned.

    Raises:
        ValueError: If the installed pyiceberg is older than 0.6.0. Also raised if it is
            older than 0.7.0 and the table is partitioned, ``snapshot_properties`` is given,
            or ``checkpoint`` is given. Also raised if ``mode`` is not ``append`` or
            ``overwrite``, or if a ``snapshot_properties`` key uses the reserved
            ``daft.idempotence-`` prefix.
        NotImplementedError: If ``checkpoint`` is combined with ``mode="overwrite"``.

    Note:
        This call is **blocking** and will execute the DataFrame when called.

        When ``checkpoint`` is provided and ``write_iceberg`` raises
        *after* the catalog commit landed (e.g. a transient failure during
        the post-commit ``mark_committed`` bookkeeping), the user data is
        already durable in Iceberg. The next call with the same
        ``IdempotentCommit`` (same idempotence key) will detect the
        snapshot via its marker, finish the bookkeeping, and exit cleanly
        without producing a duplicate snapshot.

        Idempotence-key contract — read carefully:

        - **Same key + different inputs → silent no-op (data loss).** The
          destination already has a snapshot tagged with the key, so
          nothing new is written.
        - **Different key + same retry → duplicate snapshot.** The
          destination won't recognize the prior attempt and will commit
          again. Idempotence is broken.

        The orchestrator pattern (run-id supplied from upstream DAG context)
        avoids both naturally.

    Examples:
        >>> import pyiceberg
        >>> import daft
        >>>
        >>> table = pyiceberg.Table(...)  # doctest: +SKIP
        >>> df = daft.from_pydict({"user_id": [1, 2, 3], "name": ["Alice", "Bob", "Charlie"]})
        >>> df = df.write_iceberg(table, mode="overwrite")  # doctest: +SKIP

    """
    import pyarrow as pa
    import pyiceberg
    from packaging.version import parse

    from daft.io.iceberg._iceberg import _convert_iceberg_file_io_properties_to_io_config

    # Parse the installed version once; every feature gate below compares against it.
    pyiceberg_version = parse(pyiceberg.__version__)

    # Check the hard minimum first so users on very old pyiceberg see the accurate message
    # rather than the partitioned-table one.
    if pyiceberg_version < parse("0.6.0"):
        raise ValueError(f"Write Iceberg is only supported on pyiceberg>=0.6.0, found {pyiceberg.__version__}")

    if len(table.spec().fields) > 0 and pyiceberg_version < parse("0.7.0"):
        raise ValueError("pyiceberg>=0.7.0 is required to write to a partitioned table")

    # Snapshot properties are only supported on pyiceberg >= 0.7.0. See https://github.com/apache/iceberg-python/issues/367
    if snapshot_properties and pyiceberg_version < parse("0.7.0"):
        raise ValueError("Snapshot properties are only supported on pyiceberg>=0.7.0")

    if mode not in ["append", "overwrite"]:
        raise ValueError(f"Only support `append` or `overwrite` mode. {mode} is unsupported")

    if checkpoint is not None and mode == "overwrite":
        raise NotImplementedError(
            "write_iceberg with checkpoint=... currently supports mode='append' only; "
            "overwrite + checkpoint is tracked separately."
        )

    if checkpoint is not None and pyiceberg_version < parse("0.7.0"):
        raise ValueError("write_iceberg with checkpoint=... requires pyiceberg>=0.7.0")

    # The ``daft.idempotence-`` prefix is reserved for the checkpoint marker (see ``checkpoint`` above).
    if snapshot_properties:
        for key in snapshot_properties:
            if key.startswith("daft.idempotence-"):
                raise ValueError(
                    f"snapshot_properties keys with prefix 'daft.idempotence-' are reserved; got: {key!r}"
                )

    # An explicit io_config wins. Otherwise derive one from the table's FileIO properties,
    # falling back to the planning default if that yields nothing.
    io_config = (
        _convert_iceberg_file_io_properties_to_io_config(table.io.properties) if io_config is None else io_config
    )
    io_config = get_context().daft_planning_config.default_io_config if io_config is None else io_config

    if checkpoint is not None:
        return self._write_iceberg_with_checkpoint(table, io_config, snapshot_properties, checkpoint)

    # Execute the write plan. Each result row carries a pyiceberg DataFile; the catalog
    # commit that makes those files visible happens further below.
    builder = self._builder.write_iceberg(table, io_config)
    write_df = DataFrame(builder)
    write_df.collect()

    write_result = write_df.to_pydict()
    assert "data_file" in write_result
    data_files = write_result["data_file"]

    # In overwrite mode every file currently planned by a full scan is reported as deleted.
    if mode == "overwrite":
        deleted_files = table.scan().plan_files()
    else:
        deleted_files = []

    schema = table.schema()
    # NOTE(review): keyed by each partition field's *source* column name, so two partition
    # fields derived from the same source column share one key here. Confirm this is intended.
    partitioning: dict[str, list[Any]] = {
        schema.find_field(field.source_id).name: [] for field in table.spec().fields
    }

    # Column buffers for the returned report DataFrame (one entry per file).
    operations = []
    path = []
    rows = []
    size = []

    def _record(operation: str, data_file: Any) -> None:
        # Append one report row describing `data_file` to every column buffer.
        operations.append(operation)
        path.append(data_file.file_path)
        rows.append(data_file.record_count)
        size.append(data_file.file_size_in_bytes)

        for field in partitioning:
            partitioning[field].append(getattr(data_file.partition, field, None))

    for data_file in data_files:
        _record("ADD", data_file)

    for pf in deleted_files:
        _record("DELETE", pf.file)

    if pyiceberg_version >= parse("0.7.0"):
        from pyiceberg.table import ALWAYS_TRUE, TableProperties

        # `property_as_bool` lives in a different module depending on the pyiceberg version.
        if pyiceberg_version >= parse("0.8.0"):
            from pyiceberg.utils.properties import property_as_bool
        else:
            from pyiceberg.table import PropertyUtil

            property_as_bool = PropertyUtil.property_as_bool

        tx = table.transaction()
        snapshot_properties = snapshot_properties or {}

        if mode == "overwrite":
            tx.delete(delete_filter=ALWAYS_TRUE, snapshot_properties=snapshot_properties)

        update_snapshot = tx.update_snapshot(snapshot_properties=snapshot_properties)

        # Appends honor the table's manifest-merge property; overwrite always fast-appends.
        manifest_merge_enabled = mode == "append" and property_as_bool(
            tx.table_metadata.properties,
            TableProperties.MANIFEST_MERGE_ENABLED,
            TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT,
        )

        append_method = update_snapshot.merge_append if manifest_merge_enabled else update_snapshot.fast_append

        with append_method() as append_files:
            for data_file in data_files:
                append_files.append_data_file(data_file)

        tx.commit_transaction()
    else:
        # pyiceberg 0.6.x path: commit through the private `_MergingSnapshotProducer`.
        from pyiceberg.table import _MergingSnapshotProducer
        from pyiceberg.table.snapshots import Operation

        operations_map = {
            "append": Operation.APPEND,
            "overwrite": Operation.OVERWRITE,
        }

        merge = _MergingSnapshotProducer(operation=operations_map[mode], table=table)

        for data_file in data_files:
            merge.append_data_file(data_file)

        merge.commit()

    with_operations = {
        "operation": pa.array(operations, type=pa.string()),
        "rows": pa.array(rows, type=pa.int64()),
        "file_size": pa.array(size, type=pa.int64()),
        "file_name": pa.array(path, type=pa.string()),
    }

    if partitioning:
        with_operations["partitioning"] = pa.StructArray.from_arrays(
            partitioning.values(), names=partitioning.keys()
        )

    from daft import from_pydict

    # NOTE: We are losing the history of the plan here.
    # This is due to the fact that the logical plan of the write_iceberg returns datafiles but we want to return the above data
    df = from_pydict(with_operations)
    df._metadata = write_df._metadata
    return df

write_lance #

write_lance(uri: str | Path, mode: Literal['create', 'append', 'overwrite', 'merge'] = 'create', io_config: IOConfig | None = None, schema: Union[Schema, Schema] | None = None, left_on: str | None = None, right_on: str | None = None, **kwargs: Any) -> DataFrame

Writes the DataFrame to a Lance table.

Parameters:

Name Type Description Default
uri str | Path

The URI of the Lance table to write to. Accepts a local path or an object-store URI like "s3://bucket/path".

required
mode Literal['create', 'append', 'overwrite', 'merge']

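The write mode. One of "create", "append", "overwrite", or "merge". "create" creates the dataset if it does not exist, otherwise raises an error. "append" appends to the existing dataset if it exists, otherwise raises an error. "overwrite" overwrites the existing dataset if it exists, otherwise raises an error. "merge" adds new columns to the existing dataset.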

'create'
io_config IOConfig

configurations to use when interacting with remote storage.

None
schema Schema | Schema

Desired schema to enforce during write. - If omitted, Daft will use the DataFrame's current schema. - If a pyarrow.Schema is provided, Daft will enforce the field order, types, and nullability by casting the data to the provided schema prior to write. Table-level (dataset) metadata present on the pyarrow schema is preserved during create/overwrite. - If the target Lance dataset already exists, the data will be cast to the existing table schema to ensure compatibility unless mode="overwrite".

None
left_on str | None

Only used in mode="merge". Join key for aligning rows when merging new columns. If omitted, defaults to "_rowaddr".

None
right_on str | None

Only used in mode="merge". Join key column that must be present in the DataFrame passed to write_lance(mode="merge"), together with fragment_id. If omitted, defaults to the value of left_on (or "_rowaddr").

None
**kwargs Any

Additional keyword arguments to pass to the Lance writer.

{}

Returns:

Name Type Description
DataFrame DataFrame

A DataFrame containing metadata about the written Lance table, such as number of fragments, number of deleted rows, number of small files, and version.

Raises:

Type Description
TypeError

If schema is provided but not a Daft Schema or a pyarrow.Schema

ValueError

When appending and the data schema cannot be cast to the existing table schema

Examples:

1
2
3
4
5
6
7
>>> import daft
>>> df = daft.from_pydict({"a": [1, 2, 3, 4]})
>>> df.write_lance("/tmp/lance/my_table.lance")
>>> daft.read_lance("/tmp/lance/my_table.lance").collect()
>>> # Pass additional keyword arguments to the Lance writer
>>> # All additional keyword arguments are passed to `lance.write_fragments`
>>> df.write_lance("/tmp/lance/my_table.lance", mode="overwrite", max_bytes_per_file=1024)
╭───────────────┬──────────────────┬─────────────────┬─────────╮
│ num_fragments ┆ num_deleted_rows ┆ num_small_files ┆ version │
│ ---           ┆ ---              ┆ ---             ┆ ---     │
│ Int64         ┆ Int64            ┆ Int64           ┆ Int64   │
╞═══════════════╪══════════════════╪═════════════════╪═════════╡
│ 1             ┆ 0                ┆ 1               ┆ 1       │
╰───────────────┴──────────────────┴─────────────────┴─────────╯
(Showing first 1 of 1 rows)
╭───────╮
│ a     │
│ ---   │
│ Int64 │
╞═══════╡
│ 1     │
├╌╌╌╌╌╌╌┤
│ 2     │
├╌╌╌╌╌╌╌┤
│ 3     │
├╌╌╌╌╌╌╌┤
│ 4     │
╰───────╯
(Showing first 4 of 4 rows)
╭───────────────┬──────────────────┬─────────────────┬─────────╮
│ num_fragments ┆ num_deleted_rows ┆ num_small_files ┆ version │
│ ---           ┆ ---              ┆ ---             ┆ ---     │
│ Int64         ┆ Int64            ┆ Int64           ┆ Int64   │
╞═══════════════╪══════════════════╪═════════════════╪═════════╡
│ 1             ┆ 0                ┆ 1               ┆ 2       │
╰───────────────┴──────────────────┴─────────────────┴─────────╯
(Showing first 1 of 1 rows)
Source code in daft/dataframe/dataframe.py
1940
1941
1942
1943
1944
1945
1946
1947
1948
1949
1950
1951
1952
1953
1954
1955
1956
1957
1958
1959
1960
1961
1962
1963
1964
1965
1966
1967
1968
1969
1970
1971
1972
1973
1974
1975
1976
1977
1978
1979
1980
1981
1982
1983
1984
1985
1986
1987
1988
1989
1990
1991
1992
1993
1994
1995
1996
1997
1998
1999
2000
2001
2002
2003
2004
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
2026
2027
2028
2029
2030
2031
2032
2033
2034
2035
2036
2037
2038
2039
2040
2041
2042
2043
2044
2045
2046
2047
2048
2049
2050
2051
2052
2053
2054
2055
2056
2057
2058
2059
2060
2061
2062
2063
2064
2065
2066
2067
2068
2069
2070
2071
2072
2073
2074
2075
2076
2077
2078
2079
2080
2081
2082
2083
2084
2085
2086
2087
2088
2089
2090
2091
2092
2093
2094
2095
2096
2097
2098
2099
2100
2101
2102
2103
2104
2105
2106
2107
2108
2109
2110
2111
2112
2113
2114
2115
2116
2117
2118
2119
2120
2121
2122
2123
2124
2125
2126
2127
2128
2129
2130
2131
2132
2133
2134
2135
2136
@DataframePublicAPI
def write_lance(
    self,
    uri: str | pathlib.Path,
    mode: Literal["create", "append", "overwrite", "merge"] = "create",
    io_config: IOConfig | None = None,
    schema: Union[Schema, "pyarrow.Schema"] | None = None,
    left_on: str | None = None,
    right_on: str | None = None,
    **kwargs: Any,
) -> "DataFrame":
    """Writes the DataFrame to a Lance table.

    Args:
        uri: The URI of the Lance table to write to. Accepts a local path or an
            object-store URI like "s3://bucket/path". ``rest://`` URIs are not supported.
        mode: The write mode. One of "create", "append", "overwrite", or "merge".

            - "create" will create the dataset if it does not exist, otherwise raise an error.
            - "append" will append to the existing dataset if it exists, otherwise raise an error.
            - "overwrite" will overwrite the existing dataset if it exists, otherwise raise an error.
            - "merge" will add new columns to the existing dataset. If the dataset does not
              exist yet it is created. If the DataFrame has no columns beyond the existing ones
              (ignoring ``fragment_id``, ``_rowaddr`` and ``_rowid``), its rows are appended.
        io_config (IOConfig, optional): configurations to use when interacting with remote storage.
        schema (Schema | pyarrow.Schema, optional): Desired schema to enforce during write.

            - If omitted, Daft will use the DataFrame's current schema.
            - If a pyarrow.Schema is provided, Daft will enforce the field order, types, and nullability
              by casting the data to the provided schema prior to write. Table-level (dataset) metadata present
              on the pyarrow schema is preserved during create/overwrite.
            - If the target Lance dataset already exists, the data will be cast to the existing table schema
              to ensure compatibility unless ``mode="overwrite"``.
        left_on (str, optional): Only used in ``mode="merge"`` when new columns are added (ignored
            otherwise). Join key for aligning rows when merging new columns. Defaults to ``"_rowaddr"``.
        right_on (str, optional): Only used in ``mode="merge"`` when new columns are added (ignored
            otherwise). Join key column that must be present in this DataFrame, together with a
            ``fragment_id`` column. Defaults to the value of ``left_on`` (or ``"_rowaddr"``).
        **kwargs: Additional keyword arguments to pass to the Lance writer.

    Returns:
        DataFrame: A DataFrame containing metadata about the written Lance table, such as number of fragments, number of deleted rows, number of small files, and version.

    Raises:
        TypeError: If ``schema`` is provided but not a Daft Schema or a pyarrow.Schema
        ValueError: When appending and the data schema cannot be cast to the existing table schema.
            Also raised when ``uri`` uses the unsupported ``rest://`` scheme, and, in
            ``mode="merge"`` with new columns, when the DataFrame lacks ``fragment_id`` or the
            ``right_on`` join key column.
        ImportError: In ``mode="merge"``, if the ``lance`` package is not installed.

    Examples:
        >>> import daft
        >>> df = daft.from_pydict({"a": [1, 2, 3, 4]})
        >>> df.write_lance("/tmp/lance/my_table.lance")  # doctest: +SKIP
        ╭───────────────┬──────────────────┬─────────────────┬─────────╮
        │ num_fragments ┆ num_deleted_rows ┆ num_small_files ┆ version │
        │ ---           ┆ ---              ┆ ---             ┆ ---     │
        │ Int64         ┆ Int64            ┆ Int64           ┆ Int64   │
        ╞═══════════════╪══════════════════╪═════════════════╪═════════╡
        │ 1             ┆ 0                ┆ 1               ┆ 1       │
        ╰───────────────┴──────────────────┴─────────────────┴─────────╯
        <BLANKLINE>
        (Showing first 1 of 1 rows)
        >>> daft.read_lance("/tmp/lance/my_table.lance").collect()  # doctest: +SKIP
        ╭───────╮
        │ a     │
        │ ---   │
        │ Int64 │
        ╞═══════╡
        │ 1     │
        ├╌╌╌╌╌╌╌┤
        │ 2     │
        ├╌╌╌╌╌╌╌┤
        │ 3     │
        ├╌╌╌╌╌╌╌┤
        │ 4     │
        ╰───────╯
        <BLANKLINE>
        (Showing first 4 of 4 rows)
        >>> # Pass additional keyword arguments to the Lance writer
        >>> # All additional keyword arguments are passed to `lance.write_fragments`
        >>> df.write_lance("/tmp/lance/my_table.lance", mode="overwrite", max_bytes_per_file=1024)  # doctest: +SKIP
        ╭───────────────┬──────────────────┬─────────────────┬─────────╮
        │ num_fragments ┆ num_deleted_rows ┆ num_small_files ┆ version │
        │ ---           ┆ ---              ┆ ---             ┆ ---     │
        │ Int64         ┆ Int64            ┆ Int64           ┆ Int64   │
        ╞═══════════════╪══════════════════╪═════════════════╪═════════╡
        │ 1             ┆ 0                ┆ 1               ┆ 2       │
        ╰───────────────┴──────────────────┴─────────────────┴─────────╯
        <BLANKLINE>
        (Showing first 1 of 1 rows)
    """
    from daft import context as _context
    from daft.io.lance.lance_data_sink import LanceDataSink
    from daft.io.object_store_options import io_config_to_storage_options

    if schema is None:
        schema = self.schema()

    uri_str = str(uri)
    if uri_str.startswith("rest://"):
        raise ValueError(
            "rest:// Lance URIs are no longer supported by DataFrame.write_lance. "
            "The previous REST-namespace integration did not match the real "
            "lance-namespace API and has been removed."
        )

    # Non-merge modes do not support schema evolution or custom join keys.
    # `left_on`/`right_on` are named parameters, so they can never appear in `kwargs`;
    # they are simply ignored on this path.
    if mode != "merge":
        sink = LanceDataSink(uri, schema, mode, io_config, **kwargs)
        return self.write_sink(sink)

    # Merge mode semantics
    try:
        import lance
    except ImportError as e:
        raise ImportError(
            "Unable to import the `lance` package, please ensure that Daft is installed with the lance extra dependency: `pip install daft[lance]`"
        ) from e

    io_config = _context.get_context().daft_planning_config.default_io_config if io_config is None else io_config
    storage_options = io_config_to_storage_options(io_config, uri_str)

    # Attempt to load the dataset; if it does not exist, behave like "create".
    # (OSError already covers FileNotFoundError.)
    try:
        lance_ds = lance.dataset(uri, storage_options=storage_options)
    except (ValueError, OSError):
        lance_ds = None

    if lance_ds is None:
        sink = LanceDataSink(uri, schema, "create", io_config, **kwargs)
        return self.write_sink(sink)

    # Dataset exists: detect schema evolution by checking for new columns in the incoming DF.
    # Column-name extraction is best-effort across schema object shapes; if every strategy
    # fails, all incoming columns are treated as new.
    try:
        existing_fields: set[str] = {getattr(f, "name", str(f)) for f in lance_ds.schema}
    except Exception:
        try:
            names = list(getattr(lance_ds.schema, "names", []))
        except Exception:
            try:
                names = [getattr(f, "name", str(f)) for f in getattr(lance_ds.schema, "fields", [])]
            except Exception:
                names = []
        existing_fields = set(names)

    # Row-addressing / bookkeeping columns never count as "new" data columns.
    meta_exclusions = {"fragment_id", "_rowaddr", "_rowid"}
    new_cols = [c for c in self.column_names if c not in existing_fields and c not in meta_exclusions]

    if not new_cols:
        # Pure append: no schema evolution, so the merge-specific join keys are unused.
        sink = LanceDataSink(uri, schema, "append", io_config, **kwargs)
        return self.write_sink(sink)

    # Schema evolution: route to per-fragment merge keyed by provided business key or default '_rowaddr'
    join_left = left_on or "_rowaddr"
    join_right = right_on or join_left
    if "fragment_id" not in self.column_names:
        raise ValueError(
            "DataFrame must contain 'fragment_id' column for per-fragment merge in mode='merge'. Read from Lance to include 'fragment_id'."
        )
    if join_right not in self.column_names:
        hint = (
            " Read from Lance with default_scan_options={'with_rowaddr': True} to include '_rowaddr'."
            if join_right == "_rowaddr"
            else ""
        )
        raise ValueError(
            f"DataFrame must contain join key column '{join_right}' for per-fragment merge in mode='merge'." + hint
        )

    from daft.io.lance.lance_merge_column import merge_columns_from_df

    merge_columns_from_df(
        df=self,
        lance_ds=lance_ds,
        uri=uri,
        left_on=join_left,
        right_on=join_right,
        storage_options=storage_options,
    )

    # Build and return stats DataFrame similar to sink.finalize; reload the dataset so the
    # reported version reflects the merge that just happened.
    dataset = lance.dataset(uri, storage_options=storage_options)
    stats = dataset.stats.dataset_stats()
    from daft.dependencies import pa as _pa
    from daft.recordbatch import MicroPartition

    return DataFrame._from_micropartitions(
        MicroPartition.from_pydict(
            {
                "num_fragments": _pa.array([stats["num_fragments"]], type=_pa.int64()),
                "num_deleted_rows": _pa.array([stats["num_deleted_rows"]], type=_pa.int64()),
                "num_small_files": _pa.array([stats["num_small_files"]], type=_pa.int64()),
                "version": _pa.array([dataset.version], type=_pa.int64()),
            }
        )
    )

write_sql #

write_sql(table_name: str, conn: str | Callable[[], Connection], write_mode: Literal['append', 'overwrite', 'fail'] = 'append', column_types: dict[str, Any] | None = None, non_primitive_handling: Literal['bytes', 'str', 'error'] | None = None) -> DataFrame

Write the DataFrame to a SQL database and return write metrics.

The write is executed via daft.DataFrame.write_sink using an internal daft.io._sql.SQLDataSink.

Primitive columns (ints, floats, bools, strings, binary, dates, timestamps) are written by converting to a pandas DataFrame and calling pandas.DataFrame.to_sql, letting SQLAlchemy or column_types choose concrete SQL types.

Non-primitive columns (lists, structs, maps, tensors, images, embeddings, python objects, etc.) are normalized according to non_primitive_handling (default None behaves like "str"): "str" serializes values to text (JSON for arrays/maps and other containers, str(..) otherwise), "bytes" writes UTF-8 bytes of that text, and "error" fails if such columns are present.

Parameters:

Name Type Description Default
table_name str

Name of the table to write to.

required
conn str | Callable[[], Connection]

Connection string or factory.

required
write_mode str

Mode to write to the table. "append", "overwrite", or "fail". Defaults to "append".

'append'
column_types Optional[Dict[str, Any]]

Optional mapping from column names to SQLAlchemy types to use when creating the table or casting columns. Passed through to the underlying SQL engine when creating or writing the table.

None
non_primitive_handling Literal['bytes', 'str', 'error'] | None

Controls how non-primitive columns are normalized before reaching SQL; default None behaves like "str". Accepted values are "str", "bytes", and "error".

None

Returns:

Name Type Description
DataFrame DataFrame

A single-row DataFrame containing aggregate write metrics with columns total_written_rows and total_written_bytes.

Warning

This feature is early in development and will likely experience API changes.

Note

Primitive columns still rely on pandas/SQLAlchemy (or column_types) for concrete SQL types, while non-primitive columns are pre-normalized in Python according to non_primitive_handling before reaching the SQL driver.

Examples:

Write to a SQL table using a database URL and explicit SQLAlchemy dtypes:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
>>> from sqlalchemy import DateTime, Integer, String
>>> import datetime
>>> import daft
>>> df = daft.from_pydict(
...     {
...         "id": [1, 2],
...         "name": ["Alice", "Bob"],
...         "created_at": [
...             datetime.datetime(2024, 1, 1, 0, 0, 0),
...             datetime.datetime(2024, 1, 2, 0, 0, 0),
...         ],
...     }
... )
>>> column_types = {
...     "id": Integer(),
...     "name": String(length=255),
...     "created_at": DateTime(timezone=True),
... }
>>> metrics_df = df.write_sql("users", "sqlite:///my_database.db", column_types=column_types)

Write to a SQL table using a SQLAlchemy connection factory and dtypes:

1
2
3
4
>>> import sqlalchemy
>>> def create_conn():
...     return sqlalchemy.create_engine("sqlite:///my_database.db").connect()
>>> metrics_df = df.write_sql("users", create_conn, column_types=column_types)

Write to a SQL table using a database URL with column_types=None to rely on inferred types:

1
2
>>> df = daft.from_pydict({"id": [1], "name": ["Alice"]})
>>> metrics_df = df.write_sql("users", "sqlite:///my_database.db", column_types=None)
Source code in daft/dataframe/dataframe.py
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
@DataframePublicAPI
def write_sql(
    self,
    table_name: str,
    conn: str | Callable[[], "Connection"],
    write_mode: Literal["append", "overwrite", "fail"] = "append",
    column_types: dict[str, Any] | None = None,
    non_primitive_handling: Literal["bytes", "str", "error"] | None = None,
) -> "DataFrame":
    """Write the DataFrame to a SQL database and return write metrics.

    The write runs through `daft.DataFrame.write_sink` with an internal
    `daft.io._sql.SQLDataSink`.

    Primitive columns (ints, floats, bools, strings, binary, dates, timestamps) are written by converting to a pandas DataFrame and calling `pandas.DataFrame.to_sql`; SQLAlchemy (or ``column_types``, when given) decides the concrete SQL types.

    Non-primitive columns (lists, structs, maps, tensors, images, embeddings, python objects, etc.) are normalized according to ``non_primitive_handling``, where ``None`` (the default) behaves like ``"str"``:

    - ``"str"`` serializes values to text (JSON for arrays/maps and other containers, ``str(..)`` otherwise).
    - ``"bytes"`` writes the UTF-8 bytes of that text.
    - ``"error"`` fails if such columns are present.

    When ``non_primitive_handling`` is left as ``None`` and such columns exist, a ``UserWarning`` listing them is emitted.

    Args:
        table_name (str): Name of the table to write to.
        conn (str | Callable[[], "Connection"]): Connection string or factory.
        write_mode (str): Mode to write to the table. "append", "overwrite", or "fail". Defaults to "append".
        column_types (Optional[Dict[str, Any]]): Optional mapping from column names to
            SQLAlchemy types to use when creating the table or casting columns.
            Passed through to the underlying SQL engine when creating or writing
            the table.
        non_primitive_handling (Literal["bytes", "str", "error"] | None):
            How non-primitive columns are normalized before reaching SQL. Accepted values are ``"str"``, ``"bytes"``, and ``"error"``; ``None`` (default) behaves like ``"str"``.

    Returns:
        DataFrame: A single-row DataFrame containing aggregate write metrics with
            columns ``total_written_rows`` and ``total_written_bytes``.

    Warning:
        This feature is early in development and will likely experience API changes.

    Note:
        Primitive columns still rely on pandas/SQLAlchemy (or ``column_types``) for concrete SQL types, while non-primitive columns are pre-normalized in Python according to ``non_primitive_handling`` before reaching the SQL driver.

    Examples:
        Write to a SQL table using a database URL and explicit SQLAlchemy dtypes:

        >>> from sqlalchemy import DateTime, Integer, String
        >>> import datetime
        >>> import daft
        >>> df = daft.from_pydict(
        ...     {
        ...         "id": [1, 2],
        ...         "name": ["Alice", "Bob"],
        ...         "created_at": [
        ...             datetime.datetime(2024, 1, 1, 0, 0, 0),
        ...             datetime.datetime(2024, 1, 2, 0, 0, 0),
        ...         ],
        ...     }
        ... )
        >>> column_types = {
        ...     "id": Integer(),
        ...     "name": String(length=255),
        ...     "created_at": DateTime(timezone=True),
        ... }
        >>> metrics_df = df.write_sql("users", "sqlite:///my_database.db", column_types=column_types)

        Write to a SQL table using a SQLAlchemy connection factory and dtypes:

        >>> import sqlalchemy
        >>> def create_conn():
        ...     return sqlalchemy.create_engine("sqlite:///my_database.db").connect()
        >>> metrics_df = df.write_sql("users", create_conn, column_types=column_types)

        Write to a SQL table using a database URL with column_types=None to rely on inferred types:

        >>> df = daft.from_pydict({"id": [1], "name": ["Alice"]})
        >>> metrics_df = df.write_sql("users", "sqlite:///my_database.db", column_types=None)
    """
    from daft.io._sql import SQLDataSink

    current_schema = self.schema()
    sql_sink = SQLDataSink(
        table_name=table_name,
        conn=conn,
        write_mode=write_mode,
        column_types=column_types,
        df_schema=current_schema,
        non_primitive_handling=non_primitive_handling,
    )

    # Only warn when the caller left the normalization strategy implicit.
    if non_primitive_handling is None:
        flagged_columns = []
        for schema_field in current_schema:
            dtype = schema_field.dtype
            is_non_primitive = (
                dtype.is_python()
                or dtype.is_list()
                or dtype.is_struct()
                or dtype.is_map()
                or dtype.is_tensor()
                or dtype.is_image()
                or dtype.is_embedding()
            )
            if is_non_primitive:
                flagged_columns.append(schema_field.name)

        if flagged_columns:
            warnings.warn(
                f"Detected non-primitive columns: {flagged_columns}. Writing as text (default). Set `non_primitive_handling` to control or suppress.",
                UserWarning,
                stacklevel=2,
            )

    return self.write_sink(sql_sink)

write_bigtable #

write_bigtable(project_id: str, instance_id: str, table_id: str, row_key_column: str, column_family_mappings: dict[str, str], client_kwargs: dict[str, Any] | None = None, write_kwargs: dict[str, Any] | None = None, serialize_incompatible_types: bool = True) -> DataFrame

Write a DataFrame into a Google Cloud Bigtable table.

Bigtable only accepts datatypes that can be converted to bytes in cells (for more details, please consult the Bigtable documentation: https://cloud.google.com/bigtable/docs/overview#data-types). By default, write_bigtable automatically serializes incompatible types to JSON. This can be disabled by setting serialize_incompatible_types=False.

This data sink transforms each row of the dataframe into Bigtable rows. A row key is always required. The row_key_column parameter can be used to specify the column name to use for the row key.

Every column must also belong to a column family. The column_family_mappings parameter can be used to specify the column family to use for each column. For example, if you have a column "name" and a column "age", you can specify a "user_data" column family by passing a dictionary like {"name": "user_data", "age": "user_data"}.

EXPERIMENTAL: This feature is early in development and will change.

Parameters:

Name Type Description Default
project_id str

The Google Cloud project ID.

required
instance_id str

The Bigtable instance ID.

required
table_id str

The table to write to.

required
row_key_column str

Column name for the row key.

required
column_family_mappings dict[str, str]

Mapping of column names to column families.

required
client_kwargs dict[str, Any] | None

Optional dictionary of arguments to pass to the Bigtable Client constructor.

None
write_kwargs dict[str, Any] | None

Optional dictionary of arguments to pass to the Bigtable MutationsBatcher.

None
serialize_incompatible_types bool

Whether to automatically convert non-bytes/int values to Bigtable-compatible formats. If False, will raise an error for unsupported types. Defaults to True.

True
Source code in daft/dataframe/dataframe.py
2270
2271
2272
2273
2274
2275
2276
2277
2278
2279
2280
2281
2282
2283
2284
2285
2286
2287
2288
2289
2290
2291
2292
2293
2294
2295
2296
2297
2298
2299
2300
2301
2302
2303
2304
2305
2306
2307
2308
2309
2310
2311
2312
2313
2314
def write_bigtable(
    self,
    project_id: str,
    instance_id: str,
    table_id: str,
    row_key_column: str,
    column_family_mappings: dict[str, str],
    client_kwargs: dict[str, Any] | None = None,
    write_kwargs: dict[str, Any] | None = None,
    serialize_incompatible_types: bool = True,
) -> "DataFrame":
    """Write a DataFrame into a Google Cloud Bigtable table.

    Bigtable only accepts datatypes that can be converted to bytes in cells (for more details, please consult the Bigtable documentation: https://cloud.google.com/bigtable/docs/overview#data-types).
    By default, `write_bigtable` automatically serializes incompatible types to JSON. This can be disabled by setting `serialize_incompatible_types=False`.

    This data sink transforms each row of the dataframe into Bigtable rows.
    A row key is always required. The `row_key_column` parameter can be used to specify the column name to use for the row key.

    Every column must also belong to a column family. The `column_family_mappings` parameter can be used to specify the column family to use for each column.
    For example, if you have a column "name" and a column "age", you can specify a "user_data" column family by passing a dictionary like {"name": "user_data", "age": "user_data"}.

    EXPERIMENTAL: This feature is early in development and will change.

    Args:
        project_id: The Google Cloud project ID.
        instance_id: The Bigtable instance ID.
        table_id: The table to write to.
        row_key_column: Column name for the row key.
        column_family_mappings: Mapping of column names to column families.
        client_kwargs: Optional dictionary of arguments to pass to the Bigtable Client constructor.
        write_kwargs: Optional dictionary of arguments to pass to the Bigtable MutationsBatcher.
        serialize_incompatible_types: Whether to automatically convert non-bytes/int values to Bigtable-compatible formats.
                                      If False, will raise an error for unsupported types. Defaults to True.

    Returns:
        DataFrame: The result of writing the (validated and preprocessed) DataFrame via `write_sink`.
    """
    from daft.io.bigtable.bigtable_data_sink import BigtableDataSink

    sink = BigtableDataSink(
        project_id, instance_id, table_id, row_key_column, column_family_mappings, client_kwargs, write_kwargs
    )

    # Preprocess the DataFrame using the sink's validation and preprocessing logic
    # (this is where `serialize_incompatible_types` takes effect).
    df_to_write = sink._preprocess_dataframe(self, serialize_incompatible_types)

    return df_to_write.write_sink(sink)

write_clickhouse #

write_clickhouse(table: str, *, host: str, port: int | None = None, user: str | None = None, password: str | None = None, database: str | None = None, client_kwargs: dict[str, Any] | None = None, write_kwargs: dict[str, Any] | None = None) -> DataFrame

Writes the DataFrame to a ClickHouse table.

Parameters:

Name Type Description Default
table str

Name of the ClickHouse table to write to.

required
host str

ClickHouse host.

required
port int | None

ClickHouse port.

None
user str | None

ClickHouse user.

None
password str | None

ClickHouse password.

None
database str | None

ClickHouse database.

None
client_kwargs dict[str, Any] | None

Optional dictionary of arguments to pass to the ClickHouse client constructor.

None
write_kwargs dict[str, Any] | None

Optional dictionary of arguments to pass to the ClickHouse write() method.

None

Examples:

1
2
3
>>> import daft
>>> df = daft.from_pydict({"a": [1, 2, 3, 4]})
>>> df.write_clickhouse(table="", host="", port=8123, user="", password="")
╭────────────────────┬─────────────────────╮
│ total_written_rows ┆ total_written_bytes │
│ ---                ┆ ---                 │
│ Int64              ┆ Int64               │
╞════════════════════╪═════════════════════╡
│ 4                  ┆ 32                  │
╰────────────────────┴─────────────────────╯
Source code in daft/dataframe/dataframe.py
2187
2188
2189
2190
2191
2192
2193
2194
2195
2196
2197
2198
2199
2200
2201
2202
2203
2204
2205
2206
2207
2208
2209
2210
2211
2212
2213
2214
2215
2216
2217
2218
2219
2220
2221
2222
2223
2224
2225
2226
2227
2228
2229
2230
2231
2232
2233
2234
2235
2236
@DataframePublicAPI
def write_clickhouse(
    self,
    table: str,
    *,
    host: str,
    port: int | None = None,
    user: str | None = None,
    password: str | None = None,
    database: str | None = None,
    client_kwargs: dict[str, Any] | None = None,
    write_kwargs: dict[str, Any] | None = None,
) -> "DataFrame":
    """Write this DataFrame into a ClickHouse table.

    Builds a ClickHouse data sink from the given connection settings and hands it to
    `write_sink`, which executes the DataFrame.

    Args:
        table: Name of the ClickHouse table to write to.
        host: ClickHouse host.
        port: ClickHouse port.
        user: ClickHouse user.
        password: ClickHouse password.
        database: ClickHouse database.
        client_kwargs: Optional dictionary of arguments to pass to the ClickHouse client constructor.
        write_kwargs: Optional dictionary of arguments to pass to the ClickHouse write() method.

    Examples:
        >>> import daft
        >>> df = daft.from_pydict({"a": [1, 2, 3, 4]})  # doctest: +SKIP
        >>> df.write_clickhouse(table="", host="", port=8123, user="", password="")  # doctest: +SKIP
        ╭────────────────────┬─────────────────────╮
        │ total_written_rows ┆ total_written_bytes │
        │ ---                ┆ ---                 │
        │ Int64              ┆ Int64               │
        ╞════════════════════╪═════════════════════╡
        │ 4                  ┆ 32                  │
        ╰────────────────────┴─────────────────────╯
    """
    from daft.io.clickhouse.clickhouse_data_sink import ClickHouseDataSink

    # Connection settings are forwarded to the sink as keyword arguments.
    connection_options = {
        "host": host,
        "port": port,
        "user": user,
        "password": password,
        "database": database,
    }
    clickhouse_sink = ClickHouseDataSink(
        table,
        **connection_options,
        client_kwargs=client_kwargs,
        write_kwargs=write_kwargs,
    )
    return self.write_sink(clickhouse_sink)

write_huggingface #

write_huggingface(repo: str, split: str = 'train', data_dir: str = 'data', revision: str = 'main', overwrite: bool = False, commit_message: str = 'Upload dataset using Daft', commit_description: str | None = None, io_config: IOConfig | None = None) -> DataFrame

Write a DataFrame into a Hugging Face dataset.

Parameters:

Name Type Description Default
repo str

The ID of the repository to push to in the following format: <user>/<dataset_name> or <org>/<dataset_name>.

required
split str

The name of the split that will be given to that dataset.

'train'
data_dir str

Directory of the uploaded data files.

'data'
revision str

Branch to push the uploaded files to.

'main'
overwrite bool

Whether to overwrite or append.

False
commit_message str

Message to commit while pushing.

'Upload dataset using Daft'
commit_description str | None

Description of the commit that will be created.

None
io_config IOConfig | None

Configurations to use when interacting with remote storage.

None
Source code in daft/dataframe/dataframe.py
2238
2239
2240
2241
2242
2243
2244
2245
2246
2247
2248
2249
2250
2251
2252
2253
2254
2255
2256
2257
2258
2259
2260
2261
2262
2263
2264
2265
2266
2267
2268
def write_huggingface(
    self,
    repo: str,
    split: str = "train",
    data_dir: str = "data",
    revision: str = "main",
    overwrite: bool = False,
    commit_message: str = "Upload dataset using Daft",
    commit_description: str | None = None,
    io_config: IOConfig | None = None,
) -> "DataFrame":
    """Write this DataFrame into a Hugging Face dataset repository.

    Args:
        repo: The ID of the repository to push to in the following format: `<user>/<dataset_name>` or `<org>/<dataset_name>`.
        split: The name of the split that will be given to that dataset.
        data_dir: Directory of the uploaded data files.
        revision: Branch to push the uploaded files to.
        overwrite: Whether to overwrite or append.
        commit_message: Message to commit while pushing.
        commit_description: Description of the commit that will be created.
        io_config: Configurations to use when interacting with remote storage. When omitted,
            the planning config's default IO config is used; only its Hugging Face section
            (`io_config.hf`) is passed to the sink.
    """
    from daft.io.huggingface.sink import HuggingFaceSink

    if io_config is None:
        io_config = get_context().daft_planning_config.default_io_config

    hf_sink = HuggingFaceSink(
        repo,
        split,
        data_dir,
        revision,
        overwrite,
        commit_message,
        commit_description,
        io_config.hf,
    )
    return self.write_sink(hf_sink)

write_turbopuffer #

write_turbopuffer(namespace: str | Expression, api_key: str | None = None, region: str | None = None, distance_metric: Literal['cosine_distance', 'euclidean_squared'] | None = None, schema: dict[str, Any] | None = None, id_column: str | None = None, vector_column: str | None = None, client_kwargs: dict[str, Any] | None = None, write_kwargs: dict[str, Any] | None = None) -> DataFrame

Writes the DataFrame to a Turbopuffer namespace.

This method transforms each row of the dataframe into a turbopuffer document. This means that an id column is always required. Optionally, the id_column parameter can be used to specify the column name to use for the id column. Note that the column with the name specified by id_column will be renamed to "id" when written to turbopuffer.

A vector column is required if the namespace has a vector index. Optionally, the vector_column parameter can be used to specify the column name to use for the vector index. Note that the column with the name specified by vector_column will be renamed to "vector" when written to turbopuffer.

All other columns become attributes.

The namespace parameter can be either a string (for a single namespace) or an expression (for multiple namespaces). When using an expression, the data will be partitioned by the computed namespace values and written to each namespace separately.

For more details on parameters, please see the turbopuffer documentation: https://turbopuffer.com/docs/write

Parameters:

Name Type Description Default
namespace str | Expression

The namespace to write to. Can be a string for a single namespace or an expression for multiple namespaces.

required
api_key str | None

Turbopuffer API key.

None
region str | None

Turbopuffer region.

None
distance_metric Literal['cosine_distance', 'euclidean_squared'] | None

Distance metric for vector similarity ("cosine_distance", "euclidean_squared").

None
schema dict[str, Any] | None

Optional manual schema specification.

None
id_column str | None

Optional column name for the id column. The data sink will automatically rename the column to "id" for the id column.

None
vector_column str | None

Optional column name for the vector index column. The data sink will automatically rename the column to "vector" for the vector index.

None
client_kwargs dict[str, Any] | None

Optional dictionary of arguments to pass to the Turbopuffer client constructor. Explicit arguments (api_key, region) will be merged into client_kwargs.

None
write_kwargs dict[str, Any] | None

Optional dictionary of arguments to pass to the namespace.write() method. Explicit arguments (distance_metric, schema) will be merged into write_kwargs.

None
Source code in daft/dataframe/dataframe.py
2138
2139
2140
2141
2142
2143
2144
2145
2146
2147
2148
2149
2150
2151
2152
2153
2154
2155
2156
2157
2158
2159
2160
2161
2162
2163
2164
2165
2166
2167
2168
2169
2170
2171
2172
2173
2174
2175
2176
2177
2178
2179
2180
2181
2182
2183
2184
2185
@DataframePublicAPI
def write_turbopuffer(
    self,
    namespace: str | Expression,
    api_key: str | None = None,
    region: str | None = None,
    distance_metric: Literal["cosine_distance", "euclidean_squared"] | None = None,
    schema: dict[str, Any] | None = None,
    id_column: str | None = None,
    vector_column: str | None = None,
    client_kwargs: dict[str, Any] | None = None,
    write_kwargs: dict[str, Any] | None = None,
) -> "DataFrame":
    """Writes the DataFrame to a Turbopuffer namespace.

    Each row of the dataframe becomes one turbopuffer document, so an `id` column is always required.
    The `id_column` parameter can optionally name the column to use for the id; that column is
    renamed to "id" when written to turbopuffer.

    A `vector` column is required if the namespace has a vector index. The `vector_column` parameter can
    optionally name the column to use for the vector index; that column is renamed to "vector" when
    written to turbopuffer.

    All other columns become attributes.

    `namespace` can be either a string (for a single namespace) or an expression (for multiple namespaces).
    With an expression, the data is partitioned by the computed namespace values and each partition is
    written to its own namespace.

    For more details on parameters, please see the turbopuffer documentation: https://turbopuffer.com/docs/write

    Args:
        namespace: The namespace to write to. Can be a string for a single namespace or an expression for multiple namespaces.
        api_key: Turbopuffer API key.
        region: Turbopuffer region.
        distance_metric: Distance metric for vector similarity ("cosine_distance", "euclidean_squared").
        schema: Optional manual schema specification.
        id_column: Optional column name for the id column. The data sink will automatically rename the column to "id" for the id column.
        vector_column: Optional column name for the vector index column. The data sink will automatically rename the column to "vector" for the vector index.
        client_kwargs: Optional dictionary of arguments to pass to the Turbopuffer client constructor.
            Explicit arguments (api_key, region) will be merged into client_kwargs.
        write_kwargs: Optional dictionary of arguments to pass to the namespace.write() method.
            Explicit arguments (distance_metric, schema) will be merged into write_kwargs.
    """
    from daft.io.turbopuffer.turbopuffer_data_sink import TurbopufferDataSink

    # The sink takes its configuration positionally, in signature order.
    turbopuffer_sink = TurbopufferDataSink(
        namespace,
        api_key,
        region,
        distance_metric,
        schema,
        id_column,
        vector_column,
        client_kwargs,
        write_kwargs,
    )
    return self.write_sink(turbopuffer_sink)

write_sink #

write_sink(sink: DataSink[WriteResultType]) -> DataFrame

Writes the DataFrame to the given DataSink.

Parameters:

Name Type Description Default
sink DataSink[WriteResultType]

The DataSink to write to.

required

Returns:

Name Type Description
DataFrame DataFrame

A dataframe from the micropartition returned by the DataSink's .finalize() method.

Note

This call is blocking and will execute the DataFrame when called.

Source code in daft/dataframe/dataframe.py
1907
1908
1909
1910
1911
1912
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922
1923
1924
1925
1926
1927
1928
1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
@DataframePublicAPI
def write_sink(self, sink: "DataSink[WriteResultType]") -> "DataFrame":
    """Write this DataFrame through a user-supplied DataSink.

    The sink's `start()` hook runs first. The write plan is then executed, and the
    collected `write_results` are handed to `sink.finalize()`.

    Args:
        sink: The DataSink to write to.

    Returns:
        DataFrame: A dataframe from the micropartition returned by the DataSink's `.finalize()` method.

    Raises:
        ValueError: If the micropartition returned by `finalize()` does not have the schema
            declared by `sink.schema()`.

    Note:
        This call is **blocking** and will execute the DataFrame when called.
    """
    sink.start()

    # Plan the write and run it eagerly so every task's WriteResult is materialized.
    write_plan = self._builder.write_datasink(sink.name(), sink)
    executed_write = DataFrame(write_plan)
    executed_write.collect()

    collected = executed_write.to_pydict()
    assert "write_results" in collected
    final_partition = sink.finalize(collected["write_results"])

    # Enforce the schema contract the sink declared up front.
    if final_partition.schema() != sink.schema():
        raise ValueError(
            "Schema mismatch between the data sink's schema and the result's schema:\n"
            f"Sink schema:\n{sink.schema()}\n"
            f"Result schema:\n{final_partition.schema()}"
        )
    # TODO(desmond): Connect the old and new logical plan builders so that a .explain() shows the
    # plan from the source all the way to the sink to the sink's results. In theory we can do this
    # for all other sinks too.
    result_df = DataFrame._from_micropartitions(final_partition)
    result_df._metadata = executed_write._metadata
    return result_df

User-Defined#

Daft supports diverse input sources and output sinks; this section covers lower-level APIs that we are evolving for more advanced usage.

Warning

These APIs are considered experimental.

DataSource #

DataSource is a low-level interface for reading data into DataFrames.

When a DataSource is read, it is split into multiple tasks which can be distributed for parallel processing. Each task is responsible for reading a specific portion of the data (e.g., a file partition, a range of rows, or a subset of a database table) and converting it into RecordBatches. Implementations should ensure that tasks are appropriately sized to balance parallelism.

Warning

This API is early in its development and is subject to change.

Methods:

Name Description
get_partition_fields

Returns the partitioning fields for this data source.

get_tasks

Yields tasks as they are discovered. Called during execution, not planning.

read

Reads a DataSource as a DataFrame.

Attributes:

Name Type Description
name str

Returns the source name which is useful for debugging.

schema Schema

Returns the schema shared by each task's record batches.

name #

name: str

Returns the source name which is useful for debugging.

schema #

schema: Schema

Returns the schema shared by each task's record batches.

get_partition_fields #

get_partition_fields() -> list[PartitionField]

Returns the partitioning fields for this data source.

Source code in daft/io/source.py
51
52
53
def get_partition_fields(self) -> list[PartitionField]:
    """Return the fields this data source is partitioned by.

    The base implementation reports no partitioning (a fresh empty list);
    partitioned sources may override it.
    """
    return list()

get_tasks #

get_tasks(pushdowns: Pushdowns) -> AsyncIterator[DataSourceTask]

Yields tasks as they are discovered. Called during execution, not planning.

Source code in daft/io/source.py
55
56
57
58
59
60
61
@abstractmethod
async def get_tasks(self, pushdowns: Pushdowns) -> AsyncIterator[DataSourceTask]:
    """Yields tasks as they are discovered. Called during execution, not planning.

    Subclasses must override this as an async generator that yields
    ``DataSourceTask`` objects, each describing a portion of the data that can
    be read independently.

    Args:
        pushdowns: Query pushdowns (filters, column projection, limit);
            implementations can pass them through to the tasks they create,
            e.g. via ``DataSourceTask.parquet(..., pushdowns=pushdowns)``.
    """
    ...
    # https://mypy.readthedocs.io/en/latest/more_types.html#typing-async-generators
    # The presence of `yield` makes this function an async generator (both at runtime and
    # for the type checker), so the abstract signature matches what overrides provide.
    # NOTE(review): this `yield` is reachable, so iterating the base implementation
    # (e.g. through super()) yields a single None rather than nothing.
    yield  # type: ignore[misc]

read #

read() -> DataFrame

Reads a DataSource as a DataFrame.

Source code in daft/io/source.py
63
64
65
66
67
68
69
70
71
def read(self) -> DataFrame:
    """Return a DataFrame whose plan scans this DataSource.

    The source is wrapped in a native scan-operator handle, which becomes the
    tabular scan at the root of a new logical plan.
    """
    from daft.daft import ScanOperatorHandle
    from daft.dataframe import DataFrame
    from daft.logical.builder import LogicalPlanBuilder

    return DataFrame(
        LogicalPlanBuilder.from_tabular_scan(
            scan_operator=ScanOperatorHandle.from_data_source(self),
        )
    )

DataSourceTask #

DataSourceTask represents a partition of data that can be processed independently.

Warning

This API is early in its development and is subject to change.

Methods:

Name Description
get_micro_partitions

Deprecated: override read instead.

parquet

Create a task that reads a Parquet file using the native reader.

read

Yields record batches. Called from an async execution context.

Attributes:

Name Type Description
schema Schema

Returns the schema of the record batches produced by this task.

schema #

schema: Schema

Returns the schema of the record batches produced by this task.

get_micro_partitions #

get_micro_partitions() -> Iterator[MicroPartition]

Deprecated: override read instead.

Source code in daft/io/source.py
109
110
111
def get_micro_partitions(self) -> Iterator[MicroPartition]:
    """Deprecated: override read instead.

    Legacy hook kept for backwards compatibility; the base version always
    raises NotImplementedError.
    """
    raise NotImplementedError()

parquet #

parquet(path: str, schema: Schema, *, pushdowns: Pushdowns | None = None, num_rows: int | None = None, size_bytes: int | None = None, partition_values: RecordBatch | None = None, stats: RecordBatch | None = None, storage_config: StorageConfig | None = None) -> DataSourceTask

Create a task that reads a Parquet file using the native reader.

This is the recommended way to create scan tasks for Parquet files when building custom DataSource implementations (e.g., catalog connectors like Iceberg or Paimon).

Partition pruning is the DataSource's responsibility — decide which files to yield in get_tasks rather than relying on the task factory to filter them out.

Parameters:

Name Type Description Default
path str

Path or URI of the Parquet file (e.g., "s3://bucket/file.parquet").

required
schema Schema

Schema to read the file with.

required
pushdowns Pushdowns | None

Query pushdowns (filters, column projection, limit). Pass through the pushdowns received by DataSource.get_tasks.

None
num_rows int | None

Exact row count, if known. Enables metadata-only optimizations.

None
size_bytes int | None

On-disk file size in bytes. Used for task coalescing heuristics.

None
partition_values RecordBatch | None

Single-row RecordBatch of partition column values to inject.

None
stats RecordBatch | None

Column statistics as a RecordBatch for predicate pushdown evaluation.

None
storage_config StorageConfig | None

Optional StorageConfig for IO credentials/settings. Defaults to StorageConfig(multithreaded_io=True).

None
Example

class MyCatalogSource(DataSource):
    async def get_tasks(self, pushdowns):
        for file in self.list_files():
            yield DataSourceTask.parquet(
                path=file.uri,
                schema=self.schema,
                pushdowns=pushdowns,
                num_rows=file.row_count,
                size_bytes=file.size_bytes,
            )

Returns:

Type Description
DataSourceTask

A DataSourceTask executed by the native Parquet reader.

Source code in daft/io/source.py
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
@staticmethod
def parquet(
    path: str,
    schema: Schema,
    *,
    pushdowns: Pushdowns | None = None,
    num_rows: int | None = None,
    size_bytes: int | None = None,
    partition_values: RecordBatch | None = None,
    stats: RecordBatch | None = None,
    storage_config: StorageConfig | None = None,
) -> DataSourceTask:
    """Build a task that reads one Parquet file with the native reader.

    Prefer this factory when a custom DataSource (e.g. a catalog connector
    such as Iceberg or Paimon) needs to emit scan tasks for Parquet files.

    Partition pruning remains the DataSource's job: choose which files to
    yield in get_tasks instead of expecting this factory to drop them.

    Args:
        path: Path or URI of the Parquet file (e.g., ``"s3://bucket/file.parquet"``).
        schema: Schema to read the file with.
        pushdowns: Query pushdowns (filters, column projection, limit). Pass
            through the pushdowns received by DataSource.get_tasks.
        num_rows: Exact row count, if known. Enables metadata-only optimizations.
        size_bytes: On-disk file size in bytes. Used for task coalescing heuristics.
        partition_values: Single-row RecordBatch of partition column values to inject.
        stats: Column statistics as a RecordBatch for predicate pushdown evaluation.
        storage_config: Optional StorageConfig for IO credentials/settings.
            Defaults to ``StorageConfig(multithreaded_io=True)``.

    Example:
        class MyCatalogSource(DataSource):
            async def get_tasks(self, pushdowns):
                for file in self.list_files():
                    yield DataSourceTask.parquet(
                        path=file.uri,
                        schema=self.schema,
                        pushdowns=pushdowns,
                        num_rows=file.row_count,
                        size_bytes=file.size_bytes,
                    )

    Returns:
        A DataSourceTask executed by the native Parquet reader.
    """
    # Unwrap the Python-level wrappers into their native handles (None stays None).
    native_schema = schema._schema
    native_pushdowns = None if pushdowns is None else pushdowns._to_pypushdowns()
    native_partition_values = None if partition_values is None else partition_values._recordbatch
    native_stats = None if stats is None else stats._recordbatch

    native_task = PyDataSourceTask.parquet(
        path=path,
        schema=native_schema,
        pushdowns=native_pushdowns,
        num_rows=num_rows,
        size_bytes=size_bytes,
        partition_values=native_partition_values,
        stats=native_stats,
        storage_config=storage_config,
    )
    return _RustDataSourceTask(native_task)

read #

read() -> AsyncIterator[RecordBatch]

Yields record batches. Called from an async execution context.

The default implementation delegates to the deprecated get_micro_partitions for backwards compatibility. New subclasses should override this method directly.

Source code in daft/io/source.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
async def read(self) -> AsyncIterator[RecordBatch]:
    """Yields record batches. Called from an async execution context.

    The default implementation delegates to the deprecated
    ``get_micro_partitions`` for backwards compatibility: it emits a
    ``DeprecationWarning`` and then yields every record batch of every
    micropartition that method returns. New subclasses should override
    this method directly.

    Raises:
        NotImplementedError: If calling ``get_micro_partitions()`` raises
            NotImplementedError, i.e. the subclass implemented neither hook.
    """
    try:
        parts = self.get_micro_partitions()
    except NotImplementedError:
        # Suppress the base-class NotImplementedError so the traceback shows only
        # the actionable message instead of "During handling of the above exception...".
        raise NotImplementedError(
            f"{type(self).__name__} must implement async def read(self) -> AsyncIterator[RecordBatch]"
        ) from None
    warnings.warn(
        f"{type(self).__name__}.get_micro_partitions() is deprecated — override 'async def read()' instead.",
        DeprecationWarning,
        stacklevel=2,
    )
    for mp in parts:
        for rb in mp.get_record_batches():
            yield rb

DataSink #

Interface for writing data to a sink that is not built-in.

When a DataFrame is written using the .write_sink() method, the following sequence occurs:

  1. The sink's .start() method is called once at the beginning of the write process.
  2. The DataFrame is executed, and its output is split into micropartitions.
  3. The sink's .write() method is invoked on each micropartition, potentially in parallel and distributed across multiple tasks or workers.
  4. After all writes complete, the resulting WriteResult objects are gathered on a single node.
  5. The .finalize() method is then called with all write outputs to produce a final MicroPartition.

Warning

This API is early in its development and is subject to change.

Methods:

Name Description
finalize

Finalizes the write process and returns a resulting micropartition.

name

Optional custom sink name.

safe_write

This method wraps the abstract write() method with a try block to reraise potentially unserializable exceptions.

schema

The expected schema for the micropartition returned by the .finalize() method of this DataSink.

start

Optional callback for when a write operation begins.

write

Writes a stream of micropartitions to the sink.

finalize #

finalize(write_results: list[WriteResult[WriteResultType]]) -> MicroPartition

Finalizes the write process and returns a resulting micropartition.

For example, this can be used to merge, summarize, or commit the results of individual writes into a single output micropartition.

Parameters:

Name Type Description Default
write_results list[WriteResult[WriteResultType]]

The list of results from the calls to .write().

required

Returns:

Name Type Description
MicroPartition MicroPartition

A final, single micropartition representing the result of all writes.

Source code in daft/io/sink.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
@abstractmethod
def finalize(self, write_results: list[WriteResult[WriteResultType]]) -> MicroPartition:
    """Combine the per-task write results into one final micropartition.

    Typical uses are merging, summarizing, or committing the individual writes
    into a single output micropartition.

    Args:
        write_results (list[WriteResult[WriteResultType]]): Every result produced by the calls to `.write()`.

    Returns:
        MicroPartition: A final, single micropartition representing the result of all writes.
    """
    raise NotImplementedError()

name #

name() -> str

Optional custom sink name.

Source code in daft/io/sink.py
47
48
49
def name(self) -> str:
    """Label for this sink; override to give it a custom name."""
    default_label = "User-defined Data Sink"
    return default_label

safe_write #

safe_write(micropartitions: Iterator[MicroPartition]) -> Iterator[WriteResult[WriteResultType]]

This method wraps the abstract write() method with a try block to reraise potentially unserializable exceptions.

Parameters:

Name Type Description Default
micropartitions Iterator[MicroPartition]

An iterator of micropartitions to be written.

required

Returns:

Type Description
Iterator[WriteResult[WriteResultType]]

Iterator[WriteResult[WriteResultType]]: An iterator of write results wrapped in a WriteResult.

Raises:

Type Description
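RuntimeError

Raised when write() fails: the original exception is chained as its cause, and the message includes the sink name, the original exception type, and its message.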

Source code in daft/io/sink.py
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
def safe_write(self, micropartitions: Iterator[MicroPartition]) -> Iterator[WriteResult[WriteResultType]]:
    """This method wraps the abstract `write()` method with a try block to reraise potentially unserializable exceptions.

    Results from `write()` are yielded unchanged. Any exception raised while writing is
    replaced by a plain `RuntimeError`, whose message carries the sink name and the
    original exception's type and text.

    Args:
        micropartitions (Iterator[MicroPartition]): An iterator of micropartitions to be written.

    Returns:
        Iterator[WriteResult[WriteResultType]]: An iterator of write results wrapped in a WriteResult.

    Raises:
        RuntimeError: If `write()` raises any `Exception`. The original exception is chained
            as `__cause__`.
    """
    try:
        yield from self.write(micropartitions)
    except Exception as e:
        raise RuntimeError(f"Exception occurred while writing to {self.name()}: {type(e).__name__}: {e!s}") from e

schema #

schema() -> Schema

The expected schema for the micropartition returned by the .finalize() method of this DataSink.

If this given schema does not match the actual schema of the micropartition at runtime, we throw an error.

Source code in daft/io/sink.py
51
52
53
54
55
56
57
@abstractmethod
def schema(self) -> Schema:
    """Declare the schema of the micropartition that `.finalize()` will return.

    If the micropartition produced at runtime does not match this schema, an error is raised.
    """
    raise NotImplementedError()

start #

start() -> None

Optional callback for when a write operation begins.

For example, this can be used to initialize resources, open connections, start a transaction etc. The default implementation does nothing.

Source code in daft/io/sink.py
59
60
61
62
63
64
65
def start(self) -> None:
    """Hook invoked once before any data is written.

    Override it to set up resources, e.g. open connections or begin a transaction.
    The base implementation is a no-op.
    """
    return None

write #

write(micropartitions: Iterator[MicroPartition]) -> Iterator[WriteResult[WriteResultType]]

Writes a stream of micropartitions to the sink.

This method should handle the ingestion of each micropartition and yield a result (e.g. metadata) for each successful write.

Parameters:

Name Type Description Default
micropartitions Iterator[MicroPartition]

An iterator of micropartitions to be written.

required

Returns:

Type Description
Iterator[WriteResult[WriteResultType]]

Iterator[WriteResult[WriteResultType]]: An iterator of write results wrapped in a WriteResult.

Source code in daft/io/sink.py
67
68
69
70
71
72
73
74
75
76
77
78
79
80
@abstractmethod
def write(self, micropartitions: Iterator[MicroPartition]) -> Iterator[WriteResult[WriteResultType]]:
    """Consume a stream of micropartitions and write them to the sink.

    Implementations should ingest each micropartition and yield one result
    (e.g. metadata) per successful write.

    Args:
        micropartitions (Iterator[MicroPartition]): An iterator of micropartitions to be written.

    Returns:
        Iterator[WriteResult[WriteResultType]]: An iterator of write results wrapped in a WriteResult.
    """
    raise NotImplementedError()

WriteResult #

WriteResult(result: WriteResultType, bytes_written: int, rows_written: int)

Wrapper for the result of a single write, as yielded by the DataSink's .write() method.

Attributes:

Name Type Description
result WriteResultType

The result from the write operation

bytes_written int

Size of the written data in bytes

rows_written int

Number of rows written

bytes_written #

bytes_written: int

result #

result: WriteResultType

rows_written #

rows_written: int

Pushdowns#

Daft supports predicate, partition-filter, projection, limit, and count (aggregation) pushdowns.

Pushdowns #

Pushdowns(filters: Expression | None = None, partition_filters: Expression | None = None, columns: list[str] | None = None, limit: int | None = None, aggregation: Expression | None = None)

Pushdowns are sent to scan sources during query planning.

Attributes:

Name Type Description
filters Expression | None

Optional filter predicate to apply to rows.

partition_filters Expression | None

Optional partition filter predicate to apply to partitions or files.

columns list[str] | None

Optional list of column names to project.

limit int | None

Optional limit on the number of rows to return.

aggregation Expression | None

Optional aggregation expression for count pushdown.

Methods:

Name Description
empty

Returns a Pushdowns instance with no pushdowns set.

filter_required_column_names

Returns a set of field names that are required by the filter predicate.

aggregation #

aggregation: Expression | None = None

columns #

columns: list[str] | None = None

filters #

filters: Expression | None = None

limit #

limit: int | None = None

partition_filters #

partition_filters: Expression | None = None

empty #

empty() -> Pushdowns

Returns a Pushdowns instance with no pushdowns set.

Source code in daft/io/pushdowns.py (lines 59–61)

@classmethod
def empty(cls) -> Pushdowns:
    """Build a Pushdowns instance with every pushdown left unset.

    Returns:
        Pushdowns: ``cls()`` constructed using only its default field values.
    """
    blank = cls()
    return blank

filter_required_column_names #

filter_required_column_names() -> set[str]

Returns a set of field names that are required by the filter predicate.

Source code in daft/io/pushdowns.py (lines 55–57)

def filter_required_column_names(self) -> set[str]:
    """Collect the field names that the filter predicate depends on.

    Returns:
        set[str]: The fields required by ``filters``, or an empty set when no
        filter predicate is set.
    """
    predicate = self.filters
    if predicate is None:
        return set()
    return _ColumnVisitor().visit(predicate)

ScanOperator #

ScanOperator is the legacy Python DataSource ABC and is being migrated to daft.io.source.DataSource.

In Daft 0.5.0 we will change the pushdown parameter from daft.daft.Pushdowns to daft.io.Pushdowns. For now, please use Pushdowns._from_pypushdowns(py_pushdowns) to convert the Rust expressions into this Python Pushdowns class.

Methods:

Name Description
as_pushdown_filter

Returns this scan operator as a SupportsPushdownFilters if it supports pushdown filters.

can_absorb_filter

Returns true if this scan can accept predicate pushdowns.

can_absorb_limit

Returns true if this scan can accept limit pushdowns.

can_absorb_select

Returns true if this scan can accept projection pushdowns.

display_name

Returns a human-readable name for this scan operator.

multiline_display

Returns a multi-line string representation of this scan operator.

partitioning_keys

Returns the partitioning keys for this data source.

schema

Returns the schema of the data source.

supports_count_pushdown

Returns true if this scan can accept count pushdowns.

to_scan_tasks

Converts this scan operator into scan tasks with the given pushdowns.

as_pushdown_filter #

as_pushdown_filter() -> SupportsPushdownFilters | None

Returns this scan operator as a SupportsPushdownFilters if it supports pushdown filters.

Source code in daft/io/scan.py (lines 79–81)

def as_pushdown_filter(self) -> SupportsPushdownFilters | None:
    """Returns this scan operator as a SupportsPushdownFilters if it supports pushdown filters.

    This method is not abstract, so the base implementation must honour the
    ``| None`` in its return annotation. It reports "no filter-pushdown support"
    by returning ``None``, just as ``supports_count_pushdown`` defaults to
    ``False``. Scan operators that do implement ``SupportsPushdownFilters``
    should override this, presumably returning ``self``.

    Returns:
        SupportsPushdownFilters | None: This operator viewed as a
        ``SupportsPushdownFilters``, or ``None`` when filter pushdown is not
        supported.
    """
    # Previously this raised NotImplementedError, which contradicted the Optional
    # return contract for operators that simply don't support filter pushdown.
    return None

can_absorb_filter #

can_absorb_filter() -> bool

Returns true if this scan can accept predicate pushdowns.

Source code in daft/io/scan.py (lines 54–57)

@abc.abstractmethod
def can_absorb_filter(self) -> bool:
    """Report whether this scan accepts predicate (filter) pushdowns.

    Abstract: concrete scan operators must implement this.

    Returns:
        bool: True when predicate pushdowns can be absorbed by this scan.
    """
    raise NotImplementedError

can_absorb_limit #

can_absorb_limit() -> bool

Returns true if this scan can accept limit pushdowns.

Source code in daft/io/scan.py (lines 59–62)

@abc.abstractmethod
def can_absorb_limit(self) -> bool:
    """Report whether this scan accepts limit pushdowns.

    Abstract: concrete scan operators must implement this.

    Returns:
        bool: True when a row limit can be absorbed by this scan.
    """
    raise NotImplementedError

can_absorb_select #

can_absorb_select() -> bool

Returns true if this scan can accept projection pushdowns.

Source code in daft/io/scan.py (lines 64–67)

@abc.abstractmethod
def can_absorb_select(self) -> bool:
    """Report whether this scan accepts projection (column-selection) pushdowns.

    Abstract: concrete scan operators must implement this.

    Returns:
        bool: True when projection pushdowns can be absorbed by this scan.
    """
    raise NotImplementedError

display_name #

display_name() -> str

Returns a human-readable name for this scan operator.

Source code in daft/io/scan.py (lines 44–47)

@abc.abstractmethod
def display_name(self) -> str:
    """Return a human-readable name for this scan operator.

    The method is abstract, so concrete scan operators must override it. An
    override may still delegate to this base version via ``super()``, which
    yields the name of the instance's class.

    Returns:
        str: The operator's display name.
    """
    owner = self.__class__
    return owner.__name__

multiline_display #

multiline_display() -> list[str]

Returns a multi-line string representation of this scan operator.

Source code in daft/io/scan.py (lines 69–72)

@abc.abstractmethod
def multiline_display(self) -> list[str]:
    """Describe this scan operator as a list of display lines.

    Abstract: concrete scan operators must implement this.

    Returns:
        list[str]: One string per line of the operator's representation.
    """
    raise NotImplementedError

partitioning_keys #

partitioning_keys() -> list[PyPartitionField]

Returns the partitioning keys for this data source.

Source code in daft/io/scan.py (lines 49–52)

@abc.abstractmethod
def partitioning_keys(self) -> list[PyPartitionField]:
    """List the partitioning keys of the underlying data source.

    Abstract: concrete scan operators must implement this.

    Returns:
        list[PyPartitionField]: The partition fields of this data source.
    """
    raise NotImplementedError

schema #

schema() -> Schema

Returns the schema of the data source.

Source code in daft/io/scan.py (lines 39–42)

@abc.abstractmethod
def schema(self) -> Schema:
    """Describe the schema of the data this scan operator reads.

    Abstract: concrete scan operators must implement this.

    Returns:
        Schema: The data source's schema.
    """
    raise NotImplementedError

supports_count_pushdown #

supports_count_pushdown() -> bool

Returns true if this scan can accept count pushdowns.

Source code in daft/io/scan.py (lines 83–85)

def supports_count_pushdown(self) -> bool:
    """Report whether this scan accepts count pushdowns.

    Count pushdown is unsupported by default. Operators able to answer row
    counts themselves should override this to return True.

    Returns:
        bool: Always False in this base implementation.
    """
    supported = False
    return supported

to_scan_tasks #

to_scan_tasks(pushdowns: PyPushdowns) -> Iterator[ScanTask]

Converts this scan operator into scan tasks with the given pushdowns.

Source code in daft/io/scan.py (lines 74–77)

@abc.abstractmethod
def to_scan_tasks(self, pushdowns: PyPushdowns) -> Iterator[ScanTask]:
    """Produce the scan tasks for this operator, applying the given pushdowns.

    Abstract: concrete scan operators must implement this.

    Args:
        pushdowns (PyPushdowns): The Rust-side ``daft.daft.Pushdowns``; use
            ``Pushdowns._from_pypushdowns(pushdowns)`` to obtain the Python
            ``daft.io.Pushdowns`` view.

    Returns:
        Iterator[ScanTask]: The scan tasks for this data source.
    """
    raise NotImplementedError