Skip to content

Commit

Permalink
*: add max requests bytes, keepalive to server, blackhole methods to …
Browse files Browse the repository at this point in the history
…integration

Signed-off-by: Gyu-Ho Lee <[email protected]>
  • Loading branch information
gyuho committed Nov 16, 2017
1 parent 2a6d504 commit 939337f
Show file tree
Hide file tree
Showing 14 changed files with 275 additions and 81 deletions.
67 changes: 47 additions & 20 deletions embed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"net/http"
"net/url"
"strings"
"time"

"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/pkg/cors"
Expand All @@ -37,9 +38,13 @@ const (
ClusterStateFlagNew = "new"
ClusterStateFlagExisting = "existing"

DefaultName = "default"
DefaultMaxSnapshots = 5
DefaultMaxWALs = 5
DefaultName = "default"
DefaultMaxSnapshots = 5
DefaultMaxWALs = 5
DefaultMaxRequestBytes = 1.5 * 1024 * 1024
DefaultGRPCKeepAliveMinTime = 5 * time.Second
DefaultGRPCKeepAliveInterval = 2 * time.Hour
DefaultGRPCKeepAliveTimeout = 20 * time.Second

DefaultListenPeerURLs = "http://localhost:2380"
DefaultListenClientURLs = "http://localhost:2379"
Expand Down Expand Up @@ -85,6 +90,24 @@ type Config struct {
TickMs uint `json:"heartbeat-interval"`
ElectionMs uint `json:"election-timeout"`
QuotaBackendBytes int64 `json:"quota-backend-bytes"`
MaxRequestBytes uint `json:"max-request-bytes"`

// gRPC server options

// GRPCKeepAliveMinTime is the minimum interval that a client should
// wait before pinging server. When client pings "too fast", server
// sends goaway and closes the connection (errors: too_many_pings,
// http2.ErrCodeEnhanceYourCalm). When too slow, nothing happens.
// Server expects client pings only when there is any active streams
// (PermitWithoutStream is set false).
GRPCKeepAliveMinTime time.Duration `json:"grpc-keepalive-min-time"`
// GRPCKeepAliveInterval is the frequency of server-to-client ping
// to check if a connection is alive. Close a non-responsive connection
// after an additional duration of Timeout. 0 to disable.
GRPCKeepAliveInterval time.Duration `json:"grpc-keepalive-interval"`
// GRPCKeepAliveTimeout is the additional duration of wait
// before closing a non-responsive connection. 0 to disable.
GRPCKeepAliveTimeout time.Duration `json:"grpc-keepalive-timeout"`

// clustering

Expand Down Expand Up @@ -167,23 +190,27 @@ func NewConfig() *Config {
lcurl, _ := url.Parse(DefaultListenClientURLs)
acurl, _ := url.Parse(DefaultAdvertiseClientURLs)
cfg := &Config{
CorsInfo: &cors.CORSInfo{},
MaxSnapFiles: DefaultMaxSnapshots,
MaxWalFiles: DefaultMaxWALs,
Name: DefaultName,
SnapCount: etcdserver.DefaultSnapCount,
TickMs: 100,
ElectionMs: 1000,
LPUrls: []url.URL{*lpurl},
LCUrls: []url.URL{*lcurl},
APUrls: []url.URL{*apurl},
ACUrls: []url.URL{*acurl},
ClusterState: ClusterStateFlagNew,
InitialClusterToken: "etcd-cluster",
StrictReconfigCheck: true,
Metrics: "basic",
EnableV2: true,
AuthToken: "simple",
CorsInfo: &cors.CORSInfo{},
MaxSnapFiles: DefaultMaxSnapshots,
MaxWalFiles: DefaultMaxWALs,
Name: DefaultName,
SnapCount: etcdserver.DefaultSnapCount,
MaxRequestBytes: DefaultMaxRequestBytes,
GRPCKeepAliveMinTime: DefaultGRPCKeepAliveMinTime,
GRPCKeepAliveInterval: DefaultGRPCKeepAliveInterval,
GRPCKeepAliveTimeout: DefaultGRPCKeepAliveTimeout,
TickMs: 100,
ElectionMs: 1000,
LPUrls: []url.URL{*lpurl},
LCUrls: []url.URL{*lcurl},
APUrls: []url.URL{*apurl},
ACUrls: []url.URL{*acurl},
ClusterState: ClusterStateFlagNew,
InitialClusterToken: "etcd-cluster",
StrictReconfigCheck: true,
Metrics: "basic",
EnableV2: true,
AuthToken: "simple",
}
cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
return cfg
Expand Down
19 changes: 18 additions & 1 deletion embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ import (
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/rafthttp"
"github.com/coreos/pkg/capnslog"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)

var plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "embed")
Expand Down Expand Up @@ -140,6 +142,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
ElectionTicks: cfg.ElectionTicks(),
AutoCompactionRetention: cfg.AutoCompactionRetention,
QuotaBackendBytes: cfg.QuotaBackendBytes,
MaxRequestBytes: cfg.MaxRequestBytes,
StrictReconfigCheck: cfg.StrictReconfigCheck,
ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth,
AuthToken: cfg.AuthToken,
Expand Down Expand Up @@ -415,9 +418,23 @@ func (e *Etcd) serve() (err error) {
}
h = http.Handler(&cors.CORSHandler{Handler: h, Info: e.cfg.CorsInfo})

gopts := []grpc.ServerOption{}
if e.cfg.GRPCKeepAliveMinTime > time.Duration(0) {
gopts = append(gopts, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: e.cfg.GRPCKeepAliveMinTime,
PermitWithoutStream: false,
}))
}
if e.cfg.GRPCKeepAliveInterval > time.Duration(0) &&
e.cfg.GRPCKeepAliveTimeout > time.Duration(0) {
gopts = append(gopts, grpc.KeepaliveParams(keepalive.ServerParameters{
Time: e.cfg.GRPCKeepAliveInterval,
Timeout: e.cfg.GRPCKeepAliveTimeout,
}))
}
for _, sctx := range e.sctxs {
go func(s *serveCtx) {
e.errHandler(s.serve(e.Server, ctlscfg, h, e.errHandler))
e.errHandler(s.serve(e.Server, ctlscfg, h, e.errHandler, gopts...))
}(sctx)
}
return nil
Expand Down
11 changes: 8 additions & 3 deletions embed/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,12 @@ func newServeCtx() *serveCtx {
// serve accepts incoming connections on the listener l,
// creating a new service goroutine for each. The service goroutines
// read requests and then call handler to reply to them.
func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlscfg *tls.Config, handler http.Handler, errHandler func(error)) error {
func (sctx *serveCtx) serve(
s *etcdserver.EtcdServer,
tlscfg *tls.Config,
handler http.Handler,
errHandler func(error),
gopts ...grpc.ServerOption) error {
logger := defaultLog.New(ioutil.Discard, "etcdhttp", 0)
<-s.ReadyNotify()
plog.Info("ready to serve client requests")
Expand All @@ -77,7 +82,7 @@ func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlscfg *tls.Config, handle
servLock := v3lock.NewLockServer(v3c)

if sctx.insecure {
gs := v3rpc.Server(s, nil)
gs := v3rpc.Server(s, nil, gopts...)
sctx.grpcServerC <- gs
v3electionpb.RegisterElectionServer(gs, servElection)
v3lockpb.RegisterLockServer(gs, servLock)
Expand Down Expand Up @@ -107,7 +112,7 @@ func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlscfg *tls.Config, handle
}

if sctx.secure {
gs := v3rpc.Server(s, tlscfg)
gs := v3rpc.Server(s, tlscfg, gopts...)
sctx.grpcServerC <- gs
v3electionpb.RegisterElectionServer(gs, servElection)
v3lockpb.RegisterLockServer(gs, servLock)
Expand Down
4 changes: 4 additions & 0 deletions etcdmain/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ func newConfig() *config {
fs.UintVar(&cfg.TickMs, "heartbeat-interval", cfg.TickMs, "Time (in milliseconds) of a heartbeat interval.")
fs.UintVar(&cfg.ElectionMs, "election-timeout", cfg.ElectionMs, "Time (in milliseconds) for an election to timeout.")
fs.Int64Var(&cfg.QuotaBackendBytes, "quota-backend-bytes", cfg.QuotaBackendBytes, "Raise alarms when backend size exceeds the given quota. 0 means use the default quota.")
fs.UintVar(&cfg.MaxRequestBytes, "max-request-bytes", cfg.MaxRequestBytes, "Maximum client request size in bytes the server will accept.")
fs.DurationVar(&cfg.GRPCKeepAliveMinTime, "grpc-keepalive-min-time", cfg.Config.GRPCKeepAliveMinTime, "Minimum interval duration that a client should wait before pinging server.")
fs.DurationVar(&cfg.GRPCKeepAliveInterval, "grpc-keepalive-interval", cfg.Config.GRPCKeepAliveInterval, "Frequency duration of server-to-client ping to check if a connection is alive (0 to disable).")
fs.DurationVar(&cfg.GRPCKeepAliveTimeout, "grpc-keepalive-timeout", cfg.Config.GRPCKeepAliveTimeout, "Additional duration of wait before closing a non-responsive connection (0 to disable).")

// clustering
fs.Var(flags.NewURLsValue(embed.DefaultInitialAdvertisePeerURLs), "initial-advertise-peer-urls", "List of this member's peer URLs to advertise to the rest of the cluster.")
Expand Down
8 changes: 8 additions & 0 deletions etcdmain/help.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ member flags:
comma-separated whitelist of origins for CORS (cross-origin resource sharing).
--quota-backend-bytes '0'
raise alarms when backend size exceeds the given quota (0 defaults to low space quota).
--max-request-bytes '1572864'
maximum client request size in bytes the server will accept.
--grpc-keepalive-min-time '5s'
minimum duration interval that a client should wait before pinging server.
--grpc-keepalive-interval '2h'
frequency duration of server-to-client ping to check if a connection is alive (0 to disable).
--grpc-keepalive-timeout '20s'
additional duration of wait before closing a non-responsive connection (0 to disable).
clustering flags:
Expand Down
12 changes: 9 additions & 3 deletions etcdserver/api/v3rpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,28 @@ import (
"google.golang.org/grpc/grpclog"
)

const maxStreams = math.MaxUint32
const (
grpcOverheadBytes = 512 * 1024
maxStreams = math.MaxUint32
maxSendBytes = math.MaxInt32
)

func init() {
grpclog.SetLogger(plog)
}

func Server(s *etcdserver.EtcdServer, tls *tls.Config) *grpc.Server {
func Server(s *etcdserver.EtcdServer, tls *tls.Config, gopts ...grpc.ServerOption) *grpc.Server {
var opts []grpc.ServerOption
opts = append(opts, grpc.CustomCodec(&codec{}))
if tls != nil {
opts = append(opts, grpc.Creds(credentials.NewTLS(tls)))
}
opts = append(opts, grpc.UnaryInterceptor(newUnaryInterceptor(s)))
opts = append(opts, grpc.StreamInterceptor(newStreamInterceptor(s)))
opts = append(opts, grpc.MaxRecvMsgSize(int(s.Cfg.MaxRequestBytes+grpcOverheadBytes)))
opts = append(opts, grpc.MaxSendMsgSize(maxSendBytes))
opts = append(opts, grpc.MaxConcurrentStreams(maxStreams))
grpcServer := grpc.NewServer(opts...)
grpcServer := grpc.NewServer(append(opts, gopts...)...)

pb.RegisterKVServer(grpcServer, NewQuotaKVServer(s))
pb.RegisterWatchServer(grpcServer, NewWatchServer(s))
Expand Down
8 changes: 8 additions & 0 deletions etcdserver/api/v3rpc/rpctypes/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package rpctypes
import (
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

var (
Expand Down Expand Up @@ -188,3 +189,10 @@ func Error(err error) error {
}
return EtcdError{code: grpc.Code(verr), desc: grpc.ErrorDesc(verr)}
}

func ErrorDesc(err error) string {
if s, ok := status.FromError(err); ok {
return s.Message()
}
return err.Error()
}
3 changes: 3 additions & 0 deletions etcdserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ type ServerConfig struct {
AutoCompactionRetention int
QuotaBackendBytes int64

// MaxRequestBytes is the maximum request size to send over raft.
MaxRequestBytes uint

StrictReconfigCheck bool

// ClientCertAuthEnabled is true when cert has been signed by the client CA.
Expand Down
7 changes: 6 additions & 1 deletion etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ const (
releaseDelayAfterSnapshot = 30 * time.Second

// maxPendingRevokes is the maximum number of outstanding expired lease revocations.
maxPendingRevokes = 16
maxPendingRevokes = 16
recommendedMaxRequestBytes = 10 * 1024 * 1024
)

var (
Expand Down Expand Up @@ -259,6 +260,10 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
cl *membership.RaftCluster
)

if cfg.MaxRequestBytes > recommendedMaxRequestBytes {
plog.Warningf("MaxRequestBytes %v exceeds maximum recommended size %v", cfg.MaxRequestBytes, recommendedMaxRequestBytes)
}

if terr := fileutil.TouchDirAll(cfg.DataDir); terr != nil {
return nil, fmt.Errorf("cannot access data directory: %v", terr)
}
Expand Down
8 changes: 1 addition & 7 deletions etcdserver/v3_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,6 @@ import (
)

const (
// the max request size that raft accepts.
// TODO: make this a flag? But we probably do not want to
// accept large request which might block raft stream. User
// specify a large value might end up with shooting in the foot.
maxRequestBytes = 1.5 * 1024 * 1024

// In the health case, there might be a small gap (10s of entries) between
// the applied index and committed index.
// However, if the committed entries are very heavy to apply, the gap might grow.
Expand Down Expand Up @@ -556,7 +550,7 @@ func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.In
return nil, err
}

if len(data) > maxRequestBytes {
if len(data) > int(s.Cfg.MaxRequestBytes) {
return nil, ErrRequestTooLarge
}

Expand Down
Loading

0 comments on commit 939337f

Please sign in to comment.