You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

operator.go 8.7 kB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago

  1. package reward
  2. import (
  3. "code.gitea.io/gitea/models"
  4. "code.gitea.io/gitea/modules/log"
  5. "code.gitea.io/gitea/modules/redis/redis_key"
  6. "code.gitea.io/gitea/modules/redis/redis_lock"
  7. "code.gitea.io/gitea/services/reward/point"
  8. "errors"
  9. "fmt"
  10. "time"
  11. )
  12. var RewardOperatorMap = map[string]RewardOperator{
  13. fmt.Sprint(models.RewardTypePoint): new(point.PointOperator),
  14. }
  15. type RewardOperator interface {
  16. IsLimited(ctx *models.RewardOperateContext) error
  17. Operate(ctx *models.RewardOperateContext) error
  18. }
  19. func Operate(ctx *models.RewardOperateContext) error {
  20. defer func() {
  21. if err := recover(); err != nil {
  22. combinedErr := fmt.Errorf("%s\n%s", err, log.Stack(2))
  23. log.Error("PANIC:%v", combinedErr)
  24. }
  25. }()
  26. if !checkRewardOperationParam(ctx) {
  27. log.Error("send reward error,param incorrect")
  28. return errors.New("param incorrect")
  29. }
  30. //add lock
  31. var rewardLock = redis_lock.NewDistributeLock(redis_key.RewardOperateLock(ctx.RequestId, ctx.SourceType.Name(), ctx.OperateType.Name()))
  32. isOk, err := rewardLock.Lock(3 * time.Second)
  33. if err != nil {
  34. return err
  35. }
  36. if !isOk {
  37. log.Info("duplicated reward request,targetUserId=%d requestId=%s", ctx.TargetUserId, ctx.RequestId)
  38. return nil
  39. }
  40. defer rewardLock.UnLock()
  41. //is handled before?
  42. isHandled, err := isHandled(ctx.SourceType.Name(), ctx.RequestId, ctx.OperateType.Name())
  43. if err != nil {
  44. log.Error("reward is handled error,%v", err)
  45. return err
  46. }
  47. if isHandled {
  48. log.Info("reward has been handled,ctx=%+v", ctx)
  49. return nil
  50. }
  51. //get operator
  52. operator := GetOperator(ctx.Reward.Type)
  53. if operator == nil {
  54. log.Error("operator of reward type is not exist,ctx=%v", ctx)
  55. return errors.New("operator of reward type is not exist")
  56. }
  57. if ctx.OperateType == models.OperateTypeIncrease {
  58. //is limited?
  59. if err := operator.IsLimited(ctx); err != nil {
  60. log.Info("operator IsLimited, err=%v", err)
  61. return err
  62. }
  63. }
  64. //new reward operate record
  65. recordId, err := initRewardOperateRecord(ctx)
  66. if err != nil {
  67. log.Error("initRewardOperateRecord error,err=%v", err)
  68. return err
  69. }
  70. ctx.SourceId = recordId
  71. //operate
  72. if err := operator.Operate(ctx); err != nil {
  73. log.Error("operator Operate error,err=%v", err)
  74. UpdateRewardRecordToFinalStatus(ctx.SourceType.Name(), ctx.RequestId, models.OperateStatusFailed)
  75. return err
  76. }
  77. UpdateRewardRecordToFinalStatus(ctx.SourceType.Name(), ctx.RequestId, models.OperateStatusSucceeded)
  78. NotifyRewardOperation(ctx.TargetUserId, ctx.Reward.Amount, ctx.Reward.Type, ctx.OperateType)
  79. return nil
  80. }
  81. func checkRewardOperationParam(ctx *models.RewardOperateContext) bool {
  82. if ctx.Reward.Type == "" {
  83. return false
  84. }
  85. return true
  86. }
  87. func GetOperator(rewardType models.RewardType) RewardOperator {
  88. return RewardOperatorMap[rewardType.Name()]
  89. }
  90. func isHandled(sourceType string, requestId string, operateType string) (bool, error) {
  91. _, err := models.GetPointOperateRecordBySourceTypeAndRequestId(sourceType, requestId, operateType)
  92. if err != nil {
  93. log.Error("operator isHandled error. %v", err)
  94. if models.IsErrRecordNotExist(err) {
  95. return false, nil
  96. }
  97. log.Error("GetPointOperateRecordBySourceTypeAndRequestId ZRangeByScore error. %v", err)
  98. return false, err
  99. }
  100. return true, nil
  101. }
  102. func initRewardOperateRecord(ctx *models.RewardOperateContext) (string, error) {
  103. sn, err := generateOperateSerialNo(ctx.OperateType, ctx.Reward.Type)
  104. if err != nil {
  105. log.Error("generateOperateSerialNo error. %v", err)
  106. return "", err
  107. }
  108. record := &models.RewardOperateRecord{
  109. UserId: ctx.TargetUserId,
  110. Amount: ctx.Reward.Amount,
  111. LossAmount: ctx.LossAmount,
  112. RewardType: ctx.Reward.Type.Name(),
  113. SourceType: ctx.SourceType.Name(),
  114. SourceId: ctx.SourceId,
  115. RequestId: ctx.RequestId,
  116. OperateType: ctx.OperateType.Name(),
  117. Status: models.OperateStatusOperating,
  118. Remark: ctx.Remark,
  119. Tittle: ctx.Tittle,
  120. SerialNo: sn,
  121. }
  122. _, err = models.InsertRewardOperateRecord(record)
  123. if err != nil {
  124. log.Error("InsertRewardOperateRecord error. %v", err)
  125. return "", err
  126. }
  127. return record.SerialNo, nil
  128. }
  129. func createPeriodicRewardOperateRecord(ctx *models.StartPeriodicTaskOpts) (string, error) {
  130. sn, err := generateOperateSerialNo(ctx.OperateType, ctx.RewardType)
  131. if err != nil {
  132. log.Error("createPeriodic generateOperateSerialNo error. %v", err)
  133. return "", err
  134. }
  135. record := &models.RewardOperateRecord{
  136. UserId: ctx.TargetUserId,
  137. Amount: 0,
  138. RewardType: ctx.RewardType.Name(),
  139. SourceType: ctx.SourceType.Name(),
  140. SourceId: ctx.SourceId,
  141. RequestId: ctx.RequestId,
  142. OperateType: ctx.OperateType.Name(),
  143. Status: models.OperateStatusOperating,
  144. Remark: ctx.Remark,
  145. Tittle: ctx.Tittle,
  146. SerialNo: sn,
  147. }
  148. _, err = models.InsertRewardOperateRecord(record)
  149. if err != nil {
  150. log.Error("createPeriodic InsertRewardOperateRecord error. %v", err)
  151. return "", err
  152. }
  153. return record.SerialNo, nil
  154. }
  155. func UpdateRewardRecordToFinalStatus(sourceType, requestId, newStatus string) error {
  156. _, err := models.UpdateRewardRecordToFinalStatus(sourceType, requestId, newStatus)
  157. if err != nil {
  158. log.Error("UpdateRewardRecord UpdateRewardRecordToFinalStatus error. %v", err)
  159. return err
  160. }
  161. return nil
  162. }
  163. func StartPeriodicTaskAsyn(opts *models.StartPeriodicTaskOpts) {
  164. go StartAndGetPeriodicTask(opts)
  165. }
  166. func StartAndGetPeriodicTask(opts *models.StartPeriodicTaskOpts) (*models.RewardPeriodicTask, error) {
  167. defer func() {
  168. if err := recover(); err != nil {
  169. combinedErr := fmt.Errorf("%s\n%s", err, log.Stack(2))
  170. log.Error("PANIC:%v", combinedErr)
  171. }
  172. }()
  173. //add lock
  174. var rewardLock = redis_lock.NewDistributeLock(redis_key.RewardOperateLock(opts.RequestId, opts.SourceType.Name(), opts.OperateType.Name()))
  175. isOk, err := rewardLock.Lock(3 * time.Second)
  176. if err != nil {
  177. log.Error("StartAndGetPeriodicTask RewardOperateLock error. %v", err)
  178. return nil, err
  179. }
  180. if !isOk {
  181. log.Info("duplicated operate request,targetUserId=%d requestId=%s", opts.TargetUserId, opts.RequestId)
  182. return nil, nil
  183. }
  184. defer rewardLock.UnLock()
  185. _, err = models.GetPointOperateRecordBySourceTypeAndRequestId(opts.SourceType.Name(), opts.RequestId, opts.OperateType.Name())
  186. if err == nil {
  187. task, err := models.GetPeriodicTaskBySourceIdAndType(opts.SourceType, opts.SourceId, opts.OperateType)
  188. if err != nil {
  189. log.Error("GetPeriodicTaskBySourceIdAndType error,%v", err)
  190. return nil, err
  191. }
  192. return task, nil
  193. }
  194. if err != nil && !models.IsErrRecordNotExist(err) {
  195. log.Error("operate is handled error,%v", err)
  196. return nil, err
  197. }
  198. //new reward operate record
  199. recordId, err := createPeriodicRewardOperateRecord(opts)
  200. if err != nil {
  201. log.Error("StartAndGetPeriodicTask createPeriodicRewardOperateRecord error. %v", err)
  202. return nil, err
  203. }
  204. if err = NewRewardPeriodicTask(recordId, opts); err != nil {
  205. log.Error("StartAndGetPeriodicTask NewRewardPeriodicTask error. %v", err)
  206. UpdateRewardRecordToFinalStatus(opts.SourceType.Name(), opts.RequestId, models.OperateStatusFailed)
  207. return nil, err
  208. }
  209. task, err := models.GetPeriodicTaskBySourceIdAndType(opts.SourceType, opts.SourceId, opts.OperateType)
  210. if err != nil {
  211. log.Error("GetPeriodicTaskBySourceIdAndType error,%v", err)
  212. return nil, err
  213. }
  214. return task, nil
  215. }
  216. func StopPeriodicTaskAsyn(sourceType models.SourceType, sourceId string, operateType models.RewardOperateType) {
  217. go StopPeriodicTask(sourceType, sourceId, operateType)
  218. }
  219. func StopPeriodicTask(sourceType models.SourceType, sourceId string, operateType models.RewardOperateType) error {
  220. defer func() {
  221. if err := recover(); err != nil {
  222. combinedErr := fmt.Errorf("%s\n%s", err, log.Stack(2))
  223. log.Error("PANIC:%v", combinedErr)
  224. }
  225. }()
  226. task, err := models.GetPeriodicTaskBySourceIdAndType(sourceType, sourceId, operateType)
  227. if err != nil {
  228. log.Error("StopPeriodicTask. GetPeriodicTaskBySourceIdAndType error. %v", err)
  229. return err
  230. }
  231. if task == nil {
  232. log.Info("Periodic task is not exist")
  233. return nil
  234. }
  235. if task.Status == models.PeriodicTaskStatusFinished {
  236. log.Info("Periodic task is finished")
  237. return nil
  238. }
  239. now := time.Now()
  240. RunRewardTask(*task, now)
  241. return models.StopPeriodicTask(task.ID, task.OperateSerialNo, now)
  242. }
  243. func generateOperateSerialNo(operateType models.RewardOperateType, rewardType models.RewardType) (string, error) {
  244. s, err := GetSerialNoByRedis()
  245. if err != nil {
  246. log.Error("generateOperateSerialNo error. %v", err)
  247. return "", err
  248. }
  249. switch operateType {
  250. case models.OperateTypeIncrease:
  251. s += "1"
  252. case models.OperateTypeDecrease:
  253. s += "2"
  254. default:
  255. s += "9"
  256. }
  257. switch rewardType {
  258. case models.RewardTypePoint:
  259. s += "1"
  260. default:
  261. s += "9"
  262. }
  263. return s, nil
  264. }