diff --git a/pkg/oc/admin/prune/imageprune/prune.go b/pkg/oc/admin/prune/imageprune/prune.go index 6b0b7ccfe2a5..dab6fa8983fa 100644 --- a/pkg/oc/admin/prune/imageprune/prune.go +++ b/pkg/oc/admin/prune/imageprune/prune.go @@ -147,11 +147,13 @@ type PrunerOptions struct { // Images is the entire list of images in OpenShift. An image must be in this // list to be a candidate for pruning. Images *imageapi.ImageList + // ImageWatcher watches for image changes. + ImageWatcher watch.Interface // Streams is the entire list of image streams across all namespaces in the // cluster. Streams *imageapi.ImageStreamList - // StreamsWatcher watches for stream changes. - StreamsWatcher watch.Interface + // StreamWatcher watches for stream changes. + StreamWatcher watch.Interface // Pods is the entire list of pods across all namespaces in the cluster. Pods *kapi.PodList // RCs is the entire list of replication controllers across all namespaces in @@ -205,6 +207,7 @@ type pruner struct { algorithm pruneAlgorithm registryClientFactory RegistryClientFactoryFunc registryURL *url.URL + imageWatcher watch.Interface imageStreamWatcher watch.Interface imageStreamLimits map[string][]*kapi.LimitRange // sorted queue of images to prune @@ -292,7 +295,8 @@ func NewPruner(options PrunerOptions) (Pruner, kerrors.Aggregate) { registryClientFactory: options.RegistryClientFactory, registryURL: options.RegistryURL, processedImages: make(map[*imagegraph.ImageNode]*Job), - imageStreamWatcher: options.StreamsWatcher, + imageWatcher: options.ImageWatcher, + imageStreamWatcher: options.StreamWatcher, imageStreamLimits: options.LimitRanges, numWorkers: options.NumWorkers, } @@ -828,7 +832,7 @@ func (p *pruner) handleImageStreamEvent(event watch.Event) { if isNode != nil { glog.V(4).Infof("Removing updated ImageStream %s from the graph", getName(is)) // first remove the current node if present - p.g.RemoveNode(imagegraph.EnsureImageStreamNode(p.g, is)) + p.g.RemoveNode(isNode) } glog.V(4).Infof("Adding updated ImageStream %s back to the graph", getName(is)) @@ -836,6 +840,44 @@ func (p *pruner) handleImageStreamEvent(event watch.Event) { } } +func (p *pruner) handleImageEvent(event watch.Event) { + getImageNode := func() (*imageapi.Image, *imagegraph.ImageNode) { + img, ok := event.Object.(*imageapi.Image) + if !ok { + utilruntime.HandleError(fmt.Errorf("internal error: expected Image object in %s event, not %T", event.Type, event.Object)) + return nil, nil + } + n := p.g.Find(imagegraph.ImageNodeName(img)) + if imgNode, ok := n.(*imagegraph.ImageNode); ok { + return img, imgNode + } + return img, nil + } + + switch event.Type { + case watch.Added: + img, imgNode := getImageNode() + if img == nil { + return + } + if imgNode != nil { + glog.V(4).Infof("Ignoring added Image %s that is already present in the graph", img) + return + } + glog.V(4).Infof("Adding new Image %s to the graph", img.Name) + p.addImagesToGraph(&imageapi.ImageList{Items: []imageapi.Image{*img}}) + + case watch.Deleted: + img, imgNode := getImageNode() + if imgNode == nil { + glog.V(4).Infof("Ignoring event for deleted Image %s that is not present in the graph", img.Name) + return + } + glog.V(4).Infof("Removing deleted image %s from the graph", img.Name) + p.g.RemoveNode(imgNode) + } +} + // getImageNodes returns only nodes of type ImageNode. func getImageNodes(nodes []gonum.Node) map[string]*imagegraph.ImageNode { ret := make(map[string]*imagegraph.ImageNode) @@ -1205,6 +1247,7 @@ func (p *pruner) runLoop( jobChan chan<- *Job, resultChan <-chan JobResult, ) (deletions []Deletion, failures []Failure) { + imgUpdateChan := p.imageWatcher.ResultChan() isUpdateChan := p.imageStreamWatcher.ResultChan() for { // make workers busy @@ -1235,7 +1278,8 @@ func (p *pruner) runLoop( delete(p.processedImages, res.Job.Image) case event := <-isUpdateChan: p.handleImageStreamEvent(event) - // TODO: handle new images - do not add them to the queue though + case event := <-imgUpdateChan: + p.handleImageEvent(event) } } } diff --git a/pkg/oc/admin/prune/imageprune/prune_test.go b/pkg/oc/admin/prune/imageprune/prune_test.go index 17e57d8865e6..f831379dc252 100644 --- a/pkg/oc/admin/prune/imageprune/prune_test.go +++ b/pkg/oc/admin/prune/imageprune/prune_test.go @@ -950,8 +950,9 @@ func TestImagePruning(t *testing.T) { Namespace: test.namespace, AllImages: test.allImages, Images: &test.images, + ImageWatcher: watch.NewFake(), Streams: &test.streams, - StreamsWatcher: watch.NewFake(), + StreamWatcher: watch.NewFake(), Pods: &test.pods, RCs: &test.rcs, BCs: &test.bcs, @@ -1318,8 +1319,9 @@ func TestRegistryPruning(t *testing.T) { KeepTagRevisions: &keepTagRevisions, PruneRegistry: &test.pruneRegistry, Images: &test.images, + ImageWatcher: watch.NewFake(), Streams: &test.streams, - StreamsWatcher: watch.NewFake(), + StreamWatcher: watch.NewFake(), Pods: &kapi.PodList{}, RCs: &kapi.ReplicationControllerList{}, BCs: &buildapi.BuildConfigList{}, @@ -1393,17 +1395,18 @@ func TestImageWithStrongAndWeakRefsIsNotPruned(t *testing.T) { rss := testutil.RSList() options := PrunerOptions{ - Images: &images, - Streams: &streams, - StreamsWatcher: watch.NewFake(), - Pods: &pods, - RCs: &rcs, - BCs: &bcs, - Builds: &builds, - DSs: &dss, - Deployments: &deployments, - DCs: &dcs, - RSs: &rss, + Images: &images, + ImageWatcher: watch.NewFake(), + Streams: &streams, + StreamWatcher: watch.NewFake(), + Pods: &pods, + RCs: &rcs, + BCs: &bcs, + Builds: &builds, + DSs: &dss, + Deployments: &deployments, + DCs: &dcs, + RSs: &rss, } keepYoungerThan := 24 * time.Hour keepTagRevisions := 2 @@ -1662,7 +1665,7 @@ func TestChangeImageStreamsWhilePruning(t *testing.T) { ) streams := testutil.StreamList(testutil.Stream("registry1", "foo", "bar", testutil.Tags())) - streamsWatcher := watch.NewFake() + streamWatcher := watch.NewFake() pods := testutil.PodList() rcs := testutil.RCList() bcs := testutil.BCList() @@ -1673,17 +1676,18 @@ func TestChangeImageStreamsWhilePruning(t *testing.T) { rss := testutil.RSList() options := PrunerOptions{ - Images: &images, - Streams: &streams, - StreamsWatcher: streamsWatcher, - Pods: &pods, - RCs: &rcs, - BCs: &bcs, - Builds: &builds, - DSs: &dss, - Deployments: &deployments, - DCs: &dcs, - RSs: &rss, + Images: &images, + ImageWatcher: watch.NewFake(), + Streams: &streams, + StreamWatcher: streamWatcher, + Pods: &pods, + RCs: &rcs, + BCs: &bcs, + Builds: &builds, + DSs: &dss, + Deployments: &deployments, + DCs: &dcs, + RSs: &rss, RegistryClientFactory: FakeRegistryClientFactory, RegistryURL: &url.URL{Scheme: "https", Host: "registry1.io"}, NumWorkers: 1, @@ -1731,7 +1735,7 @@ func TestChangeImageStreamsWhilePruning(t *testing.T) { testutil.Tag("latest", testutil.TagEvent("sha256:0000000000000000000000000000000000000000000000000000000000000002", "registry1/foo/new@sha256:0000000000000000000000000000000000000000000000000000000000000002"), ))) - streamsWatcher.Add(&stream) + streamWatcher.Add(&stream) imageDeleter.unblock() // the pruner shall skip the newly referenced image @@ -1748,7 +1752,7 @@ func TestChangeImageStreamsWhilePruning(t *testing.T) { testutil.TagEvent("sha256:0000000000000000000000000000000000000000000000000000000000000000", "registry1/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000000"), testutil.TagEvent("sha256:0000000000000000000000000000000000000000000000000000000000000004", "registry1/foo/bar@sha256:0000000000000000000000000000000000000000000000000000000000000004"), ))) - streamsWatcher.Modify(&stream) + streamWatcher.Modify(&stream) imageDeleter.unblock() // the pruner shall skip the newly referenced image diff --git a/pkg/oc/admin/prune/images.go b/pkg/oc/admin/prune/images.go index 472b72ae5587..ca825b496945 100644 --- a/pkg/oc/admin/prune/images.go +++ b/pkg/oc/admin/prune/images.go @@ -251,11 +251,6 @@ func (o PruneImagesOptions) Validate() error { // Run contains all the necessary functionality for the OpenShift cli prune images command. func (o PruneImagesOptions) Run() error { - allImages, err := o.ImageClient.Images().List(metav1.ListOptions{}) - if err != nil { - return err - } - allPods, err := o.KubeClient.Core().Pods(o.Namespace).List(metav1.ListOptions{}) if err != nil { return err @@ -327,6 +322,17 @@ func (o PruneImagesOptions) Run() error { limitRangesMap[limit.Namespace] = limits } + allImages, err := o.ImageClient.Images().List(metav1.ListOptions{}) + if err != nil { + return err + } + imageWatcher, err := o.Imageclient.Images().Watch(metav1.ListOptions{}) + if err != nil { + utilruntime.HandleError(fmt.Errorf("internal error: failed to watch for images: %v"+ + "\n - image changes will not be detected", err)) + imageWatcher = watch.NewFake() + } + imageStreamWatcher, err := o.ImageClient.ImageStreams(o.Namespace).Watch(metav1.ListOptions{}) if err != nil { utilruntime.HandleError(fmt.Errorf("internal error: failed to watch for image streams: %v"+ @@ -393,8 +399,9 @@ func (o PruneImagesOptions) Run() error { PruneOverSizeLimit: o.PruneOverSizeLimit, AllImages: o.AllImages, Images: allImages, + ImageWatcher: imageWatcher, Streams: allStreams, - StreamsWatcher: imageStreamWatcher, + StreamWatcher: imageStreamWatcher, Pods: allPods, RCs: allRCs, BCs: allBCs,