refactoring(p2p/peerTracker): extend conditions for peers handling #165

Draft · wants to merge 5 commits into base: main
12 changes: 12 additions & 0 deletions go.mod
@@ -24,17 +24,23 @@ require (
github.com/benbjohnson/clock v1.3.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/containerd/cgroups v1.1.0 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/elastic/gosigar v0.14.2 // indirect
github.com/flynn/noise v1.1.0 // indirect
github.com/francoispqt/gojay v1.2.13 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
github.com/godbus/dbus/v5 v5.1.0 // indirect
github.com/google/gopacket v1.1.19 // indirect
github.com/google/pprof v0.0.0-20240207164012-fb44976bdcd5 // indirect
github.com/google/uuid v1.5.0 // indirect
github.com/gorilla/websocket v1.5.1 // indirect
github.com/huin/goupnp v1.3.0 // indirect
github.com/ipfs/go-cid v0.4.1 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
@@ -69,6 +75,8 @@ require (
github.com/multiformats/go-multistream v0.5.0 // indirect
github.com/multiformats/go-varint v0.0.7 // indirect
github.com/onsi/ginkgo/v2 v2.15.0 // indirect
github.com/opencontainers/runtime-spec v1.2.0 // indirect
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
github.com/pion/datachannel v1.5.6 // indirect
github.com/pion/dtls/v2 v2.2.11 // indirect
github.com/pion/ice/v2 v2.3.24 // indirect
@@ -85,6 +93,7 @@
github.com/pion/transport/v2 v2.2.5 // indirect
github.com/pion/turn/v2 v2.1.6 // indirect
github.com/pion/webrtc/v3 v3.2.40 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.19.1 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
@@ -93,7 +102,10 @@
github.com/quic-go/qpack v0.4.0 // indirect
github.com/quic-go/quic-go v0.44.0 // indirect
github.com/quic-go/webtransport-go v0.8.0 // indirect
github.com/raulk/go-watchdog v1.3.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
go.uber.org/dig v1.17.1 // indirect
go.uber.org/fx v1.21.1 // indirect
go.uber.org/mock v0.4.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
1 change: 0 additions & 1 deletion go.sum
@@ -126,7 +126,6 @@ github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8Nz
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
- github.com/coreos/go-systemd v0.0.0-20181012123002-c6f51f82210d h1:t5Wuyh53qYyg9eqn4BbnlIT+vmhyww0TatL+zT3uWgI=
github.com/coreos/go-systemd v0.0.0-20181012123002-c6f51f82210d/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/go-systemd/v22 v22.1.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+1atmu1JpKERPPk=
github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
18 changes: 12 additions & 6 deletions p2p/exchange.go
@@ -80,10 +80,11 @@ func NewExchange[H header.Header[H]](
}
}

+ id := protocolID(params.networkID)
ex := &Exchange[H]{
host: host,
- protocolID: protocolID(params.networkID),
- peerTracker: newPeerTracker(host, gater, params.pidstore, metrics),
+ protocolID: id,
+ peerTracker: newPeerTracker(host, gater, params.networkID, params.pidstore, metrics),
Params: params,
metrics: metrics,
}
@@ -98,7 +99,6 @@ func (ex *Exchange[H]) Start(ctx context.Context) error {
ex.ctx, ex.cancel = context.WithCancel(context.Background())
log.Infow("client: starting client", "protocol ID", ex.protocolID)

- go ex.peerTracker.gc()
go ex.peerTracker.track()

// bootstrap the peerTracker with trusted peers as well as previously seen
@@ -150,9 +150,11 @@ func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) (
// their Head and verify against the given trusted header.
useTrackedPeers := !reqParams.TrustedHead.IsZero()
if useTrackedPeers {
- trackedPeers := ex.peerTracker.getPeers(maxUntrustedHeadRequests)
+ trackedPeers := ex.peerTracker.peers(maxUntrustedHeadRequests)
if len(trackedPeers) > 0 {
- peers = trackedPeers
+ peers = transform(trackedPeers, func(p *peerStat) peer.ID {
+ return p.peerID
+ })
log.Debugw("requesting head from tracked peers", "amount", len(peers))
}
}
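The `Head` path above now pulls `*peerStat` values from the tracker and projects them to IDs with `transform`. The tracker's `peers` accessor itself is not part of this diff; below is a minimal sketch of what it could look like, assuming `trackedPeers` is now a `map[peer.ID]struct{}` guarded by `peerLk`, as the updated tests suggest.

```go
// Hypothetical sketch, not from the PR: return up to max tracked peers
// as stats objects. Names mirror those visible in the tests; the real
// implementation may differ.
func (p *peerTracker) peers(max int) []*peerStat {
	p.peerLk.RLock()
	defer p.peerLk.RUnlock()

	stats := make([]*peerStat, 0, max)
	for id := range p.trackedPeers {
		if len(stats) == max {
			break
		}
		stats = append(stats, &peerStat{peerID: id})
	}
	return stats
}
```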
@@ -292,9 +294,13 @@ func (ex *Exchange[H]) GetRangeByHeight(
attribute.Int64("to", int64(to)),
))
defer span.End()
- session := newSession[H](
+ session, err := newSession[H](
ex.ctx, ex.host, ex.peerTracker, ex.protocolID, ex.Params.RequestTimeout, ex.metrics, withValidation(from),
)
+ // TODO(@vgonkivs): decide what to do with this error. Maybe we should fall into "discovery mode" and try to collect peers???
+ if err != nil {
+ return nil, err
+ }
defer session.close()
// we request the next header height that we don't have: `fromHead`+1
amount := to - (from.Height() + 1)
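For callers, the practical consequence of the `newSession` change is that `GetRangeByHeight` can now fail before any network request is made, e.g. when the tracker cannot supply peers for a session. A hedged sketch of the caller's view (the helper is illustrative, and the import paths assume the usual go-header layout):

```go
package main

import (
	"context"

	header "github.com/celestiaorg/go-header"
	"github.com/celestiaorg/go-header/p2p"
)

// syncRange is an illustrative helper, not part of the PR: it requests the
// 64 headers following head and surfaces the new early-error path.
func syncRange[H header.Header[H]](
	ctx context.Context,
	ex *p2p.Exchange[H],
	head H,
) ([]H, error) {
	headers, err := ex.GetRangeByHeight(ctx, head, head.Height()+64)
	if err != nil {
		// The TODO in the diff contemplates falling back into a
		// "discovery mode" here; for now the error just propagates.
		return nil, err
	}
	return headers, nil
}
```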
70 changes: 49 additions & 21 deletions p2p/exchange_test.go
@@ -8,12 +8,14 @@ import (

"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/sync"
"github.com/libp2p/go-libp2p"
libhost "github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
blankhost "github.com/libp2p/go-libp2p/p2p/host/blank"
"github.com/libp2p/go-libp2p/p2p/net/conngater"
"github.com/libp2p/go-libp2p/p2p/net/connmgr"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
swarm "github.com/libp2p/go-libp2p/p2p/net/swarm/testing"
"github.com/stretchr/testify/assert"
@@ -242,7 +244,7 @@ func TestExchange_RequestFullRangeHeaders(t *testing.T) {
require.NoError(t, err)
servers[index].Start(context.Background()) //nolint:errcheck
exchange.peerTracker.peerLk.Lock()
- exchange.peerTracker.trackedPeers[hosts[index].ID()] = &peerStat{peerID: hosts[index].ID()}
+ exchange.peerTracker.trackedPeers[hosts[index].ID()] = struct{}{}
exchange.peerTracker.peerLk.Unlock()
}

@@ -262,30 +264,38 @@
// TestExchange_RequestHeadersFromAnotherPeer tests that the Exchange instance will request a range
// from another peer with a lower score after receiving header.ErrNotFound
func TestExchange_RequestHeadersFromAnotherPeer(t *testing.T) {
- hosts := createMocknet(t, 3)
+ hosts := quicHosts(t, 3)

// create client + server(it does not have needed headers)
exchg, store := createP2PExAndServer(t, hosts[0], hosts[1])

+ serverSideStore := headertest.NewStore[*headertest.DummyHeader](t, headertest.NewTestSuite(t), 10)
+ tmServerSideStore := &timedOutStore{timeout: time.Millisecond * 200, Store: *serverSideStore}

+ hosts[0].ConnManager().TagPeer(hosts[1].ID(), string(protocolID(networkID)), 100)
+ hosts[0].ConnManager().TagPeer(hosts[2].ID(), string(protocolID(networkID)), 90)

// create one more server(with more headers in the store)
serverSideEx, err := NewExchangeServer[*headertest.DummyHeader](
- hosts[2], headertest.NewStore[*headertest.DummyHeader](t, headertest.NewTestSuite(t), 10),
+ hosts[2], tmServerSideStore,
WithNetworkID[ServerParameters](networkID),
)
require.NoError(t, err)
require.NoError(t, serverSideEx.Start(context.Background()))
+ t.Cleanup(func() {
+ serverSideEx.Stop(context.Background()) //nolint:errcheck
+ })

exchg.peerTracker.peerLk.Lock()
- exchg.peerTracker.trackedPeers[hosts[2].ID()] = &peerStat{peerID: hosts[2].ID(), peerScore: 20}
+ exchg.peerTracker.trackedPeers[hosts[2].ID()] = struct{}{}
exchg.peerTracker.peerLk.Unlock()

h, err := store.GetByHeight(context.Background(), 5)
require.NoError(t, err)

_, err = exchg.GetRangeByHeight(context.Background(), h, 8)
require.NoError(t, err)
// ensure that peerScore for the second peer is changed
- newPeerScore := exchg.peerTracker.trackedPeers[hosts[2].ID()].score()
+ newPeerScore := score(t, exchg.peerTracker.host, hosts[2].ID())
require.NotEqual(t, 20, newPeerScore)
}

@@ -464,7 +474,9 @@ func TestExchange_HandleHeaderWithDifferentChainID(t *testing.T) {
func TestExchange_RequestHeadersFromAnotherPeerWhenTimeout(t *testing.T) {
// create blankhost because mocknet does not support deadlines
swarm0 := swarm.GenSwarm(t)
- host0 := blankhost.NewBlankHost(swarm0)
+ mngr, err := connmgr.NewConnManager(0, 50)
+ require.NoError(t, err)
+ host0 := blankhost.NewBlankHost(swarm0, blankhost.WithConnectionManager(mngr))
swarm1 := swarm.GenSwarm(t)
host1 := blankhost.NewBlankHost(swarm1)
swarm2 := swarm.GenSwarm(t)
@@ -495,24 +507,25 @@ func TestExchange_RequestHeadersFromAnotherPeerWhenTimeout(t *testing.T) {
t.Cleanup(func() {
serverSideEx.Stop(context.Background()) //nolint:errcheck
})
- prevScore := exchg.peerTracker.trackedPeers[host1.ID()].score()
+ prevScore := score(t, exchg.host, host1.ID())
exchg.peerTracker.peerLk.Lock()
- exchg.peerTracker.trackedPeers[host2.ID()] = &peerStat{peerID: host2.ID(), peerScore: 200}
+ host0.ConnManager().TagPeer(host2.ID(), string(protocolID(networkID)), 100)
+ exchg.peerTracker.trackedPeers[host2.ID()] = struct{}{}
exchg.peerTracker.peerLk.Unlock()

gen, err := store.GetByHeight(context.Background(), 1)
require.NoError(t, err)

_, err = exchg.GetRangeByHeight(context.Background(), gen, 3)
require.NoError(t, err)
- newPeerScore := exchg.peerTracker.trackedPeers[host1.ID()].score()
+ newPeerScore := score(t, exchg.host, host1.ID())
assert.NotEqual(t, newPeerScore, prevScore)
}

// TestExchange_RequestPartialRange ensures that, in case of receiving a partial response
// from a server, Exchange will re-request the remaining headers from another peer
func TestExchange_RequestPartialRange(t *testing.T) {
- hosts := createMocknet(t, 3)
+ hosts := quicHosts(t, 3)
exchg, store := createP2PExAndServer(t, hosts[0], hosts[1])

// create one more server(with more headers in the store)
@@ -523,13 +536,14 @@ func TestExchange_RequestPartialRange(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
t.Cleanup(cancel)

+ hosts[0].ConnManager().TagPeer(hosts[1].ID(), string(protocolID(networkID)), 100)

require.NoError(t, err)
require.NoError(t, serverSideEx.Start(ctx))
exchg.peerTracker.peerLk.Lock()
- prevScoreBefore1 := exchg.peerTracker.trackedPeers[hosts[1].ID()].peerScore
- prevScoreBefore2 := 50
- // reducing peerScore of the second server, so our exchange will request host[1] first.
- exchg.peerTracker.trackedPeers[hosts[2].ID()] = &peerStat{peerID: hosts[2].ID(), peerScore: 50}
+ prevScoreBefore1 := score(t, exchg.host, hosts[1].ID())
+ prevScoreBefore2 := score(t, exchg.host, hosts[2].ID())
+ exchg.peerTracker.trackedPeers[hosts[2].ID()] = struct{}{}
exchg.peerTracker.peerLk.Unlock()

gen, err := store.GetByHeight(context.Background(), 1)
@@ -540,8 +554,8 @@
require.NoError(t, err)

exchg.peerTracker.peerLk.Lock()
- prevScoreAfter1 := exchg.peerTracker.trackedPeers[hosts[1].ID()].peerScore
- prevScoreAfter2 := exchg.peerTracker.trackedPeers[hosts[2].ID()].peerScore
+ prevScoreAfter1 := score(t, exchg.host, hosts[1].ID())
+ prevScoreAfter2 := score(t, exchg.host, hosts[2].ID())
exchg.peerTracker.peerLk.Unlock()

assert.NotEqual(t, prevScoreBefore1, prevScoreAfter1)
Expand All @@ -561,7 +575,6 @@ func createP2PExAndServer(
host, tpeer libhost.Host,
) (*Exchange[*headertest.DummyHeader], *headertest.Store[*headertest.DummyHeader]) {
store := headertest.NewStore[*headertest.DummyHeader](t, headertest.NewTestSuite(t), 5)

serverSideEx, err := NewExchangeServer[*headertest.DummyHeader](tpeer, store,
WithNetworkID[ServerParameters](networkID),
)
@@ -582,7 +595,7 @@
time.Sleep(time.Millisecond * 100) // give peerTracker time to add a trusted peer

ex.peerTracker.peerLk.Lock()
- ex.peerTracker.trackedPeers[tpeer.ID()] = &peerStat{peerID: tpeer.ID(), peerScore: 100.0}
+ ex.peerTracker.trackedPeers[tpeer.ID()] = struct{}{}
ex.peerTracker.peerLk.Unlock()

t.Cleanup(func() {
@@ -595,12 +608,15 @@

func quicHosts(t *testing.T, n int) []libhost.Host {
hosts := make([]libhost.Host, n)
+ var err error
for i := range hosts {
- swrm := swarm.GenSwarm(t, swarm.OptDisableTCP)
- hosts[i] = blankhost.NewBlankHost(swrm)
+ hosts[i], err = libp2p.New()
+ require.NoError(t, err)
for _, host := range hosts[:i] {
hosts[i].Peerstore().AddAddrs(host.ID(), host.Network().ListenAddresses(), peerstore.PermanentAddrTTL)
+ host.Peerstore().AddAddrs(hosts[i].ID(), hosts[i].Network().ListenAddresses(), peerstore.PermanentAddrTTL)
+ err = hosts[i].Connect(context.Background(), peer.AddrInfo{ID: host.ID(), Addrs: host.Addrs()})
+ require.NoError(t, err)
}
}

@@ -647,3 +663,15 @@ func (t *timedOutStore) Head(context.Context, ...header.HeadOption[*headertest.DummyHeader]) (*headertest.DummyHeader, error) {
time.Sleep(t.timeout)
return nil, header.ErrNoHead
}

+ func (t *timedOutStore) GetRange(ctx context.Context, from, to uint64) ([]*headertest.DummyHeader, error) {
+ time.Sleep(t.timeout)
+ return t.Store.GetRange(ctx, from, to)
+ }

+ func score(t *testing.T, h libhost.Host, id peer.ID) int {
+ t.Helper()
+ tags := h.ConnManager().GetTagInfo(id)
+ tag, _ := tags.Tags[string(protocolID(networkID))]
+ return tag
+ }
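The new `score` test helper is the clearest sign of where peer scores now live: not in `peerStat` copies held by the tracker, but as integer tags on libp2p's ConnManager, keyed by the protocol ID. A small self-contained sketch of that round trip, with an illustrative tag key (the helper names are made up):

```go
package main

import (
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/peer"
)

// seedScore stores an integer weight for (peer, tag) via the ConnManager,
// the same mechanism the tests above use with TagPeer.
func seedScore(h host.Host, id peer.ID, tagKey string, score int) {
	h.ConnManager().TagPeer(id, tagKey, score)
}

// readScore mirrors the test helper: an unknown peer or missing tag reads as 0.
func readScore(h host.Host, id peer.ID, tagKey string) int {
	info := h.ConnManager().GetTagInfo(id)
	if info == nil {
		return 0
	}
	return info.Tags[tagKey]
}
```

Note that blankhost's default connection manager is a no-op, which is presumably why the timeout test above now wires a real `connmgr.BasicConnMgr` into `host0`.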
12 changes: 11 additions & 1 deletion p2p/helpers.go
@@ -103,7 +103,7 @@ func sendMessage(
}

if err == nil {
- if closeErr := stream.Close(); closeErr != nil {
+ if closeErr := stream.CloseRead(); closeErr != nil {
log.Errorw("closing stream", "err", closeErr)
}
} else {
@@ -124,3 +124,13 @@
return fmt.Errorf("unknown status code %d", code)
}
}

+ // transform applies a provided function to each element of the input slice,
+ // producing a new slice with the results of the function.
+ func transform[T, U any](ts []T, f func(T) U) []U {
+ us := make([]U, len(ts))
+ for i := range ts {
+ us[i] = f(ts[i])
+ }
+ return us
+ }
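`transform` is a plain generic map helper; the `Head` path above uses it to project `*peerStat` values to `peer.ID`s. A quick usage illustration (values are made up; this runs inside the p2p package, where `peerStat` is visible):

```go
// Illustrative only: project peer stats to their IDs.
stats := []*peerStat{{peerID: "peerA"}, {peerID: "peerB"}}
ids := transform(stats, func(p *peerStat) peer.ID {
	return p.peerID
})
// ids == []peer.ID{"peerA", "peerB"}
```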
40 changes: 14 additions & 26 deletions p2p/peer_stats.go
@@ -14,10 +14,7 @@ type peerStat struct {
sync.RWMutex
peerID peer.ID
// score is the average speed per single request
- peerScore float32
- // pruneDeadline specifies when disconnected peer will be removed if
- // it does not return online.
- pruneDeadline time.Time
+ peerScore int
}

// updateStats recalculates peer.score by averaging the last score
@@ -26,33 +23,28 @@ type peerStat struct {
// by dividing the amount by time, so the result score will represent how many bytes
// were retrieved in 1 millisecond. This value will then be averaged relative to the
// previous peerScore.
- func (p *peerStat) updateStats(amount uint64, duration time.Duration) {
- p.Lock()
- defer p.Unlock()
+ func (p *peerStat) updateStats(amount uint64, duration time.Duration) int {
+ if amount == 0 && duration == 0 {
+ // decrease peerScore by 20% of the peer that failed the request by any reason.
+ // NOTE: peerScore will not be decreased if the score is less than 100.
+ p.peerScore -= p.peerScore / 100 * 20
+ return p.peerScore
+ }

averageSpeed := float32(amount)
if duration != 0 {
averageSpeed /= float32(duration.Milliseconds())
}
if p.peerScore == 0.0 {
- p.peerScore = averageSpeed
- return
+ p.peerScore = int(averageSpeed * 100)
+ return p.peerScore
}
- p.peerScore = (p.peerScore + averageSpeed) / 2
- }
-
- // decreaseScore decreases peerScore by 20% of the peer that failed the request by any reason.
- // NOTE: decreasing peerScore in one session will not affect its position in queue in another
- // session(as we can have multiple sessions running concurrently).
- // TODO(vgonkivs): to figure out the better scoring increments/decrements
- func (p *peerStat) decreaseScore() {
- p.Lock()
- defer p.Unlock()
-
- p.peerScore -= p.peerScore / 100 * 20
+ p.peerScore = (p.peerScore + int(averageSpeed*100)) / 2
+ return p.peerScore
}

// score reads a peer's latest score from the queue
- func (p *peerStat) score() float32 {
+ func (p *peerStat) score() int {
p.RLock()
defer p.RUnlock()
return p.peerScore
@@ -123,10 +115,6 @@ func newPeerQueue(ctx context.Context, stats []*peerStat) *peerQueue {
// in case if there are no peer available in current session, it blocks until
// the peer will be pushed in.
func (p *peerQueue) waitPop(ctx context.Context) *peerStat {
- // TODO(vgonkivs): implement fallback solution for cases when peer queue is empty.
- // As we discussed with @Wondertan there could be 2 possible solutions:
- // * use libp2p.Discovery to find new peers outside peerTracker to request headers;
- // * implement IWANT/IHAVE messaging system and start requesting ranges from the Peerstore;
select {
case <-ctx.Done():
return &peerStat{}
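The merged scoring and penalty logic in the reworked `updateStats` is easier to follow with concrete numbers. A worked example against the new integer math, as it would run inside the p2p package (all values are made up):

```go
// Fresh peer: 100 000 bytes served in 100 ms.
// averageSpeed = 100000/100 = 1000 bytes/ms; first score = int(1000*100) = 100000.
s := &peerStat{peerID: "example"}
fmt.Println(s.updateStats(100_000, 100*time.Millisecond)) // 100000

// Same speed again: the new sample is averaged with the previous score.
fmt.Println(s.updateStats(100_000, 100*time.Millisecond)) // (100000+100000)/2 = 100000

// Failed request (amount == 0, duration == 0): 20% penalty in integer math,
// 100000 - 100000/100*20 = 100000 - 20000 = 80000.
fmt.Println(s.updateStats(0, 0)) // 80000

// Below 100 the penalty is a no-op (99/100 == 0 in integer division),
// which is exactly what the NOTE in the code warns about.
low := &peerStat{peerScore: 99}
fmt.Println(low.updateStats(0, 0)) // 99
```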