From cf6d145455529c7c17247642523c3a4c3ecc7874 Mon Sep 17 00:00:00 2001 From: Seth Jennings Date: Wed, 10 Jan 2018 14:16:15 -0600 Subject: [PATCH 1/2] UPSTREAM: 56191: CPU Manager panics on state initialization error Origin-commit: d4a358db521bce79185c82a44f8b07a7dc3fc82d --- pkg/kubelet/cm/cpumanager/state/state_file.go | 75 +++++++++------- .../cm/cpumanager/state/state_file_test.go | 89 ++++++++++--------- 2 files changed, 87 insertions(+), 77 deletions(-) diff --git a/pkg/kubelet/cm/cpumanager/state/state_file.go b/pkg/kubelet/cm/cpumanager/state/state_file.go index 20c7763350d1e..6c2353cf10f5d 100644 --- a/pkg/kubelet/cm/cpumanager/state/state_file.go +++ b/pkg/kubelet/cm/cpumanager/state/state_file.go @@ -51,9 +51,10 @@ func NewFileState(filePath string, policyName string) State { if err := stateFile.tryRestoreState(); err != nil { // could not restore state, init new state file - glog.Infof("[cpumanager] state file: initializing empty state file - reason: \"%s\"", err) - stateFile.cache.ClearState() - stateFile.storeState() + msg := fmt.Sprintf("[cpumanager] state file: unable to restore state from disk (%s)\n", err.Error()) + + "Panicking because we cannot guarantee sane CPU affinity for existing containers.\n" + + fmt.Sprintf("Please drain this node and delete the CPU manager state file \"%s\" before restarting Kubelet.", stateFile.stateFilePath) + panic(msg) } return stateFile @@ -73,45 +74,51 @@ func (sf *stateFile) tryRestoreState() error { var content []byte - if content, err = ioutil.ReadFile(sf.stateFilePath); os.IsNotExist(err) { - // Create file - if _, err = os.Create(sf.stateFilePath); err != nil { - glog.Errorf("[cpumanager] state file: unable to create state file \"%s\":%s", sf.stateFilePath, err.Error()) - panic("[cpumanager] state file not created") - } - glog.Infof("[cpumanager] state file: created empty state file \"%s\"", sf.stateFilePath) - } else { - // File exists - try to read - var readState stateFileData + content, err = ioutil.ReadFile(sf.stateFilePath) - if err = json.Unmarshal(content, &readState); err != nil { - glog.Warningf("[cpumanager] state file: could not unmarshal, corrupted state file - \"%s\"", sf.stateFilePath) - return err - } + // If the state file does not exist or has zero length, write a new file. + if os.IsNotExist(err) || len(content) == 0 { + sf.storeState() + glog.Infof("[cpumanager] state file: created new state file \"%s\"", sf.stateFilePath) + return nil + } - if sf.policyName != readState.PolicyName { - return fmt.Errorf("policy configured \"%s\" != policy from state file \"%s\"", sf.policyName, readState.PolicyName) - } + // Fail on any other file read error. + if err != nil { + return err + } + + // File exists; try to read it. + var readState stateFileData + + if err = json.Unmarshal(content, &readState); err != nil { + glog.Errorf("[cpumanager] state file: could not unmarshal, corrupted state file - \"%s\"", sf.stateFilePath) + return err + } + + if sf.policyName != readState.PolicyName { + return fmt.Errorf("policy configured \"%s\" != policy from state file \"%s\"", sf.policyName, readState.PolicyName) + } + + if tmpDefaultCPUSet, err = cpuset.Parse(readState.DefaultCPUSet); err != nil { + glog.Errorf("[cpumanager] state file: could not parse state file - [defaultCpuSet:\"%s\"]", readState.DefaultCPUSet) + return err + } - if tmpDefaultCPUSet, err = cpuset.Parse(readState.DefaultCPUSet); err != nil { - glog.Warningf("[cpumanager] state file: could not parse state file - [defaultCpuSet:\"%s\"]", readState.DefaultCPUSet) + for containerID, cpuString := range readState.Entries { + if tmpContainerCPUSet, err = cpuset.Parse(cpuString); err != nil { + glog.Errorf("[cpumanager] state file: could not parse state file - container id: %s, cpuset: \"%s\"", containerID, cpuString) return err } + tmpAssignments[containerID] = tmpContainerCPUSet + } - for containerID, cpuString := range readState.Entries { - if tmpContainerCPUSet, err = cpuset.Parse(cpuString); err != nil { - glog.Warningf("[cpumanager] state file: could not parse state file - container id: %s, cpuset: \"%s\"", containerID, cpuString) - return err - } - tmpAssignments[containerID] = tmpContainerCPUSet - } + sf.cache.SetDefaultCPUSet(tmpDefaultCPUSet) + sf.cache.SetCPUAssignments(tmpAssignments) - sf.cache.SetDefaultCPUSet(tmpDefaultCPUSet) - sf.cache.SetCPUAssignments(tmpAssignments) + glog.V(2).Infof("[cpumanager] state file: restored state from state file \"%s\"", sf.stateFilePath) + glog.V(2).Infof("[cpumanager] state file: defaultCPUSet: %s", tmpDefaultCPUSet.String()) - glog.V(2).Infof("[cpumanager] state file: restored state from state file \"%s\"", sf.stateFilePath) - glog.V(2).Infof("[cpumanager] state file: defaultCPUSet: %s", tmpDefaultCPUSet.String()) - } return nil } diff --git a/pkg/kubelet/cm/cpumanager/state/state_file_test.go b/pkg/kubelet/cm/cpumanager/state/state_file_test.go index dde616555e16f..93dd3910ad29c 100644 --- a/pkg/kubelet/cm/cpumanager/state/state_file_test.go +++ b/pkg/kubelet/cm/cpumanager/state/state_file_test.go @@ -77,33 +77,31 @@ func TestFileStateTryRestore(t *testing.T) { stateFileContent string policyName string expErr string + expPanic bool expectedState *stateMemory }{ { - "Invalid JSON - empty file", + "Invalid JSON - one byte file", "\n", "none", - "state file: could not unmarshal, corrupted state file", - &stateMemory{ - assignments: ContainerCPUAssignments{}, - defaultCPUSet: cpuset.NewCPUSet(), - }, + "[cpumanager] state file: unable to restore state from disk (unexpected end of JSON input)", + true, + &stateMemory{}, }, { "Invalid JSON - invalid content", "{", "none", - "state file: could not unmarshal, corrupted state file", - &stateMemory{ - assignments: ContainerCPUAssignments{}, - defaultCPUSet: cpuset.NewCPUSet(), - }, + "[cpumanager] state file: unable to restore state from disk (unexpected end of JSON input)", + true, + &stateMemory{}, }, { "Try restore defaultCPUSet only", `{"policyName": "none", "defaultCpuSet": "4-6"}`, "none", "", + false, &stateMemory{ assignments: ContainerCPUAssignments{}, defaultCPUSet: cpuset.NewCPUSet(4, 5, 6), @@ -113,11 +111,9 @@ func TestFileStateTryRestore(t *testing.T) { "Try restore defaultCPUSet only - invalid name", `{"policyName": "none", "defaultCpuSet" "4-6"}`, "none", - "", - &stateMemory{ - assignments: ContainerCPUAssignments{}, - defaultCPUSet: cpuset.NewCPUSet(), - }, + `[cpumanager] state file: unable to restore state from disk (invalid character '"' after object key)`, + true, + &stateMemory{}, }, { "Try restore assignments only", @@ -130,6 +126,7 @@ func TestFileStateTryRestore(t *testing.T) { }`, "none", "", + false, &stateMemory{ assignments: ContainerCPUAssignments{ "container1": cpuset.NewCPUSet(4, 5, 6), @@ -146,21 +143,17 @@ func TestFileStateTryRestore(t *testing.T) { "entries": {} }`, "B", - "policy configured \"B\" != policy from state file \"A\"", - &stateMemory{ - assignments: ContainerCPUAssignments{}, - defaultCPUSet: cpuset.NewCPUSet(), - }, + `[cpumanager] state file: unable to restore state from disk (policy configured "B" != policy from state file "A")`, + true, + &stateMemory{}, }, { "Try restore invalid assignments", `{"entries": }`, "none", - "state file: could not unmarshal, corrupted state file", - &stateMemory{ - assignments: ContainerCPUAssignments{}, - defaultCPUSet: cpuset.NewCPUSet(), - }, + "[cpumanager] state file: unable to restore state from disk (invalid character '}' looking for beginning of value)", + true, + &stateMemory{}, }, { "Try restore valid file", @@ -174,6 +167,7 @@ func TestFileStateTryRestore(t *testing.T) { }`, "none", "", + false, &stateMemory{ assignments: ContainerCPUAssignments{ "container1": cpuset.NewCPUSet(4, 5, 6), @@ -189,11 +183,9 @@ func TestFileStateTryRestore(t *testing.T) { "defaultCpuSet": "2-sd" }`, "none", - "state file: could not parse state file", - &stateMemory{ - assignments: ContainerCPUAssignments{}, - defaultCPUSet: cpuset.NewCPUSet(), - }, + `[cpumanager] state file: unable to restore state from disk (strconv.Atoi: parsing "sd": invalid syntax)`, + true, + &stateMemory{}, }, { "Try restore un-parsable assignments", @@ -206,17 +198,16 @@ func TestFileStateTryRestore(t *testing.T) { } }`, "none", - "state file: could not parse state file", - &stateMemory{ - assignments: ContainerCPUAssignments{}, - defaultCPUSet: cpuset.NewCPUSet(), - }, + `[cpumanager] state file: unable to restore state from disk (strconv.Atoi: parsing "p": invalid syntax)`, + true, + &stateMemory{}, }, { - "TryRestoreState creates empty state file", + "tryRestoreState creates empty state file", "", "none", "", + false, &stateMemory{ assignments: ContainerCPUAssignments{}, defaultCPUSet: cpuset.NewCPUSet(), @@ -226,11 +217,23 @@ func TestFileStateTryRestore(t *testing.T) { for idx, tc := range testCases { t.Run(tc.description, func(t *testing.T) { + defer func() { + if tc.expPanic { + r := recover() + panicMsg := r.(string) + if !strings.HasPrefix(panicMsg, tc.expErr) { + t.Fatalf(`expected panic "%s" but got "%s"`, tc.expErr, panicMsg) + } else { + t.Logf(`got expected panic "%s"`, panicMsg) + } + } + }() + sfilePath, err := ioutil.TempFile("/tmp", fmt.Sprintf("cpumanager_state_file_test_%d", idx)) if err != nil { t.Errorf("cannot create temporary file: %q", err.Error()) } - // Don't create state file, let TryRestoreState figure out that is should create + // Don't create state file, let tryRestoreState figure out that is should create if tc.stateFileContent != "" { writeToStateFile(sfilePath.Name(), tc.stateFileContent) } @@ -245,11 +248,11 @@ func TestFileStateTryRestore(t *testing.T) { if tc.expErr != "" { if logData.String() != "" { if !strings.Contains(logData.String(), tc.expErr) { - t.Errorf("TryRestoreState() error = %v, wantErr %v", logData.String(), tc.expErr) + t.Errorf("tryRestoreState() error = %v, wantErr %v", logData.String(), tc.expErr) return } } else { - t.Errorf("TryRestoreState() error = nil, wantErr %v", tc.expErr) + t.Errorf("tryRestoreState() error = nil, wantErr %v", tc.expErr) return } } @@ -268,7 +271,7 @@ func TestFileStateTryRestorePanic(t *testing.T) { }{ "Panic creating file", true, - "[cpumanager] state file not created", + "[cpumanager] state file not written", } t.Run(testCase.description, func(t *testing.T) { @@ -277,10 +280,10 @@ func TestFileStateTryRestorePanic(t *testing.T) { if err := recover(); err != nil { if testCase.wantPanic { if testCase.panicMessage == err { - t.Logf("TryRestoreState() got expected panic = %v", err) + t.Logf("tryRestoreState() got expected panic = %v", err) return } - t.Errorf("TryRestoreState() unexpected panic = %v, wantErr %v", err, testCase.panicMessage) + t.Errorf("tryRestoreState() unexpected panic = %v, wantErr %v", err, testCase.panicMessage) } } }() From 924a2a47b6efcf5460962f1cdb51d05aba661d0d Mon Sep 17 00:00:00 2001 From: Seth Jennings Date: Wed, 10 Jan 2018 14:21:09 -0600 Subject: [PATCH 2/2] UPSTREAM: 54410: Cpu manager reconcile loop - restore state Origin-commit: aecd8ed5b576df6a38c58617c5a754504e233934 --- pkg/kubelet/cm/cpumanager/cpu_manager.go | 23 ++++++++++++++++++++-- pkg/kubelet/cm/cpumanager/policy.go | 2 ++ pkg/kubelet/cm/cpumanager/policy_static.go | 8 +++++++- 3 files changed, 30 insertions(+), 3 deletions(-) diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager.go b/pkg/kubelet/cm/cpumanager/cpu_manager.go index 50bdcb5e7445e..6c59dca18d4e7 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_manager.go +++ b/pkg/kubelet/cm/cpumanager/cpu_manager.go @@ -152,8 +152,8 @@ func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo } func (m *manager) Start(activePods ActivePodsFunc, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService) { - glog.Infof("[cpumanger] starting with %s policy", m.policy.Name()) - glog.Infof("[cpumanger] reconciling every %v", m.reconcilePeriod) + glog.Infof("[cpumanager] starting with %s policy", m.policy.Name()) + glog.Infof("[cpumanager] reconciling every %v", m.reconcilePeriod) m.activePods = activePods m.podStatusProvider = podStatusProvider @@ -234,6 +234,25 @@ func (m *manager) reconcileState() (success []reconciledContainer, failure []rec continue } + // Check whether container is present in state, there may be 3 reasons why it's not present: + // - policy does not want to track the container + // - kubelet has just been restarted - and there is no previous state file + // - container has been removed from state by RemoveContainer call (DeletionTimestamp is set) + if _, ok := m.state.GetCPUSet(containerID); !ok { + if status.Phase == v1.PodRunning && pod.DeletionTimestamp == nil { + glog.V(4).Infof("[cpumanager] reconcileState: container is not present in state - trying to add (pod: %s, container: %s, container id: %s)", pod.Name, container.Name, containerID) + err := m.AddContainer(pod, &container, containerID) + if err != nil { + glog.Errorf("[cpumanager] reconcileState: failed to add container (pod: %s, container: %s, container id: %s, error: %v)", pod.Name, container.Name, containerID, err) + failure = append(failure, reconciledContainer{pod.Name, container.Name, containerID}) + } + } else { + // if DeletionTimestamp is set, pod has already been removed from state + // skip the pod/container since it's not running and will be deleted soon + continue + } + } + cset := m.state.GetCPUSetOrDefault(containerID) if cset.IsEmpty() { // NOTE: This should not happen outside of tests. diff --git a/pkg/kubelet/cm/cpumanager/policy.go b/pkg/kubelet/cm/cpumanager/policy.go index 39eb76316b108..c79091659e329 100644 --- a/pkg/kubelet/cm/cpumanager/policy.go +++ b/pkg/kubelet/cm/cpumanager/policy.go @@ -25,6 +25,8 @@ import ( type Policy interface { Name() string Start(s state.State) + // AddContainer call is idempotent AddContainer(s state.State, pod *v1.Pod, container *v1.Container, containerID string) error + // RemoveContainer call is idempotent RemoveContainer(s state.State, containerID string) error } diff --git a/pkg/kubelet/cm/cpumanager/policy_static.go b/pkg/kubelet/cm/cpumanager/policy_static.go index dfbb0a297d067..9a461bacb63fb 100644 --- a/pkg/kubelet/cm/cpumanager/policy_static.go +++ b/pkg/kubelet/cm/cpumanager/policy_static.go @@ -156,9 +156,15 @@ func (p *staticPolicy) assignableCPUs(s state.State) cpuset.CPUSet { } func (p *staticPolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Container, containerID string) error { - glog.Infof("[cpumanager] static policy: AddContainer (pod: %s, container: %s, container id: %s)", pod.Name, container.Name, containerID) if numCPUs := guaranteedCPUs(pod, container); numCPUs != 0 { + glog.Infof("[cpumanager] static policy: AddContainer (pod: %s, container: %s, container id: %s)", pod.Name, container.Name, containerID) // container belongs in an exclusively allocated pool + + if _, ok := s.GetCPUSet(containerID); ok { + glog.Infof("[cpumanager] static policy: container already present in state, skipping (container: %s, container id: %s)", container.Name, containerID) + return nil + } + cpuset, err := p.allocateCPUs(s, numCPUs) if err != nil { glog.Errorf("[cpumanager] unable to allocate %d CPUs (container id: %s, error: %v)", numCPUs, containerID, err)