Skip to content

Commit d785957

Browse files
authored
MINOR: code cleanup in InternalTopicManager (#21165)
- Avoid double-brace initialization - Fix timeout error handling, by pass in correct list of topic names - Remove unnecessary code Reviewers: Vincent Potuček (@Pankraz76), Lucas Brutschy <[email protected]>
1 parent a2eae88 commit d785957

File tree

1 file changed

+13
-14
lines changed

1 file changed

+13
-14
lines changed

streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -194,18 +194,19 @@ public ValidationResult validate(final Map<String, InternalTopicConfig> topicCon
194194
(streamsSide, brokerSide) -> validateCleanupPolicy(validationResult, streamsSide, brokerSide)
195195
);
196196
}
197-
197+
198+
final Set<String> topicsStillToValidate = new HashSet<>();
199+
topicsStillToValidate.addAll(topicDescriptionsStillToValidate);
200+
topicsStillToValidate.addAll(topicConfigsStillToValidate);
201+
198202
maybeThrowTimeout(new TimeoutContext(
199-
new HashSet<String>() {{
200-
addAll(topicDescriptionsStillToValidate);
201-
addAll(topicConfigsStillToValidate);
202-
}},
203-
deadline,
204-
"Validation timeout",
205-
String.format("Could not validate internal topics within %d milliseconds. " +
206-
"This can happen if the Kafka cluster is temporarily not available.", retryTimeoutMs),
207-
null
208-
));
203+
topicsStillToValidate,
204+
deadline,
205+
"Validation timeout",
206+
String.format("Could not validate internal topics within %d milliseconds. " +
207+
"This can happen if the Kafka cluster is temporarily not available.", retryTimeoutMs),
208+
null
209+
));
209210

210211
if (!descriptionsForTopic.isEmpty() || !configsForTopic.isEmpty()) {
211212
Utils.sleep(100);
@@ -497,7 +498,7 @@ public Set<String> makeReady(final Map<String, InternalTopicConfig> topics) {
497498
}
498499
if (!topicsNotReady.isEmpty()) {
499500
maybeThrowTimeout(new TimeoutContext(
500-
Collections.singleton("makeReadyCheck"), // dummy collection just to trigger if `topicsNotReady` is non-empty
501+
topicsNotReady,
501502
deadlineMs,
502503
"MakeReady timeout",
503504
String.format("Could not create topics within %d milliseconds. This can happen if the Kafka cluster is temporarily not available.", retryTimeoutMs),
@@ -613,8 +614,6 @@ private Set<String> createTopics(final Set<NewTopic> topicsToCreate,
613614
deadlineMs - time.milliseconds()
614615
);
615616
Utils.sleep(retryBackOffMs);
616-
} else {
617-
continue;
618617
}
619618
}
620619
return createdTopics;

0 commit comments

Comments
 (0)