Skip to content

Commit

Permalink
Support limiting number of cache items
Browse files Browse the repository at this point in the history
Signed-off-by: Changxin Miao <[email protected]>
  • Loading branch information
polyrabbit committed Jan 1, 2025
1 parent beaa0d0 commit c211ca4
Show file tree
Hide file tree
Showing 12 changed files with 63 additions and 25 deletions.
5 changes: 5 additions & 0 deletions cmd/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,11 @@ func dataCacheFlags() []cli.Flag {
Value: "100G",
Usage: "size of cached object for read in MiB",
},
&cli.Int64Flag{
Name: "cache-items",
Value: 1e9, // should be enough for most disks
Usage: "max number of cached items",
},
&cli.Float64Flag{
Name: "free-space-ratio",
Value: 0.1,
Expand Down
1 change: 1 addition & 0 deletions cmd/mount.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ func getChunkConf(c *cli.Context, format *meta.Format) *chunk.Config {

CacheDir: c.String("cache-dir"),
CacheSize: utils.ParseBytes(c, "cache-size", 'M'),
CacheItems: c.Int64("cache-items"),
FreeSpace: float32(c.Float64("free-space-ratio")),
CacheMode: os.FileMode(cm),
CacheFullBlock: !c.Bool("cache-partial-only"),
Expand Down
13 changes: 9 additions & 4 deletions pkg/chunk/cached_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (s *rSlice) ReadAt(ctx context.Context, page *Page, off int) (n int, err er
}

key := s.key(indx)
if s.store.conf.CacheSize > 0 {
if s.store.conf.CacheEnabled() {
start := time.Now()
r, err := s.store.bcache.load(key)
if err == nil {
Expand Down Expand Up @@ -547,6 +547,7 @@ type Config struct {
CacheDir string
CacheMode os.FileMode
CacheSize uint64
CacheItems int64
CacheChecksum string
CacheEviction string
CacheScanInterval time.Duration
Expand Down Expand Up @@ -575,9 +576,9 @@ type Config struct {
}

func (c *Config) SelfCheck(uuid string) {
if c.CacheSize == 0 {
if !c.CacheEnabled() {
if c.Writeback || c.Prefetch > 0 {
logger.Warnf("cache-size is 0, writeback and prefetch will be disabled")
logger.Warnf("cache-size or cache-items is 0, writeback and prefetch will be disabled")
c.Writeback = false
c.Prefetch = 0
}
Expand Down Expand Up @@ -650,6 +651,10 @@ func (c *Config) parseHours() (start, end int, err error) {
return
}

// CacheEnabled reports whether the local block cache is usable. Both the
// byte capacity (CacheSize) and the entry limit (CacheItems) must be
// positive; a zero value for either one disables caching entirely.
func (c *Config) CacheEnabled() bool {
	if c.CacheSize == 0 || c.CacheItems <= 0 {
		return false
	}
	return true
}

type cachedStore struct {
storage object.ObjectStorage
bcache CacheManager
Expand Down Expand Up @@ -812,7 +817,7 @@ func NewCachedStore(storage object.ObjectStorage, config Config, reg prometheus.
}
}()

if config.CacheSize == 0 {
if !config.CacheEnabled() {
config.Prefetch = 0 // disable prefetch if cache is disabled
}
store.fetcher = newPrefetcher(config.Prefetch, func(key string) {
Expand Down
1 change: 1 addition & 0 deletions pkg/chunk/cached_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ var defaultConf = Config{
CacheDir: filepath.Join(os.TempDir(), "diskCache"),
CacheMode: 0600,
CacheSize: 10 << 20,
CacheItems: 1e9,
CacheChecksum: CsNone,
CacheScanInterval: time.Second * 300,
MaxUpload: 1,
Expand Down
34 changes: 22 additions & 12 deletions pkg/chunk/disk_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ type cacheStore struct {
mode os.FileMode
maxStageWrite int
capacity int64
maxItems int64
freeRatio float32
hashPrefix bool
scanInterval time.Duration
Expand All @@ -105,7 +106,7 @@ type cacheStore struct {
stateLock sync.Mutex
}

func newCacheStore(m *cacheManagerMetrics, dir string, cacheSize int64, pendingPages int, config *Config, uploader func(key, path string, force bool) bool) *cacheStore {
func newCacheStore(m *cacheManagerMetrics, dir string, cacheSize, maxItems int64, pendingPages int, config *Config, uploader func(key, path string, force bool) bool) *cacheStore {
if config.CacheMode == 0 {
config.CacheMode = 0600 // only owner can read/write cache
}
Expand All @@ -117,6 +118,7 @@ func newCacheStore(m *cacheManagerMetrics, dir string, cacheSize int64, pendingP
dir: dir,
mode: config.CacheMode,
capacity: cacheSize,
maxItems: maxItems,
maxStageWrite: config.MaxStageWrite,
freeRatio: config.FreeSpace,
eviction: config.CacheEviction,
Expand All @@ -142,8 +144,8 @@ func newCacheStore(m *cacheManagerMetrics, dir string, cacheSize int64, pendingP
if br < c.freeRatio || fr < c.freeRatio {
logger.Warnf("not enough space (%d%%) or inodes (%d%%) for caching in %s: free ratio should be >= %d%%", int(br*100), int(fr*100), c.dir, int(c.freeRatio*100))
}
logger.Infof("Disk cache (%s): capacity (%s), free ratio %d%%, used ratio - [space %s%%, inode %s%%], max pending pages %d",
c.dir, humanize.IBytes(uint64(c.capacity)), int(c.freeRatio*100), humanize.FtoaWithDigits(float64((1-br)*100), 1), humanize.FtoaWithDigits(float64((1-fr)*100), 1), pendingPages)
logger.Infof("Disk cache (%s): used ratio - [space %s%%, inode %s%%]",
c.dir, humanize.FtoaWithDigits(float64((1-br)*100), 1), humanize.FtoaWithDigits(float64((1-fr)*100), 1))
c.createLockFile()
go c.checkLockFile()
go c.flush()
Expand Down Expand Up @@ -207,6 +209,10 @@ func (c *cacheStore) available() bool {
return c.state.state() != dcDown
}

// enabled reports whether this store may hold cached blocks: both its
// byte capacity and its item limit must be positive.
func (cache *cacheStore) enabled() bool {
	if cache.capacity <= 0 {
		return false
	}
	return cache.maxItems > 0
}

func (cache *cacheStore) checkErr(f func() error) error {
if !cache.available() {
return errCacheDown
Expand Down Expand Up @@ -391,7 +397,7 @@ func (cache *cacheStore) removeStage(key string) error {
}

func (cache *cacheStore) cache(key string, p *Page, force, dropCache bool) {
if cache.capacity == 0 {
if !cache.enabled() {
return
}
if cache.rawFull && cache.eviction == "none" {
Expand Down Expand Up @@ -661,7 +667,7 @@ func (cache *cacheStore) flush() {
for {
w := <-cache.pending
path := cache.cachePath(w.key)
if cache.capacity > 0 && cache.flushPage(path, w.page.Data, w.dropCache) == nil {
if cache.enabled() && cache.flushPage(path, w.page.Data, w.dropCache) == nil {
cache.add(w.key, int32(len(w.page.Data)), uint32(time.Now().Unix()))
}
cache.Lock()
Expand Down Expand Up @@ -694,7 +700,7 @@ func (cache *cacheStore) add(key string, size int32, atime uint32) {
cache.used += int64(size + 4096)
}

if cache.used > cache.capacity && cache.eviction != "none" {
if (cache.used > cache.capacity || int64(len(cache.keys)) > cache.maxItems) && cache.eviction != "none" {
logger.Debugf("Cleanup cache when add new data (%s): %d blocks (%d MB)", cache.dir, len(cache.keys), cache.used>>20)
cache.cleanupFull()
}
Expand All @@ -715,7 +721,7 @@ func (cache *cacheStore) stage(key string, data []byte, keepCache bool) (string,
cache.m.stageBlocks.Add(1)
cache.m.stageBlockBytes.Add(float64(len(data)))
cache.m.stageWriteBytes.Add(float64(len(data)))
if cache.capacity > 0 && keepCache {
if cache.enabled() && keepCache {
path := cache.cachePath(key)
cache.createDir(filepath.Dir(path))
if err = os.Link(stagingPath, path); err == nil {
Expand All @@ -739,7 +745,10 @@ func (cache *cacheStore) cleanupFull() {
}

goal := cache.capacity * 95 / 100
num := len(cache.keys) * 99 / 100
num := int64(len(cache.keys)) * 99 / 100
if num > cache.maxItems*99/100 {
num = cache.maxItems * 99 / 100
}
// make sure we have enough free space after cleanup
br, fr := cache.curFreeRatio()
if br < cache.freeRatio {
Expand All @@ -757,7 +766,7 @@ func (cache *cacheStore) cleanupFull() {
if toFree > len(cache.keys) {
num = 0
} else {
num = (len(cache.keys) - toFree) * 99 / 100
num = int64(len(cache.keys)-toFree) * 99 / 100
}
}

Expand Down Expand Up @@ -790,7 +799,7 @@ func (cache *cacheStore) cleanupFull() {
logger.Debugf("remove %s from cache, age: %ds", lastK, now-lastValue.atime)
cache.m.cacheEvicts.Add(1)
cnt = 0
if len(cache.keys) < num && cache.used < goal {
if int64(len(cache.keys)) <= num && cache.used <= goal {
break
}
}
Expand Down Expand Up @@ -1038,7 +1047,7 @@ type CacheManager interface {
func newCacheManager(config *Config, reg prometheus.Registerer, uploader func(key, path string, force bool) bool) CacheManager {
getEnvs()
metrics := newCacheManagerMetrics(reg)
if config.CacheDir == "memory" || config.CacheSize == 0 {
if config.CacheDir == "memory" || !config.CacheEnabled() {
return newMemStore(config, metrics)
}
var dirs []string
Expand All @@ -1061,6 +1070,7 @@ func newCacheManager(config *Config, reg prometheus.Registerer, uploader func(ke
}
sort.Strings(dirs)
dirCacheSize := int64(config.CacheSize) / int64(len(dirs))
dirCacheItems := config.CacheItems / int64(len(dirs))
m := &cacheManager{
consistentMap: consistenthash.New(100, murmur3.Sum32),
storeMap: make(map[string]*cacheStore, len(dirs)),
Expand All @@ -1071,7 +1081,7 @@ func newCacheManager(config *Config, reg prometheus.Registerer, uploader func(ke
// 20% of buffer could be used for pending pages
pendingPages := int(config.BufferSize) * 2 / 10 / config.BlockSize / len(dirs)
for i, d := range dirs {
store := newCacheStore(metrics, strings.TrimSpace(d)+string(filepath.Separator), dirCacheSize, pendingPages, config, uploader)
store := newCacheStore(metrics, strings.TrimSpace(d)+string(filepath.Separator), dirCacheSize, dirCacheItems, pendingPages, config, uploader)
m.stores[i] = store
m.storeMap[store.id] = store
m.consistentMap.Add(store.id)
Expand Down
8 changes: 4 additions & 4 deletions pkg/chunk/disk_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func toFloat64(c prometheus.Collector) float64 {
}

func TestNewCacheStore(t *testing.T) {
s := newCacheStore(nil, defaultConf.CacheDir, 1<<30, 1, &defaultConf, nil)
s := newCacheStore(nil, defaultConf.CacheDir, 1<<30, defaultConf.CacheItems, 1, &defaultConf, nil)
if s == nil {
t.Fatalf("Create new cache store failed")
}
Expand Down Expand Up @@ -242,7 +242,7 @@ func TestExpand(t *testing.T) {

func BenchmarkLoadCached(b *testing.B) {
dir := b.TempDir()
s := newCacheStore(nil, filepath.Join(dir, "diskCache"), 1<<30, 1, &defaultConf, nil)
s := newCacheStore(nil, filepath.Join(dir, "diskCache"), 1<<30, defaultConf.CacheItems, 1, &defaultConf, nil)
p := NewPage(make([]byte, 1024))
key := "/chunks/1_1024"
s.cache(key, p, false, false)
Expand All @@ -259,11 +259,11 @@ func BenchmarkLoadCached(b *testing.B) {

func BenchmarkLoadUncached(b *testing.B) {
dir := b.TempDir()
s := newCacheStore(nil, filepath.Join(dir, "diskCache"), 1<<30, 1, &defaultConf, nil)
s := newCacheStore(nil, filepath.Join(dir, "diskCache"), 1<<30, defaultConf.CacheItems, 1, &defaultConf, nil)
key := "chunks/222_1024"
b.ResetTimer()
for i := 0; i < b.N; i++ {
if f, e := s.load(key); e != nil {
if f, e := s.load(key); e == nil {
_ = f.Close()
}
}
Expand Down
20 changes: 15 additions & 5 deletions pkg/chunk/mem_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type memItem struct {
type memcache struct {
sync.Mutex
capacity int64
maxItems int64
used int64
pages map[string]memItem
eviction string
Expand All @@ -44,6 +45,7 @@ type memcache struct {
func newMemStore(config *Config, metrics *cacheManagerMetrics) *memcache {
c := &memcache{
capacity: int64(config.CacheSize),
maxItems: config.CacheItems,
pages: make(map[string]memItem),
eviction: config.CacheEviction,
cacheExpire: config.CacheExpire,
Expand Down Expand Up @@ -78,12 +80,12 @@ func (c *memcache) stats() (int64, int64) {
}

func (c *memcache) cache(key string, p *Page, force, dropCache bool) {
if c.capacity == 0 {
if !c.enabled() {
return
}
c.Lock()
defer c.Unlock()
if c.used > c.capacity && c.eviction == "none" {
if c.full() && c.eviction == "none" {
logger.Debugf("Caching is full, drop %s (%d bytes)", key, len(p.Data))
c.metrics.cacheDrops.Add(1)
return
Expand All @@ -97,7 +99,7 @@ func (c *memcache) cache(key string, p *Page, force, dropCache bool) {
p.Acquire()
c.pages[key] = memItem{time.Now(), p}
c.used += size
if c.used > c.capacity {
if c.full() {
c.cleanup()
}
}
Expand Down Expand Up @@ -129,7 +131,7 @@ func (c *memcache) load(key string) (ReadCloser, error) {
}

func (c *memcache) exist(key string) bool {
if c.capacity == 0 {
if !c.enabled() {
return false
}
c.Lock()
Expand Down Expand Up @@ -159,13 +161,21 @@ func (c *memcache) cleanup() {
c.metrics.cacheEvicts.Add(1)
c.delete(lastKey, lastValue.page)
cnt = 0
if c.used < c.capacity {
if !c.full() {
break
}
}
}
}

// enabled reports whether the in-memory cache is active; a non-positive
// byte capacity or item limit turns it off.
func (c *memcache) enabled() bool {
	return !(c.capacity <= 0 || c.maxItems <= 0)
}

// full reports whether the cache has exceeded either budget: total bytes
// used beyond capacity, or more resident pages than maxItems allows.
func (c *memcache) full() bool {
	if c.used > c.capacity {
		return true
	}
	return int64(len(c.pages)) > c.maxItems
}

func (c *memcache) cleanupExpire() {
var interval = time.Minute
if c.cacheExpire < time.Minute {
Expand Down
1 change: 1 addition & 0 deletions pkg/fuse/fuse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func mount(url, mp string) {
MaxUpload: 20,
BufferSize: 300 << 20,
CacheSize: 1024,
CacheItems: 1e9,
CacheDir: "memory",
}

Expand Down
1 change: 1 addition & 0 deletions pkg/vfs/compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func TestCompact(t *testing.T) {
MaxUpload: 2,
BufferSize: 30 << 20,
CacheSize: 10 << 20,
CacheItems: 1e9,
CacheDir: "memory",
}
blob, _ := object.CreateStorage("mem", "", "", "", "")
Expand Down
1 change: 1 addition & 0 deletions pkg/vfs/vfs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func createTestVFS(applyMetaConfOption func(metaConfig *meta.Config), metaUri st
MaxUpload: 2,
BufferSize: 30 << 20,
CacheSize: 10 << 20,
CacheItems: 1e9,
CacheDir: "memory",
},
FuseOpts: &FuseOptions{},
Expand Down
2 changes: 2 additions & 0 deletions sdk/java/libjfs/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ type javaConf struct {
Heartbeat string `json:"heartbeat"`
CacheDir string `json:"cacheDir"`
CacheSize string `json:"cacheSize"`
CacheItems int64 `json:"cacheItems"`
FreeSpace string `json:"freeSpace"`
AutoCreate bool `json:"autoCreate"`
CacheFullBlock bool `json:"cacheFullBlock"`
Expand Down Expand Up @@ -543,6 +544,7 @@ func jfs_init(cname, jsonConf, user, group, superuser, supergroup *C.char) int64
CacheDir: jConf.CacheDir,
CacheMode: 0644, // all user can read cache
CacheSize: utils.ParseBytesStr("cache-size", jConf.CacheSize, 'M'),
CacheItems: jConf.CacheItems,
FreeSpace: float32(freeSpaceRatio),
AutoCreate: jConf.AutoCreate,
CacheFullBlock: jConf.CacheFullBlock,
Expand Down
1 change: 1 addition & 0 deletions sdk/java/src/main/java/io/juicefs/JuiceFileSystemImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,7 @@ public void initialize(URI uri, Configuration conf) throws IOException {
obj.put("noBGJob", Boolean.valueOf(getConf(conf, "no-bgjob", "false")));
obj.put("cacheDir", getConf(conf, "cache-dir", "memory"));
obj.put("cacheSize", getConf(conf, "cache-size", "100"));
obj.put("cacheItems", Integer.valueOf(getConf(conf, "cache-items", "1000000000")));
obj.put("openCache", getConf(conf, "open-cache", "0.0"));
obj.put("backupMeta", getConf(conf, "backup-meta", "3600"));
obj.put("backupSkipTrash", Boolean.valueOf(getConf(conf, "backup-skip-trash", "false")));
Expand Down

0 comments on commit c211ca4

Please sign in to comment.