Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3.6][Backport] Prune orphaned blobs on registry storage #16126

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 94 additions & 3 deletions pkg/cmd/dockerregistry/dockerregistry.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@ package dockerregistry
import (
"crypto/tls"
"crypto/x509"
"flag"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"strings"
"time"

log "github.com/Sirupsen/logrus"
"github.com/docker/go-units"
gorillahandlers "github.com/gorilla/handlers"

"github.com/Sirupsen/logrus/formatters/logstash"
Expand All @@ -19,8 +22,10 @@ import (
"github.com/docker/distribution/health"
"github.com/docker/distribution/registry/auth"
"github.com/docker/distribution/registry/handlers"
"github.com/docker/distribution/registry/storage"
"github.com/docker/distribution/registry/storage/driver/factory"
"github.com/docker/distribution/uuid"
"github.com/docker/distribution/version"
distversion "github.com/docker/distribution/version"

_ "github.com/docker/distribution/registry/auth/htpasswd"
_ "github.com/docker/distribution/registry/auth/token"
Expand All @@ -35,18 +40,104 @@ import (
_ "github.com/docker/distribution/registry/storage/driver/s3-aws"
_ "github.com/docker/distribution/registry/storage/driver/swift"

"strings"
kubeversion "k8s.io/kubernetes/pkg/version"

"github.com/openshift/origin/pkg/cmd/server/crypto"
"github.com/openshift/origin/pkg/cmd/util/clientcmd"
"github.com/openshift/origin/pkg/dockerregistry/server"
"github.com/openshift/origin/pkg/dockerregistry/server/api"
"github.com/openshift/origin/pkg/dockerregistry/server/audit"
registryconfig "github.com/openshift/origin/pkg/dockerregistry/server/configuration"
"github.com/openshift/origin/pkg/dockerregistry/server/prune"
"github.com/openshift/origin/pkg/version"
)

var pruneMode = flag.String("prune", "", "prune blobs from the storage and exit (check, delete)")

func versionFields() log.Fields {
return log.Fields{
"distribution_version": distversion.Version,
"kubernetes_version": kubeversion.Get(),
"openshift_version": version.Get(),
}
}

// ExecutePruner runs the pruner.
func ExecutePruner(configFile io.Reader, dryRun bool) {
config, _, err := registryconfig.Parse(configFile)
if err != nil {
log.Fatalf("error parsing configuration file: %s", err)
}

// A lot of installations have the 'debug' log level in their config files,
// but it's too verbose for pruning. Therefore we ignore it, but we still
// respect overrides using environment variables.
config.Loglevel = ""
config.Log.Level = configuration.Loglevel(os.Getenv("REGISTRY_LOG_LEVEL"))
if len(config.Log.Level) == 0 {
config.Log.Level = "warning"
}

ctx := context.Background()
ctx, err = configureLogging(ctx, config)
if err != nil {
log.Fatalf("error configuring logging: %s", err)
}

startPrune := "start prune"
var registryOptions []storage.RegistryOption
if dryRun {
startPrune += " (dry-run mode)"
} else {
registryOptions = append(registryOptions, storage.EnableDelete)
}
log.WithFields(versionFields()).Info(startPrune)

registryClient := server.NewRegistryClient(clientcmd.NewConfig().BindToFile())

storageDriver, err := factory.Create(config.Storage.Type(), config.Storage.Parameters())
if err != nil {
log.Fatalf("error creating storage driver: %s", err)
}

registry, err := storage.NewRegistry(ctx, storageDriver, registryOptions...)
if err != nil {
log.Fatalf("error creating registry: %s", err)
}

stats, err := prune.Prune(ctx, storageDriver, registry, registryClient, dryRun)
if err != nil {
log.Error(err)
}
if dryRun {
fmt.Printf("Would delete %d blobs\n", stats.Blobs)
fmt.Printf("Would free up %s of disk space\n", units.BytesSize(float64(stats.DiskSpace)))
fmt.Println("Use -prune=delete to actually delete the data")
} else {
fmt.Printf("Deleted %d blobs\n", stats.Blobs)
fmt.Printf("Freed up %s of disk space\n", units.BytesSize(float64(stats.DiskSpace)))
}
if err != nil {
os.Exit(1)
}
}

// Execute runs the Docker registry.
func Execute(configFile io.Reader) {
if len(*pruneMode) != 0 {
var dryRun bool
switch *pruneMode {
case "delete":
dryRun = false
case "check":
dryRun = true
default:
log.Fatal("invalid value for the -prune option")
}
ExecutePruner(configFile, dryRun)
return
}

dockerConfig, extraConfig, err := registryconfig.Parse(configFile)
if err != nil {
log.Fatalf("error parsing configuration file: %s", err)
Expand All @@ -64,7 +155,7 @@ func Execute(configFile io.Reader) {
registryClient := server.NewRegistryClient(clientcmd.NewConfig().BindToFile())
ctx = server.WithRegistryClient(ctx, registryClient)

log.Infof("version=%s", version.Version)
log.WithFields(versionFields()).Info("start registry")
// inject a logger into the uuid library. warns us if there is a problem
// with uuid generation under low entropy.
uuid.Loggerf = context.GetLogger(ctx).Warnf
Expand Down
3 changes: 2 additions & 1 deletion pkg/dockerregistry/server/errorblobstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ func (f statCrossMountCreateOptions) Apply(v interface{}) error {
if err != nil {
context.GetLogger(f.ctx).Infof("cannot mount blob %s from repository %s: %v - disabling cross-repo mount",
opts.Mount.From.Digest().String(),
opts.Mount.From.Name())
opts.Mount.From.Name(),
err)
opts.Mount.ShouldMount = false
return nil
}
Expand Down
200 changes: 200 additions & 0 deletions pkg/dockerregistry/server/prune/prune.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
package prune

import (
"fmt"

"github.com/docker/distribution"
"github.com/docker/distribution/context"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest/schema2"
"github.com/docker/distribution/reference"
"github.com/docker/distribution/registry/storage"
"github.com/docker/distribution/registry/storage/driver"

kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/openshift/origin/pkg/dockerregistry/server"
imageapi "github.com/openshift/origin/pkg/image/apis/image"
)

func imageStreamHasManifestDigest(is *imageapi.ImageStream, dgst digest.Digest) bool {
for _, tagEventList := range is.Status.Tags {
for _, tagEvent := range tagEventList.Items {
if tagEvent.Image == string(dgst) {
return true
}
}
}
return false
}

// Summary is cumulative information about what was pruned.
type Summary struct {
Blobs int
DiskSpace int64
}

// Prune removes blobs which are not used by Images in OpenShift.
//
// On error, the Summary will contain what was deleted so far.
//
// TODO(dmage): remove layer links to a blob if the blob is removed or it doesn't belong to the ImageStream.
// TODO(dmage): keep young blobs (docker/distribution#2297).
func Prune(ctx context.Context, storageDriver driver.StorageDriver, registry distribution.Namespace, registryClient server.RegistryClient, dryRun bool) (Summary, error) {
logger := context.GetLogger(ctx)

repositoryEnumerator, ok := registry.(distribution.RepositoryEnumerator)
if !ok {
return Summary{}, fmt.Errorf("unable to convert Namespace to RepositoryEnumerator")
}

oc, _, err := registryClient.Clients()
if err != nil {
return Summary{}, fmt.Errorf("error getting clients: %v", err)
}

imageList, err := oc.Images().List(metav1.ListOptions{})
if err != nil {
return Summary{}, fmt.Errorf("error listing images: %v", err)
}

inuse := make(map[string]string)
for _, image := range imageList.Items {
// Keep the manifest.
inuse[image.Name] = image.DockerImageReference

// Keep the config for a schema 2 manifest.
if image.DockerImageManifestMediaType == schema2.MediaTypeManifest {
inuse[image.DockerImageMetadata.ID] = image.DockerImageReference
}

// Keep image layers.
for _, layer := range image.DockerImageLayers {
inuse[layer.Name] = image.DockerImageReference
}
}

var stats Summary

var reposToDelete []string
err = repositoryEnumerator.Enumerate(ctx, func(repoName string) error {
logger.Debugln("Processing repository", repoName)

named, err := reference.WithName(repoName)
if err != nil {
return fmt.Errorf("failed to parse the repo name %s: %v", repoName, err)
}

ref, err := imageapi.ParseDockerImageReference(repoName)
if err != nil {
return fmt.Errorf("failed to parse the image reference %s: %v", repoName, err)
}

is, err := oc.ImageStreams(ref.Namespace).Get(ref.Name, metav1.GetOptions{})
if kerrors.IsNotFound(err) {
logger.Printf("The image stream %s/%s is not found, will remove the whole repository", ref.Namespace, ref.Name)

// We cannot delete the repository at this point, because it would break Enumerate.
reposToDelete = append(reposToDelete, repoName)

return nil
} else if err != nil {
return fmt.Errorf("failed to get the image stream %s: %v", repoName, err)
}

repository, err := registry.Repository(ctx, named)
if err != nil {
return err
}

manifestService, err := repository.Manifests(ctx)
if err != nil {
return err
}

manifestEnumerator, ok := manifestService.(distribution.ManifestEnumerator)
if !ok {
return fmt.Errorf("unable to convert ManifestService into ManifestEnumerator")
}

err = manifestEnumerator.Enumerate(ctx, func(dgst digest.Digest) error {
if _, ok := inuse[string(dgst)]; ok && imageStreamHasManifestDigest(is, dgst) {
logger.Debugf("Keeping the manifest link %s@%s", repoName, dgst)
return nil
}

if dryRun {
logger.Printf("Would delete manifest link: %s@%s", repoName, dgst)
return nil
}

logger.Printf("Deleting manifest link: %s@%s", repoName, dgst)
if err := manifestService.Delete(ctx, dgst); err != nil {
return fmt.Errorf("failed to delete the manifest link %s@%s: %v", repoName, dgst, err)
}

return nil
})
if e, ok := err.(driver.PathNotFoundError); ok {
logger.Printf("Skipped manifest link pruning for the repository %s: %v", repoName, e)
} else if err != nil {
return fmt.Errorf("failed to prune manifest links in the repository %s: %v", repoName, err)
}

return nil
})
if e, ok := err.(driver.PathNotFoundError); ok {
logger.Warnf("No repositories found: %v", e)
return stats, nil
} else if err != nil {
return stats, err
}

vacuum := storage.NewVacuum(ctx, storageDriver)

logger.Debugln("Removing repositories")
for _, repoName := range reposToDelete {
if dryRun {
logger.Printf("Would delete repository: %s", repoName)
continue
}

if err = vacuum.RemoveRepository(repoName); err != nil {
return stats, fmt.Errorf("unable to remove the repository %s: %v", repoName, err)
}
}

logger.Debugln("Processing blobs")
blobStatter := registry.BlobStatter()
err = registry.Blobs().Enumerate(ctx, func(dgst digest.Digest) error {
if imageReference, ok := inuse[string(dgst)]; ok {
logger.Debugf("Keeping the blob %s (it belongs to the image %s)", dgst, imageReference)
return nil
}

desc, err := blobStatter.Stat(ctx, dgst)
if err != nil {
return err
}

stats.Blobs++
stats.DiskSpace += desc.Size

if dryRun {
logger.Printf("Would delete blob: %s", dgst)
return nil
}

if err := vacuum.RemoveBlob(string(dgst)); err != nil {
return fmt.Errorf("failed to delete the blob %s: %v", dgst, err)
}

return nil
})
if e, ok := err.(driver.PathNotFoundError); ok {
logger.Warnf("No repositories found: %v", e)
return stats, nil
}
return stats, err
}
12 changes: 8 additions & 4 deletions test/extended/imageapis/limitrange_admission.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,14 @@ var _ = g.Describe("[Feature:ImageQuota] Image limit range", func() {

g.It(fmt.Sprintf("should deny an import of a repository exceeding limit on %s resource", imageapi.ResourceImageStreamTags), func() {
oc.SetOutputDir(exutil.TestContext.OutputDir)
defer tearDown(oc)

maxBulkImport, err := getMaxImagesBulkImportedPerRepository()
o.Expect(err).NotTo(o.HaveOccurred())
if err != nil {
g.Skip(err.Error())
return
}

defer tearDown(oc)

s1tag2Image, err := buildAndPushTestImagesTo(oc, "src1st", "tag", maxBulkImport+1)
s2tag2Image, err := buildAndPushTestImagesTo(oc, "src2nd", "t", 2)
Expand Down Expand Up @@ -235,7 +239,7 @@ func buildAndPushTestImagesTo(oc *exutil.CLI, isName string, tagPrefix string, n

for i := 1; i <= numberOfImages; i++ {
tag := fmt.Sprintf("%s%d", tagPrefix, i)
dgst, err := imagesutil.BuildAndPushImageOfSizeWithDocker(oc, dClient, isName, tag, imageSize, 2, g.GinkgoWriter, true)
dgst, _, err := imagesutil.BuildAndPushImageOfSizeWithDocker(oc, dClient, isName, tag, imageSize, 2, g.GinkgoWriter, true, true)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -309,7 +313,7 @@ func bumpLimit(oc *exutil.CLI, resourceName kapi.ResourceName, limit string) (ka
func getMaxImagesBulkImportedPerRepository() (int, error) {
max := os.Getenv("MAX_IMAGES_BULK_IMPORTED_PER_REPOSITORY")
if len(max) == 0 {
return 0, fmt.Errorf("MAX_IMAGES_BULK_IMAGES_IMPORTED_PER_REPOSITORY needs to be set")
return 0, fmt.Errorf("MAX_IMAGES_BULK_IMAGES_IMPORTED_PER_REPOSITORY is not set")
}
return strconv.Atoi(max)
}
Loading