diff --git a/models/limit_config.go b/models/limit_config.go index 273af0de1..2196b5b6d 100644 --- a/models/limit_config.go +++ b/models/limit_config.go @@ -2,6 +2,24 @@ package models import "code.gitea.io/gitea/modules/timeutil" +type LimitType string + +const ( + LimitTypeTask LimitType = "TASK" + LimitTypeReward LimitType = "REWARD" +) + +func (l LimitType) Name() string { + switch l { + case LimitTypeTask: + return "TASK" + case LimitTypeReward: + return "REWARD" + default: + return "" + } +} + type LimitConfig struct { ID int64 `xorm:"pk autoincr"` Tittle string @@ -9,14 +27,15 @@ type LimitConfig struct { Scope string `xorm:"NOT NULL"` LimitNum int64 `xorm:"NOT NULL"` LimitCode string `xorm:"NOT NULL"` + LimitType string `xorm:"NOT NULL"` Creator int64 `xorm:"NOT NULL"` CreatedUnix timeutil.TimeStamp `xorm:"created"` DeletedAt timeutil.TimeStamp `xorm:"deleted"` } -func findLimitConfig(tl *LimitConfig) ([]LimitConfig, error) { +func GetLimitConfigByLimitCode(limitCode string, limitType LimitType) ([]LimitConfig, error) { r := make([]LimitConfig, 0) - err := x.Find(r, tl) + err := x.Where("limit_code = ? and limit_type = ?", limitCode, limitType.Name()).Find(&r) if err != nil { return nil, err } else if len(r) == 0 { @@ -24,10 +43,3 @@ func findLimitConfig(tl *LimitConfig) ([]LimitConfig, error) { } return r, nil } - -func GetLimitConfigByLimitCode(limitCode string) ([]LimitConfig, error) { - t := &LimitConfig{ - LimitCode: limitCode, - } - return findLimitConfig(t) -} diff --git a/models/models.go b/models/models.go index 9d255c5e6..59e7a3a48 100755 --- a/models/models.go +++ b/models/models.go @@ -144,6 +144,13 @@ func init() { new(WechatBindLog), new(OrgStatistic), new(SearchRecord), + new(TaskConfig), + new(TaskAccomplishLog), + new(RewardOperateRecord), + new(LimitConfig), + new(PeriodicTask), + new(PointAccountLog), + new(PointAccount), ) tablesStatistic = append(tablesStatistic, diff --git a/models/point_account.go b/models/point_account.go index 7fa38cb7a..9a8032553 100644 --- a/models/point_account.go +++ b/models/point_account.go @@ -1,6 +1,8 @@ package models -import "code.gitea.io/gitea/modules/timeutil" +import ( + "code.gitea.io/gitea/modules/timeutil" +) type PointAccountStatus int @@ -87,7 +89,7 @@ func GetAccountByUserId(userId int64) (*PointAccount, error) { return nil, err } if !has { - return nil, nil + return nil, ErrRecordNotExist{} } return p, nil } diff --git a/models/repo_watch.go b/models/repo_watch.go index 31868fcae..2d01bde1f 100644 --- a/models/repo_watch.go +++ b/models/repo_watch.go @@ -287,7 +287,7 @@ func NotifyWatchers(actions ...*Action) error { func producer(actions ...*Action) { for _, action := range actions { - if !action.IsPrivate{ + if !action.IsPrivate { ActionChan <- action } } diff --git a/models/reward_operate_record.go b/models/reward_operate_record.go index ca1f52168..6f4e9a797 100644 --- a/models/reward_operate_record.go +++ b/models/reward_operate_record.go @@ -5,18 +5,12 @@ import ( "fmt" ) -type RewardSourceType string - const ( - SourceTypeAccomplishTask RewardSourceType = "ACCOMPLISH_TASK" - SourceTypeAdminOperate RewardSourceType = "ADMIN_OPERATE" - SourceTypeRunCloudbrainTask RewardSourceType = "RUN_CLOUBRAIN_TASK" + SourceTypeAccomplishTask string = "ACCOMPLISH_TASK" + SourceTypeAdminOperate = "ADMIN_OPERATE" + SourceTypeRunCloudbrainTask = "RUN_CLOUBRAIN_TASK" ) -func (r *RewardSourceType) String() string { - return fmt.Sprint(r) -} - type RewardType string const ( @@ -40,6 +34,7 @@ const ( type RewardOperateRecord struct { ID int64 `xorm:"pk autoincr"` + RecordId string `xorm:"INDEX NOT NULL"` UserId int64 `xorm:"INDEX NOT NULL"` Amount int64 `xorm:"NOT NULL"` RewardType string `xorm:"NOT NULL"` @@ -80,5 +75,21 @@ func UpdateAwardOperateRecordStatus(sourceType, requestId, oldStatus, newStatus r := &RewardOperateRecord{ Status: newStatus, } - return x.Cols("status").Where("source_type=? and requestId=? and status=?", sourceType, requestId, oldStatus).Update(r) + return x.Cols("status").Where("source_type=? and request_id=? and status=?", sourceType, requestId, oldStatus).Update(r) +} + +type RewardOperateContext struct { + SourceType string + SourceId string + Remark string + Reward Reward + TargetUserId int64 + RequestId string + OperateType string + CycleIntervalSeconds int64 +} + +type Reward struct { + Amount int64 + Type string } diff --git a/models/task_config.go b/models/task_config.go index 036f4e315..aa09ee603 100644 --- a/models/task_config.go +++ b/models/task_config.go @@ -2,19 +2,13 @@ package models import ( "code.gitea.io/gitea/modules/timeutil" - "fmt" ) -type TaskType string - const ( - TaskTypeComment TaskType = "COMMENT" + TaskTypeCreateIssueComment string = "CREATE_IS" + TaskTypeNewIssue = "NEW_ISSUE" ) -func (t *TaskType) String() string { - return fmt.Sprint(t) -} - const ( PeriodNotCycle = "NOT_CYCLE" PeriodDaily = "DAILY" @@ -23,11 +17,9 @@ const ( //PointTaskConfig Only add and delete are allowed, edit is not allowed //so if you want to edit config for some task code,please delete first and add new one type TaskConfig struct { - ID int64 `xorm:"pk autoincr"` - TaskCode string `xorm:"NOT NULL"` - Tittle string `xorm:"NOT NULL"` - RefreshRate string `xorm:"NOT NULL"` - Times int64 `xorm:"NOT NULL"` + ID int64 `xorm:"pk autoincr"` + TaskCode string `xorm:"NOT NULL"` + Tittle string AwardType string `xorm:"NOT NULL"` AwardAmount int64 `xorm:"NOT NULL"` Creator int64 `xorm:"NOT NULL"` diff --git a/modules/auth/wechat/access_token.go b/modules/auth/wechat/access_token.go index af62c3e7b..e4e38ee30 100644 --- a/modules/auth/wechat/access_token.go +++ b/modules/auth/wechat/access_token.go @@ -26,14 +26,18 @@ func GetWechatAccessToken() string { } func refreshAccessToken() { - if ok := accessTokenLock.Lock(3 * time.Second); ok { + if ok, _ := accessTokenLock.Lock(3 * time.Second); ok { defer accessTokenLock.UnLock() callAccessTokenAndUpdateCache() } } func refreshAndGetAccessToken() string { - if ok := accessTokenLock.LockWithWait(3*time.Second, 3*time.Second); ok { + isOk, err := accessTokenLock.LockWithWait(3*time.Second, 3*time.Second) + if err != nil { + return "" + } + if isOk { defer accessTokenLock.UnLock() token, _ := redis_client.Get(redis_key.WechatAccessTokenKey()) if token != "" { diff --git a/modules/notification/notification.go b/modules/notification/notification.go index 0fd6fa471..8329ca903 100644 --- a/modules/notification/notification.go +++ b/modules/notification/notification.go @@ -10,6 +10,7 @@ import ( "code.gitea.io/gitea/modules/notification/base" "code.gitea.io/gitea/modules/notification/indexer" "code.gitea.io/gitea/modules/notification/mail" + "code.gitea.io/gitea/modules/notification/task" "code.gitea.io/gitea/modules/notification/ui" "code.gitea.io/gitea/modules/notification/webhook" "code.gitea.io/gitea/modules/repository" @@ -35,6 +36,7 @@ func NewContext() { RegisterNotifier(indexer.NewNotifier()) RegisterNotifier(webhook.NewNotifier()) RegisterNotifier(action.NewNotifier()) + RegisterNotifier(task.NewNotifier()) } // NotifyUploadAttachment notifies attachment upload message to notifiers diff --git a/modules/notification/task/task.go b/modules/notification/task/task.go new file mode 100644 index 000000000..ce3b023ba --- /dev/null +++ b/modules/notification/task/task.go @@ -0,0 +1,85 @@ +package task + +import ( + "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/notification/base" + "code.gitea.io/gitea/modules/repository" + "code.gitea.io/gitea/services/task" + "fmt" +) + +type taskNotifier struct { + base.NullNotifier +} + +var ( + _ base.Notifier = &taskNotifier{} +) + +// NewNotifier create a new actionNotifier notifier +func NewNotifier() base.Notifier { + return &taskNotifier{} +} + +func (t *taskNotifier) NotifyNewIssue(issue *models.Issue) { + task.Accomplish(issue.Poster.ID, models.TaskTypeNewIssue, fmt.Sprint(issue.ID)) +} + +// NotifyIssueChangeStatus notifies close or reopen issue to notifiers +func (t *taskNotifier) NotifyIssueChangeStatus(doer *models.User, issue *models.Issue, actionComment *models.Comment, closeOrReopen bool) { + return +} + +// NotifyCreateIssueComment notifies comment on an issue to notifiers +func (t *taskNotifier) NotifyCreateIssueComment(doer *models.User, repo *models.Repository, + issue *models.Issue, comment *models.Comment) { + task.Accomplish(doer.ID, models.TaskTypeCreateIssueComment, fmt.Sprint(comment.ID)) +} + +func (t *taskNotifier) NotifyNewPullRequest(pull *models.PullRequest) { + task.Accomplish(pull.Issue.Poster.ID, models.TaskTypeCreateIssueComment, fmt.Sprint(pull.ID)) +} + +func (t *taskNotifier) NotifyRenameRepository(doer *models.User, repo *models.Repository, oldRepoName string) { + return +} + +func (t *taskNotifier) NotifyAliasRepository(doer *models.User, repo *models.Repository, oldAlias string) { + return +} + +func (t *taskNotifier) NotifyTransferRepository(doer *models.User, repo *models.Repository, oldOwnerName string) { + return +} + +func (t *taskNotifier) NotifyCreateRepository(doer *models.User, u *models.User, repo *models.Repository) { + return +} + +func (t *taskNotifier) NotifyForkRepository(doer *models.User, oldRepo, repo *models.Repository) { + return +} + +func (t *taskNotifier) NotifyPullRequestReview(pr *models.PullRequest, review *models.Review, comment *models.Comment) { + return +} + +func (t *taskNotifier) NotifyMergePullRequest(pr *models.PullRequest, doer *models.User) { + return +} + +func (t *taskNotifier) NotifySyncPushCommits(pusher *models.User, repo *models.Repository, refName, oldCommitID, newCommitID string, commits *repository.PushCommits) { + return +} + +func (t *taskNotifier) NotifySyncCreateRef(doer *models.User, repo *models.Repository, refType, refFullName string) { + return +} + +func (t *taskNotifier) NotifySyncDeleteRef(doer *models.User, repo *models.Repository, refType, refFullName string) { + return +} + +func (t *taskNotifier) NotifyOtherTask(doer *models.User, repo *models.Repository, id string, name string, optype models.ActionType) { + return +} diff --git a/modules/redis/redis_client/client.go b/modules/redis/redis_client/client.go index 2c487a72c..21a6da9fb 100644 --- a/modules/redis/redis_client/client.go +++ b/modules/redis/redis_client/client.go @@ -103,7 +103,7 @@ func Expire(key string, expireSeconds int64) error { redisClient := labelmsg.Get() defer redisClient.Close() - _, err := redisClient.Do("EXPIRE ", key, expireSeconds) + _, err := redisClient.Do("EXPIRE", key, expireSeconds) if err != nil { return err } diff --git a/modules/redis/redis_key/account_redis_key.go b/modules/redis/redis_key/account_redis_key.go index f36a8ea5c..896ea4ff4 100644 --- a/modules/redis/redis_key/account_redis_key.go +++ b/modules/redis/redis_key/account_redis_key.go @@ -8,8 +8,8 @@ func PointAccountOperateLock(accountCode string) string { return KeyJoin(ACCOUNT_REDIS_PREFIX, accountCode, "operate", "lock") } -func PointAccountDetail(userId int64) string { - return KeyJoin(ACCOUNT_REDIS_PREFIX, fmt.Sprint(userId), "detail") +func PointAccountInfo(userId int64) string { + return KeyJoin(ACCOUNT_REDIS_PREFIX, fmt.Sprint(userId), "info") } func PointAccountInitLock(userId int64) string { diff --git a/modules/redis/redis_key/limit_redis_key.go b/modules/redis/redis_key/limit_redis_key.go index e9d8352a2..86a77e59e 100644 --- a/modules/redis/redis_key/limit_redis_key.go +++ b/modules/redis/redis_key/limit_redis_key.go @@ -21,6 +21,6 @@ func LimitCount(userId int64, limitCode string, period *models.PeriodResult) str } -func LimitConfig(limitCode string) string { - return KeyJoin(LIMIT_REDIS_PREFIX, limitCode, "config") +func LimitConfig(limitCode string, limitType models.LimitType) string { + return KeyJoin(LIMIT_REDIS_PREFIX, limitCode, limitType.Name(), "config") } diff --git a/modules/redis/redis_key/task_redis_key.go b/modules/redis/redis_key/task_redis_key.go index 2eb8c21d1..3427c8f7f 100644 --- a/modules/redis/redis_key/task_redis_key.go +++ b/modules/redis/redis_key/task_redis_key.go @@ -1,15 +1,11 @@ package redis_key -import ( - "code.gitea.io/gitea/models" -) - const TASK_REDIS_PREFIX = "task" -func TaskAccomplishLock(sourceId string, taskType models.TaskType) string { - return KeyJoin(TASK_REDIS_PREFIX, sourceId, taskType.String(), "accomplish") +func TaskAccomplishLock(sourceId string, taskType string) string { + return KeyJoin(TASK_REDIS_PREFIX, sourceId, taskType, "accomplish") } -func TaskConfig(taskType models.TaskType) string { - return KeyJoin(TASK_REDIS_PREFIX, "config", taskType.String()) +func TaskConfig(taskType string) string { + return KeyJoin(TASK_REDIS_PREFIX, "config", taskType) } diff --git a/modules/redis/redis_lock/lock.go b/modules/redis/redis_lock/lock.go index b8cd837f1..5723c379d 100644 --- a/modules/redis/redis_lock/lock.go +++ b/modules/redis/redis_lock/lock.go @@ -13,26 +13,32 @@ func NewDistributeLock(lockKey string) *DistributeLock { return &DistributeLock{lockKey: lockKey} } -func (lock *DistributeLock) Lock(expireTime time.Duration) bool { - isOk, _ := redis_client.Setnx(lock.lockKey, "", expireTime) - return isOk +func (lock *DistributeLock) Lock(expireTime time.Duration) (bool, error) { + isOk, err := redis_client.Setnx(lock.lockKey, "", expireTime) + if err != nil { + return false, err + } + return isOk, nil } -func (lock *DistributeLock) LockWithWait(expireTime time.Duration, waitTime time.Duration) bool { +func (lock *DistributeLock) LockWithWait(expireTime time.Duration, waitTime time.Duration) (bool, error) { start := time.Now().Unix() * 1000 duration := waitTime.Milliseconds() for { - isOk, _ := redis_client.Setnx(lock.lockKey, "", expireTime) + isOk, err := redis_client.Setnx(lock.lockKey, "", expireTime) + if err != nil { + return false, err + } if isOk { - return true + return true, nil } if time.Now().Unix()*1000-start > duration { - return false + return false, nil } time.Sleep(50 * time.Millisecond) } - return false + return false, nil } func (lock *DistributeLock) UnLock() error { diff --git a/services/reward/limiter/limiter.go b/services/reward/limiter/limiter.go index aca8af22e..8117ba173 100644 --- a/services/reward/limiter/limiter.go +++ b/services/reward/limiter/limiter.go @@ -2,6 +2,7 @@ package limiter import ( "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/redis/redis_client" "code.gitea.io/gitea/modules/redis/redis_key" "code.gitea.io/gitea/services/task/period" @@ -17,25 +18,27 @@ type limiterRunner struct { userId int64 amount int64 limitCode string + limitType models.LimitType } -func newLimiterRunner(limitCode string, userId, amount int64) *limiterRunner { +func newLimiterRunner(limitCode string, limitType models.LimitType, userId, amount int64) *limiterRunner { return &limiterRunner{ userId: userId, amount: amount, limitCode: limitCode, + limitType: limitType, index: 0, } } func (l *limiterRunner) Run() error { - if err := l.LoadLimiters(l.limitCode); err != nil { + if err := l.LoadLimiters(); err != nil { return err } - //todo 验证未配置的情况 - for l.index <= len(l.limiters) { + for l.index < len(l.limiters) { err := l.limit(l.limiters[l.index]) if err != nil { + log.Info("limiter check failed,%v", err) return err } l.index += 1 @@ -62,32 +65,43 @@ func (l *limiterRunner) limit(r models.LimitConfig) error { return nil } -func (l *limiterRunner) LoadLimiters(limitCode string) error { - redisKey := redis_key.LimitConfig(limitCode) +func (l *limiterRunner) LoadLimiters() error { + limiters, err := GetLimiters(l.limitCode, l.limitType) + if err != nil { + return err + } + if limiters != nil { + l.limiters = limiters + } + return nil +} + +func CheckLimit(limitCode string, limitType models.LimitType, userId, amount int64) error { + r := newLimiterRunner(limitCode, limitType, userId, amount) + return r.Run() +} + +func GetLimiters(limitCode string, limitType models.LimitType) ([]models.LimitConfig, error) { + redisKey := redis_key.LimitConfig(limitCode, limitType) val, _ := redis_client.Get(redisKey) if val != "" { if val == redis_key.EMPTY_REDIS_VAL { - return nil + return nil, nil } limiters := make([]models.LimitConfig, 0) - json.Unmarshal([]byte(val), limiters) - return nil + json.Unmarshal([]byte(val), &limiters) + return limiters, nil } - limiters, err := models.GetLimitConfigByLimitCode(limitCode) + limiters, err := models.GetLimitConfigByLimitCode(limitCode, limitType) if err != nil { if models.IsErrRecordNotExist(err) { redis_client.Setex(redisKey, redis_key.EMPTY_REDIS_VAL, 5*time.Second) - return nil + return nil, nil } - return err + return nil, err } jsonStr, _ := json.Marshal(limiters) redis_client.Setex(redisKey, string(jsonStr), 30*24*time.Hour) - return nil -} - -func CheckLimit(limitCode string, userId, amount int64) error { - r := newLimiterRunner(limitCode, userId, amount) - return r.Run() + return limiters, nil } diff --git a/services/reward/operator.go b/services/reward/operator.go index 321562474..b0bd53f8a 100644 --- a/services/reward/operator.go +++ b/services/reward/operator.go @@ -5,6 +5,7 @@ import ( "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/redis/redis_key" "code.gitea.io/gitea/modules/redis/redis_lock" + "code.gitea.io/gitea/modules/util" "code.gitea.io/gitea/services/reward/point" "errors" "fmt" @@ -15,26 +16,25 @@ var RewardOperatorMap = map[string]RewardOperator{ fmt.Sprint(models.RewardTypePoint): new(point.PointOperator), } -type RewardOperateContext struct { - SourceType models.RewardSourceType - SourceId string - Remark string - Reward Reward - TargetUserId int64 - RequestId string - OperateType string - CycleIntervalSeconds int64 -} - type RewardOperator interface { - IsLimited(ctx RewardOperateContext) bool - Operate(ctx RewardOperateContext) error + IsLimited(ctx models.RewardOperateContext) bool + Operate(ctx models.RewardOperateContext) error } -func Send(ctx RewardOperateContext) error { +func Send(ctx models.RewardOperateContext) error { + defer func() { + if err := recover(); err != nil { + combinedErr := fmt.Errorf("%s\n%s", err, log.Stack(2)) + log.Error("PANIC:%v", combinedErr) + } + }() //add lock - var rewardLock = redis_lock.NewDistributeLock(redis_key.RewardSendLock(ctx.RequestId, ctx.SourceType.String())) - if !rewardLock.Lock(3 * time.Second) { + var rewardLock = redis_lock.NewDistributeLock(redis_key.RewardSendLock(ctx.RequestId, ctx.SourceType)) + isOk, err := rewardLock.Lock(3 * time.Second) + if err != nil { + return err + } + if !isOk { log.Info("duplicated reward request,targetUserId=%d requestId=%s", ctx.TargetUserId, ctx.RequestId) return nil } @@ -63,19 +63,22 @@ func Send(ctx RewardOperateContext) error { } //new reward operate record - if err := initAwardOperateRecord(ctx); err != nil { + recordId, err := initAwardOperateRecord(ctx) + if err != nil { return err } + ctx.SourceId = recordId + //operate if err := operator.Operate(ctx); err != nil { - updateAwardOperateRecordStatus(ctx.SourceType.String(), ctx.RequestId, models.OperateStatusOperating, models.OperateStatusFailed) + updateAwardOperateRecordStatus(ctx.SourceType, ctx.RequestId, models.OperateStatusOperating, models.OperateStatusFailed) return err } //if not a cycle operate,update status to success - if ctx.CycleIntervalSeconds > 0 { - updateAwardOperateRecordStatus(ctx.SourceType.String(), ctx.RequestId, models.OperateStatusOperating, models.OperateStatusSucceeded) + if ctx.CycleIntervalSeconds == 0 { + updateAwardOperateRecordStatus(ctx.SourceType, ctx.RequestId, models.OperateStatusOperating, models.OperateStatusSucceeded) } return nil } @@ -84,8 +87,8 @@ func GetOperator(rewardType string) RewardOperator { return RewardOperatorMap[rewardType] } -func isHandled(sourceType models.RewardSourceType, requestId string) (bool, error) { - _, err := models.GetPointOperateRecordBySourceTypeAndRequestId(sourceType.String(), requestId) +func isHandled(sourceType string, requestId string) (bool, error) { + _, err := models.GetPointOperateRecordBySourceTypeAndRequestId(sourceType, requestId) if err != nil { if models.IsErrRecordNotExist(err) { return false, nil @@ -96,23 +99,25 @@ func isHandled(sourceType models.RewardSourceType, requestId string) (bool, erro } -func initAwardOperateRecord(ctx RewardOperateContext) error { - _, err := models.InsertAwardOperateRecord(&models.RewardOperateRecord{ +func initAwardOperateRecord(ctx models.RewardOperateContext) (string, error) { + record := &models.RewardOperateRecord{ + RecordId: util.UUID(), UserId: ctx.TargetUserId, Amount: ctx.Reward.Amount, RewardType: ctx.Reward.Type, - SourceType: ctx.SourceType.String(), + SourceType: ctx.SourceType, SourceId: ctx.SourceId, RequestId: ctx.RequestId, OperateType: ctx.OperateType, CycleIntervalSeconds: ctx.CycleIntervalSeconds, Status: models.OperateStatusOperating, Remark: ctx.Remark, - }) + } + _, err := models.InsertAwardOperateRecord(record) if err != nil { - return err + return "", err } - return nil + return record.RecordId, nil } func updateAwardOperateRecordStatus(sourceType, requestId, oldStatus, newStatus string) error { diff --git a/services/reward/point/account/point_account.go b/services/reward/point/account/point_account.go index 9ff5001fc..ea127e162 100644 --- a/services/reward/point/account/point_account.go +++ b/services/reward/point/account/point_account.go @@ -11,7 +11,7 @@ import ( ) func GetAccount(userId int64) (*models.PointAccount, error) { - redisKey := redis_key.PointAccountDetail(userId) + redisKey := redis_key.PointAccountInfo(userId) val, _ := redis_client.Get(redisKey) if val != "" { account := &models.PointAccount{} @@ -36,7 +36,11 @@ func GetAccount(userId int64) (*models.PointAccount, error) { func InitAccount(userId int64) (*models.PointAccount, error) { lock := redis_lock.NewDistributeLock(redis_key.PointAccountInitLock(userId)) - if lock.LockWithWait(3*time.Second, 3*time.Second) { + isOk, err := lock.LockWithWait(3*time.Second, 3*time.Second) + if err != nil { + return nil, err + } + if isOk { defer lock.UnLock() account, _ := models.GetAccountByUserId(userId) if account == nil { diff --git a/services/reward/point/point_operate.go b/services/reward/point/point_operate.go index ddcac515b..eeba83ac7 100644 --- a/services/reward/point/point_operate.go +++ b/services/reward/point/point_operate.go @@ -2,9 +2,9 @@ package point import ( "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/redis/redis_client" "code.gitea.io/gitea/modules/redis/redis_key" "code.gitea.io/gitea/modules/redis/redis_lock" - "code.gitea.io/gitea/services/reward" "code.gitea.io/gitea/services/reward/limiter" "code.gitea.io/gitea/services/reward/point/account" "errors" @@ -14,28 +14,36 @@ import ( type PointOperator struct { } -func (operator *PointOperator) IsLimited(ctx reward.RewardOperateContext) bool { - if err := limiter.CheckLimit(ctx.Reward.Type, ctx.TargetUserId, ctx.Reward.Amount); err != nil { - return false +func (operator *PointOperator) IsLimited(ctx models.RewardOperateContext) bool { + if err := limiter.CheckLimit(ctx.Reward.Type, models.LimitTypeReward, ctx.TargetUserId, ctx.Reward.Amount); err != nil { + return true } - return true + return false } -func (operator *PointOperator) Operate(ctx reward.RewardOperateContext) error { +func (operator *PointOperator) Operate(ctx models.RewardOperateContext) error { a, err := account.GetAccount(ctx.TargetUserId) if err != nil || a == nil { return errors.New("get account error") } lock := redis_lock.NewDistributeLock(redis_key.PointAccountOperateLock(a.AccountCode)) - if lock.LockWithWait(3*time.Second, 3*time.Second) { + isOk, err := lock.LockWithWait(3*time.Second, 3*time.Second) + if err != nil { + return err + } + if isOk { defer lock.UnLock() na, _ := account.GetAccount(ctx.TargetUserId) if ctx.OperateType == models.OperateTypeIncrease { - na.Increase(ctx.Reward.Amount, ctx.SourceId) + err = na.Increase(ctx.Reward.Amount, ctx.SourceId) } else if ctx.OperateType == models.OperateTypeDecrease { - na.Decrease(ctx.Reward.Amount, ctx.SourceId) + err = na.Decrease(ctx.Reward.Amount, ctx.SourceId) + } + if err != nil { + return err } + redis_client.Del(redis_key.PointAccountInfo(ctx.TargetUserId)) } else { return errors.New("Get account operate lock failed") diff --git a/services/reward/reward.go b/services/reward/reward.go deleted file mode 100644 index ca1c1f3cd..000000000 --- a/services/reward/reward.go +++ /dev/null @@ -1,6 +0,0 @@ -package reward - -type Reward struct { - Amount int64 - Type string -} diff --git a/services/task/task.go b/services/task/task.go index 403a2ba8f..f38793419 100644 --- a/services/task/task.go +++ b/services/task/task.go @@ -7,18 +7,30 @@ import ( "code.gitea.io/gitea/modules/redis/redis_lock" "code.gitea.io/gitea/modules/util" "code.gitea.io/gitea/services/reward" - "code.gitea.io/gitea/services/task/period" + "code.gitea.io/gitea/services/reward/limiter" + "fmt" "time" ) -func Accomplish(userId int64, taskType models.TaskType, sourceId string) { +func Accomplish(userId int64, taskType string, sourceId string) { go accomplish(userId, taskType, sourceId) } -func accomplish(userId int64, taskType models.TaskType, sourceId string) error { +func accomplish(userId int64, taskType string, sourceId string) error { + defer func() { + if err := recover(); err != nil { + combinedErr := fmt.Errorf("%s\n%s", err, log.Stack(2)) + log.Error("PANIC:%v", combinedErr) + } + }() //lock var taskLock = redis_lock.NewDistributeLock(redis_key.TaskAccomplishLock(sourceId, taskType)) - if !taskLock.Lock(3 * time.Second) { + isOk, err := taskLock.Lock(3 * time.Second) + if err != nil { + log.Error("get taskLock error. %v", err) + return err + } + if !isOk { log.Info("duplicated task request,userId=%d taskType=%s sourceId=%s", userId, taskType, sourceId) return nil } @@ -47,12 +59,7 @@ func accomplish(userId int64, taskType models.TaskType, sourceId string) error { } //is limited? - isLimited, err := IsLimited(userId, config) - if err != nil { - log.Error("get limited error,%v", err) - return err - } - if isLimited { + if isLimited(userId, config) { log.Info("task accomplish maximum times are reached,userId=%d taskType=%s sourceId=%s", userId, taskType, sourceId) return nil } @@ -71,22 +78,23 @@ func accomplish(userId int64, taskType models.TaskType, sourceId string) error { } //reward - reward.Send(reward.RewardOperateContext{ + reward.Send(models.RewardOperateContext{ SourceType: models.SourceTypeAccomplishTask, - SourceId: sourceId, - Reward: reward.Reward{ + SourceId: logId, + Reward: models.Reward{ Amount: config.AwardAmount, Type: config.AwardType, }, TargetUserId: userId, RequestId: logId, + OperateType: models.OperateTypeIncrease, }) return nil } -func isHandled(taskType models.TaskType, sourceId string) (bool, error) { - _, err := models.GetTaskAccomplishLogBySourceIdAndTaskCode(sourceId, taskType.String()) +func isHandled(taskType string, sourceId string) (bool, error) { + _, err := models.GetTaskAccomplishLogBySourceIdAndTaskCode(sourceId, taskType) if err != nil { if models.IsErrRecordNotExist(err) { return false, nil @@ -97,15 +105,10 @@ func isHandled(taskType models.TaskType, sourceId string) (bool, error) { } -func IsLimited(userId int64, config *models.TaskConfig) (bool, error) { - p, err := period.GetPeriod(config.RefreshRate) - if err != nil { - return false, err - } - n, err := models.CountInTaskPeriod(config.ID, userId, p) - if err != nil { - return false, err +func isLimited(userId int64, config *models.TaskConfig) bool { + if err := limiter.CheckLimit(config.TaskCode, models.LimitTypeTask, userId, 1); err != nil { + return true } - return n >= config.Times, nil + return false } diff --git a/services/task/task_config.go b/services/task/task_config.go index 22e0b1828..6e7f22e14 100644 --- a/services/task/task_config.go +++ b/services/task/task_config.go @@ -10,7 +10,7 @@ import ( //GetTaskConfig get task config from redis cache first // if not exist in redis, find in db and refresh the redis key -func GetTaskConfig(taskType models.TaskType) (*models.TaskConfig, error) { +func GetTaskConfig(taskType string) (*models.TaskConfig, error) { redisKey := redis_key.TaskConfig(taskType) configStr, _ := redis_client.Get(redisKey) if configStr != "" { @@ -21,7 +21,7 @@ func GetTaskConfig(taskType models.TaskType) (*models.TaskConfig, error) { json.Unmarshal([]byte(configStr), config) return config, nil } - config, err := models.GetTaskConfigByTaskCode(taskType.String()) + config, err := models.GetTaskConfigByTaskCode(taskType) if err != nil { if models.IsErrRecordNotExist(err) { redis_client.Setex(redisKey, redis_key.EMPTY_REDIS_VAL, 5*time.Second)