Skip to content

Commit fc835a7

Browse files
tuantran0910 and claude committed
fix(writer): set referenced_data_file on all rolled position delete files
The builders.len() == 1 guard prevented referenced_data_file from being set when the rolling writer produced multiple output files due to size rollover. Since rolling is purely size-driven, distinct_paths.len() == 1 already guarantees that every rolled output file contains entries for the same single data file. Removing the builders.len() check ensures the field is set on all output files, so readers can skip them for non-matching data files even in high-volume single-file delete scenarios.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 12619cc commit fc835a7

1 file changed

Lines changed: 58 additions & 1 deletion

File tree

crates/iceberg/src/writer/base_writer/position_delete_file_writer.rs

Lines changed: 58 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -167,7 +167,7 @@ where
167167
async fn close(&mut self) -> Result<Vec<DataFile>> {
168168
if let Some(writer) = self.inner.take() {
169169
let builders = writer.close().await?;
170-
let single_ref = if builders.len() == 1 && self.distinct_paths.len() == 1 {
170+
let single_ref = if self.distinct_paths.len() == 1 {
171171
self.distinct_paths
172172
.iter()
173173
.next()
@@ -475,4 +475,61 @@ mod tests {
475475

476476
Ok(())
477477
}
478+
479+
// Regression test: writes two position deletes that target the same data file
// through a rolling writer, then checks that every file returned by `close`
// reports that data file via `referenced_data_file()`.
#[tokio::test]
480+
async fn test_referenced_data_file_set_on_all_rolled_files() -> Result<()> {
481+
// When rolling produces multiple output files but all entries target a single
482+
// data file, every output file must carry referenced_data_file. A builders.len() == 1
483+
// guard would silently drop the field for rolled files, defeating the optimization.
484+
let temp_dir = TempDir::new()?;
485+
let file_io = FileIOBuilder::new_fs_io().build()?;
486+
let location_gen = DefaultLocationGenerator::with_data_location(
487+
// `unwrap` panics if the temp path is not valid UTF-8; acceptable inside a test.
temp_dir.path().to_str().unwrap().to_string(),
488+
);
489+
let file_name_gen =
490+
DefaultFileNameGenerator::new("pos_del".to_string(), None, DataFileFormat::Parquet);
491+
let schema = Arc::new(POSITION_DELETE_SCHEMA.clone());
492+
let parquet_builder =
493+
ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone());
494+
// Use a tiny target size to force rolling after the first write.
// NOTE(review): the literal `1` below is presumably the target file size in
// bytes — confirm against `RollingFileWriterBuilder::new`.
495+
let rolling_builder = RollingFileWriterBuilder::new(
496+
parquet_builder,
497+
1,
498+
file_io.clone(),
499+
location_gen,
500+
file_name_gen,
501+
);
502+
let mut writer = PositionDeleteFileWriterBuilder::new(rolling_builder)
503+
.build(None)
504+
.await?;
505+
506+
// Two separate writes, both for the same data file path, at positions 1 and 2.
writer
507+
.write(vec![PositionDeleteInput::new(
508+
Arc::from("s3://bucket/data/file-1.parquet"),
509+
1,
510+
)])
511+
.await?;
512+
writer
513+
.write(vec![PositionDeleteInput::new(
514+
Arc::from("s3://bucket/data/file-1.parquet"),
515+
2,
516+
)])
517+
.await?;
518+
519+
let data_files = writer.close().await?;
// Precondition: the scenario is only exercised if more than one file was produced.
520+
assert!(
521+
data_files.len() > 1,
522+
"expected rolling to produce multiple files, got {}",
523+
data_files.len()
524+
);
// Every returned file, not just the first, must reference the single data file path.
525+
for file in &data_files {
526+
assert_eq!(
527+
file.referenced_data_file().as_deref(),
528+
Some("s3://bucket/data/file-1.parquet"),
529+
"all rolled files must carry referenced_data_file"
530+
);
531+
}
532+
533+
Ok(())
534+
}
478535
}

0 commit comments

Comments
 (0)