close
Skip to content

fix(kafka): correct MCL batch listener registration and enable batchi…#17430

Open
david-leifker wants to merge 2 commits into
masterfrom
fix/mcl-batch-registration
Open

fix(kafka): correct MCL batch listener registration and enable batchi…#17430
david-leifker wants to merge 2 commits into
masterfrom
fix/mcl-batch-registration

Conversation

@david-leifker
Copy link
Copy Markdown
Collaborator

…ng by default

The MCL batch listener was not being registered correctly — the endpoint always pointed at the single-record consume method and the container factory was never configured for batch mode. This fixes three root causes:

  1. Make consumeBatch the primary contract on GenericKafkaListener, with consume as a convenience default that delegates via singleton list.
  2. Add a batch-enabled KafkaListenerContainerFactory bean (mclBatchEventConsumer) and select it when batching is enabled.
  3. Override createListenerEndpoint in MCLKafkaListenerRegistrar to register the consumeBatch(List) method when batch mode is active.

Additionally, enable MCL and MCP batching by default in docker/profiles for all quickstart and debug deployments.

@github-actions
Copy link
Copy Markdown
Contributor

Linear: PFP-3898

@github-actions github-actions Bot added the devops PR or Issue related to DataHub backend & deployment label May 13, 2026
@codecov
Copy link
Copy Markdown

codecov Bot commented May 13, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ All tests successful. No failed tests found.

📢 Thoughts on this report? Let us know!

@david-leifker david-leifker marked this pull request as draft May 13, 2026 22:40
@david-leifker david-leifker marked this pull request as draft May 13, 2026 22:40
@david-leifker david-leifker force-pushed the fix/mcl-batch-registration branch from dc71d51 to 427b1a0 Compare May 14, 2026 00:40
…ng by default

The MCL batch listener was not being registered correctly — the endpoint
always pointed at the single-record `consume` method and the container
factory was never configured for batch mode. This fixes three root causes:

1. Make `consumeBatch` the primary contract on `GenericKafkaListener`,
   with `consume` as a convenience default that delegates via singleton list.
2. Add a batch-enabled `KafkaListenerContainerFactory` bean
   (`mclBatchEventConsumer`) and select it when batching is enabled.
3. Override `createListenerEndpoint` in `MCLKafkaListenerRegistrar` to
   register the `consumeBatch(List)` method when batch mode is active.

Additionally, enable MCL and MCP batching by default in docker/profiles
for all quickstart and debug deployments.

Co-authored-by: Cursor <cursoragent@cursor.com>
…solutionException

MethodKafkaListenerEndpoint wraps batch delivery in a
BatchMessagingMessageListenerAdapter that strips ConsumerRecord wrappers,
converting List<ConsumerRecord<K,V>> to List<V> before method invocation.
This caused a MethodArgumentResolutionException because the consumeBatch
method expects List<ConsumerRecord<String, GenericRecord>>.

Introduce BatchKafkaListenerEndpoint — a KafkaListenerEndpoint that sets
a BatchMessageListener directly on the container, bypassing the messaging
adapter entirely. The BatchMessageListener interface delivers the raw
List<ConsumerRecord<K,V>> without stripping wrappers.

Co-authored-by: Cursor <cursoragent@cursor.com>
@github-actions
Copy link
Copy Markdown
Contributor

Linear: PFP-3902

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

devops PR or Issue related to DataHub backend & deployment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant