Skip to content

Commit

Permalink
server/cache: hide regions in clusterInfo (#367)
Browse files Browse the repository at this point in the history
* server/cache: hide regions in clusterInfo

regionsInfo will be rewrote later, we should not access it directly.
  • Loading branch information
huachaohuang authored Oct 31, 2016
1 parent 38052f2 commit 762b326
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 40 deletions.
45 changes: 31 additions & 14 deletions server/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,25 @@ var (
errStoreNotFound = func(storeID uint64) error {
return errors.Errorf("store %v not found", storeID)
}
errRegionNotFound = func(regionID uint64) error {
return errors.Errorf("region %v not found", regionID)
}
errRegionIsStale = func(region *metapb.Region, origin *metapb.Region) error {
return errors.Errorf("region is stale: region %v origin %v", region, origin)
}
)

func checkStaleRegion(origin *metapb.Region, region *metapb.Region) error {
o := origin.GetRegionEpoch()
e := region.GetRegionEpoch()

if e.GetVersion() < o.GetVersion() || e.GetConfVer() < o.GetConfVer() {
return errors.Trace(errRegionIsStale(region, origin))
}

return nil
}

type storesInfo struct {
stores map[uint64]*storeInfo
}
Expand Down Expand Up @@ -98,18 +115,6 @@ func cloneRegion(r *metapb.Region) *metapb.Region {
return proto.Clone(r).(*metapb.Region)
}

func checkStaleRegion(region *metapb.Region, checkRegion *metapb.Region) error {
epoch := region.GetRegionEpoch()
checkEpoch := checkRegion.GetRegionEpoch()

if checkEpoch.GetVersion() >= epoch.GetVersion() &&
checkEpoch.GetConfVer() >= epoch.GetConfVer() {
return nil
}

return errors.Errorf("epoch %s is staler than %s", checkEpoch, epoch)
}

type leaders struct {
// store id -> region id -> struct{}
storeRegions map[uint64]map[uint64]struct{}
Expand Down Expand Up @@ -379,11 +384,11 @@ func (r *regionsInfo) heartbeat(region *metapb.Region, leaderPeer *metapb.Peer)
return resp, nil
}

func (r *regionsInfo) getStoreRegionCount(storeID uint64) uint64 {
func (r *regionsInfo) getStoreRegionCount(storeID uint64) int {
r.RLock()
defer r.RUnlock()

return r.storeRegionCount[storeID]
return int(r.storeRegionCount[storeID])
}

func (r *regionsInfo) getStoreLeaderCount(storeID uint64) int {
Expand Down Expand Up @@ -552,10 +557,22 @@ func (c *clusterInfo) updateRegion(region *regionInfo) {
c.regions.updateRegion(region.Region)
}

func (c *clusterInfo) getMetaRegions() []*metapb.Region {
return c.regions.getRegions()
}

func (c *clusterInfo) getRegionCount() int {
return c.regions.getRegionCount()
}

func (c *clusterInfo) getStoreRegionCount(storeID uint64) int {
return c.regions.getStoreRegionCount(storeID)
}

func (c *clusterInfo) getStoreLeaderCount(storeID uint64) int {
return c.regions.getStoreLeaderCount(storeID)
}

func (c *clusterInfo) randLeaderRegion(storeID uint64) *regionInfo {
region := c.regions.randLeaderRegion(storeID)
if region == nil {
Expand Down
114 changes: 113 additions & 1 deletion server/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,36 @@ func newTestRegions(n, np uint64) []*regionInfo {
return regions
}

func checkRegion(c *C, a *regionInfo, b *regionInfo) {
c.Assert(a.Region, DeepEquals, b.Region)
c.Assert(a.Leader, DeepEquals, b.Leader)
}

func checkRegions(c *C, cache *regionsInfo, regions []*regionInfo) {
regionCount := make(map[uint64]int)
leaderCount := make(map[uint64]int)
for _, region := range regions {
for _, peer := range region.Peers {
regionCount[peer.StoreId]++
if peer.Id == region.Leader.Id {
leaderCount[peer.StoreId]++
}
}
}

c.Assert(cache.getRegionCount(), Equals, len(regions))
for id, count := range regionCount {
c.Assert(cache.getStoreRegionCount(id), Equals, count)
}
for id, count := range leaderCount {
c.Assert(cache.getStoreLeaderCount(id), Equals, count)
}

for _, region := range cache.getRegions() {
c.Assert(region, DeepEquals, regions[region.GetId()].Region)
}
}

var _ = Suite(&testClusterInfoSuite{})

type testClusterInfoSuite struct{}
Expand Down Expand Up @@ -126,6 +156,88 @@ func (s *testClusterInfoSuite) TestStoreHeartbeat(c *C) {
c.Assert(cache.getStoreCount(), Equals, int(n))
}

func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) {
n, np := uint64(3), uint64(3)
cache := newClusterInfo(newMockIDAllocator())
regions := newTestRegions(n, np)

for i, region := range regions {
// region does not exist.
_, err := cache.handleRegionHeartbeat(region)
c.Assert(err, IsNil)
checkRegions(c, cache.regions, regions[0:i+1])

// region is the same, not updated.
_, err = cache.handleRegionHeartbeat(region)
c.Assert(err, IsNil)
checkRegions(c, cache.regions, regions[0:i+1])

epoch := region.clone().GetRegionEpoch()

// region is updated.
region.RegionEpoch = &metapb.RegionEpoch{
Version: epoch.GetVersion() + 1,
}
_, err = cache.handleRegionHeartbeat(region)
c.Assert(err, IsNil)
checkRegions(c, cache.regions, regions[0:i+1])

// region is stale (Version).
stale := region.clone()
stale.RegionEpoch = &metapb.RegionEpoch{
ConfVer: epoch.GetConfVer() + 1,
}
_, err = cache.handleRegionHeartbeat(stale)
c.Assert(err, NotNil)
checkRegions(c, cache.regions, regions[0:i+1])

// region is updated.
region.RegionEpoch = &metapb.RegionEpoch{
Version: epoch.GetVersion() + 1,
ConfVer: epoch.GetConfVer() + 1,
}
_, err = cache.handleRegionHeartbeat(region)
c.Assert(err, IsNil)
checkRegions(c, cache.regions, regions[0:i+1])

// region is stale (ConfVer).
stale = region.clone()
stale.RegionEpoch = &metapb.RegionEpoch{
Version: epoch.GetVersion() + 1,
}
_, err = cache.handleRegionHeartbeat(stale)
c.Assert(err, NotNil)
checkRegions(c, cache.regions, regions[0:i+1])
}
}

var _ = Suite(&testClusterUtilSuite{})

type testClusterUtilSuite struct{}

func (s *testClusterUtilSuite) TestCheckStaleRegion(c *C) {
// (0, 0) v.s. (0, 0)
region := newRegion([]byte{}, []byte{})
origin := newRegion([]byte{}, []byte{})
c.Assert(checkStaleRegion(region, origin), IsNil)
c.Assert(checkStaleRegion(origin, region), IsNil)

// (1, 0) v.s. (0, 0)
region.RegionEpoch.Version++
c.Assert(checkStaleRegion(origin, region), IsNil)
c.Assert(checkStaleRegion(region, origin), NotNil)

// (1, 1) v.s. (0, 0)
region.RegionEpoch.ConfVer++
c.Assert(checkStaleRegion(origin, region), IsNil)
c.Assert(checkStaleRegion(region, origin), NotNil)

// (0, 1) v.s. (0, 0)
region.RegionEpoch.Version--
c.Assert(checkStaleRegion(origin, region), IsNil)
c.Assert(checkStaleRegion(region, origin), NotNil)
}

var _ = Suite(&testClusterCacheSuite{})

type testClusterCacheSuite struct {
Expand Down Expand Up @@ -323,7 +435,7 @@ func randRegions(count int) []*metapb.Region {
}

func checkStoreRegionCount(c *C, r *regionsInfo, regions []*metapb.Region) {
stores := make(map[uint64]uint64)
stores := make(map[uint64]int)
for _, region := range regions {
for _, peer := range region.GetPeers() {
stores[peer.GetStoreId()]++
Expand Down
28 changes: 14 additions & 14 deletions server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@ import (

var (
errClusterNotBootstrapped = errors.New("cluster is not bootstrapped")
errRegionNotFound = func(regionID uint64) error {
return errors.Errorf("region %v not found", regionID)
}
)

const (
Expand Down Expand Up @@ -352,22 +349,25 @@ func (c *RaftCluster) cacheAllRegions() error {
}

func (c *RaftCluster) getRegion(regionKey []byte) (*metapb.Region, *metapb.Peer) {
return c.cachedCluster.regions.getRegion(regionKey)
region := c.cachedCluster.searchRegion(regionKey)
if region == nil {
return nil, nil
}
return region.Region, region.Leader
}

// GetRegionByID gets region and leader peer by regionID from cluster.
func (c *RaftCluster) GetRegionByID(regionID uint64) (*metapb.Region, *metapb.Peer) {
return c.cachedCluster.regions.getRegionByID(regionID)
}

// GetRegion returns the region from cluster.
func (c *RaftCluster) getRegionByID(regionID uint64) *regionInfo {
return c.cachedCluster.getRegion(regionID)
region := c.cachedCluster.getRegion(regionID)
if region == nil {
return nil, nil
}
return region.Region, region.Leader
}

// GetRegions gets regions from cluster.
func (c *RaftCluster) GetRegions() []*metapb.Region {
return c.cachedCluster.regions.getRegions()
return c.cachedCluster.getMetaRegions()
}

// GetStores gets stores from cluster.
Expand Down Expand Up @@ -500,7 +500,7 @@ func (c *RaftCluster) checkStores() {
if store.GetState() != metapb.StoreState_Offline {
continue
}
if cluster.regions.getStoreRegionCount(store.GetId()) == 0 {
if cluster.getStoreRegionCount(store.GetId()) == 0 {
err := c.BuryStore(store.GetId(), false)
if err != nil {
log.Errorf("bury store %v failed: %v", store, err)
Expand Down Expand Up @@ -627,7 +627,7 @@ func (c *RaftCluster) putConfig(meta *metapb.Cluster) error {
// NewAddPeerOperator creates an operator to add a peer to the region.
// If storeID is 0, it will be chosen according to the balance rules.
func (c *RaftCluster) NewAddPeerOperator(regionID uint64, storeID uint64) (Operator, error) {
region := c.getRegionByID(regionID)
region := c.cachedCluster.getRegion(regionID)
if region == nil {
return nil, errRegionNotFound(regionID)
}
Expand Down Expand Up @@ -679,7 +679,7 @@ func (c *RaftCluster) NewRemovePeerOperator(regionID uint64, peerID uint64) (Ope

// SetAdminOperator sets the balance operator of the region.
func (c *RaftCluster) SetAdminOperator(regionID uint64, ops []Operator) error {
region := c.getRegionByID(regionID)
region := c.cachedCluster.getRegion(regionID)
if region == nil {
return errRegionNotFound(regionID)
}
Expand Down
4 changes: 2 additions & 2 deletions server/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ func (s *testClusterSuite) testCheckStores(c *C, conn net.Conn, clusterID uint64
regionInfo := newRegionInfo(region, leader)
_, err := cluster.handleRegionHeartbeat(regionInfo)
c.Assert(err, IsNil)
c.Assert(cluster.cachedCluster.regions.getStoreRegionCount(store.GetId()), Equals, 1)
c.Assert(cluster.cachedCluster.getStoreRegionCount(store.GetId()), Equals, 1)

// store is up w/ region peers will not be buried.
cluster.checkStores()
Expand All @@ -479,7 +479,7 @@ func (s *testClusterSuite) testCheckStores(c *C, conn net.Conn, clusterID uint64
region.Peers = []*metapb.Peer{leader}
_, err = cluster.handleRegionHeartbeat(regionInfo)
c.Assert(err, IsNil)
c.Assert(cluster.cachedCluster.regions.getStoreRegionCount(store.GetId()), Equals, 0)
c.Assert(cluster.cachedCluster.getStoreRegionCount(store.GetId()), Equals, 0)

// store is offline w/o region peers will be buried.
cluster.checkStores()
Expand Down
21 changes: 12 additions & 9 deletions server/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,9 @@ func (s *testRegionSuite) TestRegionTree(c *C) {

c.Assert(tree.search([]byte("a")), IsNil)

regionA := newRegionItem([]byte("a"), []byte("b")).region
regionC := newRegionItem([]byte("c"), []byte("d")).region
regionD := newRegionItem([]byte("d"), []byte{}).region
regionA := newRegion([]byte("a"), []byte("b"))
regionC := newRegion([]byte("c"), []byte("d"))
regionD := newRegion([]byte("d"), []byte{})

tree.insert(regionA)
tree.insert(regionC)
Expand All @@ -119,11 +119,14 @@ func (s *testRegionSuite) TestRegionTree(c *C) {
c.Assert(tree.search([]byte("e")), Equals, regionD)
}

func newRegionItem(start, end []byte) *regionItem {
return &regionItem{
region: &metapb.Region{
StartKey: start,
EndKey: end,
},
func newRegion(start, end []byte) *metapb.Region {
return &metapb.Region{
StartKey: start,
EndKey: end,
RegionEpoch: &metapb.RegionEpoch{},
}
}

func newRegionItem(start, end []byte) *regionItem {
return &regionItem{region: newRegion(start, end)}
}

0 comments on commit 762b326

Please sign in to comment.