diff --git a/README.md b/README.md index 9fc1d78..840e3b9 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,28 @@ -# storage +# 跨中心存储系统 + +## 目录结构 +此仓库是一个go module,但包含了多个服务的源码,你可以在每个服务的目录中找到main.go。可以通过编译脚本的参数来指定生成哪一个服务。 + +- `agent`:Agent服务的源码。 +- `client`:Client服务的源码。 +- `common`:存放在几个服务之间共享的代码以及一些数据结构定义。 +- `coordinator`:Coordinator服务的源码。 +- `scanner`:Scanner服务的源码。 + +同时还有以下两个与编译相关的目录: +- `build`:服务编译后的输出目录,只会在编译后生成。 +- `magefiles`:mage工具使用的编译脚本。 + +## 编译 +运行编译脚本需要使用mage工具,此处是[仓库链接](https://github.com/magefile/mage)。 + +安装好mage工具之后,进入到仓库根目录,使用`mage agent`即可编译Agent服务。与此相同的还有`mage client`、`mage coodinator`、`mage scanner`。可以同时指定多个参数来编译多个服务,如果要一次性编译所有服务,可以使用`mage bin`。 + +使用`mage confs`命令可以将`common/assets/confs`的配置文件拷贝到输出目录,使用`mage scripts`将`scripts`目录里的脚本拷贝到输出目录。 + +使用`mage all`可以一次性完成编译、拷贝工作。 + +可以通过增加额外的参数来指定编译目标平台,比如`mage win amd64 agent`。支持的操作系统参数有`win`、`linux`,支持的CPU架构参数有`amd64`、`arm64`。这些参数同样可以和`bin`、`all`参数一起使用。 + +注意:编译目标平台参数必须在编译二进制参数之前。 diff --git a/agent/README.md b/agent/README.md index ac900ba..7b452e2 100644 --- a/agent/README.md +++ b/agent/README.md @@ -1,2 +1,10 @@ -# storage-agent +# Agent服务 +## 目录结构 +- `internal`:服务源码。 + - `config`:服务使用的配置文件结构定义。 + - `grpc`:通过grpc对外提供的接口。实现了.proto文件里定义的接口,这个文件位于`common\pkgs\grpc\agent\agent.proto`。 + - `mq`:通过rabbitmq对外提供的接口。实现了`common\pkgs\mq\agent`目录里文件定义的接口。 + - `task`:需要在后台异步运行的任务。 + + \ No newline at end of file diff --git a/agent/internal/services/grpc/io.go b/agent/internal/grpc/io.go similarity index 100% rename from agent/internal/services/grpc/io.go rename to agent/internal/grpc/io.go diff --git a/agent/internal/services/grpc/service.go b/agent/internal/grpc/service.go similarity index 100% rename from agent/internal/services/grpc/service.go rename to agent/internal/grpc/service.go diff --git a/agent/internal/services/mq/agent.go b/agent/internal/mq/agent.go similarity index 100% rename from agent/internal/services/mq/agent.go rename to agent/internal/mq/agent.go diff --git a/agent/internal/services/mq/cache.go b/agent/internal/mq/cache.go similarity index 100% rename from agent/internal/services/mq/cache.go rename to agent/internal/mq/cache.go diff --git a/agent/internal/services/mq/io.go b/agent/internal/mq/io.go similarity index 100% rename from agent/internal/services/mq/io.go rename to agent/internal/mq/io.go diff --git a/agent/internal/services/mq/object.go b/agent/internal/mq/object.go similarity index 100% rename from agent/internal/services/mq/object.go rename to agent/internal/mq/object.go diff --git a/agent/internal/services/mq/service.go b/agent/internal/mq/service.go similarity index 100% rename from agent/internal/services/mq/service.go rename to agent/internal/mq/service.go diff --git a/agent/internal/services/mq/storage.go b/agent/internal/mq/storage.go similarity index 100% rename from agent/internal/services/mq/storage.go rename to agent/internal/mq/storage.go diff --git a/agent/main.go b/agent/main.go index e7c5258..1949f69 100644 --- a/agent/main.go +++ b/agent/main.go @@ -5,7 +5,6 @@ import ( "net" "os" "sync" - "time" log "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/storage/agent/internal/config" @@ -20,14 +19,10 @@ import ( "google.golang.org/grpc" - "gitlink.org.cn/cloudream/storage/common/consts" - "gitlink.org.cn/cloudream/storage/common/utils" - agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" - coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" - grpcsvc "gitlink.org.cn/cloudream/storage/agent/internal/services/grpc" - cmdsvc "gitlink.org.cn/cloudream/storage/agent/internal/services/mq" + grpcsvc "gitlink.org.cn/cloudream/storage/agent/internal/grpc" + cmdsvc "gitlink.org.cn/cloudream/storage/agent/internal/mq" ) // TODO 此数据是否在运行时会发生变化? @@ -136,57 +131,3 @@ func serveDistLock(svc *distlock.Service) { log.Info("distlock stopped") } - -func reportStatus(wg *sync.WaitGroup) { - coorCli, err := coormq.NewClient(&config.Cfg().RabbitMQ) - if err != nil { - wg.Done() - log.Error("new coordinator client failed, err: %w", err) - return - } - - // TODO 增加退出死循环的方法 - for { - //挨个ping其他agent(AgentIpList),记录延迟到AgentDelay - // TODO AgentIP考虑放到配置文件里或者启动时从coor获取 - ips := utils.GetAgentIps() - agentDelay := make([]int, len(ips)) - waitG := sync.WaitGroup{} - waitG.Add(len(ips)) - for i := 0; i < len(ips); i++ { - go func(i int, wg *sync.WaitGroup) { - connStatus, err := utils.GetConnStatus(ips[i]) - if err != nil { - wg.Done() - log.Warnf("ping %s failed, err: %s", ips[i], err.Error()) - return - } - - log.Debugf("connection status to %s: %+v", ips[i], connStatus) - - if connStatus.IsReachable { - agentDelay[i] = int(connStatus.Delay.Milliseconds()) + 1 - } else { - agentDelay[i] = -1 - } - - wg.Done() - }(i, &waitG) - } - waitG.Wait() - //TODO: 查看本地IPFS daemon是否正常,记录到ipfsStatus - ipfsStatus := consts.IPFSStateOK - //TODO:访问自身资源目录(配置文件中获取路径),记录是否正常,记录到localDirStatus - localDirStatus := consts.StorageDirectoryStateOK - - //发送心跳 - // TODO 由于数据结构未定,暂时不发送真实数据 - coorCli.AgentStatusReport(coormq.NewAgentStatusReportBody(config.Cfg().ID, []int64{}, []int{}, ipfsStatus, localDirStatus)) - - time.Sleep(time.Minute * 5) - } - - coorCli.Close() - - wg.Done() -} diff --git a/client/README.md b/client/README.md index 3e52ab1..82768fb 100644 --- a/client/README.md +++ b/client/README.md @@ -1,2 +1,18 @@ -# storage-client +# Client服务 +## 目录结构 +- `internal`:服务源码。 + - `cmdline`:此服务提供的命令行功能。 + - `config`:服务使用的配置文件结构定义。 + - `http`:此服务提供的http接口。 + - `services`:服务的功能,被cmdline和http调用。 + - `task`:需要在后台异步运行的任务。 + +## 命令行 +Client程序可以作为一个命令行程序使用,能在`internal/cmdline`中找到它提供的所有命令。 + +使用时按照`./client <命令前缀1> <命令前缀2>... <命令函数参数1> <命令函数参数2>...`的方式编写命令。命令前缀在每个文件的init函数中能找到。 + +以列出某个Bucket下所有Package的命令PackageListBucketPackages为例,它的命令前缀是`pkg ls`,它的函数签名是`PackageListBucketPackages(ctx CommandContext, bucketID cdssdk.BucketID)`,忽略掉会自动填写的ctx参数,需要通过命令行提供的就是bucketID参数,假设为5,因此调用它的命令是:`./client pkg ls 5`。 + +可以通过使用`serve http`命令将Client程序作为一个http服务启动,并保持运行。 \ No newline at end of file diff --git a/common/README.md b/common/README.md index e2f2215..5459cf5 100644 --- a/common/README.md +++ b/common/README.md @@ -1,2 +1,21 @@ -# storage-common +# 公共库 +这个目录存放的是在storage仓库的几个程序之间共享的代码和数据结构定义。 +## 目录结构 +- `assets`:存放程序会读取使用的配置文件等。会在编译时一并复制到输出目录。 + - `confs`:服务的配置文件。 + - `scripts`:脚本文件。 +- `consts`:常量定义。 +- `globals`:全局变量定义,主要是各种客户端的Pool。 +- `magefiles`:mage工具的脚本。 +- `models`:公共数据结构定义。 +- `pkgs`:一些相对独立的功能模块。 + - `cmd`:公用的业务逻辑,比如上传Package和下载Package。 + - `db`:数据库的数据结构和操作函数。 + - `distlock`:分布式锁服务,核心机制使用的是`common/pkgs/distlock`,增加了根据存储系统的业务需求设计的锁。 + - `ec`:纠删码的库。 + - `grpc`:存放proto文件,以及使用protogen工具生成的代码文件。 + - `ioswitch`:IOSwitch模块。 + - `iterator`:迭代器。 + - `mq`:各个服务的rabbitmq接口的声明。 +- `utils`:一些暂时没有归类的工具函数。 \ No newline at end of file diff --git a/common/utils/config.go b/common/utils/config.go deleted file mode 100644 index 332b851..0000000 --- a/common/utils/config.go +++ /dev/null @@ -1,74 +0,0 @@ -package utils - -import ( - "fmt" - "regexp" - "strconv" - - "github.com/beevik/etree" -) - -type EcConfig struct { - ecid string `xml:"ecid"` - class string `xml:"class"` - n int `xml:"n"` - k int `xml:"k"` - w int `xml:"w"` - opt int `xml:"opt"` -} - -func (r *EcConfig) GetK() int { - return r.k -} - -func (r *EcConfig) GetN() int { - return r.n -} - -func GetEcPolicy() *map[string]EcConfig { - doc := etree.NewDocument() - if err := doc.ReadFromFile("../confs/sysSetting.xml"); err != nil { - panic(err) - } - ecMap := make(map[string]EcConfig, 20) - root := doc.SelectElement("setting") - for _, attr := range root.SelectElements("attribute") { - if name := attr.SelectElement("name"); name.Text() == "ec.policy" { - for _, eci := range attr.SelectElements("value") { - tt := EcConfig{} - tt.ecid = eci.SelectElement("ecid").Text() - tt.class = eci.SelectElement("class").Text() - tt.n, _ = strconv.Atoi(eci.SelectElement("n").Text()) - tt.k, _ = strconv.Atoi(eci.SelectElement("k").Text()) - tt.w, _ = strconv.Atoi(eci.SelectElement("w").Text()) - tt.opt, _ = strconv.Atoi(eci.SelectElement("opt").Text()) - ecMap[tt.ecid] = tt - } - } - } - fmt.Println(ecMap) - return &ecMap - // -} - -func GetAgentIps() []string { - doc := etree.NewDocument() - if err := doc.ReadFromFile("../confs/sysSetting.xml"); err != nil { - panic(err) - } - root := doc.SelectElement("setting") - var ips []string // 定义存储 IP 的字符串切片 - - for _, attr := range root.SelectElements("attribute") { - if name := attr.SelectElement("name"); name.Text() == "agents.addr" { - for _, ip := range attr.SelectElements("value") { - ipRegex := regexp.MustCompile(`\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b`) - match := ipRegex.FindString(ip.Text()) - print(match) - ips = append(ips, match) - } - } - } - - return ips -} diff --git a/common/utils/ping.go b/common/utils/ping.go deleted file mode 100644 index 85532d3..0000000 --- a/common/utils/ping.go +++ /dev/null @@ -1,82 +0,0 @@ -package utils - -import ( - //"fmt" - "github.com/go-ping/ping" - //"net" - "io/ioutil" - "net/http" - "strings" - "time" -) - -type ConnStatus struct { - Addr string - IsReachable bool - Delay time.Duration - TTL int -} - -// 获取本地主机 IP 地址 -func getLocalIP() string { - resp, err := http.Get("https://api.ipify.org") - if err != nil { - panic(err) - } - defer resp.Body.Close() - - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - panic(err) - } - - ip := strings.TrimSpace(string(body)) - return ip -} - -func GetConnStatus(remoteIP string) (*ConnStatus, error) { - // 本地主机 IP 地址 - //localIP := getLocalIP() - //print("!@#@#!") - //print(localIP) - conn := ConnStatus{ - Addr: remoteIP, - IsReachable: false, - } - pinger, err := ping.NewPinger(remoteIP) - - if err != nil { - return nil, err - } - pinger.Count = 5 // 设置 ping 次数为 5 - // pinger.Interval = 1 // 设置 ping 时间间隔为 1 秒 - //pinger.Timeout = 2 // 设置 ping 超时时间为 2 秒 - //pinger.SetPrivileged(true) // 设置使用特权模式以获取 TTL 值 - pinger.OnRecv = func(pkt *ping.Packet) { - //fmt.Printf("%d bytes from %s: icmp_seq=%d time=%v ttl=%v (DUP!)\n", - // pkt.Nbytes, pkt.IPAddr, pkt.Seq, pkt.Rtt, pkt.Ttl) - conn.TTL = pkt.Ttl - } - - /*pinger.OnDuplicateRecv = func(pkt *ping.Packet) { - fmt.Printf("%d bytes from %s: icmp_seq=%d time=%v ttl=%v (DUP!)\n", - pkt.Nbytes, pkt.IPAddr, pkt.Seq, pkt.Rtt, pkt.Ttl) - }*/ - - pinger.OnFinish = func(stats *ping.Statistics) { - //fmt.Printf("\n--- %s ping statistics ---\n", stats.Addr) - //fmt.Printf("%d packets transmitted, %d packets received, %v%% packet loss\n", - // stats.PacketsSent, stats.PacketsRecv, stats.PacketLoss) - //fmt.Printf("round-trip min/avg/max/stddev = %v/%v/%v/%v\n", - // stats.MinRtt, stats.AvgRtt, stats.MaxRtt, stats.StdDevRtt) - if stats.PacketLoss == 0.0 { - conn.IsReachable = true - } - conn.Delay = stats.AvgRtt - } - err = pinger.Run() // Blocks until finished. - if err != nil { - return nil, err - } - return &conn, nil -} diff --git a/coordinator/README.md b/coordinator/README.md index d09937e..6bb9407 100644 --- a/coordinator/README.md +++ b/coordinator/README.md @@ -1,2 +1,6 @@ -# storage-coordinator +# Coordinator服务 +## 目录结构 +- `internal`:服务源码。 + - `config`:服务使用的配置文件结构定义。 + - `mq`:通过rabbitmq对外提供的接口。实现了`common\pkgs\mq\coodinator`目录里文件定义的接口。 diff --git a/coordinator/internal/services/agent.go b/coordinator/internal/mq/agent.go similarity index 97% rename from coordinator/internal/services/agent.go rename to coordinator/internal/mq/agent.go index fa4137c..f782325 100644 --- a/coordinator/internal/services/agent.go +++ b/coordinator/internal/mq/agent.go @@ -1,4 +1,4 @@ -package services +package mq import ( coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" diff --git a/coordinator/internal/services/bucket.go b/coordinator/internal/mq/bucket.go similarity index 99% rename from coordinator/internal/services/bucket.go rename to coordinator/internal/mq/bucket.go index 564b105..78e5b41 100644 --- a/coordinator/internal/services/bucket.go +++ b/coordinator/internal/mq/bucket.go @@ -1,4 +1,4 @@ -package services +package mq import ( "database/sql" diff --git a/coordinator/internal/services/cache.go b/coordinator/internal/mq/cache.go similarity index 98% rename from coordinator/internal/services/cache.go rename to coordinator/internal/mq/cache.go index 9df882c..215f8a0 100644 --- a/coordinator/internal/services/cache.go +++ b/coordinator/internal/mq/cache.go @@ -1,4 +1,4 @@ -package services +package mq import ( "database/sql" diff --git a/coordinator/internal/services/node.go b/coordinator/internal/mq/node.go similarity index 98% rename from coordinator/internal/services/node.go rename to coordinator/internal/mq/node.go index 0614264..fb6e13b 100644 --- a/coordinator/internal/services/node.go +++ b/coordinator/internal/mq/node.go @@ -1,4 +1,4 @@ -package services +package mq import ( "gitlink.org.cn/cloudream/common/consts/errorcode" diff --git a/coordinator/internal/services/object.go b/coordinator/internal/mq/object.go similarity index 99% rename from coordinator/internal/services/object.go rename to coordinator/internal/mq/object.go index d901c88..876644c 100644 --- a/coordinator/internal/services/object.go +++ b/coordinator/internal/mq/object.go @@ -1,4 +1,4 @@ -package services +package mq import ( "database/sql" diff --git a/coordinator/internal/services/package.go b/coordinator/internal/mq/package.go similarity index 99% rename from coordinator/internal/services/package.go rename to coordinator/internal/mq/package.go index b4822e9..d7bf332 100644 --- a/coordinator/internal/services/package.go +++ b/coordinator/internal/mq/package.go @@ -1,4 +1,4 @@ -package services +package mq import ( "database/sql" diff --git a/coordinator/internal/services/service.go b/coordinator/internal/mq/service.go similarity index 94% rename from coordinator/internal/services/service.go rename to coordinator/internal/mq/service.go index 7d7db1d..1b00486 100644 --- a/coordinator/internal/services/service.go +++ b/coordinator/internal/mq/service.go @@ -1,4 +1,4 @@ -package services +package mq import ( mydb "gitlink.org.cn/cloudream/storage/common/pkgs/db" diff --git a/coordinator/internal/services/storage.go b/coordinator/internal/mq/storage.go similarity index 98% rename from coordinator/internal/services/storage.go rename to coordinator/internal/mq/storage.go index abdaa6a..08fe3e4 100644 --- a/coordinator/internal/services/storage.go +++ b/coordinator/internal/mq/storage.go @@ -1,4 +1,4 @@ -package services +package mq import ( "database/sql" diff --git a/coordinator/main.go b/coordinator/main.go index ced3f98..182e847 100644 --- a/coordinator/main.go +++ b/coordinator/main.go @@ -9,7 +9,7 @@ import ( coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" scmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner" "gitlink.org.cn/cloudream/storage/coordinator/internal/config" - "gitlink.org.cn/cloudream/storage/coordinator/internal/services" + "gitlink.org.cn/cloudream/storage/coordinator/internal/mq" ) func main() { @@ -35,7 +35,7 @@ func main() { logger.Fatalf("new scanner client failed, err: %s", err.Error()) } - coorSvr, err := coormq.NewServer(services.NewService(db, scanner), &config.Cfg().RabbitMQ) + coorSvr, err := coormq.NewServer(mq.NewService(db, scanner), &config.Cfg().RabbitMQ) if err != nil { logger.Fatalf("new coordinator server failed, err: %s", err.Error()) } diff --git a/scanner/README.md b/scanner/README.md index 2ebec19..b0fda77 100644 --- a/scanner/README.md +++ b/scanner/README.md @@ -1,2 +1,8 @@ -# storage-scanner +# Scanner服务 +## 目录结构 +- `internal`:服务源码。 + - `config`:服务使用的配置文件结构定义。 + - `event`:被投递到队列顺序执行的事件。 + - `mq`:通过rabbitmq对外提供的接口。实现了`common\pkgs\mq\scanner`目录里文件定义的接口。 + - `tickevent`:定时执行的事件。 \ No newline at end of file diff --git a/scanner/internal/services/event.go b/scanner/internal/mq/event.go similarity index 96% rename from scanner/internal/services/event.go rename to scanner/internal/mq/event.go index 9333c13..e6f83c0 100644 --- a/scanner/internal/services/event.go +++ b/scanner/internal/mq/event.go @@ -1,4 +1,4 @@ -package services +package mq import ( "gitlink.org.cn/cloudream/common/pkgs/logger" diff --git a/scanner/internal/services/service.go b/scanner/internal/mq/service.go similarity index 93% rename from scanner/internal/services/service.go rename to scanner/internal/mq/service.go index e87a8d2..0a234d3 100644 --- a/scanner/internal/services/service.go +++ b/scanner/internal/mq/service.go @@ -1,4 +1,4 @@ -package services +package mq import ( "gitlink.org.cn/cloudream/storage/scanner/internal/event" diff --git a/scanner/main.go b/scanner/main.go index a80d1b4..f3aee3d 100644 --- a/scanner/main.go +++ b/scanner/main.go @@ -12,7 +12,7 @@ import ( scmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner" "gitlink.org.cn/cloudream/storage/scanner/internal/config" "gitlink.org.cn/cloudream/storage/scanner/internal/event" - "gitlink.org.cn/cloudream/storage/scanner/internal/services" + "gitlink.org.cn/cloudream/storage/scanner/internal/mq" "gitlink.org.cn/cloudream/storage/scanner/internal/tickevent" ) @@ -49,7 +49,7 @@ func main() { eventExecutor := event.NewExecutor(db, distlockSvc) go serveEventExecutor(&eventExecutor, &wg) - agtSvr, err := scmq.NewServer(services.NewService(&eventExecutor), &config.Cfg().RabbitMQ) + agtSvr, err := scmq.NewServer(mq.NewService(&eventExecutor), &config.Cfg().RabbitMQ) if err != nil { logger.Fatalf("new agent server failed, err: %s", err.Error()) }