diff --git a/internal/event/agent_check_storage.go b/internal/event/agent_check_storage.go index 6b2808a..b4d29e1 100644 --- a/internal/event/agent_check_storage.go +++ b/internal/event/agent_check_storage.go @@ -11,18 +11,17 @@ import ( agtcli "gitlink.org.cn/cloudream/rabbitmq/client/agent" agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent" agtevt "gitlink.org.cn/cloudream/rabbitmq/message/agent/event" + scevt "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event" "gitlink.org.cn/cloudream/scanner/internal/config" ) type AgentCheckStorage struct { - StorageID int - ObjectIDs []int // 需要检查的Object文件列表,如果为nil(不是为空),则代表进行全量检查 + scevt.AgentCheckStorage } func NewAgentCheckStorage(storageID int, objectIDs []int) *AgentCheckStorage { return &AgentCheckStorage{ - StorageID: storageID, - ObjectIDs: objectIDs, + AgentCheckStorage: scevt.NewAgentCheckStorage(storageID, objectIDs), } } diff --git a/internal/event/check_object.go b/internal/event/check_object.go index 1d72c29..448e109 100644 --- a/internal/event/check_object.go +++ b/internal/event/check_object.go @@ -4,15 +4,16 @@ import ( "github.com/samber/lo" "gitlink.org.cn/cloudream/common/utils/logger" mysql "gitlink.org.cn/cloudream/db/sql" + scevt "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event" ) type CheckObject struct { - ObjectIDs []int + scevt.CheckObject } func NewCheckObject(objIDs []int) *CheckObject { return &CheckObject{ - ObjectIDs: objIDs, + CheckObject: scevt.NewCheckObject(objIDs), } } diff --git a/internal/event/check_rep_count.go b/internal/event/check_rep_count.go index 69bcdf4..9fe012a 100644 --- a/internal/event/check_rep_count.go +++ b/internal/event/check_rep_count.go @@ -15,15 +15,16 @@ import ( "gitlink.org.cn/cloudream/db/model" mysql "gitlink.org.cn/cloudream/db/sql" + scevt "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event" ) type CheckRepCount struct { - FileHashes []string + scevt.CheckRepCount } func NewCheckRepCount(fileHashes []string) *CheckRepCount { return &CheckRepCount{ - FileHashes: fileHashes, + CheckRepCount: scevt.NewCheckRepCount(fileHashes), } } diff --git a/internal/event/update_agent_state.go b/internal/event/update_agent_state.go index 24d6f46..613fee9 100644 --- a/internal/event/update_agent_state.go +++ b/internal/event/update_agent_state.go @@ -4,11 +4,17 @@ import ( "gitlink.org.cn/cloudream/common/consts" "gitlink.org.cn/cloudream/common/utils/logger" mysql "gitlink.org.cn/cloudream/db/sql" + scevt "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event" ) type UpdateAgentState struct { - NodeID int - IPFSStatus string + scevt.UpdateAgentState +} + +func NewUpdateAgentState(nodeID int, ipfsState string) *UpdateAgentState { + return &UpdateAgentState{ + UpdateAgentState: scevt.NewUpdateAgentState(nodeID, ipfsState), + } } func (t *UpdateAgentState) TryMerge(other Event) bool { @@ -16,8 +22,8 @@ func (t *UpdateAgentState) TryMerge(other Event) bool { } func (t *UpdateAgentState) Execute(execCtx ExecuteContext) { - if t.IPFSStatus != consts.IPFS_STATUS_OK { - logger.WithField("NodeID", t.NodeID).Warnf("IPFS status is %s, set node state unavailable", t.IPFSStatus) + if t.IPFSState != consts.IPFS_STATUS_OK { + logger.WithField("NodeID", t.NodeID).Warnf("IPFS status is %s, set node state unavailable", t.IPFSState) err := mysql.Node.ChangeState(execCtx.Args.DB.SQLCtx(), t.NodeID, consts.NODE_STATE_UNAVAILABLE) if err != nil { diff --git a/internal/event/update_cache.go b/internal/event/update_cache.go index 8d9598d..3dab9de 100644 --- a/internal/event/update_cache.go +++ b/internal/event/update_cache.go @@ -4,29 +4,18 @@ import ( evtcst "gitlink.org.cn/cloudream/common/consts/event" "gitlink.org.cn/cloudream/common/utils/logger" mysql "gitlink.org.cn/cloudream/db/sql" + scevt "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event" ) -type UpdateCacheEntry struct { - FileHash string - Operation string -} - -func NewUpdateCacheEntry(fileHash string, op string) UpdateCacheEntry { - return UpdateCacheEntry{ - FileHash: fileHash, - Operation: op, - } -} +type UpdateCacheEntry = scevt.UpdateCacheEntry type UpdateCache struct { - NodeID int - Entries []UpdateCacheEntry + scevt.UpdateCache } -func NewUpdateCache(nodeID int, entries []UpdateCacheEntry) UpdateCache { +func NewUpdateCache(nodeID int, entries []scevt.UpdateCacheEntry) UpdateCache { return UpdateCache{ - NodeID: nodeID, - Entries: entries, + UpdateCache: scevt.NewUpdateCache(nodeID, entries), } } diff --git a/internal/event/update_storage.go b/internal/event/update_storage.go index a133364..4e82871 100644 --- a/internal/event/update_storage.go +++ b/internal/event/update_storage.go @@ -4,32 +4,17 @@ import ( tskcst "gitlink.org.cn/cloudream/common/consts/event" "gitlink.org.cn/cloudream/common/utils/logger" mysql "gitlink.org.cn/cloudream/db/sql" + scevt "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event" ) -type UpdateStorageEntry struct { - ObjectID int - UserID int - Operation string -} - -func NewUpdateStorageEntry(objectID int, userID int, op string) UpdateStorageEntry { - return UpdateStorageEntry{ - ObjectID: objectID, - UserID: userID, - Operation: op, - } -} - +type UpdateStorageEntry = scevt.UpdateStorageEntry type UpdateStorage struct { - StorageID int - DirectoryStatus string - Entries []UpdateStorageEntry + scevt.UpdateStorage } -func NewUpdateStorage(dirStatus string, entries []UpdateStorageEntry) UpdateStorage { +func NewUpdateStorage(dirState string, entries []UpdateStorageEntry) UpdateStorage { return UpdateStorage{ - DirectoryStatus: dirStatus, - Entries: entries, + UpdateStorage: scevt.NewUpdateStorage(dirState, entries), } } @@ -43,7 +28,7 @@ func (t *UpdateStorage) TryMerge(other Event) bool { } // 后投递的任务的状态更新一些 - t.DirectoryStatus = event.DirectoryStatus + t.DirectoryState = event.DirectoryState // TODO 可以考虑合并同FileHash和NodeID的记录 t.Entries = append(t.Entries, event.Entries...) return true @@ -51,7 +36,7 @@ func (t *UpdateStorage) TryMerge(other Event) bool { func (t *UpdateStorage) Execute(execCtx ExecuteContext) { - err := mysql.Storage.ChangeState(execCtx.Args.DB.SQLCtx(), t.StorageID, t.DirectoryStatus) + err := mysql.Storage.ChangeState(execCtx.Args.DB.SQLCtx(), t.StorageID, t.DirectoryState) if err != nil { logger.WithField("StorageID", t.StorageID).Warnf("change storage state failed, err: %s", err.Error()) }