Skip to content

Commit

Permalink
builders: simplified image progress reporting
Browse files Browse the repository at this point in the history
  • Loading branch information
csrwng committed Jun 8, 2016
1 parent de2a8bc commit fd2a53e
Show file tree
Hide file tree
Showing 8 changed files with 358 additions and 257 deletions.
30 changes: 5 additions & 25 deletions pkg/bootstrap/docker/dockerhelper/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/golang/glog"

starterrors "github.com/openshift/origin/pkg/bootstrap/docker/errors"
"github.com/openshift/origin/pkg/cmd/util/pullprogress"
"github.com/openshift/origin/pkg/cmd/util/imageprogress"
)

const openShiftInsecureCIDR = "172.30.0.0/16"
Expand Down Expand Up @@ -118,31 +118,11 @@ func (h *Helper) CheckAndPull(image string, out io.Writer) error {
}
glog.V(5).Infof("Image %q not found. Pulling", image)
fmt.Fprintf(out, "Pulling image %s\n", image)
extracting := false
var outputStream io.Writer
writeProgress := func(r *pullprogress.ProgressReport) {
if extracting {
return
}
if r.Downloading == 0 && r.Waiting == 0 && r.Extracting > 0 {
fmt.Fprintf(out, "Extracting\n")
extracting = true
return
}
plural := "s"
if r.Downloading == 1 {
plural = " "
}
fmt.Fprintf(out, "Downloading %d layer%s (%3.0f%%)", r.Downloading, plural, r.DownloadPct)
if r.Waiting > 0 {
fmt.Fprintf(out, ", %d waiting\n", r.Waiting)
} else {
fmt.Fprintf(out, "\n")
}
logProgress := func(s string) {
fmt.Fprintf(out, "%s\n", s)
}
if !glog.V(5) {
outputStream = pullprogress.NewPullProgressWriter(writeProgress)
} else {
outputStream := imageprogress.NewPullWriter(logProgress)
if glog.V(5) {
outputStream = out
}
err = h.client.PullImage(docker.PullImageOptions{
Expand Down
4 changes: 2 additions & 2 deletions pkg/build/builder/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,11 @@ func (d *DockerBuilder) Build() error {
if authPresent {
glog.V(4).Infof("Authenticating Docker push with user %q", pushAuthConfig.Username)
}
glog.V(1).Infof("Pushing image %s ...", pushTag)
glog.V(0).Infof("Pushing image %s ...", pushTag)
if err := pushImage(d.dockerClient, pushTag, pushAuthConfig); err != nil {
return fmt.Errorf("Failed to push image: %v", err)
}
glog.V(1).Infof("Push successful")
glog.V(0).Infof("Successfully pushed %s", pushTag)
}
return nil
}
Expand Down
18 changes: 13 additions & 5 deletions pkg/build/builder/dockerutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ import (
"strings"
"time"

s2iapi "github.com/openshift/source-to-image/pkg/api"
"github.com/docker/docker/pkg/parsers"
docker "github.com/fsouza/go-dockerclient"
"k8s.io/kubernetes/pkg/util/interrupt"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"

"github.com/docker/docker/pkg/parsers"
docker "github.com/fsouza/go-dockerclient"
s2iapi "github.com/openshift/source-to-image/pkg/api"
"github.com/openshift/source-to-image/pkg/tar"

"github.com/openshift/origin/pkg/cmd/util/imageprogress"
)

var (
Expand Down Expand Up @@ -55,12 +57,18 @@ type DockerClient interface {
// If any other scenario the push will fail, without retries.
func pushImage(client DockerClient, name string, authConfig docker.AuthConfiguration) error {
repository, tag := docker.ParseRepositoryTag(name)
logProgress := func(s string) {
glog.V(0).Infof("%s", s)
}
opts := docker.PushImageOptions{
Name: repository,
Tag: tag,
Name: repository,
Tag: tag,
OutputStream: imageprogress.NewPushWriter(logProgress),
RawJSONStream: true,
}
if glog.Is(5) {
opts.OutputStream = os.Stderr
opts.RawJSONStream = false
}
var err error
var retriableError = false
Expand Down
4 changes: 2 additions & 2 deletions pkg/build/builder/sti.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ func (s *S2IBuilder) Build() error {
} else {
glog.V(2).Infof("No push secret provided")
}
glog.V(1).Infof("Pushing %s image ...", pushTag)
glog.V(0).Infof("Pushing %s image ...", pushTag)
if err := pushImage(s.dockerClient, pushTag, pushAuthConfig); err != nil {
// write extended error message to assist in problem resolution
msg := fmt.Sprintf("Failed to push image. Response from registry is: %v", err)
Expand All @@ -281,7 +281,7 @@ func (s *S2IBuilder) Build() error {
}
return errors.New(msg)
}
glog.V(1).Infof("Successfully pushed %s", pushTag)
glog.V(0).Infof("Successfully pushed %s", pushTag)
}
return nil
}
Expand Down
256 changes: 256 additions & 0 deletions pkg/cmd/util/imageprogress/progress.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
package imageprogress

import (
"encoding/json"
"io"
"regexp"
"time"
)

const (
defaultCountTimeThreshhold = 10 * time.Second
defaultProgressTimeThreshhold = 30 * time.Second
defaultStableThreshhold = 3
)

// progressLine is a structure representation of a Docker pull progress line
type progressLine struct {
ID string `json:"id"`
Status string `json:"status"`
Detail *progressDetail `json:"progressDetail"`
}

// progressDetail is the progressDetail structure in a Docker pull progress line
type progressDetail struct {
Current int64 `json:"current"`
Total int64 `json:"total"`
}

// layerDetail is layer information associated with a specific layerStatus
type layerDetail struct {
Count int
Current int64
Total int64
}

// layerStatus is one of different possible status for layers detected by
// the ProgressWriter
type layerStatus int

const (
statusPending layerStatus = iota
statusDownloading
statusExtracting
statusComplete
statusPushing
)

// layerStatusFromDockerString translates a string in a Docker status
// line to a layerStatus
func layerStatusFromDockerString(dockerStatus string) layerStatus {
switch dockerStatus {
case "Pushing":
return statusPushing
case "Downloading":
return statusDownloading
case "Extracting", "Verifying Checksum", "Download complete":
return statusExtracting
case "Pull complete", "Already exists", "Pushed":
return statusComplete
default:
return statusPending
}
}

type report map[layerStatus]*layerDetail

func (r report) count(status layerStatus) int {
detail, ok := r[status]
if !ok {
return 0
}
return detail.Count
}

func (r report) percentProgress(status layerStatus) float32 {
detail, ok := r[status]
if !ok {
return 0
}
if detail.Total == 0 {
return 0
}
pct := float32(detail.Current) / float32(detail.Total) * 100.0
if pct > 100.0 {
pct = 100.0
}
return pct
}

func (r report) totalCount() int {
cnt := 0
for _, detail := range r {
cnt += detail.Count
}
return cnt
}

func (r report) countsChanged(previous report) bool {
if previous == nil {
return true
}
for k := range r {
detail, hasDetail := previous[k]
if !hasDetail {
return true
}
if r[k].Count != detail.Count {
return true
}
}
return false
}

// newWriter creates a writer that periodically reports
// on pull/push progress of a Docker image. It only reports when the state of the
// different layers has changed and uses time thresholds to limit the
// rate of the reports.
func newWriter(reportFn func(report)) io.Writer {
pipeIn, pipeOut := io.Pipe()
writer := &imageProgressWriter{
Writer: pipeOut,
decoder: json.NewDecoder(pipeIn),
layerStatus: map[string]progressLine{},
reportFn: reportFn,
countTimeThreshhold: defaultCountTimeThreshhold,
progressTimeThreshhold: defaultProgressTimeThreshhold,
stableThreshhold: defaultStableThreshhold,
}
go func() {
err := writer.readProgress()
if err != nil {
pipeIn.CloseWithError(err)
}
}()
return writer
}

type imageProgressWriter struct {
io.Writer
decoder *json.Decoder
layerStatus map[string]progressLine
lastLayerCount int
stableLines int
stableThreshhold int
countTimeThreshhold time.Duration
progressTimeThreshhold time.Duration
lastReport report
lastReportTime time.Time
reportFn func(report)
}

func (w *imageProgressWriter) readProgress() error {
for {
line := &progressLine{}
err := w.decoder.Decode(line)
if err == io.EOF {
break
}
if err != nil {
return err
}
err = w.processLine(line)
if err != nil {
return err
}
}
return nil
}

func (w *imageProgressWriter) processLine(line *progressLine) error {
// determine if it's a line we want to process
if !islayerStatus(line) {
return nil
}

w.layerStatus[line.ID] = *line

// if the number of layers has not stabilized yet, return and wait for more
// progress
if !w.isStableLayerCount() {
return nil
}

r := createReport(w.layerStatus)

// check if the count of layers in each state has changed
if r.countsChanged(w.lastReport) {
// only report on changed counts if the change occurs after
// a predefined set of seconds (10 sec by default). This prevents
// multiple reports in rapid succession
if time.Since(w.lastReportTime) > w.countTimeThreshhold {
w.lastReport = r
w.lastReportTime = time.Now()
w.reportFn(r)
}
return nil
}
// If counts haven't changed, but enough time has passed (30 sec by default),
// at least report on download/push progress
if time.Since(w.lastReportTime) > w.progressTimeThreshhold {
w.lastReport = r
w.lastReportTime = time.Now()
w.reportFn(r)
}
return nil
}

func (w *imageProgressWriter) isStableLayerCount() bool {
// If the number of layers has changed since last status, we're not stable
if w.lastLayerCount != len(w.layerStatus) {
w.lastLayerCount = len(w.layerStatus)
w.stableLines = 0
return false
}
// Only proceed after we've received status for the same number
// of layers at least stableThreshhold times. If not, they're still increasing
w.stableLines++
if w.stableLines < w.stableThreshhold {
// We're not stable enough yet
return false
}

return true
}

var layerIDRegexp = regexp.MustCompile("^[a-f,0-9]*$")

func islayerStatus(line *progressLine) bool {
// ignore status lines with no layer id
if len(line.ID) == 0 {
return false
}
// ignore layer ids that are not hex string
if !layerIDRegexp.MatchString(line.ID) {
return false
}
return true
}

func createReport(dockerProgress map[string]progressLine) report {
r := report{}
for _, line := range dockerProgress {
layerStatus := layerStatusFromDockerString(line.Status)
detail, exists := r[layerStatus]
if !exists {
detail = &layerDetail{}
r[layerStatus] = detail
}
detail.Count++
if line.Detail != nil {
detail.Current += line.Detail.Current
detail.Total += line.Detail.Total
}
}
return r
}
Loading

0 comments on commit fd2a53e

Please sign in to comment.