Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove the node from dnsmasq config when shutting down #19987

Merged
merged 3 commits into from
Jun 14, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/cmd/server/kubernetes/network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ func (c *NetworkConfig) RunSDN() {
}

// RunDNS starts the DNS server as soon as services are loaded.
func (c *NetworkConfig) RunDNS() {
func (c *NetworkConfig) RunDNS(stopCh <-chan struct{}) {
go func() {
glog.Infof("Starting DNS on %s", c.DNSServer.Config.DnsAddr)
err := c.DNSServer.ListenAndServe()
err := c.DNSServer.ListenAndServe(stopCh)
glog.Fatalf("DNS server failed to start: %v", err)
}()
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/cmd/server/kubernetes/network/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,10 @@ func Build(options configapi.NodeConfig) (*kubeproxyconfig.KubeProxyConfiguratio
return nil, fmt.Errorf("The provided value to bind to must be an ip:port: %q", addr)
}
proxyconfig.BindAddress = ip.String()
// MetricsBindAddress - disable by default but allow enablement until we switch to
// reading proxy config directly
proxyconfig.MetricsBindAddress = ""
// MetricsBindAddress - enable as a separate port in the 11xxx range for now, but only
// on localhost. Metrics contains no tenant information.
// TODO: move this to a secured port that we can query from prometheus.
proxyconfig.MetricsBindAddress = "localhost:11256"
if arg := options.ProxyArguments["metrics-bind-address"]; len(arg) > 0 {
proxyconfig.MetricsBindAddress = arg[0]
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/cmd/server/origin/dns_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (

"github.com/golang/glog"

"k8s.io/apimachinery/pkg/util/wait"

cmdutil "github.com/openshift/origin/pkg/cmd/util"
"github.com/openshift/origin/pkg/dns"
)
Expand Down Expand Up @@ -52,7 +54,7 @@ func (c *MasterConfig) RunDNSServer() {

go func() {
s := dns.NewServer(config, services, endpoints, "apiserver")
err := s.ListenAndServe()
err := s.ListenAndServe(wait.NeverStop)
glog.Fatalf("Could not start DNS: %v", err)
}()

Expand Down
4 changes: 2 additions & 2 deletions pkg/cmd/server/start/start_allinone.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/spf13/cobra"

kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/util/flag"
"k8s.io/kubernetes/pkg/kubectl/cmd/templates"
kcmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
Expand All @@ -26,7 +27,6 @@ import (
configapi "github.com/openshift/origin/pkg/cmd/server/apis/config"
"github.com/openshift/origin/pkg/cmd/server/origin"
tsbcmd "github.com/openshift/origin/pkg/templateservicebroker/cmd/server"
"k8s.io/apimachinery/pkg/util/wait"
)

type AllInOneOptions struct {
Expand Down Expand Up @@ -310,7 +310,7 @@ func (o AllInOneOptions) StartAllInOne() error {
ConfigFile: o.NodeConfigFile,
Output: o.MasterOptions.Output,
}
if err := nodeOptions.RunNode(); err != nil {
if err := nodeOptions.RunNode(wait.NeverStop); err != nil {
return err
}

Expand Down
33 changes: 23 additions & 10 deletions pkg/cmd/server/start/start_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"path/filepath"
"strings"
"syscall"
"time"

"github.com/coreos/go-systemd/daemon"
"github.com/golang/glog"
Expand All @@ -18,11 +19,11 @@ import (
kerrors "k8s.io/apimachinery/pkg/api/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
kubeletapp "k8s.io/kubernetes/cmd/kubelet/app"
"k8s.io/kubernetes/pkg/kubectl/cmd/templates"
kcmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
"k8s.io/kubernetes/pkg/master/ports"
"k8s.io/kubernetes/pkg/util/interrupt"

"github.com/openshift/library-go/pkg/crypto"
"github.com/openshift/origin/pkg/cmd/server/admin"
Expand Down Expand Up @@ -115,14 +116,26 @@ var networkLong = templates.LongDesc(`

// NewCommandStartNetwork provides a CLI handler for 'start network' command
func NewCommandStartNetwork(basename string, out, errout io.Writer) (*cobra.Command, *NodeOptions) {
options := &NodeOptions{Output: out}
options := &NodeOptions{
ExpireDays: crypto.DefaultCertificateLifetimeInDays,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can't run `openshift start network` without args anymore - someone broke it in a refactor at some point in the last month or so.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ugh :-(

Output: out,
}

cmd := &cobra.Command{
Use: "network",
Short: "Launch node network",
Long: fmt.Sprintf(networkLong, basename),
Run: func(c *cobra.Command, args []string) {
options.Run(c, errout, args, wait.NeverStop)
ch := make(chan struct{})
interrupt.New(func(s os.Signal) {
close(ch)
fmt.Fprintf(errout, "interrupt: Gracefully shutting down ...\n")
time.Sleep(200 * time.Millisecond)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is much better than 1 second?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

5 times better.

os.Exit(1)
}).Run(func() error {
options.Run(c, errout, args, ch)
return nil
})
},
}

Expand Down Expand Up @@ -209,7 +222,7 @@ func (o NodeOptions) Complete(cmd *cobra.Command) error {

// StartNode calls RunNode and then waits forever
func (o NodeOptions) StartNode(stopCh <-chan struct{}) error {
if err := o.RunNode(); err != nil {
if err := o.RunNode(stopCh); err != nil {
return err
}

Expand All @@ -227,7 +240,7 @@ func (o NodeOptions) StartNode(stopCh <-chan struct{}) error {
// 2. Reads fully specified node config OR builds a fully specified node config from the args
// 3. Writes the fully specified node config and exits if needed
// 4. Starts the node based on the fully specified config
func (o NodeOptions) RunNode() error {
func (o NodeOptions) RunNode(stopCh <-chan struct{}) error {
nodeConfig, configFile, err := o.resolveNodeConfig()
if err != nil {
return err
Expand Down Expand Up @@ -277,7 +290,7 @@ func (o NodeOptions) RunNode() error {
return originnode.WriteKubeletFlags(*nodeConfig)
}

return StartNode(*nodeConfig, o.NodeArgs.Components)
return StartNode(*nodeConfig, o.NodeArgs.Components, stopCh)
}

// resolveNodeConfig creates a new configuration on disk by reading from the master, reads
Expand Down Expand Up @@ -421,7 +434,7 @@ func execKubelet(kubeletArgs []string) error {
}

// StartNode launches the node processes.
func StartNode(nodeConfig configapi.NodeConfig, components *utilflags.ComponentFlag) error {
func StartNode(nodeConfig configapi.NodeConfig, components *utilflags.ComponentFlag, stopCh <-chan struct{}) error {
kubeletArgs, err := nodeoptions.ComputeKubeletFlags(nodeConfig.KubeletArguments, nodeConfig)
if err != nil {
return fmt.Errorf("cannot create kubelet args: %v", err)
Expand Down Expand Up @@ -476,12 +489,12 @@ func StartNode(nodeConfig configapi.NodeConfig, components *utilflags.ComponentF
networkConfig.RunProxy()
}
if components.Enabled(ComponentDNS) && networkConfig.DNSServer != nil {
networkConfig.RunDNS()
networkConfig.RunDNS(stopCh)
}

networkConfig.InternalKubeInformers.Start(wait.NeverStop)
networkConfig.InternalKubeInformers.Start(stopCh)
if networkConfig.InternalNetworkInformers != nil {
networkConfig.InternalNetworkInformers.Start(wait.NeverStop)
networkConfig.InternalNetworkInformers.Start(stopCh)
}

return nil
Expand Down
35 changes: 26 additions & 9 deletions pkg/dns/dnsmasq.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type dnsmasqMonitor struct {
metricError *prometheus.CounterVec
metricRestart prometheus.Counter

ready func() bool

// dnsIP is the IP address this DNS server is reachable at from dnsmasq
dnsIP string
// dnsDomain is the domain name for this DNS server that dnsmasq should forward to
Expand Down Expand Up @@ -57,7 +59,10 @@ func (m *dnsmasqMonitor) initMetrics() {
}
}

func (m *dnsmasqMonitor) Start() error {
func (m *dnsmasqMonitor) Start(stopCh <-chan struct{}) error {
if m.ready == nil {
m.ready = func() bool { return true }
}
m.initMetrics()
conn, err := utildbus.New().SystemBus()
if err != nil {
Expand All @@ -66,14 +71,17 @@ func (m *dnsmasqMonitor) Start() error {
if err := conn.BusObject().Call("org.freedesktop.DBus.AddMatch", 0, fmt.Sprintf("type='signal',path='%s',interface='%s'", dbusDnsmasqPath, dbusDnsmasqInterface)).Store(); err != nil {
return fmt.Errorf("unable to add a match rule to the system DBus: %v", err)
}
go m.run(conn, utilwait.NeverStop)
go m.run(conn, stopCh)
return nil
}

func (m *dnsmasqMonitor) run(conn utildbus.Connection, stopCh <-chan struct{}) {
ch := make(chan *godbus.Signal, 20)
defer func() {
utilruntime.HandleCrash()
glog.V(2).Infof("dnsmasq monitor shutting down")
// clear our configuration on shutdown
m.refresh(conn, false)
// unregister the handler
conn.Signal(ch)
}()
Expand All @@ -89,7 +97,7 @@ func (m *dnsmasqMonitor) run(conn utildbus.Connection, stopCh <-chan struct{}) {
case "uk.org.thekelleys.dnsmasq.Up":
m.metricRestart.Inc()
glog.V(2).Infof("dnsmasq restarted, refreshing server configuration")
if err := m.refresh(conn); err != nil {
if err := m.refresh(conn, m.ready()); err != nil {
utilruntime.HandleError(fmt.Errorf("unable to refresh dnsmasq status on dnsmasq startup: %v", err))
m.metricError.WithLabelValues("restart").Inc()
} else {
Expand All @@ -101,7 +109,11 @@ func (m *dnsmasqMonitor) run(conn utildbus.Connection, stopCh <-chan struct{}) {

// no matter what, always keep trying to refresh dnsmasq
go utilwait.Until(func() {
if err := m.refresh(conn); err != nil {
for !m.ready() {
glog.V(4).Infof("Waiting for DNS data to be available to update dnsmasq")
time.Sleep(time.Second)
}
if err := m.refresh(conn, true); err != nil {
utilruntime.HandleError(fmt.Errorf("unable to periodically refresh dnsmasq status: %v", err))
m.metricError.WithLabelValues("periodic").Inc()
} else {
Expand All @@ -113,14 +125,19 @@ func (m *dnsmasqMonitor) run(conn utildbus.Connection, stopCh <-chan struct{}) {
}

// refresh invokes dnsmasq with the requested configuration
func (m *dnsmasqMonitor) refresh(conn utildbus.Connection) error {
func (m *dnsmasqMonitor) refresh(conn utildbus.Connection, ready bool) error {
m.lock.Lock()
defer m.lock.Unlock()
addresses := []string{
fmt.Sprintf("/in-addr.arpa/%s", m.dnsIP),
fmt.Sprintf("/%s/%s", m.dnsDomain, m.dnsIP),
var addresses []string
if ready {
addresses = []string{
fmt.Sprintf("/in-addr.arpa/%s", m.dnsIP),
fmt.Sprintf("/%s/%s", m.dnsDomain, m.dnsIP),
}
glog.V(4).Infof("Instructing dnsmasq to set the following servers: %v", addresses)
} else {
glog.V(2).Infof("DNS data is not ready, removing configuration from dnsmasq")
}
glog.V(5).Infof("Instructing dnsmasq to set the following servers: %v", addresses)
return conn.Object(dbusDnsmasqInterface, dbusDnsmasqPath).
Call("uk.org.thekelleys.SetDomainServers", 0, addresses).
Store()
Expand Down
27 changes: 23 additions & 4 deletions pkg/dns/dnsmasq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ func Test_dnsmasqMonitor_run(t *testing.T) {
m := &dnsmasqMonitor{
dnsIP: "127.0.0.1",
dnsDomain: "test.domain",
ready: func() bool { return true },
}
m.initMetrics()
conn := utildbus.NewFakeConnection()
//fake := utildbus.NewFake(conn, nil)
stopCh := make(chan struct{})

callCh := make(chan string, 1)
conn.AddObject(dbusDnsmasqInterface, dbusDnsmasqPath, func(method string, args ...interface{}) ([]interface{}, error) {
Expand All @@ -32,18 +33,36 @@ func Test_dnsmasqMonitor_run(t *testing.T) {
t.Errorf("unexpected args: %v", args)
return nil, fmt.Errorf("unexpected args")
}
if arr, ok := args[0].([]string); !ok || !reflect.DeepEqual([]string{"/in-addr.arpa/127.0.0.1", "/test.domain/127.0.0.1"}, arr) {

// we send empty after shutdown
completed := false
select {
case <-stopCh:
completed = true
default:
}

arr, ok := args[0].([]string)
if !ok {
t.Errorf("unexpected args: %v", args)
return nil, fmt.Errorf("unexpected args")
}
expected := []string{"/in-addr.arpa/127.0.0.1", "/test.domain/127.0.0.1"}
if completed {
expected = nil
}
if !reflect.DeepEqual(expected, arr) {
t.Errorf("unexpected args: %v", args)
return nil, fmt.Errorf("unexpected args")
}

return nil, nil
default:
t.Errorf("unexpected method: %v", method)
return nil, fmt.Errorf("unexpected method")
}
})

stopCh := make(chan struct{})
go m.run(conn, stopCh)

// should always set on startup
Expand Down Expand Up @@ -110,7 +129,7 @@ func (c *threadsafeDBusConn) EmitSignal(name, path, iface, signal string, args .
}

func Test_dnsmasqMonitor_run_metrics(t *testing.T) {
m := &dnsmasqMonitor{dnsIP: "127.0.0.1", dnsDomain: "test.domain"}
m := &dnsmasqMonitor{dnsIP: "127.0.0.1", dnsDomain: "test.domain", ready: func() bool { return true }}
m.initMetrics()
fakeConn := utildbus.NewFakeConnection()
conn := &threadsafeDBusConn{conn: fakeConn}
Expand Down
16 changes: 5 additions & 11 deletions pkg/dns/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,44 +25,37 @@ type Server struct {
Services ServiceAccessor
Endpoints EndpointsAccessor
MetricsName string

Stop chan struct{}
}

// NewServer creates a server.
func NewServer(config *server.Config, services ServiceAccessor, endpoints EndpointsAccessor, metricsName string) *Server {
stop := make(chan struct{})
return &Server{
Config: config,
Services: services,
Endpoints: endpoints,
MetricsName: metricsName,
Stop: stop,
}
}

// ListenAndServe starts a DNS server that exposes services and values stored in etcd (if etcdclient
// is not nil). It will block until the server exits.
func (s *Server) ListenAndServe() error {
monitorDnsmasq(s.Config, s.MetricsName)
func (s *Server) ListenAndServe(stopCh <-chan struct{}) error {
monitorDnsmasq(s.Config, s.MetricsName, stopCh, func() bool { return s.Services.HasSynced() && s.Endpoints.HasSynced() })

resolver := NewServiceResolver(s.Config, s.Services, s.Endpoints, openshiftFallback)
resolvers := server.FirstBackend{resolver}
if len(s.MetricsName) > 0 {
metrics.RegisterPrometheusMetrics(s.MetricsName, "")
}
dns := server.New(resolvers, s.Config)
if s.Stop != nil {
defer close(s.Stop)
}
return dns.Run()
}

// monitorDnsmasq attempts to start the dnsmasq monitoring goroutines to keep dnsmasq
// in sync with this server. It will take no action if the current config DnsAddr does
// not point to port 53 (dnsmasq does not support alternate upstream ports). It will
// convert the bind address from 0.0.0.0 to the BindNetwork appropriate listen address.
func monitorDnsmasq(config *server.Config, metricsName string) {
func monitorDnsmasq(config *server.Config, metricsName string, stopCh <-chan struct{}, readyFn func() bool) {
if host, port, err := net.SplitHostPort(config.DnsAddr); err == nil && port == "53" {
if ip := net.ParseIP(host); ip != nil && ip.IsUnspecified() {
if config.BindNetwork == "ipv6" {
Expand All @@ -73,10 +66,11 @@ func monitorDnsmasq(config *server.Config, metricsName string) {
}
monitor := &dnsmasqMonitor{
metricsName: metricsName,
ready: readyFn,
dnsIP: host,
dnsDomain: strings.TrimSuffix(config.Domain, "."),
}
if err := monitor.Start(); err != nil {
if err := monitor.Start(stopCh); err != nil {
glog.Warningf("Unable to start dnsmasq monitor: %v", err)
} else {
glog.V(2).Infof("Monitoring dnsmasq to point cluster queries to %s", host)
Expand Down
Loading