diff --git a/pkg/cmd/server/origin/legacy.go b/pkg/cmd/server/origin/legacy.go index 8372c0c75964..d44ab07ca9fe 100644 --- a/pkg/cmd/server/origin/legacy.go +++ b/pkg/cmd/server/origin/legacy.go @@ -207,6 +207,8 @@ func LegacyStorage(storage map[schema.GroupVersion]map[string]rest.Storage) map[ case *imagestreametcd.REST: legacyStorage[resource] = &imagestreametcd.LegacyREST{REST: storage} + case *imagestreametcd.LayersREST: + delete(legacyStorage, resource) case *routeetcd.REST: store := *storage.Store diff --git a/pkg/image/apiserver/apiserver.go b/pkg/image/apiserver/apiserver.go index 8ec7aa38638b..f8638ab60ed6 100644 --- a/pkg/image/apiserver/apiserver.go +++ b/pkg/image/apiserver/apiserver.go @@ -53,6 +53,7 @@ type ExtraConfig struct { makeV1Storage sync.Once v1Storage map[string]rest.Storage v1StorageErr error + startFns []func(<-chan struct{}) } type ImageAPIServerConfig struct { @@ -107,6 +108,15 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) return nil, err } + if err := s.GenericAPIServer.AddPostStartHook("image.openshift.io-apiserver-caches", func(context genericapiserver.PostStartHookContext) error { + for _, fn := range c.ExtraConfig.startFns { + go fn(context.StopCh) + } + return nil + }); err != nil { + return nil, err + } + return s, nil } @@ -168,10 +178,13 @@ func (c *completedConfig) newV1RESTStorage() (map[string]rest.Storage, error) { whitelister = whitelist.WhitelistAllRegistries() } + imageLayerIndex := imagestreametcd.NewImageLayerIndex(imageStorage) + c.ExtraConfig.startFns = append(c.ExtraConfig.startFns, imageLayerIndex.Run) + imageRegistry := image.NewRegistry(imageStorage) imageSignatureStorage := imagesignature.NewREST(imageClient.Image()) imageStreamSecretsStorage := imagesecret.NewREST(coreClient) - imageStreamStorage, imageStreamStatusStorage, internalImageStreamStorage, err := imagestreametcd.NewREST(c.GenericConfig.RESTOptionsGetter, c.ExtraConfig.RegistryHostnameRetriever, authorizationClient.SubjectAccessReviews(), c.ExtraConfig.LimitVerifier, whitelister) + imageStreamStorage, imageStreamLayersStorage, imageStreamStatusStorage, internalImageStreamStorage, err := imagestreametcd.NewREST(c.GenericConfig.RESTOptionsGetter, c.ExtraConfig.RegistryHostnameRetriever, authorizationClient.SubjectAccessReviews(), c.ExtraConfig.LimitVerifier, whitelister, imageLayerIndex) if err != nil { return nil, fmt.Errorf("error building REST storage: %v", err) } @@ -206,6 +219,7 @@ func (c *completedConfig) newV1RESTStorage() (map[string]rest.Storage, error) { v1Storage["imagesignatures"] = imageSignatureStorage v1Storage["imageStreams/secrets"] = imageStreamSecretsStorage v1Storage["imageStreams"] = imageStreamStorage + v1Storage["imageStreams/layers"] = imageStreamLayersStorage v1Storage["imageStreams/status"] = imageStreamStatusStorage v1Storage["imageStreamImports"] = imageStreamImportStorage v1Storage["imageStreamImages"] = imageStreamImageStorage diff --git a/pkg/image/registry/imagestream/etcd/etcd.go b/pkg/image/registry/imagestream/etcd/etcd.go index 792a17976bbf..49b7cd62c896 100644 --- a/pkg/image/registry/imagestream/etcd/etcd.go +++ b/pkg/image/registry/imagestream/etcd/etcd.go @@ -1,6 +1,7 @@ package etcd import ( + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" apirequest "k8s.io/apiserver/pkg/endpoints/request" @@ -46,7 +47,8 @@ func NewREST( subjectAccessReviewRegistry authorizationclient.SubjectAccessReviewInterface, limitVerifier imageadmission.LimitVerifier, registryWhitelister whitelist.RegistryWhitelister, -) (*REST, *StatusREST, *InternalREST, error) { + imageLayerIndex ImageLayerIndex, +) (*REST, *LayersREST, *StatusREST, *InternalREST, error) { store := registry.Store{ NewFunc: func() runtime.Object { return &imageapi.ImageStream{} }, NewListFunc: func() runtime.Object { return &imageapi.ImageStreamList{} }, @@ -71,9 +73,11 @@ func NewREST( AttrFunc: storage.AttrFunc(storage.DefaultNamespaceScopedAttr).WithFieldMutation(imageapi.ImageStreamSelector), } if err := store.CompleteWithOptions(options); err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } + layersREST := &LayersREST{index: imageLayerIndex, store: &store} + statusStrategy := imagestream.NewStatusStrategy(strategy) statusStore := store statusStore.Decorator = nil @@ -88,7 +92,7 @@ func NewREST( internalStore.UpdateStrategy = internalStrategy internalREST := &InternalREST{store: &internalStore} - return rest, statusREST, internalREST, nil + return rest, layersREST, statusREST, internalREST, nil } // StatusREST implements the REST endpoint for changing the status of an image stream. @@ -138,6 +142,87 @@ func (r *InternalREST) Update(ctx apirequest.Context, name string, objInfo rest. return r.store.Update(ctx, name, objInfo, createValidation, updateValidation) } +// LayersREST implements the REST endpoint for changing both the spec and status of an image stream. +type LayersREST struct { + store *registry.Store + index ImageLayerIndex +} + +var _ rest.Getter = &LayersREST{} + +func (r *LayersREST) New() runtime.Object { + return &imageapi.ImageStreamLayers{} +} + +// Get returns the layers for an image stream. +func (r *LayersREST) Get(ctx apirequest.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { + if !r.index.HasSynced() { + return nil, errors.NewServerTimeout(r.store.DefaultQualifiedResource, "get", 2) + } + obj, err := r.store.Get(ctx, name, options) + if err != nil { + return nil, err + } + is := obj.(*imageapi.ImageStream) + isl := &imageapi.ImageStreamLayers{ + ObjectMeta: is.ObjectMeta, + } + + existing := make(map[string]int) + existingImage := make(map[string][]int) + for tag, status := range is.Status.Tags { + var seen map[string]struct{} + for _, item := range status.Items { + if len(item.Image) == 0 { + continue + } + if _, ok := seen[item.Image]; ok { + continue + } + if len(status.Items) > 1 { + if seen == nil { + seen = make(map[string]struct{}) + } + seen[item.Image] = struct{}{} + } + if indices, ok := existingImage[item.Image]; ok { + for _, index := range indices { + ref := &isl.Layers[index] + ref.Tags = append(ref.Tags, tag) + ref.ImageIDs = append(ref.ImageIDs, item.Image) + } + } + + obj, _, _ := r.index.GetIndexer().GetByKey(item.Image) + entry, ok := obj.(*ImageLayers) + if !ok { + continue + } + indices := make([]int, 0, len(entry.Layers)) + for _, layer := range entry.Layers { + if index, ok := existing[layer.Name]; ok { + ref := &isl.Layers[index] + ref.Tags = append(ref.Tags, tag) + ref.ImageIDs = append(ref.ImageIDs, item.Image) + indices = append(indices, index) + continue + } + index := len(isl.Layers) + existing[layer.Name] = index + indices = append(indices, index) + isl.Layers = append(isl.Layers, imageapi.ImageLayerReference{ + Name: layer.Name, + LayerSize: layer.LayerSize, + MediaType: layer.MediaType, + Tags: []string{tag}, + ImageIDs: []string{item.Image}, + }) + } + } + } + return isl, nil +} + // LegacyREST allows us to wrap and alter some behavior type LegacyREST struct { *REST diff --git a/pkg/image/registry/imagestream/etcd/image.go b/pkg/image/registry/imagestream/etcd/image.go new file mode 100644 index 000000000000..66981435f33e --- /dev/null +++ b/pkg/image/registry/imagestream/etcd/image.go @@ -0,0 +1,144 @@ +package etcd + +import ( + "fmt" + + metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/watch" + apirequest "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/registry/rest" + "k8s.io/client-go/tools/cache" + + imageapi "github.com/openshift/origin/pkg/image/apis/image" +) + +type ImageLayerIndex interface { + cache.SharedIndexInformer +} + +type ImageStore interface { + rest.Watcher + rest.Lister +} + +func NewImageLayerIndex(store ImageStore) ImageLayerIndex { + ctx := apirequest.NewContext() + informer := cache.NewSharedIndexInformer(&cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + obj, err := store.List(ctx, &metainternalversion.ListOptions{ + ResourceVersion: options.ResourceVersion, + }) + if err != nil { + return nil, err + } + list, ok := obj.(*imageapi.ImageList) + if !ok { + return nil, fmt.Errorf("unexpected store type %T for layer index", obj) + } + out := &metainternalversion.List{ + Items: make([]runtime.Object, len(list.Items)), + } + for i, image := range list.Items { + out.Items[i] = &ImageLayers{ + Name: image.Name, + Layers: image.DockerImageLayers, + } + } + return out, nil + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + w, err := store.Watch(ctx, &metainternalversion.ListOptions{ + ResourceVersion: options.ResourceVersion, + }) + if err != nil { + return nil, err + } + return watch.Filter(w, func(in watch.Event) (out watch.Event, keep bool) { + if in.Object == nil { + return in, true + } + image, ok := in.Object.(*imageapi.Image) + if !ok { + return in, true + } + in.Object = &ImageLayers{ + Name: image.Name, + Layers: image.DockerImageLayers, + } + return in, true + }), nil + }, + }, &ImageLayers{}, 0, cache.Indexers{ + "layers": func(obj interface{}) ([]string, error) { + entry, ok := obj.(*ImageLayers) + if !ok { + return nil, fmt.Errorf("unexpected cache object %T", obj) + } + keys := make([]string, 0, len(entry.Layers)) + for _, layer := range entry.Layers { + keys = append(keys, layer.Name) + } + return keys, nil + }, + }) + return informer +} + +type ImageLayers struct { + Name string + Layers []imageapi.ImageLayer +} + +var _ metav1.Object = &ImageLayers{} + +func (l *ImageLayers) GetObjectKind() schema.ObjectKind { return &metav1.TypeMeta{} } +func (l *ImageLayers) DeepCopyObject() runtime.Object { + var layers []imageapi.ImageLayer + if l.Layers != nil { + layers = make([]imageapi.ImageLayer, len(l.Layers)) + copy(layers, l.Layers) + } + return &ImageLayers{ + Name: l.Name, + Layers: layers, + } +} + +// TODO: fix client-go/cache to not need this + +func (l *ImageLayers) GetNamespace() string { return "" } +func (l *ImageLayers) SetNamespace(namespace string) {} +func (l *ImageLayers) GetName() string { return l.Name } +func (l *ImageLayers) SetName(name string) {} +func (l *ImageLayers) GetGenerateName() string { return "" } +func (l *ImageLayers) SetGenerateName(name string) {} +func (l *ImageLayers) GetUID() types.UID { return "" } +func (l *ImageLayers) SetUID(uid types.UID) {} +func (l *ImageLayers) GetResourceVersion() string { return "" } +func (l *ImageLayers) SetResourceVersion(version string) {} +func (l *ImageLayers) GetGeneration() int64 { return 0 } +func (l *ImageLayers) SetGeneration(generation int64) {} +func (l *ImageLayers) GetSelfLink() string { return "" } +func (l *ImageLayers) SetSelfLink(selfLink string) {} +func (l *ImageLayers) GetCreationTimestamp() metav1.Time { return metav1.Time{} } +func (l *ImageLayers) SetCreationTimestamp(timestamp metav1.Time) {} +func (l *ImageLayers) GetDeletionTimestamp() *metav1.Time { return nil } +func (l *ImageLayers) SetDeletionTimestamp(timestamp *metav1.Time) {} +func (l *ImageLayers) GetDeletionGracePeriodSeconds() *int64 { return nil } +func (l *ImageLayers) SetDeletionGracePeriodSeconds(*int64) {} +func (l *ImageLayers) GetLabels() map[string]string { return nil } +func (l *ImageLayers) SetLabels(labels map[string]string) {} +func (l *ImageLayers) GetAnnotations() map[string]string { return nil } +func (l *ImageLayers) SetAnnotations(annotations map[string]string) {} +func (l *ImageLayers) GetInitializers() *metav1.Initializers { return nil } +func (l *ImageLayers) SetInitializers(initializers *metav1.Initializers) {} +func (l *ImageLayers) GetFinalizers() []string { return nil } +func (l *ImageLayers) SetFinalizers(finalizers []string) {} +func (l *ImageLayers) GetOwnerReferences() []metav1.OwnerReference { return nil } +func (l *ImageLayers) SetOwnerReferences([]metav1.OwnerReference) {} +func (l *ImageLayers) GetClusterName() string { return "" } +func (l *ImageLayers) SetClusterName(clusterName string) {}