Skip to content

Commit

Permalink
Merge pull request #16740 from smarterclayton/manage_dnsmasq_directly
Browse files Browse the repository at this point in the history
Automatic merge from submit-queue (batch tested with PRs 16725, 16779, 16798, 16783, 16740).

The DNS subsystem should manage keeping dnsmasq in sync

On startup the DNS server will start a background subsystem that uses
DBus to keep dnsmasq pointing in-addr.arpa and cluster.local at the
local DNS server. If the configuration does not allow that (dnsmasq can
only talk to servers on port 53, for example) then dnsmasq will not be
synced over DBus. The subsystem listens for dnsmasq restart events and
reapplies the configuration, so no on-disk configuration should be necessary.

If the subsystem cannot talk to DBus, the monitor routine is not
started.

@sdodson with this change I don't believe any other dnsmasq config is 
necessary. The process manages registration on startup, refreshes periodically, and detects restarts.
  • Loading branch information
openshift-merge-robot authored Oct 11, 2017
2 parents 85a99b8 + c90b738 commit 08889c7
Show file tree
Hide file tree
Showing 5 changed files with 363 additions and 5 deletions.
5 changes: 0 additions & 5 deletions images/node/system-container/service.template
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,6 @@ Type=notify
EnvironmentFile=/etc/sysconfig/$NAME
EnvironmentFile=/etc/sysconfig/$NAME-dep

ExecStartPre=/usr/bin/cp /etc/origin/node/node-dnsmasq.conf /etc/dnsmasq.d/
ExecStartPre=/usr/bin/dbus-send --system --dest=uk.org.thekelleys.dnsmasq /uk/org/thekelleys/dnsmasq uk.org.thekelleys.SetDomainServers array:string:/in-addr.arpa/127.0.0.1,/${DNS_DOMAIN}/127.0.0.1
ExecStopPost=/usr/bin/rm /etc/dnsmasq.d/node-dnsmasq.conf
ExecStopPost=/usr/bin/dbus-send --system --dest=uk.org.thekelleys.dnsmasq /uk/org/thekelleys/dnsmasq uk.org.thekelleys.SetDomainServers array:string:

ExecStartPre=/bin/bash -c 'export -p > /run/$NAME-env'
ExecStart=$EXEC_START
ExecStop=$EXEC_STOP
Expand Down
127 changes: 127 additions & 0 deletions pkg/dns/dnsmasq.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package dns

import (
"fmt"
"sync"
"time"

godbus "github.com/godbus/dbus"
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"

utilruntime "k8s.io/apimachinery/pkg/util/runtime"
utilwait "k8s.io/apimachinery/pkg/util/wait"
utildbus "k8s.io/kubernetes/pkg/util/dbus"
)

const (
// dnsmasqRetryInterval is the duration between attempts to register and listen to DBUS.
dnsmasqRetryInterval = 2 * time.Second
// dnsmasqRefreshInterval is the maximum time between refreshes of the current dnsmasq configuration.
dnsmasqRefreshInterval = 30 * time.Second
// dbusDnsmasqPath is the DBus object path used to address dnsmasq and to filter the
// signals it emits.
dbusDnsmasqPath = "/uk/org/thekelleys/dnsmasq"
// dbusDnsmasqInterface is used both as the destination name when calling dnsmasq and
// as the interface in the signal match rule.
dbusDnsmasqInterface = "uk.org.thekelleys.dnsmasq"
)

// dnsmasqMonitor keeps dnsmasq forwarding in-addr.arpa and dnsDomain to dnsIP by
// issuing SetDomainServers calls over DBus, both periodically and whenever dnsmasq
// announces that it has come up.
type dnsmasqMonitor struct {
// metricsName is the prefix to apply to registered prometheus metrics. If unset no
// metrics will be registered.
metricsName string
// metricError counts failed refresh attempts, partitioned by a "type" label
// ("restart" or "periodic").
metricError *prometheus.CounterVec
// metricRestart counts the dnsmasq "Up" signals that have been observed.
metricRestart prometheus.Counter

// dnsIP is the IP address this DNS server is reachable at from dnsmasq
dnsIP string
// dnsDomain is the domain name for this DNS server that dnsmasq should forward to
dnsDomain string
// lock controls sending a dnsmasq refresh
lock sync.Mutex
}

// initMetrics builds the error and restart counters for the monitor. The counters are
// always created so the rest of the monitor can update them unconditionally, but they
// are only registered with prometheus when metricsName is non-empty.
func (m *dnsmasqMonitor) initMetrics() {
	errorOpts := prometheus.CounterOpts{
		Namespace: m.metricsName,
		Subsystem: "dnsmasq_sync",
		Name:      "error_count_total",
		Help:      "Counter of sync failures with dnsmasq.",
	}
	restartOpts := prometheus.CounterOpts{
		Namespace: m.metricsName,
		Subsystem: "dnsmasq_sync",
		Name:      "restart_count_total",
		Help:      "Counter of restarts detected from dnsmasq.",
	}

	m.metricError = prometheus.NewCounterVec(errorOpts, []string{"type"})
	m.metricRestart = prometheus.NewCounter(restartOpts)

	// Without a metrics prefix the counters stay private to this monitor.
	if m.metricsName == "" {
		return
	}
	prometheus.MustRegister(m.metricError)
	prometheus.MustRegister(m.metricRestart)
}

// Start initializes the monitor's metrics, connects to the system DBus, installs a
// match rule for signals on the dnsmasq path and interface, and then launches the
// background sync loop. It returns an error if the bus cannot be reached or the match
// rule cannot be added; in that case no goroutine is started.
func (m *dnsmasqMonitor) Start() error {
	m.initMetrics()

	bus, err := utildbus.New().SystemBus()
	if err != nil {
		return fmt.Errorf("cannot connect to DBus: %v", err)
	}

	// Ask the bus to deliver the signals dnsmasq emits so run can observe them.
	matchRule := fmt.Sprintf("type='signal',path='%s',interface='%s'", dbusDnsmasqPath, dbusDnsmasqInterface)
	addMatch := bus.BusObject().Call("org.freedesktop.DBus.AddMatch", 0, matchRule)
	if err := addMatch.Store(); err != nil {
		return fmt.Errorf("unable to add a match rule to the system DBus: %v", err)
	}

	// The monitor runs for the life of the process.
	go m.run(bus, utilwait.NeverStop)
	return nil
}

// run subscribes to DBus signals on conn and keeps dnsmasq configured until stopCh is
// closed. It starts two goroutines: one that refreshes dnsmasq whenever an "Up" signal
// arrives on the dnsmasq path, and one that refreshes immediately and then every
// dnsmasqRefreshInterval. run itself blocks until stopCh is closed.
//
// Every refresh outcome touches the matching metricError label: failures increment it
// and successes add zero, so the labelled series exists even before the first failure.
func (m *dnsmasqMonitor) run(conn utildbus.Connection, stopCh <-chan struct{}) {
	ch := make(chan *godbus.Signal, 20)
	defer func() {
		utilruntime.HandleCrash()
		// unregister the handler
		// NOTE(review): this relies on Signal removing a channel that is already
		// registered — confirm against the vendored dbus implementation.
		conn.Signal(ch)
	}()
	conn.Signal(ch)

	// watch for dnsmasq restart
	go utilwait.Until(func() {
		for {
			select {
			case <-stopCh:
				// ch is never closed, so without this case the goroutine would
				// stay blocked on the receive forever after run returns.
				return
			case s, ok := <-ch:
				if !ok {
					// Channel closed: hand control back to Until, which retries
					// after dnsmasqRetryInterval unless stopCh is closed.
					return
				}
				if s.Path != dbusDnsmasqPath {
					continue
				}
				switch s.Name {
				case "uk.org.thekelleys.dnsmasq.Up":
					m.metricRestart.Inc()
					glog.V(2).Infof("dnsmasq restarted, refreshing server configuration")
					if err := m.refresh(conn); err != nil {
						utilruntime.HandleError(fmt.Errorf("unable to refresh dnsmasq status on dnsmasq startup: %v", err))
						m.metricError.WithLabelValues("restart").Inc()
					} else {
						m.metricError.WithLabelValues("restart").Add(0)
					}
				}
			}
		}
	}, dnsmasqRetryInterval, stopCh)

	// no matter what, always keep trying to refresh dnsmasq
	go utilwait.Until(func() {
		if err := m.refresh(conn); err != nil {
			utilruntime.HandleError(fmt.Errorf("unable to periodically refresh dnsmasq status: %v", err))
			m.metricError.WithLabelValues("periodic").Inc()
		} else {
			m.metricError.WithLabelValues("periodic").Add(0)
		}
	}, dnsmasqRefreshInterval, stopCh)

	<-stopCh
}

// refresh sends the desired server list to dnsmasq over DBus: reverse lookups
// (in-addr.arpa) and dnsDomain are both pointed at dnsIP. Calls are serialized by the
// monitor's lock. It returns the error, if any, from the SetDomainServers call.
func (m *dnsmasqMonitor) refresh(conn utildbus.Connection) error {
	m.lock.Lock()
	defer m.lock.Unlock()

	reverse := fmt.Sprintf("/in-addr.arpa/%s", m.dnsIP)
	forward := fmt.Sprintf("/%s/%s", m.dnsDomain, m.dnsIP)
	servers := []string{reverse, forward}
	glog.V(5).Infof("Instructing dnsmasq to set the following servers: %v", servers)

	dnsmasq := conn.Object(dbusDnsmasqInterface, dbusDnsmasqPath)
	call := dnsmasq.Call("uk.org.thekelleys.SetDomainServers", 0, servers)
	return call.Store()
}
197 changes: 197 additions & 0 deletions pkg/dns/dnsmasq_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
package dns

import (
"fmt"
"reflect"
"sync"
"testing"
"time"

godbus "github.com/godbus/dbus"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"

utildbus "k8s.io/kubernetes/pkg/util/dbus"
)

// Test_dnsmasqMonitor_run drives the monitor against a fake DBus connection and checks
// that SetDomainServers is invoked with the expected server list on startup and after
// an "Up" signal, and that an unrelated signal does not trigger a call.
func Test_dnsmasqMonitor_run(t *testing.T) {
m := &dnsmasqMonitor{
dnsIP: "127.0.0.1",
dnsDomain: "test.domain",
}
m.initMetrics()
conn := utildbus.NewFakeConnection()
//fake := utildbus.NewFake(conn, nil)

// callCh receives the method name after every call made to the fake dnsmasq object,
// letting the test synchronize with the monitor's goroutines.
callCh := make(chan string, 1)
conn.AddObject(dbusDnsmasqInterface, dbusDnsmasqPath, func(method string, args ...interface{}) ([]interface{}, error) {
// report the call only once the handler has finished validating it
defer func() { callCh <- method }()
switch method {
case "uk.org.thekelleys.SetDomainServers":
if len(args) != 1 {
t.Errorf("unexpected args: %v", args)
return nil, fmt.Errorf("unexpected args")
}
// the single argument must be exactly the reverse and forward entries for the test domain
if arr, ok := args[0].([]string); !ok || !reflect.DeepEqual([]string{"/in-addr.arpa/127.0.0.1", "/test.domain/127.0.0.1"}, arr) {
t.Errorf("unexpected args: %v", args)
return nil, fmt.Errorf("unexpected args")
}
return nil, nil
default:
t.Errorf("unexpected method: %v", method)
return nil, fmt.Errorf("unexpected method")
}
})

stopCh := make(chan struct{})
go m.run(conn, stopCh)

// should always set on startup
if s := <-callCh; s != "uk.org.thekelleys.SetDomainServers" {
t.Errorf("unexpected call: %s", s)
}
// non-blocking check: only catches a second call that has already been reported
select {
case s := <-callCh:
t.Fatalf("got an unexpected second call: %s", s)
default:
}

// restart and ensure we get a set
conn.EmitSignal(dbusDnsmasqInterface, dbusDnsmasqPath, dbusDnsmasqInterface, "Up")
if s := <-callCh; s != "uk.org.thekelleys.SetDomainServers" {
t.Errorf("unexpected call: %s", s)
}

// send a bogus signal and check whether anything was invoked
conn.EmitSignal(dbusDnsmasqInterface, dbusDnsmasqPath, dbusDnsmasqInterface, "Ignore")
select {
case s := <-callCh:
t.Fatalf("got an unexpected second call: %s", s)
default:
}

// shutdown, send one more bogus signal to ensure the channel is empty and the goroutines are done
close(stopCh)
conn.EmitSignal(dbusDnsmasqInterface, dbusDnsmasqPath, dbusDnsmasqInterface, "Ignore")
select {
case s := <-callCh:
t.Fatalf("got an unexpected second call: %s", s)
default:
}
}

// threadsafeDBusConn wraps a fake DBus connection and guards every call made through
// the wrapper with a single mutex.
type threadsafeDBusConn struct {
	lock sync.Mutex
	conn *utildbus.DBusFakeConnection
}

// BusObject returns the wrapped connection's bus object while holding the lock.
func (tc *threadsafeDBusConn) BusObject() utildbus.Object {
	tc.lock.Lock()
	defer tc.lock.Unlock()

	bus := tc.conn.BusObject()
	return bus
}

// Object looks up the object registered under name and path on the wrapped connection
// while holding the lock.
func (tc *threadsafeDBusConn) Object(name, path string) utildbus.Object {
	tc.lock.Lock()
	defer tc.lock.Unlock()

	obj := tc.conn.Object(name, path)
	return obj
}

// Signal forwards ch to the wrapped connection's Signal method while holding the lock.
func (tc *threadsafeDBusConn) Signal(ch chan<- *godbus.Signal) {
	tc.lock.Lock()
	defer tc.lock.Unlock()

	tc.conn.Signal(ch)
}

// EmitSignal forwards the signal, including any extra args, to the wrapped connection
// while holding the lock.
func (tc *threadsafeDBusConn) EmitSignal(name, path, iface, signal string, args ...interface{}) {
	tc.lock.Lock()
	defer tc.lock.Unlock()

	tc.conn.EmitSignal(name, path, iface, signal, args...)
}

// Test_dnsmasqMonitor_run_metrics runs the monitor against a fake dnsmasq object whose
// SetDomainServers call always fails, then verifies the restart and error counters
// after two "Up" signals and the initial refresh.
func Test_dnsmasqMonitor_run_metrics(t *testing.T) {
m := &dnsmasqMonitor{dnsIP: "127.0.0.1", dnsDomain: "test.domain"}
m.initMetrics()
fakeConn := utildbus.NewFakeConnection()
// the monitor and the test both go through the mutex-guarded wrapper
conn := &threadsafeDBusConn{conn: fakeConn}

// callCh receives the method name after every call made to the fake dnsmasq object
callCh := make(chan string, 1)
fakeConn.AddObject(dbusDnsmasqInterface, dbusDnsmasqPath, func(method string, args ...interface{}) ([]interface{}, error) {
defer func() { callCh <- method }()
switch method {
case "uk.org.thekelleys.SetDomainServers":
// always fail so that every refresh is counted as an error
return nil, fmt.Errorf("unable to send error")
default:
t.Errorf("unexpected method: %v", method)
return nil, fmt.Errorf("unexpected method")
}
})

// stops the test
stopCh := make(chan struct{})
// prevents the test from exiting until all values are checked
exitCh := make(chan struct{})
go func() {
// run blocks until stopCh is closed; the counters are only inspected afterwards
m.run(conn, stopCh)
// two "Up" signals, one failed startup refresh, two failed restart refreshes
expectCounterValue(t, 2, m.metricRestart)
expectCounterVecValue(t, 1, m.metricError, "periodic")
expectCounterVecValue(t, 2, m.metricError, "restart")
close(exitCh)
}()

// should always set on startup
if s := <-callCh; s != "uk.org.thekelleys.SetDomainServers" {
t.Errorf("unexpected call: %s", s)
}

// first simulated dnsmasq restart
conn.EmitSignal(dbusDnsmasqInterface, dbusDnsmasqPath, dbusDnsmasqInterface, "Up")
if s := <-callCh; s != "uk.org.thekelleys.SetDomainServers" {
t.Errorf("unexpected call: %s", s)
}
// second simulated dnsmasq restart
conn.EmitSignal(dbusDnsmasqInterface, dbusDnsmasqPath, dbusDnsmasqInterface, "Up")
if s := <-callCh; s != "uk.org.thekelleys.SetDomainServers" {
t.Errorf("unexpected call: %s", s)
}

// shutdown, send one more bogus signal to ensure the channel is empty and the goroutines are done
close(stopCh)
conn.EmitSignal(dbusDnsmasqInterface, dbusDnsmasqPath, dbusDnsmasqInterface, "Ignore")
select {
case s := <-callCh:
t.Fatalf("got an unexpected second call: %s", s)
default:
}
// wait for the metric assertions in the goroutine above to finish
<-exitCh
}

// expectCounterVecValue polls the counter selected by labels from vec until it equals
// expect. It reports a test error and stops polling if the counter cannot be read, if
// the value overshoots expect, or if the value has not converged after roughly one
// hundred one-millisecond attempts.
func expectCounterVecValue(t *testing.T, expect float64, vec *prometheus.CounterVec, labels ...string) {
	// loop a number of times to let the value stabilize, because the metric is incremented in a goroutine
	// we cannot signal from
	const maxAttempts = 100
	for i := 0; ; i++ {
		c, err := vec.GetMetricWithLabelValues(labels...)
		if err != nil {
			// there is no counter to read; continuing would dereference a nil metric
			t.Error(err)
			return
		}
		m := &dto.Metric{}
		if err := c.Write(m); err != nil {
			t.Error(err)
			return
		}
		actual := m.Counter.GetValue()
		if actual == expect {
			return
		}
		if actual > expect || i > maxAttempts {
			t.Errorf("%v: value %f != expected %f", labels, actual, expect)
			// stop polling: t.Errorf does not abort the test, so without this return
			// the loop would spin forever once the expectation has failed
			return
		}
		time.Sleep(time.Millisecond)
	}
}

// expectCounterValue reads the current value of c once and reports a test error if it
// differs from expect. A failure to read the counter is reported as a test error too.
func expectCounterValue(t *testing.T, expect float64, c prometheus.Counter) {
	var snapshot dto.Metric
	if err := c.Write(&snapshot); err != nil {
		t.Error(err)
	}

	actual := snapshot.Counter.GetValue()
	if actual == expect {
		return
	}
	t.Errorf("value %f != expected %f", actual, expect)
}
33 changes: 33 additions & 0 deletions pkg/dns/server.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package dns

import (
"net"
"strings"

"github.com/golang/glog"

"github.com/skynetservices/skydns/metrics"
Expand Down Expand Up @@ -41,6 +44,8 @@ func NewServer(config *server.Config, services ServiceAccessor, endpoints Endpoi
// ListenAndServe starts a DNS server that exposes services and values stored in etcd (if etcdclient
// is not nil). It will block until the server exits.
func (s *Server) ListenAndServe() error {
monitorDnsmasq(s.Config, s.MetricsName)

resolver := NewServiceResolver(s.Config, s.Services, s.Endpoints, openshiftFallback)
resolvers := server.FirstBackend{resolver}
if len(s.MetricsName) > 0 {
Expand All @@ -53,6 +58,34 @@ func (s *Server) ListenAndServe() error {
return dns.Run()
}

// monitorDnsmasq attempts to start the dnsmasq monitoring goroutines to keep dnsmasq
// in sync with this server. It will take no action if the current config DnsAddr does
// not point to port 53 (dnsmasq does not support alternate upstream ports). It will
// convert a wildcard bind address (0.0.0.0, ::, or an empty host such as ":53") to the
// BindNetwork appropriate loopback address, because dnsmasq needs a concrete address
// to forward to.
//
// Failures are logged as warnings and are never fatal: the DNS server keeps running
// without dnsmasq synchronization.
func monitorDnsmasq(config *server.Config, metricsName string) {
	host, port, err := net.SplitHostPort(config.DnsAddr)
	if err != nil || port != "53" {
		glog.Warningf("Unable to keep dnsmasq up to date, %s must point to port 53", config.DnsAddr)
		return
	}

	// An empty host means "listen on all addresses" just like an unspecified IP. If it
	// were passed through unchanged dnsmasq would receive "/domain/" with no server
	// address instead of a forwarding target.
	ip := net.ParseIP(host)
	if len(host) == 0 || (ip != nil && ip.IsUnspecified()) {
		if config.BindNetwork == "ipv6" {
			host = "::1"
		} else {
			host = "127.0.0.1"
		}
	}

	monitor := &dnsmasqMonitor{
		metricsName: metricsName,
		dnsIP:       host,
		// drop the trailing root dot before building the dnsmasq "/domain/ip" entry
		dnsDomain: strings.TrimSuffix(config.Domain, "."),
	}
	if err := monitor.Start(); err != nil {
		glog.Warningf("Unable to start dnsmasq monitor: %v", err)
		return
	}
	glog.V(2).Infof("Monitoring dnsmasq to point cluster queries to %s", host)
}

func openshiftFallback(name string, exact bool) (string, bool) {
if name == "openshift.default.svc" {
return "kubernetes.default.svc.", true
Expand Down
Loading

0 comments on commit 08889c7

Please sign in to comment.