Skip to content

Commit

Permalink
WIP - Build a mirroring plan and execute it
Browse files Browse the repository at this point in the history
  • Loading branch information
smarterclayton committed Mar 19, 2018
1 parent f53168c commit 90c61df
Show file tree
Hide file tree
Showing 2 changed files with 367 additions and 104 deletions.
182 changes: 78 additions & 104 deletions pkg/oc/cli/cmd/image/mirror/mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/spf13/cobra"
"k8s.io/client-go/rest"

kerrors "k8s.io/apimachinery/pkg/util/errors"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/kubernetes/pkg/kubectl/cmd/templates"
kcmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
Expand Down Expand Up @@ -297,86 +296,6 @@ func (o *pushOptions) Complete(args []string) error {
return nil
}

type key struct {
registry string
repository string
}

type destination struct {
t DestinationType
ref imageapi.DockerImageReference
tags []string
}

type pushTargets map[key]destination

type destinations struct {
ref imageapi.DockerImageReference
tags map[string]pushTargets
digests map[string]pushTargets
}

func (d destinations) mergeIntoDigests(srcDigest godigest.Digest, target pushTargets) {
srcKey := srcDigest.String()
current, ok := d.digests[srcKey]
if !ok {
d.digests[srcKey] = target
return
}
for repo, dst := range target {
existing, ok := current[repo]
if !ok {
current[repo] = dst
continue
}
existing.tags = append(existing.tags, dst.tags...)
}
}

type targetTree map[key]destinations

func buildTargetTree(mappings []Mapping) targetTree {
tree := make(targetTree)
for _, m := range mappings {
srcKey := key{registry: m.Source.Registry, repository: m.Source.RepositoryName()}
dstKey := key{registry: m.Destination.Registry, repository: m.Destination.RepositoryName()}

src, ok := tree[srcKey]
if !ok {
src.ref = m.Source.AsRepository()
src.digests = make(map[string]pushTargets)
src.tags = make(map[string]pushTargets)
tree[srcKey] = src
}

var current pushTargets
if tag := m.Source.Tag; len(tag) != 0 {
current = src.tags[tag]
if current == nil {
current = make(pushTargets)
src.tags[tag] = current
}
} else {
current = src.digests[m.Source.ID]
if current == nil {
current = make(pushTargets)
src.digests[m.Source.ID] = current
}
}

dst, ok := current[dstKey]
if !ok {
dst.ref = m.Destination.AsRepository()
dst.t = m.Type
}
if len(m.Destination.Tag) > 0 {
dst.tags = append(dst.tags, m.Destination.Tag)
}
current[dstKey] = dst
}
return tree
}

type retrieverError struct {
src, dst imageapi.DockerImageReference
err error
Expand Down Expand Up @@ -418,44 +337,52 @@ func (o *pushOptions) includeDescriptor(d *manifestlist.ManifestDescriptor) bool
var ErrAlreadyExists = fmt.Errorf("blob already exists in the target location")

func (o *pushOptions) Run() error {
p, err := o.plan()
if err != nil {
return err
}
p.Print(o.Out)

return nil
}

func (o *pushOptions) plan() (*plan, error) {
tree := buildTargetTree(o.Mappings)

creds := dockercredentials.NewLocal()
ctx := apirequest.NewContext()

rt, err := rest.TransportFor(&rest.Config{})
if err != nil {
return err
return nil, err
}
insecureRT, err := rest.TransportFor(&rest.Config{TLSClientConfig: rest.TLSClientConfig{Insecure: true}})
if err != nil {
return err
return nil, err
}
srcClient := registryclient.NewContext(rt, insecureRT).WithCredentials(creds)
toContext := registryclient.NewContext(rt, insecureRT).WithActions("pull", "push")

var errs []error
plan := &plan{}

for _, src := range tree {
srcRepo, err := srcClient.Repository(ctx, src.ref.DockerClientDefaults().RegistryURL(), src.ref.RepositoryName(), o.Insecure)
if err != nil {
errs = append(errs, retrieverError{err: fmt.Errorf("unable to connect to %s: %v", src.ref, err), src: src.ref})
plan.AddError(retrieverError{err: fmt.Errorf("unable to connect to %s: %v", src.ref, err), src: src.ref})
continue
}

manifests, err := srcRepo.Manifests(ctx)
if err != nil {
errs = append(errs, retrieverError{src: src.ref, err: fmt.Errorf("unable to access source image %s manifests: %v", src.ref, err)})
plan.AddError(retrieverError{src: src.ref, err: fmt.Errorf("unable to access source image %s manifests: %v", src.ref, err)})
continue
}

var tagErrs []retrieverError
var digestErrs []retrieverError

// convert source tags to digests
for srcTag, pushTargets := range src.tags {
desc, err := srcRepo.Tags(ctx).Get(ctx, srcTag)
if err != nil {
tagErrs = append(tagErrs, retrieverError{src: src.ref, err: fmt.Errorf("unable to retrieve source image %s by tag: %v", src.ref, err)})
plan.AddError(retrieverError{src: src.ref, err: fmt.Errorf("unable to retrieve source image %s by tag: %v", src.ref, err)})
continue
}
srcDigest := desc.Digest
Expand All @@ -470,14 +397,14 @@ func (o *pushOptions) Run() error {
srcDigest := godigest.Digest(srcDigestString)
srcManifest, err := manifests.Get(ctx, godigest.Digest(srcDigest), schema2ManifestOnly)
if err != nil {
digestErrs = append(digestErrs, retrieverError{src: src.ref, err: fmt.Errorf("unable to retrieve source image %s manifest: %v", src.ref, err)})
plan.AddError(retrieverError{src: src.ref, err: fmt.Errorf("unable to retrieve source image %s manifest: %v", src.ref, err)})
continue
}

// filter or load manifest list as appropriate
srcManifests, srcManifest, srcDigest, err := processManifestList(ctx, srcDigest, srcManifest, manifests, src.ref, o.includeDescriptor)
if err != nil {
digestErrs = append(digestErrs, retrieverError{src: src.ref, err: err})
plan.AddError(retrieverError{src: src.ref, err: err})
continue
}
if len(srcManifests) == 0 {
Expand All @@ -493,14 +420,18 @@ func (o *pushOptions) Run() error {

toRepo, err := o.Repository(ctx, toContext, creds, dst.t, dst.ref)
if err != nil {
digestErrs = append(digestErrs, retrieverError{src: src.ref, dst: dst.ref, err: fmt.Errorf("unable to connect to %s: %v", dst.ref, err)})
plan.AddError(retrieverError{src: src.ref, dst: dst.ref, err: fmt.Errorf("unable to connect to %s: %v", dst.ref, err)})
continue
}

canonicalTo := toRepo.Named()

repoPlan := plan.RegistryPlan(dst.ref.Registry).RepositoryPlan(canonicalTo.String())
blobPlan := repoPlan.Blobs(src.ref.String())

toManifests, err := toRepo.Manifests(ctx)
if err != nil {
digestErrs = append(digestErrs, retrieverError{src: src.ref, dst: dst.ref, err: fmt.Errorf("unable to access destination image %s manifests: %v", src.ref, err)})
repoPlan.AddError(retrieverError{src: src.ref, dst: dst.ref, err: fmt.Errorf("unable to access destination image %s manifests: %v", src.ref, err)})
continue
}

Expand All @@ -513,29 +444,24 @@ func (o *pushOptions) Run() error {
default:
if _, err := toManifests.Get(ctx, srcDigest); err != nil {
mustCopyLayers = true
blobPlan.AlreadyExists(distribution.Descriptor{Digest: srcDigest})
} else {
glog.V(4).Infof("Manifest exists in %s, no need to copy layers without --force", dst.ref)
}
}

if mustCopyLayers {
if errs := uploadBlobs(ctx, dst, srcRepo, toRepo, srcManifests, src.ref, srcDigest, canonicalFrom, o.Force, o.SkipMount, o.ErrOut); len(errs) > 0 {
digestErrs = append(digestErrs, errs...)
if errs := planBlobs(ctx, blobPlan, dst, srcRepo, toRepo, srcManifests, src.ref, o.Force); len(errs) > 0 {
repoPlan.AddError(errs...)
continue
}
}

if errs := uploadAndTagManifests(ctx, dst, srcManifest, src.ref, toManifests, o.Out, toRepo.Blobs(ctx), canonicalTo); len(errs) > 0 {
digestErrs = append(digestErrs, errs...)
continue
}
repoPlan.Manifests().Copy(srcDigest, srcManifest, dst.tags, toManifests)
}
}
for _, err := range append(tagErrs, digestErrs...) {
errs = append(errs, err)
}
}
return kerrors.NewAggregate(errs)
return plan, nil
}

func processManifestList(ctx apirequest.Context, srcDigest godigest.Digest, srcManifest distribution.Manifest, manifests distribution.ManifestService, ref imageapi.DockerImageReference, filterFn func(*manifestlist.ManifestDescriptor) bool) ([]distribution.Manifest, distribution.Manifest, godigest.Digest, error) {
Expand Down Expand Up @@ -601,6 +527,54 @@ func processManifestList(ctx apirequest.Context, srcDigest godigest.Digest, srcM
}
}

func planBlobs(
ctx apirequest.Context,
plan *repositoryBlobCopy,
dst destination,
srcRepo, toRepo distribution.Repository,
srcManifests []distribution.Manifest,
srcRef imageapi.DockerImageReference,
force bool,
) []error {

// upload all the blobs
toBlobs := toRepo.Blobs(ctx)
srcBlobs := srcRepo.Blobs(ctx)

var errs []error

// upload the each manifest
for _, srcManifest := range srcManifests {
switch srcManifest.(type) {
case *schema2.DeserializedManifest:
case *manifestlist.DeserializedManifestList:
// we do not need to upload layers in a manifestlist
continue
default:
errs = append(errs, retrieverError{src: srcRef, dst: dst.ref, err: fmt.Errorf("the manifest type %T is not supported", srcManifest)})
continue
}

for _, blob := range srcManifest.References() {
// if we aren't forcing upload, skip the blob copy
if !force {
_, err := toBlobs.Stat(ctx, blob.Digest)
if err == nil {
// blob exists, skip
plan.AlreadyExists(blob)
glog.V(5).Infof("Server reports blob exists %#v", blob)
continue
}
if err != distribution.ErrBlobUnknown {
glog.V(5).Infof("Server was unable to check whether blob exists %s: %v", blob.Digest, err)
}
}
plan.Copy(blob, srcBlobs, toBlobs)
}
}
return errs
}

func uploadBlobs(
ctx apirequest.Context,
dst destination,
Expand Down
Loading

0 comments on commit 90c61df

Please sign in to comment.