Skip to content

Commit

Permalink
Add MetricsModel interface and refactor affected classes
Browse files Browse the repository at this point in the history
Signed-off-by: ocorriga <[email protected]>
  • Loading branch information
ocorriga committed Feb 19, 2025
1 parent c73b7d4 commit 032882f
Show file tree
Hide file tree
Showing 17 changed files with 269 additions and 266 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
import io.fabric8.kubernetes.api.model.OwnerReference;
import io.strimzi.operator.cluster.model.logging.SupportsLogging;
import io.strimzi.operator.cluster.model.metrics.MetricsModel;
import io.strimzi.operator.cluster.model.metrics.JmxPrometheusExporterModel;
import io.strimzi.operator.cluster.model.metrics.SupportsMetrics;
import io.strimzi.operator.common.Reconciliation;
import io.strimzi.operator.common.model.Labels;
Expand Down Expand Up @@ -67,10 +67,11 @@ public static Map<String, String> generateMetricsAndLogConfigMapData(Reconciliat
data.put(supportsLogging.logging().configMapKey(), supportsLogging.logging().loggingConfiguration(reconciliation, metricsAndLogging.loggingCm()));
}

if (model instanceof SupportsMetrics supportMetrics && supportMetrics.metrics() != null) {
String parseResult = supportMetrics.metrics().metricsJson(reconciliation, metricsAndLogging.metricsCm());
// this is only for JMX Prometheus Exporter, because Strimzi Metrics Reporter configuration is in the Kafka configuration file
if (model instanceof SupportsMetrics supportMetrics && supportMetrics.metrics() instanceof JmxPrometheusExporterModel) {
String parseResult = ((JmxPrometheusExporterModel) supportMetrics.metrics()).metricsJson(reconciliation, metricsAndLogging.metricsCm());
if (parseResult != null) {
data.put(MetricsModel.CONFIG_MAP_KEY, parseResult);
data.put(JmxPrometheusExporterModel.CONFIG_MAP_KEY, parseResult);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.strimzi.operator.cluster.model.cruisecontrol.HashLoginServiceApiCredentials;
import io.strimzi.operator.cluster.model.logging.LoggingModel;
import io.strimzi.operator.cluster.model.logging.SupportsLogging;
import io.strimzi.operator.cluster.model.metrics.JmxPrometheusExporterModel;
import io.strimzi.operator.cluster.model.metrics.MetricsModel;
import io.strimzi.operator.cluster.model.metrics.SupportsMetrics;
import io.strimzi.operator.cluster.model.securityprofiles.ContainerSecurityProviderContextImpl;
Expand Down Expand Up @@ -114,14 +115,14 @@ public class CruiseControl extends AbstractModel implements SupportsMetrics, Sup

// Configuration defaults
protected static final boolean DEFAULT_CRUISE_CONTROL_METRICS_ENABLED = false;

private boolean sslEnabled;
private boolean authEnabled;
private HashLoginServiceApiCredentials apiCredentials;
@SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the fromCrd method
protected Capacity capacity;
@SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the fromCrd method
private MetricsModel jmxExporterMetrics;
private JmxPrometheusExporterModel metrics;
private LoggingModel logging;
/* test */ CruiseControlConfiguration configuration;

Expand Down Expand Up @@ -222,13 +223,13 @@ public static CruiseControl fromCrd(
result.jvmOptions = ccSpec.getJvmOptions();

if (ccSpec.getMetricsConfig() instanceof JmxPrometheusExporterMetrics) {
result.jmxExporterMetrics = new MetricsModel(ccSpec);
result.metrics = new JmxPrometheusExporterModel(ccSpec);
} else if (ccSpec.getMetricsConfig() instanceof StrimziMetricsReporter) {
// Cruise Control own metrics are only exported through JMX
LOGGER.errorCr(reconciliation, "The Strimzi Metrics Reporter is not supported for Cruise Control");
throw new InvalidResourceException("The Strimzi Metrics Reporter is not supported for Cruise Control");
LOGGER.errorCr(reconciliation, "The Strimzi Metrics Reporter is not supported with Cruise Control");
throw new InvalidResourceException("The Strimzi Metrics Reporter is not supported with Cruise Control");
}

result.logging = new LoggingModel(ccSpec, result.getClass().getSimpleName(), true, false);
result.resources = ccSpec.getResources();

Expand All @@ -248,6 +249,10 @@ public static CruiseControl fromCrd(
}
}

private boolean hasMetricsConfig() {
return metrics != null && metrics.isEnabled();
}

private void updateConfigurationWithDefaults(CruiseControlSpec ccSpec, KafkaConfiguration kafkaConfiguration) {
Map<String, String> defaultCruiseControlProperties = new HashMap<>(CruiseControlConfiguration.getCruiseControlDefaultPropertiesMap());
if (kafkaConfiguration.getConfigOption(KafkaConfiguration.DEFAULT_REPLICATION_FACTOR) != null) {
Expand Down Expand Up @@ -317,7 +322,7 @@ protected List<ContainerPort> getContainerPortList() {

portList.add(ContainerUtils.createContainerPort(REST_API_PORT_NAME, REST_API_PORT));

if (jmxExporterMetrics != null && jmxExporterMetrics.isEnabled()) {
if (hasMetricsConfig()) {
portList.add(ContainerUtils.createContainerPort(MetricsModel.METRICS_PORT_NAME, MetricsModel.METRICS_PORT));
}

Expand Down Expand Up @@ -405,7 +410,7 @@ public Deployment generateDeployment(Map<String, String> annotations, boolean is
protected List<EnvVar> getEnvVars() {
List<EnvVar> varList = new ArrayList<>();

varList.add(ContainerUtils.createEnvVar(ENV_VAR_CRUISE_CONTROL_JMX_EXPORTER_ENABLED, jmxExporterMetrics != null && jmxExporterMetrics.isEnabled() ? Boolean.TRUE.toString() : Boolean.FALSE.toString()));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_CRUISE_CONTROL_JMX_EXPORTER_ENABLED, hasMetricsConfig() ? Boolean.TRUE.toString() : Boolean.FALSE.toString()));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_STRIMZI_KAFKA_BOOTSTRAP_SERVERS, KafkaResources.bootstrapServiceName(cluster) + ":" + KafkaCluster.REPLICATION_PORT));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_STRIMZI_KAFKA_GC_LOG_ENABLED, String.valueOf(gcLoggingEnabled)));

Expand Down Expand Up @@ -464,15 +469,15 @@ public Secret generateCertificatesSecret(String namespace, String clusterName, C
* @param operatorNamespace Namespace where the Strimzi Cluster Operator runs. Null if not configured.
* @param operatorNamespaceLabels Labels of the namespace where the Strimzi Cluster Operator runs. Null if not configured.
* @param topicOperatorEnabled Whether to also enable access to Cruise Control from the Entity Operator.
*
*
* @return The network policy.
*/
public NetworkPolicy generateNetworkPolicy(String operatorNamespace, Labels operatorNamespaceLabels, boolean topicOperatorEnabled) {
List<NetworkPolicyPeer> peers = new ArrayList<>(2);
NetworkPolicyPeer clusterOperatorPeer = NetworkPolicyUtils.createPeer(Map.of(Labels.STRIMZI_KIND_LABEL, "cluster-operator"),
NetworkPolicyPeer clusterOperatorPeer = NetworkPolicyUtils.createPeer(Map.of(Labels.STRIMZI_KIND_LABEL, "cluster-operator"),
NetworkPolicyUtils.clusterOperatorNamespaceSelector(namespace, operatorNamespace, operatorNamespaceLabels));
peers.add(clusterOperatorPeer);

if (topicOperatorEnabled) {
NetworkPolicyPeer entityOperatorPeer = NetworkPolicyUtils.createPeer(Map.of(Labels.STRIMZI_NAME_LABEL, format("%s-entity-operator", cluster)),
NetworkPolicyUtils.clusterOperatorNamespaceSelector(namespace, operatorNamespace, operatorNamespaceLabels));
Expand All @@ -486,7 +491,7 @@ public NetworkPolicy generateNetworkPolicy(String operatorNamespace, Labels oper
rules.add(NetworkPolicyUtils.createIngressRule(REST_API_PORT, peers));

// Everyone can access metrics
if (jmxExporterMetrics != null && jmxExporterMetrics.isEnabled()) {
if (hasMetricsConfig()) {
rules.add(NetworkPolicyUtils.createIngressRule(MetricsModel.METRICS_PORT, List.of()));
}

Expand All @@ -510,8 +515,8 @@ public HashLoginServiceApiCredentials apiCredentials() {
/**
* @return Metrics Model instance for configuring Prometheus metrics
*/
public MetricsModel metrics() {
return jmxExporterMetrics;
public JmxPrometheusExporterModel metrics() {
return metrics;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import io.strimzi.operator.cluster.model.jmx.SupportsJmx;
import io.strimzi.operator.cluster.model.logging.LoggingModel;
import io.strimzi.operator.cluster.model.logging.SupportsLogging;
import io.strimzi.operator.cluster.model.metrics.JmxPrometheusExporterModel;
import io.strimzi.operator.cluster.model.metrics.MetricsModel;
import io.strimzi.operator.cluster.model.metrics.StrimziMetricsReporterModel;
import io.strimzi.operator.cluster.model.metrics.SupportsMetrics;
Expand Down Expand Up @@ -228,9 +229,7 @@ public class KafkaCluster extends AbstractModel implements SupportsMetrics, Supp
private String clusterId;
private JmxModel jmx;
private CruiseControlMetricsReporter ccMetricsReporter;
// new MetricsModel metrics
private MetricsModel jmxExporterMetrics;
private StrimziMetricsReporterModel strimziMetricsReporter;
private MetricsModel metrics;
private LoggingModel logging;
private QuotasPlugin quotas;
/* test */ KafkaConfiguration configuration;
Expand Down Expand Up @@ -335,9 +334,9 @@ public static KafkaCluster fromCrd(Reconciliation reconciliation,
result.initImage = initImage;

if (kafkaClusterSpec.getMetricsConfig() instanceof JmxPrometheusExporterMetrics) {
result.jmxExporterMetrics = new MetricsModel(kafkaClusterSpec);
result.metrics = new JmxPrometheusExporterModel(kafkaClusterSpec);
} else if (kafkaClusterSpec.getMetricsConfig() instanceof StrimziMetricsReporter) {
result.strimziMetricsReporter = new StrimziMetricsReporterModel(kafkaClusterSpec);
result.metrics = new StrimziMetricsReporterModel(kafkaClusterSpec);
}

result.logging = new LoggingModel(kafkaClusterSpec, result.getClass().getSimpleName(), false, true);
Expand Down Expand Up @@ -423,6 +422,10 @@ public static KafkaCluster fromCrd(Reconciliation reconciliation,
return result;
}

private boolean hasMetricsConfig() {
return metrics != null && metrics.isEnabled();
}

/**
* Generates list of references to Kafka nodes for this Kafka cluster. The references contain both the pod name and
* the ID of the Kafka node.
Expand Down Expand Up @@ -1318,11 +1321,8 @@ private List<ContainerPort> getContainerPortList(KafkaPool pool) {
}

// Metrics port is enabled on all node types regardless their role
// if metrics instance of
if (jmxExporterMetrics != null && jmxExporterMetrics.isEnabled()) {
if (hasMetricsConfig()) {
ports.add(ContainerUtils.createContainerPort(MetricsModel.METRICS_PORT_NAME, MetricsModel.METRICS_PORT));
} else if (strimziMetricsReporter != null && strimziMetricsReporter.isEnabled()) {
ports.add(ContainerUtils.createContainerPort(StrimziMetricsReporterModel.METRICS_PORT_NAME, StrimziMetricsReporterModel.METRICS_PORT));
}

// JMX port is enabled on all node types regardless their role
Expand Down Expand Up @@ -1638,8 +1638,8 @@ private Container createContainer(ImagePullPolicy imagePullPolicy, KafkaPool poo
*/
private List<EnvVar> getEnvVars(KafkaPool pool) {
List<EnvVar> varList = new ArrayList<>();
String jmxMetricsEnabled = jmxExporterMetrics != null && jmxExporterMetrics.isEnabled() ? Boolean.TRUE.toString() : Boolean.FALSE.toString();
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_JMX_EXPORTER_ENABLED, jmxMetricsEnabled));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_JMX_EXPORTER_ENABLED,
hasMetricsConfig() && metrics instanceof JmxPrometheusExporterModel ? Boolean.TRUE.toString() : Boolean.FALSE.toString()));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_STRIMZI_KAFKA_GC_LOG_ENABLED, String.valueOf(pool.gcLoggingEnabled)));

JvmOptionUtils.heapOptions(varList, 50, 5L * 1024L * 1024L * 1024L, pool.jvmOptions, pool.resources);
Expand Down Expand Up @@ -1751,10 +1751,8 @@ public NetworkPolicy generateNetworkPolicy(String operatorNamespace, Labels oper
}

// The Metrics port (if enabled) is opened to all by default
if (jmxExporterMetrics != null && jmxExporterMetrics.isEnabled()) {
if (hasMetricsConfig()) {
rules.add(NetworkPolicyUtils.createIngressRule(MetricsModel.METRICS_PORT, List.of()));
} else if (strimziMetricsReporter != null && strimziMetricsReporter.isEnabled()) {
rules.add(NetworkPolicyUtils.createIngressRule(StrimziMetricsReporterModel.METRICS_PORT, List.of()));
}

// The JMX port (if enabled) is opened to all by default
Expand Down Expand Up @@ -1839,7 +1837,7 @@ public String generatePerBrokerConfiguration(int nodeId, Map<Integer, Map<String
* @return String with the Kafka broker configuration
*/
private String generatePerBrokerConfiguration(NodeRef node, KafkaPool pool, Map<Integer, Map<String, String>> advertisedHostnames, Map<Integer, Map<String, String>> advertisedPorts) {
return new KafkaBrokerConfigurationBuilder(reconciliation, node)
KafkaBrokerConfigurationBuilder builder = new KafkaBrokerConfigurationBuilder(reconciliation, node)
.withRackId(rack)
.withKRaft(cluster, namespace, nodes())
.withKRaftMetadataLogDir(VolumeUtils.kraftMetadataPath(pool.storage))
Expand All @@ -1853,12 +1851,15 @@ private String generatePerBrokerConfiguration(NodeRef node, KafkaPool pool, Map<
)
.withAuthorization(cluster, authorization)
.withCruiseControl(cluster, ccMetricsReporter, node.broker())
.withStrimziMetricsReporter(strimziMetricsReporter)
.withTieredStorage(cluster, tieredStorage)
.withQuotas(cluster, quotas)
.withUserConfiguration(configuration, node.broker() && ccMetricsReporter != null, strimziMetricsReporter != null && strimziMetricsReporter.isEnabled())
.build()
.trim();
.withQuotas(cluster, quotas);
if (hasMetricsConfig() && metrics instanceof StrimziMetricsReporterModel) {
builder.withStrimziMetricsReporter((StrimziMetricsReporterModel) metrics)
.withUserConfiguration(configuration, node.broker() && ccMetricsReporter != null, true);
} else {
builder.withUserConfiguration(configuration, node.broker() && ccMetricsReporter != null, false);
}
return builder.build().trim();
}

/**
Expand All @@ -1871,10 +1872,10 @@ private String generatePerBrokerConfiguration(NodeRef node, KafkaPool pool, Map<
*
* @return ConfigMap with the shared configuration.
*/
public List<ConfigMap> generatePerBrokerConfigurationConfigMaps(MetricsAndLogging metricsAndLogging, Map<Integer, Map<String, String>> advertisedHostnames, Map<Integer, Map<String, String>> advertisedPorts) {
public List<ConfigMap> generatePerBrokerConfigurationConfigMaps(MetricsAndLogging metricsAndLogging, Map<Integer, Map<String, String>> advertisedHostnames, Map<Integer, Map<String, String>> advertisedPorts) {
String parsedMetrics = null;
if (jmxExporterMetrics != null && jmxExporterMetrics.isEnabled()) {
parsedMetrics = jmxExporterMetrics.metricsJson(reconciliation, metricsAndLogging.metricsCm());
if (hasMetricsConfig() && metrics instanceof JmxPrometheusExporterModel) {
parsedMetrics = ((JmxPrometheusExporterModel) metrics).metricsJson(reconciliation, metricsAndLogging.metricsCm());
}
String parsedLogging = logging().loggingConfiguration(reconciliation, metricsAndLogging.loggingCm());
List<ConfigMap> configMaps = new ArrayList<>();
Expand All @@ -1884,7 +1885,7 @@ public List<ConfigMap> generatePerBrokerConfigurationConfigMaps(MetricsAndLoggin
Map<String, String> data = new HashMap<>(4);

if (parsedMetrics != null) {
data.put(MetricsModel.CONFIG_MAP_KEY, parsedMetrics);
data.put(JmxPrometheusExporterModel.CONFIG_MAP_KEY, parsedMetrics);
}

data.put(logging.configMapKey(), parsedLogging);
Expand Down Expand Up @@ -1949,14 +1950,7 @@ public JmxModel jmx() {
* @return Metrics Model instance for configuring Prometheus metrics
*/
public MetricsModel metrics() {
return jmxExporterMetrics;
}

/**
* @return Strimzi Metrics Reporter Model instance for configuring Prometheus metrics
*/
public StrimziMetricsReporterModel strimziMetricsReporter() {
return strimziMetricsReporter;
return metrics;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import io.strimzi.operator.cluster.model.logging.LoggingModel;
import io.strimzi.operator.cluster.model.logging.LoggingUtils;
import io.strimzi.operator.cluster.model.logging.SupportsLogging;
import io.strimzi.operator.cluster.model.metrics.JmxPrometheusExporterModel;
import io.strimzi.operator.cluster.model.metrics.MetricsModel;
import io.strimzi.operator.cluster.model.metrics.SupportsMetrics;
import io.strimzi.operator.cluster.model.securityprofiles.ContainerSecurityProviderContextImpl;
Expand Down Expand Up @@ -266,10 +267,10 @@ protected static <C extends KafkaConnectCluster> C fromSpec(Reconciliation recon
result.jvmOptions = spec.getJvmOptions();

if (spec.getMetricsConfig() instanceof JmxPrometheusExporterMetrics) {
result.metrics = new MetricsModel(spec);
result.metrics = new JmxPrometheusExporterModel(spec);
} else if (spec.getMetricsConfig() instanceof StrimziMetricsReporter) {
LOGGER.errorCr(reconciliation, "The Strimzi Metrics Reporter is not supported for this component");
throw new InvalidResourceException("The Strimzi Metrics Reporter is not supported for this component");
LOGGER.errorCr(reconciliation, "The Strimzi Metrics Reporter is not supported with this component");
throw new InvalidResourceException("The Strimzi Metrics Reporter is not supported with this component");
}

result.logging = new LoggingModel(spec, result.getClass().getSimpleName(), false, true);
Expand Down
Loading

0 comments on commit 032882f

Please sign in to comment.