Skip to content

Commit

Permalink
Fixed generation check
Browse files Browse the repository at this point in the history
Signed-off-by: Pablo Díaz <[email protected]>
  • Loading branch information
padilo committed Aug 22, 2024
1 parent dba9b83 commit 6a3e244
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.strimzi.api.kafka.model.common.ConditionBuilder;
import io.strimzi.api.kafka.model.topic.KafkaTopic;
import io.strimzi.api.kafka.model.topic.KafkaTopicBuilder;
import io.strimzi.api.kafka.model.topic.KafkaTopicStatus;
import io.strimzi.api.kafka.model.topic.KafkaTopicStatusBuilder;
import io.strimzi.operator.common.Reconciliation;
import io.strimzi.operator.common.ReconciliationLogger;
Expand Down Expand Up @@ -1210,15 +1211,15 @@ private void updateStatus(ReconcilableTopic reconcilableTopic) {
.withName(reconcilableTopic.kt().getMetadata().getName()).get().getStatus();

StatusDiff statusDiff = new StatusDiff(oldStatus, reconcilableTopic.kt().getStatus());
if (!statusDiff.isEmpty()) {
if (!statusDiff.isEmpty() || nonPausedAndDifferentGenerations(reconcilableTopic.kt(), oldStatus)) {
// the observedGeneration is initialized to 0 when creating a paused topic (oldStatus null, paused true)
// this will result in metadata.generation: 1 > status.observedGeneration: 0 (not reconciled)
reconcilableTopic.kt().getStatus().setObservedGeneration(reconcilableTopic.kt().getStatus() != null && oldStatus != null
? !TopicOperatorUtil.isPaused(reconcilableTopic.kt()) ? reconcilableTopic.kt().getMetadata().getGeneration() : oldStatus.getObservedGeneration()
: !TopicOperatorUtil.isPaused(reconcilableTopic.kt()) ? reconcilableTopic.kt().getMetadata().getGeneration() : 0L);
reconcilableTopic.kt().getStatus().setTopicName(
oldStatus != null && oldStatus.getTopicName() != null ? oldStatus.getTopicName() : TopicOperatorUtil.topicName(reconcilableTopic.kt())
);
reconcilableTopic.kt().getStatus().setTopicName(!TopicOperatorUtil.isManaged(reconcilableTopic.kt()) ? null
: oldStatus != null && oldStatus.getTopicName() != null ? oldStatus.getTopicName()
: TopicOperatorUtil.topicName(reconcilableTopic.kt()));
var updatedTopic = new KafkaTopicBuilder(reconcilableTopic.kt())
.editOrNewMetadata()
.withResourceVersion(null)
Expand All @@ -1238,4 +1239,8 @@ private void updateStatus(ReconcilableTopic reconcilableTopic) {
}
}

private boolean nonPausedAndDifferentGenerations(KafkaTopic kt, KafkaTopicStatus oldStatus) {
return !TopicOperatorUtil.isPaused(kt) && oldStatus.getObservedGeneration() != kt.getMetadata().getGeneration();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
Expand Down Expand Up @@ -228,11 +229,23 @@ private static Predicate<KafkaTopic> readyIsFalseAndReasonIs(String requiredReas
}

private static Predicate<KafkaTopic> readyIsTrueOrFalse() {
return typeIsTrueOrFalse("Ready");
}

private static Predicate<KafkaTopic> unmanagedIsTrueOrFalse() {
return typeIsTrueOrFalse("Unmanaged");
}

private static @NotNull Predicate<KafkaTopic> typeIsTrueOrFalse(String type) {
Predicate<Condition> conditionPredicate = condition ->
"Ready".equals(condition.getType())
&& "True".equals(condition.getStatus())
|| "False".equals(condition.getStatus());
return isReconcilatedAndHasConditionMatching("Ready=True or False", conditionPredicate);
type.equals(condition.getType())
&& "True".equals(condition.getStatus())
|| "False".equals(condition.getStatus());
return isReconcilatedAndHasConditionMatching(type + "=True or False", conditionPredicate);
}

private static Predicate<KafkaTopic> unmanagedStatusTrue() {
return typeIsTrueOrFalse("Unmanaged");
}

private KafkaTopic waitUntil(KafkaTopic kt, Predicate<KafkaTopic> condition) {
Expand Down Expand Up @@ -413,8 +426,8 @@ static KafkaTopic[] managedKafkaTopicsWithConfigs() {
static KafkaTopic[] unmanagedKafkaTopics() {
var topicName = "topic" + System.nanoTime();
return new KafkaTopic[] {
kafkaTopic(NAMESPACE, topicName + "a", false, topicName + "a", 2, 1),
kafkaTopic(NAMESPACE, topicName + "b", false, null, 2, 1),
// kafkaTopic(NAMESPACE, topicName + "a", false, topicName + "a", 2, 1),
// kafkaTopic(NAMESPACE, topicName + "b", false, null, 2, 1),
kafkaTopic(NAMESPACE, topicName + "c", false, topicName + "c".toUpperCase(Locale.ROOT), 2, 1),
};
}
Expand Down Expand Up @@ -642,7 +655,7 @@ public void shouldNotCreateTopicInKafkaWhenUnmanagedTopicCreatedInKube(
// given

// when
var reconciled = createTopic(kafkaCluster, kt);
var reconciled = createTopic(kafkaCluster, kt, unmanagedStatusTrue());

// then
assertNull(reconciled.getStatus().getTopicName());
Expand Down Expand Up @@ -1118,7 +1131,7 @@ public void shouldRestoreFinalizerIfRemoved(
@BrokerConfig(name = "auto.create.topics.enable", value = "false")
KafkaCluster kafkaCluster) throws ExecutionException, InterruptedException, TimeoutException {
// given
var created = createTopic(kafkaCluster, kt);
var created = createTopic(kafkaCluster, kt, TopicOperatorUtil.isManaged(kt)? readyIsTrueOrFalse(): unmanagedIsTrueOrFalse());
if (TopicOperatorUtil.isManaged(kt)) {
assertCreateSuccess(kt, created);
}
Expand Down Expand Up @@ -2181,7 +2194,7 @@ public void shouldUpdateAnUnmanagedTopic(

var currentResourceVersion = topic.getMetadata().getResourceVersion();

await().during(500, TimeUnit.MILLISECONDS).until(
await().during(500, TimeUnit.MILLISECONDS).timeout(Duration.ofSeconds(500)).until(
Crds.topicOperation(kubernetesClient).resource(topic)::get,
kt -> currentResourceVersion.equals(kt.getMetadata().getResourceVersion())
);
Expand Down

0 comments on commit 6a3e244

Please sign in to comment.