You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number; they can include dashes ('-') and can be up to 35 characters long.

blocktransfer.go 8.3 kB


  1. package models
  2. import (
  3. "errors"
  4. cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
  5. stgmod "gitlink.org.cn/cloudream/storage/common/models"
  6. "gorm.io/gorm"
  7. "log"
  8. "strconv"
  9. "time"
  10. )
  11. type StorageTransferCount struct {
  12. RelationshipID int64 `gorm:"column:RelationshipID; primaryKey; type:bigint; autoIncrement" json:"relationshipID"`
  13. ObjectID int64 `gorm:"column:ObjectID; type:bigint; not null" json:"objectID"`
  14. Status int64 `gorm:"column:Status; type:bigint; not null" json:"status"` // 连线左侧的状态
  15. SourceStorageID int64 `gorm:"column:SourceStorageID; type:bigint; not null" json:"sourceStorageID"` // 源存储节点 ID
  16. TargetStorageID int64 `gorm:"column:TargetStorageID; type:bigint; not null" json:"targetStorageID"` // 目标存储节点 ID
  17. DataTransferCount int64 `gorm:"column:DataTransferCount; type:bigint; not null" json:"dataTransferCount"` // 数据传输量
  18. Timestamp time.Time `gorm:"column:Timestamp; type:datatime; not null" json:"timestamp"` // 变化结束时间戳
  19. }
  20. func (StorageTransferCount) TableName() string {
  21. return "storagetransfercount"
  22. }
  23. type StorageTransferCountRepository struct {
  24. repo *GormRepository
  25. }
  26. func NewStorageTransferCountRepository(db *gorm.DB) *StorageTransferCountRepository {
  27. return &StorageTransferCountRepository{repo: NewGormRepository(db)}
  28. }
  29. func (r *StorageTransferCountRepository) CreateStorageTransferCount(storageTransferCount *StorageTransferCount) error {
  30. return r.repo.Create(storageTransferCount)
  31. }
  32. func (r *StorageTransferCountRepository) UpdateStorageTransferCount(storageTransferCount *StorageTransferCount) error {
  33. return r.repo.Update(storageTransferCount)
  34. }
  35. func (r *StorageTransferCountRepository) GetStorageTransferCountByID(id int) (*StorageTransferCount, error) {
  36. var storageTransferCount StorageTransferCount
  37. err := r.repo.GetByID(uint(id), &storageTransferCount)
  38. if err != nil {
  39. return nil, err
  40. }
  41. return &storageTransferCount, nil
  42. }
  43. func (r *StorageTransferCountRepository) GetStorageTransferCountByObjectID(objectID int64) ([]StorageTransferCount, error) {
  44. var storageTransferCounts []StorageTransferCount
  45. query := "SELECT * FROM storagetransfercount WHERE ObjectID = ?"
  46. err := r.repo.db.Raw(query, objectID).Scan(&storageTransferCounts).Error
  47. if err != nil {
  48. return nil, err
  49. }
  50. return storageTransferCounts, nil
  51. }
  52. func (r *StorageTransferCountRepository) GetAllStorageTransferCounts() ([]StorageTransferCount, error) {
  53. var storageTransferCounts []StorageTransferCount
  54. err := r.repo.GetAll(&storageTransferCounts)
  55. if err != nil {
  56. return nil, err
  57. }
  58. return storageTransferCounts, nil
  59. }
  60. // ProcessBlockTransfer 处理块传输信息的实时日志,当对象的块信息发生变化时推送触发数据库刷新
  61. func ProcessBlockTransfer(data stgmod.BlockTransfer) {
  62. repoDist := NewBlockDistributionRepository(DB)
  63. repoStorage := NewStorageRepository(DB)
  64. repoStorageTrans := NewStorageTransferCountRepository(DB)
  65. repoObject := NewObjectRepository(DB)
  66. for _, change := range data.Body.BlockChanges {
  67. objectID, _ := strconv.ParseInt(data.Body.ObjectID, 10, 64)
  68. object, _ := repoObject.GetObjectByID(objectID)
  69. index, _ := strconv.ParseInt(change.Index, 10, 64)
  70. sourceStorageID, _ := strconv.ParseInt(change.SourceStorageID, 10, 64)
  71. targetStorageID, _ := strconv.ParseInt(change.TargetStorageID, 10, 64)
  72. newDataCount, _ := strconv.ParseInt(change.DataTransferCount, 10, 64)
  73. switch change.Type {
  74. case "0": //拷贝
  75. //查询出存储在数据库中的BlockDistribution信息
  76. blockSource, errSource := repoDist.GetBlockDistributionByIndex(objectID, index, sourceStorageID)
  77. //没有记录就将source和target的信息都保存到库中
  78. if errors.Is(errSource, gorm.ErrRecordNotFound) {
  79. err := repoDist.CreateBlockDistribution(&BlockDistribution{
  80. ObjectID: objectID,
  81. Type: change.BlockType,
  82. Index: index,
  83. StorageID: sourceStorageID,
  84. Status: StatusNow,
  85. Timestamp: time.Now(),
  86. })
  87. if err != nil {
  88. log.Printf("Error create source blockdistribution: %v", err)
  89. }
  90. } else {
  91. //有数据则新增一条storageID为targetStorageID的记录,同时更新状态
  92. err := repoDist.CreateBlockDistribution(&BlockDistribution{
  93. ObjectID: blockSource.ObjectID,
  94. Type: change.BlockType,
  95. Index: index,
  96. StorageID: targetStorageID,
  97. Status: StatusNow,
  98. Timestamp: time.Now(),
  99. })
  100. if err != nil {
  101. log.Printf("Error update blockdistribution: %v", err)
  102. }
  103. //复制完成之后增加的dataCount要加到targetStorage的记录中
  104. storageOld, err := repoStorage.GetStorageByID(targetStorageID)
  105. if errors.Is(err, gorm.ErrRecordNotFound) {
  106. err = repoStorage.CreateStorage(&Storage{
  107. StorageID: cdssdk.StorageID(targetStorageID),
  108. DataCount: newDataCount,
  109. Timestamp: time.Now(),
  110. })
  111. if err != nil {
  112. log.Printf("Error increase datacount in targetstorage: %v", err)
  113. }
  114. } else {
  115. err = repoStorage.UpdateStorage(&Storage{
  116. StorageID: cdssdk.StorageID(targetStorageID),
  117. DataCount: storageOld.DataCount + newDataCount,
  118. Timestamp: time.Now(),
  119. })
  120. if err != nil {
  121. log.Printf("Error increase datacount in targetstorage: %v", err)
  122. }
  123. }
  124. }
  125. //新增记录到storageTransferCount表中
  126. err := repoStorageTrans.CreateStorageTransferCount(&StorageTransferCount{
  127. ObjectID: objectID,
  128. Status: int64(blockSource.Status),
  129. SourceStorageID: sourceStorageID,
  130. TargetStorageID: targetStorageID,
  131. DataTransferCount: newDataCount,
  132. Timestamp: time.Now(),
  133. })
  134. if err != nil {
  135. log.Printf("Error create StorageTransferCount : %v", err)
  136. }
  137. case "1": //编解码
  138. //删除所有的sourceBlock
  139. for _, sourceBlock := range change.SourceBlocks {
  140. sourceBlockIndex, _ := strconv.ParseInt(sourceBlock.Index, 10, 64)
  141. err := repoDist.DeleteBlockDistribution(objectID, sourceBlockIndex, sourceStorageID)
  142. if err != nil {
  143. log.Printf("Error delete blockdistribution: %v", err)
  144. }
  145. }
  146. //插入所有的targetBlock
  147. for _, targetBlock := range change.TargetBlocks {
  148. storageID, _ := strconv.ParseInt(targetBlock.StorageID, 10, 64)
  149. err := repoDist.CreateBlockDistribution(&BlockDistribution{
  150. ObjectID: objectID,
  151. Type: targetBlock.BlockType,
  152. Index: index,
  153. //直接保存到目标中心
  154. StorageID: storageID,
  155. Status: StatusNow,
  156. Timestamp: time.Now(),
  157. })
  158. if err != nil {
  159. log.Printf("Error create blockdistribution: %v", err)
  160. }
  161. }
  162. //新增记录到storageTransferCount表中
  163. err := repoStorageTrans.CreateStorageTransferCount(&StorageTransferCount{
  164. ObjectID: objectID,
  165. Status: int64(object.Status),
  166. SourceStorageID: sourceStorageID,
  167. TargetStorageID: targetStorageID,
  168. DataTransferCount: newDataCount,
  169. Timestamp: time.Now(),
  170. })
  171. if err != nil {
  172. log.Printf("Error create StorageTransferCount : %v", err)
  173. }
  174. case "2": //删除
  175. for _, block := range change.Blocks {
  176. storageID, _ := strconv.ParseInt(block.StorageID, 10, 64)
  177. changeIndex, _ := strconv.ParseInt(block.Index, 10, 64)
  178. err := repoDist.DeleteBlockDistribution(objectID, changeIndex, storageID)
  179. if err != nil {
  180. log.Printf("Error delete blockdistribution: %v", err)
  181. }
  182. }
  183. case "3": //更新
  184. for _, blockUpdate := range change.Blocks {
  185. //查询出存储在数据库中的BlockDistribution信息
  186. blockIndex, _ := strconv.ParseInt(blockUpdate.Index, 10, 64)
  187. blockOld, err := repoDist.GetBlockDistributionByIndex(objectID, blockIndex, sourceStorageID)
  188. newStorageID, _ := strconv.ParseInt(blockUpdate.StorageID, 10, 64)
  189. err = repoDist.UpdateBlockDistribution(&BlockDistribution{
  190. BlockID: blockOld.BlockID,
  191. ObjectID: blockOld.ObjectID,
  192. Type: blockUpdate.BlockType,
  193. Index: blockIndex,
  194. StorageID: newStorageID,
  195. Status: StatusNow,
  196. Timestamp: time.Now(),
  197. })
  198. if err != nil {
  199. log.Printf("Error delete blockdistribution: %v", err)
  200. }
  201. }
  202. default:
  203. break
  204. }
  205. }
  206. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。