Skip to content

Commit

Permalink
Squash commits
Browse files Browse the repository at this point in the history
Address initial feedback

Signed-off-by: ocorriga <[email protected]>

Reverse dashboards and fix build issue

Signed-off-by: ocorriga <[email protected]>

Update derived resources

Signed-off-by: ocorriga <[email protected]>

Trigger build

Signed-off-by: Federico Valeri <[email protected]>

Add MetricsModel interface and refactor affected classes

Signed-off-by: ocorriga <[email protected]>
  • Loading branch information
ocorriga committed Feb 19, 2025
1 parent 72523ef commit ece3b1f
Show file tree
Hide file tree
Showing 25 changed files with 5,343 additions and 329 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
import io.fabric8.kubernetes.api.model.OwnerReference;
import io.strimzi.operator.cluster.model.logging.SupportsLogging;
import io.strimzi.operator.cluster.model.metrics.MetricsModel;
import io.strimzi.operator.cluster.model.metrics.JmxPrometheusExporterModel;
import io.strimzi.operator.cluster.model.metrics.SupportsMetrics;
import io.strimzi.operator.common.Reconciliation;
import io.strimzi.operator.common.model.Labels;
Expand Down Expand Up @@ -67,10 +67,11 @@ public static Map<String, String> generateMetricsAndLogConfigMapData(Reconciliat
data.put(supportsLogging.logging().configMapKey(), supportsLogging.logging().loggingConfiguration(reconciliation, metricsAndLogging.loggingCm()));
}

if (model instanceof SupportsMetrics supportMetrics && supportMetrics.metrics() != null) {
String parseResult = supportMetrics.metrics().metricsJson(reconciliation, metricsAndLogging.metricsCm());
// this is only for JMX Prometheus Exporter, because Strimzi Metrics Reporter configuration is in the Kafka configuration file
if (model instanceof SupportsMetrics supportMetrics && supportMetrics.metrics() instanceof JmxPrometheusExporterModel) {
String parseResult = ((JmxPrometheusExporterModel) supportMetrics.metrics()).metricsJson(reconciliation, metricsAndLogging.metricsCm());
if (parseResult != null) {
data.put(MetricsModel.CONFIG_MAP_KEY, parseResult);
data.put(JmxPrometheusExporterModel.CONFIG_MAP_KEY, parseResult);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.strimzi.operator.cluster.model.cruisecontrol.HashLoginServiceApiCredentials;
import io.strimzi.operator.cluster.model.logging.LoggingModel;
import io.strimzi.operator.cluster.model.logging.SupportsLogging;
import io.strimzi.operator.cluster.model.metrics.JmxPrometheusExporterModel;
import io.strimzi.operator.cluster.model.metrics.MetricsModel;
import io.strimzi.operator.cluster.model.metrics.SupportsMetrics;
import io.strimzi.operator.cluster.model.securityprofiles.ContainerSecurityProviderContextImpl;
Expand Down Expand Up @@ -114,14 +115,14 @@ public class CruiseControl extends AbstractModel implements SupportsMetrics, Sup

// Configuration defaults
protected static final boolean DEFAULT_CRUISE_CONTROL_METRICS_ENABLED = false;

private boolean sslEnabled;
private boolean authEnabled;
private HashLoginServiceApiCredentials apiCredentials;
@SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the fromCrd method
protected Capacity capacity;
@SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the fromCrd method
private MetricsModel jmxExporterMetrics;
private JmxPrometheusExporterModel metrics;
private LoggingModel logging;
/* test */ CruiseControlConfiguration configuration;

Expand Down Expand Up @@ -222,13 +223,13 @@ public static CruiseControl fromCrd(
result.jvmOptions = ccSpec.getJvmOptions();

if (ccSpec.getMetricsConfig() instanceof JmxPrometheusExporterMetrics) {
result.jmxExporterMetrics = new MetricsModel(ccSpec);
result.metrics = new JmxPrometheusExporterModel(ccSpec);
} else if (ccSpec.getMetricsConfig() instanceof StrimziMetricsReporter) {
// Cruise Control own metrics are only exported through JMX
LOGGER.errorCr(reconciliation, "The Strimzi Metrics Reporter is not supported for Cruise Control");
throw new InvalidResourceException("The Strimzi Metrics Reporter is not supported for Cruise Control");
LOGGER.errorCr(reconciliation, "The Strimzi Metrics Reporter is not supported with Cruise Control");
throw new InvalidResourceException("The Strimzi Metrics Reporter is not supported with Cruise Control");
}

result.logging = new LoggingModel(ccSpec, result.getClass().getSimpleName(), true, false);
result.resources = ccSpec.getResources();

Expand All @@ -248,6 +249,10 @@ public static CruiseControl fromCrd(
}
}

private boolean hasMetricsConfig() {
return metrics != null && metrics.isEnabled();
}

private void updateConfigurationWithDefaults(CruiseControlSpec ccSpec, KafkaConfiguration kafkaConfiguration) {
Map<String, String> defaultCruiseControlProperties = new HashMap<>(CruiseControlConfiguration.getCruiseControlDefaultPropertiesMap());
if (kafkaConfiguration.getConfigOption(KafkaConfiguration.DEFAULT_REPLICATION_FACTOR) != null) {
Expand Down Expand Up @@ -317,7 +322,7 @@ protected List<ContainerPort> getContainerPortList() {

portList.add(ContainerUtils.createContainerPort(REST_API_PORT_NAME, REST_API_PORT));

if (jmxExporterMetrics != null && jmxExporterMetrics.isEnabled()) {
if (hasMetricsConfig()) {
portList.add(ContainerUtils.createContainerPort(MetricsModel.METRICS_PORT_NAME, MetricsModel.METRICS_PORT));
}

Expand Down Expand Up @@ -405,8 +410,7 @@ public Deployment generateDeployment(Map<String, String> annotations, boolean is
protected List<EnvVar> getEnvVars() {
List<EnvVar> varList = new ArrayList<>();

String jmxMetricsEnabled = jmxExporterMetrics != null && jmxExporterMetrics.isEnabled() ? Boolean.TRUE.toString() : Boolean.FALSE.toString();
varList.add(ContainerUtils.createEnvVar(ENV_VAR_CRUISE_CONTROL_JMX_EXPORTER_ENABLED, jmxMetricsEnabled));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_CRUISE_CONTROL_JMX_EXPORTER_ENABLED, hasMetricsConfig() ? Boolean.TRUE.toString() : Boolean.FALSE.toString()));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_STRIMZI_KAFKA_BOOTSTRAP_SERVERS, KafkaResources.bootstrapServiceName(cluster) + ":" + KafkaCluster.REPLICATION_PORT));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_STRIMZI_KAFKA_GC_LOG_ENABLED, String.valueOf(gcLoggingEnabled)));

Expand Down Expand Up @@ -465,15 +469,15 @@ public Secret generateCertificatesSecret(String namespace, String clusterName, C
* @param operatorNamespace Namespace where the Strimzi Cluster Operator runs. Null if not configured.
* @param operatorNamespaceLabels Labels of the namespace where the Strimzi Cluster Operator runs. Null if not configured.
* @param topicOperatorEnabled Whether to also enable access to Cruise Control from the Entity Operator.
*
*
* @return The network policy.
*/
public NetworkPolicy generateNetworkPolicy(String operatorNamespace, Labels operatorNamespaceLabels, boolean topicOperatorEnabled) {
List<NetworkPolicyPeer> peers = new ArrayList<>(2);
NetworkPolicyPeer clusterOperatorPeer = NetworkPolicyUtils.createPeer(Map.of(Labels.STRIMZI_KIND_LABEL, "cluster-operator"),
NetworkPolicyPeer clusterOperatorPeer = NetworkPolicyUtils.createPeer(Map.of(Labels.STRIMZI_KIND_LABEL, "cluster-operator"),
NetworkPolicyUtils.clusterOperatorNamespaceSelector(namespace, operatorNamespace, operatorNamespaceLabels));
peers.add(clusterOperatorPeer);

if (topicOperatorEnabled) {
NetworkPolicyPeer entityOperatorPeer = NetworkPolicyUtils.createPeer(Map.of(Labels.STRIMZI_NAME_LABEL, format("%s-entity-operator", cluster)),
NetworkPolicyUtils.clusterOperatorNamespaceSelector(namespace, operatorNamespace, operatorNamespaceLabels));
Expand All @@ -487,7 +491,7 @@ public NetworkPolicy generateNetworkPolicy(String operatorNamespace, Labels oper
rules.add(NetworkPolicyUtils.createIngressRule(REST_API_PORT, peers));

// Everyone can access metrics
if (jmxExporterMetrics != null && jmxExporterMetrics.isEnabled()) {
if (hasMetricsConfig()) {
rules.add(NetworkPolicyUtils.createIngressRule(MetricsModel.METRICS_PORT, List.of()));
}

Expand All @@ -511,8 +515,8 @@ public HashLoginServiceApiCredentials apiCredentials() {
/**
* @return Metrics Model instance for configuring Prometheus metrics
*/
public MetricsModel metrics() {
return jmxExporterMetrics;
public JmxPrometheusExporterModel metrics() {
return metrics;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
*/
public class KafkaBrokerConfigurationBuilder {
private final static String CONTROL_PLANE_LISTENER_NAME = "CONTROLPLANE-9090";
private final static String KAFKA_METRIC_REPORTERS_CONFIG_FIELD = "metric.reporters";
private final static String REPLICATION_LISTENER_NAME = "REPLICATION-9091";
// Names of environment variables expanded through config providers inside the Kafka node
private final static String PLACEHOLDER_CERT_STORE_PASSWORD_CONFIG_PROVIDER_ENV_VAR = "${strimzienv:CERTS_STORE_PASSWORD}";
Expand Down Expand Up @@ -306,6 +307,7 @@ public KafkaBrokerConfigurationBuilder withListeners(
////////////////////
// Shared configurations with values dependent on all listeners
////////////////////

// configure OAuth principal builder for all the nodes - brokers, controllers, and mixed
configureOAuthPrincipalBuilderIfNeeded(writer, kafkaListeners);

Expand Down Expand Up @@ -821,7 +823,7 @@ public KafkaBrokerConfigurationBuilder withUserConfiguration(KafkaConfiguration
configProviders(userConfig);

// Handle all combinations of metric.reporters
String metricReporters = userConfig.getConfigOption(KafkaCluster.KAFKA_METRIC_REPORTERS_CONFIG_FIELD);
String metricReporters = userConfig.getConfigOption(KAFKA_METRIC_REPORTERS_CONFIG_FIELD);

// If the injectCcMetricsReporter / injectStrimziMetricsReporter flag is set to true, it is appended to the list of metric reporters
if (injectCcMetricsReporter) {
Expand All @@ -832,7 +834,7 @@ public KafkaBrokerConfigurationBuilder withUserConfiguration(KafkaConfiguration
}
if (metricReporters != null) {
// update the userConfig with the new list of metric reporters
userConfig.setConfigOption(KafkaCluster.KAFKA_METRIC_REPORTERS_CONFIG_FIELD, metricReporters);
userConfig.setConfigOption(KAFKA_METRIC_REPORTERS_CONFIG_FIELD, metricReporters);
}

printSectionHeader("User provided configuration");
Expand All @@ -852,7 +854,7 @@ public KafkaBrokerConfigurationBuilder withUserConfiguration(KafkaConfiguration
if (injectStrimziMetricsReporter) {
metricReporters = appendMetricReporter(metricReporters, StrimziMetricsReporterModel.KAFKA_PROMETHEUS_METRICS_REPORTER);
}
writer.println(KafkaCluster.KAFKA_METRIC_REPORTERS_CONFIG_FIELD + "=" + metricReporters);
writer.println(KAFKA_METRIC_REPORTERS_CONFIG_FIELD + "=" + metricReporters);
writer.println();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import io.strimzi.operator.cluster.model.jmx.SupportsJmx;
import io.strimzi.operator.cluster.model.logging.LoggingModel;
import io.strimzi.operator.cluster.model.logging.SupportsLogging;
import io.strimzi.operator.cluster.model.metrics.JmxPrometheusExporterModel;
import io.strimzi.operator.cluster.model.metrics.MetricsModel;
import io.strimzi.operator.cluster.model.metrics.StrimziMetricsReporterModel;
import io.strimzi.operator.cluster.model.metrics.SupportsMetrics;
Expand Down Expand Up @@ -228,8 +229,7 @@ public class KafkaCluster extends AbstractModel implements SupportsMetrics, Supp
private String clusterId;
private JmxModel jmx;
private CruiseControlMetricsReporter ccMetricsReporter;
private MetricsModel jmxExporterMetrics;
private StrimziMetricsReporterModel strimziMetricsReporter;
private MetricsModel metrics;
private LoggingModel logging;
private QuotasPlugin quotas;
/* test */ KafkaConfiguration configuration;
Expand Down Expand Up @@ -334,9 +334,9 @@ public static KafkaCluster fromCrd(Reconciliation reconciliation,
result.initImage = initImage;

if (kafkaClusterSpec.getMetricsConfig() instanceof JmxPrometheusExporterMetrics) {
result.jmxExporterMetrics = new MetricsModel(kafkaClusterSpec);
result.metrics = new JmxPrometheusExporterModel(kafkaClusterSpec);
} else if (kafkaClusterSpec.getMetricsConfig() instanceof StrimziMetricsReporter) {
result.strimziMetricsReporter = new StrimziMetricsReporterModel(kafkaClusterSpec);
result.metrics = new StrimziMetricsReporterModel(kafkaClusterSpec);
}

result.logging = new LoggingModel(kafkaClusterSpec, result.getClass().getSimpleName(), false, true);
Expand Down Expand Up @@ -422,6 +422,10 @@ public static KafkaCluster fromCrd(Reconciliation reconciliation,
return result;
}

private boolean hasMetricsConfig() {
return metrics != null && metrics.isEnabled();
}

/**
* Generates list of references to Kafka nodes for this Kafka cluster. The references contain both the pod name and
* the ID of the Kafka node.
Expand Down Expand Up @@ -1317,10 +1321,8 @@ private List<ContainerPort> getContainerPortList(KafkaPool pool) {
}

// Metrics port is enabled on all node types regardless their role
if (jmxExporterMetrics != null && jmxExporterMetrics.isEnabled()) {
if (hasMetricsConfig()) {
ports.add(ContainerUtils.createContainerPort(MetricsModel.METRICS_PORT_NAME, MetricsModel.METRICS_PORT));
} else if (strimziMetricsReporter != null && strimziMetricsReporter.isEnabled()) {
ports.add(ContainerUtils.createContainerPort(StrimziMetricsReporterModel.METRICS_PORT_NAME, StrimziMetricsReporterModel.METRICS_PORT));
}

// JMX port is enabled on all node types regardless their role
Expand Down Expand Up @@ -1636,8 +1638,8 @@ private Container createContainer(ImagePullPolicy imagePullPolicy, KafkaPool poo
*/
private List<EnvVar> getEnvVars(KafkaPool pool) {
List<EnvVar> varList = new ArrayList<>();
String jmxMetricsEnabled = jmxExporterMetrics != null && jmxExporterMetrics.isEnabled() ? Boolean.TRUE.toString() : Boolean.FALSE.toString();
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_JMX_EXPORTER_ENABLED, jmxMetricsEnabled));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_JMX_EXPORTER_ENABLED,
hasMetricsConfig() && metrics instanceof JmxPrometheusExporterModel ? Boolean.TRUE.toString() : Boolean.FALSE.toString()));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_STRIMZI_KAFKA_GC_LOG_ENABLED, String.valueOf(pool.gcLoggingEnabled)));

JvmOptionUtils.heapOptions(varList, 50, 5L * 1024L * 1024L * 1024L, pool.jvmOptions, pool.resources);
Expand Down Expand Up @@ -1749,10 +1751,8 @@ public NetworkPolicy generateNetworkPolicy(String operatorNamespace, Labels oper
}

// The Metrics port (if enabled) is opened to all by default
if (jmxExporterMetrics != null && jmxExporterMetrics.isEnabled()) {
if (hasMetricsConfig()) {
rules.add(NetworkPolicyUtils.createIngressRule(MetricsModel.METRICS_PORT, List.of()));
} else if (strimziMetricsReporter != null && strimziMetricsReporter.isEnabled()) {
rules.add(NetworkPolicyUtils.createIngressRule(StrimziMetricsReporterModel.METRICS_PORT, List.of()));
}

// The JMX port (if enabled) is opened to all by default
Expand Down Expand Up @@ -1837,7 +1837,7 @@ public String generatePerBrokerConfiguration(int nodeId, Map<Integer, Map<String
* @return String with the Kafka broker configuration
*/
private String generatePerBrokerConfiguration(NodeRef node, KafkaPool pool, Map<Integer, Map<String, String>> advertisedHostnames, Map<Integer, Map<String, String>> advertisedPorts) {
return new KafkaBrokerConfigurationBuilder(reconciliation, node)
KafkaBrokerConfigurationBuilder builder = new KafkaBrokerConfigurationBuilder(reconciliation, node)
.withRackId(rack)
.withKRaft(cluster, namespace, nodes())
.withKRaftMetadataLogDir(VolumeUtils.kraftMetadataPath(pool.storage))
Expand All @@ -1851,12 +1851,15 @@ private String generatePerBrokerConfiguration(NodeRef node, KafkaPool pool, Map<
)
.withAuthorization(cluster, authorization)
.withCruiseControl(cluster, ccMetricsReporter, node.broker())
.withStrimziMetricsReporter(strimziMetricsReporter)
.withTieredStorage(cluster, tieredStorage)
.withQuotas(cluster, quotas)
.withUserConfiguration(configuration, node.broker() && ccMetricsReporter != null, strimziMetricsReporter != null && strimziMetricsReporter.isEnabled())
.build()
.trim();
.withQuotas(cluster, quotas);
if (hasMetricsConfig() && metrics instanceof StrimziMetricsReporterModel) {
builder.withStrimziMetricsReporter((StrimziMetricsReporterModel) metrics)
.withUserConfiguration(configuration, node.broker() && ccMetricsReporter != null, true);
} else {
builder.withUserConfiguration(configuration, node.broker() && ccMetricsReporter != null, false);
}
return builder.build().trim();
}

/**
Expand All @@ -1869,10 +1872,10 @@ private String generatePerBrokerConfiguration(NodeRef node, KafkaPool pool, Map<
*
* @return ConfigMap with the shared configuration.
*/
public List<ConfigMap> generatePerBrokerConfigurationConfigMaps(MetricsAndLogging metricsAndLogging, Map<Integer, Map<String, String>> advertisedHostnames, Map<Integer, Map<String, String>> advertisedPorts) {
public List<ConfigMap> generatePerBrokerConfigurationConfigMaps(MetricsAndLogging metricsAndLogging, Map<Integer, Map<String, String>> advertisedHostnames, Map<Integer, Map<String, String>> advertisedPorts) {
String parsedMetrics = null;
if (jmxExporterMetrics != null && jmxExporterMetrics.isEnabled()) {
parsedMetrics = jmxExporterMetrics.metricsJson(reconciliation, metricsAndLogging.metricsCm());
if (hasMetricsConfig() && metrics instanceof JmxPrometheusExporterModel) {
parsedMetrics = ((JmxPrometheusExporterModel) metrics).metricsJson(reconciliation, metricsAndLogging.metricsCm());
}
String parsedLogging = logging().loggingConfiguration(reconciliation, metricsAndLogging.loggingCm());
List<ConfigMap> configMaps = new ArrayList<>();
Expand All @@ -1882,7 +1885,7 @@ public List<ConfigMap> generatePerBrokerConfigurationConfigMaps(MetricsAndLoggin
Map<String, String> data = new HashMap<>(4);

if (parsedMetrics != null) {
data.put(MetricsModel.CONFIG_MAP_KEY, parsedMetrics);
data.put(JmxPrometheusExporterModel.CONFIG_MAP_KEY, parsedMetrics);
}

data.put(logging.configMapKey(), parsedLogging);
Expand Down Expand Up @@ -1947,14 +1950,7 @@ public JmxModel jmx() {
* @return Metrics Model instance for configuring Prometheus metrics
*/
public MetricsModel metrics() {
return jmxExporterMetrics;
}

/**
* @return Strimzi Metrics Reporter Model instance for configuring Prometheus metrics
*/
public StrimziMetricsReporterModel strimziMetricsReporter() {
return strimziMetricsReporter;
return metrics;
}

/**
Expand Down
Loading

0 comments on commit ece3b1f

Please sign in to comment.