From b87967f7e27ac4840d29c359f87f946538e78e42 Mon Sep 17 00:00:00 2001 From: Fangxin Lou Date: Fri, 17 Jan 2025 14:55:04 +0800 Subject: [PATCH] cmd/other: threads option for webdav and libjfs (#5535) --- cmd/webdav.go | 7 +++++++ pkg/fs/fs.go | 4 ++-- pkg/fs/fs_test.go | 4 ++-- pkg/fs/http.go | 3 ++- pkg/gateway/gateway.go | 6 +++--- sdk/java/libjfs/main.go | 7 ++++++- 6 files changed, 22 insertions(+), 9 deletions(-) diff --git a/cmd/webdav.go b/cmd/webdav.go index d76467e413ac..e0fd4ac860e2 100644 --- a/cmd/webdav.go +++ b/cmd/webdav.go @@ -64,6 +64,12 @@ func cmdWebDav() *cli.Command { Aliases: []string{"d"}, Usage: "run in background", }, + &cli.IntFlag{ + Name: "threads", + Aliases: []string{"p"}, + Value: 50, + Usage: "number of threads for delete jobs (max 255)", + }, } return &cli.Command{ @@ -95,6 +101,7 @@ func webdav(c *cli.Context) error { CertFile: c.String("cert-file"), KeyFile: c.String("key-file"), EnableProppatch: c.Bool("enable-proppatch"), + MaxDeletes: c.Int("threads"), }) return jfs.Meta().CloseSession() } diff --git a/pkg/fs/fs.go b/pkg/fs/fs.go index 43ed5d245772..c800a5951dea 100644 --- a/pkg/fs/fs.go +++ b/pkg/fs/fs.go @@ -501,7 +501,7 @@ func (fs *FileSystem) Delete(ctx meta.Context, p string) (err syscall.Errno) { return } -func (fs *FileSystem) Rmr(ctx meta.Context, p string) (err syscall.Errno) { +func (fs *FileSystem) Rmr(ctx meta.Context, p string, numthreads int) (err syscall.Errno) { defer trace.StartRegion(context.TODO(), "fs.Rmr").End() l := vfs.NewLogContext(ctx) defer func() { fs.log(l, "Rmr (%s): %s", p, errstr(err)) }() @@ -509,7 +509,7 @@ func (fs *FileSystem) Rmr(ctx meta.Context, p string) (err syscall.Errno) { if err != 0 { return } - err = fs.m.Remove(ctx, parent.inode, path.Base(p), false, meta.RmrDefaultThreads, nil) + err = fs.m.Remove(ctx, parent.inode, path.Base(p), false, numthreads, nil) fs.invalidateEntry(parent.inode, path.Base(p)) return } diff --git a/pkg/fs/fs_test.go b/pkg/fs/fs_test.go index 47bede03f3b3..d1d91cbe3410 100644 --- a/pkg/fs/fs_test.go +++ b/pkg/fs/fs_test.go @@ -230,7 +230,7 @@ func TestFileSystem(t *testing.T) { if err := fs.Delete(ctx, "/d/f"); err == 0 || !IsNotExist(err) { t.Fatalf("delete /d/f: %s", err) } - if e := fs.Rmr(ctx, "/d"); e != 0 { + if e := fs.Rmr(ctx, "/d", meta.RmrDefaultThreads); e != 0 { t.Fatalf("delete /d -r: %s", e) } @@ -261,7 +261,7 @@ func TestFileSystem(t *testing.T) { if err := fs.Rename(ctx, "/ddd/", "/ttt/", 0); err != 0 { t.Fatalf("delete /ddd/: %s", err) } - if err := fs.Rmr(ctx, "/ttt/"); err != 0 { + if err := fs.Rmr(ctx, "/ttt/", meta.RmrDefaultThreads); err != 0 { t.Fatalf("rmr /ttt/: %s", err) } if _, err := fs.Stat(ctx, "/ttt/"); err != syscall.ENOENT { diff --git a/pkg/fs/http.go b/pkg/fs/http.go index b327b39541c8..69cacea8798b 100644 --- a/pkg/fs/http.go +++ b/pkg/fs/http.go @@ -125,7 +125,7 @@ func (hfs *webdavFS) OpenFile(ctx context.Context, name string, flag int, perm o } func (hfs *webdavFS) RemoveAll(ctx context.Context, name string) error { - return econv(hfs.fs.Rmr(hfs.ctx, name)) + return econv(hfs.fs.Rmr(hfs.ctx, name, hfs.config.MaxDeletes)) } func (hfs *webdavFS) Rename(ctx context.Context, oldName, newName string) error { @@ -252,6 +252,7 @@ type WebdavConfig struct { Password string CertFile string KeyFile string + MaxDeletes int } type indexHandler struct { diff --git a/pkg/gateway/gateway.go b/pkg/gateway/gateway.go index 687dd5755d5b..5c6f7187b542 100644 --- a/pkg/gateway/gateway.go +++ b/pkg/gateway/gateway.go @@ -1162,7 +1162,7 @@ func (n *jfsObjects) CompleteMultipartUpload(ctx context.Context, bucket, object } // remove parts - _ = n.fs.Rmr(mctx, n.upath(bucket, uploadID)) + _ = n.fs.Rmr(mctx, n.upath(bucket, uploadID), meta.RmrDefaultThreads) return minio.ObjectInfo{ Bucket: bucket, Name: object, @@ -1179,7 +1179,7 @@ func (n *jfsObjects) AbortMultipartUpload(ctx context.Context, bucket, object, u if err = n.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil { return } - eno := n.fs.Rmr(mctx, n.upath(bucket, uploadID)) + eno := n.fs.Rmr(mctx, n.upath(bucket, uploadID), meta.RmrDefaultThreads) return jfsToObjectErr(ctx, eno, bucket, object, uploadID) } @@ -1224,7 +1224,7 @@ func (n *jfsObjects) cleanupDir(dir string) bool { continue } if now.Sub(time.Unix(entry.Attr.Mtime, 0)) > 7*24*time.Hour { - if errno = n.fs.Rmr(mctx, dirPath); errno != 0 { + if errno = n.fs.Rmr(mctx, dirPath, meta.RmrDefaultThreads); errno != 0 { logger.Errorf("failed to delete expired temporary files path: %s, err: %s", dirPath, errno) } else { deleted += 1 diff --git a/sdk/java/libjfs/main.go b/sdk/java/libjfs/main.go index 08698e6aac74..3f80091ea770 100644 --- a/sdk/java/libjfs/main.go +++ b/sdk/java/libjfs/main.go @@ -94,6 +94,7 @@ var ( userGroupCache = make(map[string]map[string][]string) // name -> (user -> groups) + MaxDeletes = meta.RmrDefaultThreads ) const ( @@ -467,6 +468,10 @@ func jfs_init(cname, jsonConf, user, group, superuser, supergroup *C.char) int64 utils.SetLogLevel(logrus.WarnLevel) } + if jConf.MaxDeletes > 0 { + MaxDeletes = jConf.MaxDeletes + } + metaConf := meta.DefaultConf() metaConf.Retries = jConf.IORetries metaConf.MaxDeletes = jConf.MaxDeletes @@ -865,7 +870,7 @@ func jfs_rmr(pid int64, h int64, cpath *C.char) int32 { if w == nil { return EINVAL } - return errno(w.Rmr(w.withPid(pid), C.GoString(cpath))) + return errno(w.Rmr(w.withPid(pid), C.GoString(cpath), MaxDeletes)) } //export jfs_rename