Skip to content

Commit 6cfcfe6

Browse files
committed
update
1 parent 99b2d82 commit 6cfcfe6

File tree

1 file changed

+31
-18
lines changed

1 file changed

+31
-18
lines changed

driver/etcddriver.go

Lines changed: 31 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,11 @@ type EtcdDriver struct {
2121
nodeID string
2222
serviceName string
2323

24-
cli *clientv3.Client
25-
nodes *sync.Map
26-
logger dlog.Logger
27-
28-
leaseTimeout int64
29-
leaseID clientv3.LeaseID
30-
lease clientv3.Lease
24+
cli *clientv3.Client
25+
lease int64
26+
nodes *sync.Map
27+
leaseID clientv3.LeaseID
28+
logger dlog.Logger
3129

3230
ctx context.Context
3331
cancel context.CancelFunc
@@ -49,12 +47,13 @@ func newEtcdDriver(cli *clientv3.Client) *EtcdDriver {
4947
// 设置key value,绑定租约
5048
func (e *EtcdDriver) putKeyWithLease(ctx context.Context, key, val string) (clientv3.LeaseID, error) {
5149
//设置租约时间,最少5s
52-
if e.leaseTimeout < etcdDefaultLease {
53-
e.leaseTimeout = etcdDefaultLease
50+
if e.lease < etcdDefaultLease {
51+
e.lease = etcdDefaultLease
5452
}
53+
5554
subCtx, cancel := context.WithTimeout(ctx, etcdBusinessTimeout)
5655
defer cancel()
57-
resp, err := e.lease.Grant(ctx, e.leaseTimeout)
56+
resp, err := e.cli.Grant(subCtx, e.lease)
5857
if err != nil {
5958
return 0, err
6059
}
@@ -121,26 +120,27 @@ func (e *EtcdDriver) getServices() []string {
121120
return addrs
122121
}
123122

124-
func (e *EtcdDriver) createLease(ctx context.Context, nodeID string) (<-chan *clientv3.LeaseKeepAliveResponse, error) {
123+
func (e *EtcdDriver) keepAlive(ctx context.Context, nodeID string) (<-chan *clientv3.LeaseKeepAliveResponse, error) {
125124
var err error
126-
e.lease = clientv3.NewLease(e.cli)
127125
e.leaseID, err = e.putKeyWithLease(ctx, nodeID, nodeID)
128126
if err != nil {
129127
e.logger.Errorf("putKeyWithLease error: %v", err)
130128
return nil, err
131129
}
132-
return e.lease.KeepAlive(ctx, e.leaseID)
130+
131+
return e.cli.KeepAlive(ctx, e.leaseID)
133132
}
134133

135134
func (e *EtcdDriver) revoke(ctx context.Context) {
136-
_, err := e.lease.Revoke(ctx, e.leaseID)
135+
_, err := e.cli.Lease.Revoke(ctx, e.leaseID)
137136
if err != nil {
138137
e.logger.Infof("lease revoke error: %v", err)
139138
}
140139
}
141140

142141
func (e *EtcdDriver) heartBeat(ctx context.Context) {
143-
leaseCh, err := e.createLease(ctx, e.nodeID)
142+
label:
143+
leaseCh, err := e.keepAlive(ctx, e.nodeID)
144144
if err != nil {
145145
e.logger.Errorf("keep alive error, %v", err)
146146
return
@@ -152,11 +152,24 @@ func (e *EtcdDriver) heartBeat(ctx context.Context) {
152152
e.logger.Infof("driver stopped")
153153
return
154154
}
155-
case resp := <-leaseCh:
155+
case _, ok := <-leaseCh:
156156
{
157157
// if lease timeout, goto top of
158158
// this function to keepalive
159-
e.logger.Infof("leaseID=%0x", resp.ID)
159+
if !ok {
160+
goto label
161+
}
162+
}
163+
case <-time.After(etcdBusinessTimeout):
164+
{
165+
e.logger.Errorf("ectd cli keepalive timeout")
166+
return
167+
}
168+
case <-time.After(time.Duration(e.lease) * (time.Second) / 2):
169+
{
170+
// if near to nodes time,
171+
// renew the lease
172+
goto label
160173
}
161174
}
162175
}
@@ -196,7 +209,7 @@ func (e *EtcdDriver) withOption(opt Option) (err error) {
196209
switch opt.Type() {
197210
case OptionTypeTimeout:
198211
{
199-
e.leaseTimeout = int64(opt.(TimeoutOption).timeout.Seconds())
212+
e.lease = int64(opt.(TimeoutOption).timeout.Seconds())
200213
}
201214
case OptionTypeLogger:
202215
{

0 commit comments

Comments
 (0)