Skip to content

Commit

Permalink
cmd/other: threads option for webdav and libjfs (#5535)
Browse files Browse the repository at this point in the history
  • Loading branch information
anysql authored Jan 17, 2025
1 parent 4048b50 commit b87967f
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 9 deletions.
7 changes: 7 additions & 0 deletions cmd/webdav.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ func cmdWebDav() *cli.Command {
Aliases: []string{"d"},
Usage: "run in background",
},
&cli.IntFlag{
Name: "threads",
Aliases: []string{"p"},
Value: 50,
Usage: "number of threads for delete jobs (max 255)",
},
}

return &cli.Command{
Expand Down Expand Up @@ -95,6 +101,7 @@ func webdav(c *cli.Context) error {
CertFile: c.String("cert-file"),
KeyFile: c.String("key-file"),
EnableProppatch: c.Bool("enable-proppatch"),
MaxDeletes: c.Int("threads"),
})
return jfs.Meta().CloseSession()
}
4 changes: 2 additions & 2 deletions pkg/fs/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,15 +501,15 @@ func (fs *FileSystem) Delete(ctx meta.Context, p string) (err syscall.Errno) {
return
}

func (fs *FileSystem) Rmr(ctx meta.Context, p string) (err syscall.Errno) {
func (fs *FileSystem) Rmr(ctx meta.Context, p string, numthreads int) (err syscall.Errno) {
defer trace.StartRegion(context.TODO(), "fs.Rmr").End()
l := vfs.NewLogContext(ctx)
defer func() { fs.log(l, "Rmr (%s): %s", p, errstr(err)) }()
parent, err := fs.resolve(ctx, parentDir(p), true)
if err != 0 {
return
}
err = fs.m.Remove(ctx, parent.inode, path.Base(p), false, meta.RmrDefaultThreads, nil)
err = fs.m.Remove(ctx, parent.inode, path.Base(p), false, numthreads, nil)
fs.invalidateEntry(parent.inode, path.Base(p))
return
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/fs/fs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func TestFileSystem(t *testing.T) {
if err := fs.Delete(ctx, "/d/f"); err == 0 || !IsNotExist(err) {
t.Fatalf("delete /d/f: %s", err)
}
if e := fs.Rmr(ctx, "/d"); e != 0 {
if e := fs.Rmr(ctx, "/d", meta.RmrDefaultThreads); e != 0 {
t.Fatalf("delete /d -r: %s", e)
}

Expand Down Expand Up @@ -261,7 +261,7 @@ func TestFileSystem(t *testing.T) {
if err := fs.Rename(ctx, "/ddd/", "/ttt/", 0); err != 0 {
t.Fatalf("delete /ddd/: %s", err)
}
if err := fs.Rmr(ctx, "/ttt/"); err != 0 {
if err := fs.Rmr(ctx, "/ttt/", meta.RmrDefaultThreads); err != 0 {
t.Fatalf("rmr /ttt/: %s", err)
}
if _, err := fs.Stat(ctx, "/ttt/"); err != syscall.ENOENT {
Expand Down
3 changes: 2 additions & 1 deletion pkg/fs/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (hfs *webdavFS) OpenFile(ctx context.Context, name string, flag int, perm o
}

func (hfs *webdavFS) RemoveAll(ctx context.Context, name string) error {
return econv(hfs.fs.Rmr(hfs.ctx, name))
return econv(hfs.fs.Rmr(hfs.ctx, name, hfs.config.MaxDeletes))
}

func (hfs *webdavFS) Rename(ctx context.Context, oldName, newName string) error {
Expand Down Expand Up @@ -252,6 +252,7 @@ type WebdavConfig struct {
Password string
CertFile string
KeyFile string
MaxDeletes int
}

type indexHandler struct {
Expand Down
6 changes: 3 additions & 3 deletions pkg/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -1162,7 +1162,7 @@ func (n *jfsObjects) CompleteMultipartUpload(ctx context.Context, bucket, object
}

// remove parts
_ = n.fs.Rmr(mctx, n.upath(bucket, uploadID))
_ = n.fs.Rmr(mctx, n.upath(bucket, uploadID), meta.RmrDefaultThreads)
return minio.ObjectInfo{
Bucket: bucket,
Name: object,
Expand All @@ -1179,7 +1179,7 @@ func (n *jfsObjects) AbortMultipartUpload(ctx context.Context, bucket, object, u
if err = n.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
return
}
eno := n.fs.Rmr(mctx, n.upath(bucket, uploadID))
eno := n.fs.Rmr(mctx, n.upath(bucket, uploadID), meta.RmrDefaultThreads)
return jfsToObjectErr(ctx, eno, bucket, object, uploadID)
}

Expand Down Expand Up @@ -1224,7 +1224,7 @@ func (n *jfsObjects) cleanupDir(dir string) bool {
continue
}
if now.Sub(time.Unix(entry.Attr.Mtime, 0)) > 7*24*time.Hour {
if errno = n.fs.Rmr(mctx, dirPath); errno != 0 {
if errno = n.fs.Rmr(mctx, dirPath, meta.RmrDefaultThreads); errno != 0 {
logger.Errorf("failed to delete expired temporary files path: %s, err: %s", dirPath, errno)
} else {
deleted += 1
Expand Down
7 changes: 6 additions & 1 deletion sdk/java/libjfs/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ var (

userGroupCache = make(map[string]map[string][]string) // name -> (user -> groups)

MaxDeletes = meta.RmrDefaultThreads
)

const (
Expand Down Expand Up @@ -467,6 +468,10 @@ func jfs_init(cname, jsonConf, user, group, superuser, supergroup *C.char) int64
utils.SetLogLevel(logrus.WarnLevel)
}

if jConf.MaxDeletes > 0 {
MaxDeletes = jConf.MaxDeletes
}

metaConf := meta.DefaultConf()
metaConf.Retries = jConf.IORetries
metaConf.MaxDeletes = jConf.MaxDeletes
Expand Down Expand Up @@ -865,7 +870,7 @@ func jfs_rmr(pid int64, h int64, cpath *C.char) int32 {
if w == nil {
return EINVAL
}
return errno(w.Rmr(w.withPid(pid), C.GoString(cpath)))
return errno(w.Rmr(w.withPid(pid), C.GoString(cpath), MaxDeletes))
}

//export jfs_rename
Expand Down

0 comments on commit b87967f

Please sign in to comment.