You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

commandServer.go 5.0 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. package main
  2. import (
  3. //"context"
  4. //"io"
  5. "fmt"
  6. //"path/filepath"
  7. //"sync"
  8. "encoding/json"
  9. "rabbitmq"
  10. "strconv"
  11. //agentcaller "proto"
  12. //"github.com/pborman/uuid"
  13. //"github.com/streadway/amqp"
  14. //"google.golang.org/grpc"
  15. )
  16. func rabbitSend(c []byte, userId int) {
  17. queueName := "coorClientQueue" + strconv.Itoa(userId)
  18. rabbit := rabbitmq.NewRabbitMQSimple(queueName)
  19. fmt.Println(string(c))
  20. rabbit.PublishSimple(c)
  21. rabbit.Destroy()
  22. }
  23. func TempCacheReport(command rabbitmq.TempCacheReport) {
  24. fmt.Println("TempCacheReport")
  25. fmt.Println(command.Hashs)
  26. fmt.Println(command.Ip)
  27. //返回消息
  28. //jh:将hashs中的hash,IP插入缓存表中,TempOrPin字段为true,Time为插入时的时间戳
  29. //-如果要插入的hash、IP在表中已存在且所对应的TempOrPin字段为false,则不做任何操作
  30. //-如果要插入的hash、IP在表中已存在且所对应的TempOrPin字段为true,则更新Time
  31. }
  32. func CoorMove(command rabbitmq.MoveCommand) {
  33. fmt.Println("CoorMove")
  34. fmt.Println(command.BucketName)
  35. //查询数据库,获取冗余类型,冗余参数
  36. //jh:使用command中的bucketname和objectname查询对象表,获得redundancy,EcName,fileSizeInBytes
  37. //-若redundancy是rep,查询对象副本表, 获得repHash
  38. //--ids :={0}
  39. //--hashs := {repHash}
  40. //-若redundancy是ec,查询对象编码块表,获得blockHashs, ids(innerID),
  41. //--查询缓存表,获得每个hash的nodeIps、TempOrPins、Times
  42. //--查询节点延迟表,得到command.destination与各个nodeIps的的延迟,存到一个map类型中(Delay)
  43. //--kx:根据查出来的hash/hashs、nodeIps、TempOrPins、Times(移动/读取策略)、Delay确定hashs、ids
  44. redundancy := "rep"
  45. ecName := "ecName"
  46. hashs := []string{"block1.json", "block2.json"}
  47. ids := []int{0, 1}
  48. fileSizeInBytes := 21
  49. res := rabbitmq.MoveRes{
  50. Redundancy: redundancy,
  51. EcName: ecName,
  52. Hashs: hashs,
  53. Ids: ids,
  54. FileSizeInBytes: int64(fileSizeInBytes),
  55. }
  56. c, _ := json.Marshal(res)
  57. rabbitSend(c, command.UserId)
  58. }
  59. func CoorEcWrite(command rabbitmq.EcWriteCommand) {
  60. fmt.Println("CoorEcWrite")
  61. fmt.Println(command.BucketName)
  62. //jh:根据command中的UserId查询用户节点权限表,返回用户可用的NodeIp
  63. //kx:根据command中的ecName,得到ecN,然后从jh查到的NodeIp中选择ecN个,赋值给Ips
  64. //jh:完成对象表、对象编码块表的插入(对象编码块表的Hash字段先不插入)
  65. //返回消息
  66. res := rabbitmq.WriteRes{
  67. Ips: []string{"localhost", "localhost", "localhost"},
  68. }
  69. c, _ := json.Marshal(res)
  70. rabbitSend(c, command.UserId)
  71. }
  72. func CoorEcWriteHash(command rabbitmq.WriteHashCommand) {
  73. fmt.Println("CoorEcWriteHash")
  74. fmt.Println(command.BucketName)
  75. //jh:根据command中的信息,插入对象编码块表中的Hash字段,并完成缓存表的插入
  76. //返回消息
  77. res := rabbitmq.WriteHashRes{
  78. MetaCode: 0,
  79. }
  80. c, _ := json.Marshal(res)
  81. rabbitSend(c, command.UserId)
  82. }
  83. func CoorRead(command rabbitmq.ReadCommand) {
  84. fmt.Println("CoorRead")
  85. fmt.Println(command.BucketName)
  86. //jh:使用command中的bucketname和objectname查询对象表,获得redundancy,EcName,fileSizeInBytes
  87. //-若redundancy是rep,查询对象副本表, 获得repHash
  88. //--ids :={0}
  89. //--hashs := {repHash}
  90. //-若redundancy是ec,查询对象编码块表,获得blockHashs, ids(innerID),
  91. //--查询缓存表,获得每个hash的nodeIps、TempOrPins、Times
  92. //--kx:根据查出来的hash/hashs、nodeIps、TempOrPins、Times(移动/读取策略)确定hashs、ids
  93. //返回消息
  94. res := rabbitmq.ReadRes{
  95. Redundancy: "rep",
  96. Ips: []string{"localhost", "localhost"},
  97. Hashs: []string{"block1.json", "block2.json"},
  98. BlockIds: []int{0, 1},
  99. EcName: "ecName",
  100. FileSizeInBytes: 21,
  101. }
  102. c, _ := json.Marshal(res)
  103. rabbitSend(c, command.UserId)
  104. }
  105. func CoorRepWriteHash(command rabbitmq.WriteHashCommand) {
  106. fmt.Println("CoorRepWriteHash")
  107. fmt.Println(command.BucketName)
  108. //jh:根据command中的信息,插入对象副本表中的Hash字段,并完成缓存表的插入
  109. //返回消息
  110. res := rabbitmq.WriteHashRes{
  111. MetaCode: 0,
  112. }
  113. c, _ := json.Marshal(res)
  114. rabbitSend(c, command.UserId)
  115. }
  116. func CoorRepWrite(command rabbitmq.RepWriteCommand) {
  117. fmt.Println("CoorRepWrite")
  118. fmt.Println(command.BucketName)
  119. //jh:根据command中的UserId查询用户节点权限表,返回用户可用的NodeIp;
  120. //kx:根据command中的ecName,得到ecN,然后从jh查到的NodeIp中选择numRep个,赋值给Ips
  121. //jh:完成对象表、对象副本表的插入(对象副本表的Hash字段先不插入)
  122. //返回消息
  123. res := rabbitmq.WriteRes{
  124. Ips: []string{"localhost", "localhost", "localhost"},
  125. }
  126. c, _ := json.Marshal(res)
  127. rabbitSend(c, command.UserId)
  128. }
  129. func HeartReport(command rabbitmq.HeartReport) {
  130. //jh:根据command中的Ip,插入节点延迟表,和节点表的NodeStatus
  131. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。

Contributors (1)