diff --git a/pkg/cmd/server/kubernetes/network/network_config.go b/pkg/cmd/server/kubernetes/network/network_config.go index e6fc4a277a5f..78b9959401e4 100644 --- a/pkg/cmd/server/kubernetes/network/network_config.go +++ b/pkg/cmd/server/kubernetes/network/network_config.go @@ -3,9 +3,11 @@ package network import ( "fmt" "net" + "time" miekgdns "github.com/miekg/dns" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" kclientset "k8s.io/client-go/kubernetes" "k8s.io/kubernetes/pkg/apis/componentconfig" kclientsetexternal "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" @@ -13,6 +15,7 @@ import ( kinternalinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion" configapi "github.com/openshift/origin/pkg/cmd/server/api" + "github.com/openshift/origin/pkg/cmd/server/kubernetes/network/transport" "github.com/openshift/origin/pkg/dns" "github.com/openshift/origin/pkg/network" networkclient "github.com/openshift/origin/pkg/network/generated/internalclientset" @@ -48,6 +51,12 @@ func New(options configapi.NodeConfig, clusterDomain string, proxyConfig *compon if err != nil { return nil, err } + // if a client cert is specified, when the certificate expires attempt to refresh it from disk + if len(kubeConfig.TLSClientConfig.KeyFile) > 0 && len(kubeConfig.TLSClientConfig.CertFile) > 0 { + if err := transport.RefreshCertificateAfterExpiry(nil, 10*time.Second, kubeConfig); err != nil { + utilruntime.HandleError(fmt.Errorf("Unable to enable client certificate rotation for network components: %v", err)) + } + } internalKubeClient, err := kclientsetinternal.NewForConfig(kubeConfig) if err != nil { return nil, err diff --git a/pkg/cmd/server/kubernetes/network/transport/transport.go b/pkg/cmd/server/kubernetes/network/transport/transport.go new file mode 100644 index 000000000000..1b15777e8f4f --- /dev/null +++ b/pkg/cmd/server/kubernetes/network/transport/transport.go @@ -0,0 +1,198 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Extracted from k8s.io/kubernetes/pkg/kubelet/certificate/transport.go, will be removed +// when openshift-sdn and the network components move out of the Kubelet. Is intended ONLY +// to provide certificate rollover until 3.8/3.9. +package transport + +import ( + "context" + "crypto/tls" + "fmt" + "net" + "net/http" + "sync" + "time" + + "github.com/golang/glog" + + utilnet "k8s.io/apimachinery/pkg/util/net" + "k8s.io/apimachinery/pkg/util/wait" + restclient "k8s.io/client-go/rest" +) + +// RefreshCertificateAfterExpiry instruments a restconfig with a transport that checks +// disk to reload expired certificates. +// +// The config must not already provide an explicit transport. +// +// The returned transport periodically checks the manager to determine if the +// certificate has changed. If it has, the transport shuts down all existing client +// connections, forcing the client to re-handshake with the server and use the +// new certificate. +// +// stopCh should be used to indicate when the transport is unused and doesn't need +// to continue checking the manager. +func RefreshCertificateAfterExpiry(stopCh <-chan struct{}, period time.Duration, clientConfig *restclient.Config) error { + if clientConfig.Transport != nil { + return fmt.Errorf("there is already a transport configured") + } + tlsConfig, err := restclient.TLSConfigFor(clientConfig) + if err != nil { + return fmt.Errorf("unable to configure TLS for the rest client: %v", err) + } + if tlsConfig == nil { + tlsConfig = &tls.Config{} + } + manager := &certificateManager{config: *clientConfig, minimumRefresh: period} + tlsConfig.Certificates = nil + tlsConfig.GetClientCertificate = func(requestInfo *tls.CertificateRequestInfo) (*tls.Certificate, error) { + cert := manager.Current() + if cert == nil { + return &tls.Certificate{Certificate: nil}, nil + } + return cert, nil + } + + // Custom dialer that will track all connections it creates. + t := &connTracker{ + dialer: &net.Dialer{Timeout: 30 * time.Second, KeepAlive: 30 * time.Second}, + conns: make(map[*closableConn]struct{}), + } + + lastCert := manager.Current() + go wait.Until(func() { + curr := manager.Current() + if curr == nil || lastCert == curr { + // Cert hasn't been rotated. + return + } + lastCert = curr + + glog.Infof("certificate rotation detected, shutting down client connections to start using new credentials") + // The cert has been rotated. Close all existing connections to force the client + // to reperform its TLS handshake with new cert. + // + // See: https://github.com/kubernetes-incubator/bootkube/pull/663#issuecomment-318506493 + t.closeAllConns() + }, period, stopCh) + + clientConfig.Transport = utilnet.SetTransportDefaults(&http.Transport{ + Proxy: http.ProxyFromEnvironment, + TLSHandshakeTimeout: 10 * time.Second, + TLSClientConfig: tlsConfig, + MaxIdleConnsPerHost: 25, + DialContext: t.DialContext, // Use custom dialer. + }) + + // Zero out all existing TLS options since our new transport enforces them. + clientConfig.CertData = nil + clientConfig.KeyData = nil + clientConfig.CertFile = "" + clientConfig.KeyFile = "" + clientConfig.CAData = nil + clientConfig.CAFile = "" + clientConfig.Insecure = false + return nil +} + +type certificateManager struct { + config restclient.Config + minimumRefresh time.Duration + + lock sync.Mutex + cert *tls.Certificate + lastCheck time.Time +} + +func (m *certificateManager) Current() *tls.Certificate { + m.lock.Lock() + defer m.lock.Unlock() + cert := m.cert + if cert != nil { + now := time.Now() + if now.After(m.cert.Leaf.NotAfter) { + if now.Sub(m.lastCheck) > m.minimumRefresh { + cert = nil + m.lastCheck = now + } + } + } + if cert == nil { + c, err := tls.LoadX509KeyPair(m.config.CertFile, m.config.KeyFile) + if err != nil { + return nil + } + m.cert = &c + } + return m.cert +} + +// connTracker is a dialer that tracks all open connections it creates. +type connTracker struct { + dialer *net.Dialer + + mu sync.Mutex + conns map[*closableConn]struct{} +} + +// closeAllConns forcibly closes all tracked connections. +func (c *connTracker) closeAllConns() { + c.mu.Lock() + conns := c.conns + c.conns = make(map[*closableConn]struct{}) + c.mu.Unlock() + + for conn := range conns { + conn.Close() + } +} + +func (c *connTracker) DialContext(ctx context.Context, network, address string) (net.Conn, error) { + conn, err := c.dialer.DialContext(ctx, network, address) + if err != nil { + return nil, err + } + + closable := &closableConn{Conn: conn} + + // Start tracking the connection + c.mu.Lock() + c.conns[closable] = struct{}{} + c.mu.Unlock() + + // When the connection is closed, remove it from the map. This will + // be no-op if the connection isn't in the map, e.g. if closeAllConns() + // is called. + closable.onClose = func() { + c.mu.Lock() + delete(c.conns, closable) + c.mu.Unlock() + } + + return closable, nil +} + +type closableConn struct { + onClose func() + net.Conn +} + +func (c *closableConn) Close() error { + go c.onClose() + return c.Conn.Close() +}