Skip to content

Commit 088d444

Browse files
tuantran0910 and claude authored
feat(writer): auto-set referenced_data_file on PositionDeleteFileWriter (#169)
* feat(writer): auto-set referenced_data_file on PositionDeleteFileWriter close

  When a position delete writer closes with exactly one output file and all written entries reference a single distinct data file path, set referenced_data_file on the resulting DataFile. This allows readers to scope delete file application without scanning the file_path column, matching Iceberg Java behavior and eliminating unnecessary I/O.

  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(writer): set referenced_data_file on all rolled position delete files

  The builders.len() == 1 guard prevented referenced_data_file from being set when the rolling writer produced multiple output files due to size rollover. Since rolling is purely size-driven, distinct_paths.len() == 1 already guarantees that every rolled output file contains entries for the same single data file. Removing the builders.len() check ensures the field is set on all output files, so readers can skip them for non-matching data files even in high-volume single-file delete scenarios.

  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* refactor(writer): clean up test comment in position_delete_file_writer

  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 429299c commit 088d444

1 file changed

Lines changed: 152 additions & 3 deletions

File tree

crates/iceberg/src/writer/base_writer/position_delete_file_writer.rs

Lines changed: 152 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,7 @@
2121
//! as required by the Iceberg specification. Ordering and deduplication must be handled by the
2222
//! caller (e.g. by using a sorting writer) before passing records to this writer.
2323
24+
use std::collections::HashSet;
2425
use std::sync::Arc;
2526

2627
use arrow_array::RecordBatch;
@@ -119,6 +120,7 @@ where
119120
Ok(PositionDeleteFileWriter {
120121
inner: Some(self.inner.build()),
121122
partition_key,
123+
distinct_paths: HashSet::new(),
122124
})
123125
}
124126
}
@@ -131,6 +133,7 @@ pub struct PositionDeleteFileWriter<
131133
> {
132134
inner: Option<RollingFileWriter<B, L, F>>,
133135
partition_key: Option<PartitionKey>,
136+
distinct_paths: HashSet<Arc<str>>,
134137
}
135138

136139
#[async_trait::async_trait]
@@ -145,6 +148,10 @@ where
145148
return Ok(());
146149
}
147150

151+
for pd in &input {
152+
self.distinct_paths.insert(Arc::clone(&pd.path));
153+
}
154+
148155
let batch = build_position_delete_batch(input)?;
149156

150157
if let Some(writer) = self.inner.as_mut() {
@@ -159,9 +166,17 @@ where
159166

160167
async fn close(&mut self) -> Result<Vec<DataFile>> {
161168
if let Some(writer) = self.inner.take() {
162-
writer
163-
.close()
164-
.await?
169+
let builders = writer.close().await?;
170+
let single_ref = if self.distinct_paths.len() == 1 {
171+
self.distinct_paths
172+
.iter()
173+
.next()
174+
.map(|p: &Arc<str>| p.as_ref().to_string())
175+
} else {
176+
None
177+
};
178+
179+
builders
165180
.into_iter()
166181
.map(|mut builder| {
167182
builder.content(DataContentType::PositionDeletes);
@@ -172,6 +187,9 @@ where
172187
builder.partition(Struct::empty());
173188
builder.partition_spec_id(0);
174189
}
190+
if let Some(ref_path) = single_ref.as_ref() {
191+
builder.referenced_data_file(Some(ref_path.clone()));
192+
}
175193
builder.build().map_err(|e| {
176194
Error::new(
177195
ErrorKind::DataInvalid,
@@ -382,4 +400,135 @@ mod tests {
382400

383401
Ok(())
384402
}
403+
404+
#[tokio::test]
405+
async fn test_referenced_data_file_single_path() -> Result<()> {
// Checks that when every position delete written targets the same data file
// path, the DataFile returned by close() reports that path through
// referenced_data_file().
406+
let temp_dir = TempDir::new()?;
407+
let file_io = FileIOBuilder::new_fs_io().build()?;
408+
let location_gen = DefaultLocationGenerator::with_data_location(
409+
temp_dir.path().to_str().unwrap().to_string(),
410+
);
411+
let file_name_gen =
412+
DefaultFileNameGenerator::new("pos_del".to_string(), None, DataFileFormat::Parquet);
413+
let schema = Arc::new(POSITION_DELETE_SCHEMA.clone());
414+
let parquet_builder =
415+
ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone());
416+
let rolling_builder = RollingFileWriterBuilder::new_with_default_file_size(
417+
parquet_builder,
418+
file_io.clone(),
419+
location_gen,
420+
file_name_gen,
421+
);
422+
let mut writer = PositionDeleteFileWriterBuilder::new(rolling_builder)
423+
.build(None)
424+
.await?;
425+
426+
// Both entries share one data file path; only the position differs.
writer
427+
.write(vec![
428+
PositionDeleteInput::new(Arc::from("s3://bucket/data/file-1.parquet"), 1),
429+
PositionDeleteInput::new(Arc::from("s3://bucket/data/file-1.parquet"), 2),
430+
])
431+
.await?;
432+
433+
let data_files = writer.close().await?;
434+
// A single output file is expected, and it must name the one referenced data file.
assert_eq!(data_files.len(), 1);
435+
assert_eq!(
436+
data_files[0].referenced_data_file().as_deref(),
437+
Some("s3://bucket/data/file-1.parquet")
438+
);
439+
440+
Ok(())
441+
}
442+
443+
#[tokio::test]
444+
async fn test_referenced_data_file_multi_path_is_null() -> Result<()> {
// Checks that when the written position deletes target more than one data
// file path, the DataFile returned by close() has no referenced_data_file.
445+
let temp_dir = TempDir::new()?;
446+
let file_io = FileIOBuilder::new_fs_io().build()?;
447+
let location_gen = DefaultLocationGenerator::with_data_location(
448+
temp_dir.path().to_str().unwrap().to_string(),
449+
);
450+
let file_name_gen =
451+
DefaultFileNameGenerator::new("pos_del".to_string(), None, DataFileFormat::Parquet);
452+
let schema = Arc::new(POSITION_DELETE_SCHEMA.clone());
453+
let parquet_builder =
454+
ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone());
455+
let rolling_builder = RollingFileWriterBuilder::new_with_default_file_size(
456+
parquet_builder,
457+
file_io.clone(),
458+
location_gen,
459+
file_name_gen,
460+
);
461+
let mut writer = PositionDeleteFileWriterBuilder::new(rolling_builder)
462+
.build(None)
463+
.await?;
464+
465+
// The two entries point at different data files (file-1 and file-2).
writer
466+
.write(vec![
467+
PositionDeleteInput::new(Arc::from("s3://bucket/data/file-1.parquet"), 1),
468+
PositionDeleteInput::new(Arc::from("s3://bucket/data/file-2.parquet"), 5),
469+
])
470+
.await?;
471+
472+
let data_files = writer.close().await?;
473+
// A single output file is expected, with referenced_data_file left unset.
assert_eq!(data_files.len(), 1);
474+
assert_eq!(data_files[0].referenced_data_file(), None);
475+
476+
Ok(())
477+
}
478+
479+
#[tokio::test]
480+
async fn test_referenced_data_file_set_on_all_rolled_files() -> Result<()> {
481+
// When rolling produces multiple output files but all entries target a single
482+
// data file, every output file must carry referenced_data_file.
483+
let temp_dir = TempDir::new()?;
484+
let file_io = FileIOBuilder::new_fs_io().build()?;
485+
let location_gen = DefaultLocationGenerator::with_data_location(
486+
temp_dir.path().to_str().unwrap().to_string(),
487+
);
488+
let file_name_gen =
489+
DefaultFileNameGenerator::new("pos_del".to_string(), None, DataFileFormat::Parquet);
490+
let schema = Arc::new(POSITION_DELETE_SCHEMA.clone());
491+
let parquet_builder =
492+
ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone());
493+
// Use a tiny target size to force rolling after the first write.
494+
let rolling_builder = RollingFileWriterBuilder::new(
495+
parquet_builder,
496+
1,
497+
file_io.clone(),
498+
location_gen,
499+
file_name_gen,
500+
);
501+
let mut writer = PositionDeleteFileWriterBuilder::new(rolling_builder)
502+
.build(None)
503+
.await?;
504+
505+
// Two separate write calls, both targeting the same data file path
// (positions 1 and 2).
writer
506+
.write(vec![PositionDeleteInput::new(
507+
Arc::from("s3://bucket/data/file-1.parquet"),
508+
1,
509+
)])
510+
.await?;
511+
writer
512+
.write(vec![PositionDeleteInput::new(
513+
Arc::from("s3://bucket/data/file-1.parquet"),
514+
2,
515+
)])
516+
.await?;
517+
518+
let data_files = writer.close().await?;
519+
// Precondition for the check below: rolling must have produced more than
// one output file, otherwise this test would not exercise the rolled case.
assert!(
520+
data_files.len() > 1,
521+
"expected rolling to produce multiple files, got {}",
522+
data_files.len()
523+
);
524+
// Every output file, not just the first, must name the single data file.
for file in &data_files {
525+
assert_eq!(
526+
file.referenced_data_file().as_deref(),
527+
Some("s3://bucket/data/file-1.parquet"),
528+
"all rolled files must carry referenced_data_file"
529+
);
530+
}
531+
532+
Ok(())
533+
}
385534
}

0 commit comments

Comments
 (0)