Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[system test] Fix downgrade procedure #11154

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -110,66 +110,130 @@ public class AbstractKRaftUpgradeST extends AbstractST {
protected final LabelSelector coSelector = new LabelSelectorBuilder().withMatchLabels(Map.of(Labels.STRIMZI_KIND_LABEL, "cluster-operator")).build();
protected final LabelSelector connectLabelSelector = KafkaConnectResource.getLabelSelector(CLUSTER_NAME, KafkaConnectResources.componentName(CLUSTER_NAME));


/**
 * Captures fresh snapshots of every component involved in the upgrade/downgrade tests
 * (Entity Operator deployment, controller/broker pods, and Kafka Connect pods) so that
 * subsequent rolling-update checks can compare against a known baseline.
 *
 * @param componentsNamespaceName namespace in which the components are running
 */
protected void makeComponentsSnapshots(String componentsNamespaceName) {
    // The individual snapshots are independent of each other, so capture order is irrelevant.
    connectPods = PodUtils.podSnapshot(componentsNamespaceName, connectLabelSelector);
    brokerPods = PodUtils.podSnapshot(componentsNamespaceName, brokerSelector);
    controllerPods = PodUtils.podSnapshot(componentsNamespaceName, controllerSelector);
    eoPods = DeploymentUtils.depSnapshot(componentsNamespaceName, KafkaResources.entityOperatorDeploymentName(CLUSTER_NAME));
}

protected void doKafkaConnectAndKafkaConnectorUpgradeOrDowngradeProcedure(
/**
* Performs the Kafka Connect and Kafka Connector upgrade procedure.
* It upgrades the Cluster Operator, Kafka Connect, and Kafka Connector while verifying each step.
*
* @param clusterOperatorNamespaceName Namespace of the Cluster Operator
* @param testStorage Test-related configuration and storage
* @param upgradeDowngradeData Bundle version modification data
* @param upgradeKafkaVersion Kafka version details
* @throws IOException if any I/O error occurs during the procedure
*/
protected void doKafkaConnectAndKafkaConnectorUpgradeProcedure(
final String clusterOperatorNamespaceName,
final TestStorage testStorage,
final BundleVersionModificationData upgradeDowngradeData,
final UpgradeKafkaVersion upgradeKafkaVersion
) throws IOException {
final UpgradeKafkaVersion upgradeKafkaVersion) throws IOException {
// 1. Setup Cluster Operator with KafkaConnect and KafkaConnector
setupEnvAndUpgradeClusterOperator(clusterOperatorNamespaceName, testStorage, upgradeDowngradeData, upgradeKafkaVersion);
deployKafkaConnectAndKafkaConnectorWithWaitForReadiness(testStorage, upgradeDowngradeData, upgradeKafkaVersion);

final KafkaClients clients = ClientUtils.getInstantTlsClientBuilder(testStorage, KafkaResources.tlsBootstrapAddress(CLUSTER_NAME))
.withNamespaceName(testStorage.getNamespaceName())
.withUsername(USER_NAME)
.build();

resourceManager.createResourceWithWait(clients.producerTlsStrimzi(CLUSTER_NAME));
// Verify that Producer finish successfully
ClientUtils.waitForInstantProducerClientSuccess(testStorage);
// 2. Send messages
produceMessagesAndVerify(testStorage);

// 3. Make snapshots
makeComponentsSnapshots(testStorage.getNamespaceName());
logComponentsPodImagesWithConnect(testStorage.getNamespaceName());

// Verify FileSink KafkaConnector before upgrade
String connectorPodName = kubeClient().listPods(testStorage.getNamespaceName(), Collections.singletonMap(Labels.STRIMZI_KIND_LABEL, KafkaConnect.RESOURCE_KIND)).get(0).getMetadata().getName();
KafkaConnectUtils.waitForMessagesInKafkaConnectFileSink(testStorage.getNamespaceName(), connectorPodName, DEFAULT_SINK_FILE_PATH, testStorage.getMessageCount());
// 4. Verify KafkaConnector FileSink
verifyKafkaConnectorFileSink(testStorage);

// Upgrade CO to HEAD and wait for readiness of ClusterOperator
// 5. Upgrade CO to HEAD and wait for readiness of ClusterOperator
changeClusterOperator(clusterOperatorNamespaceName, testStorage.getNamespaceName(), upgradeDowngradeData);

if (TestKafkaVersion.supportedVersionsContainsVersion(upgradeKafkaVersion.getVersion())) {
// Verify that Kafka and Connect Pods Rolled
waitForKafkaClusterRollingUpdate(testStorage.getNamespaceName());
connectPods = RollingUpdateUtils.waitTillComponentHasRolled(testStorage.getNamespaceName(), connectLabelSelector, 1, connectPods);
KafkaConnectorUtils.waitForConnectorReady(testStorage.getNamespaceName(), CLUSTER_NAME);
}
// 6. Wait for components to roll
maybeWaitForRollingUpdate(testStorage, upgradeKafkaVersion);

// 7. Make snapshots with KafkaConnect
logComponentsPodImagesWithConnect(testStorage.getNamespaceName());

// 8. Upgrade Kafka
changeKafkaVersion(testStorage.getNamespaceName(), upgradeDowngradeData);
changeKafkaVersionInKafkaConnect(testStorage.getNamespaceName(), upgradeDowngradeData);

logComponentsPodImagesWithConnect(testStorage.getNamespaceName());
checkAllComponentsImages(testStorage.getNamespaceName(), upgradeDowngradeData);

verifyPostUpgradeOrDowngradeProcedure(testStorage, upgradeDowngradeData);
}

/**
 * Performs the Kafka Connect and Kafka Connector downgrade procedure.
 * It deploys the environment and produces messages, then downgrades the Kafka and
 * Kafka Connect versions, downgrades the Cluster Operator, and finally verifies the
 * environment (rolling updates, component images, and post-procedure message flow).
 *
 * @param clusterOperatorNamespaceName Namespace of the Cluster Operator
 * @param testStorage Test-related configuration and storage
 * @param upgradeDowngradeData Bundle version modification data
 * @param upgradeKafkaVersion Kafka version details
 * @throws IOException if any I/O error occurs during the procedure
 */
protected void doKafkaConnectAndKafkaConnectorDowngradeProcedure(
final String clusterOperatorNamespaceName,
final TestStorage testStorage,
final BundleVersionModificationData upgradeDowngradeData,
final UpgradeKafkaVersion upgradeKafkaVersion
) throws IOException {
// 1. Setup Cluster Operator with KafkaConnect and KafkaConnector
setupEnvAndUpgradeClusterOperator(clusterOperatorNamespaceName, testStorage, upgradeDowngradeData, upgradeKafkaVersion);
deployKafkaConnectAndKafkaConnectorWithWaitForReadiness(testStorage, upgradeDowngradeData, upgradeKafkaVersion);

// 2. Send messages
produceMessagesAndVerify(testStorage);

// 3. Make snapshots
makeComponentsSnapshots(testStorage.getNamespaceName());
logComponentsPodImagesWithConnect(testStorage.getNamespaceName());

// 4. Verify KafkaConnector FileSink
verifyKafkaConnectorFileSink(testStorage);
logComponentsPodImagesWithConnect(testStorage.getNamespaceName());

// 5. Downgrade Kafka (both the Kafka cluster and the Kafka Connect cluster)
changeKafkaVersion(testStorage.getNamespaceName(), upgradeDowngradeData);
changeKafkaVersionInKafkaConnect(testStorage.getNamespaceName(), upgradeDowngradeData);

// 6. Make snapshots with KafkaConnect
logComponentsPodImagesWithConnect(testStorage.getNamespaceName());

// 7. Downgrade CO and wait for readiness of ClusterOperator
changeClusterOperator(clusterOperatorNamespaceName, testStorage.getNamespaceName(), upgradeDowngradeData);

// 8. Wait for components to roll and check component images
maybeWaitForRollingUpdate(testStorage, upgradeKafkaVersion);
checkAllComponentsImages(testStorage.getNamespaceName(), upgradeDowngradeData);

// 9. Final verification: new messages flow, FileSink works, pods are stable
verifyPostUpgradeOrDowngradeProcedure(testStorage, upgradeDowngradeData);
}

/**
* Verifies the environment after an upgrade or downgrade procedure
* by sending new messages, checking connector output, stability, and final state.
*
* @param testStorage Test-related configuration and storage
* @param upgradeDowngradeData Bundle version modification data
*/
private void verifyPostUpgradeOrDowngradeProcedure(final TestStorage testStorage,
final BundleVersionModificationData upgradeDowngradeData) {
final KafkaClients clients = ClientUtils.getInstantTlsClientBuilder(testStorage, KafkaResources.tlsBootstrapAddress(CLUSTER_NAME))
.withNamespaceName(testStorage.getNamespaceName())
.withUsername(USER_NAME)
.build();
// send again new messages
resourceManager.createResourceWithWait(clients.producerTlsStrimzi(CLUSTER_NAME));

// Verify that Producer finish successfully
ClientUtils.waitForInstantProducerClientSuccess(testStorage.getNamespaceName(), testStorage);

// Verify FileSink KafkaConnector
connectorPodName = kubeClient().listPods(testStorage.getNamespaceName(), Collections.singletonMap(Labels.STRIMZI_KIND_LABEL, KafkaConnect.RESOURCE_KIND)).get(0).getMetadata().getName();
String connectorPodName = kubeClient().listPods(testStorage.getNamespaceName(), Collections.singletonMap(Labels.STRIMZI_KIND_LABEL, KafkaConnect.RESOURCE_KIND)).get(0).getMetadata().getName();
KafkaConnectUtils.waitForMessagesInKafkaConnectFileSink(testStorage.getNamespaceName(), connectorPodName, DEFAULT_SINK_FILE_PATH, testStorage.getMessageCount());

// Verify that pods are stable
Expand All @@ -179,6 +243,60 @@ protected void doKafkaConnectAndKafkaConnectorUpgradeOrDowngradeProcedure(
verifyProcedure(testStorage.getNamespaceName(), upgradeDowngradeData, testStorage.getContinuousProducerName(), testStorage.getContinuousConsumerName());
}

/**
 * Checks that the FileSink KafkaConnector received the expected number of messages
 * by inspecting the sink file inside the first Kafka Connect pod.
 *
 * @param testStorage Test-related configuration and storage
 */
private void verifyKafkaConnectorFileSink(final TestStorage testStorage) {
    final String namespace = testStorage.getNamespaceName();

    // The Connect cluster runs a single pod, so the first pod matching the KafkaConnect kind label is the sink pod.
    final String sinkPodName = kubeClient()
        .listPods(namespace, Collections.singletonMap(Labels.STRIMZI_KIND_LABEL, KafkaConnect.RESOURCE_KIND))
        .get(0)
        .getMetadata()
        .getName();

    KafkaConnectUtils.waitForMessagesInKafkaConnectFileSink(namespace, sinkPodName, DEFAULT_SINK_FILE_PATH, testStorage.getMessageCount());
}

/**
 * Waits for the Kafka cluster and Kafka Connect to roll, and for the connector to become
 * ready again, but only when the target Kafka version is among the supported versions.
 *
 * @param testStorage Test-related configuration and storage
 * @param upgradeKafkaVersion Kafka version details
 */
private void maybeWaitForRollingUpdate(final TestStorage testStorage,
                                       final UpgradeKafkaVersion upgradeKafkaVersion) {
    // Unsupported target versions do not trigger a rolling update, so there is nothing to wait for.
    if (!TestKafkaVersion.supportedVersionsContainsVersion(upgradeKafkaVersion.getVersion())) {
        return;
    }

    final String namespace = testStorage.getNamespaceName();

    waitForKafkaClusterRollingUpdate(namespace);
    connectPods = RollingUpdateUtils.waitTillComponentHasRolled(namespace, connectLabelSelector, 1, connectPods);
    KafkaConnectorUtils.waitForConnectorReady(namespace, CLUSTER_NAME);
}

/**
 * Produces messages over TLS to the Kafka cluster and waits until the producer
 * client reports success.
 *
 * @param testStorage Test-related configuration and storage
 */
private void produceMessagesAndVerify(TestStorage testStorage) {
    final KafkaClients tlsClients = ClientUtils
        .getInstantTlsClientBuilder(testStorage, KafkaResources.tlsBootstrapAddress(CLUSTER_NAME))
        .withNamespaceName(testStorage.getNamespaceName())
        .withUsername(USER_NAME)
        .build();

    // Deploy the TLS producer job and block until it finishes successfully.
    resourceManager.createResourceWithWait(tlsClients.producerTlsStrimzi(CLUSTER_NAME));
    ClientUtils.waitForInstantProducerClientSuccess(testStorage);
}

protected void setupEnvAndUpgradeClusterOperator(String clusterOperatorNamespaceName, TestStorage testStorage, BundleVersionModificationData upgradeData, UpgradeKafkaVersion upgradeKafkaVersion) throws IOException {
LOGGER.info("Test upgrade of Cluster Operator from version: {} to version: {}", upgradeData.getFromVersion(), upgradeData.getToVersion());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ void testDowngradeOfKafkaKafkaConnectAndKafkaConnector(String from, String to, S

LOGGER.debug("Running downgrade test from version {} to {} (FG: {} -> {})", from, to, fgBefore, fgAfter);

doKafkaConnectAndKafkaConnectorUpgradeOrDowngradeProcedure(CO_NAMESPACE, testStorage, downgradeData, downgradeKafkaVersion);
doKafkaConnectAndKafkaConnectorDowngradeProcedure(CO_NAMESPACE, testStorage, downgradeData, downgradeKafkaVersion);
}

@BeforeEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ void testUpgradeOfKafkaKafkaConnectAndKafkaConnector(String fromVersion, String

LOGGER.debug("Running upgrade test from version {} to {} (FG: {} -> {})",
fromVersion, toVersion, fgBefore, fgAfter);
doKafkaConnectAndKafkaConnectorUpgradeOrDowngradeProcedure(CO_NAMESPACE, testStorage, upgradeData, upgradeKafkaVersion);
doKafkaConnectAndKafkaConnectorUpgradeProcedure(CO_NAMESPACE, testStorage, upgradeData, upgradeKafkaVersion);
}

@IsolatedTest
Expand Down
Loading