To set up this example, let's read a Parquet file of sample dog owners from a public S3 bucket, then use daft.col() with the df.with_column method to create a new column, full_name, by concatenating the first_name and last_name columns. Then, let's create a dogs DataFrame from a Python dictionary and use df.join to join it with our DataFrame of owners:
    import daft
    from daft import col

    # Read parquet file containing sample dog owners
    df = daft.read_parquet("s3://daft-oss-public-data/tutorials/10-min/sample-data-dog-owners-partitioned.pq/**")

    # Combine "first_name" and "last_name" to create new column "full_name"
    df = df.with_column("full_name", col("first_name") + " " + col("last_name"))
    df.select("full_name", "age", "country", "has_dog").show()

    # Create dataframe of dogs
    df_dogs = daft.from_pydict(
        {
            "urls": [
                "https://live.staticflickr.com/65535/53671838774_03ba68d203_o.jpg",
                "https://live.staticflickr.com/65535/53671700073_2c9441422e_o.jpg",
                "https://live.staticflickr.com/65535/53670606332_1ea5f2ce68_o.jpg",
                "https://live.staticflickr.com/65535/53671838039_b97411a441_o.jpg",
                "https://live.staticflickr.com/65535/53671698613_0230f8af3c_o.jpg",
            ],
            "full_name": [
                "Ernesto Evergreen",
                "James Jale",
                "Wolfgang Winter",
                "Shandra Shamas",
                "Zaya Zaphora",
            ],
            "dog_name": ["Ernie", "Jackie", "Wolfie", "Shaggie", "Zadie"],
        }
    )

    # Join owners with dogs, dropping some columns
    df_family = df.join(df_dogs, on="full_name").exclude("first_name", "last_name", "DoB", "country", "age")
    df_family.show()
    import daft
    from daft import col, DataType
    import numpy as np

    # 1. Create a DataFrame with image URLs
    df = daft.from_pydict(
        {
            "urls": [
                "https://live.staticflickr.com/65535/53671838774_03ba68d203_o.jpg",
                "https://live.staticflickr.com/65535/53671700073_2c9441422e_o.jpg",
                "https://live.staticflickr.com/65535/53670606332_1ea5f2ce68_o.jpg",
                "https://live.staticflickr.com/65535/53671838039_b97411a441_o.jpg",
                "https://live.staticflickr.com/65535/53671698613_0230f8af3c_o.jpg",
            ],
        }
    )

    # 2. Define a UDF for normalization (Standard ImageNet normalization)
    @daft.func(return_dtype=DataType.tensor(DataType.float32()))
    def normalize_image(img):
        if img is None:
            return None

        # Standard ImageNet normalization mean and std
        mean = np.array([0.485, 0.456, 0.406], dtype=np.float32)
        std = np.array([0.229, 0.224, 0.225], dtype=np.float32)

        # Convert to float32 and scale to [0, 1]
        # Input img is [H, W, C]
        img_float = img.astype(np.float32) / 255.0

        # Normalize
        # img_float is [H, W, C], mean/std are [3] broadcasting over the last dimension
        normalized = (img_float - mean) / std

        # Transpose to [C, H, W] for PyTorch models
        normalized = normalized.transpose(2, 0, 1)

        return normalized

    # 3. Build the pipeline: URL -> download -> decode -> resize -> to_tensor -> normalize
    df = df.with_column(
        "image",
        col("urls").download(on_error="null").decode_image().resize(224, 224),
    )
    df = df.with_column("tensor", col("image").image_to_tensor())
    df = df.with_column("normalized", normalize_image(col("tensor")))

    df.collect()
    df.select("urls", "image", "normalized").show()
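To sanity-check the normalization arithmetic on its own, without downloading anything, the sketch below applies the same steps to a small synthetic array. It uses only NumPy; the helper name `normalize_hwc` and the 4×5 all-white test image are illustrative assumptions, not part of Daft.

```python
import numpy as np

# ImageNet channel statistics, the same values used in the pipeline above
MEAN = np.array([0.485, 0.456, 0.406], dtype=np.float32)
STD = np.array([0.229, 0.224, 0.225], dtype=np.float32)


def normalize_hwc(img):
    """Scale a uint8 [H, W, C] image to [0, 1], normalize per channel, return [C, H, W]."""
    img_float = img.astype(np.float32) / 255.0
    # MEAN and STD have shape [3] and broadcast over the last (channel) axis
    normalized = (img_float - MEAN) / STD
    return normalized.transpose(2, 0, 1)


# Hypothetical 4x5 all-white RGB image standing in for a decoded picture
white = np.full((4, 5, 3), 255, dtype=np.uint8)
out = normalize_hwc(white)
print(out.shape)  # (3, 4, 5)
```

Because every pixel here is 255, each output channel is constant and equal to `(1 - mean) / std` for that channel, which makes the result easy to verify by hand.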
When processing images with User-Defined Functions (UDFs) in Daft, using libraries like Pillow, OpenCV, or torchvision efficiently is key to performance and robustness.
Daft data may contain None (null) values. Your UDF must handle these gracefully to avoid runtime errors.
    import daft
    from PIL import Image
    import io

    @daft.func(return_dtype=daft.DataType.binary())
    def process_image(image_bytes):
        # Always check for None!
        if image_bytes is None:
            return None

        try:
            img = Image.open(io.BytesIO(image_bytes))
            # ... processing ...

            # Serialize back to bytes for efficiency
            out = io.BytesIO()
            img.save(out, format=img.format or "PNG")
            return out.getvalue()
        except Exception:
            # Decide whether to return None or raise an error
            return None
The return_dtype argument in @daft.func or @daft.udf is crucial. It tells Daft what kind of data to expect, allowing for optimizations and correct schema inference.
daft.DataType.tensor(dtype): Best for returning numerical data (numpy arrays, torch tensors). This allows Daft to treat the column as a native tensor type, enabling further vectorized operations.
daft.DataType.binary(): Best for returning raw bytes (e.g. encoded PNG/JPEG data). This is often more memory efficient than full bitmaps, and avoids the pickling overhead associated with Python objects.
daft.DataType.python(): Use this if you are returning arbitrary Python objects (like PIL.Image objects) that don't map neatly to a Daft type. Note: Python objects cannot be serialized as efficiently and may block some downstream optimizations.
Returning native arrays (NumPy or PyTorch) is generally more performant than returning Python objects like PIL.Image, especially when return_dtype is set to a Tensor type.
    import io

    import daft
    import numpy as np
    from PIL import Image

    @daft.func(return_dtype=daft.DataType.tensor(daft.DataType.uint8()))
    def image_to_numpy(image_bytes):
        if image_bytes is None:
            return None

        img = Image.open(io.BytesIO(image_bytes))
        # Convert to numpy array
        return np.array(img)
When using torchvision, operations typically return torch.Tensor. You can return these directly if you specify a Tensor return type.
    import daft
    import torch
    import torchvision.transforms.functional as F
    import numpy as np

    @daft.func(return_dtype=daft.DataType.tensor(daft.DataType.float32()))
    def transform_image(image_tensor):
        if image_tensor is None:
            return None

        # Assuming input is already a tensor or numpy array
        if isinstance(image_tensor, np.ndarray):
            image_tensor = torch.from_numpy(image_tensor)

        # Ensure channel-first format (C, H, W) for torchvision
        if image_tensor.ndim == 3 and image_tensor.shape[-1] == 3:
            image_tensor = image_tensor.permute(2, 0, 1)

        # Apply torchvision transforms using F
        image_tensor = F.resize(image_tensor, [224, 224])

        return image_tensor
For even higher performance, especially with heavy libraries like OpenCV or PyTorch, consider using batched UDFs to process multiple rows at once, reducing Python function call overhead.
    @daft.func.batch(return_dtype=daft.DataType.tensor(daft.DataType.uint8()))
    def batch_process_images(series):
        # 'series' is a Daft Series object
        # Convert to list of inputs
        inputs = series.to_pylist()

        results = []
        for item in inputs:
            if item is None:
                results.append(None)
                continue
            # Process item...
            # results.append(processed_item)

        return results
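The loop in the batched UDF leaves the per-item work as a placeholder. As a minimal sketch of the null-preserving pattern it relies on, here is the same logic in plain Python with no Daft dependency. The helper name `map_preserving_nulls` is hypothetical, and `len` stands in for real image processing.

```python
def map_preserving_nulls(items, fn):
    """Apply fn to each non-None item, keeping None in place.

    The output has exactly one entry per input, so rows stay aligned.
    """
    results = []
    for item in items:
        if item is None:
            results.append(None)
            continue
        results.append(fn(item))
    return results


# Stand-in batch: two byte strings and one null row
inputs = [b"abc", None, b"de"]
lengths = map_preserving_nulls(inputs, len)
print(lengths)  # [3, None, 2]
```

Keeping one output per input row matters in a batched function: if null rows were skipped instead of passed through, the returned list would be shorter than the input batch.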
image_hash() computes a compact perceptual hash for each image. Two hashes with a low Hamming distance indicate visually similar images, making this a fast first-pass filter for near-duplicate detection at scale.
Use method="crop_resistant" when images may have been cropped or have different aspect ratios. The output hash is 9× larger (72 bytes for hash_size=8).
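To make "low Hamming distance" and the size figures concrete, here is a small sketch in plain Python. It assumes each hash is available as a `bytes` value and that the default hash stores one bit per cell of a hash_size × hash_size grid, which is consistent with the 72-byte figure above. The `hamming_distance` helper and the two sample hashes are hypothetical.

```python
def hamming_distance(a: bytes, b: bytes) -> int:
    """Count the bit positions at which two equal-length hashes differ."""
    if len(a) != len(b):
        raise ValueError("hashes must have the same length")
    return sum(bin(x ^ y).count("1") for x, y in zip(a, b))


# Size arithmetic, assuming one bit per cell of a hash_size x hash_size grid
hash_size = 8
base_bytes = hash_size * hash_size // 8  # 64 bits -> 8 bytes
crop_resistant_bytes = 9 * base_bytes    # 9x larger -> 72 bytes

# Two made-up 2-byte hashes that differ in exactly two bit positions
h1 = bytes([0b10110000, 0xFF])
h2 = bytes([0b10100001, 0xFF])
print(hamming_distance(h1, h2))  # 2
```

In a near-duplicate filter, pairs whose distance falls under a chosen threshold would go on to a more expensive comparison. The right threshold depends on the dataset.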
We'll define a function that uses a pre-trained PyTorch model, ResNet50, to classify the dog pictures. We'll pass in the contents of the urls column and write the classification predictions to a new column, classify_breed.
Working with PyTorch adds some complexity but you can just run the cells below to perform the classification.
First, make sure to install and import some extra dependencies:
    # import additional libraries, these are necessary for PyTorch
    import torch
Define your ClassifyImages UDF. Models are expensive to initialize and load, so we want to do this as few times as possible, and share a model across multiple invocations.
    @daft.udf(return_dtype=daft.DataType.fixed_size_list(dtype=daft.DataType.string(), size=2))
    class ClassifyImages:
        def __init__(self):
            # Perform expensive initializations - create and load the pre-trained model
            self.model = torch.hub.load("NVIDIA/DeepLearningExamples:torchhub", "nvidia_resnet50", pretrained=True)
            self.utils = torch.hub.load("NVIDIA/DeepLearningExamples:torchhub", "nvidia_convnets_processing_utils")
            self.model.eval().to(torch.device("cpu"))

        def __call__(self, images_urls):
            batch = torch.cat([self.utils.prepare_input_from_uri(uri) for uri in images_urls]).to(torch.device("cpu"))

            with torch.no_grad():
                output = torch.nn.functional.softmax(self.model(batch), dim=1)

            results = self.utils.pick_n_best(predictions=output, n=1)
            return [result[0] for result in results]
Now you're ready to call this function on the urls column and store the outputs in a new column we'll call classify_breed.
Daft uses dynamic execution to automatically adjust batch sizes based on the operation type and data characteristics.
This is necessary because multimodal data such as images, videos, and audio files have different memory and processing characteristics that can cause issues with fixed batching: large batches may exceed available memory, while small batches may not fully utilize hardware optimizations or network bandwidth.
Multimodal Downloads: Downloads for multimodal data use smaller batch sizes (typically a factor of the max_connections parameter) to prevent memory exhaustion when downloading large files, while maintaining network throughput.
Vectorized Operations: Operations that can operate on many rows in parallel, such as byte decoding / encoding, aggregations, and scalar projections, will use larger batch sizes that can take advantage of vectorized execution using SIMD.
    # Each operation uses different batch sizes automatically
    df = (
        daft.read_parquet("metadata.parquet")  # Large batches
        .with_column("image_data", col("urls").download())  # Small batches
        .with_column("resized", col("image_data").decode_image().resize(224, 224))  # Medium batches
    )
This approach allows processing of datasets larger than available memory, while maintaining optimal performance for each operation type.
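A rough back-of-the-envelope calculation shows why one fixed batch size fits these operations poorly. The sketch below is plain arithmetic, not Daft's actual sizing logic. The batch size of 1024 and the helper name `batch_bytes` are hypothetical, chosen only to contrast a metadata column with decoded image tensors.

```python
def batch_bytes(batch_size, values_per_row, bytes_per_value):
    """Approximate in-memory size of one batch, ignoring container overhead."""
    return batch_size * values_per_row * bytes_per_value


BATCH = 1024  # hypothetical fixed batch size

# A column of 64-bit integer IDs: one 8-byte value per row
id_batch = batch_bytes(BATCH, 1, 8)

# Decoded 224x224 RGB images stored as float32: 224*224*3 values per row
pixels_per_image = 224 * 224 * 3
image_batch = batch_bytes(BATCH, pixels_per_image, 4)

print(id_batch)     # 8192
print(image_batch)  # 616562688
```

At the same row count, the image batch is about 75,000 times larger than the ID batch (roughly 617 MB versus 8 KB). That gap is the motivation for choosing batch sizes per operation.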