Allow swapping projections / row filters at row group boundaries in Parquet reader #9968

adriangb wants to merge 9 commits into
Conversation
403183a to 48d1657
Add a small surface to the push decoder so callers can swap the RowFilter, ProjectionMask, and/or RowSelectionPolicy at row-group boundaries without rebuilding the decoder:

- new pub fn `can_swap_strategy() -> bool` — true between row groups (outer state `ReadingRowGroup`, inner state `Finished`)
- new pub fn `swap_strategy(StrategySwap) -> Result<()>` — rejected with `ParquetError::General` when called mid-row-group
- new `pub struct StrategySwap` (`#[non_exhaustive]`) with builder methods `with_projection`, `with_filter`, `with_row_selection_policy`
- new pub fn `row_groups_remaining() -> usize` for diagnostics

`PushBuffers` carries through the swap, so bytes already fetched for columns that survive into the new strategy are reused — only bytes the new strategy needs but that aren't already buffered get requested via `NeedsData`.

Adaptive callers should drive the decoder with `try_next_reader` rather than `try_decode`: handing the active reader off transitions the decoder back to `ReadingRowGroup` immediately, giving the caller a clean swap window between two consecutive returns. `try_decode` loops past row-group boundaries internally and is unsuitable for in-flight strategy changes.

Tests cover: filter swap between row groups, mid-row-group rejection, swap-while-iterating-handed-off-reader, and projection narrowing that reuses already-buffered bytes.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
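For illustration, a minimal sketch of the swap call as this commit describes it. The decoder and filter values are placeholders, `with_filter` is assumed to take `Option<RowFilter>` (so `None` clears an installed filter), and the `StrategySwap` import is omitted since its path never shipped; this surface is reworked later in the PR.

use parquet::arrow::arrow_reader::RowFilter;
use parquet::arrow::push_decoder::ParquetPushDecoder;
use parquet::errors::ParquetError;

// Between two readers returned by `try_next_reader`, optionally swap the row filter.
// Sketch only: the surrounding fetch loop and NeedsData handling are elided.
fn maybe_swap_filter(
    decoder: &mut ParquetPushDecoder,
    new_filter: Option<RowFilter>,
) -> Result<(), ParquetError> {
    // Only legal between row groups; a mid-row-group call returns ParquetError::General.
    if decoder.can_swap_strategy() {
        decoder.swap_strategy(StrategySwap::default().with_filter(new_filter))?;
    }
    Ok(())
}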
`PushBuffers` is `pub(crate)` in the arrow-rs workspace, so the intra-doc link `[\`PushBuffers\`]: crate::util::push_buffers::PushBuffers` in `ParquetPushDecoder::swap_strategy`'s public docs failed the `-D rustdoc::private-intra-doc-links` check on `cargo +nightly doc --document-private-items`. The rest of the doc is unchanged; the prose now refers to the type by name without a link.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
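Concretely, the change is along these lines (the doc sentence shown is illustrative; only the link definition is dropped):

// Before: public docs on `swap_strategy` linked to a `pub(crate)` type,
// which `-D rustdoc::private-intra-doc-links` rejects:
//
//   /// Buffered bytes are carried across the swap by [`PushBuffers`].
//   ///
//   /// [`PushBuffers`]: crate::util::push_buffers::PushBuffers
//
// After: the prose names the type, with no intra-doc link:
//
//   /// Buffered bytes are carried across the swap by `PushBuffers`.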
48d1657 to fee4744
@HippoBaro @alamb mind taking a look at this?
Add three swap_strategy tests:

- expands_projection_between_row_groups: narrow -> wide swap with the whole file prefetched; verifies RG1 decodes the widened projection using already-buffered bytes.
- expand_projection_requests_new_bytes: narrow -> wide swap driven incrementally; pins the post-swap NeedsData to the three RG1 column chunks (11168 bytes total), confirming the decoder fetches bytes for the newly-added columns.
- narrow_projection_skips_unneeded_bytes: wide -> narrow swap driven incrementally; pins the post-swap NeedsData to the single RG1 column-a chunk (1856 bytes), confirming the decoder skips bytes for columns dropped from the projection.

The incremental tests compare RG1-with-projection-X against RG1-with-projection-Y (not RG0 vs RG1), so the asymmetry isolates the projection change rather than per-row-group size variation in the StringView column "c". Expected ranges are hardcoded since TEST_BATCH and the writer settings are static.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
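Roughly the shape of the incremental expand test. The `incremental_decoder`, `drain_row_group`, and `expect_needs_data` helpers and the projection values are placeholders for this sketch; the real test hardcodes the three RG1 ranges, which sum to 11168 bytes.

// Decode RG0 with the narrow projection, swap to the wide projection at the
// boundary, then assert the very next NeedsData covers exactly the RG1 chunks
// of the newly added columns.
let mut decoder = incremental_decoder(narrow_projection);
drain_row_group(&mut decoder);
assert!(decoder.can_swap_strategy());
decoder
    .swap_strategy(StrategySwap::default().with_projection(wide_projection))
    .unwrap();
let ranges = expect_needs_data(decoder.try_decode());
assert_eq!(ranges.iter().map(|r| r.end - r.start).sum::<u64>(), 11168);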
752de6e to b75d300
I am OOO until next Wed but will try to find some time to look at this!
| /// `Option` is "do you want to change the filter?", the inner is "set or
| /// clear?". `Some(None)` clears any previously-installed filter.
| #[derive(Debug, Default)]
| #[non_exhaustive]
I think this idea makes sense (to change the projection/selection/filter), but I am thinking about why to make this a new structure, as it makes some things awkward (like Filter being Option<Option<...>>).
What do you think about putting direct functions on ParquetPushDecoder to override these?

let decoder = ParquetPushDecoder::...
let decoder = decoder.with_projection(new_projection)

For example?
That might make the API simpler and easier to find.
Another thought I had on the API is that we could maybe add an API to go back to the existing ParquetPushDecoderBuilder, which has all the fields, etc. that can be adjusted.
It might be quite elegant to have something like
let decoder = ParquetPushDecoder::...
let new_decoder = decoder
    .into_builder()
    .with_projection()
    .build()?

That would also mean that any additional parameters that get added / need to be changed during decode would have a natural API without additional plumbing.
Looking at it I think it might be pretty straightforward:
A bunch of boilerplate destructuring, but easy.
@alamb this was a great suggestion! I implemented it. There was some complexity around allowing the buffers to be re-used - can you check that I handled that in a way that makes sense?
Our comments crossed
I think the way you handled it looks good to me (and the into_parts API is a bit wacky but it is an internal implementation detail)
| self.state.clear_all_ranges();
| }
|
| /// True iff the decoder is at a row-group boundary and a
I think it would help to add a high level overview / example of how to swap strategies (like turn off the row filter) at boundaries
Perhaps in https://docs.rs/parquet/latest/parquet/arrow/push_decoder/type.ParquetPushDecoderBuilder.html we can mention that the strategy can be changed during execution
I think in particular pointing out that using try_next_reader is the important API where the strategy can be changed if needed
You have this in the tests but I fear it will be hidden / not obvious if we don't also add it to the docs
Added a rather extensive docstring. Let me know if it's too much.
I did, and I think the example is great. Making it more concise would be nice, but in general I think it is good.
…o_builder

`StrategySwap` / `swap_strategy` exposed an awkward API: callers had to learn a new struct, and clearing vs. installing a row filter required a doubly `Option`-wrapped field (`Option<Option<RowFilter>>`). Per review feedback on apache#9968, replace it with `ParquetPushDecoder::into_builder`, the inverse of `ParquetPushDecoderBuilder::build`: at a row-group boundary it decomposes the decoder back into a `ParquetPushDecoderBuilder` configured for the not-yet-decoded row groups (pinned row groups, remaining row selection and offset/limit budget). Callers reconfigure with the existing builder setters and `build()` a fresh decoder, so any builder option — current and future — is adjustable mid-scan with no extra plumbing.

`can_swap_strategy` is renamed to `is_at_row_group_boundary`.

Unlike the old in-place swap, `into_builder` does not preserve buffered bytes; the rebuilt decoder re-requests what it needs (typically nothing extra at a boundary reached via incremental I/O).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
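A condensed sketch of the resulting flow. The `next_reader`, `observe_selectivity`, `want_wider_projection`, and `wider_projection` helpers are placeholders, and whether `into_builder` is fallible is assumed here, not confirmed by the PR text.

use parquet::arrow::push_decoder::ParquetPushDecoder;
use parquet::errors::ParquetError;

fn drive_adaptively(mut decoder: ParquetPushDecoder) -> Result<(), ParquetError> {
    // Drive readers one row group at a time; at each boundary, optionally rebuild
    // the decoder with a different strategy for the row groups still to come.
    while let Some(reader) = next_reader(&mut decoder)? {   // placeholder around try_next_reader / NeedsData
        for batch in reader {
            observe_selectivity(&batch?);                   // placeholder stats collection
        }
        if decoder.is_at_row_group_boundary() && want_wider_projection() {
            decoder = decoder
                .into_builder()?                            // back to ParquetPushDecoderBuilder
                .with_projection(wider_projection())        // any builder setter works here
                .build()?;
        }
    }
    Ok(())
}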
…o_builder

`into_builder` previously dropped the decoder's `PushBuffers`, so a rebuilt decoder re-requested data it already held. `ArrowReaderBuilder` is shared with the sync/async readers and cannot grow a push-decoder-specific field, so carry the buffers in the one slot that is push-decoder-specific: `NoInput`. It changes from a unit struct to a struct holding `PushBuffers`; `build()` consumes those buffers instead of allocating empty ones, and a new `pub(crate)` `ParquetPushDecoderBuilder::with_buffers` swaps them in. `into_builder` recovers the decoder's buffers (now returned by `RowGroupReaderBuilder::into_parts` rather than discarded) and threads them into the rebuilt builder.

`with_buffers` is `pub(crate)`: `PushBuffers` is an internal type, so callers seeding a fresh decoder use `push_ranges` instead.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
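Schematically, the change to the input slot looks like this (internal plumbing, not public API; the earlier shape is shown in the comment):

// Before this commit `NoInput` was a unit struct (`pub struct NoInput;`).
// Now it carries the bytes already pushed, and `build()` hands them to the
// new decoder instead of allocating an empty `PushBuffers`.
pub struct NoInput {
    buffers: PushBuffers,
}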
Per review feedback on apache#9968, expand the "Adaptive scans" section of the `ParquetPushDecoderBuilder` docs with a runnable example showing the `try_next_reader` loop and an `into_builder` strategy change at a row-group boundary, and call out that `try_next_reader` (not `try_decode`) is the API that exposes those boundaries.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…) wart
`NoInput` was named for a unit struct but now holds the decoder's
`PushBuffers`, so the name contradicted the contents. Rename it to
`PushInput`: the push decoder's input genuinely *is* the buffer of pushed
bytes.
Also derive `Default` for `PushBuffers` so `PushInput::default()` no longer
constructs it via the misleading `PushBuffers::new(0)` ("a file of length
zero"); `PushBuffers::default()` reads honestly as "empty".
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
| row_group_reader_builder,
| } = self;
| let RowGroupFrontier {
| parquet_metadata: _,
we probably want to pass along the metadata as well, right?
| /// }
| /// // We are now at a row-group boundary. Based on whatever stats
| /// // were gathered, optionally change strategy for the row groups
| /// // still to come: drop or promote a row filter, narrow or widen
| /// denote in the type system there is no type.
| #[derive(Debug, Clone, Copy)]
| pub struct NoInput;
| /// The name predates the buffer-carrying behaviour; it still reads as "no
this sentence (and the rest of these comments) can probably be cleaned up, but otherwise this looks good
| /// Reuse a [`PushBuffers`] when [`build`](Self::build)ing the decoder so
| /// that bytes already fetched are not requested again.
| ///
| /// This is how [`ParquetPushDecoder::into_builder`] carries a decoder's
I don't think this context is particularly relevant -- It would be simpler if the API was just like "provide a preexisting PushBuffers to read from"
I assume you mean exposing PushBuffers in the public API? I've done that but just wanted to confirm.
| /// The name predates the buffer-carrying behaviour; it still reads as "no
| /// *reader* input".
| #[derive(Debug)]
| pub struct NoInput {
Since this is technically a breaking change anyway, perhaps we should just rename the struct to BuffersInput or PushDecoderInput?
| /// applies that filter to the subsequent row groups while leaving the
| /// already-decoded row group's results untouched.
| ///
| /// Adaptive callers should drive the decoder with `try_next_reader`
I think this context is now covered in the decoder builder docs -- perhaps you can just leave a reference to those comments here
|
| // After getting the first batch, we're inside `DecodingRowGroup`:
| // an active reader is still alive. Mid-reader is not a boundary.
| let _ = expect_data(decoder.try_decode());
| let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
| .unwrap()
| .build()
| .unwrap();
| decoder
| .push_range(test_file_range(), TEST_FILE_DATA.clone())
| .unwrap();
A lot of this test setup is repeated multiple times -- maybe we can make a function to avoid repeating the same 8 lines many times?
Refactored into a shared prefetched_decoder
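For reference, the shared helper is essentially the setup quoted above wrapped in a function (the exact name and signature in the PR may differ slightly):

/// Build a decoder over the test file and push the whole file up front,
/// so tests can iterate readers without handling NeedsData.
fn prefetched_decoder() -> ParquetPushDecoder {
    let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
        .unwrap()
        .build()
        .unwrap();
    decoder
        .push_range(test_file_range(), TEST_FILE_DATA.clone())
        .unwrap();
    decoder
}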
| ///
| /// [`ParquetPushDecoderBuilder`]: crate::arrow::push_decoder::ParquetPushDecoderBuilder
| pub(crate) fn into_parts(self) -> RowGroupReaderBuilderParts {
| let Self {
it might be worth a comment here and in the other into_parts function that if a new field gets added, it should also be threaded back into RowGroupReaderBuilderParts (not ignored)
Addresses the review comments on apache#9968:

- Rename the input struct `PushInput` -> `PushDecoderInput`.
- Make `PushBuffers` and `ParquetPushDecoderBuilder::with_buffers` public, and re-export `PushBuffers` from `parquet::arrow::push_decoder`, so the API reads as a plain "provide a preexisting `PushBuffers` to read from". Incidental `PushBuffers` methods are tightened to `pub(crate)`/private to keep the new surface minimal.
- Carry the file `metadata` explicitly in `RemainingRowGroupsParts` (next to `schema`, both whole-file properties) instead of discarding the frontier's copy, so nothing looks dropped.
- Tighten the doc comments added in this PR.
- Tests: replace the repeated build+prefetch setup with `prefetched_decoder` / `prefetch_test_file` helpers, drop a duplicated doc paragraph in favour of a reference to the builder docs, and stop binding an ignored `expect_data`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@alamb I tried to address your last round of feedback. I won't be able to merge anyway so I will leave this open for you to check or merge at your leisure. Thanks for reviewing!
This is the decoder piece of the work presented at the NYC DataFusion meetup.
The idea is that we'll be able to adaptively promote and demote filters into row filters based on runtime selectivity stats.