You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

operator.go 8.8 kB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago

  1. package reward
  2. import (
  3. "code.gitea.io/gitea/models"
  4. "code.gitea.io/gitea/modules/log"
  5. "code.gitea.io/gitea/modules/redis/redis_key"
  6. "code.gitea.io/gitea/modules/redis/redis_lock"
  7. "code.gitea.io/gitea/services/reward/point"
  8. "errors"
  9. "fmt"
  10. "time"
  11. )
  12. var RewardOperatorMap = map[string]RewardOperator{
  13. fmt.Sprint(models.RewardTypePoint): new(point.PointOperator),
  14. }
  15. type RewardOperator interface {
  16. IsLimited(ctx *models.RewardOperateContext) error
  17. Operate(ctx *models.RewardOperateContext) error
  18. }
  19. func Operate(ctx *models.RewardOperateContext) error {
  20. defer func() {
  21. if err := recover(); err != nil {
  22. combinedErr := fmt.Errorf("%s\n%s", err, log.Stack(2))
  23. log.Error("PANIC:%v", combinedErr)
  24. }
  25. }()
  26. if !checkRewardOperationParam(ctx) {
  27. log.Error("send reward error,param incorrect")
  28. return errors.New("param incorrect")
  29. }
  30. //add lock
  31. var rewardLock = redis_lock.NewDistributeLock(redis_key.RewardOperateLock(ctx.RequestId, ctx.SourceType.Name(), ctx.OperateType.Name()))
  32. isOk, err := rewardLock.Lock(3 * time.Second)
  33. if err != nil {
  34. return err
  35. }
  36. if !isOk {
  37. log.Info("duplicated reward request,targetUserId=%d requestId=%s", ctx.TargetUserId, ctx.RequestId)
  38. return nil
  39. }
  40. defer rewardLock.UnLock()
  41. //is handled before?
  42. isHandled, err := isHandled(ctx.SourceType.Name(), ctx.RequestId, ctx.OperateType.Name())
  43. if err != nil {
  44. log.Error("reward is handled error,%v", err)
  45. return err
  46. }
  47. if isHandled {
  48. log.Info("reward has been handled,ctx=%+v", ctx)
  49. return nil
  50. }
  51. //get operator
  52. operator := GetOperator(ctx.Reward.Type)
  53. if operator == nil {
  54. log.Error("operator of reward type is not exist,ctx=%v", ctx)
  55. return errors.New("operator of reward type is not exist")
  56. }
  57. if ctx.OperateType == models.OperateTypeIncrease {
  58. //is limited?
  59. if err := operator.IsLimited(ctx); err != nil {
  60. log.Info("operator IsLimited, err=%v", err)
  61. return err
  62. }
  63. }
  64. //new reward operate record
  65. recordId, err := initRewardOperateRecord(ctx)
  66. if err != nil {
  67. log.Error("initRewardOperateRecord error,err=%v", err)
  68. return err
  69. }
  70. ctx.SourceId = recordId
  71. //operate
  72. if err := operator.Operate(ctx); err != nil {
  73. log.Error("operator Operate error,err=%v", err)
  74. UpdateRewardRecordToFinalStatus(ctx.SourceType.Name(), ctx.RequestId, models.OperateStatusFailed)
  75. return err
  76. }
  77. UpdateRewardRecordToFinalStatus(ctx.SourceType.Name(), ctx.RequestId, models.OperateStatusSucceeded)
  78. NotifyRewardOperation(ctx.TargetUserId, ctx.Reward.Amount, ctx.SourceType, ctx.Reward.Type, ctx.OperateType)
  79. return nil
  80. }
  81. func checkRewardOperationParam(ctx *models.RewardOperateContext) bool {
  82. if ctx.Reward.Type == "" {
  83. return false
  84. }
  85. return true
  86. }
  87. func GetOperator(rewardType models.RewardType) RewardOperator {
  88. return RewardOperatorMap[rewardType.Name()]
  89. }
  90. func isHandled(sourceType string, requestId string, operateType string) (bool, error) {
  91. _, err := models.GetPointOperateRecordBySourceTypeAndRequestId(sourceType, requestId, operateType)
  92. if err != nil {
  93. log.Error("operator isHandled error. %v", err)
  94. if models.IsErrRecordNotExist(err) {
  95. return false, nil
  96. }
  97. log.Error("GetPointOperateRecordBySourceTypeAndRequestId ZRangeByScore error. %v", err)
  98. return false, err
  99. }
  100. return true, nil
  101. }
  102. func initRewardOperateRecord(ctx *models.RewardOperateContext) (string, error) {
  103. sn, err := generateOperateSerialNo(ctx.OperateType, ctx.Reward.Type)
  104. if err != nil {
  105. log.Error("generateOperateSerialNo error. %v", err)
  106. return "", err
  107. }
  108. record := &models.RewardOperateRecord{
  109. UserId: ctx.TargetUserId,
  110. Amount: ctx.Reward.Amount,
  111. LossAmount: ctx.LossAmount,
  112. RewardType: ctx.Reward.Type.Name(),
  113. SourceType: ctx.SourceType.Name(),
  114. SourceId: ctx.SourceId,
  115. SourceTemplateId: ctx.SourceTemplateId,
  116. RequestId: ctx.RequestId,
  117. OperateType: ctx.OperateType.Name(),
  118. Status: models.OperateStatusOperating,
  119. Remark: ctx.Remark,
  120. Title: ctx.Title,
  121. SerialNo: sn,
  122. }
  123. _, err = models.InsertRewardOperateRecord(record)
  124. if err != nil {
  125. log.Error("InsertRewardOperateRecord error. %v", err)
  126. return "", err
  127. }
  128. return record.SerialNo, nil
  129. }
  130. func createPeriodicRewardOperateRecord(ctx *models.StartPeriodicTaskOpts) (string, error) {
  131. sn, err := generateOperateSerialNo(ctx.OperateType, ctx.RewardType)
  132. if err != nil {
  133. log.Error("createPeriodic generateOperateSerialNo error. %v", err)
  134. return "", err
  135. }
  136. record := &models.RewardOperateRecord{
  137. UserId: ctx.TargetUserId,
  138. Amount: 0,
  139. RewardType: ctx.RewardType.Name(),
  140. SourceType: ctx.SourceType.Name(),
  141. SourceId: ctx.SourceId,
  142. RequestId: ctx.RequestId,
  143. OperateType: ctx.OperateType.Name(),
  144. Status: models.OperateStatusOperating,
  145. Remark: ctx.Remark,
  146. Title: ctx.Title,
  147. SerialNo: sn,
  148. }
  149. _, err = models.InsertRewardOperateRecord(record)
  150. if err != nil {
  151. log.Error("createPeriodic InsertRewardOperateRecord error. %v", err)
  152. return "", err
  153. }
  154. return record.SerialNo, nil
  155. }
  156. func UpdateRewardRecordToFinalStatus(sourceType, requestId, newStatus string) error {
  157. _, err := models.UpdateRewardRecordToFinalStatus(sourceType, requestId, newStatus)
  158. if err != nil {
  159. log.Error("UpdateRewardRecord UpdateRewardRecordToFinalStatus error. %v", err)
  160. return err
  161. }
  162. return nil
  163. }
  164. func StartPeriodicTaskAsyn(opts *models.StartPeriodicTaskOpts) {
  165. go StartAndGetPeriodicTask(opts)
  166. }
  167. func StartAndGetPeriodicTask(opts *models.StartPeriodicTaskOpts) (*models.RewardPeriodicTask, error) {
  168. defer func() {
  169. if err := recover(); err != nil {
  170. combinedErr := fmt.Errorf("%s\n%s", err, log.Stack(2))
  171. log.Error("PANIC:%v", combinedErr)
  172. }
  173. }()
  174. //add lock
  175. var rewardLock = redis_lock.NewDistributeLock(redis_key.RewardOperateLock(opts.RequestId, opts.SourceType.Name(), opts.OperateType.Name()))
  176. isOk, err := rewardLock.Lock(3 * time.Second)
  177. if err != nil {
  178. log.Error("StartAndGetPeriodicTask RewardOperateLock error. %v", err)
  179. return nil, err
  180. }
  181. if !isOk {
  182. log.Info("duplicated operate request,targetUserId=%d requestId=%s", opts.TargetUserId, opts.RequestId)
  183. return nil, nil
  184. }
  185. defer rewardLock.UnLock()
  186. _, err = models.GetPointOperateRecordBySourceTypeAndRequestId(opts.SourceType.Name(), opts.RequestId, opts.OperateType.Name())
  187. if err == nil {
  188. task, err := models.GetPeriodicTaskBySourceIdAndType(opts.SourceType, opts.SourceId, opts.OperateType)
  189. if err != nil {
  190. log.Error("GetPeriodicTaskBySourceIdAndType error,%v", err)
  191. return nil, err
  192. }
  193. return task, nil
  194. }
  195. if err != nil && !models.IsErrRecordNotExist(err) {
  196. log.Error("operate is handled error,%v", err)
  197. return nil, err
  198. }
  199. //new reward operate record
  200. recordId, err := createPeriodicRewardOperateRecord(opts)
  201. if err != nil {
  202. log.Error("StartAndGetPeriodicTask createPeriodicRewardOperateRecord error. %v", err)
  203. return nil, err
  204. }
  205. if err = NewRewardPeriodicTask(recordId, opts); err != nil {
  206. log.Error("StartAndGetPeriodicTask NewRewardPeriodicTask error. %v", err)
  207. UpdateRewardRecordToFinalStatus(opts.SourceType.Name(), opts.RequestId, models.OperateStatusFailed)
  208. return nil, err
  209. }
  210. task, err := models.GetPeriodicTaskBySourceIdAndType(opts.SourceType, opts.SourceId, opts.OperateType)
  211. if err != nil {
  212. log.Error("GetPeriodicTaskBySourceIdAndType error,%v", err)
  213. return nil, err
  214. }
  215. return task, nil
  216. }
  217. func StopPeriodicTaskAsyn(sourceType models.SourceType, sourceId string, operateType models.RewardOperateType) {
  218. go StopPeriodicTask(sourceType, sourceId, operateType)
  219. }
  220. func StopPeriodicTask(sourceType models.SourceType, sourceId string, operateType models.RewardOperateType) error {
  221. defer func() {
  222. if err := recover(); err != nil {
  223. combinedErr := fmt.Errorf("%s\n%s", err, log.Stack(2))
  224. log.Error("PANIC:%v", combinedErr)
  225. }
  226. }()
  227. task, err := models.GetPeriodicTaskBySourceIdAndType(sourceType, sourceId, operateType)
  228. if err != nil {
  229. log.Error("StopPeriodicTask. GetPeriodicTaskBySourceIdAndType error. %v", err)
  230. return err
  231. }
  232. if task == nil {
  233. log.Info("Periodic task is not exist")
  234. return nil
  235. }
  236. if task.Status == models.PeriodicTaskStatusFinished {
  237. log.Info("Periodic task is finished")
  238. return nil
  239. }
  240. now := time.Now()
  241. RunRewardTask(*task, now)
  242. return models.StopPeriodicTask(task.ID, task.OperateSerialNo, now)
  243. }
  244. func generateOperateSerialNo(operateType models.RewardOperateType, rewardType models.RewardType) (string, error) {
  245. s, err := GetSerialNoByRedis()
  246. if err != nil {
  247. log.Error("generateOperateSerialNo error. %v", err)
  248. return "", err
  249. }
  250. switch operateType {
  251. case models.OperateTypeIncrease:
  252. s += "1"
  253. case models.OperateTypeDecrease:
  254. s += "2"
  255. default:
  256. s += "9"
  257. }
  258. switch rewardType {
  259. case models.RewardTypePoint:
  260. s += "1"
  261. default:
  262. s += "9"
  263. }
  264. return s, nil
  265. }