Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
remove races
  • Loading branch information
aarshkshah1992 authored and Stebalien committed May 13, 2020
commit 4e01c1f2bb8e1c6685021cb2bb9fd17c4e5de552
5 changes: 4 additions & 1 deletion p2p/host/basic/basic_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ import (
// peer (for all addresses).
const maxAddressResolution = 32

// addrChangeTickrInterval is the interval between two address change ticks.
var addrChangeTickrInterval = 5 * time.Second

var log = logging.Logger("basichost")

var (
Expand Down Expand Up @@ -403,7 +406,7 @@ func (h *BasicHost) background() {

// periodically schedules an IdentifyPush to update our peers for changes
// in our address set (if needed)
ticker := time.NewTicker(5 * time.Second)
ticker := time.NewTicker(addrChangeTickrInterval)
defer ticker.Stop()

for {
Expand Down
6 changes: 3 additions & 3 deletions p2p/protocol/identify/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (ids *IDService) loop() {
if ids.Host.Network().Connectedness(rp) != network.Connected {
// before we remove the peerhandler, we should ensure that it will not send any
// more messages. Otherwise, we might create a new handler and the Identify response
// synchronized with the new handler might be overwritten by a message sent by this handler.
// synchronized with the new handler might be overwritten by a message sent by this "old" handler.
ids.refCount.Add(1)
go func(req *rmPeerHandlerReq, ph *peerHandler) {
defer ids.refCount.Done()
Expand Down Expand Up @@ -361,7 +361,7 @@ func (ids *IDService) identifyConn(c network.Conn, signal chan struct{}) {
return
}
s.SetProtocol(protocol.ID(selectedProto))
ids.responseHandler(s)
ids.handleIdentifyResponse(s)
}

func protoSupportsPeerRecords(proto protocol.ID) bool {
Expand Down Expand Up @@ -400,7 +400,7 @@ func (ids *IDService) sendIdentifyResp(s network.Stream) {
log.Debugf("%s sent message to %s %s", ID, c.RemotePeer(), c.RemoteMultiaddr())
}

func (ids *IDService) responseHandler(s network.Stream) {
func (ids *IDService) handleIdentifyResponse(s network.Stream) {
c := s.Conn()

r := ggio.NewDelimitedReader(s, 2048)
Expand Down
2 changes: 1 addition & 1 deletion p2p/protocol/identify/id_push.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@ const LegacyIDPush = "/ipfs/id/push/1.0.0"

// pushHandler handles incoming identify push streams. The behaviour is identical to the ordinary identify protocol.
func (ids *IDService) pushHandler(s network.Stream) {
ids.responseHandler(s)
ids.handleIdentifyResponse(s)
}
126 changes: 126 additions & 0 deletions p2p/protocol/identify/id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p-core/record"
coretest "github.com/libp2p/go-libp2p-core/test"

blhost "github.com/libp2p/go-libp2p-blankhost"
Expand Down Expand Up @@ -216,6 +217,49 @@ func testHasPublicKey(t *testing.T, h host.Host, p peer.ID, shouldBe ic.PubKey)
}
}

func getSignedRecord(t *testing.T, h host.Host, p peer.ID) *record.Envelope {
cab, ok := peerstore.GetCertifiedAddrBook(h.Peerstore())
require.True(t, ok)
rec := cab.GetPeerRecord(p)
return rec
}

// we're using BlankHost in our tests, which doesn't automatically generate peer records
// and emit address change events on the bus like BasicHost.
// This generates a record, puts it in the peerstore and emits an addr change event
// will cause the identify service to push it to all peer it's connected to.
func emitAddrChangeEvt(t *testing.T, h host.Host) {
t.Helper()

key := h.Peerstore().PrivKey(h.ID())
if key == nil {
t.Fatal("no private key for host")
}

rec := peer.NewPeerRecord()
rec.PeerID = h.ID()
rec.Addrs = h.Addrs()
signed, err := record.Seal(rec, key)
if err != nil {
t.Fatalf("error generating peer record: %s", err)
}

cab, ok := peerstore.GetCertifiedAddrBook(h.Peerstore())
require.True(t, ok)
_, err = cab.ConsumePeerRecord(signed, peerstore.PermanentAddrTTL)
require.NoError(t, err)

evt := event.EvtLocalAddressesUpdated{}
emitter, err := h.EventBus().Emitter(new(event.EvtLocalAddressesUpdated), eventbus.Stateful)
if err != nil {
t.Fatal(err)
}
err = emitter.Emit(evt)
if err != nil {
t.Fatal(err)
}
}

// TestIDServiceWait gives the ID service 1s to finish after dialing
// this is because it used to be concurrent. Now, Dial wait till the
// id service is done.
Expand Down Expand Up @@ -524,6 +568,88 @@ func TestIdentifyDeltaWhileIdentifyingConn(t *testing.T) {
}
}

func TestIdentifyPushOnAddrChange(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

h1 := blhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
h2 := blhost.NewBlankHost(swarmt.GenSwarm(t, ctx))

h1p := h1.ID()
h2p := h2.ID()

ids1 := identify.NewIDService(h1)
ids2 := identify.NewIDService(h2)
defer ids1.Close()
defer ids2.Close()

testKnowsAddrs(t, h1, h2p, []ma.Multiaddr{}) // nothing
testKnowsAddrs(t, h2, h1p, []ma.Multiaddr{}) // nothing

h2pi := h2.Peerstore().PeerInfo(h2p)
require.NoError(t, h1.Connect(ctx, h2pi))
require.Len(t, h1.Network().ConnsToPeer(h2p), 1)
require.Len(t, h2.Network().ConnsToPeer(h1p), 1)

// wait for identify to complete and assert current addresses
ids1.IdentifyConn(h1.Network().ConnsToPeer(h2p)[0])
ids2.IdentifyConn(h2.Network().ConnsToPeer(h1p)[0])

testKnowsAddrs(t, h1, h2p, h2.Peerstore().Addrs(h2p))
testKnowsAddrs(t, h2, h1p, h1.Peerstore().Addrs(h1p))

// change addr on host 1 and ensure host2 gets a push
lad := ma.StringCast("/ip4/127.0.0.1/tcp/1234")
require.NoError(t, h1.Network().Listen(lad))
require.Contains(t, h1.Addrs(), lad)
emitAddrChangeEvt(t, h1)

require.Eventually(t, func() bool {
addrs := h2.Peerstore().Addrs(h1p)
for _, ad := range addrs {
if ad.Equal(lad) {
return true
}
}
return false
}, 5*time.Second, 500*time.Millisecond)
require.NotNil(t, getSignedRecord(t, h2, h1p))

// change addr on host2 and ensure host 1 gets a pus
lad = ma.StringCast("/ip4/127.0.0.1/tcp/1235")
require.NoError(t, h2.Network().Listen(lad))
require.Contains(t, h2.Addrs(), lad)
emitAddrChangeEvt(t, h2)

require.Eventually(t, func() bool {
addrs := h1.Peerstore().Addrs(h2p)
for _, ad := range addrs {
if ad.Equal(lad) {
return true
}
}
return false
}, 5*time.Second, 500*time.Millisecond)
require.NotNil(t, getSignedRecord(t, h1, h2p))

// change addr on host2 again
lad2 := ma.StringCast("/ip4/127.0.0.1/tcp/1236")
require.NoError(t, h2.Network().Listen(lad2))
require.Contains(t, h2.Addrs(), lad2)
emitAddrChangeEvt(t, h2)

require.Eventually(t, func() bool {
addrs := h1.Peerstore().Addrs(h2p)
for _, ad := range addrs {
if ad.Equal(lad2) {
return true
}
}
return false
}, 5*time.Second, 500*time.Millisecond)
require.NotNil(t, getSignedRecord(t, h1, h2p))
}

func TestUserAgent(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down