Skip to content

Commit

Permalink
Take node out of dnsmasq when stopping or when caches aren't full
Browse files Browse the repository at this point in the history
This prevents dnsmasq from timing out trying to talk to us.
  • Loading branch information
smarterclayton committed Jun 12, 2018
1 parent e67134b commit 9fcc737
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 21 deletions.
28 changes: 20 additions & 8 deletions pkg/dns/dnsmasq.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type dnsmasqMonitor struct {
metricError *prometheus.CounterVec
metricRestart prometheus.Counter

ready func() bool

// dnsIP is the IP address this DNS server is reachable at from dnsmasq
dnsIP string
// dnsDomain is the domain name for this DNS server that dnsmasq should forward to
Expand Down Expand Up @@ -57,7 +59,10 @@ func (m *dnsmasqMonitor) initMetrics() {
}
}

func (m *dnsmasqMonitor) Start() error {
func (m *dnsmasqMonitor) Start(stopCh <-chan struct{}) error {
if m.ready == nil {
m.ready = func() bool { return true }
}
m.initMetrics()
conn, err := utildbus.New().SystemBus()
if err != nil {
Expand All @@ -66,14 +71,16 @@ func (m *dnsmasqMonitor) Start() error {
if err := conn.BusObject().Call("org.freedesktop.DBus.AddMatch", 0, fmt.Sprintf("type='signal',path='%s',interface='%s'", dbusDnsmasqPath, dbusDnsmasqInterface)).Store(); err != nil {
return fmt.Errorf("unable to add a match rule to the system DBus: %v", err)
}
go m.run(conn, utilwait.NeverStop)
go m.run(conn, stopCh)
return nil
}

func (m *dnsmasqMonitor) run(conn utildbus.Connection, stopCh <-chan struct{}) {
ch := make(chan *godbus.Signal, 20)
defer func() {
utilruntime.HandleCrash()
// clear our configuration on shutdown
m.refresh(conn, false)
// unregister the handler
conn.Signal(ch)
}()
Expand All @@ -89,7 +96,7 @@ func (m *dnsmasqMonitor) run(conn utildbus.Connection, stopCh <-chan struct{}) {
case "uk.org.thekelleys.dnsmasq.Up":
m.metricRestart.Inc()
glog.V(2).Infof("dnsmasq restarted, refreshing server configuration")
if err := m.refresh(conn); err != nil {
if err := m.refresh(conn, m.ready()); err != nil {
utilruntime.HandleError(fmt.Errorf("unable to refresh dnsmasq status on dnsmasq startup: %v", err))
m.metricError.WithLabelValues("restart").Inc()
} else {
Expand All @@ -101,7 +108,7 @@ func (m *dnsmasqMonitor) run(conn utildbus.Connection, stopCh <-chan struct{}) {

// no matter what, always keep trying to refresh dnsmasq
go utilwait.Until(func() {
if err := m.refresh(conn); err != nil {
if err := m.refresh(conn, m.ready()); err != nil {
utilruntime.HandleError(fmt.Errorf("unable to periodically refresh dnsmasq status: %v", err))
m.metricError.WithLabelValues("periodic").Inc()
} else {
Expand All @@ -113,12 +120,17 @@ func (m *dnsmasqMonitor) run(conn utildbus.Connection, stopCh <-chan struct{}) {
}

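// refresh instructs dnsmasq to forward the in-addr.arpa and cluster domains to
// this server when ready is true, and sends an empty server list otherwise so
// that dnsmasq stops forwarding to us.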
func (m *dnsmasqMonitor) refresh(conn utildbus.Connection) error {
func (m *dnsmasqMonitor) refresh(conn utildbus.Connection, ready bool) error {
m.lock.Lock()
defer m.lock.Unlock()
addresses := []string{
fmt.Sprintf("/in-addr.arpa/%s", m.dnsIP),
fmt.Sprintf("/%s/%s", m.dnsDomain, m.dnsIP),
var addresses []string
if ready {
addresses = []string{
fmt.Sprintf("/in-addr.arpa/%s", m.dnsIP),
fmt.Sprintf("/%s/%s", m.dnsDomain, m.dnsIP),
}
} else {
glog.V(2).Infof("DNS data is not ready, removing configuration from dnsmasq")
}
glog.V(5).Infof("Instructing dnsmasq to set the following servers: %v", addresses)
return conn.Object(dbusDnsmasqInterface, dbusDnsmasqPath).
Expand Down
16 changes: 5 additions & 11 deletions pkg/dns/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,44 +25,37 @@ type Server struct {
Services ServiceAccessor
Endpoints EndpointsAccessor
MetricsName string

Stop chan struct{}
}

// NewServer creates a server.
func NewServer(config *server.Config, services ServiceAccessor, endpoints EndpointsAccessor, metricsName string) *Server {
stop := make(chan struct{})
return &Server{
Config: config,
Services: services,
Endpoints: endpoints,
MetricsName: metricsName,
Stop: stop,
}
}

// ListenAndServe starts a DNS server that exposes services and values stored in etcd (if etcdclient
// is not nil). It will block until the server exits.
func (s *Server) ListenAndServe() error {
monitorDnsmasq(s.Config, s.MetricsName)
func (s *Server) ListenAndServe(stopCh <-chan struct{}) error {
monitorDnsmasq(s.Config, s.MetricsName, stopCh, func() bool { return s.Services.HasSynced() && s.Endpoints.HasSynced() })

resolver := NewServiceResolver(s.Config, s.Services, s.Endpoints, openshiftFallback)
resolvers := server.FirstBackend{resolver}
if len(s.MetricsName) > 0 {
metrics.RegisterPrometheusMetrics(s.MetricsName, "")
}
dns := server.New(resolvers, s.Config)
if s.Stop != nil {
defer close(s.Stop)
}
return dns.Run()
}

// monitorDnsmasq attempts to start the dnsmasq monitoring goroutines to keep dnsmasq
// in sync with this server. It will take no action if the current config DnsAddr does
// not point to port 53 (dnsmasq does not support alternate upstream ports). It will
// convert the bind address from 0.0.0.0 to the BindNetwork appropriate listen address.
func monitorDnsmasq(config *server.Config, metricsName string) {
func monitorDnsmasq(config *server.Config, metricsName string, stopCh <-chan struct{}, readyFn func() bool) {
if host, port, err := net.SplitHostPort(config.DnsAddr); err == nil && port == "53" {
if ip := net.ParseIP(host); ip != nil && ip.IsUnspecified() {
if config.BindNetwork == "ipv6" {
Expand All @@ -73,10 +66,11 @@ func monitorDnsmasq(config *server.Config, metricsName string) {
}
monitor := &dnsmasqMonitor{
metricsName: metricsName,
ready: readyFn,
dnsIP: host,
dnsDomain: strings.TrimSuffix(config.Domain, "."),
}
if err := monitor.Start(); err != nil {
if err := monitor.Start(stopCh); err != nil {
glog.Warningf("Unable to start dnsmasq monitor: %v", err)
} else {
glog.V(2).Infof("Monitoring dnsmasq to point cluster queries to %s", host)
Expand Down
24 changes: 22 additions & 2 deletions pkg/dns/serviceaccessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@ import (
// ServiceAccessor is the interface used by the ServiceResolver to access
// services.
type ServiceAccessor interface {
// HasSynced reports whether the accessor's service data is ready to be
// served. The DNS server combines it with the endpoints accessor's
// HasSynced to decide whether dnsmasq should forward queries to it.
HasSynced() bool
kcoreclient.ServicesGetter
// ServiceByClusterIP looks up a service by its cluster IP address.
ServiceByClusterIP(ip string) (*api.Service, error)
}

// cachedServiceAccessor provides a cache of services that can answer queries
// about service lookups efficiently.
type cachedServiceAccessor struct {
store cache.Indexer
store cache.Indexer
hasSynced func() bool
}

// cachedServiceAccessor implements ServiceAccessor
Expand All @@ -48,7 +50,14 @@ func NewCachedServiceAccessor(serviceInformer kcoreinformers.ServiceInformer) (S
if err != nil {
return nil, err
}
return &cachedServiceAccessor{store: serviceInformer.Informer().GetIndexer()}, nil
return &cachedServiceAccessor{
store: serviceInformer.Informer().GetIndexer(),
hasSynced: serviceInformer.Informer().HasSynced,
}, nil
}

// HasSynced reports whether the service informer backing this accessor has
// synced. It delegates to the informer's HasSynced function captured when the
// accessor was constructed.
func (a *cachedServiceAccessor) HasSynced() bool {
return a.hasSynced()
}

// ServiceByClusterIP returns the first service that matches the provided clusterIP value.
Expand Down Expand Up @@ -139,6 +148,7 @@ func (a cachedServiceNamespacer) ProxyGet(scheme, name, port, path string, param
// EndpointsAccessor is the interface used by the ServiceResolver to access
// endpoints.
type EndpointsAccessor interface {
HasSynced() bool
kcorelisters.EndpointsLister
// EndpointsByHostnameIP retrieves the Endpoints object containing a hostname
// that resolves to IP. Only endpoint addresses with a hostname field will match.
Expand All @@ -152,6 +162,7 @@ type EndpointsAccessor interface {
type cachedEndpointsAccessor struct {
// store is the indexer obtained from the endpoints informer.
store cache.Indexer
// EndpointsLister is the endpoints informer's lister, embedded so its
// methods are available directly on the accessor.
kcorelisters.EndpointsLister
// hasSynced is the endpoints informer's HasSynced function; it backs the
// accessor's HasSynced method.
hasSynced func() bool
}

// cachedEndpointsAccessor implements EndpointsAccessor
Expand All @@ -175,6 +186,7 @@ func NewCachedEndpointsAccessor(endpointsInformer kcoreinformers.EndpointsInform
return nil, err
}
return &cachedEndpointsAccessor{
hasSynced: endpointsInformer.Informer().HasSynced,
store: endpointsInformer.Informer().GetIndexer(),
EndpointsLister: endpointsInformer.Lister(),
}, nil
Expand All @@ -194,6 +206,10 @@ func (a *cachedEndpointsAccessor) EndpointsByHostnameIP(ip string) ([]*api.Endpo
return endpoints, nil
}

// HasSynced reports whether the endpoints informer backing this accessor has
// synced. It delegates to the informer's HasSynced function captured when the
// accessor was constructed.
func (a *cachedEndpointsAccessor) HasSynced() bool {
return a.hasSynced()
}

// indexEndpointsByAddressHostnameIP
func indexEndpointsByAddressHostnameIP(obj interface{}) ([]string, error) {
var keys []string
Expand Down Expand Up @@ -225,3 +241,7 @@ var errNotSupported = fmt.Errorf("hostname lookups not supported")
func (a SimpleEndpointsAccessor) EndpointsByHostnameIP(_ string) ([]*api.Endpoints, error) {
return nil, errNotSupported
}

// HasSynced always reports true for the simple accessor.
// NOTE(review): presumably because it is not backed by an informer cache that
// needs to fill — confirm against the SimpleEndpointsAccessor definition.
func (a SimpleEndpointsAccessor) HasSynced() bool {
return true
}

0 comments on commit 9fcc737

Please sign in to comment.