Browse Source

#1249

update task trigger
tags/v1.22.9.2^2
chenyifan01 3 years ago
parent
commit
0562bee1c2
17 changed files with 300 additions and 62 deletions
  1. +8
    -0
      models/attachment.go
  2. +17
    -2
      models/reward_operate_record.go
  3. +27
    -18
      models/task_config.go
  4. +2
    -1
      modules/auth/wechat/event_handle.go
  5. +11
    -6
      modules/cloudbrain/resty.go
  6. +0
    -4
      modules/notification/action/action.go
  7. +5
    -1
      modules/notification/base/notifier.go
  8. +13
    -1
      modules/notification/base/null.go
  9. +30
    -2
      modules/notification/notification.go
  10. +53
    -3
      modules/notification/task/task.go
  11. +3
    -0
      routers/admin/dataset.go
  12. +2
    -0
      routers/image/image.go
  13. +0
    -1
      routers/repo/ai_model_manage.go
  14. +0
    -1
      routers/repo/cloudbrain.go
  15. +2
    -0
      routers/user/setting/profile.go
  16. +118
    -21
      services/reward/limiter/limiter.go
  17. +9
    -1
      services/reward/point/point_operate.go

+ 8
- 0
models/attachment.go View File

@@ -653,3 +653,11 @@ func Attachments(opts *AttachmentsOptions) ([]*AttachmentInfo, int64, error) {

return attachments, count, nil
}

func GetAllUserIdByDatasetId(datasetId int64) ([]int64, error) {
r := make([]int64, 0)
if err := x.Table("attachment").Where("dataset_id = ?", datasetId).Distinct("uploader_id").Find(&r); err != nil {
return nil, err
}
return r, nil
}

+ 17
- 2
models/reward_operate_record.go View File

@@ -2,6 +2,7 @@ package models

import (
"code.gitea.io/gitea/modules/timeutil"
"xorm.io/builder"
)

const (
@@ -26,8 +27,8 @@ func (r RewardType) Name() string {
}

const (
OperateTypeIncrease = "INCREASE_POINT"
OperateTypeDecrease = "DECREASE_POINT"
OperateTypeIncrease = "INCREASE"
OperateTypeDecrease = "DECREASE"
)

const (
@@ -82,6 +83,20 @@ func UpdateAwardOperateRecordStatus(sourceType, requestId, oldStatus, newStatus
return x.Cols("status").Where("source_type=? and request_id=? and status=?", sourceType, requestId, oldStatus).Update(r)
}

func SumRewardAmountInTaskPeriod(rewardType string, sourceType string, userId int64, period *PeriodResult) (int64, error) {
var cond = builder.NewCond()
if period != nil {
cond = cond.And(builder.Gte{"created_unix": period.StartTime.Unix()})
cond = cond.And(builder.Lt{"created_unix": period.EndTime.Unix()})
}
if sourceType != "" {
cond = cond.And(builder.Eq{"source_type": sourceType})
}
cond = cond.And(builder.Eq{"reward_type": rewardType})
cond = cond.And(builder.Eq{"user_id": userId})
return x.Where(cond).SumInt(&RewardOperateRecord{}, "amount")
}

type RewardOperateContext struct {
SourceType string
SourceId string


+ 27
- 18
models/task_config.go View File

@@ -5,24 +5,33 @@ import (
)

const (
TaskTypeNewIssue = "NEW_ISSUE"
TaskTypeIssueChangeStatus = "ISSUE_CHANGE_STATUS"
TaskTypeCreateIssueComment = "CREATE_ISSUE_COMMENT"
TaskTypeNewPullRequest = "NEW_PULL_REQUEST"
TaskTypeRenameRepository = "RENAME_REPOSITORY"
TaskTypeAliasRepository = "ALIAS_REPOSITORY"
TaskTypeTransferRepository = "TRANSFER_REPOSITORY"
TaskTypeCreateRepository = "CREATE_REPOSITORY"
TaskTypeForkRepository = "FORK_REPOSITORY"
TaskTypePullRequestReview = "PULL_REQUEST_REVIEW"
TaskTypeCommentPull = "COMMENT_PULL"
TaskTypeApprovePullRequest = "APPROVE_PULL_REQUEST"
TaskTypeRejectPullRequest = "REJECT_PULL_REQUEST"
TaskTypeMergePullRequest = "MERGE_PULL_REQUEST"
TaskTypeSyncPushCommits = "SYNC_PUSH_COMMITS"
TaskTypeSyncCreateRef = "SYNC_CREATE_REF"
TaskTypeSyncDeleteRef = "SYNC_DELETE_REF"
TaskTypeBindWechat = "BIND_WECHAT"
TaskTypeNewIssue = "NEW_ISSUE"
TaskTypeIssueChangeStatus = "ISSUE_CHANGE_STATUS"
TaskTypeCreateIssueComment = "CREATE_ISSUE_COMMENT"
TaskTypeNewPullRequest = "NEW_PULL_REQUEST"
TaskTypeRenameRepository = "RENAME_REPOSITORY"
TaskTypeAliasRepository = "ALIAS_REPOSITORY"
TaskTypeTransferRepository = "TRANSFER_REPOSITORY"
TaskTypeCreateRepository = "CREATE_REPOSITORY"
TaskTypeCreatePublicRepository = "CREATE_PUBLIC_REPOSITORY"
TaskTypeForkRepository = "FORK_REPOSITORY"
TaskTypePullRequestReview = "PULL_REQUEST_REVIEW"
TaskTypeCommentPull = "COMMENT_PULL"
TaskTypeApprovePullRequest = "APPROVE_PULL_REQUEST"
TaskTypeRejectPullRequest = "REJECT_PULL_REQUEST"
TaskTypeMergePullRequest = "MERGE_PULL_REQUEST"
TaskTypeSyncPushCommits = "SYNC_PUSH_COMMITS"
TaskTypeSyncCreateRef = "SYNC_CREATE_REF"
TaskTypeSyncDeleteRef = "SYNC_DELETE_REF"
TaskTypeBindWechat = "BIND_WECHAT"
TaskTypeUploadAttachment = "UPLOAD_ATTACHMENT"
TaskTypeCreateCloudbrainTask = "CREATE_CLOUDBRAIN_TASK"
TaskTypeDatasetRecommended = "DATASET_RECOMMENDED"
TaskTypeCreateModel = "CREATE_MODEL"
TaskTypeCreatePublicImage = "CREATE_PUBLIC_IMAGE"
TaskTypeImageRecommend = "IMAGE_RECOMMEND"
TaskTypeChangeUserAvatar = "CHANGE_USER_AVATAR"
TaskTypePushCommits = "PUSH_COMMITS"
)

const (


+ 2
- 1
modules/auth/wechat/event_handle.go View File

@@ -1,6 +1,7 @@
package wechat

import (
"code.gitea.io/gitea/modules/notification"
"code.gitea.io/gitea/modules/redis/redis_client"
"code.gitea.io/gitea/modules/redis/redis_key"
"encoding/json"
@@ -71,6 +72,6 @@ func HandleSubscribeEvent(we WechatEvent) string {
jsonStr, _ := json.Marshal(qrCache)
redis_client.Setex(redis_key.WechatBindingUserIdKey(sceneStr), string(jsonStr), 60*time.Second)
}
notification.NotifyWechatBind(qrCache.UserId, we.FromUserName)
return BIND_REPLY_SUCCESS
}

+ 11
- 6
modules/cloudbrain/resty.go View File

@@ -1,6 +1,7 @@
package cloudbrain

import (
"code.gitea.io/gitea/modules/notification"
"encoding/json"
"errors"
"fmt"
@@ -24,10 +25,10 @@ var (
)

const (
JobHasBeenStopped = "S410"
Public = "public"
Custom = "custom"
LogPageSize = 500
JobHasBeenStopped = "S410"
Public = "public"
Custom = "custom"
LogPageSize = 500
LogPageTokenExpired = "5m"
pageSize = 15
)
@@ -313,6 +314,7 @@ sendjob:
})
if err == nil {
go updateImageStatus(image, isSetCreatedUnix, createTime)
notification.NotifyCreateImage(params.UID, image)
}
return err
}
@@ -354,6 +356,9 @@ func CommitAdminImage(params models.CommitImageParams) error {
}
return nil
})
if err == nil {
notification.NotifyCreateImage(params.UID, image)
}
return err
}

@@ -474,7 +479,7 @@ func GetJobAllLog(scrollID string) (*models.GetJobLogResult, error) {
client := getRestyClient()
var result models.GetJobLogResult
req := models.GetAllJobLogParams{
Scroll: LogPageTokenExpired,
Scroll: LogPageTokenExpired,
ScrollID: scrollID,
}

@@ -498,7 +503,7 @@ func GetJobAllLog(scrollID string) (*models.GetJobLogResult, error) {
return &result, nil
}

func DeleteJobLogToken(scrollID string) (error) {
func DeleteJobLogToken(scrollID string) error {
checkSetting()
client := getRestyClient()
var result models.DeleteJobLogTokenResult


+ 0
- 4
modules/notification/action/action.go View File

@@ -345,7 +345,3 @@ func (a *actionNotifier) NotifyOtherTask(doer *models.User, repo *models.Reposit
log.Error("notifyWatchers: %v", err)
}
}

func (a *actionNotifier) NotifyWechatBind(doer *models.User) {
return
}

+ 5
- 1
modules/notification/base/notifier.go View File

@@ -56,5 +56,9 @@ type Notifier interface {
NotifySyncDeleteRef(doer *models.User, repo *models.Repository, refType, refFullName string)

NotifyOtherTask(doer *models.User, repo *models.Repository, id string, name string, optype models.ActionType)
NotifyWechatBind(doer *models.User)
NotifyWechatBind(userId int64, wechatOpenId string)
NotifyDatasetRecommend(optUser *models.User, dataset *models.Dataset, action string)
NotifyCreateImage(optUserId int64, image models.Image)
NotifyImageRecommend(optUser *models.User, imageId int64, action string)
NotifyChangeUserAvatar(user *models.User)
}

+ 13
- 1
modules/notification/base/null.go View File

@@ -159,6 +159,18 @@ func (*NullNotifier) NotifyOtherTask(doer *models.User, repo *models.Repository,

}

func (*NullNotifier) NotifyWechatBind(doer *models.User) {
func (*NullNotifier) NotifyWechatBind(userId int64, wechatOpenId string) {

}

func (*NullNotifier) NotifyDatasetRecommend(optUser *models.User, dataset *models.Dataset, action string) {
}

func (*NullNotifier) NotifyCreateImage(optUserId int64, image models.Image) {
}

func (*NullNotifier) NotifyImageRecommend(optUser *models.User, imageId int64, action string) {
}

func (*NullNotifier) NotifyChangeUserAvatar(user *models.User) {
}

+ 30
- 2
modules/notification/notification.go View File

@@ -273,8 +273,36 @@ func NotifySyncDeleteRef(pusher *models.User, repo *models.Repository, refType,
}

// NotifyWechatBind notifies wechat bind
func NotifyWechatBind(doer *models.User) {
func NotifyWechatBind(userId int64, wechatOpenId string) {
for _, notifier := range notifiers {
notifier.NotifyWechatBind(doer)
notifier.NotifyWechatBind(userId, wechatOpenId)
}
}

// NotifyDatasetRecommend
func NotifyDatasetRecommend(optUser *models.User, dataset *models.Dataset, action string) {
for _, notifier := range notifiers {
notifier.NotifyDatasetRecommend(optUser, dataset, action)
}
}

// NotifyDatasetRecommend
func NotifyCreateImage(optUserId int64, image models.Image) {
for _, notifier := range notifiers {
notifier.NotifyCreateImage(optUserId, image)
}
}

// NotifyDatasetRecommend
func NotifyImageRecommend(optUser *models.User, imageId int64, action string) {
for _, notifier := range notifiers {
notifier.NotifyImageRecommend(optUser, imageId, action)
}
}

// NotifyDatasetRecommend
func NotifyChangeUserAvatar(user *models.User) {
for _, notifier := range notifiers {
notifier.NotifyChangeUserAvatar(user)
}
}

+ 53
- 3
modules/notification/task/task.go View File

@@ -53,7 +53,10 @@ func (t *taskNotifier) NotifyTransferRepository(doer *models.User, repo *models.
}

func (t *taskNotifier) NotifyCreateRepository(doer *models.User, u *models.User, repo *models.Repository) {
task.Accomplish(doer.ID, models.TaskTypeCreateRepository)
if !repo.IsPrivate {
task.Accomplish(doer.ID, models.TaskTypeCreatePublicRepository)
}

}

func (t *taskNotifier) NotifyForkRepository(doer *models.User, oldRepo, repo *models.Repository) {
@@ -99,9 +102,56 @@ func (t *taskNotifier) NotifySyncDeleteRef(doer *models.User, repo *models.Repos
}

func (t *taskNotifier) NotifyOtherTask(doer *models.User, repo *models.Repository, id string, name string, optype models.ActionType) {
switch optype {
case models.ActionUploadAttachment:
task.Accomplish(doer.ID, models.TaskTypeUploadAttachment)
case models.ActionCreateDebugGPUTask,
models.ActionCreateDebugNPUTask,
models.ActionCreateTrainTask,
models.ActionCreateInferenceTask,
models.ActionCreateBenchMarkTask,
models.ActionCreateGPUTrainTask:
task.Accomplish(doer.ID, models.TaskTypeCreateCloudbrainTask)
case models.ActionCreateNewModelTask:
task.Accomplish(doer.ID, models.TaskTypeCreateModel)
}
return
}

func (t *taskNotifier) NotifyWechatBind(doer *models.User) {
task.Accomplish(doer.ID, models.TaskTypeSyncDeleteRef)
func (t *taskNotifier) NotifyWechatBind(userId int64, wechatOpenId string) {
task.Accomplish(userId, models.TaskTypeBindWechat)
}

func (t *taskNotifier) NotifyDatasetRecommend(optUser *models.User, dataset *models.Dataset, action string) {
switch action {
case "recommend":
userIds, err := models.GetAllUserIdByDatasetId(dataset.ID)
if err != nil {
return
}
for _, userId := range userIds {
task.Accomplish(userId, models.TaskTypeDatasetRecommended)
}
}
}

func (t *taskNotifier) NotifyCreateImage(optUserId int64, image models.Image) {
if !image.IsPrivate {
task.Accomplish(optUserId, models.TaskTypeCreatePublicImage)
}
}

func (t *taskNotifier) NotifyImageRecommend(optUser *models.User, imageId int64, action string) {
switch action {
case "recommend":
task.Accomplish(optUser.ID, models.TaskTypeImageRecommend)
}
}

func (t *taskNotifier) NotifyChangeUserAvatar(user *models.User) {
task.Accomplish(user.ID, models.TaskTypeChangeUserAvatar)
}

func (t *taskNotifier) NotifyPushCommits(pusher *models.User, repo *models.Repository, refName, oldCommitID, newCommitID string, commits *repository.PushCommits) {
task.Accomplish(pusher.ID, models.TaskTypePushCommits)
}

+ 3
- 0
routers/admin/dataset.go View File

@@ -1,6 +1,7 @@
package admin

import (
"code.gitea.io/gitea/modules/notification"
"net/http"
"strconv"
"strings"
@@ -106,6 +107,8 @@ func DatasetAction(ctx *context.Context) {
if err != nil {
ctx.JSON(http.StatusOK, models.BaseErrorMessage(ctx.Tr("repo.star_fail", ctx.Params(":action"))))
} else {
d, _ := models.GetDatasetByID(datasetId)
notification.NotifyDatasetRecommend(ctx.User, d, ctx.Params(":action"))
ctx.JSON(http.StatusOK, models.BaseOKMessage)
}
}


+ 2
- 0
routers/image/image.go View File

@@ -1,6 +1,7 @@
package image

import (
"code.gitea.io/gitea/modules/notification"
"net/http"
"strconv"

@@ -25,6 +26,7 @@ func Action(ctx *context.Context) {
if err != nil {
ctx.JSON(http.StatusOK, models.BaseErrorMessage(ctx.Tr("repo.star_fail", ctx.Params(":action"))))
} else {
notification.NotifyImageRecommend(ctx.User, imageId, ctx.Params(":action"))
ctx.JSON(http.StatusOK, models.BaseOKMessage)
}
}

+ 0
- 1
routers/repo/ai_model_manage.go View File

@@ -170,7 +170,6 @@ func SaveModel(ctx *context.Context) {
ctx.Error(500, fmt.Sprintf("save model error. %v", err))
return
}

log.Info("save model end.")
}



+ 0
- 1
routers/repo/cloudbrain.go View File

@@ -783,7 +783,6 @@ func CloudBrainCommitImage(ctx *context.Context, form auth.CommitImageCloudBrain

return
}

ctx.JSON(200, models.BaseOKMessage)
}



+ 2
- 0
routers/user/setting/profile.go View File

@@ -6,6 +6,7 @@
package setting

import (
"code.gitea.io/gitea/modules/notification"
"errors"
"fmt"
"io/ioutil"
@@ -165,6 +166,7 @@ func AvatarPost(ctx *context.Context, form auth.AvatarForm) {
if err := UpdateAvatarSetting(ctx, form, ctx.User); err != nil {
ctx.Flash.Error(err.Error())
} else {
notification.NotifyChangeUserAvatar(ctx.User)
ctx.Flash.Success(ctx.Tr("settings.update_avatar_success"))
}



+ 118
- 21
services/reward/limiter/limiter.go View File

@@ -12,56 +12,118 @@ import (
"time"
)

type limiterRejectPolicy string

const (
JustReject limiterRejectPolicy = "JUST_REJECT"
PermittedOnce limiterRejectPolicy = "PERMITTED_ONCE"
FillUp limiterRejectPolicy = "FillUp"
)

type limiterRunner struct {
limiters []models.LimitConfig
index int
userId int64
amount int64
limitCode string
limitType models.LimitType
limiters []models.LimitConfig
index int
userId int64
amount int64
limitCode string
limitType models.LimitType
rejectPolicy limiterRejectPolicy
resultMap map[int]limitResult
minRealAmount int64
}

type limitResult struct {
isLoss bool
planAmount int64
realAmount int64
}

func newLimitResult(isLoss bool, planAmount int64, realAmount int64) limitResult {
return limitResult{
isLoss: isLoss,
planAmount: planAmount,
realAmount: realAmount,
}
}

func newLimiterRunner(limitCode string, limitType models.LimitType, userId, amount int64) *limiterRunner {
func newLimiterRunner(limitCode string, limitType models.LimitType, userId, amount int64, policy limiterRejectPolicy) *limiterRunner {
return &limiterRunner{
userId: userId,
amount: amount,
limitCode: limitCode,
limitType: limitType,
index: 0,
userId: userId,
amount: amount,
limitCode: limitCode,
limitType: limitType,
index: 0,
rejectPolicy: policy,
resultMap: make(map[int]limitResult, 0),
}
}

//Run run all limiters
//return real used amount(when choose the FillUp reject policy, amount may only be partially used)
func (l *limiterRunner) Run() error {
if err := l.LoadLimiters(); err != nil {
return err
}
l.minRealAmount = l.amount
for l.index < len(l.limiters) {
err := l.limit(l.limiters[l.index])
if err != nil {
log.Info("limiter check failed,%v", err)
l.Rollback(l.index)
l.Rollback()
return err
}
result := l.resultMap[l.index]
if result.isLoss {
//find the minimum real amount
if l.minRealAmount > result.realAmount {
l.minRealAmount = result.realAmount
}
}
l.index += 1
}

//post process
l.PostProcess()
return nil
}

//Rollback rollback the usedNum from limiters[0] to limiters[index]
func (l *limiterRunner) Rollback(index int) error {
for i := index; i >= 0; i-- {
l.rollback(l.limiters[i])
func (l *limiterRunner) Rollback() error {
for i := l.index - 1; i >= 0; i-- {
l.rollback(l.limiters[i], l.resultMap[i])
}
return nil
}

func (l *limiterRunner) rollback(r models.LimitConfig, result limitResult) error {
p, err := period.GetPeriod(r.RefreshRate)
if err != nil {
return err
}
redisKey := redis_key.LimitCount(l.userId, r.LimitCode, r.LimitType, r.Scope, p)
redis_client.IncrBy(redisKey, -1*result.realAmount)
return nil
}

//PostProcess process loss,if realAmount < planAmount
func (l *limiterRunner) PostProcess() error {
for i := l.index - 1; i >= 0; i-- {
l.postProcess(l.limiters[i], l.resultMap[i])
}
return nil
}

func (l *limiterRunner) rollback(r models.LimitConfig) error {
func (l *limiterRunner) postProcess(r models.LimitConfig, result limitResult) error {
if result.realAmount == l.minRealAmount {
return nil
}
p, err := period.GetPeriod(r.RefreshRate)
if err != nil {
return err
}
diff := result.realAmount - l.minRealAmount
redisKey := redis_key.LimitCount(l.userId, r.LimitCode, r.LimitType, r.Scope, p)
redis_client.IncrBy(redisKey, -1*l.amount)
redis_client.IncrBy(redisKey, -1*diff)
return nil
}

@@ -91,8 +153,25 @@ func (l *limiterRunner) limit(r models.LimitConfig) error {
}
}
if usedNum > r.LimitNum {
return errors.New(fmt.Sprintf("%s:over limit", r.Tittle))
if usedNum-r.LimitNum >= l.amount {
redis_client.IncrBy(redisKey, -1*l.amount)
return errors.New(fmt.Sprintf("%s:over limit", r.Tittle))
}
switch l.rejectPolicy {
case FillUp:
exceed := usedNum - r.LimitNum
realAmount := l.amount - exceed
redis_client.IncrBy(redisKey, -1*exceed)
l.resultMap[l.index] = newLimitResult(true, l.amount, realAmount)
case JustReject:
redis_client.IncrBy(redisKey, -1*l.amount)
return errors.New(fmt.Sprintf("%s:over limit", r.Tittle))
case PermittedOnce:
l.resultMap[l.index] = newLimitResult(false, l.amount, l.amount)
}

}
l.resultMap[l.index] = newLimitResult(false, l.amount, l.amount)
return nil
}

@@ -111,15 +190,33 @@ func (l *limiterRunner) countInPeriod(r models.LimitConfig, p *models.PeriodResu
switch r.LimitType {
case models.LimitTypeTask.Name():
return models.CountTaskAccomplishLogInTaskPeriod(r.ID, l.userId, p)
case models.LimitTypeRewardPoint.Name():
return models.SumRewardAmountInTaskPeriod(models.RewardTypePoint.Name(), r.LimitCode, l.userId, p)
default:
return 0, nil

}
}

func CheckLimitWithFillUp(limitCode string, limitType models.LimitType, userId, amount int64) (int64, error) {
r := newLimiterRunner(limitCode, limitType, userId, amount, FillUp)
err := r.Run()
if err != nil {
return 0, err
}
return r.minRealAmount, nil
}

func CheckLimitWithPermittedOnce(limitCode string, limitType models.LimitType, userId, amount int64) error {
r := newLimiterRunner(limitCode, limitType, userId, amount, PermittedOnce)
err := r.Run()
return err
}

func CheckLimit(limitCode string, limitType models.LimitType, userId, amount int64) error {
r := newLimiterRunner(limitCode, limitType, userId, amount)
return r.Run()
r := newLimiterRunner(limitCode, limitType, userId, amount, JustReject)
err := r.Run()
return err
}

func GetLimiters(limitCode string, limitType models.LimitType) ([]models.LimitConfig, error) {


+ 9
- 1
services/reward/point/point_operate.go View File

@@ -8,16 +8,24 @@ import (
"code.gitea.io/gitea/services/reward/limiter"
"code.gitea.io/gitea/services/reward/point/account"
"errors"
"fmt"
"time"
)

const LossMsg = "达到奖励上限,应得%d积分,实得%d积分"

type PointOperator struct {
}

func (operator *PointOperator) IsLimited(ctx models.RewardOperateContext) bool {
if err := limiter.CheckLimit(ctx.SourceType, models.LimitTypeRewardPoint, ctx.TargetUserId, ctx.Reward.Amount); err != nil {
realAmount, err := limiter.CheckLimitWithFillUp(ctx.SourceType, models.LimitTypeRewardPoint, ctx.TargetUserId, ctx.Reward.Amount)
if err != nil {
return true
}
if realAmount < ctx.Reward.Amount {
ctx.Remark = ctx.Remark + ";" + fmt.Sprintf(LossMsg, ctx.Reward.Amount, realAmount)
ctx.Reward.Amount = realAmount
}
return false
}



Loading…
Cancel
Save