
fix(topology_lag): Kafka Topology Lag breaking when no offsets are committed for topic/partition #8589

Open

DiogoP98 wants to merge 3 commits into apache:master from DiogoP98:fix-kafka-lag-issue

Conversation

@DiogoP98 (Contributor) commented May 6, 2026

What is the purpose of the change

Fixes a silent failure in the topology spout lag query: the Storm UI's "Topology Lag" panel stops working, and this warning is logged repeatedly for every Kafka spout:

  TopologySpoutLag [WARN] Exception thrown while getting lag for spout id: <spoutId>
  TopologySpoutLag [WARN] Exception message:null
  java.lang.ClassCastException

The regression was introduced by the Kafka client bump from 3.9.0 → 4.x (#8243). Two issues, on the same code path:

  1. KafkaOffsetLagUtil: NPE on partitions with no committed offset.
  2. TopologySpoutLag: a ClassCastException swallows the real error. When the kafka-monitor subprocess fails (the NPE above, or any other failure: broker unreachable, auth failure, bad config, etc.), it prints a plain-text error to stdout; TopologySpoutLag then parses that output and casts the result as if it were the expected JSON map, so the only symptom is the ClassCastException with a null message seen above.

Together, (1) restores correct lag reporting for partitions with no committed offset, and (2) ensures any future monitor failure is reported usefully instead of as a generic ClassCastException with a null message.
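
To make (1) concrete, here is a minimal sketch of the failure mode, assuming the Kafka 4.x committed(Set) semantics the diff relies on; consumer and partition are hypothetical placeholders, not the PR's code:

// Sketch only. With the 4.x client, committed(...) yields a null
// OffsetAndMetadata for a partition that has no committed offset.
Map<TopicPartition, OffsetAndMetadata> committed =
    consumer.committed(Collections.singleton(partition));

// Pre-fix: the null-check guarded the map (which is never null here),
// not the per-partition value, so this threw NullPointerException:
long broken = committed.get(partition).offset();

// Post-fix shape: null-check the value and fall back to -1:
OffsetAndMetadata meta = committed.get(partition);
long committedOffset = meta != null ? meta.offset() : -1;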

How was the change tested

Integration test

  • KafkaOffsetLagUtilTest#getOffsetLagsReportsCommittedOffsetForCommittedPartitionsAndMinusOneForUncommittedPartitions creates a 2-partition topic, commits an offset on partition 0 only, and calls the production KafkaOffsetLagUtil.getOffsetLags(...) end-to-end. Asserts the committed offset is reported for partition 0 and -1 for partition 1 (the regression case — pre-fix this NPE'd inside the monitor and surfaced as ClassCastException upstream in TopologySpoutLag).
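
For reference, the setup for that regression case looks roughly like the sketch below; topic name, offset value, and config wiring are illustrative assumptions, not the actual test values:

// imports assumed: org.apache.kafka.clients.admin.{Admin, NewTopic},
// org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata},
// org.apache.kafka.common.TopicPartition, java.util.{List, Map, Properties}
Properties common = new Properties();
common.put("bootstrap.servers", bootstrapServers); // e.g. from the Kafka test container

// Create a 2-partition topic.
try (Admin admin = Admin.create(common)) {
    admin.createTopics(List.of(new NewTopic("lag-test", 2, (short) 1))).all().get();
}

Properties consumerProps = new Properties();
consumerProps.putAll(common);
consumerProps.put("group.id", "lag-test-group"); // the group that getOffsetLags queries
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

// Commit an offset on partition 0 only; partition 1 stays uncommitted.
try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps)) {
    consumer.commitSync(Map.of(new TopicPartition("lag-test", 0), new OffsetAndMetadata(3L)));
}

// getOffsetLags(...) should now report the committed offset for partition 0
// and -1 for partition 1, instead of throwing.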

Manual verification on a Storm 2.8.7 cluster

  • Built storm-core and storm-kafka-monitor JARs and replaced them on the storm-ui node.
  • Confirmed the previously-failing spouts (managementKafka, secondaryKafka, Instructions, …) now return lag results in the UI with no TopologySpoutLag warnings in ui.log.
  • For a spout with a fresh consumer group (no committed offsets yet), confirmed lag reports correctly instead of throwing.

Issue: #8588

@rzo1 (Contributor) left a comment

Thanks for the PR! Both bugs are real, and the bulk-committed() + null-check is exactly the right shape for the fix.

One heads-up before going further: master currently targets Storm 3 (Java 21, no Clojure). The PR description mentions manual verification on a 2.8.7 cluster, so if you also need this fix on the 2.x line, happy to get a separate PR against the 2.x branch — they're maintained independently now.

A couple of things on the change itself:

  1. Testcontainers without a Docker fallback — see inline at KafkaOffsetLagUtilTest.java:45. This will hard-fail on every CI runner (and dev box) without Docker. storm-metrics-prometheus already established the right pattern; please mirror it.
  2. Formatting churn in KafkaOffsetLagUtil.java — the bug fix is ~6 useful lines, but the diff also reformats half the file (continuation indent 4→8, plus a Javadoc tweak). Bug fixes that get cherry-picked to release branches are much easier to backport when they aren't entangled with style-only changes. Consider splitting the formatting into a separate commit (or dropping it from this PR).
  3. Minor observability regression in TopologySpoutLag.java — see inline.

Nothing blocking on (2) or (3) — author's call.

import org.testcontainers.kafka.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

@Testcontainers
rzo1 (Contributor) commented:

Should be @Testcontainers(disabledWithoutDocker = true).

With bare @Testcontainers, this test hard-fails on any environment without a Docker daemon (CI runners, sandboxed builds, contributor laptops without Docker Desktop). With disabledWithoutDocker = true the JUnit engine skips the test instead of failing it.

The Storm codebase already has precedent for this — storm-metrics-prometheus:

// external/storm-metrics-prometheus/src/test/java/.../PrometheusPreparableReporterTest.java:43
@Testcontainers(disabledWithoutDocker = true)

(Note: storm-redis/.../RedisFilterBoltTest.java:58 uses bare @Testcontainers — that's an older pattern that probably should also be updated, but out of scope for this PR.)

Also worth noting: a Kafka container cold-start is ~10–20s. Worth flagging in the test class's javadoc that this is an integration test gated on Docker availability — so a future contributor running mvn test on the module without Docker understands why it's skipped.
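
For concreteness, a sketch of the gated pattern applied to this test class; the container field, image, and tag are assumptions, not necessarily what the PR ends up using:

import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.kafka.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

// Skipped (not failed) by the JUnit 5 engine when no Docker daemon is reachable.
@Testcontainers(disabledWithoutDocker = true)
class KafkaOffsetLagUtilTest {

    // Integration test: requires Docker; Kafka container cold-start is ~10-20s.
    @Container
    private static final KafkaContainer KAFKA =
        new KafkaContainer(DockerImageName.parse("apache/kafka:3.9.0"));

    // ... test methods ...
}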

DiogoP98 (Contributor, author) replied:

Addressed. Thanks for the suggestion.

Comment on lines +175 to +181
+ Map<TopicPartition, OffsetAndMetadata> committedOffsets = consumer.committed(new HashSet<>(topicPartitionList));
+ consumer.seekToEnd(topicPartitionList);
  for (TopicPartition topicPartition : topicPartitionList) {
-     Map<TopicPartition, OffsetAndMetadata> offsetAndMetadata = consumer.committed(Collections.singleton(topicPartition));
-     long committedOffset = offsetAndMetadata != null ? offsetAndMetadata.get(topicPartition).offset() : -1;
-     consumer.seekToEnd(toArrayList(topicPartition));
+     OffsetAndMetadata partitionOffset = committedOffsets.get(topicPartition);
+     long committedOffset = partitionOffset != null ? partitionOffset.offset() : -1;
      result.add(new KafkaOffsetLagResult(topicPartition.topic(), topicPartition.partition(), committedOffset,
-         consumer.position(topicPartition)));
+             consumer.position(topicPartition)));
rzo1 (Contributor) commented:

The fix here is good — bulk committed(Set) + null-check is exactly right, and moving seekToEnd(topicPartitionList) out of the per-partition loop is a nice cleanup.

My only ask is to separate the bug fix from the formatting churn in this file. Outside this loop, most of the diff is reformatting (continuation indent 4 → 8, Javadoc whitespace). That makes a clean cherry-pick of the fix to maintenance branches harder, and obscures what's actually changing for someone bisecting later. Suggest two commits: one for the fix, one for the formatting — or just drop the formatting touches from this PR.

DiogoP98 (Contributor, author) replied:

Addressed. I've removed the formatting touches on the rest of the file.

  } catch (ParseException e) {
-     LOGGER.debug("JSON parsing failed, assuming message as error message: {}", resultFromMonitor);
      // json parsing fail -> error received
      errorMsg = resultFromMonitor;
rzo1 (Contributor) commented:

Tiny nit: the previous code had a LOGGER.debug("JSON parsing failed, assuming message as error message: {}", resultFromMonitor) here. Now that both the new instanceof Map path and the ParseException path fall through to errorMsg silently, there's no log line that captures "the monitor said something we couldn't parse."

The error does propagate via errorMsg so users see it, but losing the debug log makes future debugging harder. Suggest keeping a single debug-level log line in both branches, e.g. LOGGER.debug("Monitor returned non-JSON output, treating as error: {}", resultFromMonitor); — same observability you had before, no extra cost.
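
Sketched out, the suggestion amounts to the following, assuming the json-simple parser the surrounding code already uses and the variable names from the snippet above:

try {
    Object parsed = new JSONParser().parse(resultFromMonitor);
    if (parsed instanceof Map) {
        // normal path: structured lag result from the monitor
    } else {
        LOGGER.debug("Monitor returned non-JSON output, treating as error: {}", resultFromMonitor);
        errorMsg = resultFromMonitor;
    }
} catch (ParseException e) {
    LOGGER.debug("Monitor returned non-JSON output, treating as error: {}", resultFromMonitor);
    errorMsg = resultFromMonitor;
}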

DiogoP98 (Contributor, author) replied:

Addressed.

@DiogoP98 force-pushed the fix-kafka-lag-issue branch from da2b705 to 49835fe on May 7, 2026 at 21:14
@DiogoP98 requested a review from rzo1 on May 7, 2026 at 21:14
@reiabreu (Contributor) commented May 7, 2026

Hey @DiogoP98. Thanks for this, I'll try to review it ASAP.

@reiabreu added the bug label on May 7, 2026