Pass dynamic filter predicate to ConnectorSplitSource#getNextBatch#29206
Open
chenjian2664 wants to merge 2 commits intotrinodb:masterfrom
Open
Pass dynamic filter predicate to ConnectorSplitSource#getNextBatch#29206chenjian2664 wants to merge 2 commits intotrinodb:masterfrom
ConnectorSplitSource#getNextBatch#29206chenjian2664 wants to merge 2 commits intotrinodb:masterfrom
Conversation
6805608 to
da416b1
Compare
da416b1 to
4c7cf8a
Compare
6f38f96 to
0648ae0
Compare
07fc49b to
0648ae0
Compare
Member
|
Could you rebase on master to resolve conflicts? |
Add TupleDomain `dynamicFilterPredicate` parameter to `ConnectorSplitSource#getNextBatch` so connectors receive the resolved predicate per batch without needing to hold a `DynamicFilter` and manage the wait logic themselves. `ConnectorAwareSplitSource` now owns the dynamic filter wait timeout and passes getCurrentPredicate() down on each batch call.
0648ae0 to
8c6f835
Compare
ebyhr
reviewed
Apr 28, 2026
| return dynamicFilteringWaitTimeout; | ||
| } | ||
|
|
||
| @Config("dynamic-filtering.wait-timeout") |
Member
There was a problem hiding this comment.
@raunaqmorarka Do you know why the DF timeout and the relevant logic were implemented on the connectors' side? (To avoid SPI change ConnectorSplitSource.getNextBatch, or allow different values per catalog?)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Description
Move dynamic filter waiting logic from individual connectors into the engine layer by adding a
TupleDomain<ColumnHandle>dynamicFilterPredicate parameter toConnectorSplitSource#getNextBatch, so connectors receive the resolved predicate per batch without needing to hold a DynamicFilter and manage wait/timeout logic themselves.Previously,
IcebergSplitSource(and DeltaLakeSplitSource) each duplicated the same pattern: hold aDynamicFilter, pollisAwaitable()/isBlocked(), manage a Stopwatch against a connector-specific timeout, and only then call into Iceberg's planning API. This logic now lives entirely inConnectorAwareSplitSource, which waits for dynamic filter completion (controlled by a new dynamic_filtering_wait_timeout system session property backed by DynamicFilterConfig) and passesdynamicFilter.getCurrentPredicate()to the connector on each batch call.The change is split into two commits:
getNextBatch(int, TupleDomain<ColumnHandle>)as the new primary method (default delegates to the deprecatedgetNextBatch(int)); move wait/timeout ownership intoConnectorAwareSplitSource; updateClassLoaderSafeConnectorSplitSourceto forward the predicate.IcebergSplitSource; remove DynamicFilter/Stopwatch fields; removeiceberg.dynamic-filtering.wait-timeoutconfig property (marked @DefunctConfig) and thedynamic_filtering_wait_timeoutconnector session property; update tests and docs.Additional Changes
getNextBatch(int)continue to work via the default bridge — no changes required in other connectors.dynamic_filtering_wait_timeoutsystem property instead of the removed Iceberg-specific one.Additional context and related issues
Release notes
() This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
(x) Release notes are required, with the following suggested text: