Skip to content

Commit

Permalink
Merge pull request #19987 from smarterclayton/dns
Browse files Browse the repository at this point in the history
Remove the node from dnsmasq config when shutting down
  • Loading branch information
openshift-merge-robot authored Jun 14, 2018
2 parents 9d969d1 + f8d0494 commit 42a5965
Show file tree
Hide file tree
Showing 10 changed files with 111 additions and 45 deletions.
4 changes: 2 additions & 2 deletions pkg/cmd/server/kubernetes/network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ func (c *NetworkConfig) RunSDN() {
}

// RunDNS starts the DNS server as soon as services are loaded.
func (c *NetworkConfig) RunDNS() {
func (c *NetworkConfig) RunDNS(stopCh <-chan struct{}) {
go func() {
glog.Infof("Starting DNS on %s", c.DNSServer.Config.DnsAddr)
err := c.DNSServer.ListenAndServe()
err := c.DNSServer.ListenAndServe(stopCh)
glog.Fatalf("DNS server failed to start: %v", err)
}()
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/cmd/server/kubernetes/network/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,10 @@ func Build(options configapi.NodeConfig) (*kubeproxyconfig.KubeProxyConfiguratio
return nil, fmt.Errorf("The provided value to bind to must be an ip:port: %q", addr)
}
proxyconfig.BindAddress = ip.String()
// MetricsBindAddress - disable by default but allow enablement until we switch to
// reading proxy config directly
proxyconfig.MetricsBindAddress = ""
// MetricsBindAddress - enable on a separate port in the 11xxx range for now, but only
// on localhost. The metrics contain no tenant information.
// TODO: move this to a secured port that we can query from prometheus.
proxyconfig.MetricsBindAddress = "localhost:11256"
if arg := options.ProxyArguments["metrics-bind-address"]; len(arg) > 0 {
proxyconfig.MetricsBindAddress = arg[0]
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/cmd/server/origin/dns_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (

"github.com/golang/glog"

"k8s.io/apimachinery/pkg/util/wait"

cmdutil "github.com/openshift/origin/pkg/cmd/util"
"github.com/openshift/origin/pkg/dns"
)
Expand Down Expand Up @@ -52,7 +54,7 @@ func (c *MasterConfig) RunDNSServer() {

go func() {
s := dns.NewServer(config, services, endpoints, "apiserver")
err := s.ListenAndServe()
err := s.ListenAndServe(wait.NeverStop)
glog.Fatalf("Could not start DNS: %v", err)
}()

Expand Down
4 changes: 2 additions & 2 deletions pkg/cmd/server/start/start_allinone.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/spf13/cobra"

kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/util/flag"
"k8s.io/kubernetes/pkg/kubectl/cmd/templates"
kcmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
Expand All @@ -26,7 +27,6 @@ import (
configapi "github.com/openshift/origin/pkg/cmd/server/apis/config"
"github.com/openshift/origin/pkg/cmd/server/origin"
tsbcmd "github.com/openshift/origin/pkg/templateservicebroker/cmd/server"
"k8s.io/apimachinery/pkg/util/wait"
)

type AllInOneOptions struct {
Expand Down Expand Up @@ -310,7 +310,7 @@ func (o AllInOneOptions) StartAllInOne() error {
ConfigFile: o.NodeConfigFile,
Output: o.MasterOptions.Output,
}
if err := nodeOptions.RunNode(); err != nil {
if err := nodeOptions.RunNode(wait.NeverStop); err != nil {
return err
}

Expand Down
33 changes: 23 additions & 10 deletions pkg/cmd/server/start/start_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"path/filepath"
"strings"
"syscall"
"time"

"github.com/coreos/go-systemd/daemon"
"github.com/golang/glog"
Expand All @@ -18,11 +19,11 @@ import (
kerrors "k8s.io/apimachinery/pkg/api/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
kubeletapp "k8s.io/kubernetes/cmd/kubelet/app"
"k8s.io/kubernetes/pkg/kubectl/cmd/templates"
kcmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
"k8s.io/kubernetes/pkg/master/ports"
"k8s.io/kubernetes/pkg/util/interrupt"

"github.com/openshift/library-go/pkg/crypto"
"github.com/openshift/origin/pkg/cmd/server/admin"
Expand Down Expand Up @@ -115,14 +116,26 @@ var networkLong = templates.LongDesc(`

// NewCommandStartNetwork provides a CLI handler for 'start network' command
func NewCommandStartNetwork(basename string, out, errout io.Writer) (*cobra.Command, *NodeOptions) {
options := &NodeOptions{Output: out}
options := &NodeOptions{
ExpireDays: crypto.DefaultCertificateLifetimeInDays,
Output: out,
}

cmd := &cobra.Command{
Use: "network",
Short: "Launch node network",
Long: fmt.Sprintf(networkLong, basename),
Run: func(c *cobra.Command, args []string) {
options.Run(c, errout, args, wait.NeverStop)
ch := make(chan struct{})
interrupt.New(func(s os.Signal) {
close(ch)
fmt.Fprintf(errout, "interrupt: Gracefully shutting down ...\n")
time.Sleep(200 * time.Millisecond)
os.Exit(1)
}).Run(func() error {
options.Run(c, errout, args, ch)
return nil
})
},
}

Expand Down Expand Up @@ -209,7 +222,7 @@ func (o NodeOptions) Complete(cmd *cobra.Command) error {

// StartNode calls RunNode and then waits forever
func (o NodeOptions) StartNode(stopCh <-chan struct{}) error {
if err := o.RunNode(); err != nil {
if err := o.RunNode(stopCh); err != nil {
return err
}

Expand All @@ -227,7 +240,7 @@ func (o NodeOptions) StartNode(stopCh <-chan struct{}) error {
// 2. Reads fully specified node config OR builds a fully specified node config from the args
// 3. Writes the fully specified node config and exits if needed
// 4. Starts the node based on the fully specified config
func (o NodeOptions) RunNode() error {
func (o NodeOptions) RunNode(stopCh <-chan struct{}) error {
nodeConfig, configFile, err := o.resolveNodeConfig()
if err != nil {
return err
Expand Down Expand Up @@ -277,7 +290,7 @@ func (o NodeOptions) RunNode() error {
return originnode.WriteKubeletFlags(*nodeConfig)
}

return StartNode(*nodeConfig, o.NodeArgs.Components)
return StartNode(*nodeConfig, o.NodeArgs.Components, stopCh)
}

// resolveNodeConfig creates a new configuration on disk by reading from the master, reads
Expand Down Expand Up @@ -421,7 +434,7 @@ func execKubelet(kubeletArgs []string) error {
}

// StartNode launches the node processes.
func StartNode(nodeConfig configapi.NodeConfig, components *utilflags.ComponentFlag) error {
func StartNode(nodeConfig configapi.NodeConfig, components *utilflags.ComponentFlag, stopCh <-chan struct{}) error {
kubeletArgs, err := nodeoptions.ComputeKubeletFlags(nodeConfig.KubeletArguments, nodeConfig)
if err != nil {
return fmt.Errorf("cannot create kubelet args: %v", err)
Expand Down Expand Up @@ -476,12 +489,12 @@ func StartNode(nodeConfig configapi.NodeConfig, components *utilflags.ComponentF
networkConfig.RunProxy()
}
if components.Enabled(ComponentDNS) && networkConfig.DNSServer != nil {
networkConfig.RunDNS()
networkConfig.RunDNS(stopCh)
}

networkConfig.InternalKubeInformers.Start(wait.NeverStop)
networkConfig.InternalKubeInformers.Start(stopCh)
if networkConfig.InternalNetworkInformers != nil {
networkConfig.InternalNetworkInformers.Start(wait.NeverStop)
networkConfig.InternalNetworkInformers.Start(stopCh)
}

return nil
Expand Down
35 changes: 26 additions & 9 deletions pkg/dns/dnsmasq.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type dnsmasqMonitor struct {
metricError *prometheus.CounterVec
metricRestart prometheus.Counter

ready func() bool

// dnsIP is the IP address this DNS server is reachable at from dnsmasq
dnsIP string
// dnsDomain is the domain name for this DNS server that dnsmasq should forward to
Expand Down Expand Up @@ -57,7 +59,10 @@ func (m *dnsmasqMonitor) initMetrics() {
}
}

func (m *dnsmasqMonitor) Start() error {
func (m *dnsmasqMonitor) Start(stopCh <-chan struct{}) error {
if m.ready == nil {
m.ready = func() bool { return true }
}
m.initMetrics()
conn, err := utildbus.New().SystemBus()
if err != nil {
Expand All @@ -66,14 +71,17 @@ func (m *dnsmasqMonitor) Start() error {
if err := conn.BusObject().Call("org.freedesktop.DBus.AddMatch", 0, fmt.Sprintf("type='signal',path='%s',interface='%s'", dbusDnsmasqPath, dbusDnsmasqInterface)).Store(); err != nil {
return fmt.Errorf("unable to add a match rule to the system DBus: %v", err)
}
go m.run(conn, utilwait.NeverStop)
go m.run(conn, stopCh)
return nil
}

func (m *dnsmasqMonitor) run(conn utildbus.Connection, stopCh <-chan struct{}) {
ch := make(chan *godbus.Signal, 20)
defer func() {
utilruntime.HandleCrash()
glog.V(2).Infof("dnsmasq monitor shutting down")
// clear our configuration on shutdown
m.refresh(conn, false)
// unregister the handler
conn.Signal(ch)
}()
Expand All @@ -89,7 +97,7 @@ func (m *dnsmasqMonitor) run(conn utildbus.Connection, stopCh <-chan struct{}) {
case "uk.org.thekelleys.dnsmasq.Up":
m.metricRestart.Inc()
glog.V(2).Infof("dnsmasq restarted, refreshing server configuration")
if err := m.refresh(conn); err != nil {
if err := m.refresh(conn, m.ready()); err != nil {
utilruntime.HandleError(fmt.Errorf("unable to refresh dnsmasq status on dnsmasq startup: %v", err))
m.metricError.WithLabelValues("restart").Inc()
} else {
Expand All @@ -101,7 +109,11 @@ func (m *dnsmasqMonitor) run(conn utildbus.Connection, stopCh <-chan struct{}) {

// no matter what, always keep trying to refresh dnsmasq
go utilwait.Until(func() {
if err := m.refresh(conn); err != nil {
for !m.ready() {
glog.V(4).Infof("Waiting for DNS data to be available to update dnsmasq")
time.Sleep(time.Second)
}
if err := m.refresh(conn, true); err != nil {
utilruntime.HandleError(fmt.Errorf("unable to periodically refresh dnsmasq status: %v", err))
m.metricError.WithLabelValues("periodic").Inc()
} else {
Expand All @@ -113,14 +125,19 @@ func (m *dnsmasqMonitor) run(conn utildbus.Connection, stopCh <-chan struct{}) {
}

// refresh invokes dnsmasq with the requested configuration
func (m *dnsmasqMonitor) refresh(conn utildbus.Connection) error {
func (m *dnsmasqMonitor) refresh(conn utildbus.Connection, ready bool) error {
m.lock.Lock()
defer m.lock.Unlock()
addresses := []string{
fmt.Sprintf("/in-addr.arpa/%s", m.dnsIP),
fmt.Sprintf("/%s/%s", m.dnsDomain, m.dnsIP),
var addresses []string
if ready {
addresses = []string{
fmt.Sprintf("/in-addr.arpa/%s", m.dnsIP),
fmt.Sprintf("/%s/%s", m.dnsDomain, m.dnsIP),
}
glog.V(4).Infof("Instructing dnsmasq to set the following servers: %v", addresses)
} else {
glog.V(2).Infof("DNS data is not ready, removing configuration from dnsmasq")
}
glog.V(5).Infof("Instructing dnsmasq to set the following servers: %v", addresses)
return conn.Object(dbusDnsmasqInterface, dbusDnsmasqPath).
Call("uk.org.thekelleys.SetDomainServers", 0, addresses).
Store()
Expand Down
27 changes: 23 additions & 4 deletions pkg/dns/dnsmasq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ func Test_dnsmasqMonitor_run(t *testing.T) {
m := &dnsmasqMonitor{
dnsIP: "127.0.0.1",
dnsDomain: "test.domain",
ready: func() bool { return true },
}
m.initMetrics()
conn := utildbus.NewFakeConnection()
//fake := utildbus.NewFake(conn, nil)
stopCh := make(chan struct{})

callCh := make(chan string, 1)
conn.AddObject(dbusDnsmasqInterface, dbusDnsmasqPath, func(method string, args ...interface{}) ([]interface{}, error) {
Expand All @@ -32,18 +33,36 @@ func Test_dnsmasqMonitor_run(t *testing.T) {
t.Errorf("unexpected args: %v", args)
return nil, fmt.Errorf("unexpected args")
}
if arr, ok := args[0].([]string); !ok || !reflect.DeepEqual([]string{"/in-addr.arpa/127.0.0.1", "/test.domain/127.0.0.1"}, arr) {

// once stopCh is closed (shutdown), the monitor is expected to send an empty server list
completed := false
select {
case <-stopCh:
completed = true
default:
}

arr, ok := args[0].([]string)
if !ok {
t.Errorf("unexpected args: %v", args)
return nil, fmt.Errorf("unexpected args")
}
expected := []string{"/in-addr.arpa/127.0.0.1", "/test.domain/127.0.0.1"}
if completed {
expected = nil
}
if !reflect.DeepEqual(expected, arr) {
t.Errorf("unexpected args: %v", args)
return nil, fmt.Errorf("unexpected args")
}

return nil, nil
default:
t.Errorf("unexpected method: %v", method)
return nil, fmt.Errorf("unexpected method")
}
})

stopCh := make(chan struct{})
go m.run(conn, stopCh)

// should always set on startup
Expand Down Expand Up @@ -110,7 +129,7 @@ func (c *threadsafeDBusConn) EmitSignal(name, path, iface, signal string, args .
}

func Test_dnsmasqMonitor_run_metrics(t *testing.T) {
m := &dnsmasqMonitor{dnsIP: "127.0.0.1", dnsDomain: "test.domain"}
m := &dnsmasqMonitor{dnsIP: "127.0.0.1", dnsDomain: "test.domain", ready: func() bool { return true }}
m.initMetrics()
fakeConn := utildbus.NewFakeConnection()
conn := &threadsafeDBusConn{conn: fakeConn}
Expand Down
16 changes: 5 additions & 11 deletions pkg/dns/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,44 +25,37 @@ type Server struct {
Services ServiceAccessor
Endpoints EndpointsAccessor
MetricsName string

Stop chan struct{}
}

// NewServer creates a server.
func NewServer(config *server.Config, services ServiceAccessor, endpoints EndpointsAccessor, metricsName string) *Server {
stop := make(chan struct{})
return &Server{
Config: config,
Services: services,
Endpoints: endpoints,
MetricsName: metricsName,
Stop: stop,
}
}

// ListenAndServe starts a DNS server that exposes services and endpoints, and starts the
// dnsmasq monitor (which is handed stopCh). It will block until the server exits.
func (s *Server) ListenAndServe() error {
monitorDnsmasq(s.Config, s.MetricsName)
func (s *Server) ListenAndServe(stopCh <-chan struct{}) error {
monitorDnsmasq(s.Config, s.MetricsName, stopCh, func() bool { return s.Services.HasSynced() && s.Endpoints.HasSynced() })

resolver := NewServiceResolver(s.Config, s.Services, s.Endpoints, openshiftFallback)
resolvers := server.FirstBackend{resolver}
if len(s.MetricsName) > 0 {
metrics.RegisterPrometheusMetrics(s.MetricsName, "")
}
dns := server.New(resolvers, s.Config)
if s.Stop != nil {
defer close(s.Stop)
}
return dns.Run()
}

// monitorDnsmasq attempts to start the dnsmasq monitoring goroutines to keep dnsmasq
// in sync with this server. It will take no action if the current config DnsAddr does
// not point to port 53 (dnsmasq does not support alternate upstream ports). It will
// convert the bind address from 0.0.0.0 to the BindNetwork appropriate listen address.
func monitorDnsmasq(config *server.Config, metricsName string) {
func monitorDnsmasq(config *server.Config, metricsName string, stopCh <-chan struct{}, readyFn func() bool) {
if host, port, err := net.SplitHostPort(config.DnsAddr); err == nil && port == "53" {
if ip := net.ParseIP(host); ip != nil && ip.IsUnspecified() {
if config.BindNetwork == "ipv6" {
Expand All @@ -73,10 +66,11 @@ func monitorDnsmasq(config *server.Config, metricsName string) {
}
monitor := &dnsmasqMonitor{
metricsName: metricsName,
ready: readyFn,
dnsIP: host,
dnsDomain: strings.TrimSuffix(config.Domain, "."),
}
if err := monitor.Start(); err != nil {
if err := monitor.Start(stopCh); err != nil {
glog.Warningf("Unable to start dnsmasq monitor: %v", err)
} else {
glog.V(2).Infof("Monitoring dnsmasq to point cluster queries to %s", host)
Expand Down
Loading

0 comments on commit 42a5965

Please sign in to comment.