forked from mailgun/gubernator
-
Notifications
You must be signed in to change notification settings - Fork 0
/
region_picker.go
95 lines (82 loc) · 2.37 KB
/
region_picker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package gubernator
import (
"github.com/mailgun/holster/v4/syncutil"
)
type RegionPeerPicker interface {
GetClients(string) ([]*PeerClient, error)
GetByPeerInfo(PeerInfo) *PeerClient
Pickers() map[string]PeerPicker
Peers() []*PeerClient
Add(*PeerClient)
New() RegionPeerPicker
}
// RegionPicker encapsulates pickers for a set of regions
type RegionPicker struct {
*ReplicatedConsistentHash
// A map of all the pickers by region
regions map[string]PeerPicker
// The implementation of picker we will use for each region
conf BehaviorConfig
wg syncutil.WaitGroup
reqQueue chan *RateLimitReq
}
func NewRegionPicker(fn HashString64) *RegionPicker {
rp := &RegionPicker{
regions: make(map[string]PeerPicker),
reqQueue: make(chan *RateLimitReq, 0),
ReplicatedConsistentHash: NewReplicatedConsistentHash(fn, defaultReplicas),
}
return rp
}
func (rp *RegionPicker) New() RegionPeerPicker {
hash := rp.ReplicatedConsistentHash.New().(*ReplicatedConsistentHash)
return &RegionPicker{
regions: make(map[string]PeerPicker),
reqQueue: make(chan *RateLimitReq, 0),
ReplicatedConsistentHash: hash,
}
}
// GetClients returns all the PeerClients that match this key in all regions
func (rp *RegionPicker) GetClients(key string) ([]*PeerClient, error) {
result := make([]*PeerClient, len(rp.regions))
var i int
for _, picker := range rp.regions {
peer, err := picker.Get(key)
if err != nil {
return nil, err
}
result[i] = peer
i++
}
return result, nil
}
// GetByPeerInfo returns the first PeerClient the PeerInfo.HasKey() matches
func (rp *RegionPicker) GetByPeerInfo(info PeerInfo) *PeerClient {
for _, picker := range rp.regions {
if client := picker.GetByPeerInfo(info); client != nil {
return client
}
}
return nil
}
// Pickers returns a map of each region and its respective PeerPicker
func (rp *RegionPicker) Pickers() map[string]PeerPicker {
return rp.regions
}
func (rp *RegionPicker) Peers() []*PeerClient {
var peers []*PeerClient
for _, picker := range rp.regions {
for _, peer := range picker.Peers() {
peers = append(peers, peer)
}
}
return peers
}
func (rp *RegionPicker) Add(peer *PeerClient) {
picker, ok := rp.regions[peer.Info().DataCenter]
if !ok {
picker = rp.ReplicatedConsistentHash.New()
rp.regions[peer.Info().DataCenter] = picker
}
picker.Add(peer)
}