using Abp; using Abp.Dependency; using Abp.Domain.Repositories; using Abp.Domain.Services; using Abp.Domain.Uow; using Abp.Linq; using Abp.Runtime.Session; using Microsoft.EntityFrameworkCore; using System; using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; using VberZero.Workflow.DesignInfo; using VberZero.Workflow.Persistence; using WorkflowCore.Models; namespace VberZero.Workflow { public class VberPersistenceProvider : DomainService, IVberPersistenceProvider, ISingletonDependency { protected readonly IRepository _eventRepository; protected readonly IRepository _executionPointerRepository; protected readonly IRepository _workflowRepository; protected readonly IRepository _workflowDefinitionRepository; protected readonly IRepository _eventSubscriptionRepository; protected readonly IRepository _executionErrorRepository; protected readonly IGuidGenerator _guidGenerator; protected readonly IAsyncQueryableExecuter _asyncQueryableExecuter; public IAbpSession AbpSession { get; set; } public VberPersistenceProvider(IRepository eventRepository, IRepository executionPointerRepository, IRepository workflowRepository, IRepository eventSubscriptionRepository, IGuidGenerator guidGenerator, IAsyncQueryableExecuter asyncQueryableExecuter, IRepository executionErrorRepository, IRepository workflowDefinitionRepository) { _eventRepository = eventRepository; _executionPointerRepository = executionPointerRepository; _workflowRepository = workflowRepository; _eventSubscriptionRepository = eventSubscriptionRepository; _guidGenerator = guidGenerator; _asyncQueryableExecuter = asyncQueryableExecuter; _executionErrorRepository = executionErrorRepository; _workflowDefinitionRepository = workflowDefinitionRepository; } [UnitOfWork] public virtual async Task CreateEventSubscription(EventSubscription subscription, CancellationToken cancellationToken = new()) { subscription.Id = _guidGenerator.Create().ToString("N"); var persistedSubscription = subscription.ToPersistEntity(); await _eventSubscriptionRepository.InsertAsync(persistedSubscription); return subscription.Id; } [UnitOfWork] public virtual async Task CreateNewWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = new()) { var entity = GenerateWorkflow(workflow); //if (AbpSession.UserId.HasValue) //{ // var userCache = AbpSession.g // entity.CreateUserIdentityName = userCache.FullName; //} await _workflowRepository.InsertAsync(entity); await CurrentUnitOfWork.SaveChangesAsync(); return workflow.Id; } protected WorkflowInfo GenerateWorkflow(WorkflowInstance workflow) { workflow.Id = _guidGenerator.Create().ToString("N"); var entity = workflow.ToPersistEntity(); entity.TenantId = AbpSession.TenantId; //entity.CreatorUserId = AbpSession.UserId; return entity; } [UnitOfWork] public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = new()) { using (CurrentUnitOfWork.DisableFilter(AbpDataFilters.MayHaveTenant)) { var entity = await _workflowRepository.GetAll() .Where(x => x.Id == workflow.Id) .Include(wf => wf.WorkflowDefinitionInfo) .Include(wf => wf.ExecutionPointers) .ThenInclude(ep => ep.ExtensionAttributes) .AsTracking() .FirstOrDefaultAsync(cancellationToken: cancellationToken); if (entity == null) { entity = GenerateWorkflow(workflow); await _workflowRepository.InsertAsync(entity); } else { entity = workflow.ToPersistEntity(entity); await _workflowRepository.UpdateAsync(entity); } await CurrentUnitOfWork.SaveChangesAsync(); } } [UnitOfWork] public async Task PersistWorkflow(WorkflowInstance workflow, List subscriptions, CancellationToken cancellationToken = new()) { using (CurrentUnitOfWork.DisableFilter(AbpDataFilters.MayHaveTenant)) { var entity = await _workflowRepository.GetAll() .Where(x => x.Id == workflow.Id) .Include(wf => wf.WorkflowDefinitionInfo) .Include(wf => wf.ExecutionPointers) .ThenInclude(ep => ep.ExtensionAttributes) .AsTracking() .FirstOrDefaultAsync(cancellationToken: cancellationToken); if (entity == null) { entity = GenerateWorkflow(workflow); await _workflowRepository.InsertAsync(entity); } else { entity = workflow.ToPersistEntity(entity); await _workflowRepository.UpdateAsync(entity); } //TODO foreach (var subscription in subscriptions) { var subEntity = subscription.ToPersistEntity(); subEntity.WorkflowId = entity.Id; await _eventSubscriptionRepository.InsertAsync(subEntity); } await CurrentUnitOfWork.SaveChangesAsync(); } } [UnitOfWork] public virtual async Task> GetRunnableInstances(DateTime asAt, CancellationToken cancellationToken = new()) { using (CurrentUnitOfWork.DisableFilter(AbpDataFilters.MayHaveTenant)) { var now = asAt.ToUniversalTime().Ticks; var query = _workflowRepository.GetAll().Where(x => x.NextExecution.HasValue && (x.NextExecution <= now) && (x.Status == WorkflowStatus.Runnable)) .Select(x => x.Id); var raw = await _asyncQueryableExecuter.ToListAsync(query); return raw.Select(s => s.ToString()).ToList(); } } [UnitOfWork] public virtual async Task> GetWorkflowInstances(WorkflowStatus? status, string type, DateTime? createdFrom, DateTime? createdTo, int skip, int take) { using (CurrentUnitOfWork.DisableFilter(AbpDataFilters.MayHaveTenant)) { IQueryable query = _workflowRepository.GetAll() .Include(wf => wf.WorkflowDefinitionInfo) .Include(wf => wf.ExecutionPointers) .ThenInclude(ep => ep.ExtensionAttributes) .AsQueryable(); if (status.HasValue) query = query.Where(x => x.Status == status.Value); if (!String.IsNullOrEmpty(type)) query = query.Where(x => x.WorkflowDefinitionId == type); if (createdFrom.HasValue) query = query.Where(x => x.CreateTime >= createdFrom.Value); if (createdTo.HasValue) query = query.Where(x => x.CreateTime <= createdTo.Value); var rawResult = await query.Skip(skip).Take(take).ToListAsync(); List result = new List(); foreach (var item in rawResult) result.Add(item.ToWorkflowInstance()); return result; } } [UnitOfWork] public virtual async Task GetWorkflowInstance(string id, CancellationToken cancellationToken = new()) { using (CurrentUnitOfWork.DisableFilter(AbpDataFilters.MayHaveTenant)) { var raw = await _workflowRepository.GetAll() .Include(wf => wf.WorkflowDefinitionInfo) .Include(wf => wf.ExecutionPointers) .ThenInclude(ep => ep.ExtensionAttributes) .FirstAsync(x => x.Id == id, cancellationToken); return raw.ToWorkflowInstance(); } } [UnitOfWork] public virtual async Task> GetWorkflowInstances(IEnumerable ids, CancellationToken cancellationToken = new()) { if (ids == null) { return new List(); } using (CurrentUnitOfWork.DisableFilter(AbpDataFilters.MayHaveTenant)) { var raw = _workflowRepository.GetAll() .Include(wf => wf.WorkflowDefinitionInfo) .Include(wf => wf.ExecutionPointers) .ThenInclude(ep => ep.ExtensionAttributes) .Where(x => ids.Contains(x.Id)); return (await raw.ToListAsync(cancellationToken: cancellationToken)).Select(i => i.ToWorkflowInstance()); } } [UnitOfWork] public virtual async Task TerminateSubscription(string eventSubscriptionId, CancellationToken cancellationToken = new()) { var existing = await _eventSubscriptionRepository.FirstOrDefaultAsync(x => x.Id == eventSubscriptionId); await _eventSubscriptionRepository.DeleteAsync(existing); await CurrentUnitOfWork.SaveChangesAsync(); } [UnitOfWork] public virtual void EnsureStoreExists() { } [UnitOfWork] public virtual async Task> GetSubscriptions(string eventName, string eventKey, DateTime asOf, CancellationToken cancellationToken = new()) { asOf = asOf.ToUniversalTime(); var raw = await _eventSubscriptionRepository.GetAll() .Where(x => x.EventName == eventName && x.EventKey == eventKey && x.SubscribeAsOf <= asOf) .ToListAsync(cancellationToken); return raw.Select(item => item.ToEventSubscription()).ToList(); } [UnitOfWork] public virtual async Task CreateEvent(Event newEvent, CancellationToken cancellationToken = new()) { using (CurrentUnitOfWork.DisableFilter(AbpDataFilters.MayHaveTenant)) { newEvent.Id = _guidGenerator.Create().ToString("N"); var persistEntity = newEvent.ToPersistEntity(); persistEntity.TenantId = AbpSession.TenantId; var entity = await _eventRepository.InsertAsync(persistEntity); await CurrentUnitOfWork.SaveChangesAsync(); return entity.Id; } } [UnitOfWork] public virtual async Task GetEvent(string id, CancellationToken cancellationToken = new()) { using (CurrentUnitOfWork.DisableFilter(AbpDataFilters.MayHaveTenant)) { var raw = await _eventRepository .FirstOrDefaultAsync(x => x.Id == id); if (raw == null) return null; return raw.ToEvent(); } } [UnitOfWork] public virtual async Task> GetRunnableEvents(DateTime asAt, CancellationToken cancellationToken = new()) { using (CurrentUnitOfWork.DisableFilter(AbpDataFilters.MayHaveTenant)) { var now = asAt.ToUniversalTime(); // asAt = asAt.ToUniversalTime(); var raw = await _eventRepository.GetAll() .Where(x => !x.IsProcessed) .Where(x => x.EventTime <= now) .Select(x => x.Id) .ToListAsync(cancellationToken); return raw.Select(s => s.ToString()).ToList(); } } [UnitOfWork] public virtual async Task MarkEventProcessed(string id, CancellationToken cancellationToken = new()) { using (CurrentUnitOfWork.DisableFilter(AbpDataFilters.MayHaveTenant)) { var existingEntity = await _eventRepository.GetAll() .Where(x => x.Id == id) .AsTracking() .FirstAsync(cancellationToken); existingEntity.IsProcessed = true; await CurrentUnitOfWork.SaveChangesAsync(); } } [UnitOfWork] public virtual async Task> GetEvents(string eventName, string eventKey, DateTime asOf, CancellationToken cancellationToken = new()) { using (CurrentUnitOfWork.DisableFilter(AbpDataFilters.MayHaveTenant)) { var raw = await _eventRepository.GetAll() .Where(x => x.EventName == eventName && x.EventKey == eventKey) .Where(x => x.EventTime >= asOf) .Select(x => x.Id) .ToListAsync(cancellationToken); var result = new List(); foreach (var s in raw) result.Add(s); return result; } } [UnitOfWork] public virtual async Task MarkEventUnprocessed(string id, CancellationToken cancellationToken = new()) { using (CurrentUnitOfWork.DisableFilter(AbpDataFilters.MayHaveTenant)) { var existingEntity = await _eventRepository.GetAll() .Where(x => x.Id == id) .AsTracking() .FirstAsync(cancellationToken); existingEntity.IsProcessed = false; await CurrentUnitOfWork.SaveChangesAsync(); } } [UnitOfWork] public virtual async Task PersistErrors(IEnumerable errors, CancellationToken cancellationToken = new()) { var executionErrors = errors as ExecutionError[] ?? errors.ToArray(); if (executionErrors.Any()) { foreach (var error in executionErrors) { await _executionErrorRepository.InsertAsync(error.ToPersistEntity()); } await CurrentUnitOfWork.SaveChangesAsync(); } } [UnitOfWork] public virtual async Task GetSubscription(string eventSubscriptionId, CancellationToken cancellationToken = new()) { var raw = await _eventSubscriptionRepository.FirstOrDefaultAsync(x => x.Id == eventSubscriptionId); return raw?.ToEventSubscription(); } [UnitOfWork] public virtual async Task GetFirstOpenSubscription(string eventName, string eventKey, DateTime asOf, CancellationToken cancellationToken = new()) { var raw = await _eventSubscriptionRepository.FirstOrDefaultAsync(x => x.EventName == eventName && x.EventKey == eventKey && x.SubscribeAsOf <= asOf && x.ExternalToken == null); return raw?.ToEventSubscription(); } [UnitOfWork] public virtual async Task SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken cancellationToken = new()) { var existingEntity = await _eventSubscriptionRepository.GetAll() .Where(x => x.Id == eventSubscriptionId) .AsTracking() .FirstAsync(cancellationToken); existingEntity.ExternalToken = token; existingEntity.ExternalWorkerId = workerId; existingEntity.ExternalTokenExpiry = expiry; await CurrentUnitOfWork.SaveChangesAsync(); return true; } [UnitOfWork] public virtual async Task ClearSubscriptionToken(string eventSubscriptionId, string token, CancellationToken cancellationToken = new()) { var existingEntity = await _eventSubscriptionRepository.GetAll() .Where(x => x.Id == eventSubscriptionId) .AsTracking() .FirstAsync(cancellationToken); if (existingEntity.ExternalToken != token) throw new InvalidOperationException(); existingEntity.ExternalToken = null; existingEntity.ExternalWorkerId = null; existingEntity.ExternalTokenExpiry = null; await CurrentUnitOfWork.SaveChangesAsync(); } public Task GetPersistedWorkflow(string id, CancellationToken cancellationToken = new()) { using (CurrentUnitOfWork.DisableFilter(AbpDataFilters.MayHaveTenant)) { return _workflowRepository.GetAsync(id); } } public Task GetPersistedWorkflowDefinition(string id, int version, CancellationToken cancellationToken = new()) { using (CurrentUnitOfWork.DisableFilter(AbpDataFilters.MayHaveTenant)) { return _workflowDefinitionRepository.GetAll().AsNoTracking().FirstOrDefaultAsync(u => u.Id == id && u.Version == version, cancellationToken: cancellationToken); } } public Task GetPersistedExecutionPointer(string id, CancellationToken cancellationToken = new()) { using (CurrentUnitOfWork.DisableFilter(AbpDataFilters.MayHaveTenant)) { return _executionPointerRepository.GetAsync(id); } } /// /// /// /// /// /// /// public async Task> GetAllRunnablePersistedWorkflow(string definitionId, int version, CancellationToken cancellationToken = new()) { using (CurrentUnitOfWork.DisableFilter(AbpDataFilters.MayHaveTenant)) { return await _workflowRepository.GetAll().Where(u => u.WorkflowDefinitionId == definitionId && u.Version == version).ToListAsync(cancellationToken: cancellationToken); } } public Task ScheduleCommand(ScheduledCommand command) { //throw new NotImplementedException(); return Task.CompletedTask; } public Task ProcessCommands(DateTimeOffset asOf, Func action, CancellationToken cancellationToken = new()) { //throw new NotImplementedException(); return Task.CompletedTask; } public bool SupportsScheduledCommands { get; set; } } }