WIP - Build a mirroring plan and execute it
smarterclayton committed Mar 19, 2018
1 parent f53168c commit b91c635
Showing 3 changed files with 626 additions and 120 deletions.
11 changes: 11 additions & 0 deletions pkg/image/registryclient/client.go
@@ -68,6 +68,17 @@ type Context struct {
redirect map[url.URL]*url.URL
}

func (c *Context) Copy() *Context {
copied := *c
copied.authFn = nil
copied.pings = make(map[url.URL]error)
copied.redirect = make(map[url.URL]*url.URL)
for k, v := range c.redirect {
copied.redirect[k] = v
}
return &copied
}

func (c *Context) WithScopes(scopes ...auth.Scope) *Context {
c.authFn = nil
c.Scopes = scopes
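Note: the new Copy method exists because the mirror loop below now calls toContext.Copy().WithScopes(...) per destination instead of mutating the shared push context. A minimal sketch of that pattern, assuming the github.com/openshift/origin import path for this package and the vendored docker/distribution auth package; the repository name is hypothetical:

package main

import (
	"fmt"

	"github.com/docker/distribution/registry/client/auth"
	"github.com/openshift/origin/pkg/image/registryclient"
	"k8s.io/client-go/rest"
)

func main() {
	// Build the same transports the mirror command uses.
	rt, err := rest.TransportFor(&rest.Config{})
	if err != nil {
		panic(err)
	}
	insecureRT, err := rest.TransportFor(&rest.Config{TLSClientConfig: rest.TLSClientConfig{Insecure: true}})
	if err != nil {
		panic(err)
	}

	// Shared context configured once for every push in the plan.
	toContext := registryclient.NewContext(rt, insecureRT).WithActions("pull", "push")

	// Per-destination copy: adding a pull scope for a cross-repository blob
	// mount touches only the copy, not the shared context's scopes or cached
	// ping/auth state.
	scoped := toContext.Copy().WithScopes(auth.RepositoryScope{
		Repository: "myorg/source-repo", // hypothetical source repository
		Actions:    []string{"pull"},
	})

	fmt.Printf("shared scopes: %v, per-destination scopes: %v\n", toContext.Scopes, scoped.Scopes)
}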
185 changes: 65 additions & 120 deletions pkg/oc/cli/cmd/image/mirror/mirror.go
@@ -8,6 +8,7 @@ import (
"regexp"
"strings"
"sync"
"time"

"github.com/docker/distribution"
"github.com/docker/distribution/manifest/manifestlist"
@@ -26,7 +27,6 @@ import (
"github.com/spf13/cobra"
"k8s.io/client-go/rest"

kerrors "k8s.io/apimachinery/pkg/util/errors"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/kubernetes/pkg/kubectl/cmd/templates"
kcmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
@@ -297,95 +297,6 @@ func (o *pushOptions) Complete(args []string) error {
return nil
}

type key struct {
registry string
repository string
}

type destination struct {
t DestinationType
ref imageapi.DockerImageReference
tags []string
}

type pushTargets map[key]destination

type destinations struct {
ref imageapi.DockerImageReference
tags map[string]pushTargets
digests map[string]pushTargets
}

func (d destinations) mergeIntoDigests(srcDigest godigest.Digest, target pushTargets) {
srcKey := srcDigest.String()
current, ok := d.digests[srcKey]
if !ok {
d.digests[srcKey] = target
return
}
for repo, dst := range target {
existing, ok := current[repo]
if !ok {
current[repo] = dst
continue
}
existing.tags = append(existing.tags, dst.tags...)
}
}

type targetTree map[key]destinations

func buildTargetTree(mappings []Mapping) targetTree {
tree := make(targetTree)
for _, m := range mappings {
srcKey := key{registry: m.Source.Registry, repository: m.Source.RepositoryName()}
dstKey := key{registry: m.Destination.Registry, repository: m.Destination.RepositoryName()}

src, ok := tree[srcKey]
if !ok {
src.ref = m.Source.AsRepository()
src.digests = make(map[string]pushTargets)
src.tags = make(map[string]pushTargets)
tree[srcKey] = src
}

var current pushTargets
if tag := m.Source.Tag; len(tag) != 0 {
current = src.tags[tag]
if current == nil {
current = make(pushTargets)
src.tags[tag] = current
}
} else {
current = src.digests[m.Source.ID]
if current == nil {
current = make(pushTargets)
src.digests[m.Source.ID] = current
}
}

dst, ok := current[dstKey]
if !ok {
dst.ref = m.Destination.AsRepository()
dst.t = m.Type
}
if len(m.Destination.Tag) > 0 {
dst.tags = append(dst.tags, m.Destination.Tag)
}
current[dstKey] = dst
}
return tree
}

type retrieverError struct {
src, dst imageapi.DockerImageReference
err error
}

func (e retrieverError) Error() string {
return e.err.Error()
}

func (o *pushOptions) Repository(ctx apirequest.Context, context *registryclient.Context, creds auth.CredentialStore, t DestinationType, ref imageapi.DockerImageReference) (distribution.Repository, error) {
switch t {
case DestinationRegistry:
@@ -414,48 +325,64 @@ func (o *pushOptions) includeDescriptor(d *manifestlist.ManifestDescriptor) bool
return o.OSFilter.MatchString(fmt.Sprintf("%s/%s", d.Platform.OS, d.Platform.Architecture))
}

// ErrAlreadyExists may be returned by the blob Create function to indicate that the blob already exists.
var ErrAlreadyExists = fmt.Errorf("blob already exists in the target location")

func (o *pushOptions) Run() error {
start := time.Now()
p, err := o.plan()
if err != nil {
return err
}
fmt.Fprintf(o.ErrOut, "info: Planning complete in %s\n", time.Now().Sub(start).Round(time.Millisecond))
p.Print(o.Out)

work := Greedy(p)
for i, phase := range work.phases {
fmt.Fprintf(o.ErrOut, "phase %d:\n", i)
for _, unit := range phase.independent {
fmt.Fprintf(o.ErrOut, " %s %s\n", unit.registry.name, unit.repository.name)
fmt.Fprintf(o.ErrOut, " blobs=%d mounts=%d manifests=%d\n", unit.repository.stats.sharedCount+unit.repository.stats.uniqueCount, unit.stats.mountOpportunities, unit.repository.manifests.stats.count)
}
}

return nil
}

func (o *pushOptions) plan() (*plan, error) {
tree := buildTargetTree(o.Mappings)

creds := dockercredentials.NewLocal()
ctx := apirequest.NewContext()

rt, err := rest.TransportFor(&rest.Config{})
if err != nil {
return err
return nil, err
}
insecureRT, err := rest.TransportFor(&rest.Config{TLSClientConfig: rest.TLSClientConfig{Insecure: true}})
if err != nil {
return err
return nil, err
}
srcClient := registryclient.NewContext(rt, insecureRT).WithCredentials(creds)
fromContext := registryclient.NewContext(rt, insecureRT).WithCredentials(creds)
toContext := registryclient.NewContext(rt, insecureRT).WithActions("pull", "push")

var errs []error
plan := &plan{}

for _, src := range tree {
srcRepo, err := srcClient.Repository(ctx, src.ref.DockerClientDefaults().RegistryURL(), src.ref.RepositoryName(), o.Insecure)
srcRepo, err := fromContext.Repository(ctx, src.ref.DockerClientDefaults().RegistryURL(), src.ref.RepositoryName(), o.Insecure)
if err != nil {
errs = append(errs, retrieverError{err: fmt.Errorf("unable to connect to %s: %v", src.ref, err), src: src.ref})
plan.AddError(retrieverError{err: fmt.Errorf("unable to connect to %s: %v", src.ref, err), src: src.ref})
continue
}

manifests, err := srcRepo.Manifests(ctx)
if err != nil {
errs = append(errs, retrieverError{src: src.ref, err: fmt.Errorf("unable to access source image %s manifests: %v", src.ref, err)})
plan.AddError(retrieverError{src: src.ref, err: fmt.Errorf("unable to access source image %s manifests: %v", src.ref, err)})
continue
}

var tagErrs []retrieverError
var digestErrs []retrieverError

// convert source tags to digests
for srcTag, pushTargets := range src.tags {
desc, err := srcRepo.Tags(ctx).Get(ctx, srcTag)
if err != nil {
tagErrs = append(tagErrs, retrieverError{src: src.ref, err: fmt.Errorf("unable to retrieve source image %s by tag: %v", src.ref, err)})
plan.AddError(retrieverError{src: src.ref, err: fmt.Errorf("unable to retrieve source image %s by tag: %v", src.ref, err)})
continue
}
srcDigest := desc.Digest
@@ -470,14 +397,14 @@ func (o *pushOptions) Run() error {
srcDigest := godigest.Digest(srcDigestString)
srcManifest, err := manifests.Get(ctx, godigest.Digest(srcDigest), schema2ManifestOnly)
if err != nil {
digestErrs = append(digestErrs, retrieverError{src: src.ref, err: fmt.Errorf("unable to retrieve source image %s manifest: %v", src.ref, err)})
plan.AddError(retrieverError{src: src.ref, err: fmt.Errorf("unable to retrieve source image %s manifest: %v", src.ref, err)})
continue
}

// filter or load manifest list as appropriate
srcManifests, srcManifest, srcDigest, err := processManifestList(ctx, srcDigest, srcManifest, manifests, src.ref, o.includeDescriptor)
if err != nil {
digestErrs = append(digestErrs, retrieverError{src: src.ref, err: err})
plan.AddError(retrieverError{src: src.ref, err: err})
continue
}
if len(srcManifests) == 0 {
@@ -488,19 +415,23 @@ func (o *pushOptions) Run() error {
for _, dst := range pushTargets {
// if we are going to be using cross repository mount, get a token that covers the src
if src.ref.Registry == dst.ref.Registry {
toContext = toContext.WithScopes(auth.RepositoryScope{Repository: src.ref.RepositoryName(), Actions: []string{"pull"}})
toContext = toContext.Copy().WithScopes(auth.RepositoryScope{Repository: src.ref.RepositoryName(), Actions: []string{"pull"}})
}

toRepo, err := o.Repository(ctx, toContext, creds, dst.t, dst.ref)
if err != nil {
digestErrs = append(digestErrs, retrieverError{src: src.ref, dst: dst.ref, err: fmt.Errorf("unable to connect to %s: %v", dst.ref, err)})
plan.AddError(retrieverError{src: src.ref, dst: dst.ref, err: fmt.Errorf("unable to connect to %s: %v", dst.ref, err)})
continue
}

canonicalTo := toRepo.Named()

repoPlan := plan.RegistryPlan(dst.ref.Registry).RepositoryPlan(canonicalTo.String())
blobPlan := repoPlan.Blobs(src.ref.AsRepository().String())

toManifests, err := toRepo.Manifests(ctx)
if err != nil {
digestErrs = append(digestErrs, retrieverError{src: src.ref, dst: dst.ref, err: fmt.Errorf("unable to access destination image %s manifests: %v", src.ref, err)})
repoPlan.AddError(retrieverError{src: src.ref, dst: dst.ref, err: fmt.Errorf("unable to access destination image %s manifests: %v", src.ref, err)})
continue
}

@@ -513,29 +444,43 @@ func (o *pushOptions) Run() error {
default:
if _, err := toManifests.Get(ctx, srcDigest); err != nil {
mustCopyLayers = true
blobPlan.AlreadyExists(distribution.Descriptor{Digest: srcDigest})
} else {
glog.V(4).Infof("Manifest exists in %s, no need to copy layers without --force", dst.ref)
}
}

if mustCopyLayers {
if errs := uploadBlobs(ctx, dst, srcRepo, toRepo, srcManifests, src.ref, srcDigest, canonicalFrom, o.Force, o.SkipMount, o.ErrOut); len(errs) > 0 {
digestErrs = append(digestErrs, errs...)
continue
// upload all the blobs
toBlobs := toRepo.Blobs(ctx)
srcBlobs := srcRepo.Blobs(ctx)

// upload each manifest
for _, srcManifest := range srcManifests {
switch srcManifest.(type) {
case *schema2.DeserializedManifest:
case *manifestlist.DeserializedManifestList:
// we do not need to upload layers in a manifestlist
continue
default:
repoPlan.AddError(retrieverError{src: src.ref, dst: dst.ref, err: fmt.Errorf("the manifest type %T is not supported", srcManifest)})
continue
}
for _, blob := range srcManifest.References() {
// glog.V(5).Infof("copying blob %s to %s", blob.Digest, dst.ref)
blobPlan.Copy(blob, srcBlobs, toBlobs)
}
}
}

if errs := uploadAndTagManifests(ctx, dst, srcManifest, src.ref, toManifests, o.Out, toRepo.Blobs(ctx), canonicalTo); len(errs) > 0 {
digestErrs = append(digestErrs, errs...)
continue
}
repoPlan.Manifests().Copy(srcDigest, srcManifest, dst.tags, toManifests)
}
}
for _, err := range append(tagErrs, digestErrs...) {
errs = append(errs, err)
}
}
return kerrors.NewAggregate(errs)

plan.calculateStats()

return plan, nil
}

func processManifestList(ctx apirequest.Context, srcDigest godigest.Digest, srcManifest distribution.Manifest, manifests distribution.ManifestService, ref imageapi.DockerImageReference, filterFn func(*manifestlist.ManifestDescriptor) bool) ([]distribution.Manifest, distribution.Manifest, godigest.Digest, error) {
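The plan types consumed above (plan.AddError, RegistryPlan, RepositoryPlan, Blobs, Manifests, AlreadyExists, Copy, calculateStats, Print), along with Greedy and its phases, are defined in the third changed file, which is not expanded in this view. A rough, hypothetical skeleton of the plan side only, inferred from those call sites rather than from the actual definitions:

package mirror

import (
	"fmt"
	"io"

	"github.com/docker/distribution"
	godigest "github.com/opencontainers/go-digest"
)

// plan accumulates the work the mirror intends to do, grouped by destination
// registry and repository, before any blob or manifest is pushed.
type plan struct {
	registries map[string]*registryPlan
	errs       []error
}

func (p *plan) AddError(err error) { p.errs = append(p.errs, err) }

func (p *plan) RegistryPlan(name string) *registryPlan {
	if p.registries == nil {
		p.registries = make(map[string]*registryPlan)
	}
	if _, ok := p.registries[name]; !ok {
		p.registries[name] = &registryPlan{repositories: make(map[string]*repositoryPlan)}
	}
	return p.registries[name]
}

func (p *plan) calculateStats() { /* roll blob/manifest counts up for Print and Greedy */ }

func (p *plan) Print(w io.Writer) { fmt.Fprintf(w, "%d registries planned\n", len(p.registries)) }

// registryPlan groups work destined for a single registry host.
type registryPlan struct {
	repositories map[string]*repositoryPlan
}

func (r *registryPlan) RepositoryPlan(name string) *repositoryPlan {
	if _, ok := r.repositories[name]; !ok {
		r.repositories[name] = &repositoryPlan{}
	}
	return r.repositories[name]
}

// repositoryPlan tracks the blobs and manifests one destination repository needs.
type repositoryPlan struct {
	errs      []error
	blobs     []*blobPlan
	manifests manifestPlan
}

func (r *repositoryPlan) AddError(err error) { r.errs = append(r.errs, err) }

func (r *repositoryPlan) Blobs(fromRepo string) *blobPlan {
	b := &blobPlan{from: fromRepo}
	r.blobs = append(r.blobs, b)
	return b
}

func (r *repositoryPlan) Manifests() *manifestPlan { return &r.manifests }

// blobPlan records blob copies (and blobs already present) for one source repository.
type blobPlan struct {
	from     string
	existing []distribution.Descriptor
	copies   []distribution.Descriptor
}

func (b *blobPlan) AlreadyExists(desc distribution.Descriptor) { b.existing = append(b.existing, desc) }

func (b *blobPlan) Copy(desc distribution.Descriptor, from, to distribution.BlobStore) {
	b.copies = append(b.copies, desc)
}

// manifestPlan records manifests to upload and the tags to apply afterwards.
type manifestPlan struct {
	tagsByDigest map[godigest.Digest][]string
}

func (m *manifestPlan) Copy(srcDigest godigest.Digest, srcManifest distribution.Manifest, tags []string, to distribution.ManifestService) {
	if m.tagsByDigest == nil {
		m.tagsByDigest = make(map[godigest.Digest][]string)
	}
	m.tagsByDigest[srcDigest] = append(m.tagsByDigest[srcDigest], tags...)
}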
