You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

blocktransfer.go 8.6 kB


  1. package models
  2. import (
  3. "errors"
  4. "log"
  5. "strconv"
  6. "time"
  7. cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
  8. stgmod "gitlink.org.cn/cloudream/storage/common/models"
  9. "gitlink.org.cn/cloudream/storage/common/pkgs/sysevent"
  10. "gorm.io/gorm"
  11. )
  12. type StorageTransferCount struct {
  13. RelationshipID int64 `gorm:"column:RelationshipID; primaryKey; type:bigint; autoIncrement" json:"relationshipID"`
  14. ObjectID int64 `gorm:"column:ObjectID; type:bigint; not null" json:"objectID"`
  15. Status int64 `gorm:"column:Status; type:bigint; not null" json:"status"` // 连线左侧的状态
  16. SourceStorageID int64 `gorm:"column:SourceStorageID; type:bigint; not null" json:"sourceStorageID"` // 源存储节点 ID
  17. TargetStorageID int64 `gorm:"column:TargetStorageID; type:bigint; not null" json:"targetStorageID"` // 目标存储节点 ID
  18. DataTransferCount int64 `gorm:"column:DataTransferCount; type:bigint; not null" json:"dataTransferCount"` // 数据传输量
  19. Timestamp time.Time `gorm:"column:Timestamp; type:datatime; not null" json:"timestamp"` // 变化结束时间戳
  20. }
  21. func (StorageTransferCount) TableName() string {
  22. return "storagetransfercount"
  23. }
  24. type StorageTransferCountRepository struct {
  25. repo *GormRepository
  26. }
  27. func NewStorageTransferCountRepository(db *gorm.DB) *StorageTransferCountRepository {
  28. return &StorageTransferCountRepository{repo: NewGormRepository(db)}
  29. }
  30. func (r *StorageTransferCountRepository) CreateStorageTransferCount(storageTransferCount *StorageTransferCount) error {
  31. return r.repo.Create(storageTransferCount)
  32. }
  33. func (r *StorageTransferCountRepository) UpdateStorageTransferCount(storageTransferCount *StorageTransferCount) error {
  34. return r.repo.Update(storageTransferCount)
  35. }
  36. func (r *StorageTransferCountRepository) GetStorageTransferCountByID(id int) (*StorageTransferCount, error) {
  37. var storageTransferCount StorageTransferCount
  38. err := r.repo.GetByID(uint(id), &storageTransferCount)
  39. if err != nil {
  40. return nil, err
  41. }
  42. return &storageTransferCount, nil
  43. }
  44. func (r *StorageTransferCountRepository) GetStorageTransferCountByObjectID(objectID int64) ([]StorageTransferCount, error) {
  45. var storageTransferCounts []StorageTransferCount
  46. query := "SELECT * FROM storagetransfercount WHERE ObjectID = ?"
  47. err := r.repo.db.Raw(query, objectID).Scan(&storageTransferCounts).Error
  48. if err != nil {
  49. return nil, err
  50. }
  51. return storageTransferCounts, nil
  52. }
  53. func (r *StorageTransferCountRepository) GetAllStorageTransferCounts() ([]StorageTransferCount, error) {
  54. var storageTransferCounts []StorageTransferCount
  55. err := r.repo.GetAll(&storageTransferCounts)
  56. if err != nil {
  57. return nil, err
  58. }
  59. return storageTransferCounts, nil
  60. }
  61. type BlockTransferWatcher struct {
  62. Name string
  63. }
  64. func (w *BlockTransferWatcher) OnEvent(event sysevent.SysEvent) {
  65. body, ok := event.Body.(*stgmod.BodyBlockTransfer)
  66. if !ok {
  67. return
  68. }
  69. repoDist := NewBlockDistributionRepository(DB)
  70. repoStorage := NewStorageRepository(DB)
  71. repoStorageTrans := NewStorageTransferCountRepository(DB)
  72. repoObject := NewObjectRepository(DB)
  73. for _, change := range body.BlockChanges {
  74. objectID, _ := strconv.ParseInt(string(body.ObjectID), 10, 64)
  75. object, _ := repoObject.GetObjectByID(objectID)
  76. // index, _ := strconv.ParseInt(change.Index, 10, 64)
  77. // sourceStorageID, _ := strconv.ParseInt(string(change.SourceStorageID), 10, 64)
  78. // targetStorageID, _ := strconv.ParseInt(string(change.TargetStorageID), 10, 64)
  79. // newDataCount, _ := strconv.ParseInt(change.DataTransferCount, 10, 64)
  80. switch change := change.(type) {
  81. case *stgmod.BlockChangeClone: //拷贝
  82. // TODO 从change中获取index, sourceStorageID, targetStorageID, newDataCount,下同
  83. //查询出存储在数据库中的BlockDistribution信息
  84. blockSource, errSource := repoDist.GetBlockDistributionByIndex(objectID, index, sourceStorageID)
  85. //没有记录就将source和target的信息都保存到库中
  86. if errors.Is(errSource, gorm.ErrRecordNotFound) {
  87. err := repoDist.CreateBlockDistribution(&BlockDistribution{
  88. ObjectID: objectID,
  89. Type: change.BlockType,
  90. Index: index,
  91. StorageID: sourceStorageID,
  92. Status: StatusNow,
  93. Timestamp: time.Now(),
  94. })
  95. if err != nil {
  96. log.Printf("Error create source blockdistribution: %v", err)
  97. }
  98. } else {
  99. //有数据则新增一条storageID为targetStorageID的记录,同时更新状态
  100. err := repoDist.CreateBlockDistribution(&BlockDistribution{
  101. ObjectID: blockSource.ObjectID,
  102. Type: change.BlockType,
  103. Index: index,
  104. StorageID: targetStorageID,
  105. Status: StatusNow,
  106. Timestamp: time.Now(),
  107. })
  108. if err != nil {
  109. log.Printf("Error update blockdistribution: %v", err)
  110. }
  111. //复制完成之后增加的dataCount要加到targetStorage的记录中
  112. storageOld, err := repoStorage.GetStorageByID(targetStorageID)
  113. if errors.Is(err, gorm.ErrRecordNotFound) {
  114. err = repoStorage.CreateStorage(&Storage{
  115. StorageID: cdssdk.StorageID(targetStorageID),
  116. DataCount: newDataCount,
  117. Timestamp: time.Now(),
  118. })
  119. if err != nil {
  120. log.Printf("Error increase datacount in targetstorage: %v", err)
  121. }
  122. } else {
  123. err = repoStorage.UpdateStorage(&Storage{
  124. StorageID: cdssdk.StorageID(targetStorageID),
  125. DataCount: storageOld.DataCount + newDataCount,
  126. Timestamp: time.Now(),
  127. })
  128. if err != nil {
  129. log.Printf("Error increase datacount in targetstorage: %v", err)
  130. }
  131. }
  132. }
  133. //新增记录到storageTransferCount表中
  134. err := repoStorageTrans.CreateStorageTransferCount(&StorageTransferCount{
  135. ObjectID: objectID,
  136. Status: int64(blockSource.Status),
  137. SourceStorageID: sourceStorageID,
  138. TargetStorageID: targetStorageID,
  139. DataTransferCount: newDataCount,
  140. Timestamp: time.Now(),
  141. })
  142. if err != nil {
  143. log.Printf("Error create StorageTransferCount : %v", err)
  144. }
  145. case *stgmod.BlockChangeEnDecode: //编解码
  146. //删除所有的sourceBlock
  147. for _, sourceBlock := range change.SourceBlocks {
  148. sourceBlockIndex, _ := strconv.ParseInt(sourceBlock.Index, 10, 64)
  149. err := repoDist.DeleteBlockDistribution(objectID, sourceBlockIndex, sourceStorageID)
  150. if err != nil {
  151. log.Printf("Error delete blockdistribution: %v", err)
  152. }
  153. }
  154. //插入所有的targetBlock
  155. for _, targetBlock := range change.TargetBlocks {
  156. storageID, _ := strconv.ParseInt(string(targetBlock.StorageID), 10, 64)
  157. err := repoDist.CreateBlockDistribution(&BlockDistribution{
  158. ObjectID: objectID,
  159. Type: targetBlock.BlockType,
  160. Index: index,
  161. //直接保存到目标中心
  162. StorageID: storageID,
  163. Status: StatusNow,
  164. Timestamp: time.Now(),
  165. })
  166. if err != nil {
  167. log.Printf("Error create blockdistribution: %v", err)
  168. }
  169. }
  170. //新增记录到storageTransferCount表中
  171. err := repoStorageTrans.CreateStorageTransferCount(&StorageTransferCount{
  172. ObjectID: objectID,
  173. Status: int64(object.Status),
  174. SourceStorageID: sourceStorageID,
  175. TargetStorageID: targetStorageID,
  176. DataTransferCount: newDataCount,
  177. Timestamp: time.Now(),
  178. })
  179. if err != nil {
  180. log.Printf("Error create StorageTransferCount : %v", err)
  181. }
  182. case *stgmod.BlockChangeDeleted: //删除
  183. storageID, _ := strconv.ParseInt(string(change.StorageID), 10, 64)
  184. changeIndex, _ := strconv.ParseInt(change.Index, 10, 64)
  185. err := repoDist.DeleteBlockDistribution(objectID, changeIndex, storageID)
  186. if err != nil {
  187. log.Printf("Error delete blockdistribution: %v", err)
  188. }
  189. // case *stgmod.BlockChangeUpdated: //更新
  190. // for _, blockUpdate := range change.Blocks {
  191. // //查询出存储在数据库中的BlockDistribution信息
  192. // blockIndex, _ := strconv.ParseInt(blockUpdate.Index, 10, 64)
  193. // blockOld, err := repoDist.GetBlockDistributionByIndex(objectID, blockIndex, sourceStorageID)
  194. // newStorageID, _ := strconv.ParseInt(string(blockUpdate.StorageID), 10, 64)
  195. // err = repoDist.UpdateBlockDistribution(&BlockDistribution{
  196. // BlockID: blockOld.BlockID,
  197. // ObjectID: blockOld.ObjectID,
  198. // Type: blockUpdate.BlockType,
  199. // Index: blockIndex,
  200. // StorageID: newStorageID,
  201. // Status: StatusNow,
  202. // Timestamp: time.Now(),
  203. // })
  204. // if err != nil {
  205. // log.Printf("Error delete blockdistribution: %v", err)
  206. // }
  207. // }
  208. default:
  209. break
  210. }
  211. }
  212. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。