You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number; can include dashes ('-'); and can be up to 35 characters long.

update_storage.go 2.2 kB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. package event
  2. import (
  3. tskcst "gitlink.org.cn/cloudream/common/consts/event"
  4. "gitlink.org.cn/cloudream/common/utils/logger"
  5. mysql "gitlink.org.cn/cloudream/db/sql"
  6. scevt "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event"
  7. )
  8. type UpdateStorageEntry = scevt.UpdateStorageEntry
  9. type UpdateStorage struct {
  10. scevt.UpdateStorage
  11. }
  12. func NewUpdateStorage(dirState string, entries []UpdateStorageEntry) *UpdateStorage {
  13. return &UpdateStorage{
  14. UpdateStorage: scevt.NewUpdateStorage(dirState, entries),
  15. }
  16. }
  17. func (t *UpdateStorage) TryMerge(other Event) bool {
  18. event, ok := other.(*UpdateStorage)
  19. if !ok {
  20. return false
  21. }
  22. if event.StorageID != t.StorageID {
  23. return false
  24. }
  25. // 后投递的任务的状态更新一些
  26. t.DirectoryState = event.DirectoryState
  27. // TODO 可以考虑合并同FileHash和NodeID的记录
  28. t.Entries = append(t.Entries, event.Entries...)
  29. return true
  30. }
  31. func (t *UpdateStorage) Execute(execCtx ExecuteContext) {
  32. err := mysql.Storage.ChangeState(execCtx.Args.DB.SQLCtx(), t.StorageID, t.DirectoryState)
  33. if err != nil {
  34. logger.WithField("StorageID", t.StorageID).Warnf("change storage state failed, err: %s", err.Error())
  35. }
  36. var chkObjIDs []int
  37. for _, entry := range t.Entries {
  38. switch entry.Operation {
  39. case tskcst.UPDATE_STORAGE_DELETE:
  40. err := mysql.StorageObject.Delete(execCtx.Args.DB.SQLCtx(), t.StorageID, entry.ObjectID, entry.UserID)
  41. if err != nil {
  42. logger.WithField("StorageID", t.StorageID).
  43. WithField("ObjectID", entry.ObjectID).
  44. Warnf("delete storage object failed, err: %s", err.Error())
  45. }
  46. chkObjIDs = append(chkObjIDs, entry.ObjectID)
  47. case tskcst.UPDATE_STORAGE_SET_NORMAL:
  48. err := mysql.StorageObject.SetStateNormal(execCtx.Args.DB.SQLCtx(), t.StorageID, entry.ObjectID, entry.UserID)
  49. if err != nil {
  50. logger.WithField("StorageID", t.StorageID).
  51. WithField("ObjectID", entry.ObjectID).
  52. Warnf("change storage object state failed, err: %s", err.Error())
  53. }
  54. }
  55. }
  56. if len(chkObjIDs) > 0 {
  57. execCtx.Executor.Post(NewCheckObject(chkObjIDs))
  58. }
  59. }
  60. func init() {
  61. Register(func(msg UpdateStorage) Event { return NewUpdateStorage(msg.DirectoryState, msg.Entries) })
  62. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。