
Configuration

Configure the execution backend, tune Daft's behavior during execution, and control how Daft interacts with storage.

Setting the Runner

Control the execution backend that Daft will run on by calling these functions once at the start of your application.

set_runner_native

set_runner_native(num_threads: int | None = None) -> Runner[PartitionT]

Configure Daft to execute dataframes using native multi-threaded processing.

This is the default execution mode for Daft.

Returns:

Runner[PartitionT]: A runner object with the native runner's configuration.

Note

Can also be configured via environment variable: DAFT_RUNNER=native

Source code in daft/runners/__init__.py
def set_runner_native(num_threads: int | None = None) -> Runner[PartitionT]:
    """Configure Daft to execute dataframes using native multi-threaded processing.

    This is the default execution mode for Daft.

    Returns:
        Runner[PartitionT]: A runner object with the native runner's configuration.

    Note:
        Can also be configured via environment variable: DAFT_RUNNER=native
    """
    return _set_runner_native(num_threads)

set_runner_ray

set_runner_ray(address: str | None = None, noop_if_initialized: bool = False, force_client_mode: bool = False) -> Runner[PartitionT]

Configure Daft to execute dataframes using the Ray distributed computing framework.

Parameters:

address (str | None, default None): Ray cluster address to connect to. If None, connects to or starts a local Ray instance.
noop_if_initialized (bool, default False): If True, skip initialization if Ray is already running.
force_client_mode (bool, default False): If True, forces Ray to run in client mode.

Returns:

Runner[PartitionT]: A runner object with the Ray runner's configuration.

Note

Can also be configured via environment variable: DAFT_RUNNER=ray

Source code in daft/runners/__init__.py
def set_runner_ray(
    address: str | None = None,
    noop_if_initialized: bool = False,
    force_client_mode: bool = False,
) -> Runner[PartitionT]:
    """Configure Daft to execute dataframes using the Ray distributed computing framework.

    Args:
        address: Ray cluster address to connect to. If None, connects to or starts a local Ray instance.
        noop_if_initialized: If True, skip initialization if Ray is already running.
        force_client_mode: If True, forces Ray to run in client mode.

    Returns:
        Runner[PartitionT]: A runner object with the Ray runner's configurations.

    Note:
        Can also be configured via environment variable: DAFT_RUNNER=ray
    """
    return _set_runner_ray(
        address=address,
        noop_if_initialized=noop_if_initialized,
        force_client_mode=force_client_mode,
    )

get_or_create_runner

get_or_create_runner() -> Runner[PartitionT]

Get or create the current runner instance.

If a runner has already been set, returns it. Otherwise, creates a new runner using the default configuration (native) and locks it in.

Returns:

Runner[PartitionT]: The current runner instance.

Note

After calling this function, the runner cannot be changed for the lifetime of the process. Use get_or_infer_runner_type to check the runner type without this side effect.

Source code in daft/runners/__init__.py
def get_or_create_runner() -> Runner[PartitionT]:
    """Get or create the current runner instance.

    If a runner has already been set, returns it. Otherwise, creates a new
    runner using the default configuration (native) and locks it in.

    Returns:
        Runner[PartitionT]: The current runner instance.

    Note:
        After calling this function, the runner cannot be changed for the
        lifetime of the process. Use ``get_or_infer_runner_type`` to check the
        runner type without this side effect.
    """
    return _get_or_create_runner()
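The set-once behavior described in the note above can be pictured with a small model. This is a hypothetical, stdlib-only illustration: `RunnerRegistry`, its methods, and `RunnerAlreadySetError` are invented for this sketch and are not Daft APIs. It assumes that trying to set a runner after one has been locked in raises an error; Daft's actual exception type and message may differ.

```python
from typing import Optional


class RunnerAlreadySetError(RuntimeError):
    """Hypothetical error raised when a locked-in runner would be replaced."""


class RunnerRegistry:
    """Hypothetical model of a process-wide, set-once runner slot (not Daft code)."""

    def __init__(self) -> None:
        self._runner_type: Optional[str] = None

    def set_runner(self, runner_type: str) -> str:
        # Once a runner exists, it cannot be changed for the rest of the process.
        if self._runner_type is not None:
            raise RunnerAlreadySetError(
                f"runner is already set to {self._runner_type!r}"
            )
        self._runner_type = runner_type
        return self._runner_type

    def get_or_create_runner(self) -> str:
        # Reuse the existing runner, or create and lock in the default ("native").
        if self._runner_type is None:
            self._runner_type = "native"
        return self._runner_type


# Usage: the first call locks in "native", so a later switch to Ray fails.
registry = RunnerRegistry()
print(registry.get_or_create_runner())  # native
try:
    registry.set_runner("ray")
except RunnerAlreadySetError as err:
    print(err)
```

This is why the runner should be chosen once, at the start of the application, before anything triggers runner creation.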

Checking the Runner

Check the execution backend that Daft is currently using.

get_or_infer_runner_type

get_or_infer_runner_type() -> str

Get or infer the runner type.

This API gets or infers the runner type currently in use, applying the following strategies in order:

1. If the runner has already been set, return its type directly.
2. Otherwise, try to determine whether Daft is running on a Ray cluster; if so, treat the runner type as "ray".
3. Otherwise, determine the type from the DAFT_RUNNER environment variable.

Returns:

str: The runner type ("native" or "ray").

Source code in daft/runners/__init__.py
def get_or_infer_runner_type() -> str:
    """Get or infer the runner type.

    This API will get or infer the currently used runner type according to the following strategies:
    1. If the `runner` has been set, return its type directly;
    2. Try to determine whether it's currently running on a ray cluster. If so, consider it to be a ray type;
    3. Try to determine based on `DAFT_RUNNER` env variable.

    Returns:
        str: The runner type ("native" or "ray").
    """
    return _get_or_infer_runner_type()
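The three-step lookup above can be sketched as a pure function. `infer_runner_type` and its arguments are hypothetical: `on_ray_cluster` stands in for whatever Ray detection Daft actually performs, and falling back to "native" when `DAFT_RUNNER` is unset is an assumption based on native being the default runner.

```python
import os
from typing import Mapping, Optional


def infer_runner_type(
    current_runner: Optional[str],
    on_ray_cluster: bool,
    env: Mapping[str, str],
) -> str:
    """Hypothetical sketch of the documented lookup order (not Daft's code)."""
    # 1. A runner that has already been set always wins.
    if current_runner is not None:
        return current_runner
    # 2. Running on a Ray cluster implies the Ray runner.
    if on_ray_cluster:
        return "ray"
    # 3. Fall back to DAFT_RUNNER, assuming "native" when it is unset.
    return env.get("DAFT_RUNNER", "native")


# Usage with the real process environment and no runner set yet.
print(infer_runner_type(None, on_ray_cluster=False, env=os.environ))
```

Passing the environment in as a mapping keeps the sketch easy to test without touching `os.environ`.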

Setting Configurations

Configure Daft in various ways during execution.

set_planning_config

set_planning_config(config: PyDaftPlanningConfig | None = None, default_io_config: IOConfig | None = None, enable_strict_filter_pushdown: bool | None = None) -> DaftContext

Globally sets various configuration parameters which control Daft plan construction behavior.

These configuration values are used when a Dataframe is being constructed (e.g. calls to create a Dataframe, or to build on an existing Dataframe).

Parameters:

config (PyDaftPlanningConfig | None, default None): A PyDaftPlanningConfig object to set the config to, before applying the other keyword arguments. Defaults to None, which means the current config is used as the starting point.
default_io_config (IOConfig | None, default None): A default IOConfig to use when none is explicitly passed into an Expression (e.g. .download()) or a DataFrame operation (e.g. daft.read_parquet()).

Source code in daft/context.py
def set_planning_config(
    config: PyDaftPlanningConfig | None = None,
    default_io_config: IOConfig | None = None,
    enable_strict_filter_pushdown: bool | None = None,
) -> DaftContext:
    """Globally sets various configuration parameters which control Daft plan construction behavior.

    These configuration values are used when a Dataframe is being constructed (e.g. calls to create a Dataframe, or to build on an existing Dataframe).

    Args:
        config: A PyDaftPlanningConfig object to set the config to, before applying other kwargs. Defaults to None which indicates
            that the old (current) config should be used.
        default_io_config: A default IOConfig to use in the absence of one being explicitly passed into any Expression (e.g. `.download()`)
            or Dataframe operation (e.g. `daft.read_parquet()`).
    """
    # Replace values in the DaftPlanningConfig with user-specified overrides
    ctx = get_context()
    with ctx._lock:
        old_daft_planning_config = ctx._ctx._daft_planning_config if config is None else config
        new_daft_planning_config = old_daft_planning_config.with_config_values(
            default_io_config=default_io_config, enable_strict_filter_pushdown=enable_strict_filter_pushdown
        )

        ctx._ctx._daft_planning_config = new_daft_planning_config
        return ctx
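The override rule in set_planning_config (start from `config` if one is passed, otherwise from the current config, then apply only the keyword arguments that are not None) can be modeled with `dataclasses.replace`. `PlanningConfigSketch` and `apply_overrides` are hypothetical stand-ins, not Daft classes, and `default_io_config` is a plain string placeholder rather than a real IOConfig.

```python
from dataclasses import dataclass, replace
from typing import Any, Optional


@dataclass(frozen=True)
class PlanningConfigSketch:
    """Hypothetical stand-in for PyDaftPlanningConfig (not Daft's real class)."""

    default_io_config: Optional[str] = None  # string placeholder for an IOConfig
    enable_strict_filter_pushdown: Optional[bool] = None


def apply_overrides(
    current: PlanningConfigSketch,
    config: Optional[PlanningConfigSketch] = None,
    **overrides: Any,
) -> PlanningConfigSketch:
    """Mimic the documented rule: pick a base config, then apply non-None kwargs."""
    # Start from the explicitly passed config, otherwise from the current one.
    base = current if config is None else config
    # Keyword arguments left as None keep the base value unchanged.
    provided = {name: value for name, value in overrides.items() if value is not None}
    return replace(base, **provided)


current = PlanningConfigSketch(
    default_io_config="anonymous-s3", enable_strict_filter_pushdown=True
)
updated = apply_overrides(current, enable_strict_filter_pushdown=False)
print(updated)
```

Note that False counts as a real override here; only None means "leave unchanged".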

planning_config_ctx

planning_config_ctx(**kwargs: Any) -> Generator[None, None, None]

Context manager that wraps set_planning_config and resets the config to its original setting afterwards.

Source code in daft/context.py
@contextlib.contextmanager
def planning_config_ctx(**kwargs: Any) -> Generator[None, None, None]:
    """Context manager that wraps set_planning_config and resets the config to its original setting afterwards."""
    original_config = get_context().daft_planning_config
    try:
        set_planning_config(**kwargs)
        yield
    finally:
        set_planning_config(config=original_config)

set_execution_config

set_execution_config(config: PyDaftExecutionConfig | None = None, enable_scan_task_split_and_merge: bool | None = None, scan_tasks_min_size_bytes: int | None = None, scan_tasks_max_size_bytes: int | None = None, max_sources_per_scan_task: int | None = None, broadcast_join_size_bytes_threshold: int | None = None, parquet_split_row_groups_max_files: int | None = None, hash_join_partition_size_leniency: float | None = None, sample_size_for_sort: int | None = None, num_preview_rows: int | None = None, parquet_target_filesize: int | None = None, parquet_target_row_group_size: int | None = None, parquet_inflation_factor: float | None = None, csv_target_filesize: int | None = None, csv_inflation_factor: float | None = None, json_target_filesize: int | None = None, json_inflation_factor: float | None = None, text_inflation_factor: float | None = None, shuffle_aggregation_default_partitions: int | None = None, partial_aggregation_threshold: int | None = None, high_cardinality_aggregation_threshold: float | None = None, read_sql_partition_size_bytes: int | None = None, default_morsel_size: int | None = None, shuffle_algorithm: str | None = None, pre_shuffle_merge_threshold: int | None = None, pre_shuffle_merge_partition_threshold: int | None = None, scantask_max_parallel: int | None = None, native_parquet_writer: bool | None = None, min_cpu_per_task: float | None = None, actor_udf_ready_timeout: int | None = None, worker_startup_timeout: int | None = None, maintain_order: bool | None = None, enable_dynamic_batching: bool | None = None, dynamic_batching_strategy: str | None = None, flight_shuffle_dirs: list[str] | None = None, enable_multi_glob_path_tasks: bool | None = None) -> DaftContext

Globally sets configuration parameters that control various aspects of Daft execution.

These configuration values are used when a Dataframe is executed (e.g. calls to DataFrame.write_*, DataFrame.collect() or DataFrame.show()).

Parameters:

Every keyword argument defaults to None, which leaves the corresponding setting unchanged; the defaults quoted below are the built-in values of each setting.

config (PyDaftExecutionConfig | None): A PyDaftExecutionConfig object to set the config to, before applying the other keyword arguments. Defaults to None, which means the current config is used as the starting point.
enable_scan_task_split_and_merge (bool | None): Whether to enable scan task split and merge. Defaults to False.
scan_tasks_min_size_bytes (int | None): Minimum size of scan tasks in bytes. Defaults to 96MB.
scan_tasks_max_size_bytes (int | None): Maximum size of scan tasks in bytes. Defaults to 384MB.
max_sources_per_scan_task (int | None): Maximum number of sources per scan task. Defaults to 10.
parquet_split_row_groups_max_files (int | None): Maximum number of files to read for which row group splitting should happen. Defaults to 10.
broadcast_join_size_bytes_threshold (int | None): If one side of a join is smaller than this threshold, a broadcast join is used. Defaults to 10 MiB.
hash_join_partition_size_leniency (float | None): If the left side of a hash join is already correctly partitioned and the right side isn't, and the ratio between the left and right size is at least this value, then the right side is repartitioned to have an equal number of partitions as the left. Defaults to 0.5.
sample_size_for_sort (int | None): Number of elements to sample from each partition when running a sort. Defaults to 20.
num_preview_rows (int | None): Number of rows to show in a DataFrame preview. Defaults to 8.
parquet_target_filesize (int | None): Target file size when writing Parquet files. Defaults to 512MB.
parquet_target_row_group_size (int | None): Target row group size when writing Parquet files. Defaults to 128MB.
parquet_inflation_factor (float | None): Inflation factor of Parquet files (in-memory size / file size). Defaults to 3.0.
csv_target_filesize (int | None): Target file size when writing CSV files. Defaults to 512MB.
csv_inflation_factor (float | None): Inflation factor of CSV files (in-memory size / file size). Defaults to 0.5.
json_target_filesize (int | None): Target file size when writing JSON files. Defaults to 512MB.
json_inflation_factor (float | None): Inflation factor of JSON files (in-memory size / file size). Defaults to 0.25.
text_inflation_factor (float | None): Inflation factor of text files (in-memory size / file size). Defaults to 1.0.
shuffle_aggregation_default_partitions (int | None): Maximum number of partitions to create when performing aggregations on the Ray runner. Defaults to 200, unless the number of input partitions is less than 200.
partial_aggregation_threshold (int | None): Threshold for performing partial aggregations on the native runner. Defaults to 10000 rows.
high_cardinality_aggregation_threshold (float | None): Threshold selectivity for performing high-cardinality aggregations on the native runner. Defaults to 0.8.
read_sql_partition_size_bytes (int | None): Target partition size when reading from SQL databases. Defaults to 512MB.
default_morsel_size (int | None): Default size of morsels used by the new local executor. Defaults to 131072 rows.
shuffle_algorithm (str | None): The shuffle algorithm to use. Defaults to "auto", which lets Daft choose the algorithm. Options are "map_reduce", "pre_shuffle_merge", and "flight_shuffle".
pre_shuffle_merge_threshold (int | None): Memory threshold in bytes for pre-shuffle merge. Defaults to 1GB.
pre_shuffle_merge_partition_threshold (int | None): Partition-count threshold for enabling pre-shuffle merge when shuffle_algorithm is "auto". Defaults to 200.
scantask_max_parallel (int | None): Maximum number of scan tasks to run in parallel. Currently this only applies to the native runner. If set to 0, all available CPUs are used. Defaults to 8.
native_parquet_writer (bool | None): Whether to use the native Parquet writer instead of the PyArrow Parquet writer. Defaults to True.
min_cpu_per_task (float | None): Minimum CPU per task on the Ray runner. Defaults to 0.5.
actor_udf_ready_timeout (int | None): Timeout for UDF actors to become ready. Defaults to 120 seconds.
worker_startup_timeout (int | None): Timeout for worker actors to report their addresses during startup. Defaults to 120 seconds.
maintain_order (bool | None): Whether to maintain order during execution. Defaults to True. Some blocking sink operators (e.g. write_parquet) ignore this flag: they always run with maintain_order set to False and propagate that to their child operators. Setting this to False can be useful when running df.collect() and no ordering is required.
enable_dynamic_batching (bool | None): Whether to enable dynamic batching. Defaults to False.
dynamic_batching_strategy (str | None): The strategy to use for dynamic batching. Defaults to "auto".
flight_shuffle_dirs (list[str] | None): Directories to use for flight shuffle. Defaults to ["/tmp"]. Must not be empty.
enable_multi_glob_path_tasks (bool | None): Whether to create multiple glob path tasks on the Ray runner so that globbing runs in parallel. Defaults to False.

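The inflation factors relate on-disk size to in-memory size (in-memory size / file size), so a file's in-memory footprint can be estimated by multiplying its size by the factor. The sketch below only illustrates that arithmetic with the documented defaults; `estimate_in_memory_bytes` is hypothetical, it treats "MB" as MiB (an assumption), and Daft's planner may apply these factors in ways not shown here.

```python
MiB = 1024 * 1024  # assumption: the "MB" in the defaults means 2**20 bytes

# Documented default inflation factors (in-memory size / file size).
DEFAULT_INFLATION_FACTORS = {
    "parquet": 3.0,
    "csv": 0.5,
    "json": 0.25,
    "text": 1.0,
}


def estimate_in_memory_bytes(file_size_bytes: int, file_format: str) -> int:
    """Hypothetical helper: estimated in-memory size = file size * inflation factor."""
    return int(file_size_bytes * DEFAULT_INFLATION_FACTORS[file_format])


for fmt in ("parquet", "csv", "json", "text"):
    estimate = estimate_in_memory_bytes(100 * MiB, fmt)
    print(f"100 MiB of {fmt} -> ~{estimate // MiB} MiB in memory")
```

A compressed columnar format such as Parquet expands when decoded, which is why its default factor is well above 1.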
Source code in daft/context.py
def set_execution_config(
    config: PyDaftExecutionConfig | None = None,
    enable_scan_task_split_and_merge: bool | None = None,
    scan_tasks_min_size_bytes: int | None = None,
    scan_tasks_max_size_bytes: int | None = None,
    max_sources_per_scan_task: int | None = None,
    broadcast_join_size_bytes_threshold: int | None = None,
    parquet_split_row_groups_max_files: int | None = None,
    hash_join_partition_size_leniency: float | None = None,
    sample_size_for_sort: int | None = None,
    num_preview_rows: int | None = None,
    parquet_target_filesize: int | None = None,
    parquet_target_row_group_size: int | None = None,
    parquet_inflation_factor: float | None = None,
    csv_target_filesize: int | None = None,
    csv_inflation_factor: float | None = None,
    json_target_filesize: int | None = None,
    json_inflation_factor: float | None = None,
    text_inflation_factor: float | None = None,
    shuffle_aggregation_default_partitions: int | None = None,
    partial_aggregation_threshold: int | None = None,
    high_cardinality_aggregation_threshold: float | None = None,
    read_sql_partition_size_bytes: int | None = None,
    default_morsel_size: int | None = None,
    shuffle_algorithm: str | None = None,
    pre_shuffle_merge_threshold: int | None = None,
    pre_shuffle_merge_partition_threshold: int | None = None,
    scantask_max_parallel: int | None = None,
    native_parquet_writer: bool | None = None,
    min_cpu_per_task: float | None = None,
    actor_udf_ready_timeout: int | None = None,
    worker_startup_timeout: int | None = None,
    maintain_order: bool | None = None,
    enable_dynamic_batching: bool | None = None,
    dynamic_batching_strategy: str | None = None,
    flight_shuffle_dirs: list[str] | None = None,
    enable_multi_glob_path_tasks: bool | None = None,
) -> DaftContext:
    """Globally sets configuration parameters that control various aspects of Daft execution.

    These configuration values are used when a Dataframe is executed (e.g. calls to `DataFrame.write_*`, [DataFrame.collect()](https://docs.daft.ai/en/stable/api/dataframe/#daft.DataFrame.collect) or [DataFrame.show()](https://docs.daft.ai/en/stable/api/dataframe/#daft.DataFrame.show)).

    Args:
        config: A PyDaftExecutionConfig object to set the config to, before applying other kwargs. Defaults to None which indicates
            that the old (current) config should be used.
        enable_scan_task_split_and_merge: Whether to enable scan task split and merge. Defaults to False.
        scan_tasks_min_size_bytes: Minimum size of scan tasks in bytes. Defaults to 96MB.
        scan_tasks_max_size_bytes: Maximum size of scan tasks in bytes. Defaults to 384MB.
        max_sources_per_scan_task: Maximum number of sources per scan task. Defaults to 10.
        parquet_split_row_groups_max_files: Maximum number of files to read for which row group splitting should happen. Defaults to 10.
        broadcast_join_size_bytes_threshold: If one side of a join is smaller than this threshold, a broadcast join will be used.
            Default is 10 MiB.
        hash_join_partition_size_leniency: If the left side of a hash join is already correctly partitioned and the right side isn't,
            and the ratio between the left and right size is at least this value, then the right side is repartitioned to have an equal
            number of partitions as the left. Defaults to 0.5.
        sample_size_for_sort: Number of elements to sample from each partition when running a sort.
            Defaults to 20.
        num_preview_rows: Number of rows to show in a DataFrame preview.
            Defaults to 8.
        parquet_target_filesize: Target File Size when writing out Parquet Files. Defaults to 512MB
        parquet_target_row_group_size: Target Row Group Size when writing out Parquet Files. Defaults to 128MB
        parquet_inflation_factor: Inflation Factor of parquet files (In-Memory-Size / File-Size) ratio. Defaults to 3.0
        csv_target_filesize: Target File Size when writing out CSV Files. Defaults to 512MB
        csv_inflation_factor: Inflation Factor of CSV files (In-Memory-Size / File-Size) ratio. Defaults to 0.5
        json_target_filesize: Target File Size when writing out JSON Files. Defaults to 512MB
        json_inflation_factor: Inflation Factor of JSON files (In-Memory-Size / File-Size) ratio. Defaults to 0.25
        text_inflation_factor: Inflation Factor of Text files (In-Memory-Size / File-Size) ratio. Defaults to 1.0
        shuffle_aggregation_default_partitions: Maximum number of partitions to create when performing aggregations on the Ray Runner. Defaults to 200, unless the number of input partitions is less than 200.
        partial_aggregation_threshold: Threshold for performing partial aggregations on the Native Runner. Defaults to 10000 rows.
        high_cardinality_aggregation_threshold: Threshold selectivity for performing high cardinality aggregations on the Native Runner. Defaults to 0.8.
        read_sql_partition_size_bytes: Target size of partition when reading from SQL databases. Defaults to 512MB
        default_morsel_size: Default size of morsels used for the new local executor. Defaults to 131072 rows.
        shuffle_algorithm: The shuffle algorithm to use. Defaults to "auto", which will let Daft determine the algorithm. Options are "map_reduce", "pre_shuffle_merge", and "flight_shuffle".
        pre_shuffle_merge_threshold: Memory threshold in bytes for pre-shuffle merge. Defaults to 1GB
        pre_shuffle_merge_partition_threshold: Number of partitions threshold to enable pre-shuffle merge when shuffle_algorithm is "auto". Defaults to 200.
        scantask_max_parallel: Set the max parallelism for running scan tasks simultaneously. Currently, this only works for Native Runner. If set to 0, all available CPUs will be used. Defaults to 8.
        native_parquet_writer: Whether to use the native parquet writer vs the pyarrow parquet writer. Defaults to `True`.
        min_cpu_per_task: Minimum CPU per task in the Ray runner. Defaults to 0.5.
        actor_udf_ready_timeout: Timeout for UDF actors to be ready. Defaults to 120 seconds.
        worker_startup_timeout: Timeout for worker actors to report their addresses during startup. Defaults to 120 seconds.
        maintain_order: Whether to maintain order during execution. Defaults to True. Some blocking sink operators (e.g. write_parquet)
            ignore this flag: they always run with maintain_order set to False and propagate that to their child operators.
            Setting this to False can be useful when running df.collect() and no ordering is required.
        enable_dynamic_batching: Whether to enable dynamic batching. Defaults to False.
        dynamic_batching_strategy: The strategy to use for dynamic batching. Defaults to 'auto'.
        flight_shuffle_dirs: Directories to use for flight shuffle. Defaults to ["/tmp"]. Must not be empty.
        enable_multi_glob_path_tasks: Whether to create multiple glob path tasks in Ray Runner to achieve parallel glob. Defaults to False.
    """
    # Replace values in the DaftExecutionConfig with user-specified overrides
    ctx = get_context()
    with ctx._lock:
        old_daft_execution_config = ctx._ctx._daft_execution_config if config is None else config

        new_daft_execution_config = old_daft_execution_config.with_config_values(
            enable_scan_task_split_and_merge=enable_scan_task_split_and_merge,
            scan_tasks_min_size_bytes=scan_tasks_min_size_bytes,
            scan_tasks_max_size_bytes=scan_tasks_max_size_bytes,
            max_sources_per_scan_task=max_sources_per_scan_task,
            parquet_split_row_groups_max_files=parquet_split_row_groups_max_files,
            broadcast_join_size_bytes_threshold=broadcast_join_size_bytes_threshold,
            hash_join_partition_size_leniency=hash_join_partition_size_leniency,
            sample_size_for_sort=sample_size_for_sort,
            num_preview_rows=num_preview_rows,
            parquet_target_filesize=parquet_target_filesize,
            parquet_target_row_group_size=parquet_target_row_group_size,
            parquet_inflation_factor=parquet_inflation_factor,
            csv_target_filesize=csv_target_filesize,
            csv_inflation_factor=csv_inflation_factor,
            json_target_filesize=json_target_filesize,
            json_inflation_factor=json_inflation_factor,
            text_inflation_factor=text_inflation_factor,
            shuffle_aggregation_default_partitions=shuffle_aggregation_default_partitions,
            partial_aggregation_threshold=partial_aggregation_threshold,
            high_cardinality_aggregation_threshold=high_cardinality_aggregation_threshold,
            read_sql_partition_size_bytes=read_sql_partition_size_bytes,
            default_morsel_size=default_morsel_size,
            shuffle_algorithm=shuffle_algorithm,
            pre_shuffle_merge_threshold=pre_shuffle_merge_threshold,
            pre_shuffle_merge_partition_threshold=pre_shuffle_merge_partition_threshold,
            scantask_max_parallel=scantask_max_parallel,
            native_parquet_writer=native_parquet_writer,
            min_cpu_per_task=min_cpu_per_task,
            actor_udf_ready_timeout=actor_udf_ready_timeout,
            worker_startup_timeout=worker_startup_timeout,
            maintain_order=maintain_order,
            enable_dynamic_batching=enable_dynamic_batching,
            dynamic_batching_strategy=dynamic_batching_strategy,
            flight_shuffle_dirs=flight_shuffle_dirs,
            enable_multi_glob_path_tasks=enable_multi_glob_path_tasks,
        )

        ctx._ctx._daft_execution_config = new_daft_execution_config
        return ctx
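A simplified reading of broadcast_join_size_bytes_threshold: when either side of a join is estimated to be smaller than the threshold, that side is broadcast instead of shuffling both sides. `choose_join_strategy` is a hypothetical helper and the "hash" label for the non-broadcast case is an assumption; Daft's planner weighs additional factors (such as the join type) that this sketch ignores.

```python
MiB = 1024 * 1024

# Documented default for broadcast_join_size_bytes_threshold: 10 MiB.
DEFAULT_BROADCAST_THRESHOLD = 10 * MiB


def choose_join_strategy(
    left_bytes: int,
    right_bytes: int,
    threshold: int = DEFAULT_BROADCAST_THRESHOLD,
) -> str:
    """Hypothetical, simplified rule: broadcast if either side is under the threshold."""
    if min(left_bytes, right_bytes) < threshold:
        return "broadcast"  # replicate the small side instead of shuffling both
    return "hash"  # assumed fallback: partition both sides on the join key


print(choose_join_strategy(4 * MiB, 2_000 * MiB))  # broadcast
print(choose_join_strategy(50 * MiB, 2_000 * MiB))  # hash
# Raising the threshold (in Daft, via
# set_execution_config(broadcast_join_size_bytes_threshold=...))
# lets larger tables be broadcast:
print(choose_join_strategy(50 * MiB, 2_000 * MiB, threshold=64 * MiB))  # broadcast
```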

execution_config_ctx

execution_config_ctx(**kwargs: Any) -> Generator[None, None, None]

Context manager that wraps set_execution_config and resets the config to its original setting afterwards.

Source code in daft/context.py
@contextlib.contextmanager
def execution_config_ctx(**kwargs: Any) -> Generator[None, None, None]:
    """Context manager that wraps set_execution_config and resets the config to its original setting afterwards."""
    original_config = get_context()._ctx._daft_execution_config
    try:
        set_execution_config(**kwargs)
        yield
    finally:
        set_execution_config(config=original_config)

I/O Configurations

Configure how Daft behaves when it interacts with storage (e.g. credentials, retry policies, and various other knobs that control performance and resource usage).

These configurations are most often passed as inputs to Daft's I/O functions, such as the readers and writers described in I/O.

IOConfig

IOConfig(s3: S3Config | None = None, azure: AzureConfig | None = None, gcs: GCSConfig | None = None, http: HTTPConfig | None = None, unity: UnityConfig | None = None, hf: HuggingFaceConfig | None = None, disable_suffix_range: bool | None = None, tos: TosConfig | None = None, gravitino: GravitinoConfig | None = None, cos: CosConfig | None = None, opendal_backends: dict[str, dict[str, str]] | None = None, protocol_aliases: dict[str, str] | None = None)

Configuration for the native I/O layer, e.g. credentials for accessing cloud storage systems.

Methods:

replace: Replaces values if provided, returning a new IOConfig.

Attributes:

azure: AzureConfig
cos: CosConfig
disable_suffix_range: bool
gcs: GCSConfig
gravitino: GravitinoConfig
hf: HuggingFaceConfig
http: HTTPConfig
opendal_backends: dict[str, dict[str, str]]
protocol_aliases: dict[str, str]
s3: S3Config
tos: TosConfig
unity: UnityConfig

Source code in daft/daft/__init__.pyi
def __init__(
    self,
    s3: S3Config | None = None,
    azure: AzureConfig | None = None,
    gcs: GCSConfig | None = None,
    http: HTTPConfig | None = None,
    unity: UnityConfig | None = None,
    hf: HuggingFaceConfig | None = None,
    disable_suffix_range: bool | None = None,
    tos: TosConfig | None = None,
    gravitino: GravitinoConfig | None = None,
    cos: CosConfig | None = None,
    opendal_backends: dict[str, dict[str, str]] | None = None,
    protocol_aliases: dict[str, str] | None = None,
): ...

azure

azure: AzureConfig

cos

cos: CosConfig

disable_suffix_range

disable_suffix_range: bool

gcs

gcs: GCSConfig

gravitino

gravitino: GravitinoConfig

hf

hf: HuggingFaceConfig

http

http: HTTPConfig

opendal_backends

opendal_backends: dict[str, dict[str, str]]

protocol_aliases

protocol_aliases: dict[str, str]

s3

s3: S3Config

tos

tos: TosConfig

unity

unity: UnityConfig

replace

replace(s3: S3Config | None = None, azure: AzureConfig | None = None, gcs: GCSConfig | None = None, http: HTTPConfig | None = None, unity: UnityConfig | None = None, hf: HuggingFaceConfig | None = None, disable_suffix_range: bool | None = None, tos: TosConfig | None = None, gravitino: GravitinoConfig | None = None, cos: CosConfig | None = None, opendal_backends: dict[str, dict[str, str]] | None = None, protocol_aliases: dict[str, str] | None = None) -> IOConfig

Replaces values if provided, returning a new IOConfig.

Source code in daft/daft/__init__.pyi
def replace(
    self,
    s3: S3Config | None = None,
    azure: AzureConfig | None = None,
    gcs: GCSConfig | None = None,
    http: HTTPConfig | None = None,
    unity: UnityConfig | None = None,
    hf: HuggingFaceConfig | None = None,
    disable_suffix_range: bool | None = None,
    tos: TosConfig | None = None,
    gravitino: GravitinoConfig | None = None,
    cos: CosConfig | None = None,
    opendal_backends: dict[str, dict[str, str]] | None = None,
    protocol_aliases: dict[str, str] | None = None,
) -> IOConfig:
    """Replaces values if provided, returning a new IOConfig."""
    ...

S3Config

S3Config(region_name: str | None = None, endpoint_url: str | None = None, key_id: str | None = None, session_token: str | None = None, access_key: str | None = None, credentials_provider: Callable[[], S3Credentials] | None = None, buffer_time: int | None = None, max_connections: int | None = None, retry_initial_backoff_ms: int | None = None, connect_timeout_ms: int | None = None, read_timeout_ms: int | None = None, num_tries: int | None = None, retry_mode: str | None = None, anonymous: bool | None = None, use_ssl: bool | None = None, verify_ssl: bool | None = None, check_hostname_ssl: bool | None = None, requester_pays: bool | None = None, force_virtual_addressing: bool | None = None, profile_name: str | None = None, multipart_size: int | None = None, multipart_max_concurrency: int | None = None, custom_retry_msgs: list[str] | None = None)

I/O configuration for accessing an S3-compatible system.

Parameters:

Name Type Description Default
region_name str

Name of the region to be used (used when accessing AWS S3), defaults to "us-east-1". If wrongly provided, Daft will attempt to auto-detect the buckets' region at the cost of extra S3 requests.

None
endpoint_url str

URL to the S3 endpoint, defaults to endpoints to AWS

None
key_id str

AWS Access Key ID, defaults to auto-detection from the current environment

None
access_key str

AWS Secret Access Key, defaults to auto-detection from the current environment

None
credentials_provider Callable[[], S3Credentials]

Custom credentials provider function, should return a S3Credentials object

None
buffer_time int

Amount of time in seconds before the actual credential expiration time where credentials given by credentials_provider are considered expired, defaults to 10s

None
max_connections int

Maximum number of connections to S3 at any time per io thread, defaults to 8

None
session_token str

AWS Session Token, required only if key_id and access_key are temporary credentials

None
retry_initial_backoff_ms int

Initial backoff duration in milliseconds for an S3 retry, defaults to 1000ms

None
connect_timeout_ms int

Timeout duration to wait to make a connection to S3 in milliseconds, defaults to 30 seconds

None
read_timeout_ms int

Timeout duration to wait to read the first byte from S3 in milliseconds, defaults to 30 seconds

None
num_tries int

Number of attempts to make a connection, defaults to 25

None
retry_mode str

Retry mode when a request fails. Currently supported values are "standard" and "adaptive"; defaults to "adaptive"

None
anonymous bool

Whether or not to use "anonymous mode", which will access S3 without any credentials

None
use_ssl bool

Whether or not to use SSL, which requires accessing S3 over HTTPS rather than HTTP, defaults to True

None
verify_ssl bool

Whether or not to verify SSL certificates. If False, S3 is accessed without checking whether the certificates are valid. Defaults to True

None
check_hostname_ssl bool

Whether or not to verify the hostname when verifying SSL certificates (the legacy behavior for OpenSSL), defaults to True

None
requester_pays bool

Whether or not the authenticated user will assume transfer costs, which is required by some providers of bulk data, defaults to False

None
force_virtual_addressing bool

Forces the S3 client to use virtual addressing in all cases. If False, virtual addressing is only used when endpoint_url is empty. Defaults to False

None
profile_name str

Name of the AWS profile to load. Defaults to None, in which case the AWS_PROFILE environment variable is checked, falling back to the default profile

None
multipart_size int

Size of each part in a multipart upload, in bytes. Must be between 5MB and 5GB. Defaults to 8MB.

None
multipart_max_concurrency int

Maximum number of parts uploaded concurrently per object. Defaults to 100.

None
custom_retry_msgs list[str]

Retries the request if any of these custom messages appears in the response's error message. Defaults to None.

None

Examples:

>>> import daft
>>> from daft.io import IOConfig, S3Config
>>>
>>> # For AWS S3
>>> io_config = IOConfig(s3=S3Config(key_id="xxx", access_key="xxx"))
>>> daft.read_parquet("s3://some-path", io_config=io_config)

>>> # For S3-compatible services (e.g. Volcengine TOS)
>>> io_config = IOConfig(
...     s3=S3Config(
...         endpoint_url="https://tos-s3-{region}.ivolces.com",
...         region_name="{region}",
...         force_virtual_addressing=True,
...         verify_ssl=True,
...         key_id="your-access-key-id",
...         access_key="your-secret-access-key",
...     )
... )
>>> daft.read_parquet("s3://some-path", io_config=io_config)
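The multipart_size bounds interact with S3's own limit of 10,000 parts per multipart upload: with 8 MiB parts, a single upload tops out at 80,000 MiB (about 78 GiB). The plain-Python sketch below does that arithmetic. The helpers parts_needed and smallest_part_size are hypothetical, not part of Daft, and reading "MB"/"GB" as binary units (MiB/GiB) is an assumption. Whether Daft adjusts the part size on its own for very large objects is not stated in this reference.

```python
MiB = 1024 * 1024
GiB = 1024 * MiB

# Part-size bounds from the multipart_size description (assumed binary units).
MIN_PART_SIZE = 5 * MiB
MAX_PART_SIZE = 5 * GiB
# S3 rejects multipart uploads with more than 10,000 parts.
MAX_PARTS = 10_000


def parts_needed(object_size: int, part_size: int) -> int:
    """Hypothetical helper: number of parts an object of `object_size` bytes needs."""
    if not MIN_PART_SIZE <= part_size <= MAX_PART_SIZE:
        raise ValueError(f"part size {part_size} is outside [{MIN_PART_SIZE}, {MAX_PART_SIZE}]")
    # Integer ceiling division; an empty object still occupies one part.
    return max(1, (object_size + part_size - 1) // part_size)


def smallest_part_size(object_size: int, start: int = 8 * MiB) -> int:
    """Hypothetical helper: double the part size from `start` until the object fits."""
    part_size = start
    while parts_needed(object_size, part_size) > MAX_PARTS:
        if part_size == MAX_PART_SIZE:
            raise ValueError("object is too large for a single multipart upload")
        part_size = min(part_size * 2, MAX_PART_SIZE)
    return part_size


print(parts_needed(1 * GiB, 8 * MiB))           # 128 parts at the default part size
print(smallest_part_size(100 * GiB) // MiB)     # 16 (MiB per part)
```

The result could then be passed as S3Config(multipart_size=...); doubling is just one simple search strategy, chosen here for readability.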

Methods:

Name Description
from_env

Creates an S3Config, retrieving credentials and configurations from the current environment.

provide_cached_credentials

Wrapper around call to S3Config.credentials_provider to cache credentials until expiry.

replace

Replaces values if provided, returning a new S3Config.

Attributes:

Name Type Description
access_key str | None
anonymous bool
check_hostname_ssl bool
connect_timeout_ms int
credentials_provider Callable[[], S3Credentials] | None
custom_retry_msgs list[str] | None
endpoint_url str | None
force_virtual_addressing bool | None
key_id str | None
max_connections int
multipart_max_concurrency int | None
multipart_size int | None
num_tries int
profile_name str | None
read_timeout_ms int
region_name str | None
requester_pays bool | None
retry_initial_backoff_ms int
retry_mode str | None
session_token str | None
use_ssl bool
verify_ssl bool
Source code in daft/daft/__init__.pyi
def __init__(
    self,
    region_name: str | None = None,
    endpoint_url: str | None = None,
    key_id: str | None = None,
    session_token: str | None = None,
    access_key: str | None = None,
    credentials_provider: Callable[[], S3Credentials] | None = None,
    buffer_time: int | None = None,
    max_connections: int | None = None,
    retry_initial_backoff_ms: int | None = None,
    connect_timeout_ms: int | None = None,
    read_timeout_ms: int | None = None,
    num_tries: int | None = None,
    retry_mode: str | None = None,
    anonymous: bool | None = None,
    use_ssl: bool | None = None,
    verify_ssl: bool | None = None,
    check_hostname_ssl: bool | None = None,
    requester_pays: bool | None = None,
    force_virtual_addressing: bool | None = None,
    profile_name: str | None = None,
    multipart_size: int | None = None,
    multipart_max_concurrency: int | None = None,
    custom_retry_msgs: list[str] | None = None,
): ...

access_key

access_key: str | None

anonymous

anonymous: bool

check_hostname_ssl

check_hostname_ssl: bool

connect_timeout_ms

connect_timeout_ms: int

credentials_provider

credentials_provider: Callable[[], S3Credentials] | None

custom_retry_msgs

custom_retry_msgs: list[str] | None

endpoint_url

endpoint_url: str | None

force_virtual_addressing

force_virtual_addressing: bool | None

key_id

key_id: str | None

max_connections

max_connections: int

multipart_max_concurrency

multipart_max_concurrency: int | None

multipart_size

multipart_size: int | None

num_tries

num_tries: int

profile_name

profile_name: str | None

read_timeout_ms

read_timeout_ms: int

region_name

region_name: str | None

requester_pays

requester_pays: bool | None

retry_initial_backoff_ms

retry_initial_backoff_ms: int

retry_mode

retry_mode: str | None

session_token

session_token: str | None

use_ssl

use_ssl: bool

verify_ssl

verify_ssl: bool

from_env

from_env() -> S3Config

Creates an S3Config, retrieving credentials and configurations from the current environment.

Source code in daft/daft/__init__.pyi
@staticmethod
def from_env() -> S3Config:
    """Creates an S3Config, retrieving credentials and configurations from the current environment."""
    ...

provide_cached_credentials

provide_cached_credentials() -> S3Credentials | None

Calls S3Config.credentials_provider and caches the returned credentials until they expire.

Source code in daft/daft/__init__.pyi
def provide_cached_credentials(self) -> S3Credentials | None:
    """Wrapper around call to `S3Config.credentials_provider` to cache credentials until expiry."""
    ...

replace

replace(region_name: str | None = None, endpoint_url: str | None = None, key_id: str | None = None, session_token: str | None = None, access_key: str | None = None, credentials_provider: Callable[[], S3Credentials] | None = None, max_connections: int | None = None, retry_initial_backoff_ms: int | None = None, connect_timeout_ms: int | None = None, read_timeout_ms: int | None = None, num_tries: int | None = None, retry_mode: str | None = None, anonymous: bool | None = None, use_ssl: bool | None = None, verify_ssl: bool | None = None, check_hostname_ssl: bool | None = None, requester_pays: bool | None = None, force_virtual_addressing: bool | None = None, profile_name: str | None = None, multipart_size: int | None = None, multipart_max_concurrency: int | None = None, custom_retry_msgs: list[str] | None = None) -> S3Config

Replaces values if provided, returning a new S3Config.

Source code in daft/daft/__init__.pyi
def replace(
    self,
    region_name: str | None = None,
    endpoint_url: str | None = None,
    key_id: str | None = None,
    session_token: str | None = None,
    access_key: str | None = None,
    credentials_provider: Callable[[], S3Credentials] | None = None,
    max_connections: int | None = None,
    retry_initial_backoff_ms: int | None = None,
    connect_timeout_ms: int | None = None,
    read_timeout_ms: int | None = None,
    num_tries: int | None = None,
    retry_mode: str | None = None,
    anonymous: bool | None = None,
    use_ssl: bool | None = None,
    verify_ssl: bool | None = None,
    check_hostname_ssl: bool | None = None,
    requester_pays: bool | None = None,
    force_virtual_addressing: bool | None = None,
    profile_name: str | None = None,
    multipart_size: int | None = None,
    multipart_max_concurrency: int | None = None,
    custom_retry_msgs: list[str] | None = None,
) -> S3Config:
    """Replaces values if provided, returning a new S3Config."""
    ...

S3Credentials

S3Credentials(key_id: str, access_key: str, session_token: str | None = None, expiry: datetime | None = None)

Attributes:

Name Type Description
access_key str
expiry datetime | None
key_id str
session_token str | None
Source code in daft/daft/__init__.pyi
def __init__(
    self,
    key_id: str,
    access_key: str,
    session_token: str | None = None,
    expiry: datetime.datetime | None = None,
): ...

access_key

access_key: str

expiry

expiry: datetime | None

key_id

key_id: str

session_token

session_token: str | None

GCSConfig

GCSConfig(project_id: str | None = None, credentials: str | None = None, token: str | None = None, anonymous: bool | None = None, max_connections: int | None = None, retry_initial_backoff_ms: int | None = None, connect_timeout_ms: int | None = None, read_timeout_ms: int | None = None, num_tries: int | None = None)

I/O configuration for accessing Google Cloud Storage.

Methods:

Name Description
replace

Replaces values if provided, returning a new GCSConfig.

Attributes:

Name Type Description
anonymous bool
connect_timeout_ms int
credentials str | None
max_connections int
num_tries int
project_id str | None
read_timeout_ms int
retry_initial_backoff_ms int
token str | None
Source code in daft/daft/__init__.pyi
def __init__(
    self,
    project_id: str | None = None,
    credentials: str | None = None,
    token: str | None = None,
    anonymous: bool | None = None,
    max_connections: int | None = None,
    retry_initial_backoff_ms: int | None = None,
    connect_timeout_ms: int | None = None,
    read_timeout_ms: int | None = None,
    num_tries: int | None = None,
): ...

anonymous

anonymous: bool

connect_timeout_ms

connect_timeout_ms: int

credentials

credentials: str | None

max_connections

max_connections: int

num_tries

num_tries: int

project_id

project_id: str | None

read_timeout_ms

read_timeout_ms: int

retry_initial_backoff_ms

retry_initial_backoff_ms: int

token

token: str | None

replace

replace(project_id: str | None = None, credentials: str | None = None, token: str | None = None, anonymous: bool | None = None, max_connections: int | None = None, retry_initial_backoff_ms: int | None = None, connect_timeout_ms: int | None = None, read_timeout_ms: int | None = None, num_tries: int | None = None) -> GCSConfig

Replaces values if provided, returning a new GCSConfig.

Source code in daft/daft/__init__.pyi
def replace(
    self,
    project_id: str | None = None,
    credentials: str | None = None,
    token: str | None = None,
    anonymous: bool | None = None,
    max_connections: int | None = None,
    retry_initial_backoff_ms: int | None = None,
    connect_timeout_ms: int | None = None,
    read_timeout_ms: int | None = None,
    num_tries: int | None = None,
) -> GCSConfig:
    """Replaces values if provided, returning a new GCSConfig."""
    ...

AzureConfig

AzureConfig(storage_account: str | None = None, access_key: str | None = None, sas_token: str | None = None, bearer_token: str | None = None, tenant_id: str | None = None, client_id: str | None = None, client_secret: str | None = None, use_fabric_endpoint: bool | None = None, anonymous: bool | None = None, endpoint_url: str | None = None, use_ssl: bool | None = None, max_connections: int | None = None)

I/O configuration for accessing Azure Blob Storage.

Methods:

Name Description
replace

Replaces values if provided, returning a new AzureConfig.

Attributes:

Name Type Description
access_key str | None
anonymous bool | None
bearer_token str | None
client_id str | None
client_secret str | None
endpoint_url str | None
max_connections int
sas_token str | None
storage_account str | None
tenant_id str | None
use_fabric_endpoint bool | None
use_ssl bool | None
Source code in daft/daft/__init__.pyi
def __init__(
    self,
    storage_account: str | None = None,
    access_key: str | None = None,
    sas_token: str | None = None,
    bearer_token: str | None = None,
    tenant_id: str | None = None,
    client_id: str | None = None,
    client_secret: str | None = None,
    use_fabric_endpoint: bool | None = None,
    anonymous: bool | None = None,
    endpoint_url: str | None = None,
    use_ssl: bool | None = None,
    max_connections: int | None = None,
): ...

access_key

access_key: str | None

anonymous

anonymous: bool | None

bearer_token

bearer_token: str | None

client_id

client_id: str | None

client_secret

client_secret: str | None

endpoint_url

endpoint_url: str | None = None

max_connections

max_connections: int

sas_token

sas_token: str | None

storage_account

storage_account: str | None

tenant_id

tenant_id: str | None

use_fabric_endpoint

use_fabric_endpoint: bool | None

use_ssl

use_ssl: bool | None = None

replace

replace(storage_account: str | None = None, access_key: str | None = None, sas_token: str | None = None, bearer_token: str | None = None, tenant_id: str | None = None, client_id: str | None = None, client_secret: str | None = None, use_fabric_endpoint: bool | None = None, anonymous: bool | None = None, endpoint_url: str | None = None, use_ssl: bool | None = None, max_connections: int | None = None) -> AzureConfig

Replaces values if provided, returning a new AzureConfig.

Source code in daft/daft/__init__.pyi
def replace(
    self,
    storage_account: str | None = None,
    access_key: str | None = None,
    sas_token: str | None = None,
    bearer_token: str | None = None,
    tenant_id: str | None = None,
    client_id: str | None = None,
    client_secret: str | None = None,
    use_fabric_endpoint: bool | None = None,
    anonymous: bool | None = None,
    endpoint_url: str | None = None,
    use_ssl: bool | None = None,
    max_connections: int | None = None,
) -> AzureConfig:
    """Replaces values if provided, returning a new AzureConfig."""
    ...

HTTPConfig

HTTPConfig(bearer_token: str | None = None, retry_initial_backoff_ms: int | None = None, connect_timeout_ms: int | None = None, read_timeout_ms: int | None = None, num_tries: int | None = None)

I/O configuration for accessing HTTP systems.

Attributes:

Name Type Description
bearer_token str | None
connect_timeout_ms int
num_tries int
read_timeout_ms int
retry_initial_backoff_ms int
Source code in daft/daft/__init__.pyi
def __init__(
    self,
    bearer_token: str | None = None,
    retry_initial_backoff_ms: int | None = None,
    connect_timeout_ms: int | None = None,
    read_timeout_ms: int | None = None,
    num_tries: int | None = None,
): ...

bearer_token

bearer_token: str | None

connect_timeout_ms

connect_timeout_ms: int

num_tries

num_tries: int

read_timeout_ms

read_timeout_ms: int

retry_initial_backoff_ms

retry_initial_backoff_ms: int

UnityConfig

UnityConfig(endpoint: str | None, token: str | None)

I/O configuration for Unity Catalog volumes.

Methods:

Name Description
replace

Replaces values if provided, returning a new UnityConfig.

Attributes:

Name Type Description
endpoint str | None
token str | None
Source code in daft/daft/__init__.pyi
def __init__(
    self,
    endpoint: str | None,
    token: str | None,
): ...

endpoint

endpoint: str | None

token

token: str | None

replace

replace(endpoint: str | None, token: str | None) -> UnityConfig

Replaces values if provided, returning a new UnityConfig.

Source code in daft/daft/__init__.pyi
def replace(
    self,
    endpoint: str | None,
    token: str | None,
) -> UnityConfig:
    """Replaces values if provided, returning a new UnityConfig."""
    ...

HuggingFaceConfig

HuggingFaceConfig(token: str | None = None, anonymous: bool | None = None, use_content_defined_chunking: bool | None = None, row_group_size: int | None = None, target_filesize: int | None = None, max_operations_per_commit: int | None = None)

I/O configuration for accessing Hugging Face datasets.

Parameters:

Name Type Description Default
token str

Your Hugging Face access token, generated from https://huggingface.co/settings/tokens.

None
anonymous bool

Whether or not to use "anonymous mode", which will access Hugging Face without any credentials. Defaults to False.

None
use_content_defined_chunking bool

Sets the use_content_defined_chunking parameter when creating a pyarrow.parquet.ParquetWriter. Only available with pyarrow>=21. Defaults to True if available.

None
row_group_size int

Row group size when writing Parquet files. Defaults to the default pyarrow.parquet.ParquetWriter row group size.

None
target_filesize int

Target size in bytes for each written Parquet file. Defaults to 512 MB.

None
max_operations_per_commit int

Maximum number of files to add/copy/delete per commit. Defaults to 100.

None

Methods:

Name Description
replace

Replaces values if provided, returning a new HuggingFaceConfig.

Attributes:

Name Type Description
anonymous bool
max_operations_per_commit int
row_group_size int | None
target_filesize int
token str | None
use_content_defined_chunking bool
Source code in daft/daft/__init__.pyi
def __init__(
    self,
    token: str | None = None,
    anonymous: bool | None = None,
    use_content_defined_chunking: bool | None = None,
    row_group_size: int | None = None,
    target_filesize: int | None = None,
    max_operations_per_commit: int | None = None,
): ...

anonymous

anonymous: bool

max_operations_per_commit

max_operations_per_commit: int

row_group_size

row_group_size: int | None

target_filesize

target_filesize: int

token

token: str | None

use_content_defined_chunking

use_content_defined_chunking: bool

replace

replace(token: str | None = None, anonymous: bool | None = None, use_content_defined_chunking: bool | None = None, row_group_size: int | None = None, target_filesize: int | None = None, max_operations_per_commit: int | None = None) -> HuggingFaceConfig

Replaces values if provided, returning a new HuggingFaceConfig.

Source code in daft/daft/__init__.pyi
def replace(
    self,
    token: str | None = None,
    anonymous: bool | None = None,
    use_content_defined_chunking: bool | None = None,
    row_group_size: int | None = None,
    target_filesize: int | None = None,
    max_operations_per_commit: int | None = None,
) -> HuggingFaceConfig:
    """Replaces values if provided, returning a new HuggingFaceConfig."""
    ...

TosConfig

TosConfig(region: str | None = None, endpoint: str | None = None, access_key: str | None = None, secret_key: str | None = None, security_token: str | None = None, anonymous: bool | None = None, max_retries: int | None = None, retry_timeout_ms: int | None = None, connect_timeout_ms: int | None = None, read_timeout_ms: int | None = None, max_concurrent_requests: int | None = None, max_connections_per_io_thread: int | None = None)

I/O configuration for accessing Volcengine TOS (Torch Object Storage).

Parameters:

Name Type Description Default
region str

Name of the region to use, defaults to None. It can be detected automatically from the endpoint if a standard endpoint is set.

None
endpoint str

URL of the TOS endpoint, defaults to None. For Volcengine TOS, it can be inferred from the region.

None
access_key str

TOS Access Key, defaults to None.

None
secret_key str

TOS Secret Key, defaults to None.

None
security_token str

TOS Security Token, required for temporary credentials, defaults to None.

None
anonymous bool

Whether to use "anonymous mode" or not, which will access TOS without any credentials. Defaults to False.

None
max_retries int

Maximum number of retries for failed requests, defaults to 3.

None
retry_timeout_ms int

Timeout duration for retry attempts in milliseconds, defaults to 30000ms.

None
connect_timeout_ms int

Timeout duration to wait to make a connection to TOS in milliseconds, defaults to 10000ms.

None
read_timeout_ms int

Timeout duration to wait to read the first byte from TOS in milliseconds, defaults to 30000ms.

None
max_concurrent_requests int

Maximum number of concurrent requests to TOS at any time, defaults to 50.

None
max_connections_per_io_thread int

Maximum number of connections to TOS per IO thread, defaults to 50.

None

Examples:

>>> # For Volcengine and BytePlus TOS, refer to https://www.volcengine.com/docs/6349/107356
>>> # or https://docs.byteplus.com/en/docs/tos/docs-region-and-endpoint for endpoint and region information.
>>>
>>> io_config = IOConfig(
...     tos=TosConfig(
...         region="cn-beijing",
...         endpoint="https://tos-cn-beijing.volces.com",
...         access_key="your-access-key",
...         secret_key="your-secret-key",
...     )
... )
>>> daft.read_parquet("tos://some-path", io_config=io_config)

Methods:

Name Description
from_env

Creates a TosConfig, retrieving credentials and configurations from the current environment.

replace

Replaces values if provided, returning a new TosConfig.

Attributes:

Name Type Description
access_key str | None
anonymous bool
connect_timeout_ms int
endpoint str | None
max_concurrent_requests int
max_connections_per_io_thread int
max_retries int
read_timeout_ms int
region str | None
retry_timeout_ms int
secret_key str | None
security_token str | None
Source code in daft/daft/__init__.pyi
def __init__(
    self,
    region: str | None = None,
    endpoint: str | None = None,
    access_key: str | None = None,
    secret_key: str | None = None,
    security_token: str | None = None,
    anonymous: bool | None = None,
    max_retries: int | None = None,
    retry_timeout_ms: int | None = None,
    connect_timeout_ms: int | None = None,
    read_timeout_ms: int | None = None,
    max_concurrent_requests: int | None = None,
    max_connections_per_io_thread: int | None = None,
): ...

access_key

access_key: str | None

anonymous

anonymous: bool

connect_timeout_ms

connect_timeout_ms: int

endpoint

endpoint: str | None

max_concurrent_requests

max_concurrent_requests: int

max_connections_per_io_thread

max_connections_per_io_thread: int

max_retries

max_retries: int

read_timeout_ms

read_timeout_ms: int

region

region: str | None

retry_timeout_ms

retry_timeout_ms: int

secret_key

secret_key: str | None

security_token

security_token: str | None

from_env

from_env() -> TosConfig

Creates a TosConfig, retrieving credentials and configurations from the current environment.

Recognized environment variables:

TOS_ENDPOINT: Endpoint of the TOS service.
TOS_REGION: Region of the TOS service.
TOS_ACCESS_KEY: Access key for TOS authentication.
TOS_SECRET_KEY: Secret key for TOS authentication.
TOS_SECURITY_TOKEN: Security token for TOS authentication.

Source code in daft/daft/__init__.pyi
@staticmethod
def from_env() -> TosConfig:
    """Creates a TosConfig, retrieving credentials and configurations from the current environment.

    TOS_ENDPOINT: Endpoint of the TOS service.
    TOS_REGION: Region of the TOS service.
    TOS_ACCESS_KEY: Access key for TOS authentication.
    TOS_SECRET_KEY: Secret key for TOS authentication.
    TOS_SECURITY_TOKEN: Security token for TOS authentication.
    """

replace

replace(region: str | None = None, endpoint: str | None = None, access_key: str | None = None, secret_key: str | None = None, security_token: str | None = None, anonymous: bool | None = None, max_retries: int | None = None, retry_timeout_ms: int | None = None, connect_timeout_ms: int | None = None, read_timeout_ms: int | None = None, max_concurrent_requests: int | None = None, max_connections_per_io_thread: int | None = None) -> TosConfig

Replaces values if provided, returning a new TosConfig.

Source code in daft/daft/__init__.pyi
920
921
def replace(
    self,
    region: str | None = None,
    endpoint: str | None = None,
    access_key: str | None = None,
    secret_key: str | None = None,
    security_token: str | None = None,
    anonymous: bool | None = None,
    max_retries: int | None = None,
    retry_timeout_ms: int | None = None,
    connect_timeout_ms: int | None = None,
    read_timeout_ms: int | None = None,
    max_concurrent_requests: int | None = None,
    max_connections_per_io_thread: int | None = None,
) -> TosConfig:
    """Replaces values if provided, returning a new TosConfig."""
    ...

CosConfig

CosConfig(region: str | None = None, endpoint: str | None = None, secret_id: str | None = None, secret_key: str | None = None, security_token: str | None = None, anonymous: bool | None = None, max_retries: int | None = None, retry_timeout_ms: int | None = None, connect_timeout_ms: int | None = None, read_timeout_ms: int | None = None, max_concurrent_requests: int | None = None, max_connections: int | None = None)

I/O configuration for accessing Tencent Cloud COS (Cloud Object Storage).

Parameters:

Name Type Description Default
region str

Name of the region, e.g. "ap-guangzhou", "ap-beijing". Defaults to None.

None
endpoint str

URL of the COS endpoint. Defaults to None, in which case it is inferred from the region.

None
secret_id str

Tencent Cloud SecretId. Defaults to None.

None
secret_key str

Tencent Cloud SecretKey. Defaults to None.

None
security_token str

Security token for temporary credentials (STS). Defaults to None.

None
anonymous bool

Whether to use anonymous access. Defaults to False.

None
max_retries int

Maximum number of retries for failed requests. Defaults to 3.

None
retry_timeout_ms int

Timeout duration for retry attempts in milliseconds. Defaults to 30000ms.

None
connect_timeout_ms int

Timeout duration to make a connection in milliseconds. Defaults to 10000ms.

None
read_timeout_ms int

Timeout duration to read the first byte in milliseconds. Defaults to 30000ms.

None
max_concurrent_requests int

Maximum number of concurrent requests. Defaults to 50.

None
max_connections int

Maximum number of connections per IO thread. Defaults to 50.

None

Examples:

>>> # For Tencent Cloud COS, refer to https://cloud.tencent.com/document/product/436
>>> # for endpoint and region information.
>>>
>>> io_config = IOConfig(
...     cos=CosConfig(
...         region="ap-guangzhou",
...         secret_id="your-secret-id",
...         secret_key="your-secret-key",
...     )
... )
>>> daft.read_parquet("cos://some-bucket/some-path", io_config=io_config)

Methods:

Name Description
from_env

Creates a CosConfig, retrieving credentials and configurations from the current environment.

replace

Replaces values if provided, returning a new CosConfig.

Attributes:

Name Type Description
anonymous bool
connect_timeout_ms int
endpoint str | None
max_concurrent_requests int
max_connections int
max_retries int
read_timeout_ms int
region str | None
retry_timeout_ms int
secret_id str | None
secret_key str | None
security_token str | None
Source code in daft/daft/__init__.pyi
def __init__(
    self,
    region: str | None = None,
    endpoint: str | None = None,
    secret_id: str | None = None,
    secret_key: str | None = None,
    security_token: str | None = None,
    anonymous: bool | None = None,
    max_retries: int | None = None,
    retry_timeout_ms: int | None = None,
    connect_timeout_ms: int | None = None,
    read_timeout_ms: int | None = None,
    max_concurrent_requests: int | None = None,
    max_connections: int | None = None,
): ...

anonymous

anonymous: bool

connect_timeout_ms

connect_timeout_ms: int

endpoint

endpoint: str | None

max_concurrent_requests

max_concurrent_requests: int

max_connections

max_connections: int

max_retries

max_retries: int

read_timeout_ms

read_timeout_ms: int

region

region: str | None

retry_timeout_ms

retry_timeout_ms: int

secret_id

secret_id: str | None

secret_key

secret_key: str | None

security_token

security_token: str | None

from_env

from_env() -> CosConfig

Creates a CosConfig, retrieving credentials and configurations from the current environment.

Recognized environment variables:

COS_ENDPOINT: Endpoint of the COS service.
COS_REGION or TENCENTCLOUD_REGION: Region of the COS service.
COS_SECRET_ID or TENCENTCLOUD_SECRET_ID: SecretId for COS authentication.
COS_SECRET_KEY or TENCENTCLOUD_SECRET_KEY: SecretKey for COS authentication.
COS_SECURITY_TOKEN or TENCENTCLOUD_SECURITY_TOKEN: Security token for COS authentication.

Source code in daft/daft/__init__.pyi
@staticmethod
def from_env() -> CosConfig:
    """Creates a CosConfig, retrieving credentials and configurations from the current environment.

    COS_ENDPOINT: Endpoint of the COS service.
    COS_REGION or TENCENTCLOUD_REGION: Region of the COS service.
    COS_SECRET_ID or TENCENTCLOUD_SECRET_ID: SecretId for COS authentication.
    COS_SECRET_KEY or TENCENTCLOUD_SECRET_KEY: SecretKey for COS authentication.
    COS_SECURITY_TOKEN or TENCENTCLOUD_SECURITY_TOKEN: Security token for COS authentication.
    """

replace

replace(region: str | None = None, endpoint: str | None = None, secret_id: str | None = None, secret_key: str | None = None, security_token: str | None = None, anonymous: bool | None = None, max_retries: int | None = None, retry_timeout_ms: int | None = None, connect_timeout_ms: int | None = None, read_timeout_ms: int | None = None, max_concurrent_requests: int | None = None, max_connections: int | None = None) -> CosConfig

Replaces values if provided, returning a new CosConfig.

Source code in daft/daft/__init__.pyi
def replace(
    self,
    region: str | None = None,
    endpoint: str | None = None,
    secret_id: str | None = None,
    secret_key: str | None = None,
    security_token: str | None = None,
    anonymous: bool | None = None,
    max_retries: int | None = None,
    retry_timeout_ms: int | None = None,
    connect_timeout_ms: int | None = None,
    read_timeout_ms: int | None = None,
    max_concurrent_requests: int | None = None,
    max_connections: int | None = None,
) -> CosConfig:
    """Replaces values if provided, returning a new CosConfig."""
    ...