Reading from and Writing to Delta Lake#
Delta Lake is an open-source storage framework for data analytics on data lakes. It provides ACID transactions, scalable metadata handling, and a unification of streaming and batch data processing, all on top of Parquet files in cloud storage.
Daft currently supports:
- Parallel + Distributed Reads: Daft parallelizes Delta Lake table reads over all cores of your machine, if using the default multithreading runner, or all cores + machines of your Ray cluster, if using the distributed Ray runner.
- Skipping Filtered Data: Daft ensures that only data that matches your df.where() filter will be read, often skipping entire files/partitions.
- Multi-cloud Support: Daft supports reading Delta Lake tables from AWS S3, Azure Blob Store, and GCS, as well as local files.
A detailed Delta Lake roadmap for Daft can be found on our GitHub issues. For the overall Daft development plan, see Daft Roadmap.
Installing Daft with Delta Lake Support#
Daft internally uses the deltalake Python package to fetch metadata about the Delta Lake table, such as paths to the underlying Parquet files and table statistics. The deltalake package must therefore be installed to read Delta Lake tables with Daft, either manually or by installing Daft with the daft[deltalake] extras, for example with pip install "daft[deltalake]".
Reading a Table#
A Delta Lake table can be read by providing daft.read_deltalake with the URI for your table.
The below example uses the deltalake Python package to create a local Delta Lake table for Daft to read, but Daft can also read Delta Lake tables from all of the major cloud stores.
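A minimal sketch of creating such a table follows. It assumes a group partition column and a num column running from 0 to 7, which is consistent with the file statistics discussed under Data Skipping Optimizations. The letter column and the create_example_table helper name are illustrative, not part of either library.

```python
# Illustrative data: two rows per group, so each group=N partition holds one small file.
EXAMPLE_DATA = {
    "group": [1, 1, 2, 2, 3, 3, 4, 4],
    "num": list(range(8)),
    "letter": ["a", "b", "c", "d", "e", "f", "g", "h"],
}


def create_example_table(table_uri: str) -> str:
    """Write EXAMPLE_DATA to table_uri as a Delta Lake table partitioned by group."""
    # Imported lazily so this sketch still loads where the optional
    # deltalake and pyarrow packages are not installed.
    import pyarrow as pa
    from deltalake import write_deltalake

    # partition_by produces separate group=1 ... group=4 directories.
    write_deltalake(table_uri, pa.table(EXAMPLE_DATA), partition_by=["group"])
    return table_uri


# Usage (requires deltalake and pyarrow); "some-table" is a placeholder path:
# create_example_table("some-table")
```

Writing to a fresh directory keeps the sketch repeatable, since the default write mode of deltalake refuses to write into a table that already exists.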
After writing this local example table, we can easily read it into Daft.
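The read itself can be sketched as below. The read_example_table helper and the "some-table" path are hypothetical; daft.read_deltalake and its io_config keyword are the only Daft calls used. For a local table io_config can stay None, while for cloud URIs a daft.io.IOConfig with the relevant credentials would typically be passed.

```python
def read_example_table(table_uri: str, io_config=None):
    """Return a lazy Daft DataFrame over the Delta Lake table at table_uri."""
    # Imported lazily so this sketch still loads where Daft is not installed.
    import daft

    # Nothing is read yet: Daft builds a lazy plan and only touches the
    # Parquet files when the DataFrame is shown or collected.
    return daft.read_deltalake(table_uri, io_config=io_config)


# Usage (requires daft[deltalake]); "some-table" is a placeholder path:
# df = read_example_table("some-table")
# df.show()
```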
Data Skipping Optimizations#
Subsequent filters on the partition column group will efficiently skip data that doesn't match the predicate. In the below example, the group != 2 partitions (files) will be pruned, i.e. they will never be read into memory.
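A partition filter of this kind might look like the following sketch. It assumes a table partitioned by an integer group column, as in the example above; the read_group helper name is hypothetical.

```python
def read_group(table_uri: str, group: int) -> dict:
    """Read only the rows of one partition of the table at table_uri."""
    # Imported lazily so this sketch still loads where Daft is not installed.
    import daft

    df = daft.read_deltalake(table_uri)
    # The predicate is on the partition column, so partitions whose
    # group value differs can be skipped without reading their files.
    filtered = df.where(df["group"] == group)
    return filtered.to_pydict()


# Usage (requires daft[deltalake]); "some-table" is a placeholder path:
# rows = read_group("some-table", 2)
```

The filter is applied to the lazy DataFrame before anything is collected, which is what gives Daft the chance to prune partitions instead of filtering rows after loading them.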
Filters on non-partition columns will still benefit from automatic file pruning via file-level statistics. In the below example, the group=2 partition (file) will have 2 <= df["num"] <= 3 lower/upper bounds for the num column, and since the filter predicate is df["num"] < 2, Daft will prune the file from the read. The same is true for the group=3 and group=4 partitions: none of the data from those files is read into memory.
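The pruning decision can be illustrated with a small, library-free sketch. This is not Daft's implementation, only the underlying idea: a file whose minimum num is not below the bound cannot contain a matching row. The file paths are hypothetical, and the bounds for every file other than group=2 are assumed from a num column running from 0 to 7.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FileStats:
    path: str
    num_min: int  # lower bound of the num column in this file
    num_max: int  # upper bound of the num column in this file


# Hypothetical per-file statistics for the four partitions.
FILES = [
    FileStats("group=1/part-0.parquet", 0, 1),
    FileStats("group=2/part-0.parquet", 2, 3),
    FileStats("group=3/part-0.parquet", 4, 5),
    FileStats("group=4/part-0.parquet", 6, 7),
]


def files_for_less_than(files, bound):
    """Keep only files that could contain a row with num < bound."""
    # A file can match only if its smallest value is below the bound.
    return [f for f in files if f.num_min < bound]


surviving = files_for_less_than(FILES, 2)
print([f.path for f in surviving])  # ['group=1/part-0.parquet']
```

With the predicate num < 2, only the group=1 file survives, matching the description above: the other three files are dropped on their statistics alone.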
Write to Delta Lake#
You can use df.write_deltalake() to write a DataFrame to a Delta table.
Daft supports multiple write modes. See the API docs for df.write_deltalake() for more details.
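A minimal sketch of a write followed by an overwrite is shown below. The write_then_overwrite helper, the column names, and the table location are illustrative; the behaviour of each mode should be checked against the API docs for the installed Daft version.

```python
def write_then_overwrite(table_uri: str) -> dict:
    """Write a small table, overwrite it, and return the final contents."""
    # Imported lazily so this sketch still loads where Daft is not installed.
    import daft

    first = daft.from_pydict({"id": [1, 2, 3], "label": ["a", "b", "c"]})
    # With no existing table at table_uri, this creates one.
    first.write_deltalake(table_uri)

    second = daft.from_pydict({"id": [4, 5], "label": ["d", "e"]})
    # mode="overwrite" replaces the table contents instead of appending.
    second.write_deltalake(table_uri, mode="overwrite")

    return daft.read_deltalake(table_uri).sort("id").to_pydict()


# Usage (requires daft[deltalake]); "tables/some-table" is a placeholder path:
# contents = write_then_overwrite("tables/some-table")
```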
Type System#
Daft and Delta Lake have compatible type systems. Here is how types are converted between the two systems.
When reading from a Delta Lake table into Daft:
| Delta Lake | Daft |
|---|---|
| BOOLEAN | daft.DataType.bool() |
| BYTE | daft.DataType.int8() |
| SHORT | daft.DataType.int16() |
| INT | daft.DataType.int32() |
| LONG | daft.DataType.int64() |
| FLOAT | daft.DataType.float32() |
| DOUBLE | daft.DataType.float64() |
| DECIMAL(precision, scale) | daft.DataType.decimal128(precision, scale) |
| DATE | daft.DataType.date() |
| TIMESTAMP_NTZ | daft.DataType.timestamp(timeunit="us", timezone=None) |
| TIMESTAMP | daft.DataType.timestamp(timeunit="us", timezone="UTC") |
| STRING | daft.DataType.string() |
| BINARY | daft.DataType.binary() |
| MAP<key_type, value_type> | daft.DataType.map(key_type, value_type) |
| STRUCT<[field_name: field_type,]> | daft.DataType.struct(fields) |
| ARRAY<element_type> | daft.DataType.list(element_type) |
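One way to check the mapping for a particular table is to inspect the schema Daft reports after the read. The sketch below assumes a table whose columns are stored as LONG and STRING in Delta Lake; the table_dtypes helper name is hypothetical.

```python
def table_dtypes(table_uri: str) -> dict:
    """Map each column name of the Delta Lake table to its Daft DataType."""
    # Imported lazily so this sketch still loads where Daft is not installed.
    import daft

    df = daft.read_deltalake(table_uri)
    schema = df.schema()
    # Looking a column up in the schema yields a field whose dtype is the
    # Daft type that the Delta Lake type was converted to.
    return {name: schema[name].dtype for name in df.column_names}


# Usage (requires daft[deltalake]); "some-table" is a placeholder path:
# import daft
# dtypes = table_dtypes("some-table")
# dtypes["num"] == daft.DataType.int64()      # Delta LONG
# dtypes["letter"] == daft.DataType.string()  # Delta STRING
```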