VberPersistenceProvider.cs 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459
  1. using Abp;
  2. using Abp.Dependency;
  3. using Abp.Domain.Repositories;
  4. using Abp.Domain.Services;
  5. using Abp.Domain.Uow;
  6. using Abp.Linq;
  7. using Abp.Runtime.Session;
  8. using Microsoft.EntityFrameworkCore;
  9. using System;
  10. using System.Collections.Generic;
  11. using System.Linq;
  12. using System.Threading;
  13. using System.Threading.Tasks;
  14. using VberZero.Workflow.DesignInfo;
  15. using VberZero.Workflow.Persistence;
  16. using WorkflowCore.Models;
  17. namespace VberZero.Workflow
  18. {
  19. public class VberPersistenceProvider : DomainService, IVberPersistenceProvider, ISingletonDependency
  20. {
  21. protected readonly IRepository<WorkflowEventInfo, string> _eventRepository;
  22. protected readonly IRepository<WorkflowExecutionPointerInfo, string> _executionPointerRepository;
  23. protected readonly IRepository<WorkflowInfo, string> _workflowRepository;
  24. protected readonly IRepository<WorkflowDefinitionInfo, string> _workflowDefinitionRepository;
  25. protected readonly IRepository<WorkflowSubscriptionInfo, string> _eventSubscriptionRepository;
  26. protected readonly IRepository<WorkflowExecutionErrorInfo, string> _executionErrorRepository;
  27. protected readonly IGuidGenerator _guidGenerator;
  28. protected readonly IAsyncQueryableExecuter _asyncQueryableExecuter;
  29. public IAbpSession AbpSession { get; set; }
  30. public VberPersistenceProvider(IRepository<WorkflowEventInfo, string> eventRepository, IRepository<WorkflowExecutionPointerInfo, string> executionPointerRepository, IRepository<WorkflowInfo, string> workflowRepository, IRepository<WorkflowSubscriptionInfo, string> eventSubscriptionRepository, IGuidGenerator guidGenerator, IAsyncQueryableExecuter asyncQueryableExecuter, IRepository<WorkflowExecutionErrorInfo, string> executionErrorRepository, IRepository<WorkflowDefinitionInfo, string> workflowDefinitionRepository)
  31. {
  32. _eventRepository = eventRepository;
  33. _executionPointerRepository = executionPointerRepository;
  34. _workflowRepository = workflowRepository;
  35. _eventSubscriptionRepository = eventSubscriptionRepository;
  36. _guidGenerator = guidGenerator;
  37. _asyncQueryableExecuter = asyncQueryableExecuter;
  38. _executionErrorRepository = executionErrorRepository;
  39. _workflowDefinitionRepository = workflowDefinitionRepository;
  40. }
  41. [UnitOfWork]
  42. public virtual async Task<string> CreateEventSubscription(EventSubscription subscription, CancellationToken cancellationToken = new())
  43. {
  44. subscription.Id = _guidGenerator.Create().ToString("N");
  45. var persistedSubscription = subscription.ToPersistEntity();
  46. await _eventSubscriptionRepository.InsertAsync(persistedSubscription);
  47. return subscription.Id;
  48. }
  49. [UnitOfWork]
  50. public virtual async Task<string> CreateNewWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = new())
  51. {
  52. var entity = GenerateWorkflow(workflow);
  53. //if (AbpSession.UserId.HasValue)
  54. //{
  55. // var userCache = AbpSession.g
  56. // entity.CreateUserIdentityName = userCache.FullName;
  57. //}
  58. await _workflowRepository.InsertAsync(entity);
  59. await CurrentUnitOfWork.SaveChangesAsync();
  60. return workflow.Id;
  61. }
  62. protected WorkflowInfo GenerateWorkflow(WorkflowInstance workflow)
  63. {
  64. workflow.Id = _guidGenerator.Create().ToString("N");
  65. var entity = workflow.ToPersistEntity();
  66. entity.TenantId = AbpSession.TenantId;
  67. //entity.CreatorUserId = AbpSession.UserId;
  68. return entity;
  69. }
  70. [UnitOfWork]
  71. public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = new())
  72. {
  73. using (CurrentUnitOfWork.DisableFilter(AbpDataFilters.MayHaveTenant))
  74. {
  75. var entity = await _workflowRepository.GetAll()
  76. .Where(x => x.Id == workflow.Id)
  77. .Include(wf => wf.WorkflowDefinitionInfo)
  78. .Include(wf => wf.ExecutionPointers)
  79. .ThenInclude(ep => ep.ExtensionAttributes)
  80. .AsTracking()
  81. .FirstOrDefaultAsync(cancellationToken: cancellationToken);
  82. if (entity == null)
  83. {
  84. entity = GenerateWorkflow(workflow);
  85. await _workflowRepository.InsertAsync(entity);
  86. }
  87. else
  88. {
  89. entity = workflow.ToPersistEntity(entity);
  90. await _workflowRepository.UpdateAsync(entity);
  91. }
  92. await CurrentUnitOfWork.SaveChangesAsync();
  93. }
  94. }
  95. [UnitOfWork]
  96. public async Task PersistWorkflow(WorkflowInstance workflow, List<EventSubscription> subscriptions, CancellationToken cancellationToken = new())
  97. {
  98. using (CurrentUnitOfWork.DisableFilter(AbpDataFilters.MayHaveTenant))
  99. {
  100. var entity = await _workflowRepository.GetAll()
  101. .Where(x => x.Id == workflow.Id)
  102. .Include(wf => wf.WorkflowDefinitionInfo)
  103. .Include(wf => wf.ExecutionPointers)
  104. .ThenInclude(ep => ep.ExtensionAttributes)
  105. .AsTracking()
  106. .FirstOrDefaultAsync(cancellationToken: cancellationToken);
  107. if (entity == null)
  108. {
  109. entity = GenerateWorkflow(workflow);
  110. await _workflowRepository.InsertAsync(entity);
  111. }
  112. else
  113. {
  114. entity = workflow.ToPersistEntity(entity);
  115. await _workflowRepository.UpdateAsync(entity);
  116. }
  117. //TODO
  118. foreach (var subscription in subscriptions)
  119. {
  120. var subEntity = subscription.ToPersistEntity();
  121. subEntity.WorkflowId = entity.Id;
  122. await _eventSubscriptionRepository.InsertAsync(subEntity);
  123. }
  124. await CurrentUnitOfWork.SaveChangesAsync();
  125. }
  126. }
  127. [UnitOfWork]
  128. public virtual async Task<IEnumerable<string>> GetRunnableInstances(DateTime asAt, CancellationToken cancellationToken = new())
  129. {
  130. using (CurrentUnitOfWork.DisableFilter(AbpDataFilters.MayHaveTenant))
  131. {
  132. var now = asAt.ToUniversalTime().Ticks;
  133. var query = _workflowRepository.GetAll().Where(x => x.NextExecution.HasValue && (x.NextExecution <= now) && (x.Status == WorkflowStatus.Runnable))
  134. .Select(x => x.Id);
  135. var raw = await _asyncQueryableExecuter.ToListAsync(query);
  136. return raw.Select(s => s.ToString()).ToList();
  137. }
  138. }
  139. [UnitOfWork]
  140. public virtual async Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(WorkflowStatus? status, string type, DateTime? createdFrom, DateTime? createdTo, int skip, int take)
  141. {
  142. using (CurrentUnitOfWork.DisableFilter(AbpDataFilters.MayHaveTenant))
  143. {
  144. IQueryable<WorkflowInfo> query = _workflowRepository.GetAll()
  145. .Include(wf => wf.WorkflowDefinitionInfo)
  146. .Include(wf => wf.ExecutionPointers)
  147. .ThenInclude(ep => ep.ExtensionAttributes)
  148. .AsQueryable();
  149. if (status.HasValue)
  150. query = query.Where(x => x.Status == status.Value);
  151. if (!String.IsNullOrEmpty(type))
  152. query = query.Where(x => x.WorkflowDefinitionId == type);
  153. if (createdFrom.HasValue)
  154. query = query.Where(x => x.CreateTime >= createdFrom.Value);
  155. if (createdTo.HasValue)
  156. query = query.Where(x => x.CreateTime <= createdTo.Value);
  157. var rawResult = await query.Skip(skip).Take(take).ToListAsync();
  158. List<WorkflowInstance> result = new List<WorkflowInstance>();
  159. foreach (var item in rawResult)
  160. result.Add(item.ToWorkflowInstance());
  161. return result;
  162. }
  163. }
  164. [UnitOfWork]
  165. public virtual async Task<WorkflowInstance> GetWorkflowInstance(string id, CancellationToken cancellationToken = new())
  166. {
  167. using (CurrentUnitOfWork.DisableFilter(AbpDataFilters.MayHaveTenant))
  168. {
  169. var raw = await _workflowRepository.GetAll()
  170. .Include(wf => wf.WorkflowDefinitionInfo)
  171. .Include(wf => wf.ExecutionPointers)
  172. .ThenInclude(ep => ep.ExtensionAttributes)
  173. .FirstAsync(x => x.Id == id, cancellationToken);
  174. return raw.ToWorkflowInstance();
  175. }
  176. }
  177. [UnitOfWork]
  178. public virtual async Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(IEnumerable<string> ids, CancellationToken cancellationToken = new())
  179. {
  180. if (ids == null)
  181. {
  182. return new List<WorkflowInstance>();
  183. }
  184. using (CurrentUnitOfWork.DisableFilter(AbpDataFilters.MayHaveTenant))
  185. {
  186. var raw = _workflowRepository.GetAll()
  187. .Include(wf => wf.WorkflowDefinitionInfo)
  188. .Include(wf => wf.ExecutionPointers)
  189. .ThenInclude(ep => ep.ExtensionAttributes)
  190. .Where(x => ids.Contains(x.Id));
  191. return (await raw.ToListAsync(cancellationToken: cancellationToken)).Select(i => i.ToWorkflowInstance());
  192. }
  193. }
  194. [UnitOfWork]
  195. public virtual async Task TerminateSubscription(string eventSubscriptionId, CancellationToken cancellationToken = new())
  196. {
  197. var existing = await _eventSubscriptionRepository.FirstOrDefaultAsync(x => x.Id == eventSubscriptionId);
  198. await _eventSubscriptionRepository.DeleteAsync(existing);
  199. await CurrentUnitOfWork.SaveChangesAsync();
  200. }
  201. [UnitOfWork]
  202. public virtual void EnsureStoreExists()
  203. {
  204. }
  205. [UnitOfWork]
  206. public virtual async Task<IEnumerable<EventSubscription>> GetSubscriptions(string eventName, string eventKey, DateTime asOf, CancellationToken cancellationToken = new())
  207. {
  208. asOf = asOf.ToUniversalTime();
  209. var raw = await _eventSubscriptionRepository.GetAll()
  210. .Where(x => x.EventName == eventName && x.EventKey == eventKey && x.SubscribeAsOf <= asOf)
  211. .ToListAsync(cancellationToken);
  212. return raw.Select(item => item.ToEventSubscription()).ToList();
  213. }
  214. [UnitOfWork]
  215. public virtual async Task<string> CreateEvent(Event newEvent, CancellationToken cancellationToken = new())
  216. {
  217. using (CurrentUnitOfWork.DisableFilter(AbpDataFilters.MayHaveTenant))
  218. {
  219. newEvent.Id = _guidGenerator.Create().ToString("N");
  220. var persistEntity = newEvent.ToPersistEntity();
  221. persistEntity.TenantId = AbpSession.TenantId;
  222. var entity = await _eventRepository.InsertAsync(persistEntity);
  223. await CurrentUnitOfWork.SaveChangesAsync();
  224. return entity.Id;
  225. }
  226. }
  227. [UnitOfWork]
  228. public virtual async Task<Event> GetEvent(string id, CancellationToken cancellationToken = new())
  229. {
  230. using (CurrentUnitOfWork.DisableFilter(AbpDataFilters.MayHaveTenant))
  231. {
  232. var raw = await _eventRepository
  233. .FirstOrDefaultAsync(x => x.Id == id);
  234. if (raw == null)
  235. return null;
  236. return raw.ToEvent();
  237. }
  238. }
  239. [UnitOfWork]
  240. public virtual async Task<IEnumerable<string>> GetRunnableEvents(DateTime asAt, CancellationToken cancellationToken = new())
  241. {
  242. using (CurrentUnitOfWork.DisableFilter(AbpDataFilters.MayHaveTenant))
  243. {
  244. var now = asAt.ToUniversalTime();
  245. // asAt = asAt.ToUniversalTime();
  246. var raw = await _eventRepository.GetAll()
  247. .Where(x => !x.IsProcessed)
  248. .Where(x => x.EventTime <= now)
  249. .Select(x => x.Id)
  250. .ToListAsync(cancellationToken);
  251. return raw.Select(s => s.ToString()).ToList();
  252. }
  253. }
  254. [UnitOfWork]
  255. public virtual async Task MarkEventProcessed(string id, CancellationToken cancellationToken = new())
  256. {
  257. using (CurrentUnitOfWork.DisableFilter(AbpDataFilters.MayHaveTenant))
  258. {
  259. var existingEntity = await _eventRepository.GetAll()
  260. .Where(x => x.Id == id)
  261. .AsTracking()
  262. .FirstAsync(cancellationToken);
  263. existingEntity.IsProcessed = true;
  264. await CurrentUnitOfWork.SaveChangesAsync();
  265. }
  266. }
  267. [UnitOfWork]
  268. public virtual async Task<IEnumerable<string>> GetEvents(string eventName, string eventKey, DateTime asOf, CancellationToken cancellationToken = new())
  269. {
  270. using (CurrentUnitOfWork.DisableFilter(AbpDataFilters.MayHaveTenant))
  271. {
  272. var raw = await _eventRepository.GetAll()
  273. .Where(x => x.EventName == eventName && x.EventKey == eventKey)
  274. .Where(x => x.EventTime >= asOf)
  275. .Select(x => x.Id)
  276. .ToListAsync(cancellationToken);
  277. var result = new List<string>();
  278. foreach (var s in raw)
  279. result.Add(s);
  280. return result;
  281. }
  282. }
  283. [UnitOfWork]
  284. public virtual async Task MarkEventUnprocessed(string id, CancellationToken cancellationToken = new())
  285. {
  286. using (CurrentUnitOfWork.DisableFilter(AbpDataFilters.MayHaveTenant))
  287. {
  288. var existingEntity = await _eventRepository.GetAll()
  289. .Where(x => x.Id == id)
  290. .AsTracking()
  291. .FirstAsync(cancellationToken);
  292. existingEntity.IsProcessed = false;
  293. await CurrentUnitOfWork.SaveChangesAsync();
  294. }
  295. }
  296. [UnitOfWork]
  297. public virtual async Task PersistErrors(IEnumerable<ExecutionError> errors, CancellationToken cancellationToken = new())
  298. {
  299. var executionErrors = errors as ExecutionError[] ?? errors.ToArray();
  300. if (executionErrors.Any())
  301. {
  302. foreach (var error in executionErrors)
  303. {
  304. await _executionErrorRepository.InsertAsync(error.ToPersistEntity());
  305. }
  306. await CurrentUnitOfWork.SaveChangesAsync();
  307. }
  308. }
  309. [UnitOfWork]
  310. public virtual async Task<EventSubscription> GetSubscription(string eventSubscriptionId, CancellationToken cancellationToken = new())
  311. {
  312. var raw = await _eventSubscriptionRepository.FirstOrDefaultAsync(x => x.Id == eventSubscriptionId);
  313. return raw?.ToEventSubscription();
  314. }
  315. [UnitOfWork]
  316. public virtual async Task<EventSubscription> GetFirstOpenSubscription(string eventName, string eventKey, DateTime asOf, CancellationToken cancellationToken = new())
  317. {
  318. var raw = await _eventSubscriptionRepository.FirstOrDefaultAsync(x => x.EventName == eventName && x.EventKey == eventKey && x.SubscribeAsOf <= asOf && x.ExternalToken == null);
  319. return raw?.ToEventSubscription();
  320. }
  321. [UnitOfWork]
  322. public virtual async Task<bool> SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken cancellationToken = new())
  323. {
  324. var existingEntity = await _eventSubscriptionRepository.GetAll()
  325. .Where(x => x.Id == eventSubscriptionId)
  326. .AsTracking()
  327. .FirstAsync(cancellationToken);
  328. existingEntity.ExternalToken = token;
  329. existingEntity.ExternalWorkerId = workerId;
  330. existingEntity.ExternalTokenExpiry = expiry;
  331. await CurrentUnitOfWork.SaveChangesAsync();
  332. return true;
  333. }
  334. [UnitOfWork]
  335. public virtual async Task ClearSubscriptionToken(string eventSubscriptionId, string token, CancellationToken cancellationToken = new())
  336. {
  337. var existingEntity = await _eventSubscriptionRepository.GetAll()
  338. .Where(x => x.Id == eventSubscriptionId)
  339. .AsTracking()
  340. .FirstAsync(cancellationToken);
  341. if (existingEntity.ExternalToken != token)
  342. throw new InvalidOperationException();
  343. existingEntity.ExternalToken = null;
  344. existingEntity.ExternalWorkerId = null;
  345. existingEntity.ExternalTokenExpiry = null;
  346. await CurrentUnitOfWork.SaveChangesAsync();
  347. }
  348. public Task<WorkflowInfo> GetPersistedWorkflow(string id, CancellationToken cancellationToken = new())
  349. {
  350. using (CurrentUnitOfWork.DisableFilter(AbpDataFilters.MayHaveTenant))
  351. {
  352. return _workflowRepository.GetAsync(id);
  353. }
  354. }
  355. public Task<WorkflowDefinitionInfo> GetPersistedWorkflowDefinition(string id, int version, CancellationToken cancellationToken = new())
  356. {
  357. using (CurrentUnitOfWork.DisableFilter(AbpDataFilters.MayHaveTenant))
  358. {
  359. return _workflowDefinitionRepository.GetAll().AsNoTracking().FirstOrDefaultAsync(u => u.Id == id && u.Version == version, cancellationToken: cancellationToken);
  360. }
  361. }
  362. public Task<WorkflowExecutionPointerInfo> GetPersistedExecutionPointer(string id, CancellationToken cancellationToken = new())
  363. {
  364. using (CurrentUnitOfWork.DisableFilter(AbpDataFilters.MayHaveTenant))
  365. {
  366. return _executionPointerRepository.GetAsync(id);
  367. }
  368. }
  369. /// <summary>
  370. ///
  371. /// </summary>
  372. /// <param name="definitionId"></param>
  373. /// <param name="version"></param>
  374. /// <param name="cancellationToken"></param>
  375. /// <returns></returns>
  376. public async Task<IEnumerable<WorkflowInfo>> GetAllRunnablePersistedWorkflow(string definitionId, int version, CancellationToken cancellationToken = new())
  377. {
  378. using (CurrentUnitOfWork.DisableFilter(AbpDataFilters.MayHaveTenant))
  379. {
  380. return await _workflowRepository.GetAll().Where(u => u.WorkflowDefinitionId == definitionId && u.Version == version).ToListAsync(cancellationToken: cancellationToken);
  381. }
  382. }
  383. public Task ScheduleCommand(ScheduledCommand command)
  384. {
  385. //throw new NotImplementedException();
  386. return Task.CompletedTask;
  387. }
  388. public Task ProcessCommands(DateTimeOffset asOf, Func<ScheduledCommand, Task> action, CancellationToken cancellationToken = new())
  389. {
  390. //throw new NotImplementedException();
  391. return Task.CompletedTask;
  392. }
  393. public bool SupportsScheduledCommands { get; set; }
  394. }
  395. }