Skip to content

Commit

Permalink
Expend feature gates support to all Strimzi operators (#10141)
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Scholz <[email protected]>
  • Loading branch information
scholzj authored May 23, 2024
1 parent 6c24f81 commit bd8f6a5
Show file tree
Hide file tree
Showing 20 changed files with 261 additions and 71 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
To use KRaft (ZooKeeper-less Apache Kafka), you still need to use the `strimzi.io/kraft: enabled` annotation on the `Kafka` custom resources or migrate from an existing ZooKeeper-based cluster using the `strimzi.io/kraft: migration` annotation.
* Update the base image used by Strimzi containers from UBI8 to UBI9
* Enhance `KafkaBridge` resource with consumer inactivity timeout and HTTP consumer/producer enablement.
* Add support for feature gates to User and Topic Operators

## 0.41.0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.strimzi.operator.common.Util;
import io.strimzi.operator.common.config.ConfigParameter;
import io.strimzi.operator.common.config.ConfigParameterParser;
import io.strimzi.operator.common.featuregates.FeatureGates;
import io.strimzi.operator.common.model.Labels;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -32,6 +33,7 @@
import static io.strimzi.operator.common.config.ConfigParameterParser.LONG;
import static io.strimzi.operator.common.config.ConfigParameterParser.NAMESPACE_SET;
import static io.strimzi.operator.common.config.ConfigParameterParser.STRING;
import static io.strimzi.operator.common.config.ConfigParameterParser.parseFeatureGates;

/**
* Cluster Operator configuration
Expand Down Expand Up @@ -391,10 +393,6 @@ private static KafkaVersion.Lookup parseKafkaVersions(String kafkaImages, String
}
}

static ConfigParameterParser<FeatureGates> parseFeatureGates() {
return FeatureGates::new;
}

static ConfigParameterParser<ImagePullPolicy> parseImagePullPolicy() {
return imagePullPolicyEnvVar -> {
ImagePullPolicy imagePullPolicy = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.strimzi.api.kafka.model.kafka.KafkaResources;
import io.strimzi.api.kafka.model.kafka.entityoperator.EntityOperatorSpec;
import io.strimzi.api.kafka.model.kafka.entityoperator.EntityOperatorTemplate;
import io.strimzi.operator.cluster.ClusterOperatorConfig;
import io.strimzi.operator.cluster.model.securityprofiles.PodSecurityProviderContextImpl;
import io.strimzi.operator.common.Reconciliation;
import io.strimzi.operator.common.Util;
Expand Down Expand Up @@ -105,20 +106,22 @@ protected EntityOperator(Reconciliation reconciliation, HasMetadata resource, Sh
* @param reconciliation The reconciliation marker
* @param kafkaAssembly Desired resource with cluster configuration containing the Entity Operator one
* @param sharedEnvironmentProvider Shared environment provider
* @param config Cluster Operator configuration
*
* @return Entity Operator instance, null if not configured in the ConfigMap
*/
public static EntityOperator fromCrd(Reconciliation reconciliation,
Kafka kafkaAssembly,
SharedEnvironmentProvider sharedEnvironmentProvider) {
SharedEnvironmentProvider sharedEnvironmentProvider,
ClusterOperatorConfig config) {
EntityOperatorSpec entityOperatorSpec = kafkaAssembly.getSpec().getEntityOperator();

if (entityOperatorSpec != null
&& (entityOperatorSpec.getUserOperator() != null || entityOperatorSpec.getTopicOperator() != null)) {
EntityOperator result = new EntityOperator(reconciliation, kafkaAssembly, sharedEnvironmentProvider);

EntityTopicOperator topicOperator = EntityTopicOperator.fromCrd(reconciliation, kafkaAssembly, sharedEnvironmentProvider);
EntityUserOperator userOperator = EntityUserOperator.fromCrd(reconciliation, kafkaAssembly, sharedEnvironmentProvider);
EntityTopicOperator topicOperator = EntityTopicOperator.fromCrd(reconciliation, kafkaAssembly, sharedEnvironmentProvider, config);
EntityUserOperator userOperator = EntityUserOperator.fromCrd(reconciliation, kafkaAssembly, sharedEnvironmentProvider, config);

result.topicOperator = topicOperator;
result.cruiseControlEnabled = kafkaAssembly.getSpec().getCruiseControl() != null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public class EntityTopicOperator extends AbstractModel implements SupportsLoggin
/* test */ String kafkaBootstrapServers;
private boolean cruiseControlEnabled;
private boolean rackAwarenessEnabled;
private String featureGatesEnvVarValue;

private String watchedNamespace;
/* test */ int reconciliationIntervalMs;
Expand Down Expand Up @@ -110,15 +111,18 @@ protected EntityTopicOperator(Reconciliation reconciliation, HasMetadata resourc
/**
* Create an Entity Topic Operator from given desired resource. When Topic Operator (Or Entity Operator) are not
* enabled, it returns null.
* @param reconciliation The reconciliation
* @param kafkaAssembly desired resource with cluster configuration containing the Entity Topic Operator one
* @param sharedEnvironmentProvider Shared environment provider
*
* @param reconciliation The reconciliation marker
* @param kafkaAssembly Desired resource with cluster configuration containing the Entity Topic Operator one
* @param sharedEnvironmentProvider Shared environment provider
* @param config Cluster Operator configuration
*
* @return Entity Topic Operator instance, null if not configured
*/
public static EntityTopicOperator fromCrd(Reconciliation reconciliation,
Kafka kafkaAssembly,
SharedEnvironmentProvider sharedEnvironmentProvider) {
SharedEnvironmentProvider sharedEnvironmentProvider,
ClusterOperatorConfig config) {
if (kafkaAssembly.getSpec().getEntityOperator() != null
&& kafkaAssembly.getSpec().getEntityOperator().getTopicOperator() != null) {
EntityTopicOperatorSpec topicOperatorSpec = kafkaAssembly.getSpec().getEntityOperator().getTopicOperator();
Expand All @@ -145,6 +149,7 @@ public static EntityTopicOperator fromCrd(Reconciliation reconciliation,

result.cruiseControlEnabled = kafkaAssembly.getSpec().getCruiseControl() != null;
result.rackAwarenessEnabled = result.cruiseControlEnabled && kafkaAssembly.getSpec().getKafka().getRack() != null;
result.featureGatesEnvVarValue = config.featureGates().toEnvironmentVariable();

return result;
} else {
Expand Down Expand Up @@ -180,7 +185,12 @@ protected List<EnvVar> getEnvVars() {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_SECURITY_PROTOCOL, EntityTopicOperatorSpec.DEFAULT_SECURITY_PROTOCOL));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_TLS_ENABLED, Boolean.toString(true)));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_STRIMZI_GC_LOG_ENABLED, Boolean.toString(gcLoggingEnabled)));


// Add feature gates configuration if not empty
if (featureGatesEnvVarValue != null && !featureGatesEnvVarValue.isEmpty()) {
varList.add(ContainerUtils.createEnvVar(ClusterOperatorConfig.FEATURE_GATES.key(), featureGatesEnvVarValue));
}

// Add environment variables required for Cruise Control integration
if (this.cruiseControlEnabled) {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_CRUISE_CONTROL_ENABLED, Boolean.toString(cruiseControlEnabled)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public class EntityUserOperator extends AbstractModel implements SupportsLogging
/* test */ int clientsCaValidityDays;
/* test */ int clientsCaRenewalDays;
private ResourceTemplate templateRoleBinding;
private String featureGatesEnvVarValue;

private boolean aclsAdminApiSupported = false;
private List<String> maintenanceWindows;
Expand Down Expand Up @@ -107,10 +108,14 @@ protected EntityUserOperator(Reconciliation reconciliation, HasMetadata resource
* @param reconciliation The reconciliation
* @param kafkaAssembly Desired resource with cluster configuration containing the Entity User Operator one
* @param sharedEnvironmentProvider Shared environment provider
* @param config Cluster Operator configuration
*
* @return Entity User Operator instance, null if not configured
*/
public static EntityUserOperator fromCrd(Reconciliation reconciliation, Kafka kafkaAssembly, SharedEnvironmentProvider sharedEnvironmentProvider) {
public static EntityUserOperator fromCrd(Reconciliation reconciliation,
Kafka kafkaAssembly,
SharedEnvironmentProvider sharedEnvironmentProvider,
ClusterOperatorConfig config) {
if (kafkaAssembly.getSpec().getEntityOperator() != null
&& kafkaAssembly.getSpec().getEntityOperator().getUserOperator() != null) {
EntityUserOperatorSpec userOperatorSpec = kafkaAssembly.getSpec().getEntityOperator().getUserOperator();
Expand All @@ -131,6 +136,7 @@ public static EntityUserOperator fromCrd(Reconciliation reconciliation, Kafka ka
result.resources = userOperatorSpec.getResources();
result.readinessProbeOptions = ProbeUtils.extractReadinessProbeOptionsOrDefault(userOperatorSpec, EntityOperator.DEFAULT_HEALTHCHECK_OPTIONS);
result.livenessProbeOptions = ProbeUtils.extractLivenessProbeOptionsOrDefault(userOperatorSpec, EntityOperator.DEFAULT_HEALTHCHECK_OPTIONS);
result.featureGatesEnvVarValue = config.featureGates().toEnvironmentVariable();

if (kafkaAssembly.getSpec().getEntityOperator().getTemplate() != null) {
result.templateRoleBinding = kafkaAssembly.getSpec().getEntityOperator().getTemplate().getUserOperatorRoleBinding();
Expand Down Expand Up @@ -196,6 +202,11 @@ protected List<EnvVar> getEnvVars() {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_ACLS_ADMIN_API_SUPPORTED, String.valueOf(aclsAdminApiSupported)));
JvmOptionUtils.javaOptions(varList, jvmOptions);

// Add feature gates configuration if not empty
if (featureGatesEnvVarValue != null && !featureGatesEnvVarValue.isEmpty()) {
varList.add(ContainerUtils.createEnvVar(ClusterOperatorConfig.FEATURE_GATES.key(), featureGatesEnvVarValue));
}

// Add shared environment variables used for all containers
varList.addAll(sharedEnvironmentProvider.variables());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public EntityOperatorReconciler(
) {
this.reconciliation = reconciliation;
this.operationTimeoutMs = config.getOperationTimeoutMs();
this.entityOperator = EntityOperator.fromCrd(reconciliation, kafkaAssembly, supplier.sharedEnvironmentProvider);
this.entityOperator = EntityOperator.fromCrd(reconciliation, kafkaAssembly, supplier.sharedEnvironmentProvider, config);
this.clusterCa = clusterCa;
this.maintenanceWindows = kafkaAssembly.getSpec().getMaintenanceTimeWindows();
this.isNetworkPolicyGeneration = config.isNetworkPolicyGeneration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.strimzi.operator.cluster.model.ImagePullPolicy;
import io.strimzi.operator.cluster.model.UnsupportedVersionException;
import io.strimzi.operator.common.InvalidConfigurationException;
import io.strimzi.operator.common.featuregates.FeatureGates;
import io.strimzi.operator.common.model.Labels;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -65,7 +66,7 @@ public void testDefaultConfig() {
assertThat(config.getConnectBuildTimeoutMs(), is(Long.parseLong(ClusterOperatorConfig.CONNECT_BUILD_TIMEOUT_MS.defaultValue())));
assertThat(config.getOperatorNamespace(), is("operator-namespace"));
assertThat(config.getOperatorNamespaceLabels(), is(nullValue()));
assertThat(config.featureGates().continueOnManualRUFailureEnabled(), is(false));
assertThat(config.featureGates(), is(new FeatureGates("")));
assertThat(config.isCreateClusterRoles(), is(false));
assertThat(config.isNetworkPolicyGeneration(), is(true));
assertThat(config.isPodSetReconciliationOnly(), is(false));
Expand Down Expand Up @@ -329,6 +330,8 @@ public void testInvalidOperatorNamespaceLabels() {

@Test
public void testInvalidFeatureGate() {
// We test that the configuration is really parsing the feature gates environment variable. We test it on
// non-existing feature gate instead of a real one so that we do not have to change it when the FGs are promoted
Map<String, String> envVars = new HashMap<>(ClusterOperatorConfigTest.ENV_VARS);
envVars.put(ClusterOperatorConfig.FEATURE_GATES.key(), "-NonExistingGate");

Expand Down
Loading

0 comments on commit bd8f6a5

Please sign in to comment.