Add stop_ongoing_execution flag to rebalance requests for full run #10703

Closed
wants to merge 3 commits into from
Changes from 2 commits
@@ -867,7 +867,7 @@ private Future<MapAndStatus<ConfigMap, KafkaRebalanceStatus>> onProposalReady(Re
AbstractRebalanceOptions.AbstractRebalanceOptionsBuilder<?, ?> rebalanceOptionsBuilder) {
if (Annotations.booleanAnnotation(kafkaRebalance, ANNO_STRIMZI_IO_REBALANCE_AUTOAPPROVAL, false)) {
LOGGER.infoCr(reconciliation, "Auto-approval set on the KafkaRebalance resource");
return requestRebalance(reconciliation, host, apiClient, kafkaRebalance, false, rebalanceOptionsBuilder);
return requestRebalance(reconciliation, host, apiClient, kafkaRebalance, false, rebalanceOptionsBuilder, true);
} else {
KafkaRebalanceAnnotation rebalanceAnnotation = rebalanceAnnotation(kafkaRebalance);
switch (rebalanceAnnotation) {
@@ -876,7 +876,7 @@ private Future<MapAndStatus<ConfigMap, KafkaRebalanceStatus>> onProposalReady(Re
return configMapOperator.getAsync(kafkaRebalance.getMetadata().getNamespace(), kafkaRebalance.getMetadata().getName()).compose(loadmap -> Future.succeededFuture(new MapAndStatus<>(loadmap, buildRebalanceStatusFromPreviousStatus(kafkaRebalance.getStatus(), StatusUtils.validate(reconciliation, kafkaRebalance)))));
case approve:
LOGGER.debugCr(reconciliation, "Annotation {}={}", ANNO_STRIMZI_IO_REBALANCE, KafkaRebalanceAnnotation.approve);
return requestRebalance(reconciliation, host, apiClient, kafkaRebalance, false, rebalanceOptionsBuilder);
return requestRebalance(reconciliation, host, apiClient, kafkaRebalance, false, rebalanceOptionsBuilder, true);
Member

You are using stop_ongoing_execution only when we are approving a proposal. What about refreshing while the resource is in PendingProposal or Rebalancing? (Of course, refreshing in ProposalReady doesn't need it, because nothing is running.)

Member

Why do we need to stop ongoing execution when approving a proposal in the first place?

Member

Good point. What's the use case you saw, @tinaselenge?

Contributor Author
@tinaselenge, Oct 17, 2024

So this is my understanding of the code, based on my recreation of the problem; please correct me if I'm wrong. The issue mainly happens when you refresh while the KafkaRebalance is in the Rebalancing state (let's say rebalance_1 is in progress). When the refresh annotation is applied, we send a request to stop the ongoing rebalance operation. However, this does not stop rebalance_1 completely, and that's the problem. Immediately after the stop request completes, we send a request for a new proposal (dry_run=true), and the state becomes ProposalReady. If auto-approval is set, or the user manually approves this new proposal, we send a new rebalance request (dry_run=false); let's call it rebalance_2. However, the request for rebalance_2 fails if rebalance_1 is still in progress. This change makes sure that rebalance_1 is completely stopped before the request for rebalance_2 is processed. I tested this only with the auto-approval annotation, but I think it can also happen with manual approval if the user approves quickly and rebalance_1 takes a long time.

I don't think we need this flag for the PendingProposal or Rebalancing states: when the refresh annotation is applied in either of these two states, we only send a request for a new proposal (dry_run=true). We only need the flag when we send dry_run=false requests.
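The sequence described in this comment can be sketched as a toy model. This is only an illustration of the reported symptom, not Cruise Control's or the operator's actual logic: `ToyCruiseControl`, `requestStop`, `rebalance` and `refreshThenApprove` are hypothetical names, and the assumption that a stop request leaves a batch in flight comes from the description above.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the refresh-then-approve race; every name here is hypothetical. */
class StopOngoingExecutionDemo {

    /** Stand-in for Cruise Control. It does NOT reproduce the real server logic. */
    static final class ToyCruiseControl {
        private boolean executionInProgress;
        final List<String> log = new ArrayList<>();

        /** Simulates rebalance_1 already running. */
        void startExecution() {
            executionInProgress = true;
        }

        /** Models the reported behaviour: the stop call returns, but a batch is still in flight. */
        void requestStop() {
            log.add("stop requested, batch still in progress");
        }

        /** Returns true if the request is accepted. */
        boolean rebalance(boolean dryRun, boolean stopOngoingExecution) {
            if (dryRun) {
                log.add("proposal computed");
                return true;
            }
            if (executionInProgress && !stopOngoingExecution) {
                log.add("full run rejected, execution ongoing");
                return false;
            }
            // Either nothing was running, or the flag asked to stop it first.
            executionInProgress = true;
            log.add("full run started");
            return true;
        }
    }

    /** Replays refresh followed by approval; returns whether rebalance_2 was accepted. */
    static boolean refreshThenApprove(boolean stopOngoingExecution) {
        ToyCruiseControl cc = new ToyCruiseControl();
        cc.startExecution();           // rebalance_1 is in progress
        cc.requestStop();              // refresh: stop request
        cc.rebalance(true, false);     // refresh: new proposal (dry_run=true)
        return cc.rebalance(false, stopOngoingExecution); // approval: rebalance_2
    }

    public static void main(String[] args) {
        System.out.println("rebalance_2 accepted without flag: " + refreshThenApprove(false));
        System.out.println("rebalance_2 accepted with flag: " + refreshThenApprove(true));
    }
}
```

In this model the dry-run request never needs the flag, which matches the point that only dry_run=false requests benefit from it.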

Member

Thanks @tinaselenge, what you described is what I faced during auto-rebalance, where "auto-approval" is set and people can ask for two consecutive scale up/down operations, which result in two consecutive rebalancing requests.

> I don't think we need this flag set for ProposalPending or Rebalancing states, because when refresh annotation is applied while on these 2 states, we only send a request for new proposal (dry_run=false). We only need this flag set when we send dry_run=true requests.

Did you mean the other way around? I mean "we only send a request for new proposal (dry_run=true). We only need this flag set when we send dry_run=false requests."

Because in PendingProposal we obviously ask for a new proposal with dry_run=true, not false.
Anyway, even when we ask for a new proposal while a proposal is still processing, we should be sure that CC doesn't reply the same way, so we should stop execution anyway (maybe not, because it's different from an actual rebalance, but it's worth checking in the CC codebase). The same applies in Rebalancing, where we ask for a new proposal (dry_run=true) if refresh is applied.

Long story short, the question is: is the stop_ongoing_execution flag valid only when you run an actual rebalance, or also when you ask CC to process a proposal?

Contributor Author

> Did you mean the other way around? I mean "we only send a request for new proposal (dry_run=true). We only need this flag set when we send dry_run=false requests."

Yes, I meant the other way around. Sorry!

Contributor Author

> Because in PendingProposal we obviously ask for a new ... proposal with dry_run=true not false.
> Anyway even when we ask for a new proposal, while a proposal is still processing, we should be sure that CC doesn't reply the same way so we should stop execution anyway (maybe not, because it's different from an actual rebalance, but it's worth checking on CC codebase). The same applied in Rebalancing where we ask for a new proposal (dry_run=true) is refresh is applied.

I agree that we should stop the ongoing execution when we ask for a new proposal because of the "refresh" annotation, so that the optimisation calculation is up to date. We already have this logic: see the onRebalancing() function, where the operator first calls the stop endpoint and then requests a new proposal. The issue is that CC can sometimes still have an in-progress batch internally, even though the stop endpoint was called. I can spend some time looking into the CC code.

Contributor Author

> Long story short, the question is, is stop going execution flag valid only when you run an actual rebalance or even when you asked to CC to process a proposal?

It seems to be effective when we run an actual rebalance, not when we ask for a proposal.

Member

> The issue is sometimes CC internally can still have in progress batch, even though stop endpoint was called.

So it means that even if we call stopExecution, we should then call requestRebalance with stop_ongoing_execution=true even in onRebalancing. At this point I wonder whether we should remove the stopExecution call entirely and just pass the corresponding flag.

Contributor Author

Could it still be beneficial to keep the stopExecution call, because it gets called before requesting a new proposal? The flag is set in a later call that requests an actual rebalance operation, but the existing rebalance might already be stopped by the earlier stopExecution, and in case it's not, the flag helps.

case refresh:
LOGGER.debugCr(reconciliation, "Annotation {}={}", ANNO_STRIMZI_IO_REBALANCE, KafkaRebalanceAnnotation.refresh);
return requestRebalance(reconciliation, host, apiClient, kafkaRebalance, true, rebalanceOptionsBuilder);
@@ -1129,22 +1129,33 @@ private boolean isKafkaClusterReady(Kafka kafka) {
&& kafka.getStatus().getConditions().stream().anyMatch(condition -> condition.getType().equals("Ready") && condition.getStatus().equals("True"));
}

private Future<MapAndStatus<ConfigMap, KafkaRebalanceStatus>> requestRebalance(Reconciliation reconciliation,
String host, CruiseControlApi apiClient, KafkaRebalance kafkaRebalance,
boolean dryrun, AbstractRebalanceOptions.AbstractRebalanceOptionsBuilder<?, ?> rebalanceOptionsBuilder,
boolean stopOngoingExecution) {
return requestRebalance(reconciliation, host, apiClient, kafkaRebalance, dryrun, rebalanceOptionsBuilder, null, stopOngoingExecution);
}

private Future<MapAndStatus<ConfigMap, KafkaRebalanceStatus>> requestRebalance(Reconciliation reconciliation,
String host, CruiseControlApi apiClient, KafkaRebalance kafkaRebalance,
boolean dryrun, AbstractRebalanceOptions.AbstractRebalanceOptionsBuilder<?, ?> rebalanceOptionsBuilder) {
return requestRebalance(reconciliation, host, apiClient, kafkaRebalance, dryrun, rebalanceOptionsBuilder, null);
return requestRebalance(reconciliation, host, apiClient, kafkaRebalance, dryrun, rebalanceOptionsBuilder, null, false);
}


private Future<MapAndStatus<ConfigMap, KafkaRebalanceStatus>> requestRebalance(Reconciliation reconciliation, String host, CruiseControlApi apiClient, KafkaRebalance kafkaRebalance,
boolean dryrun,
AbstractRebalanceOptions.AbstractRebalanceOptionsBuilder<?, ?> rebalanceOptionsBuilder, String userTaskID) {
AbstractRebalanceOptions.AbstractRebalanceOptionsBuilder<?, ?> rebalanceOptionsBuilder,
String userTaskID, boolean stopOngoingExecution) {

LOGGER.infoCr(reconciliation, "Requesting Cruise Control rebalance [dryrun={}]", dryrun);
LOGGER.infoCr(reconciliation, "Requesting Cruise Control rebalance [dryrun={}] [stop_ongoing_execution={}]", dryrun, stopOngoingExecution);
rebalanceOptionsBuilder.withVerboseResponse();
if (!dryrun) {
rebalanceOptionsBuilder.withFullRun();
}
if (stopOngoingExecution) {
rebalanceOptionsBuilder.withStopOngoingExecution();
}
// backward compatibility, no mode specified means "full"
KafkaRebalanceMode mode = Optional.ofNullable(kafkaRebalance.getSpec())
.map(spec -> spec.getMode())
@@ -21,6 +21,8 @@ public abstract class AbstractRebalanceOptions {
private final boolean skipHardGoalCheck;
/** Sets whether the response should be JSON formatted or formatted for readability on the command line */
private final boolean json;
/** Sets whether to stop the ongoing execution (if any) and start executing the given request. */
private final boolean stopOngoingExecution;
/** A regular expression to specify topics that should not be considered for replica movement */
private final String excludedTopics;
/** The upper bound of ongoing replica movements going into/out of each broker */
@@ -67,6 +69,13 @@ public boolean isJson() {
return json;
}

/**
* @return True if the ongoing execution (if any) should be stopped before the given request is executed. False otherwise.
*/
public boolean isStopOngoingExecution() {
return stopOngoingExecution;
}

/**
* @return Excludes topics
*/
@@ -108,6 +117,7 @@ public List<String> getReplicaMovementStrategies() {
this.verbose = builder.verbose;
this.skipHardGoalCheck = builder.skipHardGoalCheck;
this.json = builder.json;
this.stopOngoingExecution = builder.stopOngoingExecution;
this.excludedTopics = builder.excludedTopics;
this.concurrentPartitionMovementsPerBroker = builder.concurrentPartitionMovementsPerBroker;
this.concurrentLeaderMovements = builder.concurrentLeaderMovements;
@@ -127,6 +137,7 @@ public abstract static class AbstractRebalanceOptionsBuilder<B extends AbstractR
private boolean verbose;
private boolean skipHardGoalCheck;
private boolean json;
private boolean stopOngoingExecution;
private String excludedTopics;
private int concurrentPartitionMovementsPerBroker;
private int concurrentLeaderMovements;
@@ -138,6 +149,7 @@ public abstract static class AbstractRebalanceOptionsBuilder<B extends AbstractR
goals = null;
verbose = false;
skipHardGoalCheck = false;
stopOngoingExecution = false;
json = true;
excludedTopics = null;
concurrentPartitionMovementsPerBroker = 0;
@@ -183,6 +195,16 @@ public B withSkipHardGoalCheck() {
return self();
}

/**
* Stop the ongoing execution (if any) and start executing the given request
*
* @return Instance of this builder
*/
public B withStopOngoingExecution() {
this.stopOngoingExecution = true;
return self();
}

/**
* Set rebalance goals
*
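For readers unfamiliar with the options classes, the hunks above follow an opt-in builder pattern: the flag defaults to false and is only switched on by an explicit `with...` call. A minimal sketch of that pattern follows. `SketchOptions` is a hypothetical, trimmed-down class rather than the real `AbstractRebalanceOptions`, and the dry-run default of true is an assumption inferred from the `withFullRun()` call in `requestRebalance`.

```java
/** Hypothetical, trimmed-down options class; it is not the real AbstractRebalanceOptions. */
class SketchOptions {
    private final boolean dryRun;
    private final boolean stopOngoingExecution;

    private SketchOptions(Builder builder) {
        this.dryRun = builder.dryRun;
        this.stopOngoingExecution = builder.stopOngoingExecution;
    }

    boolean isDryRun() {
        return dryRun;
    }

    boolean isStopOngoingExecution() {
        return stopOngoingExecution;
    }

    static final class Builder {
        // Assumption: requests are dry runs unless withFullRun() is called.
        private boolean dryRun = true;
        // Mirrors the diff: the flag is off unless explicitly requested.
        private boolean stopOngoingExecution = false;

        Builder withFullRun() {
            this.dryRun = false;
            return this;
        }

        Builder withStopOngoingExecution() {
            this.stopOngoingExecution = true;
            return this;
        }

        SketchOptions build() {
            return new SketchOptions(this);
        }
    }

    public static void main(String[] args) {
        SketchOptions approved = new Builder().withFullRun().withStopOngoingExecution().build();
        System.out.println("dryRun=" + approved.isDryRun()
                + " stopOngoingExecution=" + approved.isStopOngoingExecution());
    }
}
```

Keeping the default at false means existing callers that never call `withStopOngoingExecution()` keep sending the same request as before.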
@@ -101,7 +101,8 @@ private PathBuilder withAbstractRebalanceParameters(AbstractRebalanceOptions opt
if (options != null) {
PathBuilder builder = withParameter(CruiseControlParameters.DRY_RUN, String.valueOf(options.isDryRun()))
.withParameter(CruiseControlParameters.VERBOSE, String.valueOf(options.isVerbose()))
.withParameter(CruiseControlParameters.SKIP_HARD_GOAL_CHECK, String.valueOf(options.isSkipHardGoalCheck()));
.withParameter(CruiseControlParameters.SKIP_HARD_GOAL_CHECK, String.valueOf(options.isSkipHardGoalCheck()))
.withParameter(CruiseControlParameters.STOP_ONGOING_EXECUTION, String.valueOf(options.isStopOngoingExecution()));

if (options.getExcludedTopics() != null) {
builder.withParameter(CruiseControlParameters.EXCLUDED_TOPICS, options.getExcludedTopics());
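As a rough illustration of what this hunk changes on the wire, the sketch below assembles a query string with the new parameter appended after the existing ones. `QuerySketch` and `rebalanceQuery` are hypothetical. Only the `stop_ongoing_execution` key is taken from this diff; the endpoint path and the other three keys are assumptions about the Cruise Control REST API and should be checked against its documentation.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper; the real operator uses its own PathBuilder class. */
class QuerySketch {

    static String rebalanceQuery(boolean dryRun, boolean verbose,
                                 boolean skipHardGoalCheck, boolean stopOngoingExecution) {
        // LinkedHashMap keeps insertion order, so the output is deterministic.
        Map<String, String> params = new LinkedHashMap<>();
        params.put("dryrun", String.valueOf(dryRun));                        // assumed key
        params.put("verbose", String.valueOf(verbose));                      // assumed key
        params.put("skip_hard_goal_check", String.valueOf(skipHardGoalCheck)); // assumed key
        params.put("stop_ongoing_execution", String.valueOf(stopOngoingExecution)); // key from the diff

        // Assumed endpoint path; no URL encoding is needed for boolean values.
        return "/kafkacruisecontrol/rebalance" + params.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining("&", "?", ""));
    }

    public static void main(String[] args) {
        System.out.println(rebalanceQuery(false, true, false, true));
    }
}
```

Note that, as in the diff, the parameter is always sent, with an explicit `false` when the flag is not set.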
@@ -853,6 +853,74 @@ private void krNewToProposalReadyToRebalancingToReadyThenRefresh(VertxTestContex
}));
}

/**
* Tests the transition from 'New' to 'Ready'
* The rebalance proposal is auto approved and the resource moves to 'Rebalancing'.
* Then the Rebalancing KafkaRebalance is refreshed and moves back to 'ProposalReady'.
* Then the ProposalReady KafkaRebalance moves to Rebalancing again and finally to 'Ready'
*
* 1. A new KafkaRebalance resource is created with auto-approval annotation set; it is in the 'New' state
* 2. The operator requests a rebalance proposal through the Cruise Control REST API
* 3. The rebalance proposal is ready on the first call
* 4. The KafkaRebalance resource transitions to the 'ProposalReady' state
* 5. The operator requests the rebalance operation through the Cruise Control REST API
* 6. The rebalance operation is not done immediately; the operator starts polling the Cruise Control REST API
* 7. The KafkaRebalance resource moves to the 'Rebalancing' state
* 8. The KafkaRebalance resource is annotated with 'strimzi.io/rebalance=refresh' while the rebalancing is still in progress
* 9. The operator stops polling the Cruise Control REST API and requests a stop execution
* 10. The operator requests a new rebalance proposal through the Cruise Control REST API
* 11. The KafkaRebalance resource transitions to the 'ProposalReady' state again
* 12. The operator requests the rebalance operation through the Cruise Control REST API
* 13. The rebalance operation is not done immediately; the operator starts polling the Cruise Control REST API
* 14. The KafkaRebalance resource moves to the 'Rebalancing' state again
* 15. The rebalance operation is done
* 16. The KafkaRebalance resource moves to the 'Ready' state
*/
@Test
public void krNewToProposalReadyToRebalancingToRefresh(VertxTestContext context) throws IOException, URISyntaxException {
KafkaRebalance kr = createKafkaRebalance(namespace, CLUSTER_NAME, RESOURCE_NAME, REMOVE_BROKER_KAFKA_REBALANCE_SPEC, true);

// Set up the rebalance and user tasks endpoints with the number of pending calls before a response is received.
cruiseControlServer.setupCCRebalanceResponse(0, CruiseControlEndpoints.REMOVE_BROKER);
cruiseControlServer.setupCCUserTasksResponseNoGoals(0, 0);
cruiseControlServer.setupCCStopResponse();

Crds.kafkaRebalanceOperation(client).inNamespace(namespace).resource(kr).create();
crdCreateKafka();
crdCreateCruiseControlSecrets();

Checkpoint checkpoint = context.checkpoint();
krao.reconcile(new Reconciliation("test-trigger", KafkaRebalance.RESOURCE_KIND, namespace, kr.getMetadata().getName()))
// the resource moved from 'New' to 'ProposalReady' directly (no pending calls in the Mock server)
.onComplete(context.succeeding(v ->
assertState(context, client, namespace, kr.getMetadata().getName(), KafkaRebalanceState.ProposalReady)))
.compose(v -> krao.reconcile(new Reconciliation("test-trigger", KafkaRebalance.RESOURCE_KIND, namespace, kr.getMetadata().getName())))
.onComplete(context.succeeding(v -> {
// the resource moved from ProposalReady to Rebalancing on auto approval
assertState(context, client, namespace, kr.getMetadata().getName(), KafkaRebalanceState.Rebalancing);
}))
.compose(v -> {
// apply the "refresh" annotation to the resource in the Rebalancing state
annotate(client, namespace, kr.getMetadata().getName(), KafkaRebalanceAnnotation.refresh);
return krao.reconcile(new Reconciliation("test-trigger", KafkaRebalance.RESOURCE_KIND, namespace, kr.getMetadata().getName()));
})
.onComplete(context.succeeding(v -> {
// the resource moved from Rebalancing to ProposalReady due to refresh annotation
assertState(context, client, namespace, kr.getMetadata().getName(), KafkaRebalanceState.ProposalReady);
}))
.compose(v -> krao.reconcile(new Reconciliation("test-trigger", KafkaRebalance.RESOURCE_KIND, namespace, kr.getMetadata().getName())))
.onComplete(context.succeeding(v -> {
// the resource moved from ProposalReady to Rebalancing
assertState(context, client, namespace, kr.getMetadata().getName(), KafkaRebalanceState.Rebalancing);
}))
.compose(v -> krao.reconcile(new Reconciliation("test-trigger", KafkaRebalance.RESOURCE_KIND, namespace, kr.getMetadata().getName())))
.onComplete(context.succeeding(v -> {
// the resource moved from Rebalancing to Ready
assertState(context, client, namespace, kr.getMetadata().getName(), KafkaRebalanceState.Ready);
checkpoint.flag();
}));
}
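The state walk that this test asserts can be summarised with a small transition function. This is a simplified sketch of the scenario only: `RebalanceStateSketch`, `next` and `walkScenario` are hypothetical names, auto-approval is assumed to be on, and the operator's other states (for example PendingProposal, Stopped and NotReady) are left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of the test scenario; it is not the operator's real state machine. */
class RebalanceStateSketch {

    enum State { New, ProposalReady, Rebalancing, Ready }

    /** One reconciliation step, assuming auto-approval and a proposal that is ready on the first call. */
    static State next(State current, boolean refresh, boolean executionDone) {
        switch (current) {
            case New:
                return State.ProposalReady;   // proposal returned immediately
            case ProposalReady:
                return State.Rebalancing;     // auto-approved full run
            case Rebalancing:
                if (refresh) {
                    return State.ProposalReady; // stop, then request a new proposal
                }
                return executionDone ? State.Ready : State.Rebalancing;
            default:
                return current;
        }
    }

    /** Replays the five reconciliations performed by the test. */
    static List<State> walkScenario() {
        List<State> visited = new ArrayList<>();
        State s = State.New;
        s = next(s, false, false); visited.add(s); // New -> ProposalReady
        s = next(s, false, false); visited.add(s); // -> Rebalancing
        s = next(s, true, false);  visited.add(s); // refresh -> ProposalReady
        s = next(s, false, false); visited.add(s); // -> Rebalancing
        s = next(s, false, true);  visited.add(s); // execution done -> Ready
        return visited;
    }

    public static void main(String[] args) {
        System.out.println(walkScenario());
    }
}
```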

/**
* Tests the transition from 'New' to 'NotReady' due to "missing hard goals" error
*
@@ -1166,6 +1166,27 @@ private void krStoppedRefreshToPendingProposal(Vertx vertx, VertxTestContext con
.onComplete(result -> checkOptimizationResults(result, context, true));
}

/**
* Tests the transition from 'Rebalancing' to 'ProposalReady' when refresh
*
* 1. A new KafkaRebalance resource is created and annotated with strimzi.io/rebalance=refresh; it is in the Rebalancing state
* 2. The operator calls the /stop_proposal_execution endpoint to stop the ongoing rebalance execution
* 3. The operator sends a request for a new proposal
* 4. The operator computes the next state on the proposal via the Cruise Control REST API
* 5. The rebalance proposal is ready on the first call
* 6. The KafkaRebalance resource moves to the 'ProposalReady' state
*/
@Test
public void testRebalancingToRefreshProposalReady(Vertx vertx, VertxTestContext context) throws IOException, URISyntaxException {
KafkaRebalance kcRebalance = createKafkaRebalance(KafkaRebalanceState.Rebalancing, "", "refresh", REMOVE_BROKER_KAFKA_REBALANCE_SPEC, null, false);
cruiseControlServer.setupCCStopResponse();
cruiseControlServer.setupCCRebalanceResponse(0, CruiseControlEndpoints.REMOVE_BROKER);
checkTransition(vertx, context,
KafkaRebalanceState.Rebalancing, KafkaRebalanceState.ProposalReady,
kcRebalance)
.onComplete(result -> checkOptimizationResults(result, context, false));
}

/**
* Tests the transition from 'Stopped' to 'ProposalReady' when refresh
*
@@ -34,6 +34,7 @@ private String getExpectedRebalanceString() {
CruiseControlParameters.DRY_RUN + "=false&" +
CruiseControlParameters.VERBOSE + "=true&" +
CruiseControlParameters.SKIP_HARD_GOAL_CHECK + "=false&" +
CruiseControlParameters.STOP_ONGOING_EXECUTION + "=false&" +
CruiseControlParameters.EXCLUDED_TOPICS + "=test-.*&" +
CruiseControlParameters.GOALS + "=");

@@ -72,6 +73,7 @@ public void testQueryStringList() {
.withParameter(CruiseControlParameters.DRY_RUN, "false")
.withParameter(CruiseControlParameters.VERBOSE, "true")
.withParameter(CruiseControlParameters.SKIP_HARD_GOAL_CHECK, "false")
.withParameter(CruiseControlParameters.STOP_ONGOING_EXECUTION, "false")
.withParameter(CruiseControlParameters.EXCLUDED_TOPICS, "test-.*")
.withParameter(CruiseControlParameters.GOALS, GOALS)
.withParameter(CruiseControlParameters.REBALANCE_DISK, "false")
@@ -95,7 +95,12 @@ public enum CruiseControlParameters {
/**
* Skip rack awareness check
*/
SKIP_RACK_AWARENESS_CHECK("skip_rack_awareness_check");
SKIP_RACK_AWARENESS_CHECK("skip_rack_awareness_check"),

/**
* Stop the ongoing execution (if any) and start executing the given request
*/
STOP_ONGOING_EXECUTION("stop_ongoing_execution");

private final String key;
