using System;
using System.Reflection;
using System.Threading.Tasks;
using Abp.Dependency;
using Abp.Events.Bus;
using Abp.Events.Bus.Exceptions;
using Abp.Json;
using Abp.Threading;
using Abp.Threading.BackgroundWorkers;
using Abp.Threading.Timers;
using Abp.Timing;
using Newtonsoft.Json;
namespace Abp.BackgroundJobs
{
///
/// Default implementation of .
///
public class BackgroundJobManager : PeriodicBackgroundWorkerBase, IBackgroundJobManager, ISingletonDependency
{
public IEventBus EventBus { get; set; }
///
/// Interval between polling jobs from .
/// Default value: 5000 (5 seconds).
///
public static int JobPollPeriod { get; set; }
private readonly IIocResolver _iocResolver;
private readonly IBackgroundJobStore _store;
static BackgroundJobManager()
{
JobPollPeriod = 5000;
}
///
/// Initializes a new instance of the class.
///
public BackgroundJobManager(
IIocResolver iocResolver,
IBackgroundJobStore store,
AbpTimer timer)
: base(timer)
{
_store = store;
_iocResolver = iocResolver;
EventBus = NullEventBus.Instance;
Timer.Period = JobPollPeriod;
}
public async Task EnqueueAsync(TArgs args, BackgroundJobPriority priority = BackgroundJobPriority.Normal, TimeSpan? delay = null)
where TJob : IBackgroundJob
{
var jobInfo = new BackgroundJobInfo
{
JobType = typeof(TJob).AssemblyQualifiedName,
JobArgs = args.ToJsonString(),
Priority = priority
};
if (delay.HasValue)
{
jobInfo.NextTryTime = Clock.Now.Add(delay.Value);
}
await _store.InsertAsync(jobInfo);
return jobInfo.Id.ToString();
}
public async Task DeleteAsync(string jobId)
{
if (long.TryParse(jobId, out long finalJobId) == false)
{
throw new ArgumentException($"The jobId '{jobId}' should be a number.", nameof(jobId));
}
BackgroundJobInfo jobInfo = await _store.GetAsync(finalJobId);
if (jobInfo == null)
{
return false;
}
await _store.DeleteAsync(jobInfo);
return true;
}
protected override void DoWork()
{
var waitingJobs = AsyncHelper.RunSync(() => _store.GetWaitingJobsAsync(1000));
foreach (var job in waitingJobs)
{
TryProcessJob(job);
}
}
private void TryProcessJob(BackgroundJobInfo jobInfo)
{
try
{
jobInfo.TryCount++;
jobInfo.LastTryTime = Clock.Now;
var jobType = Type.GetType(jobInfo.JobType);
using (var job = _iocResolver.ResolveAsDisposable(jobType))
{
try
{
var jobExecuteMethod = job.Object.GetType().GetTypeInfo().GetMethod("Execute");
var argsType = jobExecuteMethod.GetParameters()[0].ParameterType;
var argsObj = JsonConvert.DeserializeObject(jobInfo.JobArgs, argsType);
jobExecuteMethod.Invoke(job.Object, new[] { argsObj });
AsyncHelper.RunSync(() => _store.DeleteAsync(jobInfo));
}
catch (Exception ex)
{
Logger.Warn(ex.Message, ex);
var nextTryTime = jobInfo.CalculateNextTryTime();
if (nextTryTime.HasValue)
{
jobInfo.NextTryTime = nextTryTime.Value;
}
else
{
jobInfo.IsAbandoned = true;
}
TryUpdate(jobInfo);
EventBus.Trigger(
this,
new AbpHandledExceptionData(
new BackgroundJobException(
"A background job execution is failed. See inner exception for details. See BackgroundJob property to get information on the background job.",
ex
)
{
BackgroundJob = jobInfo,
JobObject = job.Object
}
)
);
}
}
}
catch (Exception ex)
{
Logger.Warn(ex.ToString(), ex);
jobInfo.IsAbandoned = true;
TryUpdate(jobInfo);
}
}
private void TryUpdate(BackgroundJobInfo jobInfo)
{
try
{
_store.UpdateAsync(jobInfo);
}
catch (Exception updateEx)
{
Logger.Warn(updateEx.ToString(), updateEx);
}
}
}
}