Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expend feature gates support to all Strimzi operators #10141

Merged
merged 4 commits into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
To use KRaft (ZooKeeper-less Apache Kafka), you still need to use the `strimzi.io/kraft: enabled` annotation on the `Kafka` custom resources or migrate from an existing ZooKeeper-based cluster using the `strimzi.io/kraft: migration` annotation.
* Update the base image used by Strimzi containers from UBI8 to UBI9
* Enhance `KafkaBridge` resource with consumer inactivity timeout and HTTP consumer/producer enablement.
* Add support for feature gates to User and Topic Operators

## 0.41.0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.strimzi.operator.common.Util;
import io.strimzi.operator.common.config.ConfigParameter;
import io.strimzi.operator.common.config.ConfigParameterParser;
import io.strimzi.operator.common.featuregates.FeatureGates;
import io.strimzi.operator.common.model.Labels;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -32,6 +33,7 @@
import static io.strimzi.operator.common.config.ConfigParameterParser.LONG;
import static io.strimzi.operator.common.config.ConfigParameterParser.NAMESPACE_SET;
import static io.strimzi.operator.common.config.ConfigParameterParser.STRING;
import static io.strimzi.operator.common.config.ConfigParameterParser.parseFeatureGates;

/**
* Cluster Operator configuration
Expand Down Expand Up @@ -391,10 +393,6 @@ private static KafkaVersion.Lookup parseKafkaVersions(String kafkaImages, String
}
}

static ConfigParameterParser<FeatureGates> parseFeatureGates() {
return FeatureGates::new;
}

static ConfigParameterParser<ImagePullPolicy> parseImagePullPolicy() {
return imagePullPolicyEnvVar -> {
ImagePullPolicy imagePullPolicy = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.strimzi.api.kafka.model.kafka.KafkaResources;
import io.strimzi.api.kafka.model.kafka.entityoperator.EntityOperatorSpec;
import io.strimzi.api.kafka.model.kafka.entityoperator.EntityOperatorTemplate;
import io.strimzi.operator.cluster.ClusterOperatorConfig;
import io.strimzi.operator.cluster.model.securityprofiles.PodSecurityProviderContextImpl;
import io.strimzi.operator.common.Reconciliation;
import io.strimzi.operator.common.Util;
Expand Down Expand Up @@ -105,20 +106,22 @@ protected EntityOperator(Reconciliation reconciliation, HasMetadata resource, Sh
* @param reconciliation The reconciliation marker
* @param kafkaAssembly Desired resource with cluster configuration containing the Entity Operator one
* @param sharedEnvironmentProvider Shared environment provider
* @param config Cluster Operator configuration
*
* @return Entity Operator instance, null if not configured in the ConfigMap
*/
public static EntityOperator fromCrd(Reconciliation reconciliation,
Kafka kafkaAssembly,
SharedEnvironmentProvider sharedEnvironmentProvider) {
SharedEnvironmentProvider sharedEnvironmentProvider,
ClusterOperatorConfig config) {
EntityOperatorSpec entityOperatorSpec = kafkaAssembly.getSpec().getEntityOperator();

if (entityOperatorSpec != null
&& (entityOperatorSpec.getUserOperator() != null || entityOperatorSpec.getTopicOperator() != null)) {
EntityOperator result = new EntityOperator(reconciliation, kafkaAssembly, sharedEnvironmentProvider);

EntityTopicOperator topicOperator = EntityTopicOperator.fromCrd(reconciliation, kafkaAssembly, sharedEnvironmentProvider);
EntityUserOperator userOperator = EntityUserOperator.fromCrd(reconciliation, kafkaAssembly, sharedEnvironmentProvider);
EntityTopicOperator topicOperator = EntityTopicOperator.fromCrd(reconciliation, kafkaAssembly, sharedEnvironmentProvider, config);
EntityUserOperator userOperator = EntityUserOperator.fromCrd(reconciliation, kafkaAssembly, sharedEnvironmentProvider, config);

result.topicOperator = topicOperator;
result.cruiseControlEnabled = kafkaAssembly.getSpec().getCruiseControl() != null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public class EntityTopicOperator extends AbstractModel implements SupportsLoggin
/* test */ String kafkaBootstrapServers;
private boolean cruiseControlEnabled;
private boolean rackAwarenessEnabled;
private String featureGatesEnvVarValue;

private String watchedNamespace;
/* test */ int reconciliationIntervalMs;
Expand Down Expand Up @@ -110,15 +111,18 @@ protected EntityTopicOperator(Reconciliation reconciliation, HasMetadata resourc
/**
* Create an Entity Topic Operator from given desired resource. When Topic Operator (Or Entity Operator) are not
* enabled, it returns null.
* @param reconciliation The reconciliation
* @param kafkaAssembly desired resource with cluster configuration containing the Entity Topic Operator one
* @param sharedEnvironmentProvider Shared environment provider
*
* @param reconciliation The reconciliation marker
* @param kafkaAssembly Desired resource with cluster configuration containing the Entity Topic Operator one
* @param sharedEnvironmentProvider Shared environment provider
* @param config Cluster Operator configuration
*
* @return Entity Topic Operator instance, null if not configured
*/
public static EntityTopicOperator fromCrd(Reconciliation reconciliation,
Kafka kafkaAssembly,
SharedEnvironmentProvider sharedEnvironmentProvider) {
SharedEnvironmentProvider sharedEnvironmentProvider,
ClusterOperatorConfig config) {
if (kafkaAssembly.getSpec().getEntityOperator() != null
&& kafkaAssembly.getSpec().getEntityOperator().getTopicOperator() != null) {
EntityTopicOperatorSpec topicOperatorSpec = kafkaAssembly.getSpec().getEntityOperator().getTopicOperator();
Expand All @@ -145,6 +149,7 @@ public static EntityTopicOperator fromCrd(Reconciliation reconciliation,

result.cruiseControlEnabled = kafkaAssembly.getSpec().getCruiseControl() != null;
result.rackAwarenessEnabled = result.cruiseControlEnabled && kafkaAssembly.getSpec().getKafka().getRack() != null;
result.featureGatesEnvVarValue = config.featureGates().toEnvironmentVariable();

return result;
} else {
Expand Down Expand Up @@ -180,7 +185,12 @@ protected List<EnvVar> getEnvVars() {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_SECURITY_PROTOCOL, EntityTopicOperatorSpec.DEFAULT_SECURITY_PROTOCOL));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_TLS_ENABLED, Boolean.toString(true)));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_STRIMZI_GC_LOG_ENABLED, Boolean.toString(gcLoggingEnabled)));


// Add feature gates configuration if not empty
if (featureGatesEnvVarValue != null && !featureGatesEnvVarValue.isEmpty()) {
varList.add(ContainerUtils.createEnvVar(ClusterOperatorConfig.FEATURE_GATES.key(), featureGatesEnvVarValue));
}

// Add environment variables required for Cruise Control integration
if (this.cruiseControlEnabled) {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_CRUISE_CONTROL_ENABLED, Boolean.toString(cruiseControlEnabled)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public class EntityUserOperator extends AbstractModel implements SupportsLogging
/* test */ int clientsCaValidityDays;
/* test */ int clientsCaRenewalDays;
private ResourceTemplate templateRoleBinding;
private String featureGatesEnvVarValue;

private boolean aclsAdminApiSupported = false;
private List<String> maintenanceWindows;
Expand Down Expand Up @@ -107,10 +108,14 @@ protected EntityUserOperator(Reconciliation reconciliation, HasMetadata resource
* @param reconciliation The reconciliation
* @param kafkaAssembly Desired resource with cluster configuration containing the Entity User Operator one
* @param sharedEnvironmentProvider Shared environment provider
* @param config Cluster Operator configuration
*
* @return Entity User Operator instance, null if not configured
*/
public static EntityUserOperator fromCrd(Reconciliation reconciliation, Kafka kafkaAssembly, SharedEnvironmentProvider sharedEnvironmentProvider) {
public static EntityUserOperator fromCrd(Reconciliation reconciliation,
Kafka kafkaAssembly,
SharedEnvironmentProvider sharedEnvironmentProvider,
ClusterOperatorConfig config) {
if (kafkaAssembly.getSpec().getEntityOperator() != null
&& kafkaAssembly.getSpec().getEntityOperator().getUserOperator() != null) {
EntityUserOperatorSpec userOperatorSpec = kafkaAssembly.getSpec().getEntityOperator().getUserOperator();
Expand All @@ -131,6 +136,7 @@ public static EntityUserOperator fromCrd(Reconciliation reconciliation, Kafka ka
result.resources = userOperatorSpec.getResources();
result.readinessProbeOptions = ProbeUtils.extractReadinessProbeOptionsOrDefault(userOperatorSpec, EntityOperator.DEFAULT_HEALTHCHECK_OPTIONS);
result.livenessProbeOptions = ProbeUtils.extractLivenessProbeOptionsOrDefault(userOperatorSpec, EntityOperator.DEFAULT_HEALTHCHECK_OPTIONS);
result.featureGatesEnvVarValue = config.featureGates().toEnvironmentVariable();

if (kafkaAssembly.getSpec().getEntityOperator().getTemplate() != null) {
result.templateRoleBinding = kafkaAssembly.getSpec().getEntityOperator().getTemplate().getUserOperatorRoleBinding();
Expand Down Expand Up @@ -196,6 +202,11 @@ protected List<EnvVar> getEnvVars() {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_ACLS_ADMIN_API_SUPPORTED, String.valueOf(aclsAdminApiSupported)));
JvmOptionUtils.javaOptions(varList, jvmOptions);

// Add feature gates configuration if not empty
if (featureGatesEnvVarValue != null && !featureGatesEnvVarValue.isEmpty()) {
varList.add(ContainerUtils.createEnvVar(ClusterOperatorConfig.FEATURE_GATES.key(), featureGatesEnvVarValue));
}

// Add shared environment variables used for all containers
varList.addAll(sharedEnvironmentProvider.variables());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public EntityOperatorReconciler(
) {
this.reconciliation = reconciliation;
this.operationTimeoutMs = config.getOperationTimeoutMs();
this.entityOperator = EntityOperator.fromCrd(reconciliation, kafkaAssembly, supplier.sharedEnvironmentProvider);
this.entityOperator = EntityOperator.fromCrd(reconciliation, kafkaAssembly, supplier.sharedEnvironmentProvider, config);
this.clusterCa = clusterCa;
this.maintenanceWindows = kafkaAssembly.getSpec().getMaintenanceTimeWindows();
this.isNetworkPolicyGeneration = config.isNetworkPolicyGeneration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.strimzi.operator.cluster.model.ImagePullPolicy;
import io.strimzi.operator.cluster.model.UnsupportedVersionException;
import io.strimzi.operator.common.InvalidConfigurationException;
import io.strimzi.operator.common.featuregates.FeatureGates;
import io.strimzi.operator.common.model.Labels;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -65,7 +66,7 @@ public void testDefaultConfig() {
assertThat(config.getConnectBuildTimeoutMs(), is(Long.parseLong(ClusterOperatorConfig.CONNECT_BUILD_TIMEOUT_MS.defaultValue())));
assertThat(config.getOperatorNamespace(), is("operator-namespace"));
assertThat(config.getOperatorNamespaceLabels(), is(nullValue()));
assertThat(config.featureGates().continueOnManualRUFailureEnabled(), is(false));
assertThat(config.featureGates(), is(new FeatureGates("")));
assertThat(config.isCreateClusterRoles(), is(false));
assertThat(config.isNetworkPolicyGeneration(), is(true));
assertThat(config.isPodSetReconciliationOnly(), is(false));
Expand Down Expand Up @@ -329,6 +330,8 @@ public void testInvalidOperatorNamespaceLabels() {

@Test
public void testInvalidFeatureGate() {
// We test that the configuration is really parsing the feature gates environment variable. We test it on
// non-existing feature gate instead of a real one so that we do not have to change it when the FGs are promoted
Map<String, String> envVars = new HashMap<>(ClusterOperatorConfigTest.ENV_VARS);
envVars.put(ClusterOperatorConfig.FEATURE_GATES.key(), "-NonExistingGate");

Expand Down
Loading
Loading