Reading from Kafka
Experimental
This connector is experimental and the API may change in future releases. Current limitations:
- Bounded batch reads only — no streaming or unbounded mode
- No offset commit management — consumer group offsets are not committed
- No write support — Daft cannot produce messages to Kafka
Apache Kafka is a distributed event streaming platform. Daft can read bounded ranges of messages from Kafka topics into DataFrames using daft.read_kafka().
Daft currently supports:
- Bounded batch reads: Read a fixed range of messages defined by offsets, timestamps, or earliest/latest markers
- Multi-topic reads: Read from one or more topics in a single call
- Partition filtering: Read from specific partitions within a topic
- Flexible bounds: Specify start/end using "earliest"/"latest", timestamps (datetime, ISO-8601, or epoch ms), or explicit per-partition offset maps
- Limit pushdown: Use .limit(N) to stop reading early once enough rows are collected
Installing Daft with Kafka Support
Daft integrates with Kafka through the confluent-kafka package. Install Daft with its Kafka support, or install the confluent-kafka dependency manually alongside Daft.
Output Schema
Every read_kafka call returns a DataFrame with the following fixed schema:
| Column | Type | Description |
|---|---|---|
| topic | string | The topic the message was read from |
| partition | int32 | The partition ID within the topic |
| offset | int64 | The offset of the message within the partition |
| timestamp_ms | int64 | Message timestamp in milliseconds (null if unavailable) |
| key | binary | The message key as raw bytes (null if not present) |
| value | binary | The message value as raw bytes |
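Because key and value arrive as raw bytes, decoding them is left to you. The plain-Python sketch below models one row of the schema above and decodes it, assuming UTF-8 keys and JSON-encoded values. The KafkaRow class and the decode helpers are hypothetical illustrations, not part of Daft; in Daft itself the same decoding would be applied to the key and value columns rather than row by row.

```python
import json
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class KafkaRow:
    """Hypothetical plain-Python stand-in for one row of the read_kafka schema."""

    topic: str                   # string
    partition: int               # int32
    offset: int                  # int64
    timestamp_ms: Optional[int]  # int64, None when no timestamp is available
    key: Optional[bytes]         # binary, None when the message has no key
    value: bytes                 # binary payload, left undecoded


def decode_key(row: KafkaRow, encoding: str = "utf-8") -> Optional[str]:
    """Decode the raw key bytes, preserving null keys."""
    return None if row.key is None else row.key.decode(encoding)


def decode_json_value(row: KafkaRow) -> Any:
    """Parse the value bytes as UTF-8 JSON (assumes JSON-encoded payloads)."""
    return json.loads(row.value.decode("utf-8"))


row = KafkaRow(
    topic="events",
    partition=0,
    offset=42,
    timestamp_ms=1_700_000_000_000,
    key=b"user-1",
    value=b'{"action": "click", "count": 3}',
)
print(decode_key(row))                  # user-1
print(decode_json_value(row)["count"])  # 3
```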
Reading Messages
Basic Usage
Read all messages from a topic, from the earliest available offset to the latest.
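Conceptually, "earliest" and "latest" map onto each partition's low and high watermarks: the first offset still retained, and the offset the next produced message will receive. A Kafka client can obtain these with confluent-kafka's Consumer.get_watermark_offsets(); the sketch below takes them as a plain dict instead. The helpers are hypothetical, and treating the end as exclusive and clamping explicit offsets into the retained range are assumptions of this sketch rather than documented Daft behavior.

```python
from typing import Dict, Tuple, Union

Bound = Union[str, int]  # "earliest", "latest", or an explicit offset


def resolve_bound(bound: Bound, low: int, high: int) -> int:
    """Map a bound onto a concrete offset (hypothetical helper).

    low  = earliest offset still retained in the partition
    high = high watermark, i.e. the offset the next produced message will get
    """
    if bound == "earliest":
        return low
    if bound == "latest":
        return high
    if isinstance(bound, int):
        # Assumption: clamp explicit offsets into the retained range.
        return min(max(bound, low), high)
    raise ValueError(f"unsupported bound: {bound!r}")


def plan_ranges(
    watermarks: Dict[int, Tuple[int, int]],
    start: Bound = "earliest",
    end: Bound = "latest",
) -> Dict[int, Tuple[int, int]]:
    """Return {partition: (start_offset, end_offset)}, end exclusive."""
    plan = {}
    for partition, (low, high) in sorted(watermarks.items()):
        lo = resolve_bound(start, low, high)
        hi = resolve_bound(end, low, high)
        plan[partition] = (lo, max(lo, hi))  # never produce a negative range
    return plan


# Partition 0 retains offsets 0..99; partition 1 retains 250..299 after retention cleanup.
watermarks = {0: (0, 100), 1: (250, 300)}
print(plan_ranges(watermarks))  # {0: (0, 100), 1: (250, 300)}
```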
Timestamp Bounds
Read messages within a time range by using timestamps as the start and end bounds.
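Kafka resolves a timestamp to an offset with its offsets-for-times lookup (exposed in confluent-kafka as Consumer.offsets_for_times()), which returns the earliest offset whose timestamp is greater than or equal to the requested one. The sketch below reproduces that rule over an in-memory list of (offset, timestamp_ms) pairs to show how a time window becomes an offset range. Whether Daft uses this exact lookup, and whether its end timestamp is inclusive or exclusive, are assumptions here; the helpers are hypothetical.

```python
from typing import List, Tuple

# (offset, timestamp_ms) pairs for one partition, in offset order.
Log = List[Tuple[int, int]]


def offset_for_time(log: Log, target_ms: int, high_watermark: int) -> int:
    """Earliest offset whose timestamp is >= target_ms (hypothetical helper).

    If no message is that recent, fall back to the high watermark, which
    yields an empty range.
    """
    return next((off for off, ts in log if ts >= target_ms), high_watermark)


def time_range_to_offsets(
    log: Log, start_ms: int, end_ms: int, high_watermark: int
) -> Tuple[int, int]:
    """Translate a [start_ms, end_ms) window into an exclusive offset range."""
    start = offset_for_time(log, start_ms, high_watermark)
    end = offset_for_time(log, end_ms, high_watermark)
    return start, max(start, end)


log = [(0, 1_000), (1, 2_000), (2, 3_000), (3, 4_000)]
print(time_range_to_offsets(log, 1_500, 3_500, high_watermark=4))  # (1, 3)
```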
You can also pass the bounds as ISO-8601 strings or as epoch milliseconds instead of datetime objects.
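All three accepted timestamp forms describe the same instant, so they can be normalized to epoch milliseconds, the unit of the timestamp_ms column. The sketch below shows one way to do that; to_epoch_ms is a hypothetical helper, and treating naive datetimes (and ISO strings without an offset) as UTC is an assumption of this sketch, so check how Daft interprets them before relying on it.

```python
from datetime import datetime, timedelta, timezone
from typing import Union

TimestampLike = Union[datetime, str, int]
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_ms(value: TimestampLike) -> int:
    """Normalize a datetime, ISO-8601 string, or epoch-ms int to epoch milliseconds.

    Hypothetical helper. Naive values are treated as UTC, which is an
    assumption of this sketch, not documented Daft behavior.
    """
    if isinstance(value, bool):
        raise TypeError("bool is not a valid timestamp")
    if isinstance(value, int):
        return value  # already epoch milliseconds
    if isinstance(value, str):
        # Python < 3.11 fromisoformat() rejects a trailing "Z", so spell it out.
        text = value[:-1] + "+00:00" if value.endswith("Z") else value
        value = datetime.fromisoformat(text)
    if isinstance(value, datetime):
        if value.tzinfo is None:
            value = value.replace(tzinfo=timezone.utc)
        # Integer timedelta division avoids float rounding on microseconds.
        return (value - EPOCH) // timedelta(milliseconds=1)
    raise TypeError(f"unsupported timestamp type: {type(value).__name__}")


print(to_epoch_ms(datetime(2024, 1, 1, tzinfo=timezone.utc)))  # 1704067200000
print(to_epoch_ms("2024-01-01T01:00:00+01:00"))               # 1704067200000
print(to_epoch_ms(1_704_067_200_000))                          # 1704067200000
```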
Offset Bounds
Read a specific offset range from a partition by specifying explicit offsets as the bounds.
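Offset bounds select messages by offset value, not by position. In Kafka, log compaction and transaction control records can leave gaps in the offset sequence, so a range may return fewer than end minus start rows. The sketch below illustrates this over an in-memory partition log; read_offset_range is hypothetical, and treating the end offset as exclusive is an assumption.

```python
from typing import Iterator, List, Tuple

Message = Tuple[int, bytes]  # (offset, value)


def read_offset_range(partition_log: List[Message], start: int, end: int) -> Iterator[Message]:
    """Yield messages with start <= offset < end (hypothetical; end assumed exclusive)."""
    for offset, value in partition_log:
        if offset >= end:
            break  # the log is in offset order, so nothing later can qualify
        if offset >= start:
            yield offset, value


# A compacted partition: offsets 2 and 4 were removed.
log = [(0, b"a"), (1, b"b"), (3, b"d"), (5, b"f"), (6, b"g")]
print([off for off, _ in read_offset_range(log, 1, 6)])  # [1, 3, 5]
```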
Partition Filtering
Read only from specific partitions of a topic.
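Partition filtering narrows the set of partitions a read touches: every partition when none are requested, otherwise only the requested subset. The sketch below expresses that rule; select_partitions is hypothetical, and raising an error for a partition the topic does not have is an assumption, since Daft may handle that case differently.

```python
from typing import Iterable, List, Optional


def select_partitions(available: Iterable[int], requested: Optional[Iterable[int]] = None) -> List[int]:
    """Return the partitions to read: all of them, or only the requested subset.

    Hypothetical helper; rejecting unknown partitions is an assumption.
    """
    available_set = set(available)
    if requested is None:
        return sorted(available_set)
    requested_set = set(requested)
    unknown = requested_set - available_set
    if unknown:
        raise ValueError(f"partitions not in topic: {sorted(unknown)}")
    return sorted(requested_set)


print(select_partitions(range(6)))          # [0, 1, 2, 3, 4, 5]
print(select_partitions(range(6), [3, 0]))  # [0, 3]
```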
Multiple Topics
Read from multiple topics in a single call.
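A multi-topic read can be thought of as expanding each topic into its partitions and reading every (topic, partition) pair. All rows land in one DataFrame, and the topic column tells them apart. The sketch below shows that expansion; plan_multi_topic and the partition-count dict are hypothetical, since a real reader would discover partition counts from cluster metadata.

```python
from typing import Dict, List, Sequence, Tuple


def plan_multi_topic(partition_counts: Dict[str, int], topics: Sequence[str]) -> List[Tuple[str, int]]:
    """Expand topics into (topic, partition) work items (hypothetical helper)."""
    plan = []
    for topic in topics:
        if topic not in partition_counts:
            raise KeyError(f"unknown topic: {topic}")
        plan.extend((topic, p) for p in range(partition_counts[topic]))
    return plan


counts = {"orders": 2, "payments": 3, "audit": 1}
print(plan_multi_topic(counts, ["orders", "payments"]))
# [('orders', 0), ('orders', 1), ('payments', 0), ('payments', 1), ('payments', 2)]
```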
Offset bounds for a multi-topic read can also be given as per-topic offset maps.
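A per-topic offset map can be pictured as a nested mapping from topic to partition to offset, looked up for each (topic, partition) being read. The exact shape Daft expects is not shown here, so the {topic: {partition: offset}} layout below is a hypothetical one, and falling back to "earliest" for partitions missing from the map is an assumption of this sketch.

```python
from typing import Dict, List, Tuple, Union

Bound = Union[str, int]
OffsetMap = Dict[str, Dict[int, int]]  # hypothetical shape: {topic: {partition: offset}}


def start_offsets(
    plan: List[Tuple[str, int]], offsets: OffsetMap, default: Bound = "earliest"
) -> Dict[Tuple[str, int], Bound]:
    """Look up each (topic, partition)'s start bound, falling back to `default` (assumed)."""
    return {
        (topic, partition): offsets.get(topic, {}).get(partition, default)
        for topic, partition in plan
    }


plan = [("orders", 0), ("orders", 1), ("payments", 0)]
offsets = {"orders": {0: 120, 1: 80}}
print(start_offsets(plan, offsets))
# {('orders', 0): 120, ('orders', 1): 80, ('payments', 0): 'earliest'}
```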
Using Limit
Call .limit(N) on the resulting DataFrame to stop reading once N rows have been collected.
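Limit pushdown pays off because the read is lazy: once enough rows have been produced, the remaining messages are never fetched. The pure-Python sketch below imitates that with a generator and itertools.islice, recording every message it pulls. It only illustrates the principle; it is not how Daft implements .limit(N), and counting single messages is a simplification.

```python
from itertools import islice
from typing import Dict, Iterator, List, Tuple

Row = Tuple[int, int]  # (partition, offset)


def scan(partitions: Dict[int, List[int]], fetched: List[Row]) -> Iterator[Row]:
    """Lazily yield rows partition by partition, recording each fetch (illustrative)."""
    for partition, offsets in sorted(partitions.items()):
        for offset in offsets:
            fetched.append((partition, offset))
            yield partition, offset


partitions = {0: list(range(1_000)), 1: list(range(1_000))}
fetched: List[Row] = []
first_five = list(islice(scan(partitions, fetched), 5))
print(len(first_five), len(fetched))  # 5 5
```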
Kafka Client Configuration
Pass additional librdkafka configuration options through the kafka_client_config parameter.
Note
The following keys are managed internally and cannot be overridden via kafka_client_config: bootstrap.servers, group.id, enable.auto.commit, enable.auto.offset.store. Set bootstrap.servers and group.id through the dedicated bootstrap_servers and group_id parameters of read_kafka() instead; the two offset-management keys remain under Daft's control.
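The rule in the note above can be pictured as merging user options into a base configuration while refusing the managed keys. In the sketch below, build_consumer_config is hypothetical. Raising an error on a clash and setting both offset-management keys to False are assumptions: the first is one plausible reading of "cannot be overridden" (Daft might instead ignore such keys), and the second reflects the connector not committing offsets. security.protocol and sasl.mechanisms are standard librdkafka options.

```python
from typing import Dict, Optional

# Keys the note says Daft manages internally.
RESERVED_KEYS = frozenset({
    "bootstrap.servers",
    "group.id",
    "enable.auto.commit",
    "enable.auto.offset.store",
})


def build_consumer_config(
    bootstrap_servers: str,
    group_id: str,
    kafka_client_config: Optional[Dict[str, object]] = None,
) -> Dict[str, object]:
    """Merge user librdkafka options with managed keys (hypothetical helper)."""
    extra = dict(kafka_client_config or {})
    clashes = RESERVED_KEYS.intersection(extra)
    if clashes:
        # Assumption: reject overrides rather than silently ignoring them.
        raise ValueError(f"managed keys cannot be set via kafka_client_config: {sorted(clashes)}")
    config: Dict[str, object] = {
        "bootstrap.servers": bootstrap_servers,
        "group.id": group_id,
        # Assumed values: no automatic offset storage or commits.
        "enable.auto.commit": False,
        "enable.auto.offset.store": False,
    }
    config.update(extra)
    return config


cfg = build_consumer_config(
    "broker1:9092,broker2:9092",
    "daft-reader",
    {"security.protocol": "SASL_SSL", "sasl.mechanisms": "PLAIN"},
)
print(cfg["security.protocol"], cfg["enable.auto.commit"])  # SASL_SSL False
```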