
object.go 16 kB

package services

import (
	"fmt"
	"io"
	"math/rand"

	"gitlink.org.cn/cloudream/client/internal/config"
	"gitlink.org.cn/cloudream/common/consts"
	"gitlink.org.cn/cloudream/common/pkg/distlock/reqbuilder"
	log "gitlink.org.cn/cloudream/common/pkg/logger"
	mygrpc "gitlink.org.cn/cloudream/common/utils/grpc"
	myio "gitlink.org.cn/cloudream/common/utils/io"
	serder "gitlink.org.cn/cloudream/common/utils/serder"
	mysort "gitlink.org.cn/cloudream/common/utils/sort"
	"gitlink.org.cn/cloudream/db/model"
	agentcaller "gitlink.org.cn/cloudream/proto"
	agtcli "gitlink.org.cn/cloudream/rabbitmq/client/agent"
	ramsg "gitlink.org.cn/cloudream/rabbitmq/message"
	agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent"
	coormsg "gitlink.org.cn/cloudream/rabbitmq/message/coordinator"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	lo "github.com/samber/lo"
)
type ObjectService struct {
	*Service
}

func (svc *Service) ObjectSvc() *ObjectService {
	return &ObjectService{Service: svc}
}

func (svc *ObjectService) GetObject(userID int, objectID int) (model.Object, error) {
	// TODO
	panic("not implemented yet")
}
func (svc *ObjectService) DownloadObject(userID int, objectID int) (io.ReadCloser, error) {
	mutex, err := reqbuilder.NewBuilder().
		// for checking whether the user has permission on the object
		Metadata().UserBucket().ReadAny().
		// for querying available download nodes
		Node().ReadAny().
		// for querying the Rep configuration
		ObjectRep().ReadOne(objectID).
		// for querying the Block configuration
		ObjectBlock().ReadAny().
		// for querying the nodes that hold a replica
		Cache().ReadAny().
		MutexLock(svc.distlock)
	if err != nil {
		return nil, fmt.Errorf("acquire locks failed, err: %w", err)
	}
	defer mutex.Unlock()

	preDownloadResp, err := svc.coordinator.PreDownloadObject(coormsg.NewPreDownloadObjectBody(objectID, userID, config.Cfg().ExternalIP))
	if err != nil {
		return nil, fmt.Errorf("request to coordinator failed, err: %w", err)
	}
	if preDownloadResp.IsFailed() {
		return nil, fmt.Errorf("coordinator operation failed, code: %s, message: %s", preDownloadResp.ErrorCode, preDownloadResp.ErrorMessage)
	}

	switch preDownloadResp.Body.Redundancy {
	case consts.REDUNDANCY_REP:
		var repInfo ramsg.RespObjectRepInfo
		err := serder.MapToObject(preDownloadResp.Body.RedundancyData.(map[string]any), &repInfo)
		if err != nil {
			return nil, fmt.Errorf("redundancy data to rep info failed, err: %w", err)
		}

		if len(repInfo.Nodes) == 0 {
			return nil, fmt.Errorf("no node has this file")
		}

		// choose a download node
		entry := svc.chooseDownloadNode(repInfo.Nodes)

		// if the client and the node are in the same location, connect via the node's local address
		nodeIP := entry.ExternalIP
		if entry.IsSameLocation {
			nodeIP = entry.LocalIP
			log.Infof("client and node %d are at the same location, use local ip\n", entry.ID)
		}

		reader, err := svc.downloadRepObject(entry.ID, nodeIP, repInfo.FileHash)
		if err != nil {
			return nil, fmt.Errorf("rep read failed, err: %w", err)
		}
		return reader, nil

		//case consts.REDUNDANCY_EC:
		// TODO the EC path needs to be reworked
		// ecRead(readResp.FileSize, readResp.NodeIPs, readResp.Hashes, readResp.BlockIDs, *readResp.ECName)
	}

	return nil, fmt.Errorf("unsupported redundancy type: %s", preDownloadResp.Body.Redundancy)
}
// chooseDownloadNode chooses a node to download from:
//  1. randomly pick one of the nodes in the same location as the client
//  2. if there is none, randomly pick one from all nodes
func (svc *ObjectService) chooseDownloadNode(entries []ramsg.RespNode) ramsg.RespNode {
	sameLocationEntries := lo.Filter(entries, func(e ramsg.RespNode, i int) bool { return e.IsSameLocation })
	if len(sameLocationEntries) > 0 {
		return sameLocationEntries[rand.Intn(len(sameLocationEntries))]
	}

	return entries[rand.Intn(len(entries))]
}
func (svc *ObjectService) downloadRepObject(nodeID int, nodeIP string, fileHash string) (io.ReadCloser, error) {
	if svc.ipfs != nil {
		log.Infof("try to use local IPFS to download file")

		reader, err := svc.downloadFromLocalIPFS(fileHash)
		if err == nil {
			return reader, nil
		}

		log.Warnf("download from local IPFS failed, so try to download from node %s, err: %s", nodeIP, err.Error())
	}

	return svc.downloadFromNode(nodeID, nodeIP, fileHash)
}
func (svc *ObjectService) downloadFromNode(nodeID int, nodeIP string, fileHash string) (io.ReadCloser, error) {
	// acquire the lock a second time; it is held until the returned reader is closed
	mutex, err := reqbuilder.NewBuilder().
		// for downloading the file from IPFS
		IPFS().ReadOneRep(nodeID, fileHash).
		MutexLock(svc.distlock)
	if err != nil {
		return nil, fmt.Errorf("acquire locks failed, err: %w", err)
	}

	// connect to the agent's grpc server
	grpcAddr := fmt.Sprintf("%s:%d", nodeIP, config.Cfg().GRPCPort)
	conn, err := grpc.Dial(grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		// release the lock on the error path so it is not leaked
		mutex.Unlock()
		return nil, fmt.Errorf("connect to grpc server at %s failed, err: %w", grpcAddr, err)
	}

	// download the file
	client := agentcaller.NewFileTransportClient(conn)
	reader, err := mygrpc.GetFileAsStream(client, fileHash)
	if err != nil {
		conn.Close()
		mutex.Unlock()
		return nil, fmt.Errorf("request to get file failed, err: %w", err)
	}

	// close the connection and release the lock only after the caller finishes reading
	reader = myio.AfterReadClosed(reader, func(io.ReadCloser) {
		conn.Close()
		mutex.Unlock()
	})
	return reader, nil
}
func (svc *ObjectService) downloadFromLocalIPFS(fileHash string) (io.ReadCloser, error) {
	// TODO this could also be turned into a Task
	reader, err := svc.ipfs.OpenRead(fileHash)
	if err != nil {
		return nil, fmt.Errorf("read ipfs file failed, err: %w", err)
	}

	return reader, nil
}
func (svc *ObjectService) UploadRepObject(userID int, bucketID int, objectName string, file io.ReadCloser, fileSize int64, repCount int) error {
	mutex, err := reqbuilder.NewBuilder().
		Metadata().
		// for checking whether the user has permission on the bucket
		UserBucket().ReadOne(userID, bucketID).
		// for preventing several objects with the same name from being created
		Object().CreateOne(bucketID, objectName).
		// for querying available upload nodes
		Node().ReadAny().
		// for creating the Rep configuration
		ObjectRep().CreateAny().
		// for creating Cache records
		Cache().CreateAny().
		MutexLock(svc.distlock)
	if err != nil {
		return fmt.Errorf("acquire locks failed, err: %w", err)
	}
	defer mutex.Unlock()

	// send a write request, asking the coordinator to assign the IPs of the nodes to write to
	repWriteResp, err := svc.coordinator.PreUploadRepObject(coormsg.NewPreUploadRepObjectBody(bucketID, objectName, fileSize, userID, config.Cfg().ExternalIP))
	if err != nil {
		return fmt.Errorf("request to coordinator failed, err: %w", err)
	}
	if repWriteResp.IsFailed() {
		return fmt.Errorf("coordinator RepWrite failed, code: %s, message: %s", repWriteResp.ErrorCode, repWriteResp.ErrorMessage)
	}
	if len(repWriteResp.Body.Nodes) == 0 {
		return fmt.Errorf("no node to upload file")
	}

	uploadNode := svc.chooseUploadNode(repWriteResp.Body.Nodes)

	var fileHash string
	uploadedNodeIDs := []int{}
	uploadToNode := true

	// if a local IPFS daemon is available, upload through it directly
	if svc.ipfs != nil {
		log.Infof("try to use local IPFS to upload file")

		fileHash, err = svc.uploadToLocalIPFS(file, uploadNode.ID)
		if err != nil {
			log.Warnf("upload to local IPFS failed, so try to upload to node %d, err: %s", uploadNode.ID, err.Error())
		} else {
			uploadToNode = false
		}
	}

	// otherwise upload through the agent
	if uploadToNode {
		// if the client and the node are in the same location, connect via the node's local address
		nodeIP := uploadNode.ExternalIP
		if uploadNode.IsSameLocation {
			nodeIP = uploadNode.LocalIP
			log.Infof("client and node %d are at the same location, use local ip\n", uploadNode.ID)
		}

		mutex, err := reqbuilder.NewBuilder().
			// prevent the uploaded replica from being cleaned up
			IPFS().CreateAnyRep(uploadNode.ID).
			MutexLock(svc.distlock)
		if err != nil {
			return fmt.Errorf("acquire locks failed, err: %w", err)
		}
		defer mutex.Unlock()

		fileHash, err = svc.uploadToNode(file, nodeIP)
		if err != nil {
			return fmt.Errorf("upload to node %s failed, err: %w", nodeIP, err)
		}
		uploadedNodeIDs = append(uploadedNodeIDs, uploadNode.ID)
	}

	// record the hash of the written file
	createObjectResp, err := svc.coordinator.CreateRepObject(coormsg.NewCreateRepObjectBody(bucketID, objectName, fileSize, repCount, userID, uploadedNodeIDs, fileHash))
	if err != nil {
		return fmt.Errorf("request to coordinator failed, err: %w", err)
	}
	if createObjectResp.IsFailed() {
		return fmt.Errorf("coordinator CreateRepObject failed, code: %s, message: %s", createObjectResp.ErrorCode, createObjectResp.ErrorMessage)
	}

	return nil
}
func (svc *ObjectService) uploadToNode(file io.ReadCloser, nodeIP string) (string, error) {
	// establish a grpc connection and send the request
	grpcAddr := fmt.Sprintf("%s:%d", nodeIP, config.Cfg().GRPCPort)
	grpcCon, err := grpc.Dial(grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return "", fmt.Errorf("connect to grpc server at %s failed, err: %w", grpcAddr, err)
	}
	defer grpcCon.Close()

	client := agentcaller.NewFileTransportClient(grpcCon)
	upload, err := mygrpc.SendFileAsStream(client)
	if err != nil {
		return "", fmt.Errorf("request to send file failed, err: %w", err)
	}

	// send the file data
	_, err = io.Copy(upload, file)
	if err != nil {
		// abort the stream on error
		upload.Abort(io.ErrClosedPipe)
		return "", fmt.Errorf("copy file data to upload stream failed, err: %w", err)
	}

	// send the EOF message and obtain the file hash
	fileHash, err := upload.Finish()
	if err != nil {
		upload.Abort(io.ErrClosedPipe)
		return "", fmt.Errorf("send EOF failed, err: %w", err)
	}

	return fileHash, nil
}
func (svc *ObjectService) uploadToLocalIPFS(file io.ReadCloser, nodeID int) (string, error) {
	// upload the file through the local IPFS daemon
	writer, err := svc.ipfs.CreateFile()
	if err != nil {
		return "", fmt.Errorf("create IPFS file failed, err: %w", err)
	}

	_, err = io.Copy(writer, file)
	if err != nil {
		return "", fmt.Errorf("copy file data to IPFS failed, err: %w", err)
	}

	fileHash, err := writer.Finish()
	if err != nil {
		return "", fmt.Errorf("finish writing IPFS failed, err: %w", err)
	}

	// then ask the nearest node to pin the file that was just uploaded locally
	agentClient, err := agtcli.NewClient(nodeID, &config.Cfg().RabbitMQ)
	if err != nil {
		return "", fmt.Errorf("create agent client to %d failed, err: %w", nodeID, err)
	}
	defer agentClient.Close()

	pinObjResp, err := agentClient.PinObject(agtmsg.NewPinObjectBody(fileHash))
	if err != nil {
		return "", fmt.Errorf("request to agent %d failed, err: %w", nodeID, err)
	}
	if pinObjResp.IsFailed() {
		return "", fmt.Errorf("agent %d PinObject failed, code: %s, message: %s", nodeID, pinObjResp.ErrorCode, pinObjResp.ErrorMessage)
	}

	return fileHash, nil
}
// chooseUploadNode chooses a node to upload the file to:
//  1. randomly pick one of the nodes in the same location as the client
//  2. if there is none, randomly pick one from all nodes
func (svc *ObjectService) chooseUploadNode(nodes []ramsg.RespNode) ramsg.RespNode {
	sameLocationNodes := lo.Filter(nodes, func(e ramsg.RespNode, i int) bool { return e.IsSameLocation })
	if len(sameLocationNodes) > 0 {
		return sameLocationNodes[rand.Intn(len(sameLocationNodes))]
	}

	return nodes[rand.Intn(len(nodes))]
}
func (svc *ObjectService) UploadECObject(userID int, file io.ReadCloser, fileSize int64, ecName string) error {
	// TODO
	panic("not implemented yet")
}
func (svc *ObjectService) UpdateRepObject(userID int, objectID int, file io.ReadCloser, fileSize int64) error {
	mutex, err := reqbuilder.NewBuilder().
		Metadata().
		// for checking whether the user has permission on the object
		UserBucket().ReadAny().
		// for reading and modifying the object info
		Object().WriteOne(objectID).
		// for updating the Rep configuration
		ObjectRep().WriteOne(objectID).
		// for querying available upload nodes
		Node().ReadAny().
		// for creating Cache records
		Cache().CreateAny().
		// for updating the state of the records of Moves of this object
		StorageObject().WriteAny().
		MutexLock(svc.distlock)
	if err != nil {
		return fmt.Errorf("acquire locks failed, err: %w", err)
	}
	defer mutex.Unlock()

	preResp, err := svc.coordinator.PreUpdateRepObject(coormsg.NewPreUpdateRepObjectBody(
		objectID,
		fileSize,
		userID,
		config.Cfg().ExternalIP,
	))
	if err != nil {
		return fmt.Errorf("request to coordinator failed, err: %w", err)
	}
	if preResp.IsFailed() {
		return fmt.Errorf("coordinator PreUpdateRepObject failed, code: %s, message: %s", preResp.ErrorCode, preResp.ErrorMessage)
	}
	if len(preResp.Body.Nodes) == 0 {
		return fmt.Errorf("no node to upload file")
	}

	// upload target priority:
	// 1. the local IPFS daemon
	// 2. a node that holds the old file and is in the same location as the client
	// 3. a node that holds the old file but is in a different location
	// 4. a node in the same location
	uploadNode := svc.chooseUpdateRepObjectNode(preResp.Body.Nodes)

	var fileHash string
	uploadedNodeIDs := []int{}
	uploadToNode := true

	// if a local IPFS daemon is available, upload through it directly
	if svc.ipfs != nil {
		log.Infof("try to use local IPFS to upload file")

		fileHash, err = svc.uploadToLocalIPFS(file, uploadNode.ID)
		if err != nil {
			log.Warnf("upload to local IPFS failed, so try to upload to node %d, err: %s", uploadNode.ID, err.Error())
		} else {
			uploadToNode = false
		}
	}

	// otherwise upload through the agent
	if uploadToNode {
		// if the client and the node are in the same location, connect via the node's local address
		nodeIP := uploadNode.ExternalIP
		if uploadNode.IsSameLocation {
			nodeIP = uploadNode.LocalIP
			log.Infof("client and node %d are at the same location, use local ip\n", uploadNode.ID)
		}

		mutex, err := reqbuilder.NewBuilder().
			IPFS().
			// prevent the uploaded replica from being cleaned up
			CreateAnyRep(uploadNode.ID).
			MutexLock(svc.distlock)
		if err != nil {
			return fmt.Errorf("acquire locks failed, err: %w", err)
		}
		defer mutex.Unlock()

		fileHash, err = svc.uploadToNode(file, nodeIP)
		if err != nil {
			return fmt.Errorf("upload to node %s failed, err: %w", nodeIP, err)
		}
		uploadedNodeIDs = append(uploadedNodeIDs, uploadNode.ID)
	}

	// update the object
	updateResp, err := svc.coordinator.UpdateRepObject(coormsg.NewUpdateRepObjectBody(objectID, fileHash, fileSize, uploadedNodeIDs, userID))
	if err != nil {
		return fmt.Errorf("request to coordinator failed, err: %w", err)
	}
	if updateResp.IsFailed() {
		return fmt.Errorf("coordinator UpdateRepObject failed, code: %s, message: %s", updateResp.ErrorCode, updateResp.ErrorMessage)
	}

	return nil
}
func (svc *ObjectService) chooseUpdateRepObjectNode(nodes []coormsg.PreUpdateRepObjectRespNode) coormsg.PreUpdateRepObjectRespNode {
	// sort nodes so that those holding the old object come first, then those in the same location as the client
	mysort.Sort(nodes, func(left, right coormsg.PreUpdateRepObjectRespNode) int {
		v := -mysort.CmpBool(left.HasOldObject, right.HasOldObject)
		if v != 0 {
			return v
		}

		return -mysort.CmpBool(left.IsSameLocation, right.IsSameLocation)
	})

	return nodes[0]
}
func (svc *ObjectService) DeleteObject(userID int, objectID int) error {
	mutex, err := reqbuilder.NewBuilder().
		Metadata().
		// for checking whether the user has permission on the object
		UserBucket().ReadAny().
		// for reading and modifying the object info
		Object().WriteOne(objectID).
		// for deleting the Rep configuration
		ObjectRep().WriteOne(objectID).
		// for deleting the Block configuration
		ObjectBlock().WriteAny().
		// for updating the state of the records of Moves of this object
		StorageObject().WriteAny().
		MutexLock(svc.distlock)
	if err != nil {
		return fmt.Errorf("acquire locks failed, err: %w", err)
	}
	defer mutex.Unlock()

	resp, err := svc.coordinator.DeleteObject(coormsg.NewDeleteObjectBody(userID, objectID))
	if err != nil {
		return fmt.Errorf("request to coordinator failed, err: %w", err)
	}
	if resp.IsFailed() {
		return fmt.Errorf("coordinator DeleteObject failed, code: %s, message: %s", resp.ErrorCode, resp.ErrorMessage)
	}

	return nil
}
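
For reference, below is a minimal usage sketch of the ObjectService API defined above. It is not part of object.go: it assumes an already-initialized *services.Service (its construction lives elsewhere in the client), uses placeholder user/bucket/object IDs and file names, and can only compile from inside the client module because the services package is internal.

package example

import (
	"fmt"
	"io"
	"os"

	// internal package of the client; importable only from within the client module
	"gitlink.org.cn/cloudream/client/internal/services"
)

// example uploads a local file as a rep-redundancy object with 2 replicas,
// then downloads an object and writes it to disk. All IDs are placeholders.
func example(svc *services.Service) error {
	objSvc := svc.ObjectSvc()

	// upload: userID=1, bucketID=1, 2 replicas
	f, err := os.Open("data.bin")
	if err != nil {
		return err
	}
	defer f.Close()

	info, err := f.Stat()
	if err != nil {
		return err
	}
	if err := objSvc.UploadRepObject(1, 1, "data.bin", f, info.Size(), 2); err != nil {
		return fmt.Errorf("upload failed: %w", err)
	}

	// download: userID=1, objectID=1
	rd, err := objSvc.DownloadObject(1, 1)
	if err != nil {
		return fmt.Errorf("download failed: %w", err)
	}
	defer rd.Close()

	out, err := os.Create("object.out")
	if err != nil {
		return err
	}
	defer out.Close()

	_, err = io.Copy(out, rd)
	return err
}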

This project aims to turn inter-cloud storage into public infrastructure, so that individuals and enterprises can use efficient inter-cloud storage services with a low barrier to entry (installing the out-of-the-box inter-cloud storage client is all that is needed, with no need to deploy any other components), while also allowing users to flexibly and conveniently customize the functional details of the inter-cloud storage service.