Browse Source

增加日志库;增加读取配置的功能

gitlink
Gitea 2 years ago
parent
commit
5ae2250e1f
5 changed files with 71 additions and 28 deletions
  1. +15
    -0
      Makefile
  2. +6
    -0
      assets/config/config.json
  3. +12
    -17
      clientCommand.go
  4. +20
    -0
      config/config.go
  5. +18
    -11
      main.go

+ 15
- 0
Makefile View File

@@ -0,0 +1,15 @@
OUTPUT_BINARY_NAME = "cloud-client"
OUTPUT_DIR_NAME = "cloud-client"


ASSETS_DIR_NAME = "assets"
BUILD_DIR = "../../build"

build:
go build -o ${BUILD_DIR}/${OUTPUT_DIR_NAME}/${OUTPUT_BINARY_NAME}
if [ -d ${ASSETS_DIR_NAME} ];then \
cp -r ${ASSETS_DIR_NAME}/* ${BUILD_DIR}/${OUTPUT_DIR_NAME}/; \
fi

clean:
rm -f ${BUILD_DIR}/${OUTPUT_DIR_NAME}/${OUTPUT_BINARY_NAME}

+ 6
- 0
assets/config/config.json View File

@@ -0,0 +1,6 @@
{
"grpcPort": 5010,
"grpcPacketSize": 10,
"ipfsPort": 10,
"maxReplicateNumber": 10
}

+ 12
- 17
clientCommand.go View File

@@ -9,6 +9,7 @@ import (
"strconv"
"sync"

"gitlink.org.cn/cloudream/client/config"
agentcaller "gitlink.org.cn/cloudream/proto"

racli "gitlink.org.cn/cloudream/rabbitmq/client"
@@ -24,12 +25,6 @@ import (
_ "google.golang.org/grpc/balancer/grpclb"
)

const (
// TODO2 改为配置文件读取
port = ":5010"
packetSizeInBytes = 10
)

func Move(bucketName string, objectName string, destination string) error {
//将bucketName, objectName, destination发给协调端
fmt.Println("move " + bucketName + "/" + objectName + " to " + destination)
@@ -119,7 +114,7 @@ func Read(localFilePath string, bucketName string, objectName string) error {
}

func repRead(fileSizeInBytes int64, ip string, repHash string, localFilePath string) error {
grpcAddr := ip + port
grpcAddr := fmt.Sprintf("%s:%d", ip, config.Cfg().GRPCPort)
conn, err := grpc.Dial(grpcAddr, grpc.WithInsecure())
if err != nil {
return fmt.Errorf("connect to grpc server at %s failed, err: %w", grpcAddr, err)
@@ -159,7 +154,7 @@ func repRead(fileSizeInBytes int64, ip string, repHash string, localFilePath str
return fmt.Errorf("request grpc failed, err: %w", err)
}

numPacket := (fileSizeInBytes + packetSizeInBytes - 1) / (packetSizeInBytes)
numPacket := (fileSizeInBytes + config.Cfg().GRCPPacketSize - 1) / config.Cfg().GRCPPacketSize
for i := int64(0); i < numPacket; i++ {
resp, err := stream.Recv()
if err != nil {
@@ -185,7 +180,7 @@ func ecRead(fileSizeInBytes int64, ips []string, blockHashs []string, blockIds [
ecN := ecPolicy.GetN()
var coefs = [][]int64{{1, 1, 1}, {1, 2, 3}} //2应替换为ecK,3应替换为ecN

numPacket := (fileSizeInBytes + int64(ecK)*packetSizeInBytes - 1) / (int64(ecK) * packetSizeInBytes)
numPacket := (fileSizeInBytes + int64(ecK)*config.Cfg().GRCPPacketSize - 1) / (int64(ecK) * config.Cfg().GRCPPacketSize)
fmt.Println(numPacket)
//创建channel
getBufs := make([]chan []byte, ecN)
@@ -218,8 +213,8 @@ func RepWrite(localFilePath string, bucketName string, objectName string, numRep
fileSizeInBytes := fileInfo.Size()

//写入对象的packet数
numWholePacket := fileSizeInBytes / packetSizeInBytes
lastPacketInBytes := fileSizeInBytes % packetSizeInBytes
numWholePacket := fileSizeInBytes / config.Cfg().GRCPPacketSize
lastPacketInBytes := fileSizeInBytes % config.Cfg().GRCPPacketSize
numPacket := numWholePacket
if lastPacketInBytes > 0 {
numPacket++
@@ -297,7 +292,7 @@ func EcWrite(localFilePath string, bucketName string, objectName string, ecName
var coefs = [][]int64{{1, 1, 1}, {1, 2, 3}} //2应替换为ecK,3应替换为ecN

//计算每个块的packet数
numPacket := (fileSizeInBytes + int64(ecK)*packetSizeInBytes - 1) / (int64(ecK) * packetSizeInBytes)
numPacket := (fileSizeInBytes + int64(ecK)*config.Cfg().GRCPPacketSize - 1) / (int64(ecK) * config.Cfg().GRCPPacketSize)
fmt.Println(numPacket)

userId := 0
@@ -368,7 +363,7 @@ func loadDistribute(localFilePath string, loadDistributeBufs []chan []byte, numW
fmt.Println("loadDistribute " + localFilePath)
file, _ := os.Open(localFilePath)
for i := 0; int64(i) < numWholePacket; i++ {
buf := make([]byte, packetSizeInBytes)
buf := make([]byte, config.Cfg().GRCPPacketSize)
_, err := file.Read(buf)
if err != nil && err != io.EOF {
break
@@ -398,7 +393,7 @@ func load(localFilePath string, loadBufs []chan []byte, ecK int, totalNumPacket
for i := 0; int64(i) < totalNumPacket; i++ {
print(totalNumPacket)

buf := make([]byte, packetSizeInBytes)
buf := make([]byte, config.Cfg().GRCPPacketSize)
idx := i % ecK
print(len(loadBufs))
_, err := file.Read(buf)
@@ -408,7 +403,7 @@ func load(localFilePath string, loadBufs []chan []byte, ecK int, totalNumPacket
print("***")
for j := ecK; j < len(loadBufs); j++ {
print(j)
zeroPkt := make([]byte, packetSizeInBytes)
zeroPkt := make([]byte, config.Cfg().GRCPPacketSize)
fmt.Printf("%v", zeroPkt)
loadBufs[j] <- zeroPkt
}
@@ -502,7 +497,7 @@ func send(blockName string, ip string, inBuf chan []byte, numPacket int64, wg *s
否则,像目前一样,使用grpc向指定节点获取
*/
//rpc相关
conn, err := grpc.Dial(ip+port, grpc.WithInsecure())
conn, err := grpc.Dial(fmt.Sprintf("%s:%d", ip, config.Cfg().GRPCPort), grpc.WithInsecure())
if err != nil {
panic(err)
}
@@ -535,7 +530,7 @@ func send(blockName string, ip string, inBuf chan []byte, numPacket int64, wg *s
func get(blockHash string, ip string, getBuf chan []byte, numPacket int64) {
//rpc相关
print("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@")
conn, err := grpc.Dial(ip+port, grpc.WithInsecure())
conn, err := grpc.Dial(fmt.Sprintf("%s:%d", ip, config.Cfg().GRPCPort), grpc.WithInsecure())
if err != nil {
panic(err)
}


+ 20
- 0
config/config.go View File

@@ -0,0 +1,20 @@
package config

import "gitlink.org.cn/cloudream/utils/config"

type Config struct {
GRPCPort int `json:"grpcPort"`
GRCPPacketSize int64 `json:"grpcPacketSize"`
IPFSPort int `json:"ipfsPort"`
MaxReplicateNumber int `json:"maxReplicateNumber"`
}

var cfg Config

func Init() error {
return config.DefaultLoad(&cfg)
}

func Cfg() *Config {
return &cfg
}

+ 18
- 11
main.go View File

@@ -6,41 +6,48 @@ import (

_ "google.golang.org/grpc/balancer/grpclb"

"gitlink.org.cn/cloudream/client/config"

"strconv"
)

// TODO2 配置读取
//TODO xh: 读取配置文件,初始化变量,获取packetSizeInBytes、grpc port、ipfs port、最大副本数、本机公网Ip等信息,参照src/utils/config.go

func main() {
args := os.Args
arg_num := len(os.Args)
for i := 0; i < arg_num; i++ {
fmt.Println(args[i])
err := config.Init()
if err != nil {
fmt.Printf("init config failed, err: %s", err.Error())
os.Exit(1)
}

args := os.Args
switch args[1] {
case "ecWrite":
//TODO: 写入对象时,Coor判断对象是否已存在,如果存在,则直接返回
if err := EcWrite(args[2], args[3], args[4], args[5]); err != nil {
fmt.Printf("ec write failed, err: %s", err.Error())
os.Exit(1)
}
case "write":
numRep, _ := strconv.Atoi(args[5])
if numRep <= 0 || numRep > 10 { //TODO xh:10改为从配置文件中读出的最大副本数
print("write::InputError!") //TODO xh:优化提示语
} else {
if err := RepWrite(args[2], args[3], args[4], numRep); err != nil {
fmt.Printf("rep write failed, err: %s", err.Error())
}
if numRep <= 0 || numRep > config.Cfg().MaxReplicateNumber {
fmt.Printf("replicate number should not be more than %d", config.Cfg().MaxReplicateNumber)
os.Exit(1)
}
if err := RepWrite(args[2], args[3], args[4], numRep); err != nil {
fmt.Printf("rep write failed, err: %s", err.Error())
os.Exit(1)
}

case "read":
if err := Read(args[2], args[3], args[4]); err != nil {
fmt.Printf("read failed, err: %s", err.Error())
os.Exit(1)
}
case "move":
if err := Move(args[2], args[3], args[4]); err != nil {
fmt.Printf("move failed, err: %s", err.Error())
os.Exit(1)
}
}
/*


Loading…
Cancel
Save