BackgroundJobManager.cs 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. using System;
  2. using System.Reflection;
  3. using System.Threading.Tasks;
  4. using Abp.Dependency;
  5. using Abp.Events.Bus;
  6. using Abp.Events.Bus.Exceptions;
  7. using Abp.Json;
  8. using Abp.Threading;
  9. using Abp.Threading.BackgroundWorkers;
  10. using Abp.Threading.Timers;
  11. using Abp.Timing;
  12. using Newtonsoft.Json;
  13. namespace Abp.BackgroundJobs
  14. {
  /// <summary>
  /// Default implementation of <see cref="IBackgroundJobManager"/>.
  /// Enqueued jobs are persisted through an <see cref="IBackgroundJobStore"/>;
  /// each <see cref="DoWork"/> run loads the waiting jobs from that store and executes them one by one.
  /// </summary>
  public class BackgroundJobManager : PeriodicBackgroundWorkerBase, IBackgroundJobManager, ISingletonDependency
  {
      /// <summary>
      /// Maximum number of waiting jobs requested from the store in a single <see cref="DoWork"/> run.
      /// </summary>
      private const int MaxWaitingJobCountPerPoll = 1000;

      /// <summary>
      /// Name of the method that is looked up (by reflection) on the resolved job object and invoked.
      /// </summary>
      private const string JobExecuteMethodName = "Execute";

      /// <summary>
      /// Event bus used to publish an <see cref="AbpHandledExceptionData"/> when a job execution fails.
      /// Initialized to <see cref="NullEventBus.Instance"/> by the constructor.
      /// </summary>
      public IEventBus EventBus { get; set; }

      /// <summary>
      /// Interval between polling jobs from <see cref="IBackgroundJobStore"/>.
      /// Default value: 5000 (5 seconds).
      /// The value is copied to the timer period when an instance is constructed.
      /// </summary>
      public static int JobPollPeriod { get; set; } = 5000;

      private readonly IIocResolver _iocResolver;
      private readonly IBackgroundJobStore _store;

      /// <summary>
      /// Initializes a new instance of the <see cref="BackgroundJobManager"/> class.
      /// </summary>
      /// <param name="iocResolver">Resolver used to create the job object for each execution.</param>
      /// <param name="store">Store used to insert, read, update and delete job records.</param>
      /// <param name="timer">Timer handed to the base class; its period is set to <see cref="JobPollPeriod"/>.</param>
      public BackgroundJobManager(
          IIocResolver iocResolver,
          IBackgroundJobStore store,
          AbpTimer timer)
          : base(timer)
      {
          _store = store;
          _iocResolver = iocResolver;

          EventBus = NullEventBus.Instance;

          Timer.Period = JobPollPeriod;
      }

      /// <summary>
      /// Creates a job record for <typeparamref name="TJob"/> and inserts it into the store.
      /// </summary>
      /// <typeparam name="TJob">Job type; its assembly-qualified name is stored with the record.</typeparam>
      /// <typeparam name="TArgs">Type of the job arguments.</typeparam>
      /// <param name="args">Job arguments; stored as a JSON string.</param>
      /// <param name="priority">Priority stored with the job record.</param>
      /// <param name="delay">
      /// Optional delay. When given, the record's next try time is set to the current time plus this delay;
      /// otherwise the next try time is left at whatever <see cref="BackgroundJobInfo"/> initializes it to.
      /// </param>
      /// <returns>The id of the inserted job record, converted to a string.</returns>
      public async Task<string> EnqueueAsync<TJob, TArgs>(TArgs args, BackgroundJobPriority priority = BackgroundJobPriority.Normal, TimeSpan? delay = null)
          where TJob : IBackgroundJob<TArgs>
      {
          var jobInfo = new BackgroundJobInfo
          {
              JobType = typeof(TJob).AssemblyQualifiedName,
              JobArgs = args.ToJsonString(),
              Priority = priority
          };

          if (delay.HasValue)
          {
              jobInfo.NextTryTime = Clock.Now.Add(delay.Value);
          }

          await _store.InsertAsync(jobInfo);

          return jobInfo.Id.ToString();
      }

      /// <summary>
      /// Deletes the job record with the given id from the store.
      /// </summary>
      /// <param name="jobId">Id of the job; must be parseable as a <see cref="long"/>.</param>
      /// <returns>
      /// <c>false</c> when the store returns no record for the id; <c>true</c> after the record was deleted.
      /// </returns>
      /// <exception cref="ArgumentException">Thrown when <paramref name="jobId"/> is not a number.</exception>
      public async Task<bool> DeleteAsync(string jobId)
      {
          if (!long.TryParse(jobId, out long finalJobId))
          {
              throw new ArgumentException($"The jobId '{jobId}' should be a number.", nameof(jobId));
          }

          BackgroundJobInfo jobInfo = await _store.GetAsync(finalJobId);
          if (jobInfo == null)
          {
              return false;
          }

          await _store.DeleteAsync(jobInfo);
          return true;
      }

      /// <summary>
      /// Loads the waiting jobs from the store (at most <see cref="MaxWaitingJobCountPerPoll"/>)
      /// and tries to process each of them in the order returned by the store.
      /// </summary>
      protected override void DoWork()
      {
          var waitingJobs = AsyncHelper.RunSync(() => _store.GetWaitingJobsAsync(MaxWaitingJobCountPerPoll));

          foreach (var job in waitingJobs)
          {
              TryProcessJob(job);
          }
      }

      /// <summary>
      /// Executes a single job and never lets an exception escape.
      /// On success the job record is deleted. When the execution (or the delete) throws, the record is
      /// rescheduled via <c>CalculateNextTryTime</c> or marked abandoned when no further try time is available,
      /// and the failure is published on <see cref="EventBus"/>. Any exception outside the execution itself
      /// (for example the job type cannot be loaded or resolved) marks the job as abandoned.
      /// </summary>
      /// <param name="jobInfo">The job record to process; it is mutated to reflect the attempt.</param>
      private void TryProcessJob(BackgroundJobInfo jobInfo)
      {
          try
          {
              jobInfo.TryCount++;
              jobInfo.LastTryTime = Clock.Now;

              // Type.GetType returns null when the type cannot be found; fail with a clear message
              // instead of passing null on to the resolver. The outer catch abandons the job.
              var jobType = Type.GetType(jobInfo.JobType);
              if (jobType == null)
              {
                  throw new InvalidOperationException(
                      $"Could not load the background job type '{jobInfo.JobType}'.");
              }

              using (var job = _iocResolver.ResolveAsDisposable(jobType))
              {
                  try
                  {
                      // The argument type is taken from the first parameter of the Execute method,
                      // and the stored JSON is deserialized into that type.
                      var jobExecuteMethod = job.Object.GetType().GetTypeInfo().GetMethod(JobExecuteMethodName);
                      var argsType = jobExecuteMethod.GetParameters()[0].ParameterType;
                      var argsObj = JsonConvert.DeserializeObject(jobInfo.JobArgs, argsType);

                      jobExecuteMethod.Invoke(job.Object, new[] { argsObj });

                      AsyncHelper.RunSync(() => _store.DeleteAsync(jobInfo));
                  }
                  catch (Exception ex)
                  {
                      Logger.Warn(ex.Message, ex);

                      var nextTryTime = jobInfo.CalculateNextTryTime();
                      if (nextTryTime.HasValue)
                      {
                          jobInfo.NextTryTime = nextTryTime.Value;
                      }
                      else
                      {
                          jobInfo.IsAbandoned = true;
                      }

                      TryUpdate(jobInfo);

                      EventBus.Trigger(
                          this,
                          new AbpHandledExceptionData(
                              new BackgroundJobException(
                                  "A background job execution is failed. See inner exception for details. See BackgroundJob property to get information on the background job.",
                                  ex
                              )
                              {
                                  BackgroundJob = jobInfo,
                                  JobObject = job.Object
                              }
                          )
                      );
                  }
              }
          }
          catch (Exception ex)
          {
              Logger.Warn(ex.ToString(), ex);

              jobInfo.IsAbandoned = true;

              TryUpdate(jobInfo);
          }
      }

      /// <summary>
      /// Writes the job record back to the store; a failing update is logged as a warning and swallowed.
      /// </summary>
      /// <param name="jobInfo">The job record to update.</param>
      private void TryUpdate(BackgroundJobInfo jobInfo)
      {
          try
          {
              // Block until the update has finished. Without waiting, a failing update would fault the
              // returned task after this method has already returned, so the catch below would never run.
              AsyncHelper.RunSync(() => _store.UpdateAsync(jobInfo));
          }
          catch (Exception updateEx)
          {
              Logger.Warn(updateEx.ToString(), updateEx);
          }
      }
  }
  149. }