diff --git a/config/constants.go b/config/constants.go new file mode 100644 index 0000000..f1c4f59 --- /dev/null +++ b/config/constants.go @@ -0,0 +1,79 @@ + +package config + +import ( + "time" +) + +// Reason of backing to source. +const ( + BackSourceReasonNone = 0 + BackSourceReasonRegisterFail = 1 + BackSourceReasonMd5NotMatch = 2 + BackSourceReasonDownloadError = 3 + BackSourceReasonNoSpace = 4 + BackSourceReasonInitError = 5 + BackSourceReasonWriteError = 6 + BackSourceReasonHostSysError = 7 + BackSourceReasonNodeEmpty = 8 + BackSourceReasonSourceError = 10 + BackSourceReasonUserSpecified = 100 + ForceNotBackSourceAddition = 1000 +) + +// Download pattern. +const ( + PatternP2P = "p2p" + PatternSeedPeer = "seed-peer" + PatternSource = "source" +) + +//// Download limit. +//const ( +// DefaultPerPeerDownloadLimit = 20 * unit.MB +// DefaultTotalDownloadLimit = 100 * unit.MB +// DefaultUploadLimit = 100 * unit.MB +// DefaultMinRate = 20 * unit.MB +//) + +// Others. +const ( + DefaultTimestampFormat = "2006-01-02 15:04:05" + SchemaHTTP = "http" + + DefaultTaskExpireTime = 6 * time.Hour + DefaultGCInterval = 1 * time.Minute + DefaultDaemonAliveTime = 5 * time.Minute + DefaultScheduleTimeout = 5 * time.Minute + DefaultDownloadTimeout = 5 * time.Minute + + DefaultSchedulerSchema = "http" + DefaultSchedulerIP = "127.0.0.1" + DefaultSchedulerPort = 8002 + + DefaultPieceChanSize = 16 + DefaultObjectMaxReplicas = 3 +) + + +// Dfcache subcommand names. +const ( + CmdStat = "stat" + CmdImport = "import" + CmdExport = "export" + CmdDelete = "delete" +) + +// Service default port of listening. +const ( + DefaultEndPort = 65535 + DefaultPeerStartPort = 65000 + DefaultUploadStartPort = 65002 + DefaultObjectStorageStartPort = 65004 + DefaultHealthyStartPort = 40901 +) + +var ( + // DefaultCertValidityPeriod is default validity period of certificate. 
+ DefaultCertValidityPeriod = 180 * 24 * time.Hour +) diff --git a/config/dfstore.go b/config/dfstore.go new file mode 100644 index 0000000..946a442 --- /dev/null +++ b/config/dfstore.go @@ -0,0 +1,52 @@ + + +package config + +import ( + "errors" + "fmt" + "net/url" +) + +type DfstoreConfig struct { + // Address of the object storage service. + Endpoint string `yaml:"endpoint,omitempty" mapstructure:"endpoint,omitempty"` + + // Filter is used to generate a unique Task ID by + // filtering unnecessary query params in the URL, + // it is separated by & character. + Filter string `yaml:"filter,omitempty" mapstructure:"filter,omitempty"` + + // Mode is the mode in which the backend is written, + // including WriteBack and AsyncWriteBack. + Mode int `yaml:"mode,omitempty" mapstructure:"mode,omitempty"` + + // MaxReplicas is the maximum number of + // replicas of an object cache in seed peers. + MaxReplicas int `yaml:"maxReplicas,omitempty" mapstructure:"mode,maxReplicas"` +} + +// New dfstore configuration. +func NewDfstore() *DfstoreConfig { + url := url.URL{ + Scheme: "http", + Host: fmt.Sprintf("%s:%d", "127.0.0.1", DefaultObjectStorageStartPort), + } + + return &DfstoreConfig{ + Endpoint: url.String(), + MaxReplicas: DefaultObjectMaxReplicas, + } +} + +func (cfg *DfstoreConfig) Validate() error { + if cfg.Endpoint == "" { + return errors.New("dfstore requires parameter endpoint") + } + + if _, err := url.ParseRequestURI(cfg.Endpoint); err != nil { + return fmt.Errorf("invalid endpoint: %w", err) + } + + return nil +} diff --git a/config/headers.go b/config/headers.go new file mode 100644 index 0000000..9a27296 --- /dev/null +++ b/config/headers.go @@ -0,0 +1,32 @@ +/* + * Copyright 2020 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package config + +const ( + HeaderDragonflyFilter = "X-Dragonfly-Filter" + HeaderDragonflyPeer = "X-Dragonfly-Peer" + HeaderDragonflyTask = "X-Dragonfly-Task" + HeaderDragonflyRange = "X-Dragonfly-Range" + // HeaderDragonflyTag different HeaderDragonflyTag for the same url will be divided into different P2P overlay + HeaderDragonflyTag = "X-Dragonfly-Tag" + // HeaderDragonflyApplication is used for statistics and traffic control + HeaderDragonflyApplication = "X-Dragonfly-Application" + // HeaderDragonflyRegistry is used for dynamic registry mirrors. + HeaderDragonflyRegistry = "X-Dragonfly-Registry" + // HeaderDragonflyObjectMetaDigest is used for digest of object storage. + HeaderDragonflyObjectMetaDigest = "X-Dragonfly-Object-Meta-Digest" +) diff --git a/dfstore/dfstore.go b/dfstore/dfstore.go new file mode 100644 index 0000000..8d176da --- /dev/null +++ b/dfstore/dfstore.go @@ -0,0 +1,322 @@ +package dfstore + +import ( + "context" + "errors" + "fmt" + "github.com/go-http-utils/headers" + "io" + "net/http" + "net/url" + "path" + "strconv" + "urchinfs/config" + pkgobjectstorage "urchinfs/objectstorage" +) + +// Dfstore is the interface used for object storage. +type Dfstore interface { + + // GetUrfsMetadataRequestWithContext returns *http.Request of getting Urfs metadata. + GetUrfsMetadataRequestWithContext(ctx context.Context, input *GetUrfsMetadataInput, isDir bool) (*http.Request, error) + + // GetUrfsMetadataWithContext returns metadata of Urfs. 
+ GetUrfsMetadataWithContext(ctx context.Context, input *GetUrfsMetadataInput, isDir bool) (*pkgobjectstorage.ObjectMetadata, error) + + // GetUrfsRequestWithContext returns *http.Request of getting Urfs. + GetUrfsRequestWithContext(ctx context.Context, input *GetUrfsInput, isDir bool) (*http.Request, error) + + // GetUrfsWithContext returns data of Urfs. + GetUrfsWithContext(ctx context.Context, input *GetUrfsInput, isDir bool) (io.ReadCloser, error) + + // GetUrfsStatusRequestWithContext returns *http.Request of getting Urfs status. + GetUrfsStatusRequestWithContext(ctx context.Context, input *GetUrfsInput, isDir bool) (*http.Request, error) + + // GetUrfsStatusWithContext returns schedule status of Urfs. + GetUrfsStatusWithContext(ctx context.Context, input *GetUrfsInput, isDir bool) (io.ReadCloser, error) +} + +// dfstore provides object storage function. +type dfstore struct { + endpoint string + httpClient *http.Client +} + +// Option is a functional option for configuring the dfstore. +type Option func(dfs *dfstore) + +// New dfstore instance. +func New(endpoint string, options ...Option) Dfstore { + dfs := &dfstore{ + endpoint: endpoint, + httpClient: http.DefaultClient, + } + + for _, opt := range options { + opt(dfs) + } + + return dfs +} + +// GetUrfsMetadataInput is used to construct request of getting object metadata. +type GetUrfsMetadataInput struct { + + // Endpoint is endpoint name. + Endpoint string + + // BucketName is bucket name. + BucketName string + + // ObjectKey is object key. + ObjectKey string + + // DstPeer is target peerHost. + DstPeer string +} + +// Validate validates GetUrfsMetadataInput fields. 
+func (i *GetUrfsMetadataInput) Validate() error { + + if i.Endpoint == "" { + return errors.New("invalid Endpoint") + + } + + if i.BucketName == "" { + return errors.New("invalid BucketName") + + } + + if i.ObjectKey == "" { + return errors.New("invalid ObjectKey") + } + + return nil +} + +// GetObjectMetadataRequestWithContext returns *http.Request of getting object metadata. +func (dfs *dfstore) GetUrfsMetadataRequestWithContext(ctx context.Context, input *GetUrfsMetadataInput, isDir bool) (*http.Request, error) { + if err := input.Validate(); err != nil { + return nil, err + } + + dstUrl := url.URL{ + Scheme: "http", + Host: fmt.Sprintf("%s", input.DstPeer), + } + + u, err := url.Parse(dstUrl.String()) + if err != nil { + return nil, err + } + + u.Path = path.Join("buckets", input.BucketName+"."+input.Endpoint, "objects", input.ObjectKey) + req, err := http.NewRequestWithContext(ctx, http.MethodHead, u.String(), nil) + if err != nil { + return nil, err + } + + return req, nil +} + +// GetObjectMetadataWithContext returns metadata of object. 
+func (dfs *dfstore) GetUrfsMetadataWithContext(ctx context.Context, input *GetUrfsMetadataInput, isDir bool) (*pkgobjectstorage.ObjectMetadata, error) { + req, err := dfs.GetUrfsMetadataRequestWithContext(ctx, input, isDir) + if err != nil { + return nil, err + } + + resp, err := dfs.httpClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode/100 != 2 { + return nil, fmt.Errorf("bad response status %s", resp.Status) + } + + contentLength, err := strconv.ParseInt(resp.Header.Get(headers.ContentLength), 10, 64) + if err != nil { + return nil, err + } + + return &pkgobjectstorage.ObjectMetadata{ + ContentDisposition: resp.Header.Get(headers.ContentDisposition), + ContentEncoding: resp.Header.Get(headers.ContentEncoding), + ContentLanguage: resp.Header.Get(headers.ContentLanguage), + ContentLength: int64(contentLength), + ContentType: resp.Header.Get(headers.ContentType), + ETag: resp.Header.Get(headers.ContentType), + Digest: resp.Header.Get(config.HeaderDragonflyObjectMetaDigest), + }, nil +} + +// GetUrfsInput is used to construct request of getting object. +type GetUrfsInput struct { + + // Endpoint is endpoint name. + Endpoint string + + // BucketName is bucket name. + BucketName string + + // ObjectKey is object key. + ObjectKey string + + // Filter is used to generate a unique Task ID by + // filtering unnecessary query params in the URL, + // it is separated by & character. + Filter string + + // Range is the HTTP range header. + Range string + + // DstPeer is target peerHost. + DstPeer string + + // Overwrite force overwrite flag + Overwrite bool +} + +// GetObjectWithContext returns data of object. 
+func (dfs *dfstore) GetUrfsWithContext(ctx context.Context, input *GetUrfsInput, isDir bool) (io.ReadCloser, error) { + req, err := dfs.GetUrfsRequestWithContext(ctx, input, isDir) + if err != nil { + return nil, err + } + + resp, err := dfs.httpClient.Do(req) + if err != nil { + return nil, err + } + + if resp.StatusCode/100 != 2 { + return nil, fmt.Errorf("bad response status %s", resp.Status) + } + + return resp.Body, nil +} + +// GetObjectRequestWithContext returns *http.Request of getting object. +func (dfs *dfstore) GetUrfsRequestWithContext(ctx context.Context, input *GetUrfsInput, isDir bool) (*http.Request, error) { + if err := input.Validate(); err != nil { + return nil, err + } + dstUrl := url.URL{ + Scheme: "http", + Host: fmt.Sprintf("%s", input.DstPeer), + } + + u, err := url.Parse(dstUrl.String()) + if err != nil { + return nil, err + } + + if isDir { + u.Path = path.Join("buckets", input.BucketName+"."+input.Endpoint, "cache_folder", input.ObjectKey) + } else { + u.Path = path.Join("buckets", input.BucketName+"."+input.Endpoint, "cache_object", input.ObjectKey) + } + + query := u.Query() + if input.Filter != "" { + query.Set("filter", input.Filter) + } + + if !isDir && input.Overwrite { + query.Set("overwrite", "1") + } + u.RawQuery = query.Encode() + println("u.string", u.String()) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, u.String(), nil) + if err != nil { + return nil, err + } + + if input.Range != "" { + req.Header.Set(headers.Range, input.Range) + } + + return req, nil +} + +// Validate validates GetUrfsInput fields. +func (i *GetUrfsInput) Validate() error { + + if i.Endpoint == "" { + return errors.New("invalid Endpoint") + + } + + if i.BucketName == "" { + return errors.New("invalid BucketName") + + } + + if i.ObjectKey == "" { + return errors.New("invalid ObjectKey") + } + + return nil +} + +// GetUrfsStatusWithContext returns schedule task status. 
+func (dfs *dfstore) GetUrfsStatusWithContext(ctx context.Context, input *GetUrfsInput, isDir bool) (io.ReadCloser, error) { + req, err := dfs.GetUrfsStatusRequestWithContext(ctx, input, isDir) + if err != nil { + return nil, err + } + + resp, err := dfs.httpClient.Do(req) + if err != nil { + return nil, err + } + + if resp.StatusCode/100 != 2 { + return nil, fmt.Errorf("bad response status %s", resp.Status) + } + + return resp.Body, nil +} + +// GetObjectStatusRequestWithContext returns *http.Request of check schedule task status. +func (dfs *dfstore) GetUrfsStatusRequestWithContext(ctx context.Context, input *GetUrfsInput, isDir bool) (*http.Request, error) { + if err := input.Validate(); err != nil { + return nil, err + } + + dstUrl := url.URL{ + Scheme: "http", + Host: fmt.Sprintf("%s", input.DstPeer), + } + + u, err := url.Parse(dstUrl.String()) + if err != nil { + return nil, err + } + + if isDir { + u.Path = path.Join("buckets", input.BucketName+"."+input.Endpoint, "check_folder", input.ObjectKey) + } else { + u.Path = path.Join("buckets", input.BucketName+"."+input.Endpoint, "check_object", input.ObjectKey) + } + + query := u.Query() + if input.Filter != "" { + query.Set("filter", input.Filter) + } + u.RawQuery = query.Encode() + //println("u.string ", u.String()) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil) + if err != nil { + return nil, err + } + + if input.Range != "" { + req.Header.Set(headers.Range, input.Range) + } + + return req, nil +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..63edb52 --- /dev/null +++ b/go.mod @@ -0,0 +1,7 @@ +module urchinfs + +go 1.18 + +require github.com/go-http-utils/headers v0.0.0-20181008091004-fed159eddc2a + +require github.com/stretchr/testify v1.8.0 // indirect diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..c8faa9c --- /dev/null +++ b/go.sum @@ -0,0 +1,16 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= 
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-http-utils/headers v0.0.0-20181008091004-fed159eddc2a h1:v6zMvHuY9yue4+QkG/HQ/W67wvtQmWJ4SDo9aK/GIno= +github.com/go-http-utils/headers v0.0.0-20181008091004-fed159eddc2a/go.mod h1:I79BieaU4fxrw4LMXby6q5OS9XnoR9UIKLOzDFjUmuw= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/main b/main new file mode 100644 index 0000000..9c533ff Binary files /dev/null and b/main differ diff --git a/main.go b/main.go new file mode 100644 index 0000000..13534ad --- /dev/null +++ b/main.go @@ -0,0 +1,93 @@ +package main + +import ( + "fmt" + "time" + "urchinfs/urchin" +) + +func trySchedule(sourceURL, endpoint, bucket, objectKey, dstPeer string) { + println("new request dstPeer: ", dstPeer) + urfs := urchin.New() + //scheduleResult, err := urfs.ScheduleDataToPeer(sourceURL, dstPeer) + //if err != nil { + // println(err.Error()) + //} + 
//fmt.Printf("ScheduleDataToPeer StatusCode:%v %v %v %v\n", scheduleResult.StatusCode, scheduleResult.DataEndpoint, scheduleResult.DataRoot, scheduleResult.DataPath) + // + //checkResult, err := urfs.CheckScheduleTaskStatus(sourceURL, dstPeer) + //if err != nil { + // println(err.Error()) + //} + // + //fmt.Printf("checkResult StatusCode:%v %v %v %v\n", checkResult.StatusCode, checkResult.DataEndpoint, checkResult.DataRoot, checkResult.DataPath) + + overwrite := true + scheduleResult, err := urfs.ScheduleDataToPeerByKey(endpoint, bucket, objectKey, dstPeer, overwrite) + if err != nil { + println(err.Error()) + } + fmt.Printf("ScheduleDataToPeerByKey StatusCode:%v %v %v %v\n", scheduleResult.StatusCode, scheduleResult.DataEndpoint, scheduleResult.DataRoot, scheduleResult.DataPath) + + time.Sleep(time.Second * 2) + + scheduleResult, err = urfs.CheckScheduleTaskStatusByKey(endpoint, bucket, objectKey, dstPeer) + if err != nil { + println(err.Error()) + } + fmt.Printf("CheckScheduleTaskStatusByKey StatusCode:%v StatusMsg:%v\n", scheduleResult.StatusCode, scheduleResult.StatusMsg) +} + +func tryScheduleDir(endpoint, bucket, objectKey, dstPeer string) { + println("new request dstPeer: ", dstPeer) + urfs := urchin.New() + + //scheduleResult, err := urfs.ScheduleDirToPeerByKey(endpoint, bucket, objectKey, dstPeer) + //if err != nil { + // print(err.Error()) + // return + //} + + //fmt.Printf("ScheduleDataToPeerByKey StatusCode:%v %v %v %v\n", scheduleResult.StatusCode, scheduleResult.DataEndpoint, scheduleResult.DataRoot, scheduleResult.DataPath) + + scheduleResult, err := urfs.CheckScheduleDirTaskStatusByKey(endpoint, bucket, objectKey, dstPeer) + if err != nil { + print(err.Error()) + return + } + fmt.Printf("CheckScheduleTaskStatusByKey StatusCode:%v %v %v %v\n", scheduleResult.StatusCode, scheduleResult.DataEndpoint, scheduleResult.DataRoot, scheduleResult.DataPath) +} + +func main() { + + sourceURL := 
"urfs://obs.cn-south-222.ai.pcl.cn/urchincache/glin/demo_x/object_detection3/code/openi_resource.version" + endpoint := "obs.cn-central-231.xckpjs.com" + bucket := "urchincache" + objectKey := "glin/demo_x/object_detection3/code/openi_resource.version" + dstPeer := "192.168.242.42:31814" + trySchedule(sourceURL, endpoint, bucket, objectKey, dstPeer) + //sourceURL2 := "urfs://11276.c8befbc1301665ba2dc5b2826f8dca1e.ac.sugon.com/work-home-denglf-denglf/code.rar" + //endpoint2 := "obs.cn-south-222.ai.pcl.cn" + + //bucket2 := "urchincache" + //objectKey2 := "yangxzh/object-detection-image.zip" + //dstPeer2 := "192.168.207.91:65004" + + //bucket2 := "open-data" + //objectKey2 := "attachment/9/6/96177b0c-6f84-4550-b293-09c206baf811/MNISTData.zip" + //dstPeer2 := "192.168.207.91:65004" + + //dstPeer2 := "192.168.242.25:65004" + //trySchedule(sourceURL2, endpoint2, bucket2, objectKey2, dstPeer2) + //endpoint := "192.168.242.23:31311" + //bucket := "grampus" + //objectKey := "job/wangj2023031409t5509373540/output/" + //dstPeer := "192.168.242.27:65004" + // schedule dir example + //endpoint := "192.168.242.23:31311" + //bucket := "grampus" + //objectKey := "/job/cheny2023030215t5435897690/output" + //dstPeer := "192.168.242.42:65004" + //tryScheduleDir(endpoint, bucket, objectKey, dstPeer) + +} diff --git a/objectstorage/objectstorage.go b/objectstorage/objectstorage.go new file mode 100644 index 0000000..2fb1774 --- /dev/null +++ b/objectstorage/objectstorage.go @@ -0,0 +1,32 @@ + +//go:generate mockgen -destination mocks/objectstorage_mock.go -source objectstorage.go -package mocks + +package objectstorage + +type ObjectMetadata struct { + // Key is object key. + Key string + + // ContentDisposition is Content-Disposition header. + ContentDisposition string + + // ContentEncoding is Content-Encoding header. + ContentEncoding string + + // ContentLanguage is Content-Language header. + ContentLanguage string + + // ContentLanguage is Content-Length header. 
+ ContentLength int64 + + // ContentType is Content-Type header. + ContentType string + + // ETag is ETag header. + ETag string + + // Digest is object digest. + Digest string +} + + diff --git a/urchin/urchinfs.go b/urchin/urchinfs.go new file mode 100644 index 0000000..97d2e56 --- /dev/null +++ b/urchin/urchinfs.go @@ -0,0 +1,349 @@ +package urchin + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "net/url" + "strconv" + "strings" + "urchinfs/config" + urfs "urchinfs/dfstore" +) + +type Urchinfs interface { + + // schedule source dataset to target peer + ScheduleDataToPeer(sourceUrl, destPeerHost string) (*PeerResult, error) + + // check schedule data to peer task status + CheckScheduleTaskStatus(sourceUrl, destPeerHost string) (*PeerResult, error) + + ScheduleDataToPeerByKey(endpoint, bucketName, objectKey, destPeerHost string, overwrite bool) (*PeerResult, error) + + CheckScheduleTaskStatusByKey(endpoint, bucketName, objectKey, destPeerHost string) (*PeerResult, error) + + ScheduleDirToPeerByKey(endpoint, bucketName, objectKey, destPeerHost string) (*PeerResult, error) + + CheckScheduleDirTaskStatusByKey(endpoint, bucketName, objectKey, destPeerHost string) (*PeerResult, error) +} + +type urchinfs struct { + // Initialize default urfs config. + cfg *config.DfstoreConfig +} + +// New urchinfs instance. +func New() Urchinfs { + + urfs := &urchinfs{ + cfg: config.NewDfstore(), + } + return urfs +} + +const ( + // UrfsScheme if the scheme of object storage. + UrfsScheme = "urfs" +) + +func (urfs *urchinfs) ScheduleDataToPeer(sourceUrl, destPeerHost string) (*PeerResult, error) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + if err := urfs.cfg.Validate(); err != nil { + return nil, err + } + + if err := validateSchedulelArgs(sourceUrl, destPeerHost); err != nil { + return nil, err + } + + // Copy object storage to local file. 
+ endpoint, bucketName, objectKey, err := parseUrfsURL(sourceUrl) + if err != nil { + return nil, err + } + peerResult, err := processScheduleDataToPeer(ctx, urfs.cfg, endpoint, bucketName, objectKey, destPeerHost, false) + if err != nil { + return nil, err + } + + return peerResult, err +} + +func (urfs *urchinfs) ScheduleDataToPeerByKey(endpoint, bucketName, objectKey, destPeerHost string, overwrite bool) (*PeerResult, error) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + peerResult, err := processScheduleDataToPeer(ctx, urfs.cfg, endpoint, bucketName, objectKey, destPeerHost, overwrite) + if err != nil { + return nil, err + } + + return peerResult, err +} + +func (urfs *urchinfs) ScheduleDirToPeerByKey(endpoint, bucketName, objectKey, destPeerHost string) (*PeerResult, error) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + peerResult, err := processScheduleDirToPeer(ctx, urfs.cfg, endpoint, bucketName, objectKey, destPeerHost) + if err != nil { + return nil, err + } + + return peerResult, err +} + +func (urfs *urchinfs) CheckScheduleTaskStatus(sourceUrl, destPeerHost string) (*PeerResult, error) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + if err := urfs.cfg.Validate(); err != nil { + return nil, err + } + + if err := validateSchedulelArgs(sourceUrl, destPeerHost); err != nil { + return nil, err + } + + // Copy object storage to local file. 
+ endpoint, bucketName, objectKey, err := parseUrfsURL(sourceUrl) + if err != nil { + return nil, err + } + peerResult, err := processCheckScheduleTaskStatus(ctx, urfs.cfg, endpoint, bucketName, objectKey, destPeerHost) + if err != nil { + return nil, err + } + + return peerResult, err +} + +func (urfs *urchinfs) CheckScheduleTaskStatusByKey(endpoint, bucketName, objectKey, destPeerHost string) (*PeerResult, error) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + peerResult, err := processCheckScheduleTaskStatus(ctx, urfs.cfg, endpoint, bucketName, objectKey, destPeerHost) + if err != nil { + return nil, err + } + + return peerResult, err +} + +func (urfs *urchinfs) CheckScheduleDirTaskStatusByKey(endpoint, bucketName, objectKey, destPeerHost string) (*PeerResult, error) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + peerResult, err := processCheckScheduleDirTaskStatus(ctx, urfs.cfg, endpoint, bucketName, objectKey, destPeerHost) + if err != nil { + return nil, err + } + + return peerResult, err +} + +// isUrfsURL determines whether the raw url is urfs url. +func isUrfsURL(rawURL string) bool { + u, err := url.ParseRequestURI(rawURL) + if err != nil { + return false + } + + if u.Scheme != UrfsScheme || u.Host == "" || u.Path == "" { + return false + } + + return true +} + +// Validate copy arguments. +func validateSchedulelArgs(sourceUrl, destPeer string) error { + if !isUrfsURL(sourceUrl) { + return errors.New("source url should be urfs:// protocol") + } + + return nil +} + +// Parse object storage url. eg: urfs://源数据$endpoint/源数据$bucket/源数据filepath +func parseUrfsURL(rawURL string) (string, string, string, error) { + u, err := url.ParseRequestURI(rawURL) + if err != nil { + return "", "", "", err + } + + if u.Scheme != UrfsScheme { + return "", "", "", fmt.Errorf("invalid scheme, e.g. 
%s://endpoint/bucket_name/object_key", UrfsScheme) + } + + if u.Host == "" { + return "", "", "", errors.New("empty endpoint name") + } + + if u.Path == "" { + return "", "", "", errors.New("empty object path") + } + + bucket, key, found := strings.Cut(strings.Trim(u.Path, "/"), "/") + if found == false { + return "", "", "", errors.New("invalid bucket and object key " + u.Path) + } + + //println(u.Host, " ", bucket, " ", key) + + return u.Host, bucket, key, nil +} + +// Schedule object storage to peer. +func processScheduleDataToPeer(ctx context.Context, cfg *config.DfstoreConfig, endpoint, bucketName, objectKey, dstPeer string, overwrite bool) (*PeerResult, error) { + dfs := urfs.New(cfg.Endpoint) + meta, err := dfs.GetUrfsMetadataWithContext(ctx, &urfs.GetUrfsMetadataInput{ + Endpoint: endpoint, + BucketName: bucketName, + ObjectKey: objectKey, + DstPeer: dstPeer, + }, false) + if err != nil { + return nil, err + } + + reader, err := dfs.GetUrfsWithContext(ctx, &urfs.GetUrfsInput{ + Endpoint: endpoint, + BucketName: bucketName, + ObjectKey: objectKey, + DstPeer: dstPeer, + Overwrite: overwrite, + }, false) + if err != nil { + return nil, err + } + defer reader.Close() + + body, err := ioutil.ReadAll(reader) + + var peerResult PeerResult + if err == nil { + err = json.Unmarshal((body), &peerResult) + } + peerResult.SignedUrl = strings.ReplaceAll(peerResult.SignedUrl, "\\u0026", "&") + + fileContentLength, err := strconv.ParseInt(peerResult.ContentLength, 10, 64) + if err != nil { + return nil, err + } + if fileContentLength != meta.ContentLength { + return nil, errors.New("content length inconsistent with meta") + } + + return &peerResult, err +} + +// Schedule object storage dir to peer. 
+func processScheduleDirToPeer(ctx context.Context, cfg *config.DfstoreConfig, endpoint, bucketName, objectKey, dstPeer string) (*PeerResult, error) { + dfs := urfs.New(cfg.Endpoint) + + reader, err := dfs.GetUrfsWithContext(ctx, &urfs.GetUrfsInput{ + Endpoint: endpoint, + BucketName: bucketName, + ObjectKey: objectKey, + DstPeer: dstPeer, + }, true) + if err != nil { + return nil, err + } + defer reader.Close() + + body, err := ioutil.ReadAll(reader) + + var peerResult PeerResult + if err == nil { + err = json.Unmarshal((body), &peerResult) + } + peerResult.SignedUrl = strings.ReplaceAll(peerResult.SignedUrl, "\\u0026", "&") + + return &peerResult, err +} + +// check schedule task status. +func processCheckScheduleTaskStatus(ctx context.Context, cfg *config.DfstoreConfig, endpoint, bucketName, objectKey, dstPeer string) (*PeerResult, error) { + dfs := urfs.New(cfg.Endpoint) + meta, err := dfs.GetUrfsMetadataWithContext(ctx, &urfs.GetUrfsMetadataInput{ + Endpoint: endpoint, + BucketName: bucketName, + ObjectKey: objectKey, + DstPeer: dstPeer, + }, false) + if err != nil { + return nil, err + } + + reader, err := dfs.GetUrfsStatusWithContext(ctx, &urfs.GetUrfsInput{ + Endpoint: endpoint, + BucketName: bucketName, + ObjectKey: objectKey, + DstPeer: dstPeer, + }, false) + if err != nil { + return nil, err + } + defer reader.Close() + + body, err := ioutil.ReadAll(reader) + + var peerResult PeerResult + if err == nil { + err = json.Unmarshal((body), &peerResult) + } + peerResult.SignedUrl = strings.ReplaceAll(peerResult.SignedUrl, "\\u0026", "&") + + fileContentLength, err := strconv.ParseInt(peerResult.ContentLength, 10, 64) + if err != nil { + return nil, err + } + if fileContentLength != meta.ContentLength { + return nil, errors.New("content length inconsistent with meta") + } + return &peerResult, err +} + +// check schedule task status. 
+func processCheckScheduleDirTaskStatus(ctx context.Context, cfg *config.DfstoreConfig, endpoint, bucketName, objectKey, dstPeer string) (*PeerResult, error) { + dfs := urfs.New(cfg.Endpoint) + + reader, err := dfs.GetUrfsStatusWithContext(ctx, &urfs.GetUrfsInput{ + Endpoint: endpoint, + BucketName: bucketName, + ObjectKey: objectKey, + DstPeer: dstPeer, + }, true) + if err != nil { + return nil, err + } + defer reader.Close() + + body, err := ioutil.ReadAll(reader) + + var peerResult PeerResult + if err == nil { + err = json.Unmarshal((body), &peerResult) + } + peerResult.SignedUrl = strings.ReplaceAll(peerResult.SignedUrl, "\\u0026", "&") + return &peerResult, err +} + +type PeerResult struct { + ContentType string `json:"Content-Type"` + ContentLength string `json:"Content-Length"` + SignedUrl string + DataRoot string + DataPath string + DataEndpoint string + StatusCode int + StatusMsg string + TaskID string +}