diff --git a/pkg/image/apis/image/dockertypes.go b/pkg/image/apis/image/dockertypes.go
index bc5e10ff6d6f..91d919859054 100644
--- a/pkg/image/apis/image/dockertypes.go
+++ b/pkg/image/apis/image/dockertypes.go
@@ -68,6 +68,29 @@ func Convert_compatibility_to_api_DockerImage(in *public.DockerV1CompatibilityIm
 	return nil
 }
 
+// Convert_DockerV1CompatibilityImage_to_DockerImageConfig takes the v1Compatibility payload
+// of a Docker registry manifest (schema 2.1) and converts it to a DockerImageConfig.
+func Convert_DockerV1CompatibilityImage_to_DockerImageConfig(in *public.DockerV1CompatibilityImage, out *public.DockerImageConfig) error {
+	*out = public.DockerImageConfig{
+		ID:            in.ID,
+		Parent:        in.Parent,
+		Comment:       in.Comment,
+		Created:       in.Created,
+		Container:     in.Container,
+		DockerVersion: in.DockerVersion,
+		Author:        in.Author,
+		Architecture:  in.Architecture,
+		Size:          in.Size,
+		// v1-compatibility payloads carry no OS field; "linux" is assumed — TODO confirm
+		OS:              "linux",
+		ContainerConfig: in.ContainerConfig,
+	}
+	if in.Config != nil {
+		// deep-copy the config so the output does not alias the input
+		out.Config = &public.DockerConfig{}
+		*out.Config = *in.Config
+	}
+	return nil
+}
+
 // Convert_imageconfig_to_api_DockerImage takes a Docker registry digest (schema 2.2) and converts it
 // to the external API version of Image.
func Convert_imageconfig_to_api_DockerImage(in *public.DockerImageConfig, out *docker10.DockerImage) error { diff --git a/pkg/image/dockerlayer/add/add.go b/pkg/image/dockerlayer/add/add.go new file mode 100644 index 000000000000..359b80d24a97 --- /dev/null +++ b/pkg/image/dockerlayer/add/add.go @@ -0,0 +1,356 @@ +package add + +import ( + "compress/gzip" + "context" + "encoding/json" + "fmt" + "io" + "runtime" + "time" + + "github.com/docker/distribution" + "github.com/docker/distribution/manifest/schema2" + digest "github.com/opencontainers/go-digest" + + "github.com/openshift/origin/pkg/image/apis/image/docker10" + "github.com/openshift/origin/pkg/image/dockerlayer" +) + +// get base manifest +// check that I can access base layers +// find the input file (assume I can stream) +// start a streaming upload of the layer to the remote registry, while calculating digests +// get back the final digest +// build the new image manifest and config.json +// upload config.json +// upload the rest of the layers +// tag the image + +const ( + // dockerV2Schema2LayerMediaType is the MIME type used for schema 2 layers. + dockerV2Schema2LayerMediaType = "application/vnd.docker.image.rootfs.diff.tar.gzip" + // dockerV2Schema2ConfigMediaType is the MIME type used for schema 2 config blobs. + dockerV2Schema2ConfigMediaType = "application/vnd.docker.container.image.v1+json" +) + +// DigestCopy reads all of src into dst, where src is a gzipped stream. It will return the +// sha256 sum of the underlying content (the layerDigest) and the sha256 sum of the +// tar archive (the blobDigest) or an error. If the gzip layer has a modification time +// it will be returned. 
+// TODO: use configurable digests +func DigestCopy(dst io.ReaderFrom, src io.Reader) (layerDigest, blobDigest digest.Digest, modTime *time.Time, size int64, err error) { + algo := digest.Canonical + // calculate the blob digest as the sha256 sum of the uploaded contents + blobhash := algo.Hash() + // calculate the diffID as the sha256 sum of the layer contents + pr, pw := io.Pipe() + layerhash := algo.Hash() + ch := make(chan error) + go func() { + defer close(ch) + gr, err := gzip.NewReader(pr) + if err != nil { + ch <- fmt.Errorf("unable to create gzip reader layer upload: %v", err) + return + } + if !gr.Header.ModTime.IsZero() { + modTime = &gr.Header.ModTime + } + _, err = io.Copy(layerhash, gr) + ch <- err + }() + + n, err := dst.ReadFrom(io.TeeReader(src, io.MultiWriter(blobhash, pw))) + if err != nil { + return "", "", nil, 0, fmt.Errorf("unable to upload new layer (%d): %v", n, err) + } + if err := pw.Close(); err != nil { + return "", "", nil, 0, fmt.Errorf("unable to complete writing diffID: %v", err) + } + if err := <-ch; err != nil { + return "", "", nil, 0, fmt.Errorf("unable to calculate layer diffID: %v", err) + } + + layerDigest = digest.NewDigestFromBytes(algo, layerhash.Sum(make([]byte, 0, layerhash.Size()))) + blobDigest = digest.NewDigestFromBytes(algo, blobhash.Sum(make([]byte, 0, blobhash.Size()))) + return layerDigest, blobDigest, modTime, n, nil +} + +func NewEmptyConfig() *docker10.DockerImageConfig { + config := &docker10.DockerImageConfig{ + DockerVersion: "", + // Created must be non-zero + Created: (time.Time{}).Add(1 * time.Second), + OS: runtime.GOOS, + Architecture: runtime.GOARCH, + } + return config +} + +func AddScratchLayerToConfig(config *docker10.DockerImageConfig) distribution.Descriptor { + layer := distribution.Descriptor{ + MediaType: dockerV2Schema2LayerMediaType, + Digest: digest.Digest(dockerlayer.GzippedEmptyLayerDigest), + Size: int64(len(dockerlayer.GzippedEmptyLayer)), + } + AddLayerToConfig(config, layer, 
dockerlayer.EmptyLayerDiffID) + return layer +} + +func AddLayerToConfig(config *docker10.DockerImageConfig, layer distribution.Descriptor, diffID string) { + if config.RootFS == nil { + config.RootFS = &docker10.DockerConfigRootFS{Type: "layers"} + } + config.RootFS.DiffIDs = append(config.RootFS.DiffIDs, diffID) + config.Size += layer.Size +} + +func UploadSchema2Config(ctx context.Context, blobs distribution.BlobService, config *docker10.DockerImageConfig, layers []distribution.Descriptor) (*schema2.DeserializedManifest, error) { + // ensure the image size is correct before persisting + config.Size = 0 + for _, layer := range layers { + config.Size += layer.Size + } + configJSON, err := json.Marshal(config) + if err != nil { + return nil, err + } + return putSchema2ImageConfig(ctx, blobs, dockerV2Schema2ConfigMediaType, configJSON, layers) +} + +// putSchema2ImageConfig uploads the provided configJSON to the blob store and returns the generated manifest +// for the requested image. +func putSchema2ImageConfig(ctx context.Context, blobs distribution.BlobService, mediaType string, configJSON []byte, layers []distribution.Descriptor) (*schema2.DeserializedManifest, error) { + b := schema2.NewManifestBuilder(blobs, mediaType, configJSON) + for _, layer := range layers { + if err := b.AppendReference(layer); err != nil { + return nil, err + } + } + m, err := b.Build(ctx) + if err != nil { + return nil, err + } + manifest, ok := m.(*schema2.DeserializedManifest) + if !ok { + return nil, fmt.Errorf("unable to turn %T into a DeserializedManifest, unable to store image", m) + } + return manifest, nil +} + +/* +func (r *InstantiateREST) completeInstantiate(ctx apirequest.Context, tag string, target *imageapi.ImageStream, imageInstantiate *imageapi.ImageStreamTagInstantiate, layerBody io.Reader, mediaType string) (runtime.Object, error) { + // TODO: load this from the default registry function + insecure := true + + ref, u, err := registryTarget(target, r.defaultRegistry) 
+ if err != nil { + return nil, err + } + + // verify the user has access to the From image, if any is specified + baseImageName, baseImageRepository, err := r.resolveTagInstantiateToImage(ctx, target, imageInstantiate) + if err != nil { + return nil, err + } + + // no layer, so we load our base image (if necessary) + var created time.Time + var baseImage *imageapi.Image + var sourceRepo distribution.Repository + if len(baseImageName) > 0 { + image, err := r.imageRegistry.GetImage(ctx, baseImageName, &metav1.GetOptions{}) + if err != nil { + return nil, err + } + baseImage = image + sourceRepo, err = r.repository.Repository(ctx, u, baseImageRepository, insecure) + if err != nil { + return nil, errors.NewInternalError(fmt.Errorf("could not contact integrated registry: %v", err)) + } + glog.V(4).Infof("Using base image for instantiate of tag %s: %s from %s", imageInstantiate.Name, baseImageName, baseImageRepository) + created = image.DockerImageMetadata.Created.Time + } + + imageRepository := imageapi.DockerImageReference{Namespace: ref.Namespace, Name: ref.Name}.Exact() + repo, err := r.repository.Repository(ctx, u, imageRepository, insecure) + if err != nil { + return nil, errors.NewInternalError(fmt.Errorf("could not contact integrated registry: %v", err)) + } + + var imageLayer *imageapi.ImageLayer + var imageLayerDiffID digest.Digest + if layerBody != nil { + desc, diffID, modTime, err := uploadLayer(ctx, layerBody, repo, mediaType) + if err != nil { + return nil, errors.NewInternalError(fmt.Errorf("unable to upload new image layer: %v", err)) + } + imageLayer = &imageapi.ImageLayer{ + Name: desc.Digest.String(), + LayerSize: desc.Size, + MediaType: mediaType, + } + imageLayerDiffID = diffID + + if modTime != nil && created.Before(*modTime) { + created = *modTime + } + } + + target, image, err := instantiateImage( + ctx, r.gr, + repo, sourceRepo, r.imageStreamRegistry, r.imageRegistry, + target, baseImage, imageInstantiate, created, + imageLayer, 
imageLayerDiffID, + *ref, + ) + if err != nil { + glog.V(4).Infof("Failed cloning into tag %s: %v", imageInstantiate.Name, err) + return nil, err + } + + return newISTag(tag, target, image, false) +} + + +// instantiateImage assembles the new image, saves it to the registry, then saves an image and tags the +// image stream. +func instantiateImage( + ctx apirequest.Context, gr schema.GroupResource, + repo, sourceRepo distribution.Repository, + base *docker10.DockerImageConfig, + layer *imageapi.ImageLayer, diffID digest.Digest, + imageReference imageapi.DockerImageReference, +) (*imageapi.ImageStream, *imageapi.Image, error) { + + + // create a new config.json representing the image + imageConfig := *base + imageConfig.Size = 0 + imageConfig.RootFS = &docker10.DockerConfigRootFS{Type: "layers"}, + + // TODO: resolve + // History []DockerConfigHistory + // OSVersion string + // OSFeatures []string + } + layers, err := calculateUpdatedImageConfig(ctx, &imageConfig, base, layer, diffID, sourceRepo) + if err != nil { + return nil, nil, errors.NewInternalError(fmt.Errorf("unable to generate a new image configuration: %v", err)) + } + configJSON, err := json.Marshal(&imageConfig) + if err != nil { + return nil, nil, errors.NewInternalError(fmt.Errorf("unable to marshal the new image config.json: %v", err)) + } + + // generate a manifest for that config.json + glog.V(5).Infof("Saving layer %s onto %q with configJSON:\n%s", diffID, imageInstantiate.Name, configJSON) + blobs := repo.Blobs(ctx) + image, err := importer.SerializeImageAsSchema2Manifest(ctx, blobs, configJSON, layers) + if err != nil { + return nil, nil, errors.NewInternalError(fmt.Errorf("unable to generate a new image manifest: %v", err)) + } + + // create the manifest as an image + imageReference.ID = image.Name + image.DockerImageReference = imageReference.Exact() + if err := images.CreateImage(ctx, image); err != nil && !errors.IsAlreadyExists(err) { + return nil, nil, err + } + return stream, image, err 
+} + +// calculateUpdatedImageConfig generates a new image config.json with the provided info. +func calculateUpdatedImageConfig( + ctx apirequest.Context, + imageConfig *imageapi.DockerImageConfig, + base *imageapi.Image, + layer *imageapi.ImageLayer, + diffID digest.Digest, + sourceRepo distribution.Repository, +) ([]imageapi.ImageLayer, error) { + var layers []imageapi.ImageLayer + + // initialize with the base + if base != nil { + layers = append(layers, base.DockerImageLayers...) + for i := range layers { + imageConfig.Size += layers[i].LayerSize + } + + // need to look up the rootFS + manifests, err := sourceRepo.Manifests(ctx) + if err != nil { + return nil, err + } + m, err := manifests.Get(ctx, digest.Digest(base.Name)) + if err != nil { + return nil, err + } + var contents []byte + switch t := m.(type) { + case *schema2.DeserializedManifest: + if t.Config.MediaType != manifest.DockerV2Schema2ConfigMediaType { + return nil, fmt.Errorf("unrecognized config: %s", t.Config.MediaType) + } + contents, err = sourceRepo.Blobs(ctx).Get(ctx, t.Config.Digest) + if err != nil { + return nil, fmt.Errorf("unreadable config %s: %v", t.Config.Digest, err) + } + + existingImageConfig := &imageapi.DockerImageConfig{} + if err := json.Unmarshal(contents, existingImageConfig); err != nil { + return nil, fmt.Errorf("manifest unreadable %s: %v", base.Name, err) + } + if existingImageConfig.RootFS == nil || existingImageConfig.RootFS.Type != "layers" { + return nil, fmt.Errorf("unable to find rootFs description from base image %s", base.Name) + } + imageConfig.OS = existingImageConfig.OS + imageConfig.Architecture = existingImageConfig.Architecture + imageConfig.OSFeatures = existingImageConfig.OSFeatures + imageConfig.OSVersion = existingImageConfig.OSVersion + imageConfig.RootFS.DiffIDs = existingImageConfig.RootFS.DiffIDs + + case *schema1.SignedManifest: + digest := digest.FromBytes(t.Canonical) + contents, err = sourceRepo.Blobs(ctx).Get(ctx, digest) + if err != nil { + 
return nil, fmt.Errorf("unreadable config %s: %v", digest, err) + } + for _, layer := range t.FSLayers { + imageConfig.RootFS.DiffIDs = append(imageConfig.RootFS.DiffIDs, layer.BlobSum.String()) + } + default: + return nil, fmt.Errorf("unrecognized manifest: %T", m) + } + } + + // add the optional layer if provided + if layer != nil { + // the layer goes at the front - the most recent image is always first + layers = append(layers, *layer) + imageConfig.Size += layer.LayerSize + imageConfig.RootFS.DiffIDs = append(imageConfig.RootFS.DiffIDs, diffID.String()) + } + + // add the scratch layer in if no other layers exist + if len(layers) == 0 { + layers = append(layers, imageapi.ImageLayer{ + Name: dockerlayer.GzippedEmptyLayerDigest.String(), + LayerSize: int64(len(dockerlayer.GzippedEmptyLayer)), + MediaType: manifest.DockerV2Schema2LayerMediaType, + }) + imageConfig.RootFS.DiffIDs = append(imageConfig.RootFS.DiffIDs, dockerlayer.EmptyLayerDiffID.String()) + imageConfig.Size += layers[0].LayerSize + } + + // the metav1 serialization of zero is not parseable by the Docker daemon, therefore + // we must store a zero+1 value + if imageConfig.Created.IsZero() { + imageConfig.Created = metav1.Time{imageConfig.Created.Add(1 * time.Second)} + } + + return layers, nil +} +*/ diff --git a/pkg/oc/cli/cmd/image/append/append.go b/pkg/oc/cli/cmd/image/append/append.go new file mode 100644 index 000000000000..8fc99dee8646 --- /dev/null +++ b/pkg/oc/cli/cmd/image/append/append.go @@ -0,0 +1,550 @@ +package append + +import ( + "context" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "os" + "regexp" + "runtime" + "time" + + units "github.com/docker/go-units" + "github.com/golang/glog" + "github.com/spf13/cobra" + + "github.com/docker/distribution" + "github.com/docker/distribution/manifest/manifestlist" + "github.com/docker/distribution/manifest/schema1" + 
"github.com/docker/distribution/manifest/schema2" + "github.com/docker/distribution/reference" + "github.com/docker/distribution/registry/client" + digest "github.com/opencontainers/go-digest" + + "k8s.io/client-go/rest" + "k8s.io/kubernetes/pkg/kubectl/cmd/templates" + kcmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" + + imageapi "github.com/openshift/origin/pkg/image/apis/image" + "github.com/openshift/origin/pkg/image/apis/image/docker10" + "github.com/openshift/origin/pkg/image/dockerlayer/add" + "github.com/openshift/origin/pkg/image/registryclient" + "github.com/openshift/origin/pkg/image/registryclient/dockercredentials" +) + +var ( + desc = templates.LongDesc(` + Add layers to Docker images + + Modifies an existing image by adding layers or changing configuration and then pushes that + image to a remote registry. Any inherited layers are streamed from registry to registry + without being stored locally. The default docker credentials are used for authenticating + to the registries. + + Layers may be provided as arguments to the command and must each be a gzipped tar archive + representing a filesystem overlay to the inherited images. The archive may contain a "whiteout" + file (the prefix '.wh.' and the filename) which will hide files in the lower layers. All + supported filesystem attributes present in the archive will be used as is. + + Metadata about the image (the configuration passed to the container runtime) may be altered + by passing a JSON string to the --config or --meta options. The --config flag changes what + the container runtime sees, while the --meta option allows you to change the attributes of + the image used by the runtime. Use --dry-run to see the result of your changes. You may + add the --drop-history flag to remove information from the image about the system that + built the base image. 
+ + Images in manifest list format will automatically select an image that matches the current + operating system and architecture unless you use --filter-by-os to select a different image. + This flag has no effect on regular images. + + Experimental: This command is under active development and may change without notice.`) + + example = templates.Examples(` +# Remove the entrypoint on the mysql:latest image +%[1]s --from mysql:latest --to myregistry.com/myimage:latest --config {"Entrypoint":null} + +# Add a new layer to the image +%[1]s --from mysql:latest --to myregistry.com/myimage:latest layer.tar.gz +`) +) + +type options struct { + Out, ErrOut io.Writer + + From, To string + LayerFiles []string + + ConfigPatch string + MetaPatch string + + DropHistory bool + + OSFilter *regexp.Regexp + DefaultOSFilter bool + + FilterByOS string + + MaxPerRegistry int + + DryRun bool + Insecure bool +} + +// schema2ManifestOnly specifically requests a manifest list first +var schema2ManifestOnly = distribution.WithManifestMediaTypes([]string{ + manifestlist.MediaTypeManifestList, + schema2.MediaTypeManifest, +}) + +// New creates a new command +func New(name string, out, errOut io.Writer) *cobra.Command { + o := &options{ + MaxPerRegistry: 3, + } + + cmd := &cobra.Command{ + Use: "append", + Short: "Add layers to images and push them to a registry", + Long: desc, + Example: fmt.Sprintf(example, name), + Run: func(c *cobra.Command, args []string) { + o.Out = out + o.ErrOut = errOut + kcmdutil.CheckErr(o.Complete(c, args)) + kcmdutil.CheckErr(o.Run()) + }, + } + + flag := cmd.Flags() + flag.BoolVar(&o.DryRun, "dry-run", o.DryRun, "Print the actions that would be taken and exit without writing to the destinations.") + flag.BoolVar(&o.Insecure, "insecure", o.Insecure, "Allow push and pull operations to registries to be made over HTTP") + flag.StringVar(&o.FilterByOS, "filter-by-os", o.FilterByOS, "A regular expression to control which images are mirrored. 
Images will be passed as '/[/]'.") + + flag.StringVar(&o.From, "from", o.From, "The image to use as a base. If empty, a new scratch image is created.") + flag.StringVar(&o.To, "to", o.To, "The Docker repository tag to upload the appended image to.") + + flag.StringVar(&o.ConfigPatch, "config", o.ConfigPatch, "A JSON patch that will be used with the output image.") + flag.StringVar(&o.MetaPatch, "meta", o.MetaPatch, "A JSON patch that will be used with image base metadata (advanced config).") + flag.BoolVar(&o.DropHistory, "drop-history", o.DropHistory, "Fields on the image that relate to the history of how the image was created will be removed.") + + flag.IntVar(&o.MaxPerRegistry, "max-per-registry", o.MaxPerRegistry, "Number of concurrent requests allowed per registry.") + + return cmd +} + +func (o *options) Complete(cmd *cobra.Command, args []string) error { + pattern := o.FilterByOS + if len(pattern) == 0 && !cmd.Flags().Changed("filter-by-os") { + o.DefaultOSFilter = true + pattern = regexp.QuoteMeta(fmt.Sprintf("%s/%s", runtime.GOOS, runtime.GOARCH)) + } + if len(pattern) > 0 { + re, err := regexp.Compile(pattern) + if err != nil { + return fmt.Errorf("--filter-by-os was not a valid regular expression: %v", err) + } + o.OSFilter = re + } + + for _, arg := range args { + fi, err := os.Stat(arg) + if err != nil { + return fmt.Errorf("invalid argument: %s", err) + } + if fi.IsDir() { + return fmt.Errorf("invalid argument: %s is a directory", arg) + } + } + o.LayerFiles = args + + return nil +} + +// includeDescriptor returns true if the provided manifest should be included. 
+func (o *options) includeDescriptor(d *manifestlist.ManifestDescriptor, hasMultiple bool) bool { + if o.OSFilter == nil { + return true + } + if o.DefaultOSFilter && !hasMultiple { + return true + } + if len(d.Platform.Variant) > 0 { + return o.OSFilter.MatchString(fmt.Sprintf("%s/%s/%s", d.Platform.OS, d.Platform.Architecture, d.Platform.Variant)) + } + return o.OSFilter.MatchString(fmt.Sprintf("%s/%s", d.Platform.OS, d.Platform.Architecture)) +} + +func (o *options) Run() error { + var from *imageapi.DockerImageReference + if len(o.From) > 0 { + src, err := imageapi.ParseDockerImageReference(o.From) + if err != nil { + return err + } + if len(src.Tag) == 0 && len(src.ID) == 0 { + return fmt.Errorf("--from must point to an image ID or image tag") + } + from = &src + } + to, err := imageapi.ParseDockerImageReference(o.To) + if err != nil { + return err + } + if len(to.ID) > 0 { + return fmt.Errorf("--to may not point to an image by ID") + } + + rt, err := rest.TransportFor(&rest.Config{}) + if err != nil { + return err + } + insecureRT, err := rest.TransportFor(&rest.Config{TLSClientConfig: rest.TLSClientConfig{Insecure: true}}) + if err != nil { + return err + } + creds := dockercredentials.NewLocal() + ctx := context.Background() + fromContext := registryclient.NewContext(rt, insecureRT).WithCredentials(creds) + toContext := registryclient.NewContext(rt, insecureRT).WithActions("push").WithCredentials(creds) + + toRepo, err := toContext.Repository(ctx, to.DockerClientDefaults().RegistryURL(), to.RepositoryName(), o.Insecure) + if err != nil { + return err + } + toManifests, err := toRepo.Manifests(ctx) + if err != nil { + return err + } + + var ( + base *docker10.DockerImageConfig + layers []distribution.Descriptor + fromRepo distribution.Repository + ) + if from != nil { + repo, err := fromContext.Repository(ctx, from.DockerClientDefaults().RegistryURL(), from.RepositoryName(), o.Insecure) + if err != nil { + return err + } + fromRepo = repo + var srcDigest 
digest.Digest + if len(from.Tag) > 0 { + desc, err := repo.Tags(ctx).Get(ctx, from.Tag) + if err != nil { + return err + } + srcDigest = desc.Digest + } else { + srcDigest = digest.Digest(from.ID) + } + manifests, err := repo.Manifests(ctx) + if err != nil { + return err + } + srcManifest, err := manifests.Get(ctx, srcDigest, schema2ManifestOnly) + if err != nil { + return err + } + + originalSrcDigest := srcDigest + srcManifests, srcManifest, srcDigest, err := processManifestList(ctx, srcDigest, srcManifest, manifests, *from, o.includeDescriptor) + if err != nil { + return err + } + if len(srcManifests) == 0 { + return fmt.Errorf("filtered all images from %s", from) + } + + var location string + if srcDigest == originalSrcDigest { + location = fmt.Sprintf("manifest %s", srcDigest) + } else { + location = fmt.Sprintf("manifest %s in manifest list %s", srcDigest, originalSrcDigest) + } + + switch t := srcManifest.(type) { + case *schema2.DeserializedManifest: + if t.Config.MediaType != schema2.MediaTypeImageConfig { + return fmt.Errorf("unable to append layers to images with config %s from %s", t.Config.MediaType, location) + } + configJSON, err := repo.Blobs(ctx).Get(ctx, t.Config.Digest) + if err != nil { + return err + } + glog.V(4).Infof("Raw image config json:\n%s", string(configJSON)) + config := &docker10.DockerImageConfig{} + if err := json.Unmarshal(configJSON, &config); err != nil { + return err + } + + base = config + layers = t.Layers + base.Size = 0 + for _, layer := range t.Layers { + base.Size += layer.Size + } + + case *schema1.SignedManifest: + if glog.V(4) { + _, configJSON, _ := srcManifest.Payload() + glog.Infof("Raw image config json:\n%s", string(configJSON)) + } + if len(t.History) == 0 { + return fmt.Errorf("input image is in an unknown format: no v1Compatibility history") + } + config := &docker10.DockerV1CompatibilityImage{} + if err := json.Unmarshal([]byte(t.History[0].V1Compatibility), &config); err != nil { + return err + } + + base = 
&docker10.DockerImageConfig{} + if err := imageapi.Convert_DockerV1CompatibilityImage_to_DockerImageConfig(config, base); err != nil { + return err + } + + // schema1 layers are in reverse order + layers = make([]distribution.Descriptor, 0, len(t.FSLayers)) + for i := len(t.FSLayers) - 1; i >= 0; i-- { + layer := distribution.Descriptor{ + MediaType: schema2.MediaTypeLayer, + Digest: t.FSLayers[i].BlobSum, + // size must be reconstructed from the blobs + } + // we must reconstruct the tar sum from the blobs + add.AddLayerToConfig(base, layer, "") + layers = append(layers, layer) + } + + default: + return fmt.Errorf("unable to append layers to images of type %T from %s", srcManifest, location) + } + } else { + base = add.NewEmptyConfig() + layers = []distribution.Descriptor{add.AddScratchLayerToConfig(base)} + } + + if base.Config == nil { + base.Config = &docker10.DockerConfig{} + } + + if glog.V(4) { + configJSON, _ := json.MarshalIndent(base, "", " ") + glog.Infof("input config:\n%s\nlayers: %#v", configJSON, layers) + } + + base.Created = time.Now() + if o.DropHistory { + base.ContainerConfig = docker10.DockerConfig{} + base.History = nil + base.Container = "" + base.DockerVersion = "" + base.Config.Image = "" + } + + if len(o.ConfigPatch) > 0 { + if err := json.Unmarshal([]byte(o.ConfigPatch), base.Config); err != nil { + return fmt.Errorf("unable to patch image from --config: %v", err) + } + } + if len(o.MetaPatch) > 0 { + if err := json.Unmarshal([]byte(o.MetaPatch), base); err != nil { + return fmt.Errorf("unable to patch image from --meta: %v", err) + } + } + + numLayers := len(layers) + toBlobs := toRepo.Blobs(ctx) + + for _, arg := range o.LayerFiles { + err := func() error { + f, err := os.Open(arg) + if err != nil { + return err + } + defer f.Close() + var readerFrom io.ReaderFrom = ioutil.Discard.(io.ReaderFrom) + var done = func(distribution.Descriptor) error { return nil } + if !o.DryRun { + fmt.Fprint(o.Out, "Uploading ... 
") + start := time.Now() + bw, err := toBlobs.Create(ctx) + if err != nil { + fmt.Fprintln(o.Out, "failed") + return err + } + readerFrom = bw + defer bw.Close() + done = func(desc distribution.Descriptor) error { + _, err := bw.Commit(ctx, desc) + if err != nil { + fmt.Fprintln(o.Out, "failed") + return err + } + fmt.Fprintf(o.Out, "%s/s\n", units.HumanSize(float64(desc.Size)/float64(time.Now().Sub(start))*float64(time.Second))) + return nil + } + } + layerDigest, blobDigest, modTime, n, err := add.DigestCopy(readerFrom, f) + desc := distribution.Descriptor{ + Digest: blobDigest, + Size: n, + MediaType: schema2.MediaTypeLayer, + } + layers = append(layers, desc) + add.AddLayerToConfig(base, desc, layerDigest.String()) + if modTime != nil && !modTime.IsZero() { + base.Created = *modTime + } + return done(desc) + }() + if err != nil { + return err + } + } + + if o.DryRun { + configJSON, _ := json.MarshalIndent(base, "", " ") + fmt.Fprintf(o.Out, "%s", configJSON) + return nil + } + + // upload base layers in parallel + stopCh := make(chan struct{}) + defer close(stopCh) + q := newWorkQueue(o.MaxPerRegistry, stopCh) + err = q.Try(func(w Try) { + for i := range layers[:numLayers] { + layer := &layers[i] + index := i + missingDiffID := len(base.RootFS.DiffIDs[i]) == 0 + w.Try(func() error { + fromBlobs := fromRepo.Blobs(ctx) + + // check whether the blob exists + if desc, err := fromBlobs.Stat(ctx, layer.Digest); err == nil { + // ensure the correct size makes it back to the manifest + glog.V(4).Infof("Layer %s already exists in destination (%s)", layer.Digest, units.HumanSizeWithPrecision(float64(layer.Size), 3)) + if layer.Size == 0 { + layer.Size = desc.Size + } + // we need to calculate the tar sum from the image, requiring us to pull it + if missingDiffID { + glog.V(4).Infof("Need tar sum, streaming layer %s", layer.Digest) + r, err := fromBlobs.Open(ctx, layer.Digest) + if err != nil { + return err + } + defer r.Close() + layerDigest, _, _, _, err := 
add.DigestCopy(ioutil.Discard.(io.ReaderFrom), r) + if err != nil { + return err + } + glog.V(4).Infof("Layer %s has tar sum %s", layer.Digest, layerDigest) + base.RootFS.DiffIDs[index] = layerDigest.String() + } + return nil + } + + // source + r, err := fromBlobs.Open(ctx, layer.Digest) + if err != nil { + return err + } + defer r.Close() + + // destination + mountOptions := []distribution.BlobCreateOption{WithDescriptor(*layer)} + if from.Registry == to.Registry { + source, err := reference.WithDigest(fromRepo.Named(), layer.Digest) + if err != nil { + return err + } + mountOptions = append(mountOptions, client.WithMountFrom(source)) + } + bw, err := toBlobs.Create(ctx, mountOptions...) + if err != nil { + return err + } + defer bw.Close() + + // copy the blob, calculating the diffID if necessary + if layer.Size > 0 { + fmt.Fprintf(o.Out, "Uploading %s ...\n", units.HumanSize(float64(layer.Size))) + } else { + fmt.Fprintf(o.Out, "Uploading ...\n") + } + if missingDiffID { + glog.V(4).Infof("Need tar sum, calculating while streaming %s", layer.Digest) + layerDigest, _, _, _, err := add.DigestCopy(bw, r) + if err != nil { + return err + } + glog.V(4).Infof("Layer %s has tar sum %s", layer.Digest, layerDigest) + base.RootFS.DiffIDs[index] = layerDigest.String() + } else { + if _, err := bw.ReadFrom(r); err != nil { + return err + } + } + desc, err := bw.Commit(ctx, *layer) + if err != nil { + return err + } + + // check output + if desc.Digest != layer.Digest { + return fmt.Errorf("when uploading blob %s, got a different returned digest", desc.Digest, layer.Digest) + } + // ensure the correct size makes it back to the manifest + if layer.Size == 0 { + layer.Size = desc.Size + } + return nil + }) + } + }) + if err != nil { + return err + } + + manifest, err := add.UploadSchema2Config(ctx, toBlobs, base, layers) + if err != nil { + return err + } + toDigest, err := putManifestInCompatibleSchema(ctx, manifest, to.Tag, toManifests, fromRepo.Blobs(ctx), toRepo.Named()) 
+ if err != nil { + return err + } + fmt.Fprintf(o.Out, "Pushed image %s to %s\n", toDigest, to) + return nil +} + +type optionFunc func(interface{}) error + +func (f optionFunc) Apply(v interface{}) error { + return f(v) +} + +// WithDescriptor returns a BlobCreateOption which provides the expected blob metadata. +func WithDescriptor(desc distribution.Descriptor) distribution.BlobCreateOption { + return optionFunc(func(v interface{}) error { + opts, ok := v.(*distribution.CreateOptions) + if !ok { + return fmt.Errorf("unexpected options type: %T", v) + } + if opts.Mount.Stat == nil { + opts.Mount.Stat = &desc + } + return nil + }) +} + +func calculateLayerDigest(blobs distribution.BlobService, dgst digest.Digest, readerFrom io.ReaderFrom, r io.Reader) (digest.Digest, error) { + if readerFrom == nil { + readerFrom = ioutil.Discard.(io.ReaderFrom) + } + layerDigest, _, _, _, err := add.DigestCopy(readerFrom, r) + return layerDigest, err +} diff --git a/pkg/oc/cli/cmd/image/append/manifest.go b/pkg/oc/cli/cmd/image/append/manifest.go new file mode 100644 index 000000000000..115510aa5342 --- /dev/null +++ b/pkg/oc/cli/cmd/image/append/manifest.go @@ -0,0 +1,178 @@ +package append + +import ( + "context" + "fmt" + "sync" + + "github.com/docker/distribution" + "github.com/docker/distribution/manifest/manifestlist" + "github.com/docker/distribution/manifest/schema1" + "github.com/docker/distribution/manifest/schema2" + "github.com/docker/distribution/reference" + "github.com/docker/distribution/registry/api/errcode" + "github.com/docker/distribution/registry/api/v2" + + "github.com/docker/libtrust" + "github.com/golang/glog" + digest "github.com/opencontainers/go-digest" + + imageapi "github.com/openshift/origin/pkg/image/apis/image" +) + +func processManifestList(ctx context.Context, srcDigest digest.Digest, 
// processManifestList expands srcManifest when it is a manifest list: it filters the
// list's entries with filterFn, retrieves each remaining child manifest, and returns
// (all manifests to upload, the manifest to tag, its digest, error). Non-list
// manifests pass through unchanged. A fully-filtered list returns (nil, nil, "", nil).
func processManifestList(ctx context.Context, srcDigest digest.Digest, srcManifest distribution.Manifest, manifests distribution.ManifestService, ref imageapi.DockerImageReference, filterFn func(*manifestlist.ManifestDescriptor, bool) bool) ([]distribution.Manifest, distribution.Manifest, digest.Digest, error) {
	var srcManifests []distribution.Manifest
	switch t := srcManifest.(type) {
	case *manifestlist.DeserializedManifestList:
		manifestDigest := srcDigest
		manifestList := t

		// Keep only the descriptors filterFn accepts; the second argument tells it
		// whether there is more than one platform to choose from.
		filtered := make([]manifestlist.ManifestDescriptor, 0, len(t.Manifests))
		for _, manifest := range t.Manifests {
			// NOTE(review): &manifest is the address of the loop variable; safe only
			// because filterFn is expected to read it during the call, not retain it.
			if !filterFn(&manifest, len(t.Manifests) > 1) {
				glog.V(5).Infof("Skipping image for %#v from %s", manifest.Platform, ref)
				continue
			}
			glog.V(5).Infof("Including image for %#v from %s", manifest.Platform, ref)
			filtered = append(filtered, manifest)
		}

		// Nothing matched the filter: signal "no image" to the caller.
		if len(filtered) == 0 {
			return nil, nil, "", nil
		}

		// if we're filtering the manifest list, update the source manifest and digest
		if len(filtered) != len(t.Manifests) {
			var err error
			t, err = manifestlist.FromDescriptors(filtered)
			if err != nil {
				return nil, nil, "", fmt.Errorf("unable to filter source image %s manifest list: %v", ref, err)
			}
			_, body, err := t.Payload()
			if err != nil {
				return nil, nil, "", fmt.Errorf("unable to filter source image %s manifest list (bad payload): %v", ref, err)
			}
			manifestList = t
			// The rebuilt list has a new digest; recompute it with the same algorithm
			// as the source digest.
			manifestDigest = srcDigest.Algorithm().FromBytes(body)
			glog.V(5).Infof("Filtered manifest list to new digest %s:\n%s", manifestDigest, body)
		}

		// Fetch every child manifest referenced by the (possibly filtered) list.
		for i, manifest := range t.Manifests {
			childManifest, err := manifests.Get(ctx, manifest.Digest, distribution.WithManifestMediaTypes([]string{manifestlist.MediaTypeManifestList, schema2.MediaTypeManifest}))
			if err != nil {
				return nil, nil, "", fmt.Errorf("unable to retrieve source image %s manifest #%d from manifest list: %v", ref, i+1, err)
			}
			srcManifests = append(srcManifests, childManifest)
		}

		switch {
		case len(srcManifests) == 1:
			// A single remaining entry: collapse the list and tag the child manifest
			// directly, using the digest of its own payload.
			_, body, err := srcManifests[0].Payload()
			if err != nil {
				return nil, nil, "", fmt.Errorf("unable to convert source image %s manifest list to single manifest: %v", ref, err)
			}
			manifestDigest := srcDigest.Algorithm().FromBytes(body)
			glog.V(5).Infof("Used only one manifest from the list %s", manifestDigest)
			return srcManifests, srcManifests[0], manifestDigest, nil
		default:
			// Multiple entries: upload the children plus the list itself, and tag the list.
			return append(srcManifests, manifestList), manifestList, manifestDigest, nil
		}

	default:
		// Not a manifest list: pass the manifest through unchanged.
		return []distribution.Manifest{srcManifest}, srcManifest, srcDigest, nil
	}
}
srcManifests[0].Payload() + if err != nil { + return nil, nil, "", fmt.Errorf("unable to convert source image %s manifest list to single manifest: %v", ref, err) + } + manifestDigest := srcDigest.Algorithm().FromBytes(body) + glog.V(5).Infof("Used only one manifest from the list %s", manifestDigest) + return srcManifests, srcManifests[0], manifestDigest, nil + default: + return append(srcManifests, manifestList), manifestList, manifestDigest, nil + } + + default: + return []distribution.Manifest{srcManifest}, srcManifest, srcDigest, nil + } +} + +// TDOO: remove when quay.io switches to v2 schema +func putManifestInCompatibleSchema( + ctx context.Context, + srcManifest distribution.Manifest, + tag string, + toManifests distribution.ManifestService, + // supports schema2 -> schema1 downconversion + blobs distribution.BlobService, + ref reference.Named, +) (digest.Digest, error) { + var options []distribution.ManifestServiceOption + if len(tag) > 0 { + glog.V(5).Infof("Put manifest %s:%s", ref, tag) + options = []distribution.ManifestServiceOption{distribution.WithTag(tag)} + } else { + glog.V(5).Infof("Put manifest %s", ref) + } + toDigest, err := toManifests.Put(ctx, srcManifest, options...) 
+ if err == nil { + return toDigest, nil + } + errs, ok := err.(errcode.Errors) + if !ok || len(errs) == 0 { + return toDigest, err + } + errcode, ok := errs[0].(errcode.Error) + if !ok || errcode.ErrorCode() != v2.ErrorCodeManifestInvalid { + return toDigest, err + } + // try downconverting to v2-schema1 + schema2Manifest, ok := srcManifest.(*schema2.DeserializedManifest) + if !ok { + return toDigest, err + } + tagRef, tagErr := reference.WithTag(ref, tag) + if tagErr != nil { + return toDigest, err + } + glog.V(5).Infof("Registry reported invalid manifest error, attempting to convert to v2schema1 as ref %s", tagRef) + schema1Manifest, convertErr := convertToSchema1(ctx, blobs, schema2Manifest, tagRef) + if convertErr != nil { + return toDigest, err + } + if glog.V(6) { + _, data, _ := schema1Manifest.Payload() + glog.Infof("Converted to v2schema1\n%s", string(data)) + } + return toManifests.Put(ctx, schema1Manifest, distribution.WithTag(tag)) +} + +// TDOO: remove when quay.io switches to v2 schema +func convertToSchema1(ctx context.Context, blobs distribution.BlobService, schema2Manifest *schema2.DeserializedManifest, ref reference.Named) (distribution.Manifest, error) { + targetDescriptor := schema2Manifest.Target() + configJSON, err := blobs.Get(ctx, targetDescriptor.Digest) + if err != nil { + return nil, err + } + trustKey, err := loadPrivateKey() + if err != nil { + return nil, err + } + builder := schema1.NewConfigManifestBuilder(blobs, trustKey, ref, configJSON) + for _, d := range schema2Manifest.Layers { + if err := builder.AppendReference(d); err != nil { + return nil, err + } + } + manifest, err := builder.Build(ctx) + if err != nil { + return nil, err + } + return manifest, nil +} + +var ( + privateKeyLock sync.Mutex + privateKey libtrust.PrivateKey +) + +// TDOO: remove when quay.io switches to v2 schema +func loadPrivateKey() (libtrust.PrivateKey, error) { + privateKeyLock.Lock() + defer privateKeyLock.Unlock() + if privateKey != nil { + return 
privateKey, nil + } + trustKey, err := libtrust.GenerateECP256PrivateKey() + if err != nil { + return nil, err + } + privateKey = trustKey + return privateKey, nil +} diff --git a/pkg/oc/cli/cmd/image/append/workqueue.go b/pkg/oc/cli/cmd/image/append/workqueue.go new file mode 100644 index 000000000000..fb57f1a746d4 --- /dev/null +++ b/pkg/oc/cli/cmd/image/append/workqueue.go @@ -0,0 +1,131 @@ +package append + +import ( + "sync" + + "github.com/golang/glog" +) + +type workQueue struct { + ch chan workUnit + wg *sync.WaitGroup +} + +func newWorkQueue(workers int, stopCh <-chan struct{}) *workQueue { + q := &workQueue{ + ch: make(chan workUnit, 100), + wg: &sync.WaitGroup{}, + } + go q.run(workers, stopCh) + return q +} + +func (q *workQueue) run(workers int, stopCh <-chan struct{}) { + for i := 0; i < workers; i++ { + go func(i int) { + defer glog.V(4).Infof("worker %d stopping", i) + for { + select { + case work, ok := <-q.ch: + if !ok { + return + } + work.fn() + work.wg.Done() + case <-stopCh: + return + } + } + }(i) + } + <-stopCh +} + +func (q *workQueue) Batch(fn func(Work)) { + w := &worker{ + wg: &sync.WaitGroup{}, + ch: q.ch, + } + fn(w) + w.wg.Wait() +} + +func (q *workQueue) Try(fn func(Try)) error { + w := &worker{ + wg: &sync.WaitGroup{}, + ch: q.ch, + err: make(chan error), + } + fn(w) + return w.FirstError() +} + +func (q *workQueue) Queue(fn func(Work)) { + w := &worker{ + wg: q.wg, + ch: q.ch, + } + fn(w) +} + +func (q *workQueue) Done() { + q.wg.Wait() +} + +type workUnit struct { + fn func() + wg *sync.WaitGroup +} + +type Work interface { + Parallel(fn func()) +} + +type Try interface { + Try(fn func() error) +} + +type worker struct { + wg *sync.WaitGroup + ch chan workUnit + err chan error +} + +func (w *worker) FirstError() error { + done := make(chan struct{}) + go func() { + w.wg.Wait() + close(done) + }() + for { + select { + case err := <-w.err: + if err != nil { + return err + } + case <-done: + return nil + } + } +} + 
+func (w *worker) Parallel(fn func()) { + w.wg.Add(1) + w.ch <- workUnit{wg: w.wg, fn: fn} +} + +func (w *worker) Try(fn func() error) { + w.wg.Add(1) + w.ch <- workUnit{ + wg: w.wg, + fn: func() { + err := fn() + if w.err == nil { + // TODO: have the work queue accumulate errors and release them with Done() + glog.Errorf("Worker error: %v", err) + return + } + w.err <- err + }, + } +} diff --git a/pkg/oc/cli/cmd/image/image.go b/pkg/oc/cli/cmd/image/image.go index 4fec5c939ecc..2478562519d7 100644 --- a/pkg/oc/cli/cmd/image/image.go +++ b/pkg/oc/cli/cmd/image/image.go @@ -9,6 +9,7 @@ import ( cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" "github.com/openshift/origin/pkg/cmd/templates" + "github.com/openshift/origin/pkg/oc/cli/cmd/image/append" "github.com/openshift/origin/pkg/oc/cli/cmd/image/mirror" "github.com/openshift/origin/pkg/oc/cli/util/clientcmd" ) @@ -35,6 +36,7 @@ func NewCmdImage(fullName string, f *clientcmd.Factory, in io.Reader, out, errou { Message: "Advanced commands:", Commands: []*cobra.Command{ + append.New(name, out, errout), mirror.NewCmdMirrorImage(name, out, errout), }, },