diff --git a/clientv3/integration/dial_test.go b/clientv3/integration/dial_test.go index 779c6edbc7a..aba0e2eea3a 100644 --- a/clientv3/integration/dial_test.go +++ b/clientv3/integration/dial_test.go @@ -183,7 +183,7 @@ func TestDialForeignEndpoint(t *testing.T) { // grpc can return a lazy connection that's not connected yet; confirm // that it can communicate with the cluster. - kvc := clientv3.NewKVFromKVClient(pb.NewKVClient(conn)) + kvc := clientv3.NewKVFromKVClient(pb.NewKVClient(conn), clus.Client(0)) ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) defer cancel() if _, gerr := kvc.Get(ctx, "abc"); gerr != nil { diff --git a/clientv3/kv.go b/clientv3/kv.go index 949f6dc5b14..6289605c8e0 100644 --- a/clientv3/kv.go +++ b/clientv3/kv.go @@ -18,6 +18,7 @@ import ( pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "golang.org/x/net/context" + "google.golang.org/grpc" ) type ( @@ -88,15 +89,24 @@ func (resp *TxnResponse) OpResponse() OpResponse { } type kv struct { - remote pb.KVClient + remote pb.KVClient + callOpts []grpc.CallOption } func NewKV(c *Client) KV { - return &kv{remote: RetryKVClient(c)} + api := &kv{remote: RetryKVClient(c)} + if c != nil { + api.callOpts = c.callOpts + } + return api } -func NewKVFromKVClient(remote pb.KVClient) KV { - return &kv{remote: remote} +func NewKVFromKVClient(remote pb.KVClient, c *Client) KV { + api := &kv{remote: remote} + if c != nil { + api.callOpts = c.callOpts + } + return api } func (kv *kv) Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error) { @@ -115,7 +125,7 @@ func (kv *kv) Delete(ctx context.Context, key string, opts ...OpOption) (*Delete } func (kv *kv) Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error) { - resp, err := kv.remote.Compact(ctx, OpCompact(rev, opts...).toRequest()) + resp, err := kv.remote.Compact(ctx, OpCompact(rev, opts...).toRequest(), kv.callOpts...) if err != nil { return nil, toErr(ctx, err) } @@ -124,8 +134,9 @@ func (kv *kv) Compact(ctx context.Context, rev int64, opts ...CompactOption) (*C func (kv *kv) Txn(ctx context.Context) Txn { return &txn{ - kv: kv, - ctx: ctx, + kv: kv, + ctx: ctx, + callOpts: kv.callOpts, } } @@ -134,27 +145,27 @@ func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) { switch op.t { case tRange: var resp *pb.RangeResponse - resp, err = kv.remote.Range(ctx, op.toRangeRequest()) + resp, err = kv.remote.Range(ctx, op.toRangeRequest(), kv.callOpts...) if err == nil { return OpResponse{get: (*GetResponse)(resp)}, nil } case tPut: var resp *pb.PutResponse r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV, IgnoreValue: op.ignoreValue, IgnoreLease: op.ignoreLease} - resp, err = kv.remote.Put(ctx, r) + resp, err = kv.remote.Put(ctx, r, kv.callOpts...) if err == nil { return OpResponse{put: (*PutResponse)(resp)}, nil } case tDeleteRange: var resp *pb.DeleteRangeResponse r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end, PrevKv: op.prevKV} - resp, err = kv.remote.DeleteRange(ctx, r) + resp, err = kv.remote.DeleteRange(ctx, r, kv.callOpts...) if err == nil { return OpResponse{del: (*DeleteResponse)(resp)}, nil } case tTxn: var resp *pb.TxnResponse - resp, err = kv.remote.Txn(ctx, op.toTxnRequest()) + resp, err = kv.remote.Txn(ctx, op.toTxnRequest(), kv.callOpts...) if err == nil { return OpResponse{txn: (*TxnResponse)(resp)}, nil } diff --git a/clientv3/txn.go b/clientv3/txn.go index 2661c5942e2..1a80c8ebaab 100644 --- a/clientv3/txn.go +++ b/clientv3/txn.go @@ -20,6 +20,7 @@ import ( pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "golang.org/x/net/context" + "google.golang.org/grpc" ) // Txn is the interface that wraps mini-transactions. @@ -66,6 +67,8 @@ type txn struct { sus []*pb.RequestOp fas []*pb.RequestOp + + callOpts []grpc.CallOption } func (txn *txn) If(cs ...Cmp) Txn { @@ -140,7 +143,7 @@ func (txn *txn) Commit() (*TxnResponse, error) { var resp *pb.TxnResponse var err error - resp, err = txn.kv.remote.Txn(txn.ctx, r) + resp, err = txn.kv.remote.Txn(txn.ctx, r, txn.callOpts...) if err != nil { return nil, toErr(txn.ctx, err) } diff --git a/etcdserver/api/v3client/v3client.go b/etcdserver/api/v3client/v3client.go index cc4147d2f0c..445d0408f36 100644 --- a/etcdserver/api/v3client/v3client.go +++ b/etcdserver/api/v3client/v3client.go @@ -32,7 +32,7 @@ func New(s *etcdserver.EtcdServer) *clientv3.Client { c := clientv3.NewCtxClient(context.Background()) kvc := adapter.KvServerToKvClient(v3rpc.NewQuotaKVServer(s)) - c.KV = clientv3.NewKVFromKVClient(kvc) + c.KV = clientv3.NewKVFromKVClient(kvc, c) lc := adapter.LeaseServerToLeaseClient(v3rpc.NewQuotaLeaseServer(s)) c.Lease = clientv3.NewLeaseFromLeaseClient(lc, time.Second) diff --git a/integration/cluster_proxy.go b/integration/cluster_proxy.go index 3916553be86..6d0cd40d35c 100644 --- a/integration/cluster_proxy.go +++ b/integration/cluster_proxy.go @@ -99,7 +99,7 @@ func newClientV3(cfg clientv3.Config) (*clientv3.Client, error) { return nil, err } rpc := toGRPC(c) - c.KV = clientv3.NewKVFromKVClient(rpc.KV) + c.KV = clientv3.NewKVFromKVClient(rpc.KV, c) pmu.Lock() lc := c.Lease c.Lease = clientv3.NewLeaseFromLeaseClient(rpc.Lease, cfg.DialTimeout)