Skip to content

Commit

Permalink
clientv3: call KV/Txn APIs with default gRPC call options
Browse files Browse the repository at this point in the history
Signed-off-by: Gyuho Lee <[email protected]>
  • Loading branch information
gyuho committed Dec 20, 2017
1 parent e82f055 commit c67e6d5
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 15 deletions.
2 changes: 1 addition & 1 deletion clientv3/integration/dial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func TestDialForeignEndpoint(t *testing.T) {

// grpc can return a lazy connection that's not connected yet; confirm
// that it can communicate with the cluster.
kvc := clientv3.NewKVFromKVClient(pb.NewKVClient(conn))
kvc := clientv3.NewKVFromKVClient(pb.NewKVClient(conn), clus.Client(0))
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
defer cancel()
if _, gerr := kvc.Get(ctx, "abc"); gerr != nil {
Expand Down
33 changes: 22 additions & 11 deletions clientv3/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"

"golang.org/x/net/context"
"google.golang.org/grpc"
)

type (
Expand Down Expand Up @@ -88,15 +89,24 @@ func (resp *TxnResponse) OpResponse() OpResponse {
}

type kv struct {
remote pb.KVClient
remote pb.KVClient
callOpts []grpc.CallOption
}

func NewKV(c *Client) KV {
return &kv{remote: RetryKVClient(c)}
api := &kv{remote: RetryKVClient(c)}
if c != nil {
api.callOpts = c.callOpts
}
return api
}

func NewKVFromKVClient(remote pb.KVClient) KV {
return &kv{remote: remote}
func NewKVFromKVClient(remote pb.KVClient, c *Client) KV {
api := &kv{remote: remote}
if c != nil {
api.callOpts = c.callOpts
}
return api
}

func (kv *kv) Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error) {
Expand All @@ -115,7 +125,7 @@ func (kv *kv) Delete(ctx context.Context, key string, opts ...OpOption) (*Delete
}

func (kv *kv) Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error) {
resp, err := kv.remote.Compact(ctx, OpCompact(rev, opts...).toRequest())
resp, err := kv.remote.Compact(ctx, OpCompact(rev, opts...).toRequest(), kv.callOpts...)
if err != nil {
return nil, toErr(ctx, err)
}
Expand All @@ -124,8 +134,9 @@ func (kv *kv) Compact(ctx context.Context, rev int64, opts ...CompactOption) (*C

func (kv *kv) Txn(ctx context.Context) Txn {
return &txn{
kv: kv,
ctx: ctx,
kv: kv,
ctx: ctx,
callOpts: kv.callOpts,
}
}

Expand All @@ -134,27 +145,27 @@ func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) {
switch op.t {
case tRange:
var resp *pb.RangeResponse
resp, err = kv.remote.Range(ctx, op.toRangeRequest())
resp, err = kv.remote.Range(ctx, op.toRangeRequest(), kv.callOpts...)
if err == nil {
return OpResponse{get: (*GetResponse)(resp)}, nil
}
case tPut:
var resp *pb.PutResponse
r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV, IgnoreValue: op.ignoreValue, IgnoreLease: op.ignoreLease}
resp, err = kv.remote.Put(ctx, r)
resp, err = kv.remote.Put(ctx, r, kv.callOpts...)
if err == nil {
return OpResponse{put: (*PutResponse)(resp)}, nil
}
case tDeleteRange:
var resp *pb.DeleteRangeResponse
r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end, PrevKv: op.prevKV}
resp, err = kv.remote.DeleteRange(ctx, r)
resp, err = kv.remote.DeleteRange(ctx, r, kv.callOpts...)
if err == nil {
return OpResponse{del: (*DeleteResponse)(resp)}, nil
}
case tTxn:
var resp *pb.TxnResponse
resp, err = kv.remote.Txn(ctx, op.toTxnRequest())
resp, err = kv.remote.Txn(ctx, op.toTxnRequest(), kv.callOpts...)
if err == nil {
return OpResponse{txn: (*TxnResponse)(resp)}, nil
}
Expand Down
5 changes: 4 additions & 1 deletion clientv3/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"

"golang.org/x/net/context"
"google.golang.org/grpc"
)

// Txn is the interface that wraps mini-transactions.
Expand Down Expand Up @@ -66,6 +67,8 @@ type txn struct {

sus []*pb.RequestOp
fas []*pb.RequestOp

callOpts []grpc.CallOption
}

func (txn *txn) If(cs ...Cmp) Txn {
Expand Down Expand Up @@ -140,7 +143,7 @@ func (txn *txn) Commit() (*TxnResponse, error) {

var resp *pb.TxnResponse
var err error
resp, err = txn.kv.remote.Txn(txn.ctx, r)
resp, err = txn.kv.remote.Txn(txn.ctx, r, txn.callOpts...)
if err != nil {
return nil, toErr(txn.ctx, err)
}
Expand Down
2 changes: 1 addition & 1 deletion etcdserver/api/v3client/v3client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func New(s *etcdserver.EtcdServer) *clientv3.Client {
c := clientv3.NewCtxClient(context.Background())

kvc := adapter.KvServerToKvClient(v3rpc.NewQuotaKVServer(s))
c.KV = clientv3.NewKVFromKVClient(kvc)
c.KV = clientv3.NewKVFromKVClient(kvc, c)

lc := adapter.LeaseServerToLeaseClient(v3rpc.NewQuotaLeaseServer(s))
c.Lease = clientv3.NewLeaseFromLeaseClient(lc, time.Second)
Expand Down
2 changes: 1 addition & 1 deletion integration/cluster_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func newClientV3(cfg clientv3.Config) (*clientv3.Client, error) {
return nil, err
}
rpc := toGRPC(c)
c.KV = clientv3.NewKVFromKVClient(rpc.KV)
c.KV = clientv3.NewKVFromKVClient(rpc.KV, c)
pmu.Lock()
lc := c.Lease
c.Lease = clientv3.NewLeaseFromLeaseClient(rpc.Lease, cfg.DialTimeout)
Expand Down

0 comments on commit c67e6d5

Please sign in to comment.