From 43e0e1dbe40b3031729a3ef1a3e879898151815a Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Wed, 2 Apr 2025 09:45:40 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B0=86client2=E4=B8=8Eclient=E5=90=88?= =?UTF-8?q?=E5=B9=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client/internal/cmdline/commandline.go | 4 +- client/internal/cmdline/getp.go | 2 +- client/internal/cmdline/load.go | 2 +- client/internal/cmdline/lsp.go | 2 +- .../cmd => client/internal/cmdline}/mount.go | 23 +-- client/internal/cmdline/newloadp.go | 2 +- client/internal/cmdline/put.go | 2 +- client/internal/cmdline/serve.go | 186 ++++++++++++++++-- client/internal/cmdline/sysevent.go | 2 +- client/internal/cmdline/test.go | 10 +- client/internal/config/config.go | 9 +- .../internal/mount/config/config.go | 0 .../internal/mount/fuse/dir_handle.go | 0 .../internal/mount/fuse/dir_node.go | 0 .../internal/mount/fuse/file_handle.go | 0 .../internal/mount/fuse/file_node.go | 0 .../internal/mount/fuse/fuse.go | 2 +- .../internal/mount/fuse/node.go | 0 .../internal/mount/fuse/types.go | 0 {client2 => client}/internal/mount/mount.go | 6 +- .../internal/mount/mount_win.go | 2 +- .../internal/mount/vfs/cache/cache.go | 4 +- .../internal/mount/vfs/cache/dir.go | 0 .../internal/mount/vfs/cache/file.go | 2 +- .../internal/mount/vfs/cache/range_test.go | 0 .../internal/mount/vfs/cache/remote.go | 0 .../internal/mount/vfs/cache/utils.go | 0 .../internal/mount/vfs/dir_reader.go | 2 +- .../internal/mount/vfs/fuse.go | 0 .../internal/mount/vfs/fuse_bucket.go | 4 +- .../internal/mount/vfs/fuse_dir.go | 4 +- .../internal/mount/vfs/fuse_file.go | 4 +- .../internal/mount/vfs/fuse_package.go | 4 +- .../internal/mount/vfs/fuse_root.go | 4 +- {client2 => client}/internal/mount/vfs/vfs.go | 6 +- client/main.go | 158 +-------------- client2/internal/cmd/cmd.go | 5 - client2/internal/config/config.go | 43 ---- client2/main.go | 7 - magefiles/main.go | 9 - 40 files changed, 217 insertions(+), 293 deletions(-) rename {client2/internal/cmd => client/internal/cmdline}/mount.go (87%) rename {client2 => client}/internal/mount/config/config.go (100%) rename {client2 => client}/internal/mount/fuse/dir_handle.go (100%) rename {client2 => client}/internal/mount/fuse/dir_node.go (100%) rename {client2 => client}/internal/mount/fuse/file_handle.go (100%) rename {client2 => client}/internal/mount/fuse/file_node.go (100%) rename {client2 => client}/internal/mount/fuse/fuse.go (96%) rename {client2 => client}/internal/mount/fuse/node.go (100%) rename {client2 => client}/internal/mount/fuse/types.go (100%) rename {client2 => client}/internal/mount/mount.go (89%) rename {client2 => client}/internal/mount/mount_win.go (93%) rename {client2 => client}/internal/mount/vfs/cache/cache.go (99%) rename {client2 => client}/internal/mount/vfs/cache/dir.go (100%) rename {client2 => client}/internal/mount/vfs/cache/file.go (99%) rename {client2 => client}/internal/mount/vfs/cache/range_test.go (100%) rename {client2 => client}/internal/mount/vfs/cache/remote.go (100%) rename {client2 => client}/internal/mount/vfs/cache/utils.go (100%) rename {client2 => client}/internal/mount/vfs/dir_reader.go (91%) rename {client2 => client}/internal/mount/vfs/fuse.go (100%) rename {client2 => client}/internal/mount/vfs/fuse_bucket.go (97%) rename {client2 => client}/internal/mount/vfs/fuse_dir.go (98%) rename {client2 => client}/internal/mount/vfs/fuse_file.go (95%) rename {client2 => client}/internal/mount/vfs/fuse_package.go (98%) rename {client2 => client}/internal/mount/vfs/fuse_root.go (97%) rename {client2 => client}/internal/mount/vfs/vfs.go (77%) delete mode 100644 client2/internal/cmd/cmd.go delete mode 100644 client2/internal/config/config.go delete mode 100644 client2/main.go diff --git a/client/internal/cmdline/commandline.go b/client/internal/cmdline/commandline.go index 78db3df..9b23c1f 100644 --- a/client/internal/cmdline/commandline.go +++ b/client/internal/cmdline/commandline.go @@ -17,7 +17,7 @@ type CommandContext struct { // TODO 逐步使用cobra代替cmdtrie var commands cmdtrie.CommandTrie[CommandContext, error] = cmdtrie.NewCommandTrie[CommandContext, error]() -var rootCmd = cobra.Command{} +var RootCmd = cobra.Command{} type Commandline struct { Svc *services.Service @@ -37,7 +37,7 @@ func (c *Commandline) DispatchCommand(allArgs []string) { if err != nil { if err == cmdtrie.ErrCommandNotFound { ctx := context.WithValue(context.Background(), "cmdCtx", &cmdCtx) - err = rootCmd.ExecuteContext(ctx) + err = RootCmd.ExecuteContext(ctx) if err != nil { fmt.Println(err) os.Exit(1) diff --git a/client/internal/cmdline/getp.go b/client/internal/cmdline/getp.go index 8707078..0a96f90 100644 --- a/client/internal/cmdline/getp.go +++ b/client/internal/cmdline/getp.go @@ -40,7 +40,7 @@ func init() { } cmd.Flags().BoolVarP(&usePkgID, "id", "i", false, "Download with package id instead of path") - rootCmd.AddCommand(cmd) + RootCmd.AddCommand(cmd) } func getpByPath(cmdCtx *CommandContext, path string, output string) { diff --git a/client/internal/cmdline/load.go b/client/internal/cmdline/load.go index 4e976b5..0739588 100644 --- a/client/internal/cmdline/load.go +++ b/client/internal/cmdline/load.go @@ -37,7 +37,7 @@ func init() { }, } cmd.Flags().BoolVarP(&useID, "id", "i", false, "Use ID for both package and storage service instead of their name or path") - rootCmd.AddCommand(&cmd) + RootCmd.AddCommand(&cmd) } func loadByPath(cmdCtx *CommandContext, pkgPath string, stgName string, rootPath string) { diff --git a/client/internal/cmdline/lsp.go b/client/internal/cmdline/lsp.go index 3393cdc..5712276 100644 --- a/client/internal/cmdline/lsp.go +++ b/client/internal/cmdline/lsp.go @@ -34,7 +34,7 @@ func init() { } usePkgID = cmd.Flags().BoolP("id", "i", false, "List with package id instead of path") - rootCmd.AddCommand(cmd) + RootCmd.AddCommand(cmd) } func lspByPath(cmdCtx *CommandContext, path string) { diff --git a/client2/internal/cmd/mount.go b/client/internal/cmdline/mount.go similarity index 87% rename from client2/internal/cmd/mount.go rename to client/internal/cmdline/mount.go index ea437d7..61c92b6 100644 --- a/client2/internal/cmd/mount.go +++ b/client/internal/cmdline/mount.go @@ -1,4 +1,4 @@ -package cmd +package cmdline import ( "fmt" @@ -7,9 +7,9 @@ import ( "github.com/spf13/cobra" "gitlink.org.cn/cloudream/common/pkgs/logger" - "gitlink.org.cn/cloudream/storage2/client2/internal/config" - "gitlink.org.cn/cloudream/storage2/client2/internal/mount" - mntcfg "gitlink.org.cn/cloudream/storage2/client2/internal/mount/config" + "gitlink.org.cn/cloudream/storage2/client/internal/config" + "gitlink.org.cn/cloudream/storage2/client/internal/mount" + mntcfg "gitlink.org.cn/cloudream/storage2/client/internal/mount/config" stgglb "gitlink.org.cn/cloudream/storage2/common/globals" "gitlink.org.cn/cloudream/storage2/common/pkgs/connectivity" "gitlink.org.cn/cloudream/storage2/common/pkgs/db2" @@ -119,18 +119,3 @@ func mountCmd(mountPoint string, configPath string) { } } } - -func serveDistLock(svc *distlock.Service) { - logger.Info("start serving distlock") - - err := svc.Serve() - - if err != nil { - logger.Errorf("distlock stopped with error: %s", err.Error()) - } - - logger.Info("distlock stopped") - - // TODO 仅简单结束了程序 - os.Exit(1) -} diff --git a/client/internal/cmdline/newloadp.go b/client/internal/cmdline/newloadp.go index 005513f..ff87578 100644 --- a/client/internal/cmdline/newloadp.go +++ b/client/internal/cmdline/newloadp.go @@ -50,7 +50,7 @@ func init() { }, } - rootCmd.AddCommand(cmd) + RootCmd.AddCommand(cmd) } func newloadp(cmdCtx *CommandContext, path string, bucketID cdssdk.BucketID, packageName string, storageIDs []cdssdk.StorageID, rootPathes []string) { diff --git a/client/internal/cmdline/put.go b/client/internal/cmdline/put.go index a8f9b82..1aef760 100644 --- a/client/internal/cmdline/put.go +++ b/client/internal/cmdline/put.go @@ -118,5 +118,5 @@ func init() { } cmd.Flags().Int64VarP(&stgID, "storage", "s", 0, "storage affinity") - rootCmd.AddCommand(cmd) + RootCmd.AddCommand(cmd) } diff --git a/client/internal/cmdline/serve.go b/client/internal/cmdline/serve.go index a498926..aeced80 100644 --- a/client/internal/cmdline/serve.go +++ b/client/internal/cmdline/serve.go @@ -2,43 +2,197 @@ package cmdline import ( "fmt" + "os" + "time" + "github.com/spf13/cobra" + "gitlink.org.cn/cloudream/common/pkgs/logger" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage2/client/internal/config" "gitlink.org.cn/cloudream/storage2/client/internal/http" + "gitlink.org.cn/cloudream/storage2/client/internal/services" + "gitlink.org.cn/cloudream/storage2/client/internal/task" + stgglb "gitlink.org.cn/cloudream/storage2/common/globals" + "gitlink.org.cn/cloudream/storage2/common/pkgs/accessstat" + "gitlink.org.cn/cloudream/storage2/common/pkgs/connectivity" + "gitlink.org.cn/cloudream/storage2/common/pkgs/distlock" + "gitlink.org.cn/cloudream/storage2/common/pkgs/downloader" + "gitlink.org.cn/cloudream/storage2/common/pkgs/downloader/strategy" + "gitlink.org.cn/cloudream/storage2/common/pkgs/metacache" + coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator" + "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/agtpool" + "gitlink.org.cn/cloudream/storage2/common/pkgs/uploader" ) -// ServeHTTP 启动HTTP服务。 -// ctx: 命令行上下文,包含服务配置等信息。 -// args: 命令行参数,第一个参数可选地指定HTTP服务器监听地址。 -// 返回值: 如果启动过程中遇到错误,返回错误信息;否则返回nil。 -func ServeHTTP(ctx CommandContext, args []string) error { +// 初始化函数,将ServeHTTP命令注册到命令列表中。 +func init() { + var configPath, listenAddr string + cmd := cobra.Command{ + Use: "serve", + Short: "start serving storage service", + Run: func(cmd *cobra.Command, args []string) { + serveHTTP(configPath, listenAddr) + }, + } + cmd.Flags().StringVarP(&configPath, "config", "c", "", "config file path") + cmd.Flags().StringVarP(&listenAddr, "listen", "l", "", "listen address") + RootCmd.AddCommand(&cmd) +} + +func serveHTTP(configPath string, listenAddr string) { + err := config.Init(configPath) + if err != nil { + fmt.Printf("init config failed, err: %s", err.Error()) + os.Exit(1) + } + + err = logger.Init(&config.Cfg().Logger) + if err != nil { + fmt.Printf("init logger failed, err: %s", err.Error()) + os.Exit(1) + } + + stgglb.InitLocal(&config.Cfg().Local) + stgglb.InitMQPool(config.Cfg().RabbitMQ) + stgglb.InitAgentRPCPool(&config.Cfg().AgentGRPC) + + // 连接性信息收集 + var conCol connectivity.Collector + if config.Cfg().Local.HubID != nil { + //如果client与某个hub处于同一台机器,则使用这个hub的连通性信息 + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + logger.Warnf("acquire coordinator mq failed, err: %s", err.Error()) + os.Exit(1) + } + getCons, err := coorCli.GetHubConnectivities(coormq.ReqGetHubConnectivities([]cdssdk.HubID{*config.Cfg().Local.HubID})) + if err != nil { + logger.Warnf("get hub connectivities failed, err: %s", err.Error()) + os.Exit(1) + } + consMap := make(map[cdssdk.HubID]connectivity.Connectivity) + for _, con := range getCons.Connectivities { + var delay *time.Duration + if con.Latency != nil { + d := time.Duration(*con.Latency * float32(time.Millisecond)) + delay = &d + } + consMap[con.FromHubID] = connectivity.Connectivity{ + ToHubID: con.ToHubID, + Latency: delay, + } + } + conCol = connectivity.NewCollectorWithInitData(&config.Cfg().Connectivity, nil, consMap) + logger.Info("use local hub connectivities") + + } else { + // 否则需要就地收集连通性信息 + conCol = connectivity.NewCollector(&config.Cfg().Connectivity, nil) + conCol.CollectInPlace() + } + + metaCacheHost := metacache.NewHost() + go metaCacheHost.Serve() + stgMeta := metaCacheHost.AddStorageMeta() + hubMeta := metaCacheHost.AddHubMeta() + conMeta := metaCacheHost.AddConnectivity() + + // 分布式锁 + distlockSvc, err := distlock.NewService(&config.Cfg().DistLock) + if err != nil { + logger.Warnf("new distlock service failed, err: %s", err.Error()) + os.Exit(1) + } + go serveDistLock(distlockSvc) + + // 访问统计 + acStat := accessstat.NewAccessStat(accessstat.Config{ + // TODO 考虑放到配置里 + ReportInterval: time.Second * 10, + }) + go serveAccessStat(acStat) + + // 存储管理器 + stgAgts := agtpool.NewPool() + + // 任务管理器 + taskMgr := task.NewManager(distlockSvc, &conCol, stgAgts) + + strgSel := strategy.NewSelector(config.Cfg().DownloadStrategy, stgMeta, hubMeta, conMeta) + + // 下载器 + dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol, stgAgts, strgSel) + + // 上传器 + uploader := uploader.NewUploader(distlockSvc, &conCol, stgAgts, stgMeta) + + svc, err := services.NewService(distlockSvc, &taskMgr, &dlder, acStat, uploader, strgSel, stgMeta) + if err != nil { + logger.Warnf("new services failed, err: %s", err.Error()) + os.Exit(1) + } + // 默认监听地址为":7890",如果提供了命令行参数,则使用参数指定的地址。 - listenAddr := ":7890" - if len(args) > 0 { - listenAddr = args[0] + if listenAddr == "" { + listenAddr = ":7890" } awsAuth, err := http.NewAWSAuth(config.Cfg().AuthAccessKey, config.Cfg().AuthSecretKey) if err != nil { - return fmt.Errorf("new aws auth: %w", err) + logger.Warnf("new aws auth: %v", err) + os.Exit(1) } // 创建一个新的HTTP服务器实例。 - httpSvr, err := http.NewServer(listenAddr, ctx.Cmdline.Svc, awsAuth) + httpSvr, err := http.NewServer(listenAddr, svc, awsAuth) if err != nil { - return fmt.Errorf("new http server: %w", err) + logger.Warnf("new http server: %v", err) + os.Exit(1) } // 启动HTTP服务。 err = httpSvr.Serve() if err != nil { - return fmt.Errorf("serving http: %w", err) + logger.Warnf("serving http: %v", err) + os.Exit(1) + } +} + +func serveDistLock(svc *distlock.Service) { + logger.Info("start serving distlock") + + err := svc.Serve() + + if err != nil { + logger.Errorf("distlock stopped with error: %s", err.Error()) } - return nil + logger.Info("distlock stopped") + + // TODO 仅简单结束了程序 + os.Exit(1) } -// 初始化函数,将ServeHTTP命令注册到命令列表中。 -func init() { - commands.MustAdd(ServeHTTP, "serve", "http") +func serveAccessStat(svc *accessstat.AccessStat) { + logger.Info("start serving access stat") + + ch := svc.Start() +loop: + for { + val, err := ch.Receive() + if err != nil { + logger.Errorf("access stat stopped with error: %v", err) + break + } + + switch val := val.(type) { + case error: + logger.Errorf("access stat stopped with error: %v", val) + break loop + } + } + logger.Info("access stat stopped") + + // TODO 仅简单结束了程序 + os.Exit(1) } diff --git a/client/internal/cmdline/sysevent.go b/client/internal/cmdline/sysevent.go index dc8741f..9416c79 100644 --- a/client/internal/cmdline/sysevent.go +++ b/client/internal/cmdline/sysevent.go @@ -16,7 +16,7 @@ func init() { Use: "sysevent", } - rootCmd.AddCommand(cmd) + RootCmd.AddCommand(cmd) outputJSON := false watchCmd := &cobra.Command{ diff --git a/client/internal/cmdline/test.go b/client/internal/cmdline/test.go index c852432..53bf32f 100644 --- a/client/internal/cmdline/test.go +++ b/client/internal/cmdline/test.go @@ -17,7 +17,7 @@ import ( ) func init() { - rootCmd.AddCommand(&cobra.Command{ + RootCmd.AddCommand(&cobra.Command{ Use: "test", Short: "test", Run: func(cmd *cobra.Command, args []string) { @@ -72,7 +72,7 @@ func init() { }, }) - rootCmd.AddCommand(&cobra.Command{ + RootCmd.AddCommand(&cobra.Command{ Use: "test32", Short: "test32", Run: func(cmd *cobra.Command, args []string) { @@ -118,7 +118,7 @@ func init() { fut.Wait(context.TODO()) }, }) - rootCmd.AddCommand(&cobra.Command{ + RootCmd.AddCommand(&cobra.Command{ Use: "test1", Short: "test1", Run: func(cmd *cobra.Command, args []string) { @@ -188,7 +188,7 @@ func init() { }, }) - rootCmd.AddCommand(&cobra.Command{ + RootCmd.AddCommand(&cobra.Command{ Use: "test4", Short: "test4", Run: func(cmd *cobra.Command, args []string) { @@ -238,7 +238,7 @@ func init() { }, }) - rootCmd.AddCommand(&cobra.Command{ + RootCmd.AddCommand(&cobra.Command{ Use: "test11", Short: "test11", Run: func(cmd *cobra.Command, args []string) { diff --git a/client/internal/config/config.go b/client/internal/config/config.go index ba9cd77..1751947 100644 --- a/client/internal/config/config.go +++ b/client/internal/config/config.go @@ -8,6 +8,7 @@ import ( "gitlink.org.cn/cloudream/common/utils/config" stgmodels "gitlink.org.cn/cloudream/storage2/common/models" "gitlink.org.cn/cloudream/storage2/common/pkgs/connectivity" + db "gitlink.org.cn/cloudream/storage2/common/pkgs/db2/config" "gitlink.org.cn/cloudream/storage2/common/pkgs/downloader" "gitlink.org.cn/cloudream/storage2/common/pkgs/downloader/strategy" agtrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/agent" @@ -26,14 +27,18 @@ type Config struct { AuthAccessKey string `json:"authAccessKey"` // TODO 临时办法 AuthSecretKey string `json:"authSecretKey"` MaxHTTPBodySize int64 `json:"maxHttpBodySize"` + DB db.Config `json:"db"` } var cfg Config // Init 初始化client // TODO 这里的modeulName参数弄成可配置的更好 -func Init() error { - return config.DefaultLoad("client", &cfg) +func Init(configPath string) error { + if configPath == "" { + return config.DefaultLoad("client", &cfg) + } + return config.Load(configPath, &cfg) } func Cfg() *Config { diff --git a/client2/internal/mount/config/config.go b/client/internal/mount/config/config.go similarity index 100% rename from client2/internal/mount/config/config.go rename to client/internal/mount/config/config.go diff --git a/client2/internal/mount/fuse/dir_handle.go b/client/internal/mount/fuse/dir_handle.go similarity index 100% rename from client2/internal/mount/fuse/dir_handle.go rename to client/internal/mount/fuse/dir_handle.go diff --git a/client2/internal/mount/fuse/dir_node.go b/client/internal/mount/fuse/dir_node.go similarity index 100% rename from client2/internal/mount/fuse/dir_node.go rename to client/internal/mount/fuse/dir_node.go diff --git a/client2/internal/mount/fuse/file_handle.go b/client/internal/mount/fuse/file_handle.go similarity index 100% rename from client2/internal/mount/fuse/file_handle.go rename to client/internal/mount/fuse/file_handle.go diff --git a/client2/internal/mount/fuse/file_node.go b/client/internal/mount/fuse/file_node.go similarity index 100% rename from client2/internal/mount/fuse/file_node.go rename to client/internal/mount/fuse/file_node.go diff --git a/client2/internal/mount/fuse/fuse.go b/client/internal/mount/fuse/fuse.go similarity index 96% rename from client2/internal/mount/fuse/fuse.go rename to client/internal/mount/fuse/fuse.go index 9a4dc5e..012f43b 100644 --- a/client2/internal/mount/fuse/fuse.go +++ b/client/internal/mount/fuse/fuse.go @@ -8,7 +8,7 @@ import ( fusefs "github.com/hanwen/go-fuse/v2/fs" "github.com/hanwen/go-fuse/v2/fuse" - "gitlink.org.cn/cloudream/storage2/client2/internal/mount/config" + "gitlink.org.cn/cloudream/storage2/client/internal/mount/config" ) type Fuse struct { diff --git a/client2/internal/mount/fuse/node.go b/client/internal/mount/fuse/node.go similarity index 100% rename from client2/internal/mount/fuse/node.go rename to client/internal/mount/fuse/node.go diff --git a/client2/internal/mount/fuse/types.go b/client/internal/mount/fuse/types.go similarity index 100% rename from client2/internal/mount/fuse/types.go rename to client/internal/mount/fuse/types.go diff --git a/client2/internal/mount/mount.go b/client/internal/mount/mount.go similarity index 89% rename from client2/internal/mount/mount.go rename to client/internal/mount/mount.go index 7f1d097..5ca3719 100644 --- a/client2/internal/mount/mount.go +++ b/client/internal/mount/mount.go @@ -7,9 +7,9 @@ import ( "github.com/hanwen/go-fuse/v2/fuse" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/sync2" - "gitlink.org.cn/cloudream/storage2/client2/internal/mount/config" - fuse2 "gitlink.org.cn/cloudream/storage2/client2/internal/mount/fuse" - "gitlink.org.cn/cloudream/storage2/client2/internal/mount/vfs" + "gitlink.org.cn/cloudream/storage2/client/internal/mount/config" + fuse2 "gitlink.org.cn/cloudream/storage2/client/internal/mount/fuse" + "gitlink.org.cn/cloudream/storage2/client/internal/mount/vfs" "gitlink.org.cn/cloudream/storage2/common/pkgs/db2" "gitlink.org.cn/cloudream/storage2/common/pkgs/downloader" "gitlink.org.cn/cloudream/storage2/common/pkgs/uploader" diff --git a/client2/internal/mount/mount_win.go b/client/internal/mount/mount_win.go similarity index 93% rename from client2/internal/mount/mount_win.go rename to client/internal/mount/mount_win.go index dd36491..ef24924 100644 --- a/client2/internal/mount/mount_win.go +++ b/client/internal/mount/mount_win.go @@ -7,7 +7,7 @@ import ( cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/sync2" - "gitlink.org.cn/cloudream/storage2/client2/internal/mount/config" + "gitlink.org.cn/cloudream/storage2/client/internal/mount/config" "gitlink.org.cn/cloudream/storage2/common/pkgs/db2" "gitlink.org.cn/cloudream/storage2/common/pkgs/downloader" "gitlink.org.cn/cloudream/storage2/common/pkgs/uploader" diff --git a/client2/internal/mount/vfs/cache/cache.go b/client/internal/mount/vfs/cache/cache.go similarity index 99% rename from client2/internal/mount/vfs/cache/cache.go rename to client/internal/mount/vfs/cache/cache.go index b433fdd..2b2df5a 100644 --- a/client2/internal/mount/vfs/cache/cache.go +++ b/client/internal/mount/vfs/cache/cache.go @@ -16,8 +16,8 @@ import ( cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/common/utils/lo2" - "gitlink.org.cn/cloudream/storage2/client2/internal/mount/config" - "gitlink.org.cn/cloudream/storage2/client2/internal/mount/fuse" + "gitlink.org.cn/cloudream/storage2/client/internal/mount/config" + "gitlink.org.cn/cloudream/storage2/client/internal/mount/fuse" "gitlink.org.cn/cloudream/storage2/common/pkgs/db2" "gitlink.org.cn/cloudream/storage2/common/pkgs/downloader" "gitlink.org.cn/cloudream/storage2/common/pkgs/uploader" diff --git a/client2/internal/mount/vfs/cache/dir.go b/client/internal/mount/vfs/cache/dir.go similarity index 100% rename from client2/internal/mount/vfs/cache/dir.go rename to client/internal/mount/vfs/cache/dir.go diff --git a/client2/internal/mount/vfs/cache/file.go b/client/internal/mount/vfs/cache/file.go similarity index 99% rename from client2/internal/mount/vfs/cache/file.go rename to client/internal/mount/vfs/cache/file.go index 28dd2ea..6274601 100644 --- a/client2/internal/mount/vfs/cache/file.go +++ b/client/internal/mount/vfs/cache/file.go @@ -13,7 +13,7 @@ import ( "gitlink.org.cn/cloudream/common/utils/lo2" "gitlink.org.cn/cloudream/common/utils/math2" "gitlink.org.cn/cloudream/common/utils/serder" - "gitlink.org.cn/cloudream/storage2/client2/internal/mount/fuse" + "gitlink.org.cn/cloudream/storage2/client/internal/mount/fuse" ) type FileInfo struct { diff --git a/client2/internal/mount/vfs/cache/range_test.go b/client/internal/mount/vfs/cache/range_test.go similarity index 100% rename from client2/internal/mount/vfs/cache/range_test.go rename to client/internal/mount/vfs/cache/range_test.go diff --git a/client2/internal/mount/vfs/cache/remote.go b/client/internal/mount/vfs/cache/remote.go similarity index 100% rename from client2/internal/mount/vfs/cache/remote.go rename to client/internal/mount/vfs/cache/remote.go diff --git a/client2/internal/mount/vfs/cache/utils.go b/client/internal/mount/vfs/cache/utils.go similarity index 100% rename from client2/internal/mount/vfs/cache/utils.go rename to client/internal/mount/vfs/cache/utils.go diff --git a/client2/internal/mount/vfs/dir_reader.go b/client/internal/mount/vfs/dir_reader.go similarity index 91% rename from client2/internal/mount/vfs/dir_reader.go rename to client/internal/mount/vfs/dir_reader.go index 2f374d3..fab0966 100644 --- a/client2/internal/mount/vfs/dir_reader.go +++ b/client/internal/mount/vfs/dir_reader.go @@ -1,6 +1,6 @@ package vfs -import "gitlink.org.cn/cloudream/storage2/client2/internal/mount/fuse" +import "gitlink.org.cn/cloudream/storage2/client/internal/mount/fuse" type FuseDirReader struct { allEntries []fuse.FsEntry diff --git a/client2/internal/mount/vfs/fuse.go b/client/internal/mount/vfs/fuse.go similarity index 100% rename from client2/internal/mount/vfs/fuse.go rename to client/internal/mount/vfs/fuse.go diff --git a/client2/internal/mount/vfs/fuse_bucket.go b/client/internal/mount/vfs/fuse_bucket.go similarity index 97% rename from client2/internal/mount/vfs/fuse_bucket.go rename to client/internal/mount/vfs/fuse_bucket.go index 32e988c..094cfb9 100644 --- a/client2/internal/mount/vfs/fuse_bucket.go +++ b/client/internal/mount/vfs/fuse_bucket.go @@ -7,8 +7,8 @@ import ( "time" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage2/client2/internal/mount/fuse" - "gitlink.org.cn/cloudream/storage2/client2/internal/mount/vfs/cache" + "gitlink.org.cn/cloudream/storage2/client/internal/mount/fuse" + "gitlink.org.cn/cloudream/storage2/client/internal/mount/vfs/cache" "gitlink.org.cn/cloudream/storage2/common/pkgs/db2" "gorm.io/gorm" ) diff --git a/client2/internal/mount/vfs/fuse_dir.go b/client/internal/mount/vfs/fuse_dir.go similarity index 98% rename from client2/internal/mount/vfs/fuse_dir.go rename to client/internal/mount/vfs/fuse_dir.go index 03e3bea..2b9b78c 100644 --- a/client2/internal/mount/vfs/fuse_dir.go +++ b/client/internal/mount/vfs/fuse_dir.go @@ -8,8 +8,8 @@ import ( cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/lo2" - "gitlink.org.cn/cloudream/storage2/client2/internal/mount/fuse" - "gitlink.org.cn/cloudream/storage2/client2/internal/mount/vfs/cache" + "gitlink.org.cn/cloudream/storage2/client/internal/mount/fuse" + "gitlink.org.cn/cloudream/storage2/client/internal/mount/vfs/cache" "gitlink.org.cn/cloudream/storage2/common/pkgs/db2" "gorm.io/gorm" ) diff --git a/client2/internal/mount/vfs/fuse_file.go b/client/internal/mount/vfs/fuse_file.go similarity index 95% rename from client2/internal/mount/vfs/fuse_file.go rename to client/internal/mount/vfs/fuse_file.go index 0113636..f1c61bb 100644 --- a/client2/internal/mount/vfs/fuse_file.go +++ b/client/internal/mount/vfs/fuse_file.go @@ -5,8 +5,8 @@ import ( "time" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage2/client2/internal/mount/fuse" - "gitlink.org.cn/cloudream/storage2/client2/internal/mount/vfs/cache" + "gitlink.org.cn/cloudream/storage2/client/internal/mount/fuse" + "gitlink.org.cn/cloudream/storage2/client/internal/mount/vfs/cache" "gorm.io/gorm" ) diff --git a/client2/internal/mount/vfs/fuse_package.go b/client/internal/mount/vfs/fuse_package.go similarity index 98% rename from client2/internal/mount/vfs/fuse_package.go rename to client/internal/mount/vfs/fuse_package.go index 94a5ec5..1ab1a77 100644 --- a/client2/internal/mount/vfs/fuse_package.go +++ b/client/internal/mount/vfs/fuse_package.go @@ -8,8 +8,8 @@ import ( cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/lo2" - "gitlink.org.cn/cloudream/storage2/client2/internal/mount/fuse" - "gitlink.org.cn/cloudream/storage2/client2/internal/mount/vfs/cache" + "gitlink.org.cn/cloudream/storage2/client/internal/mount/fuse" + "gitlink.org.cn/cloudream/storage2/client/internal/mount/vfs/cache" "gitlink.org.cn/cloudream/storage2/common/pkgs/db2" "gorm.io/gorm" ) diff --git a/client2/internal/mount/vfs/fuse_root.go b/client/internal/mount/vfs/fuse_root.go similarity index 97% rename from client2/internal/mount/vfs/fuse_root.go rename to client/internal/mount/vfs/fuse_root.go index da28795..fe3ff5c 100644 --- a/client2/internal/mount/vfs/fuse_root.go +++ b/client/internal/mount/vfs/fuse_root.go @@ -6,8 +6,8 @@ import ( "time" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage2/client2/internal/mount/fuse" - "gitlink.org.cn/cloudream/storage2/client2/internal/mount/vfs/cache" + "gitlink.org.cn/cloudream/storage2/client/internal/mount/fuse" + "gitlink.org.cn/cloudream/storage2/client/internal/mount/vfs/cache" "gitlink.org.cn/cloudream/storage2/common/pkgs/db2" "gorm.io/gorm" ) diff --git a/client2/internal/mount/vfs/vfs.go b/client/internal/mount/vfs/vfs.go similarity index 77% rename from client2/internal/mount/vfs/vfs.go rename to client/internal/mount/vfs/vfs.go index 6316fac..af0e19c 100644 --- a/client2/internal/mount/vfs/vfs.go +++ b/client/internal/mount/vfs/vfs.go @@ -1,9 +1,9 @@ package vfs import ( - "gitlink.org.cn/cloudream/storage2/client2/internal/mount/config" - "gitlink.org.cn/cloudream/storage2/client2/internal/mount/fuse" - "gitlink.org.cn/cloudream/storage2/client2/internal/mount/vfs/cache" + "gitlink.org.cn/cloudream/storage2/client/internal/mount/config" + "gitlink.org.cn/cloudream/storage2/client/internal/mount/fuse" + "gitlink.org.cn/cloudream/storage2/client/internal/mount/vfs/cache" "gitlink.org.cn/cloudream/storage2/common/pkgs/db2" "gitlink.org.cn/cloudream/storage2/common/pkgs/downloader" "gitlink.org.cn/cloudream/storage2/common/pkgs/uploader" diff --git a/client/main.go b/client/main.go index 38934dd..0873c71 100644 --- a/client/main.go +++ b/client/main.go @@ -1,167 +1,11 @@ package main import ( - "fmt" - "os" - "time" - _ "google.golang.org/grpc/balancer/grpclb" - "gitlink.org.cn/cloudream/common/pkgs/logger" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage2/client/internal/cmdline" - "gitlink.org.cn/cloudream/storage2/client/internal/config" - "gitlink.org.cn/cloudream/storage2/client/internal/services" - "gitlink.org.cn/cloudream/storage2/client/internal/task" - stgglb "gitlink.org.cn/cloudream/storage2/common/globals" - "gitlink.org.cn/cloudream/storage2/common/pkgs/accessstat" - "gitlink.org.cn/cloudream/storage2/common/pkgs/connectivity" - "gitlink.org.cn/cloudream/storage2/common/pkgs/distlock" - "gitlink.org.cn/cloudream/storage2/common/pkgs/downloader" - "gitlink.org.cn/cloudream/storage2/common/pkgs/downloader/strategy" - "gitlink.org.cn/cloudream/storage2/common/pkgs/metacache" - coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator" - "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/agtpool" - "gitlink.org.cn/cloudream/storage2/common/pkgs/uploader" ) func main() { - err := config.Init() - if err != nil { - fmt.Printf("init config failed, err: %s", err.Error()) - os.Exit(1) - } - - err = logger.Init(&config.Cfg().Logger) - if err != nil { - fmt.Printf("init logger failed, err: %s", err.Error()) - os.Exit(1) - } - - stgglb.InitLocal(&config.Cfg().Local) - stgglb.InitMQPool(config.Cfg().RabbitMQ) - stgglb.InitAgentRPCPool(&config.Cfg().AgentGRPC) - - // 连接性信息收集 - var conCol connectivity.Collector - if config.Cfg().Local.HubID != nil { - //如果client与某个hub处于同一台机器,则使用这个hub的连通性信息 - coorCli, err := stgglb.CoordinatorMQPool.Acquire() - if err != nil { - logger.Warnf("acquire coordinator mq failed, err: %s", err.Error()) - os.Exit(1) - } - getCons, err := coorCli.GetHubConnectivities(coormq.ReqGetHubConnectivities([]cdssdk.HubID{*config.Cfg().Local.HubID})) - if err != nil { - logger.Warnf("get hub connectivities failed, err: %s", err.Error()) - os.Exit(1) - } - consMap := make(map[cdssdk.HubID]connectivity.Connectivity) - for _, con := range getCons.Connectivities { - var delay *time.Duration - if con.Latency != nil { - d := time.Duration(*con.Latency * float32(time.Millisecond)) - delay = &d - } - consMap[con.FromHubID] = connectivity.Connectivity{ - ToHubID: con.ToHubID, - Latency: delay, - } - } - conCol = connectivity.NewCollectorWithInitData(&config.Cfg().Connectivity, nil, consMap) - logger.Info("use local hub connectivities") - - } else { - // 否则需要就地收集连通性信息 - conCol = connectivity.NewCollector(&config.Cfg().Connectivity, nil) - conCol.CollectInPlace() - } - - metaCacheHost := metacache.NewHost() - go metaCacheHost.Serve() - stgMeta := metaCacheHost.AddStorageMeta() - hubMeta := metaCacheHost.AddHubMeta() - conMeta := metaCacheHost.AddConnectivity() - - // 分布式锁 - distlockSvc, err := distlock.NewService(&config.Cfg().DistLock) - if err != nil { - logger.Warnf("new distlock service failed, err: %s", err.Error()) - os.Exit(1) - } - go serveDistLock(distlockSvc) - - // 访问统计 - acStat := accessstat.NewAccessStat(accessstat.Config{ - // TODO 考虑放到配置里 - ReportInterval: time.Second * 10, - }) - go serveAccessStat(acStat) - - // 存储管理器 - stgAgts := agtpool.NewPool() - - // 任务管理器 - taskMgr := task.NewManager(distlockSvc, &conCol, stgAgts) - - strgSel := strategy.NewSelector(config.Cfg().DownloadStrategy, stgMeta, hubMeta, conMeta) - - // 下载器 - dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol, stgAgts, strgSel) - - // 上传器 - uploader := uploader.NewUploader(distlockSvc, &conCol, stgAgts, stgMeta) - - svc, err := services.NewService(distlockSvc, &taskMgr, &dlder, acStat, uploader, strgSel, stgMeta) - if err != nil { - logger.Warnf("new services failed, err: %s", err.Error()) - os.Exit(1) - } - - cmds, err := cmdline.NewCommandline(svc) - if err != nil { - logger.Warnf("new command line failed, err: %s", err.Error()) - os.Exit(1) - } - - cmds.DispatchCommand(os.Args[1:]) -} - -func serveDistLock(svc *distlock.Service) { - logger.Info("start serving distlock") - - err := svc.Serve() - - if err != nil { - logger.Errorf("distlock stopped with error: %s", err.Error()) - } - - logger.Info("distlock stopped") - - // TODO 仅简单结束了程序 - os.Exit(1) -} - -func serveAccessStat(svc *accessstat.AccessStat) { - logger.Info("start serving access stat") - - ch := svc.Start() -loop: - for { - val, err := ch.Receive() - if err != nil { - logger.Errorf("access stat stopped with error: %v", err) - break - } - - switch val := val.(type) { - case error: - logger.Errorf("access stat stopped with error: %v", val) - break loop - } - } - logger.Info("access stat stopped") - - // TODO 仅简单结束了程序 - os.Exit(1) + cmdline.RootCmd.Execute() } diff --git a/client2/internal/cmd/cmd.go b/client2/internal/cmd/cmd.go deleted file mode 100644 index d79aea8..0000000 --- a/client2/internal/cmd/cmd.go +++ /dev/null @@ -1,5 +0,0 @@ -package cmd - -import "github.com/spf13/cobra" - -var RootCmd = cobra.Command{} diff --git a/client2/internal/config/config.go b/client2/internal/config/config.go deleted file mode 100644 index 82f6ee8..0000000 --- a/client2/internal/config/config.go +++ /dev/null @@ -1,43 +0,0 @@ -package config - -import ( - "gitlink.org.cn/cloudream/common/pkgs/distlock" - log "gitlink.org.cn/cloudream/common/pkgs/logger" - "gitlink.org.cn/cloudream/common/pkgs/mq" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - c "gitlink.org.cn/cloudream/common/utils/config" - stgmodels "gitlink.org.cn/cloudream/storage2/common/models" - "gitlink.org.cn/cloudream/storage2/common/pkgs/connectivity" - db "gitlink.org.cn/cloudream/storage2/common/pkgs/db2/config" - "gitlink.org.cn/cloudream/storage2/common/pkgs/downloader" - "gitlink.org.cn/cloudream/storage2/common/pkgs/downloader/strategy" - "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc" -) - -type Config struct { - ID cdssdk.HubID `json:"id"` - ListenAddr string `json:"listenAddr"` - Local stgmodels.LocalMachineInfo `json:"local"` - GRPC *grpc.Config `json:"grpc"` - Logger log.Config `json:"logger"` - RabbitMQ mq.Config `json:"rabbitMQ"` - DistLock distlock.Config `json:"distlock"` - Connectivity connectivity.Config `json:"connectivity"` - Downloader downloader.Config `json:"downloader"` - DownloadStrategy strategy.Config `json:"downloadStrategy"` - DB db.Config `json:"db"` -} - -var cfg Config - -func Init(path string) error { - if path == "" { - return c.DefaultLoad("client2", &cfg) - } - - return c.Load(path, &cfg) -} - -func Cfg() *Config { - return &cfg -} diff --git a/client2/main.go b/client2/main.go deleted file mode 100644 index 873593f..0000000 --- a/client2/main.go +++ /dev/null @@ -1,7 +0,0 @@ -package main - -import "gitlink.org.cn/cloudream/storage2/client2/internal/cmd" - -func main() { - cmd.RootCmd.Execute() -} diff --git a/magefiles/main.go b/magefiles/main.go index 9a47632..2ee4010 100644 --- a/magefiles/main.go +++ b/magefiles/main.go @@ -117,15 +117,6 @@ func Client() error { }) } -func Client2() error { - return magefiles.Build(magefiles.BuildArgs{ - OutputName: "client2", - OutputDir: "client2", - AssetsDir: "assets", - EntryFile: "client2/main.go", - }) -} - func Coordinator() error { return magefiles.Build(magefiles.BuildArgs{ OutputName: "coordinator",