10 changes: 6 additions & 4 deletions server/schedulers/balance_test.go
@@ -898,11 +898,13 @@ func (s *testBalanceHotWriteRegionSchedulerSuite) TestBalance(c *C) {
}
}

// hot region scheduler is restricted by schedule limit.
opt.RegionScheduleLimit, opt.LeaderScheduleLimit = 0, 0
// hot region scheduler is restricted by `hot-region-schedule-limit`.
opt.HotRegionScheduleLimit = 0
c.Assert(hb.Schedule(tc), HasLen, 0)
opt.LeaderScheduleLimit = schedule.NewMockSchedulerOptions().LeaderScheduleLimit
opt.RegionScheduleLimit = schedule.NewMockSchedulerOptions().RegionScheduleLimit
// hot region scheduler is not affected by `balance-region-schedule-limit`.
opt.HotRegionScheduleLimit = schedule.NewMockSchedulerOptions().HotRegionScheduleLimit
opt.RegionScheduleLimit = 0
c.Assert(hb.Schedule(tc), HasLen, 1)

//| region_id | leader_store | follower_store | follower_store | written_bytes |
//|-----------|--------------|----------------|----------------|---------------|
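The two assertions above pin down the new limit semantics: the hot region scheduler stops entirely when `hot-region-schedule-limit` is 0, and keeps producing operators even when `balance-region-schedule-limit` is 0. A minimal self-contained restatement of that behavior (mockOpt and schedule are illustrative stand-ins for the test's MockSchedulerOptions and hb.Schedule, not pd APIs):

package main

import "fmt"

// mockOpt stands in for the scheduler options manipulated in TestBalance.
type mockOpt struct {
	HotRegionScheduleLimit uint64
	RegionScheduleLimit    uint64
}

// schedule stands in for hb.Schedule: after this change, hot-region peer
// operators are gated by HotRegionScheduleLimit alone.
func schedule(opt mockOpt) []string {
	if opt.HotRegionScheduleLimit == 0 {
		return nil
	}
	return []string{"move-hot-peer"}
}

func main() {
	opt := mockOpt{HotRegionScheduleLimit: 4, RegionScheduleLimit: 4}

	opt.HotRegionScheduleLimit = 0
	fmt.Println(len(schedule(opt))) // 0: restricted by hot-region-schedule-limit

	opt.HotRegionScheduleLimit = 4
	opt.RegionScheduleLimit = 0
	fmt.Println(len(schedule(opt))) // 1: not affected by balance-region-schedule-limit
}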
29 changes: 16 additions & 13 deletions server/schedulers/hot_region.go
@@ -70,8 +70,9 @@ func newStoreStaticstics() *storeStatistics {
type balanceHotRegionsScheduler struct {
*baseScheduler
sync.RWMutex
limit uint64
types []BalanceType
leaderLimit uint64
peerLimit uint64
types []BalanceType

// store id -> hot regions statistics as the role of leader
stats *storeStatistics
@@ -82,7 +83,8 @@ func newBalanceHotRegionsScheduler(opController *schedule.OperatorController) *b
base := newBaseScheduler(opController)
return &balanceHotRegionsScheduler{
baseScheduler: base,
limit: 1,
leaderLimit: 1,
peerLimit: 1,
stats: newStoreStaticstics(),
types: []BalanceType{hotWriteRegionBalance, hotReadRegionBalance},
r: rand.New(rand.NewSource(time.Now().UnixNano())),
@@ -93,7 +95,8 @@ func newBalanceHotReadRegionsScheduler(opController *schedule.OperatorController
base := newBaseScheduler(opController)
return &balanceHotRegionsScheduler{
baseScheduler: base,
limit: 1,
leaderLimit: 1,
peerLimit: 1,
stats: newStoreStaticstics(),
types: []BalanceType{hotReadRegionBalance},
r: rand.New(rand.NewSource(time.Now().UnixNano())),
@@ -104,7 +107,8 @@ func newBalanceHotWriteRegionsScheduler(opController *schedule.OperatorControlle
base := newBaseScheduler(opController)
return &balanceHotRegionsScheduler{
baseScheduler: base,
limit: 1,
leaderLimit: 1,
peerLimit: 1,
stats: newStoreStaticstics(),
types: []BalanceType{hotWriteRegionBalance},
r: rand.New(rand.NewSource(time.Now().UnixNano())),
@@ -131,13 +135,12 @@ func min(a, b uint64) uint64 {
}

func (h *balanceHotRegionsScheduler) allowBalanceLeader(cluster schedule.Cluster) bool {
return h.opController.OperatorCount(schedule.OpHotRegion) < min(h.limit, cluster.GetHotRegionScheduleLimit()) &&
return h.opController.OperatorCount(schedule.OpHotRegion) < min(h.leaderLimit, cluster.GetHotRegionScheduleLimit()) &&
h.opController.OperatorCount(schedule.OpLeader) < cluster.GetLeaderScheduleLimit()
}

func (h *balanceHotRegionsScheduler) allowBalanceRegion(cluster schedule.Cluster) bool {
return h.opController.OperatorCount(schedule.OpHotRegion) < min(h.limit, cluster.GetHotRegionScheduleLimit()) &&
h.opController.OperatorCount(schedule.OpRegion) < cluster.GetRegionScheduleLimit()
return h.opController.OperatorCount(schedule.OpHotRegion) < min(h.peerLimit, cluster.GetHotRegionScheduleLimit())
}

func (h *balanceHotRegionsScheduler) Schedule(cluster schedule.Cluster) []*schedule.Operator {
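The asymmetry between the two predicates above is the heart of the change: leader moves remain additionally bounded by `leader-schedule-limit`, while peer moves no longer consult `balance-region-schedule-limit` at all. A minimal runnable sketch of that gating, with operator counts passed as plain numbers rather than read from the pd OperatorController:

package main

import "fmt"

func min(a, b uint64) uint64 {
	if a < b {
		return a
	}
	return b
}

// allowBalanceLeader mirrors the predicate above: leader moves are capped by
// min(leaderLimit, hot-region-schedule-limit) and by leader-schedule-limit.
func allowBalanceLeader(hotOps, leaderOps, leaderLimit, hotLimit, leaderScheduleLimit uint64) bool {
	return hotOps < min(leaderLimit, hotLimit) && leaderOps < leaderScheduleLimit
}

// allowBalanceRegion mirrors the revised predicate: peer moves are capped by
// min(peerLimit, hot-region-schedule-limit) only.
func allowBalanceRegion(hotOps, peerLimit, hotLimit uint64) bool {
	return hotOps < min(peerLimit, hotLimit)
}

func main() {
	// Zeroing leader-schedule-limit stops leader moves but not peer moves.
	fmt.Println(allowBalanceLeader(0, 0, 1, 4, 0)) // false
	fmt.Println(allowBalanceRegion(0, 1, 4))       // true
}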
@@ -313,7 +316,7 @@ func (h *balanceHotRegionsScheduler) balanceByPeer(cluster schedule.Cluster, sto

destStoreID = h.selectDestStore(candidateStoreIDs, rs.FlowBytes, srcStoreID, storesStat)
if destStoreID != 0 {
h.adjustBalanceLimit(srcStoreID, storesStat)
h.peerLimit = h.adjustBalanceLimit(srcStoreID, storesStat)

srcPeer := srcRegion.GetStorePeer(srcStoreID)
if srcPeer == nil {
@@ -371,7 +374,8 @@ func (h *balanceHotRegionsScheduler) balanceByLeader(cluster schedule.Cluster, s

destPeer := srcRegion.GetStoreVoter(destStoreID)
if destPeer != nil {
h.adjustBalanceLimit(srcStoreID, storesStat)
h.leaderLimit = h.adjustBalanceLimit(srcStoreID, storesStat)

return srcRegion, destPeer
}
}
@@ -430,7 +434,7 @@ func (h *balanceHotRegionsScheduler) selectDestStore(candidateStoreIDs []uint64,
return
}

func (h *balanceHotRegionsScheduler) adjustBalanceLimit(storeID uint64, storesStat core.StoreHotRegionsStat) {
func (h *balanceHotRegionsScheduler) adjustBalanceLimit(storeID uint64, storesStat core.StoreHotRegionsStat) uint64 {
srcStoreStatistics := storesStat[storeID]

var hotRegionTotalCount float64
@@ -440,8 +444,7 @@ func (h *balanceHotRegionsScheduler) adjustBalanceLimit(storeID uint64, storesSt

avgRegionCount := hotRegionTotalCount / float64(len(storesStat))
// Multiplied by hotRegionLimitFactor to avoid transfer back and forth
limit := uint64((float64(srcStoreStatistics.RegionsStat.Len()) - avgRegionCount) * hotRegionLimitFactor)
h.limit = maxUint64(1, limit)
return maxUint64(uint64((float64(srcStoreStatistics.RegionsStat.Len()) - avgRegionCount) * hotRegionLimitFactor), 1)
}

func (h *balanceHotRegionsScheduler) GetHotReadStatus() *core.StoreHotRegionInfos {
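A worked instance of the limit arithmetic returned above, as a self-contained sketch; the concrete value of hotRegionLimitFactor is an assumption here (taken as 0.75), and the sketch clamps to the floor of 1 before converting, which also sidesteps a negative-to-uint64 conversion when the source store is already at or below the average:

package main

import "fmt"

// hotRegionLimitFactor damps the computed limit to avoid transferring the
// same regions back and forth; 0.75 is an assumed value for this sketch.
const hotRegionLimitFactor = 0.75

// adjustBalanceLimit reproduces the arithmetic on plain hot-region counts:
// (source store's count - average count) * factor, floored at 1.
func adjustBalanceLimit(srcCount float64, counts []float64) uint64 {
	var total float64
	for _, c := range counts {
		total += c
	}
	avg := total / float64(len(counts))
	limit := (srcCount - avg) * hotRegionLimitFactor
	if limit < 1 {
		return 1
	}
	return uint64(limit)
}

func main() {
	// Four stores with 10, 6, 4, and 4 hot regions: the average is 6, so
	// the hottest store may run (10-6)*0.75 = 3 transfers concurrently.
	fmt.Println(adjustBalanceLimit(10, []float64{10, 6, 4, 4})) // 3
}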