daft.functions.upload

upload

upload(expr: Expression, location: str | Expression, max_connections: int = 32, on_error: Literal['raise', 'null'] = 'raise', io_config: IOConfig | None = None) -> Expression

Uploads a column of binary data to the provided location(s), which may be on S3, the local filesystem, or other supported storage.

Each file is written into the target folder(s) under a generated UUID filename. The result is returned as a column of string paths that is compatible with the download() Expression.
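
As a rough mental model of the per-row behavior described above, the following pure-Python sketch writes each bytes value into a local folder under a generated UUID filename and returns the path it wrote. The helper name `write_with_uuid_name` is hypothetical and not part of Daft; the real function runs natively, also targets remote stores such as S3, and may format the returned paths differently.

```python
import tempfile
import uuid
from pathlib import Path


def write_with_uuid_name(data: bytes, folder: str) -> str:
    """Hypothetical helper: write `data` into `folder` under a fresh UUID filename."""
    target_dir = Path(folder)
    target_dir.mkdir(parents=True, exist_ok=True)
    # A random UUID makes filename collisions between rows extremely unlikely.
    target = target_dir / str(uuid.uuid4())
    target.write_bytes(data)
    return str(target)


with tempfile.TemporaryDirectory() as folder:
    paths = [write_with_uuid_name(value, folder) for value in [b"first", b"second"]]
    # Each returned path can be read back, analogous to feeding the result to download().
    contents = [Path(p).read_bytes() for p in paths]
```

Because the filenames are generated, callers keep the returned path column rather than trying to predict where each row was written.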

Parameters:

expr (Expression, required): The expression to upload.

location (str | Expression, required): A folder location, or a column of folder locations, to upload data into.

max_connections (int, default 32): The maximum number of connections per thread to use for uploading data.

on_error (Literal['raise', 'null'], default 'raise'): Behavior when an upload error is encountered: "raise" raises the error immediately, while "null" logs the error and falls back to a Null value.

io_config (IOConfig | None, default None): IOConfig to use when uploading data.
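
The two on_error modes can be illustrated for a single row with a small pure-Python sketch. The names `upload_or_null` and `failing_write` are hypothetical stand-ins, and the use of OSError and print for logging are assumptions made for the illustration; this is not how Daft implements the option internally.

```python
from typing import Callable, Literal, Optional


def upload_or_null(
    write: Callable[[bytes], str],
    data: bytes,
    on_error: Literal["raise", "null"] = "raise",
) -> Optional[str]:
    """Hypothetical wrapper illustrating the two on_error modes for one row."""
    try:
        return write(data)
    except OSError as exc:
        if on_error == "raise":
            # "raise" mode: propagate the failure immediately.
            raise
        # "null" mode: report the failure and yield a null for this row.
        print(f"upload failed, returning null: {exc}")
        return None


def failing_write(data: bytes) -> str:
    # Stand-in for a write that fails, e.g. because of missing permissions.
    raise OSError("permission denied")


null_result = upload_or_null(failing_write, b"payload", on_error="null")

try:
    upload_or_null(failing_write, b"payload", on_error="raise")
    raised = False
except OSError:
    raised = True
```

With "null", a failed row shows up as a missing value in the returned path column, so downstream code should be prepared to filter or handle nulls.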

Returns:

Expression: A String expression containing the written filepath.

Examples:

>>> from daft.functions import upload
>>>
>>> upload(df["data"], "s3://my-bucket/my-folder")

Upload to row-specific URLs

>>> upload(df["data"], df["paths"])
Source code in daft/functions/url.py
def upload(
    expr: Expression,
    location: str | Expression,
    max_connections: int = 32,
    on_error: Literal["raise", "null"] = "raise",
    io_config: IOConfig | None = None,
) -> Expression:
    """Uploads a column of binary data to the provided location(s) (also supports S3, local etc).

    Files will be written into the location (folder(s)) with a generated UUID filename, and the result
    will be returned as a column of string paths that is compatible with the ``download()`` Expression.

    Args:
        expr: The expression to upload.
        location: a folder location or column of folder locations to upload data into
        max_connections: The maximum number of connections to use per thread to use for uploading data. Defaults to 32.
        on_error: Behavior when a URL upload error is encountered - "raise" to raise the error immediately or "null" to log
            the error but fallback to a Null value. Defaults to "raise".
        io_config: IOConfig to use when uploading data

    Returns:
        Expression: a String expression containing the written filepath

    Examples:
        >>> from daft.functions import upload
        >>>
        >>> upload(df["data"], "s3://my-bucket/my-folder")  # doctest: +SKIP

        Upload to row-specific URLs

        >>> upload(df["data"], df["paths"])  # doctest: +SKIP

    """
    multi_thread = _should_use_multithreading_tokio_runtime()
    # If the user specifies a single location via a string, we should upload to a single folder. Otherwise,
    # if the user gave an expression, we assume that each row has a specific url to upload to.
    # Consider moving the check for is_single_folder to a lower IR.
    is_single_folder = isinstance(location, str)
    io_config = _override_io_config_max_connections(max_connections, io_config)

    return Expression._call_builtin_scalar_fn(
        "url_upload",
        expr,
        location,
        max_connections=max_connections,
        on_error=on_error,
        multi_thread=multi_thread,
        is_single_folder=is_single_folder,
        io_config=io_config,
    )
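
The `is_single_folder` check in the source above distinguishes a single string location from a per-row column of locations. A minimal pure-Python sketch of that distinction follows; `resolve_locations` is a hypothetical helper, and a plain list stands in for an Expression column. The length check is an assumption added for the sketch, not something the source shows.

```python
from typing import List, Union


def resolve_locations(location: Union[str, List[str]], num_rows: int) -> List[str]:
    """Hypothetical helper: return the destination location used for each row."""
    # Mirrors the isinstance(location, str) check in the source listing.
    is_single_folder = isinstance(location, str)
    if is_single_folder:
        # One string: every row is written into the same folder.
        return [location] * num_rows
    # Otherwise each row carries its own location (assumed: one entry per row).
    if len(location) != num_rows:
        raise ValueError("expected one location per row")
    return list(location)


single = resolve_locations("s3://my-bucket/my-folder", 3)
per_row = resolve_locations(["s3://bucket/a", "s3://bucket/b", "s3://bucket/c"], 3)
```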