Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
restart peer handler
  • Loading branch information
aarshkshah1992 authored and Stebalien committed May 13, 2020
commit b964c9f6c9a2300c14242b0d44ef4e8626058348
17 changes: 8 additions & 9 deletions p2p/protocol/identify/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ func (ids *IDService) loop() {
mes := &pb.Identify{}
ids.populateMessage(mes, rp, addReq.localConnAddr, addReq.remoteConnAddr)
ph = newPeerHandler(rp, ids, mes)
ph.start()
phs[rp] = ph
addReq.resp <- ph
}
Expand Down Expand Up @@ -226,19 +227,17 @@ func (ids *IDService) loop() {

case rp := <-phClosedCh:
ph := phs[rp]
delete(phs, rp)

// If we are connected to the peer, it means that we got a connection from the peer
// before we could finish removing it's handler on the previous disconnection.
// If we delete the handler and dont replace it, we wont be able to push updates to it
// till we see a new connection. So, create and register a new handler for it with the state
// initialised to the last message we sent to that peer.
// If we delete the handler, we wont be able to push updates to it
// till we see a new connection. So, we should restart the handler.
// The fact that we got the handler on this channel means that it's context and handler
// have completed because we write the handler to this chanel only after it closed.
if ids.Host.Network().Connectedness(rp) == network.Connected {
ph.msgMu.RLock()
mes := ph.idMsgSnapshot
ph.msgMu.RUnlock()
ph = nil
phs[rp] = newPeerHandler(rp, ids, mes)
ph.start()
} else {
delete(phs, rp)
}

case e, more := <-sub.Out():
Expand Down
17 changes: 10 additions & 7 deletions p2p/protocol/identify/peer_loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,9 @@ type peerHandler struct {
}

func newPeerHandler(pid peer.ID, ids *IDService, initState *pb.Identify) *peerHandler {
ctx, cancel := context.WithCancel(context.Background())

ph := &peerHandler{
ids: ids,
ctx: ctx,
cancel: cancel,
pid: pid,
ids: ids,
pid: pid,

idMsgSnapshot: initState,

Expand All @@ -56,9 +52,16 @@ func newPeerHandler(pid peer.ID, ids *IDService, initState *pb.Identify) *peerHa
ph.evalTestCh = make(chan func())
}

return ph
}

func (ph *peerHandler) start() {
ctx, cancel := context.WithCancel(context.Background())
ph.ctx = ctx
ph.cancel = cancel

ph.wg.Add(1)
go ph.loop()
return ph
}

func (ph *peerHandler) close() error {
Expand Down
3 changes: 3 additions & 0 deletions p2p/protocol/identify/peer_loop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ func TestMakeApplyDelta(t *testing.T) {
defer h1.Close()
ids1 := NewIDService(h1)
ph := newPeerHandler(h1.ID(), ids1, &pb.Identify{})
ph.start()
defer ph.close()

m1 := ph.mkDelta()
require.NotNil(t, m1)
Expand Down Expand Up @@ -88,6 +90,7 @@ func TestHandlerClose(t *testing.T) {
defer h1.Close()
ids1 := NewIDService(h1)
ph := newPeerHandler(h1.ID(), ids1, nil)
ph.start()

require.NoError(t, ph.close())
}
Expand Down