Browse Source

Merge branch 'feature_gxh'

gitlink
Sydonian 1 year ago
parent
commit
ec80dca410
29 changed files with 101 additions and 237 deletions
  1. +27
    -1
      README.md
  2. +9
    -1
      agent/README.md
  3. +0
    -0
      agent/internal/grpc/io.go
  4. +0
    -0
      agent/internal/grpc/service.go
  5. +0
    -0
      agent/internal/mq/agent.go
  6. +0
    -0
      agent/internal/mq/cache.go
  7. +0
    -0
      agent/internal/mq/io.go
  8. +0
    -0
      agent/internal/mq/object.go
  9. +0
    -0
      agent/internal/mq/service.go
  10. +0
    -0
      agent/internal/mq/storage.go
  11. +2
    -61
      agent/main.go
  12. +17
    -1
      client/README.md
  13. +20
    -1
      common/README.md
  14. +0
    -74
      common/utils/config.go
  15. +0
    -82
      common/utils/ping.go
  16. +5
    -1
      coordinator/README.md
  17. +1
    -1
      coordinator/internal/mq/agent.go
  18. +1
    -1
      coordinator/internal/mq/bucket.go
  19. +1
    -1
      coordinator/internal/mq/cache.go
  20. +1
    -1
      coordinator/internal/mq/node.go
  21. +1
    -1
      coordinator/internal/mq/object.go
  22. +1
    -1
      coordinator/internal/mq/package.go
  23. +1
    -1
      coordinator/internal/mq/service.go
  24. +1
    -1
      coordinator/internal/mq/storage.go
  25. +2
    -2
      coordinator/main.go
  26. +7
    -1
      scanner/README.md
  27. +1
    -1
      scanner/internal/mq/event.go
  28. +1
    -1
      scanner/internal/mq/service.go
  29. +2
    -2
      scanner/main.go

+ 27
- 1
README.md View File

@@ -1,2 +1,28 @@
# storage
# 跨中心存储系统


## 目录结构
此仓库是一个go module,但包含了多个服务的源码,你可以在每个服务的目录中找到main.go。可以通过编译脚本的参数来指定生成哪一个服务。

- `agent`:Agent服务的源码。
- `client`:Client服务的源码。
- `common`:存放在几个服务之间共享的代码以及一些数据结构定义。
- `coordinator`:Coordinator服务的源码。
- `scanner`:Scanner服务的源码。

同时还有以下两个与编译相关的目录:
- `build`:服务编译后的输出目录,只会在编译后生成。
- `magefiles`:mage工具使用的编译脚本。

## 编译
运行编译脚本需要使用mage工具,此处是[仓库链接](https://github.com/magefile/mage)。

安装好mage工具之后,进入到仓库根目录,使用`mage agent`即可编译Agent服务。与此相同的还有`mage client`、`mage coodinator`、`mage scanner`。可以同时指定多个参数来编译多个服务,如果要一次性编译所有服务,可以使用`mage bin`。

使用`mage confs`命令可以将`common/assets/confs`的配置文件拷贝到输出目录,使用`mage scripts`将`scripts`目录里的脚本拷贝到输出目录。

使用`mage all`可以一次性完成编译、拷贝工作。

可以通过增加额外的参数来指定编译目标平台,比如`mage win amd64 agent`。支持的操作系统参数有`win`、`linux`,支持的CPU架构参数有`amd64`、`arm64`。这些参数同样可以和`bin`、`all`参数一起使用。

注意:编译目标平台参数必须在编译二进制参数之前。

+ 9
- 1
agent/README.md View File

@@ -1,2 +1,10 @@
# storage-agent
# Agent服务

## 目录结构
- `internal`:服务源码。
- `config`:服务使用的配置文件结构定义。
- `grpc`:通过grpc对外提供的接口。实现了.proto文件里定义的接口,这个文件位于`common\pkgs\grpc\agent\agent.proto`。
- `mq`:通过rabbitmq对外提供的接口。实现了`common\pkgs\mq\agent`目录里文件定义的接口。
- `task`:需要在后台异步运行的任务。


agent/internal/services/grpc/io.go → agent/internal/grpc/io.go View File


agent/internal/services/grpc/service.go → agent/internal/grpc/service.go View File


agent/internal/services/mq/agent.go → agent/internal/mq/agent.go View File


agent/internal/services/mq/cache.go → agent/internal/mq/cache.go View File


agent/internal/services/mq/io.go → agent/internal/mq/io.go View File


agent/internal/services/mq/object.go → agent/internal/mq/object.go View File


agent/internal/services/mq/service.go → agent/internal/mq/service.go View File


agent/internal/services/mq/storage.go → agent/internal/mq/storage.go View File


+ 2
- 61
agent/main.go View File

@@ -5,7 +5,6 @@ import (
"net"
"os"
"sync"
"time"

log "gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/storage/agent/internal/config"
@@ -20,14 +19,10 @@ import (

"google.golang.org/grpc"

"gitlink.org.cn/cloudream/storage/common/consts"
"gitlink.org.cn/cloudream/storage/common/utils"

agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"

grpcsvc "gitlink.org.cn/cloudream/storage/agent/internal/services/grpc"
cmdsvc "gitlink.org.cn/cloudream/storage/agent/internal/services/mq"
grpcsvc "gitlink.org.cn/cloudream/storage/agent/internal/grpc"
cmdsvc "gitlink.org.cn/cloudream/storage/agent/internal/mq"
)

// TODO 此数据是否在运行时会发生变化?
@@ -136,57 +131,3 @@ func serveDistLock(svc *distlock.Service) {

log.Info("distlock stopped")
}

func reportStatus(wg *sync.WaitGroup) {
coorCli, err := coormq.NewClient(&config.Cfg().RabbitMQ)
if err != nil {
wg.Done()
log.Error("new coordinator client failed, err: %w", err)
return
}

// TODO 增加退出死循环的方法
for {
//挨个ping其他agent(AgentIpList),记录延迟到AgentDelay
// TODO AgentIP考虑放到配置文件里或者启动时从coor获取
ips := utils.GetAgentIps()
agentDelay := make([]int, len(ips))
waitG := sync.WaitGroup{}
waitG.Add(len(ips))
for i := 0; i < len(ips); i++ {
go func(i int, wg *sync.WaitGroup) {
connStatus, err := utils.GetConnStatus(ips[i])
if err != nil {
wg.Done()
log.Warnf("ping %s failed, err: %s", ips[i], err.Error())
return
}

log.Debugf("connection status to %s: %+v", ips[i], connStatus)

if connStatus.IsReachable {
agentDelay[i] = int(connStatus.Delay.Milliseconds()) + 1
} else {
agentDelay[i] = -1
}

wg.Done()
}(i, &waitG)
}
waitG.Wait()
//TODO: 查看本地IPFS daemon是否正常,记录到ipfsStatus
ipfsStatus := consts.IPFSStateOK
//TODO:访问自身资源目录(配置文件中获取路径),记录是否正常,记录到localDirStatus
localDirStatus := consts.StorageDirectoryStateOK

//发送心跳
// TODO 由于数据结构未定,暂时不发送真实数据
coorCli.AgentStatusReport(coormq.NewAgentStatusReportBody(config.Cfg().ID, []int64{}, []int{}, ipfsStatus, localDirStatus))

time.Sleep(time.Minute * 5)
}

coorCli.Close()

wg.Done()
}

+ 17
- 1
client/README.md View File

@@ -1,2 +1,18 @@
# storage-client
# Client服务

## 目录结构
- `internal`:服务源码。
- `cmdline`:此服务提供的命令行功能。
- `config`:服务使用的配置文件结构定义。
- `http`:此服务提供的http接口。
- `services`:服务的功能,被cmdline和http调用。
- `task`:需要在后台异步运行的任务。

## 命令行
Client程序可以作为一个命令行程序使用,能在`internal/cmdline`中找到它提供的所有命令。

使用时按照`./client <命令前缀1> <命令前缀2>... <命令函数参数1> <命令函数参数2>...`的方式编写命令。命令前缀在每个文件的init函数中能找到。

以列出某个Bucket下所有Package的命令PackageListBucketPackages为例,它的命令前缀是`pkg ls`,它的函数签名是`PackageListBucketPackages(ctx CommandContext, bucketID cdssdk.BucketID)`,忽略掉会自动填写的ctx参数,需要通过命令行提供的就是bucketID参数,假设为5,因此调用它的命令是:`./client pkg ls 5`。

可以通过使用`serve http`命令将Client程序作为一个http服务启动,并保持运行。

+ 20
- 1
common/README.md View File

@@ -1,2 +1,21 @@
# storage-common
# 公共库
这个目录存放的是在storage仓库的几个程序之间共享的代码和数据结构定义。

## 目录结构
- `assets`:存放程序会读取使用的配置文件等。会在编译时一并复制到输出目录。
- `confs`:服务的配置文件。
- `scripts`:脚本文件。
- `consts`:常量定义。
- `globals`:全局变量定义,主要是各种客户端的Pool。
- `magefiles`:mage工具的脚本。
- `models`:公共数据结构定义。
- `pkgs`:一些相对独立的功能模块。
- `cmd`:公用的业务逻辑,比如上传Package和下载Package。
- `db`:数据库的数据结构和操作函数。
- `distlock`:分布式锁服务,核心机制使用的是`common/pkgs/distlock`,增加了根据存储系统的业务需求设计的锁。
- `ec`:纠删码的库。
- `grpc`:存放proto文件,以及使用protogen工具生成的代码文件。
- `ioswitch`:IOSwitch模块。
- `iterator`:迭代器。
- `mq`:各个服务的rabbitmq接口的声明。
- `utils`:一些暂时没有归类的工具函数。

+ 0
- 74
common/utils/config.go View File

@@ -1,74 +0,0 @@
package utils

import (
"fmt"
"regexp"
"strconv"

"github.com/beevik/etree"
)

type EcConfig struct {
ecid string `xml:"ecid"`
class string `xml:"class"`
n int `xml:"n"`
k int `xml:"k"`
w int `xml:"w"`
opt int `xml:"opt"`
}

func (r *EcConfig) GetK() int {
return r.k
}

func (r *EcConfig) GetN() int {
return r.n
}

func GetEcPolicy() *map[string]EcConfig {
doc := etree.NewDocument()
if err := doc.ReadFromFile("../confs/sysSetting.xml"); err != nil {
panic(err)
}
ecMap := make(map[string]EcConfig, 20)
root := doc.SelectElement("setting")
for _, attr := range root.SelectElements("attribute") {
if name := attr.SelectElement("name"); name.Text() == "ec.policy" {
for _, eci := range attr.SelectElements("value") {
tt := EcConfig{}
tt.ecid = eci.SelectElement("ecid").Text()
tt.class = eci.SelectElement("class").Text()
tt.n, _ = strconv.Atoi(eci.SelectElement("n").Text())
tt.k, _ = strconv.Atoi(eci.SelectElement("k").Text())
tt.w, _ = strconv.Atoi(eci.SelectElement("w").Text())
tt.opt, _ = strconv.Atoi(eci.SelectElement("opt").Text())
ecMap[tt.ecid] = tt
}
}
}
fmt.Println(ecMap)
return &ecMap
//
}

func GetAgentIps() []string {
doc := etree.NewDocument()
if err := doc.ReadFromFile("../confs/sysSetting.xml"); err != nil {
panic(err)
}
root := doc.SelectElement("setting")
var ips []string // 定义存储 IP 的字符串切片

for _, attr := range root.SelectElements("attribute") {
if name := attr.SelectElement("name"); name.Text() == "agents.addr" {
for _, ip := range attr.SelectElements("value") {
ipRegex := regexp.MustCompile(`\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b`)
match := ipRegex.FindString(ip.Text())
print(match)
ips = append(ips, match)
}
}
}

return ips
}

+ 0
- 82
common/utils/ping.go View File

@@ -1,82 +0,0 @@
package utils

import (
//"fmt"
"github.com/go-ping/ping"
//"net"
"io/ioutil"
"net/http"
"strings"
"time"
)

type ConnStatus struct {
Addr string
IsReachable bool
Delay time.Duration
TTL int
}

// 获取本地主机 IP 地址
func getLocalIP() string {
resp, err := http.Get("https://api.ipify.org")
if err != nil {
panic(err)
}
defer resp.Body.Close()

body, err := ioutil.ReadAll(resp.Body)
if err != nil {
panic(err)
}

ip := strings.TrimSpace(string(body))
return ip
}

func GetConnStatus(remoteIP string) (*ConnStatus, error) {
// 本地主机 IP 地址
//localIP := getLocalIP()
//print("!@#@#!")
//print(localIP)
conn := ConnStatus{
Addr: remoteIP,
IsReachable: false,
}
pinger, err := ping.NewPinger(remoteIP)

if err != nil {
return nil, err
}
pinger.Count = 5 // 设置 ping 次数为 5
// pinger.Interval = 1 // 设置 ping 时间间隔为 1 秒
//pinger.Timeout = 2 // 设置 ping 超时时间为 2 秒
//pinger.SetPrivileged(true) // 设置使用特权模式以获取 TTL 值
pinger.OnRecv = func(pkt *ping.Packet) {
//fmt.Printf("%d bytes from %s: icmp_seq=%d time=%v ttl=%v (DUP!)\n",
// pkt.Nbytes, pkt.IPAddr, pkt.Seq, pkt.Rtt, pkt.Ttl)
conn.TTL = pkt.Ttl
}

/*pinger.OnDuplicateRecv = func(pkt *ping.Packet) {
fmt.Printf("%d bytes from %s: icmp_seq=%d time=%v ttl=%v (DUP!)\n",
pkt.Nbytes, pkt.IPAddr, pkt.Seq, pkt.Rtt, pkt.Ttl)
}*/

pinger.OnFinish = func(stats *ping.Statistics) {
//fmt.Printf("\n--- %s ping statistics ---\n", stats.Addr)
//fmt.Printf("%d packets transmitted, %d packets received, %v%% packet loss\n",
// stats.PacketsSent, stats.PacketsRecv, stats.PacketLoss)
//fmt.Printf("round-trip min/avg/max/stddev = %v/%v/%v/%v\n",
// stats.MinRtt, stats.AvgRtt, stats.MaxRtt, stats.StdDevRtt)
if stats.PacketLoss == 0.0 {
conn.IsReachable = true
}
conn.Delay = stats.AvgRtt
}
err = pinger.Run() // Blocks until finished.
if err != nil {
return nil, err
}
return &conn, nil
}

+ 5
- 1
coordinator/README.md View File

@@ -1,2 +1,6 @@
# storage-coordinator
# Coordinator服务

## 目录结构
- `internal`:服务源码。
- `config`:服务使用的配置文件结构定义。
- `mq`:通过rabbitmq对外提供的接口。实现了`common\pkgs\mq\coodinator`目录里文件定义的接口。

coordinator/internal/services/agent.go → coordinator/internal/mq/agent.go View File

@@ -1,4 +1,4 @@
package services
package mq

import (
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"

coordinator/internal/services/bucket.go → coordinator/internal/mq/bucket.go View File

@@ -1,4 +1,4 @@
package services
package mq

import (
"database/sql"

coordinator/internal/services/cache.go → coordinator/internal/mq/cache.go View File

@@ -1,4 +1,4 @@
package services
package mq

import (
"database/sql"

coordinator/internal/services/node.go → coordinator/internal/mq/node.go View File

@@ -1,4 +1,4 @@
package services
package mq

import (
"gitlink.org.cn/cloudream/common/consts/errorcode"

coordinator/internal/services/object.go → coordinator/internal/mq/object.go View File

@@ -1,4 +1,4 @@
package services
package mq

import (
"database/sql"

coordinator/internal/services/package.go → coordinator/internal/mq/package.go View File

@@ -1,4 +1,4 @@
package services
package mq

import (
"database/sql"

coordinator/internal/services/service.go → coordinator/internal/mq/service.go View File

@@ -1,4 +1,4 @@
package services
package mq

import (
mydb "gitlink.org.cn/cloudream/storage/common/pkgs/db"

coordinator/internal/services/storage.go → coordinator/internal/mq/storage.go View File

@@ -1,4 +1,4 @@
package services
package mq

import (
"database/sql"

+ 2
- 2
coordinator/main.go View File

@@ -9,7 +9,7 @@ import (
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
scmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner"
"gitlink.org.cn/cloudream/storage/coordinator/internal/config"
"gitlink.org.cn/cloudream/storage/coordinator/internal/services"
"gitlink.org.cn/cloudream/storage/coordinator/internal/mq"
)

func main() {
@@ -35,7 +35,7 @@ func main() {
logger.Fatalf("new scanner client failed, err: %s", err.Error())
}

coorSvr, err := coormq.NewServer(services.NewService(db, scanner), &config.Cfg().RabbitMQ)
coorSvr, err := coormq.NewServer(mq.NewService(db, scanner), &config.Cfg().RabbitMQ)
if err != nil {
logger.Fatalf("new coordinator server failed, err: %s", err.Error())
}


+ 7
- 1
scanner/README.md View File

@@ -1,2 +1,8 @@
# storage-scanner
# Scanner服务

## 目录结构
- `internal`:服务源码。
- `config`:服务使用的配置文件结构定义。
- `event`:被投递到队列顺序执行的事件。
- `mq`:通过rabbitmq对外提供的接口。实现了`common\pkgs\mq\scanner`目录里文件定义的接口。
- `tickevent`:定时执行的事件。

scanner/internal/services/event.go → scanner/internal/mq/event.go View File

@@ -1,4 +1,4 @@
package services
package mq

import (
"gitlink.org.cn/cloudream/common/pkgs/logger"

scanner/internal/services/service.go → scanner/internal/mq/service.go View File

@@ -1,4 +1,4 @@
package services
package mq

import (
"gitlink.org.cn/cloudream/storage/scanner/internal/event"

+ 2
- 2
scanner/main.go View File

@@ -12,7 +12,7 @@ import (
scmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner"
"gitlink.org.cn/cloudream/storage/scanner/internal/config"
"gitlink.org.cn/cloudream/storage/scanner/internal/event"
"gitlink.org.cn/cloudream/storage/scanner/internal/services"
"gitlink.org.cn/cloudream/storage/scanner/internal/mq"
"gitlink.org.cn/cloudream/storage/scanner/internal/tickevent"
)

@@ -49,7 +49,7 @@ func main() {
eventExecutor := event.NewExecutor(db, distlockSvc)
go serveEventExecutor(&eventExecutor, &wg)

agtSvr, err := scmq.NewServer(services.NewService(&eventExecutor), &config.Cfg().RabbitMQ)
agtSvr, err := scmq.NewServer(mq.NewService(&eventExecutor), &config.Cfg().RabbitMQ)
if err != nil {
logger.Fatalf("new agent server failed, err: %s", err.Error())
}


Loading…
Cancel
Save