Skip to content

Commit

Permalink
handling image stream changes
Browse files Browse the repository at this point in the history
Signed-off-by: Michal Minář <[email protected]>
  • Loading branch information
Michal Minář committed Apr 26, 2018
1 parent 53ef20c commit 2d04866
Show file tree
Hide file tree
Showing 5 changed files with 419 additions and 36 deletions.
39 changes: 34 additions & 5 deletions pkg/oc/admin/prune/imageprune/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,18 @@ import (
"sort"
"strings"

"github.com/docker/distribution/registry/api/errcode"
"github.com/golang/glog"

kmeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/kubernetes/pkg/api/legacyscheme"
kapiref "k8s.io/kubernetes/pkg/api/ref"
kapi "k8s.io/kubernetes/pkg/apis/core"

"github.com/docker/distribution/registry/api/errcode"
"github.com/golang/glog"

kerrors "k8s.io/apimachinery/pkg/util/errors"

imageapi "github.com/openshift/origin/pkg/image/apis/image"
imagegraph "github.com/openshift/origin/pkg/oc/graph/imagegraph/nodes"
"github.com/openshift/origin/pkg/util/netutils"
)

Expand Down Expand Up @@ -302,3 +302,32 @@ func getRef(obj runtime.Object) *kapi.ObjectReference {
}
return ref
}

// removeImageNodesAtIndexes removes nodes at the given indexes from the given array and returns the resulting
// slice that points to the original array. The indexes must be valid and sorted in ascending order. Otherwise
// the result is undefined.
func removeImageNodesAtIndexes(nodes []*imagegraph.ImageNode, indexes ...int) []*imagegraph.ImageNode {
if len(nodes) == 0 || len(indexes) == 0 {
return nodes
}

tmp := nodes[:indexes[0]]

highestIndex := -1

for i, n := range indexes {
if n <= highestIndex || n >= len(nodes) || n < 0 {
// indexes are not sorted or invalid
break
}

highestIndex = n

if i < len(indexes)-1 && indexes[i+1] < len(nodes) && indexes[i+1] > highestIndex {
tmp = append(tmp, nodes[n+1:indexes[i+1]]...)
} else {
tmp = append(tmp, nodes[n+1:]...)
}
}
return tmp
}
90 changes: 90 additions & 0 deletions pkg/oc/admin/prune/imageprune/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"sync"
"testing"

imagegraph "github.com/openshift/origin/pkg/oc/graph/imagegraph/nodes"
knet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/kubernetes/staging/src/k8s.io/apimachinery/pkg/util/diff"
)
Expand Down Expand Up @@ -215,3 +216,92 @@ func TestDefaultImagePinger(t *testing.T) {
}()
}
}

func TestRemoveImageNodesAtIndexes(t *testing.T) {
for _, tc := range []struct {
name string
nodes int
remove []int
expectedRemainingIndexes []int
}{
{
name: "no nodes",
remove: []int{0, 1, 2},
expectedRemainingIndexes: []int{},
},
{
name: "nothing to remove",
nodes: 4,
remove: []int{},
expectedRemainingIndexes: []int{0, 1, 2, 3},
},
{
name: "remove first",
nodes: 3,
remove: []int{0},
expectedRemainingIndexes: []int{1, 2},
},
{
name: "remove last",
nodes: 3,
remove: []int{2},
expectedRemainingIndexes: []int{0, 1},
},
{
name: "remove 2 consecutive",
nodes: 4,
remove: []int{1, 2},
expectedRemainingIndexes: []int{0, 3},
},
{
name: "remove odd",
nodes: 5,
remove: []int{1, 3, 5, 7},
expectedRemainingIndexes: []int{0, 2, 4},
},
{
name: "remove even",
nodes: 5,
remove: []int{0, 2, 4, 6, 8},
expectedRemainingIndexes: []int{1, 3},
},
{
name: "unsorted sequence",
nodes: 5,
remove: []int{0, 2, 2, 2, 4, 3, 2, 1, 2},
expectedRemainingIndexes: []int{1, 3, 4},
},
} {
t.Run(tc.name, func(t *testing.T) {
// generate nodes
nodes := make([]*imagegraph.ImageNode, 0, tc.nodes)
nodeIndexes := map[*imagegraph.ImageNode]int{}
for i := 0; i < tc.nodes; i++ {
node := &imagegraph.ImageNode{}
nodes = append(nodes, node)
nodeIndexes[nodes[i]] = i
}

nodes = removeImageNodesAtIndexes(nodes, tc.remove...)

for i := 0; i < len(nodes) || i < len(tc.expectedRemainingIndexes); i++ {
if i >= len(nodes) {
t.Errorf("no nodes at index %d where node #%d is expected", i, tc.expectedRemainingIndexes[i])
continue
}
newIndex, ok := nodeIndexes[nodes[i]]
if !ok {
t.Errorf("unknown node present on index %d", i)
continue
}
if i >= len(tc.expectedRemainingIndexes) {
t.Errorf("found node #%d at index %d where no node is expected", newIndex, i)
continue
}
if a, e := newIndex, tc.expectedRemainingIndexes[i]; a != e {
t.Errorf("expected node #%d at index %d, not node #%d", e, i, a)
}
}
})
}
}
79 changes: 68 additions & 11 deletions pkg/oc/admin/prune/imageprune/prune.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
kerrors "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/util/retry"
kapi "k8s.io/kubernetes/pkg/apis/core"
kapisext "k8s.io/kubernetes/pkg/apis/extensions"
Expand Down Expand Up @@ -60,7 +61,7 @@ const (
// ImageNode to an ImageComponentNode.
ReferencedImageManifestEdgeKind = "ReferencedImageManifest"

pruneImageWorkerCount = 5
defaultPruneImageWorkerCount = 5
)

// RegistryClientFactoryFunc is a factory function returning a registry client for use in a worker.
Expand Down Expand Up @@ -149,6 +150,8 @@ type PrunerOptions struct {
// Streams is the entire list of image streams across all namespaces in the
// cluster.
Streams *imageapi.ImageStreamList
// StreamsWatcher watches for stream changes.
StreamsWatcher watch.Interface
// Pods is the entire list of pods across all namespaces in the cluster.
Pods *kapi.PodList
// RCs is the entire list of replication controllers across all namespaces in
Expand Down Expand Up @@ -176,6 +179,9 @@ type PrunerOptions struct {
RegistryClientFactory RegistryClientFactoryFunc
// RegistryURL is the URL of the integrated Docker registry.
RegistryURL *url.URL
// NumWorkers is a desired number of workers concurrently handling image prune jobs. If less than 1, the
// default number of workers will be spawned.
NumWorkers int
}

// Pruner knows how to prune istags, images, manifest, layers, image configs and blobs.
Expand All @@ -199,10 +205,13 @@ type pruner struct {
algorithm pruneAlgorithm
registryClientFactory RegistryClientFactoryFunc
registryURL *url.URL
imageStreamWatcher watch.Interface
imageStreamLimits map[string][]*kapi.LimitRange
// sorted queue of images to prune
queue []*imagegraph.ImageNode
// contains prunable images removed from queue that are currently being processed
processedImages map[*imagegraph.ImageNode]*Job
numWorkers int
}

var _ Pruner = &pruner{}
Expand Down Expand Up @@ -283,6 +292,13 @@ func NewPruner(options PrunerOptions) (Pruner, kerrors.Aggregate) {
registryClientFactory: options.RegistryClientFactory,
registryURL: options.RegistryURL,
processedImages: make(map[*imagegraph.ImageNode]*Job),
imageStreamWatcher: options.StreamsWatcher,
imageStreamLimits: options.LimitRanges,
numWorkers: options.NumWorkers,
}

if p.numWorkers < 1 {
p.numWorkers = defaultPruneImageWorkerCount
}

if err := p.buildGraph(options); err != nil {
Expand Down Expand Up @@ -776,6 +792,50 @@ func (p *pruner) addBuildStrategyImageReferencesToGraph(referrer *kapi.ObjectRef
return nil
}

func (p *pruner) handleImageStreamEvent(event watch.Event) {
getIsNode := func() (*imageapi.ImageStream, *imagegraph.ImageStreamNode) {
is, ok := event.Object.(*imageapi.ImageStream)
if !ok {
utilruntime.HandleError(fmt.Errorf("internal error: expected ImageStream object in %s event, not %T", event.Type, event.Object))
return nil, nil
}
n := p.g.Find(imagegraph.ImageStreamNodeName(is))
if isNode, ok := n.(*imagegraph.ImageStreamNode); ok {
return is, isNode
}
return is, nil
}

switch event.Type {
case watch.Added:
is, isNode := getIsNode()
if is == nil {
return
}
if isNode != nil {
glog.V(4).Infof("Ignoring added ImageStream %s that is already present in the graph", getName(is))
return
}
glog.V(4).Infof("Adding ImageStream %s to the graph", getName(is))
p.addImageStreamsToGraph(&imageapi.ImageStreamList{Items: []imageapi.ImageStream{*is}}, p.imageStreamLimits)

case watch.Modified:
is, isNode := getIsNode()
if is == nil {
return
}

if isNode != nil {
glog.V(4).Infof("Removing updated ImageStream %s from the graph", getName(is))
// first remove the current node if present
p.g.RemoveNode(imagegraph.EnsureImageStreamNode(p.g, is))
}

glog.V(4).Infof("Adding updated ImageStream %s back to the graph", getName(is))
p.addImageStreamsToGraph(&imageapi.ImageStreamList{Items: []imageapi.ImageStream{*is}}, p.imageStreamLimits)
}
}

// getImageNodes returns only nodes of type ImageNode.
func getImageNodes(nodes []gonum.Node) map[string]*imagegraph.ImageNode {
ret := make(map[string]*imagegraph.ImageNode)
Expand Down Expand Up @@ -1112,7 +1172,7 @@ func (p *pruner) Prune(

defer close(jobChan)

for i := 0; i < pruneImageWorkerCount; i++ {
for i := 0; i < p.numWorkers; i++ {
worker, err := NewWorker(
p.algorithm,
p.registryClientFactory,
Expand Down Expand Up @@ -1145,9 +1205,10 @@ func (p *pruner) runLoop(
jobChan chan<- *Job,
resultChan <-chan JobResult,
) (deletions []Deletion, failures []Failure) {
isUpdateChan := p.imageStreamWatcher.ResultChan()
for {
// make workers busy
for len(p.processedImages) < pruneImageWorkerCount {
for len(p.processedImages) < p.numWorkers {
job, blocked := p.getNextJob()
if blocked {
break
Expand All @@ -1172,7 +1233,8 @@ func (p *pruner) runLoop(
failures = append(failures, failure)
}
delete(p.processedImages, res.Job.Image)
// TODO: handle image stream updates
case event := <-isUpdateChan:
p.handleImageStreamEvent(event)
// TODO: handle new images - do not add them to the queue though
}
}
Expand Down Expand Up @@ -1220,13 +1282,8 @@ func (p *pruner) getNextJob() (job *Job, blocked bool) {
blocked = job == nil

// remove no longer prunable images from the queue
for i, n := range toRemove {
if i < len(toRemove)-1 {
p.queue = append(p.queue[:n-i], p.queue[n+1-i:toRemove[i+1]-i]...)
} else {
p.queue = append(p.queue[:n-i], p.queue[n+1-i:]...)
}
}
// TODO: highly ineffective for a long queue O(N*M), consider the use of doubly linked list
p.queue = removeImageNodesAtIndexes(p.queue, toRemove...)

return
}
Expand Down
Loading

0 comments on commit 2d04866

Please sign in to comment.