| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459 |
- using Abp;
- using Abp.Dependency;
- using Abp.Domain.Repositories;
- using Abp.Domain.Services;
- using Abp.Domain.Uow;
- using Abp.Linq;
- using Abp.Runtime.Session;
- using Microsoft.EntityFrameworkCore;
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Threading;
- using System.Threading.Tasks;
- using VberZero.Workflow.DesignInfo;
- using VberZero.Workflow.Persistence;
- using WorkflowCore.Models;
- namespace VberZero.Workflow
- {
/// <summary>
/// Persistence provider that stores workflow instances, events, event
/// subscriptions and execution errors through ABP repositories.
/// </summary>
/// <remarks>
/// Most methods dereference <c>CurrentUnitOfWork</c>, so they must run inside
/// an active unit of work; they are marked <c>[UnitOfWork]</c> for that reason.
/// </remarks>
public class VberPersistenceProvider : DomainService, IVberPersistenceProvider, ISingletonDependency
{
    protected readonly IRepository<WorkflowEventInfo, string> _eventRepository;
    protected readonly IRepository<WorkflowExecutionPointerInfo, string> _executionPointerRepository;
    protected readonly IRepository<WorkflowInfo, string> _workflowRepository;
    protected readonly IRepository<WorkflowDefinitionInfo, string> _workflowDefinitionRepository;
    protected readonly IRepository<WorkflowSubscriptionInfo, string> _eventSubscriptionRepository;
    protected readonly IRepository<WorkflowExecutionErrorInfo, string> _executionErrorRepository;
    protected readonly IGuidGenerator _guidGenerator;
    protected readonly IAsyncQueryableExecuter _asyncQueryableExecuter;

    /// <summary>
    /// Current session, used to stamp the tenant id on new workflows and events.
    /// Property-injected; defaults to <see cref="NullAbpSession.Instance"/> so the
    /// provider does not fail with a null reference when nothing is injected.
    /// </summary>
    public IAbpSession AbpSession { get; set; }

    /// <summary>
    /// Creates the provider from the repositories and services it depends on.
    /// </summary>
    public VberPersistenceProvider(IRepository<WorkflowEventInfo, string> eventRepository, IRepository<WorkflowExecutionPointerInfo, string> executionPointerRepository, IRepository<WorkflowInfo, string> workflowRepository, IRepository<WorkflowSubscriptionInfo, string> eventSubscriptionRepository, IGuidGenerator guidGenerator, IAsyncQueryableExecuter asyncQueryableExecuter, IRepository<WorkflowExecutionErrorInfo, string> executionErrorRepository, IRepository<WorkflowDefinitionInfo, string> workflowDefinitionRepository)
    {
        _eventRepository = eventRepository;
        _executionPointerRepository = executionPointerRepository;
        _workflowRepository = workflowRepository;
        _eventSubscriptionRepository = eventSubscriptionRepository;
        _guidGenerator = guidGenerator;
        _asyncQueryableExecuter = asyncQueryableExecuter;
        _executionErrorRepository = executionErrorRepository;
        _workflowDefinitionRepository = workflowDefinitionRepository;

        // Null-object default for the property-injected session.
        AbpSession = NullAbpSession.Instance;
    }

    /// <summary>
    /// Assigns a new id to <paramref name="subscription"/> and inserts it.
    /// </summary>
    /// <returns>The newly assigned subscription id.</returns>
    [UnitOfWork]
    public virtual async Task<string> CreateEventSubscription(EventSubscription subscription, CancellationToken cancellationToken = new())
    {
        subscription.Id = NewId();
        var persistedSubscription = subscription.ToPersistEntity();
        await _eventSubscriptionRepository.InsertAsync(persistedSubscription);
        return subscription.Id;
    }

    /// <summary>
    /// Assigns a new id to <paramref name="workflow"/>, inserts it and saves.
    /// </summary>
    /// <returns>The newly assigned workflow id.</returns>
    [UnitOfWork]
    public virtual async Task<string> CreateNewWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = new())
    {
        var entity = GenerateWorkflow(workflow);
        await _workflowRepository.InsertAsync(entity);
        await CurrentUnitOfWork.SaveChangesAsync();
        return workflow.Id;
    }

    /// <summary>
    /// Builds a new persistence entity for <paramref name="workflow"/>.
    /// Overwrites <c>workflow.Id</c> with a freshly generated id and stamps the
    /// entity with the current session's tenant id.
    /// </summary>
    protected WorkflowInfo GenerateWorkflow(WorkflowInstance workflow)
    {
        workflow.Id = NewId();
        var entity = workflow.ToPersistEntity();
        entity.TenantId = AbpSession.TenantId;
        return entity;
    }

    /// <summary>
    /// Inserts or updates <paramref name="workflow"/> and saves, with the
    /// MayHaveTenant data filter disabled.
    /// </summary>
    /// <remarks>
    /// When no stored workflow matches <c>workflow.Id</c>, a new entity is
    /// generated, which replaces <c>workflow.Id</c> with a new id.
    /// </remarks>
    [UnitOfWork]
    public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = new())
    {
        using (CurrentUnitOfWork.DisableFilter(AbpDataFilters.MayHaveTenant))
        {
            await UpsertWorkflowAsync(workflow, cancellationToken);
            await CurrentUnitOfWork.SaveChangesAsync();
        }
    }

    /// <summary>
    /// Inserts or updates <paramref name="workflow"/>, inserts each of
    /// <paramref name="subscriptions"/> linked to it, and saves, with the
    /// MayHaveTenant data filter disabled.
    /// </summary>
    /// <param name="subscriptions">
    /// Subscriptions to insert; <c>null</c> is treated as empty. A subscription
    /// without an id gets a generated one, as in <see cref="CreateEventSubscription"/>.
    /// </param>
    [UnitOfWork]
    public async Task PersistWorkflow(WorkflowInstance workflow, List<EventSubscription> subscriptions, CancellationToken cancellationToken = new())
    {
        using (CurrentUnitOfWork.DisableFilter(AbpDataFilters.MayHaveTenant))
        {
            var entity = await UpsertWorkflowAsync(workflow, cancellationToken);

            if (subscriptions != null)
            {
                foreach (var subscription in subscriptions)
                {
                    // The entity key is a string; do not insert a row without one.
                    if (string.IsNullOrEmpty(subscription.Id))
                    {
                        subscription.Id = NewId();
                    }

                    var subEntity = subscription.ToPersistEntity();
                    subEntity.WorkflowId = entity.Id;
                    await _eventSubscriptionRepository.InsertAsync(subEntity);
                }
            }

            await CurrentUnitOfWork.SaveChangesAsync();
        }
    }

    /// <summary>
    /// Returns the ids of workflows in <see cref="WorkflowStatus.Runnable"/> status
    /// whose next execution tick is at or before <paramref name="asAt"/> (as UTC ticks).
    /// </summary>
    [UnitOfWork]
    public virtual async Task<IEnumerable<string>> GetRunnableInstances(DateTime asAt, CancellationToken cancellationToken = new())
    {
        using (CurrentUnitOfWork.DisableFilter(AbpDataFilters.MayHaveTenant))
        {
            var now = asAt.ToUniversalTime().Ticks;
            var query = _workflowRepository.GetAll()
                .Where(x => x.NextExecution.HasValue && x.NextExecution <= now && x.Status == WorkflowStatus.Runnable)
                .Select(x => x.Id);
            return await _asyncQueryableExecuter.ToListAsync(query);
        }
    }

    /// <summary>
    /// Returns one page of workflow instances, optionally filtered by status,
    /// definition id and creation-time range.
    /// </summary>
    /// <remarks>
    /// Results are ordered by creation time, then id, so that
    /// <paramref name="skip"/>/<paramref name="take"/> paging is deterministic.
    /// </remarks>
    [UnitOfWork]
    public virtual async Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(WorkflowStatus? status, string type, DateTime? createdFrom, DateTime? createdTo, int skip, int take)
    {
        using (CurrentUnitOfWork.DisableFilter(AbpDataFilters.MayHaveTenant))
        {
            IQueryable<WorkflowInfo> query = QueryWorkflowGraph();

            if (status.HasValue)
                query = query.Where(x => x.Status == status.Value);
            if (!String.IsNullOrEmpty(type))
                query = query.Where(x => x.WorkflowDefinitionId == type);
            if (createdFrom.HasValue)
                query = query.Where(x => x.CreateTime >= createdFrom.Value);
            if (createdTo.HasValue)
                query = query.Where(x => x.CreateTime <= createdTo.Value);

            var rawResult = await query
                .OrderBy(x => x.CreateTime)
                .ThenBy(x => x.Id)
                .Skip(skip)
                .Take(take)
                .ToListAsync();

            return rawResult.Select(item => item.ToWorkflowInstance()).ToList();
        }
    }

    /// <summary>
    /// Loads the workflow with the given id together with its definition,
    /// execution pointers and their extension attributes.
    /// </summary>
    /// <exception cref="InvalidOperationException">No workflow has that id (thrown by <c>FirstAsync</c>).</exception>
    [UnitOfWork]
    public virtual async Task<WorkflowInstance> GetWorkflowInstance(string id, CancellationToken cancellationToken = new())
    {
        using (CurrentUnitOfWork.DisableFilter(AbpDataFilters.MayHaveTenant))
        {
            var raw = await QueryWorkflowGraph().FirstAsync(x => x.Id == id, cancellationToken);
            return raw.ToWorkflowInstance();
        }
    }

    /// <summary>
    /// Loads the workflows whose ids are in <paramref name="ids"/>.
    /// Returns an empty list when <paramref name="ids"/> is <c>null</c> or empty.
    /// </summary>
    [UnitOfWork]
    public virtual async Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(IEnumerable<string> ids, CancellationToken cancellationToken = new())
    {
        if (ids == null)
        {
            return new List<WorkflowInstance>();
        }

        // Snapshot once: avoids re-enumerating the caller's sequence and gives
        // the query a concrete collection for the Contains filter.
        var idList = ids.ToList();
        if (idList.Count == 0)
        {
            return new List<WorkflowInstance>();
        }

        using (CurrentUnitOfWork.DisableFilter(AbpDataFilters.MayHaveTenant))
        {
            var raw = await QueryWorkflowGraph()
                .Where(x => idList.Contains(x.Id))
                .ToListAsync(cancellationToken);

            // Materialise so the conversion runs once, not on every enumeration.
            return raw.Select(i => i.ToWorkflowInstance()).ToList();
        }
    }

    /// <summary>
    /// Deletes the subscription with the given id and saves.
    /// Does nothing when no such subscription exists.
    /// </summary>
    [UnitOfWork]
    public virtual async Task TerminateSubscription(string eventSubscriptionId, CancellationToken cancellationToken = new())
    {
        var existing = await _eventSubscriptionRepository.FirstOrDefaultAsync(x => x.Id == eventSubscriptionId);
        if (existing == null)
        {
            return;
        }

        await _eventSubscriptionRepository.DeleteAsync(existing);
        await CurrentUnitOfWork.SaveChangesAsync();
    }

    /// <summary>
    /// No-op: this provider performs no store initialisation.
    /// </summary>
    [UnitOfWork]
    public virtual void EnsureStoreExists()
    {
    }

    /// <summary>
    /// Returns the subscriptions matching <paramref name="eventName"/> and
    /// <paramref name="eventKey"/> whose <c>SubscribeAsOf</c> is at or before
    /// <paramref name="asOf"/> (converted to UTC).
    /// </summary>
    [UnitOfWork]
    public virtual async Task<IEnumerable<EventSubscription>> GetSubscriptions(string eventName, string eventKey, DateTime asOf, CancellationToken cancellationToken = new())
    {
        asOf = asOf.ToUniversalTime();
        var raw = await _eventSubscriptionRepository.GetAll()
            .Where(x => x.EventName == eventName && x.EventKey == eventKey && x.SubscribeAsOf <= asOf)
            .ToListAsync(cancellationToken);
        return raw.Select(item => item.ToEventSubscription()).ToList();
    }

    /// <summary>
    /// Assigns a new id to <paramref name="newEvent"/>, stamps the current
    /// tenant id on the entity, inserts it and saves.
    /// </summary>
    /// <returns>The id of the inserted event entity.</returns>
    [UnitOfWork]
    public virtual async Task<string> CreateEvent(Event newEvent, CancellationToken cancellationToken = new())
    {
        using (CurrentUnitOfWork.DisableFilter(AbpDataFilters.MayHaveTenant))
        {
            newEvent.Id = NewId();
            var persistEntity = newEvent.ToPersistEntity();
            persistEntity.TenantId = AbpSession.TenantId;
            var entity = await _eventRepository.InsertAsync(persistEntity);
            await CurrentUnitOfWork.SaveChangesAsync();
            return entity.Id;
        }
    }

    /// <summary>
    /// Returns the event with the given id, or <c>null</c> when none exists.
    /// </summary>
    [UnitOfWork]
    public virtual async Task<Event> GetEvent(string id, CancellationToken cancellationToken = new())
    {
        using (CurrentUnitOfWork.DisableFilter(AbpDataFilters.MayHaveTenant))
        {
            var raw = await _eventRepository.FirstOrDefaultAsync(x => x.Id == id);
            return raw?.ToEvent();
        }
    }

    /// <summary>
    /// Returns the ids of unprocessed events whose event time is at or before
    /// <paramref name="asAt"/> (converted to UTC).
    /// </summary>
    [UnitOfWork]
    public virtual async Task<IEnumerable<string>> GetRunnableEvents(DateTime asAt, CancellationToken cancellationToken = new())
    {
        using (CurrentUnitOfWork.DisableFilter(AbpDataFilters.MayHaveTenant))
        {
            var now = asAt.ToUniversalTime();
            return await _eventRepository.GetAll()
                .Where(x => !x.IsProcessed)
                .Where(x => x.EventTime <= now)
                .Select(x => x.Id)
                .ToListAsync(cancellationToken);
        }
    }

    /// <summary>
    /// Sets <c>IsProcessed</c> to <c>true</c> on the event with the given id and saves.
    /// </summary>
    /// <exception cref="InvalidOperationException">No event has that id (thrown by <c>FirstAsync</c>).</exception>
    [UnitOfWork]
    public virtual Task MarkEventProcessed(string id, CancellationToken cancellationToken = new())
    {
        return SetEventProcessedAsync(id, true, cancellationToken);
    }

    /// <summary>
    /// Returns the ids of events matching <paramref name="eventName"/> and
    /// <paramref name="eventKey"/> whose event time is at or after <paramref name="asOf"/>.
    /// </summary>
    [UnitOfWork]
    public virtual async Task<IEnumerable<string>> GetEvents(string eventName, string eventKey, DateTime asOf, CancellationToken cancellationToken = new())
    {
        using (CurrentUnitOfWork.DisableFilter(AbpDataFilters.MayHaveTenant))
        {
            return await _eventRepository.GetAll()
                .Where(x => x.EventName == eventName && x.EventKey == eventKey)
                .Where(x => x.EventTime >= asOf)
                .Select(x => x.Id)
                .ToListAsync(cancellationToken);
        }
    }

    /// <summary>
    /// Sets <c>IsProcessed</c> to <c>false</c> on the event with the given id and saves.
    /// </summary>
    /// <exception cref="InvalidOperationException">No event has that id (thrown by <c>FirstAsync</c>).</exception>
    [UnitOfWork]
    public virtual Task MarkEventUnprocessed(string id, CancellationToken cancellationToken = new())
    {
        return SetEventProcessedAsync(id, false, cancellationToken);
    }

    /// <summary>
    /// Inserts every error in <paramref name="errors"/> and saves.
    /// Does nothing when <paramref name="errors"/> is <c>null</c> or empty.
    /// </summary>
    [UnitOfWork]
    public virtual async Task PersistErrors(IEnumerable<ExecutionError> errors, CancellationToken cancellationToken = new())
    {
        if (errors == null)
        {
            return;
        }

        // Materialise once so the sequence is not enumerated twice.
        var executionErrors = errors as ExecutionError[] ?? errors.ToArray();
        if (executionErrors.Length == 0)
        {
            return;
        }

        foreach (var error in executionErrors)
        {
            await _executionErrorRepository.InsertAsync(error.ToPersistEntity());
        }

        await CurrentUnitOfWork.SaveChangesAsync();
    }

    /// <summary>
    /// Returns the subscription with the given id, or <c>null</c> when none exists.
    /// </summary>
    [UnitOfWork]
    public virtual async Task<EventSubscription> GetSubscription(string eventSubscriptionId, CancellationToken cancellationToken = new())
    {
        var raw = await _eventSubscriptionRepository.FirstOrDefaultAsync(x => x.Id == eventSubscriptionId);
        return raw?.ToEventSubscription();
    }

    /// <summary>
    /// Returns the first subscription matching <paramref name="eventName"/> and
    /// <paramref name="eventKey"/> that became active at or before
    /// <paramref name="asOf"/> and has no external token, or <c>null</c> when none exists.
    /// </summary>
    [UnitOfWork]
    public virtual async Task<EventSubscription> GetFirstOpenSubscription(string eventName, string eventKey, DateTime asOf, CancellationToken cancellationToken = new())
    {
        var raw = await _eventSubscriptionRepository.FirstOrDefaultAsync(x => x.EventName == eventName && x.EventKey == eventKey && x.SubscribeAsOf <= asOf && x.ExternalToken == null);
        return raw?.ToEventSubscription();
    }

    /// <summary>
    /// Stores the external token, worker id and token expiry on the subscription and saves.
    /// </summary>
    /// <returns>Always <c>true</c>.</returns>
    /// <exception cref="InvalidOperationException">No subscription has that id (thrown by <c>FirstAsync</c>).</exception>
    [UnitOfWork]
    public virtual async Task<bool> SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken cancellationToken = new())
    {
        var existingEntity = await GetTrackedSubscriptionAsync(eventSubscriptionId, cancellationToken);
        existingEntity.ExternalToken = token;
        existingEntity.ExternalWorkerId = workerId;
        existingEntity.ExternalTokenExpiry = expiry;
        await CurrentUnitOfWork.SaveChangesAsync();
        return true;
    }

    /// <summary>
    /// Clears the external token, worker id and token expiry on the subscription and saves.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// No subscription has that id, or its stored token differs from <paramref name="token"/>.
    /// </exception>
    [UnitOfWork]
    public virtual async Task ClearSubscriptionToken(string eventSubscriptionId, string token, CancellationToken cancellationToken = new())
    {
        var existingEntity = await GetTrackedSubscriptionAsync(eventSubscriptionId, cancellationToken);
        if (existingEntity.ExternalToken != token)
            throw new InvalidOperationException("The supplied token does not match the token stored on the subscription.");

        existingEntity.ExternalToken = null;
        existingEntity.ExternalWorkerId = null;
        existingEntity.ExternalTokenExpiry = null;
        await CurrentUnitOfWork.SaveChangesAsync();
    }

    /// <summary>
    /// Returns the stored workflow entity with the given id, with the
    /// MayHaveTenant data filter disabled.
    /// </summary>
    [UnitOfWork]
    public async Task<WorkflowInfo> GetPersistedWorkflow(string id, CancellationToken cancellationToken = new())
    {
        using (CurrentUnitOfWork.DisableFilter(AbpDataFilters.MayHaveTenant))
        {
            // Await inside the scope so the filter stays disabled until the query completes.
            return await _workflowRepository.GetAsync(id);
        }
    }

    /// <summary>
    /// Returns the untracked workflow definition with the given id and version,
    /// or <c>null</c> when none exists.
    /// </summary>
    [UnitOfWork]
    public async Task<WorkflowDefinitionInfo> GetPersistedWorkflowDefinition(string id, int version, CancellationToken cancellationToken = new())
    {
        using (CurrentUnitOfWork.DisableFilter(AbpDataFilters.MayHaveTenant))
        {
            // Await inside the scope so the filter stays disabled until the query completes.
            return await _workflowDefinitionRepository.GetAll()
                .AsNoTracking()
                .FirstOrDefaultAsync(u => u.Id == id && u.Version == version, cancellationToken);
        }
    }

    /// <summary>
    /// Returns the stored execution pointer entity with the given id, with the
    /// MayHaveTenant data filter disabled.
    /// </summary>
    [UnitOfWork]
    public async Task<WorkflowExecutionPointerInfo> GetPersistedExecutionPointer(string id, CancellationToken cancellationToken = new())
    {
        using (CurrentUnitOfWork.DisableFilter(AbpDataFilters.MayHaveTenant))
        {
            // Await inside the scope so the filter stays disabled until the query completes.
            return await _executionPointerRepository.GetAsync(id);
        }
    }

    /// <summary>
    /// Returns the stored workflows for the given definition id and version.
    /// </summary>
    /// <remarks>
    /// NOTE(review): despite the name, the query does not filter on status —
    /// confirm whether only runnable workflows were intended.
    /// </remarks>
    /// <param name="definitionId">Workflow definition id to match.</param>
    /// <param name="version">Workflow definition version to match.</param>
    /// <param name="cancellationToken">Token passed to the query.</param>
    /// <returns>The matching workflow entities.</returns>
    [UnitOfWork]
    public async Task<IEnumerable<WorkflowInfo>> GetAllRunnablePersistedWorkflow(string definitionId, int version, CancellationToken cancellationToken = new())
    {
        using (CurrentUnitOfWork.DisableFilter(AbpDataFilters.MayHaveTenant))
        {
            return await _workflowRepository.GetAll()
                .Where(u => u.WorkflowDefinitionId == definitionId && u.Version == version)
                .ToListAsync(cancellationToken);
        }
    }

    /// <summary>
    /// No-op: the command is not stored; a completed task is returned.
    /// </summary>
    public Task ScheduleCommand(ScheduledCommand command)
    {
        return Task.CompletedTask;
    }

    /// <summary>
    /// No-op: <paramref name="action"/> is never invoked; a completed task is returned.
    /// </summary>
    public Task ProcessCommands(DateTimeOffset asOf, Func<ScheduledCommand, Task> action, CancellationToken cancellationToken = new())
    {
        return Task.CompletedTask;
    }

    /// <summary>
    /// Whether scheduled commands are supported. Never assigned in this class,
    /// so it is <c>false</c> unless a caller sets it.
    /// </summary>
    public bool SupportsScheduledCommands { get; set; }

    /// <summary>Generates a new id: a GUID in "N" format (no dashes).</summary>
    private string NewId()
    {
        return _guidGenerator.Create().ToString("N");
    }

    /// <summary>Base workflow query including the definition, execution pointers and their extension attributes.</summary>
    private IQueryable<WorkflowInfo> QueryWorkflowGraph()
    {
        return _workflowRepository.GetAll()
            .Include(wf => wf.WorkflowDefinitionInfo)
            .Include(wf => wf.ExecutionPointers)
            .ThenInclude(ep => ep.ExtensionAttributes);
    }

    /// <summary>Inserts the workflow when no tracked entity matches its id, otherwise updates that entity; does not save.</summary>
    private async Task<WorkflowInfo> UpsertWorkflowAsync(WorkflowInstance workflow, CancellationToken cancellationToken)
    {
        var entity = await QueryWorkflowGraph()
            .Where(x => x.Id == workflow.Id)
            .AsTracking()
            .FirstOrDefaultAsync(cancellationToken);

        if (entity == null)
        {
            entity = GenerateWorkflow(workflow);
            await _workflowRepository.InsertAsync(entity);
        }
        else
        {
            entity = workflow.ToPersistEntity(entity);
            await _workflowRepository.UpdateAsync(entity);
        }

        return entity;
    }

    /// <summary>Loads the event by id as a tracked entity, sets its IsProcessed flag and saves.</summary>
    private async Task SetEventProcessedAsync(string id, bool isProcessed, CancellationToken cancellationToken)
    {
        using (CurrentUnitOfWork.DisableFilter(AbpDataFilters.MayHaveTenant))
        {
            var existingEntity = await _eventRepository.GetAll()
                .Where(x => x.Id == id)
                .AsTracking()
                .FirstAsync(cancellationToken);
            existingEntity.IsProcessed = isProcessed;
            await CurrentUnitOfWork.SaveChangesAsync();
        }
    }

    /// <summary>Loads the subscription by id as a tracked entity; throws when it does not exist.</summary>
    private Task<WorkflowSubscriptionInfo> GetTrackedSubscriptionAsync(string eventSubscriptionId, CancellationToken cancellationToken)
    {
        return _eventSubscriptionRepository.GetAll()
            .Where(x => x.Id == eventSubscriptionId)
            .AsTracking()
            .FirstAsync(cancellationToken);
    }
}
- }
|