Deepseek, FractalCell, FractalCellSln.slnx
https://chat.deepseek.com/share/45mer766poqry1usn1
https://giga.chat/link/gcsuyjVdky
D:\Projects\VS02\2606\FractalNet\DeepSeek\FractalCell\FractalCellSln\FractalCellSln.slnx
D:\Projects\VS02\2606\FractalNet\DeepSeek\FractalCell\FractalCellSln\FractalCell05\FractalCell05.csproj -- Initial Project
D:\Projects\VS02\2606\FractalNet\DeepSeek\FractalCell\FractalCellSln\FractalCell08\FractalCell08.csproj
-------------------------------------------------------------------------
Структура проекта
text
FractalCellSln.FractalCell05/
├── Core/
│ ├── Common/
│ │ └── Unsubscriber.cs
│ ├── Configuration/
│ │ └── FractalCellConfiguration.cs
│ ├── Interfaces/
│ │ ├── IApplicationEvent.cs
│ │ ├── IExternalBus.cs
│ │ ├── IFractalCell.cs
│ │ ├── IFractalEventHub.cs
│ │ └── IInternalBus.cs
│ ├── Templates/
│ │ ├── ExternalBusTemplate.cs
│ │ ├── FractalCellTemplate.cs
│ │ └── InternalBusTemplate.cs
│ └── FractalCellFactory.cs
├── Implementations/
│ ├── Channels/
│ │ ├── ChannelExternalBus.cs
│ │ ├── ChannelFractalCell.cs
│ │ └── ChannelInternalBus.cs
│ ├── TplDataflow/
│ │ ├── TplExternalBus.cs
│ │ ├── TplFractalCell.cs
│ │ └── TplInternalBus.cs
│ └── FractalEventHub.cs
├── Model/
│ └── FractalEvent.cs
├── Worker.cs
├── Program.cs
└── FractalCellSln.FractalCell05.csproj
---------------------------------------------------
📄 1. FractalCellSln.FractalCell05.csproj
xml
<!-- Project file for the FractalCell worker service (built on the .NET Worker SDK). -->
<Project Sdk="Microsoft.NET.Sdk.Worker">
<PropertyGroup>
<TargetFramework>net10.0</TargetFramework>
<!-- Nullable reference types are on; the sources use annotations such as ILogger? and string?. -->
<Nullable>enable</Nullable>
<!-- The sources use Task, CancellationToken, List<T>, etc. without explicit using directives. -->
<ImplicitUsings>enable</ImplicitUsings>
<UserSecretsId>dotnet-FractalCell02-68e347bc-63c0-4746-95b3-6558399b234e</UserSecretsId>
</PropertyGroup>
<ItemGroup>
<!-- Generic host, console logging, and the TPL Dataflow blocks used by the TplDataflow bus variants. -->
<PackageReference Include="Microsoft.Extensions.Hosting" Version="10.0.9" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="10.0.9" />
<PackageReference Include="System.Threading.Tasks.Dataflow" Version="10.0.9" />
</ItemGroup>
</Project>
------------------------------------
📄 2. Program.cs
csharp
using FractalCell02;
using FractalCell02.Core.Configuration;
using FractalCell02.Core.Interfaces;
using FractalCell02.Implementations;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

// Composition root: configures logging, binds the "FractalCell" configuration section,
// registers the event hub and its settings, and runs the orchestrating Worker.
// IFractalEventHub lives in FractalCell02.Core.Interfaces, hence the explicit using above.
var builder = Host.CreateApplicationBuilder(args);

builder.Logging.ClearProviders();
builder.Logging.AddConsole();
builder.Logging.AddDebug();
builder.Logging.SetMinimumLevel(LogLevel.Information);

builder.Services.Configure<FractalCellConfiguration>(
    builder.Configuration.GetSection("FractalCell"));

builder.Services.AddSingleton<IFractalEventHub, InMemoryFractalEventHub>();

// The service provider is not needed to build the settings, so the factory argument is discarded.
builder.Services.AddSingleton<HubSettings>(_ => new HubSettings
{
    ChannelCapacity = 1000,
    EnablePersistence = false,
    MessageTimeout = TimeSpan.FromSeconds(30)
});

builder.Services.AddHostedService<Worker>();

var host = builder.Build();

Console.WriteLine("🚀 Starting Fractal System...");
Console.WriteLine("ℹ️ Press Ctrl+C to stop");

// RunAsync returns once the host has shut down (e.g. after Ctrl+C).
await host.RunAsync();

Console.WriteLine("✅ Fractal System stopped");
----------------------------------------------------------------------------
📄 3. Worker.cs
csharp
using FractalCell02.Core;
using FractalCell02.Core.Configuration;
using FractalCell02.Core.Interfaces;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace FractalCell02;
/// <summary>
/// Hosted orchestrator: creates a fixed set of fractal cells, starts them, and then
/// periodically sends a heartbeat event from the first cell to a randomly chosen cell.
/// </summary>
public class Worker : BackgroundService
{
    private readonly ILogger<Worker> _logger;
    private readonly IFractalEventHub _hub;
    private readonly ILoggerFactory _loggerFactory;
    private readonly List<IFractalCell> _cells = new();

    /// <summary>Creates the orchestrator with its logger, the shared hub and a logger factory for the cells.</summary>
    public Worker(
        ILogger<Worker> logger,
        IFractalEventHub hub,
        ILoggerFactory loggerFactory)
    {
        _logger = logger;
        _hub = hub;
        _loggerFactory = loggerFactory;
    }

    /// <summary>
    /// Initializes the cells and then runs the orchestration loop until
    /// <paramref name="stoppingToken"/> is canceled. Failures inside one iteration are
    /// logged and followed by a 5-second pause; failures outside the loop end the worker.
    /// </summary>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("🚀 Worker (Orchestrator) started");
        try
        {
            await InitializeSystemAsync(stoppingToken);
            _logger.LogInformation("✅ System initialized. Starting orchestration loop...");
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    await OrchestrateAsync(stoppingToken);
                    await Task.Delay(TimeSpan.FromSeconds(3), stoppingToken);
                }
                catch (OperationCanceledException)
                {
                    _logger.LogInformation("⏹️ Orchestration loop canceled");
                    break;
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "❌ Error in orchestration loop");
                    // A cancellation during this back-off propagates to the outer handler below.
                    await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
                }
            }
        }
        catch (OperationCanceledException)
        {
            _logger.LogInformation("👋 Worker stopping due to cancellation");
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "❌ Critical worker error");
        }
        finally
        {
            _logger.LogInformation("🏁 Worker (Orchestrator) finished");
        }
    }

    /// <summary>Creates the "Root" cell plus three child cells and starts all of them concurrently.</summary>
    private async Task InitializeSystemAsync(CancellationToken ct)
    {
        _logger.LogInformation("🏗️ Initializing fractal system...");
        var rootCell = await CreateCellAsync("Root", 3, ct);
        _cells.Add(rootCell);

        var childCells = new[] { "Child-A", "Child-B", "Child-C" };
        foreach (var childId in childCells)
        {
            var child = await CreateCellAsync(childId, 2, ct);
            _cells.Add(child);
        }

        _logger.LogInformation("🔍 System cells: {Count}", _cells.Count);
        foreach (var cell in _cells)
        {
            _logger.LogInformation("🔍 Cell: {CellId}", cell.CellId);
        }

        _logger.LogInformation("▶️ Starting all cells...");
        await Task.WhenAll(_cells.Select(c => c.StartAsync(ct)));
        _logger.LogInformation("✅ System initialized with {Count} cells", _cells.Count);
    }

    /// <summary>Builds a channel-backed cell with the given id and worker count through the factory.</summary>
    private async Task<IFractalCell> CreateCellAsync(
        string cellId,
        int workers,
        CancellationToken ct)
    {
        var config = new FractalCellConfiguration
        {
            CellId = cellId,
            BackgroundServiceCount = workers,
            InternalBusType = BusType.Channels,
            ExternalBusType = BusType.Channels,
            BusSettings = new BusSettings
            {
                Capacity = 1000,
                MaxParallelism = 4
            }
        };
        var cell = await FractalCellFactory.CreateAsync(config, _hub, _loggerFactory, ct);
        _logger.LogInformation("📦 Cell {CellId} created with {Workers} workers", cellId, workers);
        return cell;
    }

    /// <summary>Sends one heartbeat event from the first cell to a randomly selected cell.</summary>
    private async Task OrchestrateAsync(CancellationToken ct)
    {
        if (_cells.Count == 0)
        {
            return;
        }

        var rootCell = _cells[0];
        // Random.Shared avoids allocating a new generator on every tick.
        var targetCell = _cells[Random.Shared.Next(_cells.Count)];
        var @event = new FractalEvent(
            $"heartbeat-{DateTime.UtcNow.Ticks}",
            DateTime.UtcNow,
            rootCell.CellId,
            targetCell.CellId,
            "Heartbeat",
            new
            {
                Timestamp = DateTime.UtcNow,
                Source = "Orchestrator",
                ActiveCells = _hub.GetActiveCells().Count
            }
        );
        _logger.LogInformation("📤 [ORCHESTRATOR] Sending heartbeat from {Source} to {Target}",
            rootCell.CellId, targetCell.CellId);
        await rootCell.ExternalBus.SendToCellAsync(targetCell.CellId, @event);
    }

    /// <summary>Stops every cell concurrently and then lets the base class stop the worker itself.</summary>
    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("🛑 Stopping all cells...");
        await Task.WhenAll(_cells.Select(c => c.StopAsync(cancellationToken)));
        _logger.LogInformation("✅ All cells stopped");
        await base.StopAsync(cancellationToken);
    }
}
---------------------------------------------------------
📄 4. Core/Common/Unsubscriber.cs
csharp
namespace FractalCell02.Core.Common;
/// <summary>
/// Disposable token that runs a caller-supplied unsubscribe callback exactly once,
/// no matter how many times (or from how many threads) <see cref="Dispose"/> is called.
/// </summary>
public class Unsubscriber : IDisposable
{
    private readonly Action _unsubscribeAction;

    // 0 = live, 1 = disposed. An int so it can be flipped atomically with Interlocked.
    private int _disposed;

    /// <summary>Creates the token.</summary>
    /// <param name="unsubscribeAction">Callback invoked on the first <see cref="Dispose"/>.</param>
    /// <exception cref="ArgumentNullException"><paramref name="unsubscribeAction"/> is null.</exception>
    public Unsubscriber(Action unsubscribeAction)
    {
        _unsubscribeAction = unsubscribeAction ?? throw new ArgumentNullException(nameof(unsubscribeAction));
    }

    /// <summary>Invokes the unsubscribe callback on the first call; later calls do nothing.</summary>
    public void Dispose()
    {
        // The flag is set before the callback runs, so a re-entrant, concurrent or
        // repeated Dispose (including one after a throwing callback) never runs it twice.
        if (Interlocked.Exchange(ref _disposed, 1) != 0)
        {
            return;
        }

        _unsubscribeAction();
    }
}
-----------------------------------------------------------------------------
📄 5. Core/Configuration/FractalCellConfiguration.cs
csharp
namespace FractalCell02.Core.Configuration;
/// <summary>
/// Settings for one fractal cell: its identifier, the number of background workers it
/// creates, and which bus implementation backs its internal and external buses.
/// </summary>
/// <remarks>
/// NOTE(review): the defaults pair <see cref="BusType.Channels"/> (internal) with
/// <see cref="BusType.TplDataflow"/> (external), but FractalCellFactory.CreateCellInstance
/// only accepts Channels/Channels or TplDataflow/TplDataflow and throws
/// NotSupportedException otherwise — confirm the mixed default is intended.
/// </remarks>
public record FractalCellConfiguration
{
/// <summary>Identifier of the cell; empty by default.</summary>
public string CellId { get; init; } = string.Empty;
/// <summary>Number of background worker services the cell creates on start; defaults to 2.</summary>
public int BackgroundServiceCount { get; init; } = 2;
/// <summary>Implementation used for the cell's internal bus; defaults to Channels.</summary>
public BusType InternalBusType { get; init; } = BusType.Channels;
/// <summary>Implementation used for the cell's external bus; defaults to TplDataflow.</summary>
public BusType ExternalBusType { get; init; } = BusType.TplDataflow;
/// <summary>Settings handed to both buses when the factory constructs them.</summary>
public BusSettings BusSettings { get; init; } = new();
/// <summary>Hub settings carried with the cell configuration.</summary>
public HubSettings HubSettings { get; init; } = new();
}
/// <summary>Tuning values shared by the internal and external bus implementations.</summary>
public record BusSettings
{
/// <summary>Bounded capacity used for the channels and dataflow blocks the buses create; defaults to 1000.</summary>
public int Capacity { get; init; } = 1000;
/// <summary>Maximum degree of parallelism applied to the TPL Dataflow processing blocks; defaults to 4.</summary>
public int MaxParallelism { get; init; } = 4;
/// <summary>Message timeout, 30 seconds by default. NOTE(review): not read by the bus code shown in this listing.</summary>
public TimeSpan MessageTimeout { get; init; } = TimeSpan.FromSeconds(30);
/// <summary>Persistence switch, off by default. NOTE(review): not read by the bus code shown in this listing.</summary>
public bool EnablePersistence { get; init; } = false;
/// <summary>Optional persistence path; null by default.</summary>
public string? PersistencePath { get; init; }
}
/// <summary>
/// Settings for the event hub. Program.cs registers an instance as a singleton;
/// how the hub consumes the values is not visible in this listing.
/// </summary>
public record HubSettings
{
/// <summary>Channel capacity; defaults to 1000.</summary>
public int ChannelCapacity { get; init; } = 1000;
/// <summary>Persistence switch; off by default.</summary>
public bool EnablePersistence { get; init; } = false;
/// <summary>Optional persistence path; null by default.</summary>
public string? PersistencePath { get; init; }
/// <summary>Message timeout; defaults to 30 seconds.</summary>
public TimeSpan MessageTimeout { get; init; } = TimeSpan.FromSeconds(30);
}
/// <summary>Selects which bus implementation family the factory instantiates.</summary>
public enum BusType
{
/// <summary>Buses built on System.Threading.Channels (ChannelInternalBus / ChannelExternalBus).</summary>
Channels,
/// <summary>Buses built on TPL Dataflow blocks (TplInternalBus / TplExternalBus).</summary>
TplDataflow
}
----------------------------------------------------------------
📄 6. Core/Interfaces/IApplicationEvent.cs
csharp
namespace FractalCell02.Core.Interfaces;
/// <summary>Minimal contract for every event carried by the internal and external buses.</summary>
public interface IApplicationEvent
{
/// <summary>Identifier of the event; used in log messages by the buses and cells.</summary>
string EventId { get; }
/// <summary>Timestamp carried by the event.</summary>
DateTime Timestamp { get; }
}
----------------------------------------------------------------
📄 7. Core/Interfaces/IExternalBus.cs
csharp
namespace FractalCell02.Core.Interfaces;
/// <summary>
/// Cell-facing side of inter-cell messaging: connects a cell to the hub, sends events
/// to other cells through it, and exposes the events addressed to this cell as a stream.
/// </summary>
public interface IExternalBus : IDisposable
{
/// <summary>Identifier of this bus instance.</summary>
string BusId { get; }
/// <summary>Registers this bus with <paramref name="hub"/> on behalf of the cell <paramref name="cellId"/>.</summary>
Task ConnectToHubAsync(IFractalEventHub hub, string cellId);
/// <summary>Sends <paramref name="event"/> to the cell identified by <paramref name="targetCellId"/>.</summary>
Task SendToCellAsync(string targetCellId, IApplicationEvent @event);
/// <summary>
/// Sends <paramref name="event"/> to all cells, optionally restricted by <paramref name="filter"/>.
/// NOTE(review): the filter is passed through to the hub; presumably it receives cell ids — confirm in the hub.
/// </summary>
Task BroadcastAsync(IApplicationEvent @event, Predicate<string>? filter = null);
/// <summary>Streams the events received by this bus until <paramref name="ct"/> is canceled or the source completes.</summary>
IAsyncEnumerable<IApplicationEvent> ReadAllAsync(CancellationToken ct);
}
-------------------------------------------------------------------
📄 8. Core/Interfaces/IFractalCell.cs
csharp
using Microsoft.Extensions.Hosting;
namespace FractalCell02.Core.Interfaces;
/// <summary>
/// A hosted unit that owns one internal bus and one external bus. Start/stop come from
/// <see cref="IHostedService"/>; the factory calls <see cref="InitializeAsync"/> before returning a cell.
/// </summary>
public interface IFractalCell : IHostedService
{
/// <summary>Identifier of the cell.</summary>
string CellId { get; }
/// <summary>Bus used for event distribution inside the cell.</summary>
IInternalBus InternalBus { get; }
/// <summary>Bus used to exchange events with other cells through the hub.</summary>
IExternalBus ExternalBus { get; }
/// <summary>One-time setup performed after construction and before the cell is started.</summary>
Task InitializeAsync();
}
-----------------------------------------------------------------------------
📄 9. Core/Interfaces/IFractalEventHub.cs
csharp
using System.Threading.Channels;
namespace FractalCell02.Core.Interfaces;
/// <summary>
/// Central router between cells. A cell registers either a channel or a consumer callback
/// under its id; other cells publish events addressed to that id or to all cells.
/// The implementation is not part of this listing, so delivery details are not documented here.
/// </summary>
public interface IFractalEventHub
{
/// <summary>Registers <paramref name="incomingChannel"/> as the inbox of cell <paramref name="cellId"/> (used by ChannelExternalBus).</summary>
Task RegisterChannelAsync(string cellId, Channel<IApplicationEvent> incomingChannel);
/// <summary>Registers <paramref name="consumer"/> as the event callback of cell <paramref name="cellId"/> (used by TplExternalBus).</summary>
Task RegisterConsumerAsync(string cellId, Func<IApplicationEvent, Task> consumer);
/// <summary>Removes the registration of cell <paramref name="cellId"/>.</summary>
Task UnregisterCellAsync(string cellId);
/// <summary>Publishes <paramref name="event"/> to the cell <paramref name="targetCellId"/>.</summary>
Task PublishAsync(string targetCellId, IApplicationEvent @event);
/// <summary>
/// Publishes <paramref name="event"/> to all cells, optionally restricted by <paramref name="filter"/>.
/// NOTE(review): presumably the filter is evaluated against cell ids — confirm in the implementation.
/// </summary>
Task PublishToAllAsync(IApplicationEvent @event, Predicate<string>? filter = null);
/// <summary>Returns the collection of active cell identifiers.</summary>
IReadOnlyCollection<string> GetActiveCells();
}
-------------------------------------
📄 10. Core/Interfaces/IInternalBus.cs
csharp
namespace FractalCell02.Core.Interfaces;
/// <summary>Publish/subscribe bus used to distribute events inside a single cell.</summary>
public interface IInternalBus
{
/// <summary>Identifier of this bus instance.</summary>
string BusId { get; }
/// <summary>Publishes <paramref name="event"/> to the bus.</summary>
Task PublishAsync<TEvent>(TEvent @event) where TEvent : IApplicationEvent;
/// <summary>
/// Registers <paramref name="handler"/> for events of type <typeparamref name="TEvent"/>.
/// Dispose the returned object to remove the subscription.
/// </summary>
IDisposable Subscribe<TEvent>(Func<TEvent, Task> handler) where TEvent : IApplicationEvent;
/// <summary>Starts event dispatching; <paramref name="ct"/> cancels it.</summary>
Task StartAsync(CancellationToken ct);
/// <summary>Stops the bus.</summary>
Task StopAsync();
}
---------------------------------------
📄 11. Core/Templates/ExternalBusTemplate.cs
csharp
using FractalCell02.Core.Configuration;
using FractalCell02.Core.Interfaces;
using Microsoft.Extensions.Logging;
namespace FractalCell02.Core.Templates;
/// <summary>
/// Base class for external bus implementations. It stores the bus id, the bus settings
/// and an optional logger; all messaging operations are left to derived classes.
/// </summary>
public abstract class ExternalBusTemplate : IExternalBus
{
    /// <summary>Settings supplied at construction.</summary>
    protected readonly BusSettings Config;

    /// <summary>Optional logger; null when none was supplied.</summary>
    protected readonly ILogger? _logger;

    /// <summary>Stores the identifier, settings and optional logger for derived buses.</summary>
    protected ExternalBusTemplate(string busId, BusSettings config, ILogger? logger = null)
        => (BusId, Config, _logger) = (busId, config, logger);

    /// <inheritdoc />
    public string BusId { get; }

    /// <inheritdoc />
    public abstract Task ConnectToHubAsync(IFractalEventHub hub, string cellId);

    /// <inheritdoc />
    public abstract Task SendToCellAsync(string targetCellId, IApplicationEvent @event);

    /// <inheritdoc />
    public abstract Task BroadcastAsync(IApplicationEvent @event, Predicate<string>? filter = null);

    /// <inheritdoc />
    public abstract IAsyncEnumerable<IApplicationEvent> ReadAllAsync(CancellationToken ct);

    /// <summary>Base disposal only suppresses finalization; derived classes release their own resources.</summary>
    public virtual void Dispose() => GC.SuppressFinalize(this);
}
------------------------------------------------
📄 12. Core/Templates/FractalCellTemplate.cs
csharp
using FractalCell02.Core.Configuration;
using FractalCell02.Core.Interfaces;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace FractalCell02.Core.Templates;
/// <summary>
/// Base class for fractal cells. On start it launches the cell's background services and
/// internal bus, then runs a loop that copies every event read from the external bus
/// into the internal bus. On stop it cancels and awaits that loop and shuts the rest down.
/// </summary>
/// <typeparam name="TInternalBus">Concrete internal bus type used by the cell.</typeparam>
/// <typeparam name="TExternalBus">Concrete external bus type used by the cell.</typeparam>
public abstract class FractalCellTemplate<TInternalBus, TExternalBus>
    : BackgroundService, IFractalCell
    where TInternalBus : IInternalBus
    where TExternalBus : IExternalBus
{
    protected readonly TInternalBus InternalBusField;
    protected readonly TExternalBus ExternalBusField;
    protected readonly FractalCellConfiguration Configuration;
    protected readonly ILogger Logger;
    protected readonly List<BackgroundService> _backgroundServices = new();

    // Cancellation source and task of the manually started read loop. Tracking them lets
    // StopAsync end the loop itself instead of depending on the caller's start token.
    private CancellationTokenSource? _executeCts;
    private Task? _executeTask;

    /// <inheritdoc />
    public string CellId => Configuration.CellId;

    /// <inheritdoc />
    public IInternalBus InternalBus => InternalBusField;

    /// <inheritdoc />
    public IExternalBus ExternalBus => ExternalBusField;

    /// <summary>Stores the configuration, buses and logger shared by all cell implementations.</summary>
    protected FractalCellTemplate(
        FractalCellConfiguration configuration,
        TInternalBus internalBus,
        TExternalBus externalBus,
        ILogger logger)
    {
        Configuration = configuration;
        InternalBusField = internalBus;
        ExternalBusField = externalBus;
        Logger = logger;
    }

    /// <summary>Creates the background services owned by this cell; enumerated once per start.</summary>
    protected abstract IEnumerable<BackgroundService> CreateBackgroundServices();

    /// <summary>Registers the cell's event handlers; called from <see cref="InitializeAsync"/>.</summary>
    protected abstract void ConfigureHandlers();

    /// <inheritdoc />
    public async Task InitializeAsync()
    {
        ConfigureHandlers();
        await Task.CompletedTask;
    }

    /// <summary>
    /// Starts the background services one by one, then the internal bus, then the
    /// external-to-internal read loop.
    /// </summary>
    public override async Task StartAsync(CancellationToken cancellationToken)
    {
        Logger.LogInformation("FractalCell {CellId} starting", Configuration.CellId);
        foreach (var bs in CreateBackgroundServices())
        {
            _backgroundServices.Add(bs);
            await bs.StartAsync(cancellationToken);
        }
        await InternalBusField.StartAsync(cancellationToken);

        // base.StartAsync is deliberately NOT called; ExecuteAsync is started manually.
        // The loop gets a token linked to the caller's token, so it ends either when the
        // caller cancels or when StopAsync cancels it.
        _executeCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        _executeTask = ExecuteAsync(_executeCts.Token);

        Logger.LogInformation("FractalCell {CellId} started successfully", Configuration.CellId);
    }

    /// <summary>
    /// Read loop: publishes every event from the external bus to the internal bus.
    /// A failure on a single event is logged and the loop continues; cancellation and
    /// stream-level errors end the loop and are logged.
    /// </summary>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        Logger.LogInformation("FractalCell {CellId} execute loop started", Configuration.CellId);
        try
        {
            await foreach (var @event in ExternalBusField.ReadAllAsync(stoppingToken))
            {
                try
                {
                    await InternalBusField.PublishAsync(@event);
                }
                catch (Exception ex)
                {
                    Logger.LogError(ex, "Error processing event in cell {CellId}",
                        Configuration.CellId);
                }
            }
        }
        catch (OperationCanceledException)
        {
            Logger.LogInformation("FractalCell {CellId} execute loop canceled", Configuration.CellId);
        }
        catch (Exception ex)
        {
            Logger.LogError(ex, "FractalCell {CellId} execute loop error", Configuration.CellId);
        }
        Logger.LogInformation("FractalCell {CellId} execute loop finished", Configuration.CellId);
    }

    /// <summary>
    /// Cancels the read loop, stops the internal bus and the background services, and
    /// waits for the read loop to finish (or for <paramref name="cancellationToken"/>).
    /// </summary>
    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        Logger.LogInformation("FractalCell {CellId} stopping", Configuration.CellId);

        // Stop feeding the internal bus before it is shut down.
        var executeCts = _executeCts;
        _executeCts = null;
        executeCts?.Cancel();

        await InternalBusField.StopAsync();
        foreach (var bs in _backgroundServices)
        {
            await bs.StopAsync(cancellationToken);
        }

        var executeTask = _executeTask;
        _executeTask = null;
        if (executeTask is not null)
        {
            try
            {
                // ExecuteAsync handles its own exceptions; only the wait itself can be canceled.
                await executeTask.WaitAsync(cancellationToken);
            }
            catch (OperationCanceledException)
            {
                Logger.LogWarning("FractalCell {CellId} execute loop did not finish before stop was canceled",
                    Configuration.CellId);
            }
        }
        executeCts?.Dispose();

        Logger.LogInformation("FractalCell {CellId} stopped", Configuration.CellId);
    }
}
------------------------------------
📄 13. Core/Templates/InternalBusTemplate.cs
csharp
using FractalCell02.Core.Configuration;
using FractalCell02.Core.Interfaces;
namespace FractalCell02.Core.Templates;
/// <summary>
/// Base class for internal bus implementations. It only stores the bus id and settings;
/// publishing, subscribing and lifecycle are implemented by derived classes.
/// </summary>
public abstract class InternalBusTemplate : IInternalBus
{
    /// <summary>Settings supplied at construction.</summary>
    protected readonly BusSettings Config;

    /// <summary>Stores the identifier and settings for derived buses.</summary>
    protected InternalBusTemplate(string busId, BusSettings config)
        => (BusId, Config) = (busId, config);

    /// <inheritdoc />
    public string BusId { get; }

    /// <inheritdoc />
    public abstract Task StartAsync(CancellationToken ct);

    /// <inheritdoc />
    public abstract Task StopAsync();

    /// <inheritdoc />
    public abstract Task PublishAsync<TEvent>(TEvent @event) where TEvent : IApplicationEvent;

    /// <inheritdoc />
    public abstract IDisposable Subscribe<TEvent>(Func<TEvent, Task> handler) where TEvent : IApplicationEvent;
}
------------------------------------------------------------
📄 14. Core/FractalCellFactory.cs
csharp
using FractalCell02.Core.Configuration;
using FractalCell02.Core.Interfaces;
using FractalCell02.Core.Templates;
using FractalCell02.Implementations.Channels;
using FractalCell02.Implementations.TplDataflow;
using Microsoft.Extensions.Logging;
namespace FractalCell02.Core;
/// <summary>
/// Builds fully wired fractal cells: creates the internal and external bus selected by
/// the configuration, connects the external bus to the hub, instantiates the matching
/// cell type and initializes it.
/// </summary>
public static class FractalCellFactory
{
    /// <summary>Creates, connects and initializes a cell for <paramref name="config"/>.</summary>
    /// <param name="config">Cell configuration; selects the bus implementations.</param>
    /// <param name="hub">Hub the external bus is connected to.</param>
    /// <param name="loggerFactory">Factory for the bus and cell loggers.</param>
    /// <param name="ct">Checked once before any work starts.</param>
    /// <returns>The initialized (not yet started) cell.</returns>
    /// <exception cref="ArgumentNullException">A required argument is null.</exception>
    /// <exception cref="NotSupportedException">The configured bus types are not a supported pair.</exception>
    /// <exception cref="OperationCanceledException"><paramref name="ct"/> was already canceled.</exception>
    public static async Task<IFractalCell> CreateAsync(
        FractalCellConfiguration config,
        IFractalEventHub hub,
        ILoggerFactory loggerFactory,
        CancellationToken ct = default)
    {
        ArgumentNullException.ThrowIfNull(config);
        ArgumentNullException.ThrowIfNull(hub);
        ArgumentNullException.ThrowIfNull(loggerFactory);
        ct.ThrowIfCancellationRequested();

        // Reject unsupported pairs up front, before anything is registered with the hub.
        EnsureSupportedCombination(config);

        IInternalBus internalBus = CreateInternalBus(config);
        IExternalBus externalBus = CreateExternalBus(config, loggerFactory);
        try
        {
            await externalBus.ConnectToHubAsync(hub, config.CellId);
            IFractalCell cell = CreateCellInstance(config, internalBus, externalBus, loggerFactory);
            await cell.InitializeAsync();
            return cell;
        }
        catch
        {
            // The cell never reaches the caller, so nobody else can dispose the bus.
            externalBus.Dispose();
            throw;
        }
    }

    /// <summary>Throws when the internal/external bus types are not one of the pairs a cell type exists for.</summary>
    private static void EnsureSupportedCombination(FractalCellConfiguration config)
    {
        var supported = (config.InternalBusType, config.ExternalBusType)
            is (BusType.Channels, BusType.Channels)
            or (BusType.TplDataflow, BusType.TplDataflow);
        if (!supported)
        {
            throw new NotSupportedException(
                $"Combination {config.InternalBusType}/{config.ExternalBusType} not supported");
        }
    }

    /// <summary>Creates the internal bus named "{CellId}-internal" for the configured type.</summary>
    private static IInternalBus CreateInternalBus(FractalCellConfiguration config)
    {
        return config.InternalBusType switch
        {
            BusType.Channels => new ChannelInternalBus(
                $"{config.CellId}-internal",
                config.BusSettings),
            BusType.TplDataflow => new TplInternalBus(
                $"{config.CellId}-internal",
                config.BusSettings),
            _ => throw new NotSupportedException(
                $"Bus type {config.InternalBusType} not supported")
        };
    }

    /// <summary>Creates the external bus named "{CellId}-external" for the configured type, with its own logger.</summary>
    private static IExternalBus CreateExternalBus(FractalCellConfiguration config, ILoggerFactory loggerFactory)
    {
        var logger = loggerFactory.CreateLogger($"ExternalBus-{config.CellId}");
        return config.ExternalBusType switch
        {
            BusType.Channels => new ChannelExternalBus(
                $"{config.CellId}-external",
                config.BusSettings,
                logger),
            BusType.TplDataflow => new TplExternalBus(
                $"{config.CellId}-external",
                config.BusSettings,
                logger),
            _ => throw new NotSupportedException(
                $"Bus type {config.ExternalBusType} not supported")
        };
    }

    /// <summary>
    /// Instantiates the cell type for the configured bus pair. The casts are safe because
    /// both buses were created from the same configuration.
    /// </summary>
    private static IFractalCell CreateCellInstance(
        FractalCellConfiguration config,
        IInternalBus internalBus,
        IExternalBus externalBus,
        ILoggerFactory loggerFactory)
    {
        return (config.InternalBusType, config.ExternalBusType) switch
        {
            (BusType.Channels, BusType.Channels) =>
                new ChannelFractalCell(config,
                    (ChannelInternalBus)internalBus,
                    (ChannelExternalBus)externalBus,
                    loggerFactory.CreateLogger<ChannelFractalCell>()),
            (BusType.TplDataflow, BusType.TplDataflow) =>
                new TplFractalCell(config,
                    (TplInternalBus)internalBus,
                    (TplExternalBus)externalBus,
                    loggerFactory.CreateLogger<TplFractalCell>()),
            _ => throw new NotSupportedException(
                $"Combination {config.InternalBusType}/{config.ExternalBusType} not supported")
        };
    }
}
-----------------------------------------------------------------------------------------
📄 15. Implementations/Channels/ChannelExternalBus.cs
csharp
using System.Threading.Channels;
using FractalCell02.Core.Configuration;
using FractalCell02.Core.Interfaces;
using FractalCell02.Core.Templates;
using Microsoft.Extensions.Logging;
namespace FractalCell02.Implementations.Channels;
/// <summary>
/// External bus backed by a bounded <see cref="Channel{T}"/>. The channel is registered
/// with the hub as this cell's inbox; outgoing events are published through the hub.
/// </summary>
public class ChannelExternalBus : ExternalBusTemplate
{
    private readonly Channel<IApplicationEvent> _incomingChannel;
    private IFractalEventHub? _hub;
    private string? _cellId;

    /// <summary>Creates the bus and its bounded inbox channel (capacity from <paramref name="config"/>).</summary>
    public ChannelExternalBus(string busId, BusSettings config, ILogger? logger = null)
        : base(busId, config, logger)
    {
        _incomingChannel = Channel.CreateBounded<IApplicationEvent>(
            new BoundedChannelOptions(config.Capacity)
            {
                FullMode = BoundedChannelFullMode.Wait,
                SingleReader = false,
                SingleWriter = false
            });
        _logger?.LogInformation("ChannelExternalBus {BusId} created with capacity {Capacity}",
            busId, config.Capacity);
    }

    /// <summary>Remembers the hub and registers the inbox channel under <paramref name="cellId"/>.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="hub"/> is null.</exception>
    public override async Task ConnectToHubAsync(IFractalEventHub hub, string cellId)
    {
        ArgumentNullException.ThrowIfNull(hub);
        _hub = hub;
        _cellId = cellId;
        await hub.RegisterChannelAsync(cellId, _incomingChannel);
        _logger?.LogInformation("ChannelExternalBus {BusId} connected to hub for cell {CellId}",
            BusId, cellId);
    }

    /// <summary>Publishes <paramref name="event"/> to <paramref name="targetCellId"/> through the hub.</summary>
    /// <exception cref="InvalidOperationException">The bus has not been connected to a hub.</exception>
    public override async Task SendToCellAsync(string targetCellId, IApplicationEvent @event)
    {
        if (_hub == null)
        {
            throw new InvalidOperationException("Hub not connected");
        }

        _logger?.LogInformation("ChannelExternalBus {BusId} sending event {EventId} to {TargetCell}",
            BusId, @event.EventId, targetCellId);
        await _hub.PublishAsync(targetCellId, @event);
    }

    /// <summary>Publishes <paramref name="event"/> to all cells through the hub, optionally filtered.</summary>
    /// <exception cref="InvalidOperationException">The bus has not been connected to a hub.</exception>
    public override async Task BroadcastAsync(IApplicationEvent @event, Predicate<string>? filter = null)
    {
        if (_hub == null)
        {
            throw new InvalidOperationException("Hub not connected");
        }

        _logger?.LogInformation("ChannelExternalBus {BusId} broadcasting event {EventId}",
            BusId, @event.EventId);
        await _hub.PublishToAllAsync(@event, filter);
    }

    /// <summary>
    /// Streams events straight from the inbox channel. Reading directly (instead of
    /// relaying through an unbounded buffer) keeps the bounded channel's backpressure
    /// effective and leaves no background task running if the consumer stops early.
    /// The stream ends when the channel completes and throws
    /// <see cref="OperationCanceledException"/> when <paramref name="ct"/> is canceled.
    /// </summary>
    public override async IAsyncEnumerable<IApplicationEvent> ReadAllAsync(
        [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken ct)
    {
        _logger?.LogDebug("ChannelExternalBus {BusId} starting to read events", BusId);
        var reader = _incomingChannel.Reader;
        while (await WaitForEventsAsync(reader, ct))
        {
            while (reader.TryRead(out var @event))
            {
                yield return @event;
            }
        }
        _logger?.LogDebug("ChannelExternalBus {BusId} finished reading events", BusId);
    }

    /// <summary>
    /// Waits until the inbox has data. Returns false when the channel has completed or
    /// failed (the failure is logged); cancellation is logged and rethrown.
    /// Kept separate because an iterator cannot yield inside a try block that has a catch.
    /// </summary>
    private async Task<bool> WaitForEventsAsync(ChannelReader<IApplicationEvent> reader, CancellationToken ct)
    {
        try
        {
            return await reader.WaitToReadAsync(ct);
        }
        catch (OperationCanceledException)
        {
            _logger?.LogInformation("ChannelExternalBus {BusId} read loop canceled", BusId);
            throw;
        }
        catch (Exception ex)
        {
            _logger?.LogError(ex, "ChannelExternalBus {BusId} read loop error", BusId);
            return false;
        }
    }

    /// <summary>Completes the inbox channel so readers finish. Safe to call more than once.</summary>
    public override void Dispose()
    {
        _logger?.LogInformation("ChannelExternalBus {BusId} disposing", BusId);
        // TryComplete, unlike Complete, does not throw when the channel is already completed.
        _incomingChannel.Writer.TryComplete();
        base.Dispose();
    }
}
----------------------------------------------
📄 16. Implementations/Channels/ChannelFractalCell.cs
csharp
using FractalCell02.Core.Configuration;
using FractalCell02.Core.Templates;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace FractalCell02.Implementations.Channels;
/// <summary>
/// Worker service of a channel-based cell. While running it is subscribed to
/// <c>FractalEvent</c> on the cell's internal bus and logs each event by type.
/// </summary>
public class ChannelBackgroundService : BackgroundService
{
    private readonly string _serviceId;
    private readonly ChannelInternalBus _internalBus;
    private readonly ILogger _logger;
    private readonly FractalCellConfiguration _config;
    private IDisposable? _subscription;

    /// <summary>Creates a worker identified by <paramref name="serviceId"/> that listens on <paramref name="internalBus"/>.</summary>
    public ChannelBackgroundService(
        string serviceId,
        ChannelInternalBus internalBus,
        ILogger logger,
        FractalCellConfiguration config)
    {
        _serviceId = serviceId;
        _internalBus = internalBus;
        _logger = logger;
        _config = config;
    }

    /// <summary>
    /// Subscribes to the internal bus and stays subscribed until
    /// <paramref name="stoppingToken"/> is canceled. The subscription is released when the
    /// method exits, so a stopped worker no longer receives events even if it is never disposed.
    /// </summary>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("ChannelBackgroundService {ServiceId} started", _serviceId);
        _subscription = _internalBus.Subscribe<FractalEvent>(async @event =>
        {
            _logger.LogInformation(
                "Service {ServiceId} processing event {EventId} of type {EventType}",
                _serviceId, @event.EventId, @event.EventType);
            await ProcessEventAsync(@event);
        });
        try
        {
            // Nothing to do but wait for cancellation; events arrive via the subscription.
            await Task.Delay(Timeout.Infinite, stoppingToken);
        }
        catch (OperationCanceledException)
        {
            _logger.LogInformation("ChannelBackgroundService {ServiceId} stopping", _serviceId);
        }
        finally
        {
            ReleaseSubscription();
        }
    }

    /// <summary>Logs the event according to its type; unknown types are logged as warnings.</summary>
    private Task ProcessEventAsync(FractalEvent @event)
    {
        switch (@event.EventType)
        {
            case "ProcessData":
                _logger.LogInformation(
                    "Channel worker {ServiceId} processing data: {Payload}",
                    _serviceId, @event.Payload);
                break;
            case "Heartbeat":
                _logger.LogDebug(
                    "Channel worker {ServiceId} heartbeat from {Source}",
                    _serviceId, @event.SourceCellId);
                break;
            default:
                _logger.LogWarning("Unknown event type: {EventType}", @event.EventType);
                break;
        }
        return Task.CompletedTask;
    }

    /// <summary>Disposes the current subscription, if any, exactly once.</summary>
    private void ReleaseSubscription()
    {
        Interlocked.Exchange(ref _subscription, null)?.Dispose();
    }

    /// <summary>Releases the subscription (if still held) and disposes the base service.</summary>
    public override void Dispose()
    {
        ReleaseSubscription();
        base.Dispose();
    }
}
/// <summary>
/// Cell whose internal and external buses are both channel-based. It creates the
/// configured number of <see cref="ChannelBackgroundService"/> workers and forwards
/// events addressed to a different cell through the external bus.
/// </summary>
public class ChannelFractalCell : FractalCellTemplate<ChannelInternalBus, ChannelExternalBus>
{
    private readonly List<ChannelBackgroundService> _services = new();

    /// <summary>Passes the configuration, buses and logger to the template.</summary>
    public ChannelFractalCell(
        FractalCellConfiguration configuration,
        ChannelInternalBus internalBus,
        ChannelExternalBus externalBus,
        ILogger<ChannelFractalCell> logger)
        : base(configuration, internalBus, externalBus, logger)
    {
    }

    /// <summary>
    /// Lazily creates one worker per configured background service, named
    /// "{CellId}-worker-{index}", recording each in the local list as it is produced.
    /// </summary>
    protected override IEnumerable<BackgroundService> CreateBackgroundServices()
    {
        var index = 0;
        while (index < Configuration.BackgroundServiceCount)
        {
            var worker = new ChannelBackgroundService(
                $"{Configuration.CellId}-worker-{index}",
                InternalBusField,
                Logger,
                Configuration);
            _services.Add(worker);
            yield return worker;
            index++;
        }
    }

    /// <summary>Subscribes the forwarding handler to <c>FractalEvent</c> on the internal bus.</summary>
    protected override void ConfigureHandlers()
        => InternalBusField.Subscribe<FractalEvent>(OnFractalEventAsync);

    /// <summary>
    /// Logs the received event and, when it names a non-empty target other than this
    /// cell, sends it on to that target through the external bus.
    /// </summary>
    private async Task OnFractalEventAsync(FractalEvent fractalEvent)
    {
        Logger.LogInformation("Channel cell {CellId} received event: {EventType} from {Source}",
            Configuration.CellId, fractalEvent.EventType, fractalEvent.SourceCellId);

        var addressedElsewhere =
            !string.IsNullOrEmpty(fractalEvent.TargetCellId) &&
            fractalEvent.TargetCellId != Configuration.CellId;
        if (!addressedElsewhere)
        {
            return;
        }

        Logger.LogInformation("Forwarding event {EventId} to {TargetCell}",
            fractalEvent.EventId, fractalEvent.TargetCellId);
        await ExternalBusField.SendToCellAsync(
            fractalEvent.TargetCellId, fractalEvent);
    }
}
----------------------------------------------------------------------
📄 17. Implementations/Channels/ChannelInternalBus.cs
csharp
using System.Collections.Concurrent;
using System.Threading.Channels;
using FractalCell02.Core.Common;
using FractalCell02.Core.Configuration;
using FractalCell02.Core.Interfaces;
using FractalCell02.Core.Templates;
namespace FractalCell02.Implementations.Channels;
/// <summary>
/// Internal bus backed by a bounded channel. Published events are queued in the channel;
/// a dispatch loop started by <see cref="StartAsync"/> hands each event to the handlers
/// subscribed for the event's exact runtime type.
/// </summary>
public class ChannelInternalBus : InternalBusTemplate
{
    private readonly Channel<IApplicationEvent> _channel;

    // Handler lists are keyed by the subscribed event type. Each list is guarded by a
    // lock on the list itself, for writers and for the dispatch loop's snapshot alike.
    private readonly ConcurrentDictionary<Type, List<Func<IApplicationEvent, Task>>> _handlers = new();

    /// <summary>Creates the bus and its bounded channel (capacity from <paramref name="config"/>).</summary>
    public ChannelInternalBus(string busId, BusSettings config)
        : base(busId, config)
    {
        _channel = Channel.CreateBounded<IApplicationEvent>(
            new BoundedChannelOptions(config.Capacity)
            {
                FullMode = BoundedChannelFullMode.Wait,
                SingleReader = false,
                SingleWriter = false
            });
    }

    /// <summary>Queues <paramref name="event"/>; silently ignored once the bus has been stopped.</summary>
    public override async Task PublishAsync<TEvent>(TEvent @event)
    {
        try
        {
            await _channel.Writer.WriteAsync(@event);
        }
        catch (ChannelClosedException)
        {
            // The channel is closed (bus stopped); dropping the event is intentional.
        }
    }

    /// <summary>
    /// Registers <paramref name="handler"/> for events whose runtime type is exactly
    /// <typeparamref name="TEvent"/>. Dispose the result to unsubscribe.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="handler"/> is null.</exception>
    public override IDisposable Subscribe<TEvent>(Func<TEvent, Task> handler)
    {
        ArgumentNullException.ThrowIfNull(handler);

        var handlers = _handlers.GetOrAdd(typeof(TEvent), _ => new List<Func<IApplicationEvent, Task>>());
        var wrappedHandler = new Func<IApplicationEvent, Task>(e => handler((TEvent)e));
        lock (handlers)
        {
            handlers.Add(wrappedHandler);
        }

        // Empty lists are left in the dictionary on purpose: removing them could race
        // with a concurrent Subscribe that already fetched the same list and would then
        // add its handler to an orphaned list.
        return new Unsubscriber(() =>
        {
            lock (handlers)
            {
                handlers.Remove(wrappedHandler);
            }
        });
    }

    /// <summary>Starts the background dispatch loop; <paramref name="ct"/> cancels it.</summary>
    public override Task StartAsync(CancellationToken ct)
    {
        _ = Task.Run(() => DispatchLoopAsync(ct), ct);
        return Task.CompletedTask;
    }

    /// <summary>
    /// Reads events until the channel completes or <paramref name="ct"/> is canceled and
    /// invokes the matching handlers concurrently. A failing handler is reported and does
    /// not stop the dispatching of later events.
    /// </summary>
    private async Task DispatchLoopAsync(CancellationToken ct)
    {
        try
        {
            await foreach (var @event in _channel.Reader.ReadAllAsync(ct))
            {
                var snapshot = SnapshotHandlers(@event.GetType());
                if (snapshot.Length == 0)
                {
                    continue;
                }

                try
                {
                    await Task.WhenAll(snapshot.Select(h => h(@event)));
                }
                catch (Exception ex)
                {
                    // This class has no logger, so the failure goes to the trace listeners.
                    System.Diagnostics.Trace.TraceError(
                        "ChannelInternalBus {0}: handler failed for event {1}: {2}",
                        BusId, @event.EventId, ex);
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Normal shutdown.
        }
        catch (Exception ex)
        {
            System.Diagnostics.Trace.TraceError(
                "ChannelInternalBus {0}: dispatch loop failed: {1}", BusId, ex);
        }
    }

    /// <summary>Returns a copy of the handlers for <paramref name="eventType"/>, taken under the list's lock.</summary>
    private Func<IApplicationEvent, Task>[] SnapshotHandlers(Type eventType)
    {
        if (!_handlers.TryGetValue(eventType, out var handlers))
        {
            return Array.Empty<Func<IApplicationEvent, Task>>();
        }

        lock (handlers)
        {
            return handlers.ToArray();
        }
    }

    /// <summary>Completes the channel so the dispatch loop drains and ends. Safe to call more than once.</summary>
    public override Task StopAsync()
    {
        // TryComplete, unlike Complete, does not throw when the channel is already completed.
        _channel.Writer.TryComplete();
        return Task.CompletedTask;
    }
}
------------------------------------
📄 18. Implementations/TplDataflow/TplExternalBus.cs
csharp
using System.Runtime.CompilerServices;
using System.Threading.Channels;
using System.Threading.Tasks.Dataflow;
using FractalCell02.Core.Configuration;
using FractalCell02.Core.Interfaces;
using FractalCell02.Core.Templates;
using Microsoft.Extensions.Logging;
namespace FractalCell02.Implementations.TplDataflow;
/// <summary>
/// External bus backed by a bounded TPL Dataflow <see cref="BufferBlock{T}"/>. The hub
/// delivers events through a registered consumer callback that posts into the buffer;
/// outgoing events are published through the hub.
/// </summary>
public class TplExternalBus : ExternalBusTemplate
{
    private readonly BufferBlock<IApplicationEvent> _bufferBlock;

    // NOTE(review): nothing in this class writes to or registers this channel; only the
    // forwarding task in ConnectToHubAsync reads it. Confirm whether it is still needed.
    private readonly Channel<IApplicationEvent> _incomingChannel;
    private IFractalEventHub? _hub;
    private string? _cellId;
    private CancellationTokenSource? _cts;

    /// <summary>Creates the bus with a bounded buffer block and channel (capacity from <paramref name="config"/>).</summary>
    public TplExternalBus(string busId, BusSettings config, ILogger? logger = null)
        : base(busId, config, logger)
    {
        _bufferBlock = new BufferBlock<IApplicationEvent>(
            new DataflowBlockOptions
            {
                BoundedCapacity = config.Capacity,
                EnsureOrdered = true
            });
        _incomingChannel = Channel.CreateBounded<IApplicationEvent>(
            new BoundedChannelOptions(config.Capacity)
            {
                FullMode = BoundedChannelFullMode.Wait
            });
    }

    /// <summary>
    /// Remembers the hub, registers a consumer that posts incoming events into the buffer
    /// block, and starts a task that forwards the channel's contents into the same buffer.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="hub"/> is null.</exception>
    public override async Task ConnectToHubAsync(IFractalEventHub hub, string cellId)
    {
        ArgumentNullException.ThrowIfNull(hub);
        _hub = hub;
        _cellId = cellId;

        var cts = new CancellationTokenSource();
        _cts = cts;
        // Capture the token once. Reading cts.Token later would throw
        // ObjectDisposedException if the hub delivered an event after Dispose.
        var token = cts.Token;

        await hub.RegisterConsumerAsync(cellId, async @event =>
        {
            await _bufferBlock.SendAsync(@event, token);
        });

        _ = Task.Run(async () =>
        {
            try
            {
                await foreach (var @event in _incomingChannel.Reader.ReadAllAsync(token))
                {
                    await _bufferBlock.SendAsync(@event, token);
                }
            }
            catch (OperationCanceledException)
            {
                // Normal shutdown.
            }
            catch (Exception ex)
            {
                _logger?.LogError(ex, "Error in TplExternalBus forwarding task");
            }
        }, token);
    }

    /// <summary>Publishes <paramref name="event"/> to <paramref name="targetCellId"/> through the hub.</summary>
    /// <exception cref="InvalidOperationException">The bus has not been connected to a hub.</exception>
    public override async Task SendToCellAsync(string targetCellId, IApplicationEvent @event)
    {
        if (_hub == null)
        {
            throw new InvalidOperationException("Hub not connected");
        }

        await _hub.PublishAsync(targetCellId, @event);
    }

    /// <summary>Publishes <paramref name="event"/> to all cells through the hub, optionally filtered.</summary>
    /// <exception cref="InvalidOperationException">The bus has not been connected to a hub.</exception>
    public override async Task BroadcastAsync(IApplicationEvent @event, Predicate<string>? filter = null)
    {
        if (_hub == null)
        {
            throw new InvalidOperationException("Hub not connected");
        }

        await _hub.PublishToAllAsync(@event, filter);
    }

    /// <summary>
    /// Streams events straight from the buffer block. Reading directly (instead of
    /// relaying through an unbounded buffer) keeps the block's bounded capacity effective
    /// and leaves no background task running if the consumer stops early. The stream ends
    /// when the block completes and throws <see cref="OperationCanceledException"/> when
    /// <paramref name="ct"/> is canceled.
    /// </summary>
    public override async IAsyncEnumerable<IApplicationEvent> ReadAllAsync(
        [EnumeratorCancellation] CancellationToken ct)
    {
        _logger?.LogDebug("TplExternalBus {BusId} starting to read events", BusId);
        while (await WaitForOutputAsync(ct))
        {
            while (_bufferBlock.TryReceive(out var @event))
            {
                yield return @event;
            }
        }
    }

    /// <summary>
    /// Waits until the buffer has output. Returns false when the block has completed or
    /// failed (the failure is logged); cancellation is logged and rethrown.
    /// Kept separate because an iterator cannot yield inside a try block that has a catch.
    /// </summary>
    private async Task<bool> WaitForOutputAsync(CancellationToken ct)
    {
        try
        {
            return await _bufferBlock.OutputAvailableAsync(ct);
        }
        catch (OperationCanceledException)
        {
            _logger?.LogInformation("TplExternalBus {BusId} read loop canceled", BusId);
            throw;
        }
        catch (Exception ex)
        {
            _logger?.LogError(ex, "TplExternalBus {BusId} read loop error", BusId);
            return false;
        }
    }

    /// <summary>
    /// Cancels the hub consumer and forwarding task and completes the buffer block so
    /// readers finish. Safe to call more than once.
    /// </summary>
    public override void Dispose()
    {
        // Take ownership of the source atomically so a second Dispose finds null
        // instead of calling Cancel on an already disposed source.
        var cts = Interlocked.Exchange(ref _cts, null);
        if (cts is not null)
        {
            cts.Cancel();
            cts.Dispose();
        }

        _bufferBlock.Complete();
        base.Dispose();
    }
}
--------------------------------------------
📄 19. Implementations/TplDataflow/TplFractalCell.cs
csharp
using FractalCell02.Core.Configuration;
using FractalCell02.Core.Interfaces;
using FractalCell02.Core.Templates;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System.Threading.Tasks.Dataflow;
namespace FractalCell02.Implementations.TplDataflow;
/// <summary>
/// Hosted wrapper around one dataflow worker block: it runs for as long as the block
/// has not completed (or until stopped) and completes the block when disposed.
/// </summary>
public class TplBackgroundService : BackgroundService
{
    private readonly ActionBlock<IApplicationEvent> _workerBlock;
    private readonly string _serviceId;
    private readonly ILogger _logger;

    /// <summary>Wraps <paramref name="workerBlock"/> under the name <paramref name="serviceId"/>.</summary>
    public TplBackgroundService(
        ActionBlock<IApplicationEvent> workerBlock,
        string serviceId,
        ILogger logger)
        => (_workerBlock, _serviceId, _logger) = (workerBlock, serviceId, logger);

    /// <summary>
    /// Waits for the worker block to complete or for <paramref name="stoppingToken"/> to
    /// be canceled. Cancellation is treated as a normal stop; other failures are logged.
    /// </summary>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("TplBackgroundService {ServiceId} started", _serviceId);

        Task blockCompletion = _workerBlock.Completion;
        try
        {
            await blockCompletion.WaitAsync(stoppingToken);
        }
        catch (OperationCanceledException)
        {
            // Normal shutdown.
        }
        catch (Exception failure)
        {
            _logger.LogError(failure, "Error in TplBackgroundService {ServiceId}", _serviceId);
        }
    }

    /// <summary>Signals the worker block to complete, then disposes the base service.</summary>
    public override void Dispose()
    {
        _workerBlock.Complete();
        base.Dispose();
    }
}
public class TplFractalCell : FractalCellTemplate<TplInternalBus, TplExternalBus>
{
private readonly BufferBlock<IApplicationEvent> _inputBuffer;
private readonly TransformBlock<IApplicationEvent, IApplicationEvent> _mainProcessingBlock;
private readonly BroadcastBlock<IApplicationEvent> _broadcastBlock;
private readonly List<ActionBlock<IApplicationEvent>> _workerBlocks = new();
private readonly List<IDisposable> _links = new();
/// <summary>
/// Builds the cell's dataflow pipeline: input buffer → main processing block →
/// broadcast block, with completion propagated along both links.
/// </summary>
public TplFractalCell(
    FractalCellConfiguration configuration,
    TplInternalBus internalBus,
    TplExternalBus externalBus,
    ILogger<TplFractalCell> logger)
    : base(configuration, internalBus, externalBus, logger)
{
    // Options shared by the input buffer and the main processing block.
    var executionOptions = new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = configuration.BusSettings.Capacity,
        MaxDegreeOfParallelism = configuration.BusSettings.MaxParallelism,
        NameFormat = $"{configuration.CellId}-block-{{0}}"
    };
    var broadcastOptions = new DataflowBlockOptions
    {
        BoundedCapacity = configuration.BusSettings.Capacity
    };
    var propagateCompletion = new DataflowLinkOptions { PropagateCompletion = true };

    _inputBuffer = new BufferBlock<IApplicationEvent>(executionOptions);
    _mainProcessingBlock = new TransformBlock<IApplicationEvent, IApplicationEvent>(
        async incoming => await ProcessInMainBlockAsync(incoming),
        executionOptions);
    // The cloning function hands every target the same event instance.
    _broadcastBlock = new BroadcastBlock<IApplicationEvent>(item => item, broadcastOptions);

    var bufferToMain = _inputBuffer.LinkTo(_mainProcessingBlock, propagateCompletion);
    _links.Add(bufferToMain);
    var mainToBroadcast = _mainProcessingBlock.LinkTo(_broadcastBlock, propagateCompletion);
    _links.Add(mainToBroadcast);
}
private async Task<IApplicationEvent> ProcessInMainBlockAsync(IApplicationEvent @event)
{
Logger.LogInformation("Main block processing event {EventId}", @event.EventId);
try
{
await InternalBusField.PublishAsync(@event);
if (@event is FractalEvent fractalEvent &&
!string.IsNullOrEmpty(fractalEvent.TargetCellId))
{
await ExternalBusField.SendToCellAsync(
fractalEvent.TargetCellId, fractalEvent);
}
}
catch (Exception ex)
{
Logger.LogError(ex, "Error in main block processing event {EventId}",
@event.EventId);
}
return @event;
}
protected override IEnumerable<BackgroundService> CreateBackgroundServices()
{
for (int i = 0; i < Configuration.BackgroundServiceCount; i++)
{
var workerBlock = new ActionBlock<IApplicationEvent>(
async @event => await ProcessInWorkerAsync(@event, i),
new ExecutionDataflowBlockOptions
{
BoundedCapacity = Configuration.BusSettings.Capacity,
MaxDegreeOfParallelism = 1,
NameFormat = $"{Configuration.CellId}-worker-{i}"
});
_links.Add(_broadcastBlock.LinkTo(workerBlock,
new DataflowLinkOptions { PropagateCompletion = true }));
_workerBlocks.Add(workerBlock);
yield return new TplBackgroundService(workerBlock,
$"{Configuration.CellId}-worker-{i}", Logger);
}
}
protected override void ConfigureHandlers()
{
InternalBusField.Subscribe<FractalEvent>(async fractalEvent =>
{
await _inputBuffer.SendAsync(fractalEvent);
});
}
private async Task ProcessInWorkerAsync(IApplicationEvent @event, int workerId)
{
Logger.LogInformation("Worker {WorkerId} processing event {EventId}",
workerId, @event.EventId);
if (@event is FractalEvent fractalEvent)
{
try
{
switch (fractalEvent.EventType)
{
case "ProcessData":
await HandleProcessData(fractalEvent, workerId);
break;
case "Heartbeat":
await HandleHeartbeat(fractalEvent, workerId);
break;
default:
Logger.LogWarning("Unknown event type: {EventType}",
fractalEvent.EventType);
break;
}
}
catch (Exception ex)
{
Logger.LogError(ex, "Worker {WorkerId} error processing event {EventId}",
workerId, @event.EventId);
}
}
}
private Task HandleProcessData(FractalEvent @event, int workerId)
{
Logger.LogInformation(
"Worker {WorkerId} processing data from {Source}: {Payload}",
workerId, @event.SourceCellId, @event.Payload);
return Task.Delay(100);
}
private Task HandleHeartbeat(FractalEvent @event, int workerId)
{
Logger.LogDebug(
"Worker {WorkerId} heartbeat from {Source}",
workerId, @event.SourceCellId);
return Task.CompletedTask;
}
public override async Task StopAsync(CancellationToken cancellationToken)
{
Logger.LogInformation("Stopping TPL blocks...");
_inputBuffer.Complete();
try
{
await Task.WhenAll(
_inputBuffer.Completion,
_mainProcessingBlock.Completion,
_broadcastBlock.Completion,
Task.WhenAll(_workerBlocks.Select(b => b.Completion))
).WaitAsync(cancellationToken);
}
catch (OperationCanceledException)
{
Logger.LogWarning("Stop operation canceled");
}
catch (Exception ex)
{
Logger.LogError(ex, "Error while stopping TPL blocks");
}
foreach (var link in _links)
{
link.Dispose();
}
_links.Clear();
await base.StopAsync(cancellationToken);
}
}
---------------------------------------
📄 20. Implementations/TplDataflow/TplInternalBus.cs
csharp
using System.Collections.Concurrent;
using System.Threading.Tasks.Dataflow;
using FractalCell02.Core.Common;
using FractalCell02.Core.Configuration;
using FractalCell02.Core.Interfaces;
using FractalCell02.Core.Templates;
namespace FractalCell02.Implementations.TplDataflow;
/// <summary>
/// In-process event bus built on a single bounded <see cref="ActionBlock{TInput}"/>.
/// Published events are queued in the block and dispatched to the handlers registered
/// for the event's exact runtime type.
/// </summary>
public class TplInternalBus : InternalBusTemplate
{
    private readonly ActionBlock<IApplicationEvent> _actionBlock;

    // Handler lists keyed by event type. Each list is guarded by locking on the list itself.
    // Lists are never removed from the dictionary once created (see Subscribe).
    private readonly ConcurrentDictionary<Type, List<Func<IApplicationEvent, Task>>> _handlers = new();

    /// <summary>Creates the bus and its dispatch block.</summary>
    /// <param name="busId">Identifier passed to the base template.</param>
    /// <param name="config">Supplies the block's bounded capacity and degree of parallelism.</param>
    public TplInternalBus(string busId, BusSettings config)
        : base(busId, config)
    {
        _actionBlock = new ActionBlock<IApplicationEvent>(
            async @event => await ProcessEventAsync(@event),
            new ExecutionDataflowBlockOptions
            {
                BoundedCapacity = config.Capacity,
                MaxDegreeOfParallelism = config.MaxParallelism
            });
    }

    /// <summary>
    /// Invokes every handler registered for the event's runtime type and awaits them all.
    /// </summary>
    /// <remarks>
    /// NOTE(review): an exception thrown by a handler escapes this method and therefore the
    /// block's delegate; per TPL Dataflow semantics that faults the block, after which
    /// further publishes are declined. Confirm whether handlers are expected never to throw.
    /// </remarks>
    private async Task ProcessEventAsync(IApplicationEvent @event)
    {
        if (!_handlers.TryGetValue(@event.GetType(), out var handlers))
        {
            return;
        }

        // Snapshot under the same lock Subscribe/unsubscribe use; copying a List<T> while
        // another thread mutates it is not safe.
        Func<IApplicationEvent, Task>[] snapshot;
        lock (handlers)
        {
            snapshot = handlers.ToArray();
        }

        if (snapshot.Length == 1)
        {
            await snapshot[0](@event);
        }
        else if (snapshot.Length > 1)
        {
            await Task.WhenAll(snapshot.Select(h => h(@event)));
        }
    }

    /// <summary>Queues an event for dispatch, waiting asynchronously while the block is full.</summary>
    /// <param name="event">Event to dispatch.</param>
    public override async Task PublishAsync<TEvent>(TEvent @event)
    {
        await _actionBlock.SendAsync(@event);
    }

    /// <summary>
    /// Registers a handler for events whose runtime type is exactly <typeparamref name="TEvent"/>.
    /// </summary>
    /// <param name="handler">Callback invoked for each matching event.</param>
    /// <returns>A handle that removes the handler when disposed.</returns>
    public override IDisposable Subscribe<TEvent>(Func<TEvent, Task> handler)
    {
        var eventType = typeof(TEvent);
        var handlers = _handlers.GetOrAdd(eventType, _ => new List<Func<IApplicationEvent, Task>>());
        var wrappedHandler = new Func<IApplicationEvent, Task>(e => handler((TEvent)e));
        lock (handlers)
        {
            handlers.Add(wrappedHandler);
        }
        return new Unsubscriber(() =>
        {
            lock (handlers)
            {
                // The (possibly empty) list is intentionally left in the dictionary.
                // Removing it could race with a concurrent Subscribe that already obtained
                // this list from GetOrAdd: its handler would land in an orphaned list and
                // never be invoked.
                handlers.Remove(wrappedHandler);
            }
        });
    }

    /// <summary>No start-up work is required; the block is ready once constructed.</summary>
    public override Task StartAsync(CancellationToken ct)
    {
        return Task.CompletedTask;
    }

    /// <summary>Stops accepting events and returns the task that completes once queued events are processed.</summary>
    public override Task StopAsync()
    {
        _actionBlock.Complete();
        return _actionBlock.Completion;
    }
}
------------------------------------------------------
📄 21. Implementations/FractalEventHub.cs
csharp
using System.Collections.Concurrent;
using System.Threading.Channels;
using FractalCell02.Core.Configuration;
using FractalCell02.Core.Interfaces;
using Microsoft.Extensions.Logging;
namespace FractalCell02.Implementations;
/// <summary>
/// In-memory hub that routes events between cells. Each cell registers either a channel
/// the hub writes into or a consumer delegate the hub invokes.
/// </summary>
public class InMemoryFractalEventHub : IFractalEventHub
{
    private readonly ConcurrentDictionary<string, CellConnection> _connections = new();
    private readonly ILogger<InMemoryFractalEventHub> _logger;
    private readonly HubSettings _settings;
    private readonly SemaphoreSlim _semaphore = new(1, 1);

    // Exactly one of the two members is set by the Register* methods.
    private record CellConnection(
        Channel<IApplicationEvent>? Channel,
        Func<IApplicationEvent, Task>? Consumer);

    /// <summary>Creates the hub.</summary>
    /// <param name="logger">Logger for registration and delivery diagnostics.</param>
    /// <param name="settings">Optional hub settings; a default instance is used when omitted.</param>
    public InMemoryFractalEventHub(
        ILogger<InMemoryFractalEventHub> logger,
        HubSettings? settings = null)
    {
        _logger = logger;
        _settings = settings ?? new HubSettings();
        _logger.LogInformation("InMemoryFractalEventHub created");
    }

    /// <summary>Registers a cell that receives events through a channel.</summary>
    /// <param name="cellId">Id of the cell.</param>
    /// <param name="incomingChannel">Channel the hub writes the cell's events into.</param>
    public Task RegisterChannelAsync(string cellId, Channel<IApplicationEvent> incomingChannel)
    {
        _logger.LogInformation("Registering channel for cell {CellId}", cellId);
        return RegisterInternalAsync(cellId, new CellConnection(incomingChannel, null));
    }

    /// <summary>Registers a cell that receives events through a callback.</summary>
    /// <param name="cellId">Id of the cell.</param>
    /// <param name="consumer">Delegate the hub invokes for each of the cell's events.</param>
    public Task RegisterConsumerAsync(string cellId, Func<IApplicationEvent, Task> consumer)
    {
        _logger.LogInformation("Registering consumer for cell {CellId}", cellId);
        return RegisterInternalAsync(cellId, new CellConnection(null, consumer));
    }

    /// <summary>Adds the connection, or replaces the existing one for the same cell id.</summary>
    private async Task RegisterInternalAsync(string cellId, CellConnection connection)
    {
        await _semaphore.WaitAsync();
        try
        {
            if (_connections.TryAdd(cellId, connection))
            {
                _logger.LogInformation("Cell {CellId} registered successfully. Total cells: {Count}",
                    cellId, _connections.Count);
            }
            else
            {
                _logger.LogWarning("Cell {CellId} already registered, updating", cellId);
                _connections[cellId] = connection;
            }
        }
        finally
        {
            _semaphore.Release();
        }
    }

    /// <summary>Removes a cell's connection; unknown ids are ignored.</summary>
    /// <param name="cellId">Id of the cell to remove.</param>
    public Task UnregisterCellAsync(string cellId)
    {
        if (_connections.TryRemove(cellId, out _))
        {
            _logger.LogInformation("Cell {CellId} unregistered", cellId);
        }
        return Task.CompletedTask;
    }

    /// <summary>Delivers an event to a single cell.</summary>
    /// <param name="targetCellId">Id of the receiving cell.</param>
    /// <param name="event">Event to deliver.</param>
    /// <exception cref="ArgumentNullException"><paramref name="event"/> is null.</exception>
    /// <exception cref="KeyNotFoundException">No cell is registered under <paramref name="targetCellId"/>.</exception>
    /// <remarks>Delivery failures are logged and rethrown.</remarks>
    public async Task PublishAsync(string targetCellId, IApplicationEvent @event)
    {
        ArgumentNullException.ThrowIfNull(@event);

        _logger.LogInformation("Publishing event {EventId} to target cell {TargetCell}. Active cells: {Count}",
            @event.EventId, targetCellId, _connections.Count);
        if (_connections.TryGetValue(targetCellId, out var connection))
        {
            try
            {
                if (connection.Channel != null)
                {
                    _logger.LogDebug("Writing event {EventId} to channel of {TargetCell}",
                        @event.EventId, targetCellId);
                    await connection.Channel.Writer.WriteAsync(@event);
                    _logger.LogInformation("Event {EventId} successfully sent to {TargetCell}",
                        @event.EventId, targetCellId);
                }
                else if (connection.Consumer != null)
                {
                    _logger.LogDebug("Invoking consumer for event {EventId} in {TargetCell}",
                        @event.EventId, targetCellId);
                    await connection.Consumer(@event);
                    _logger.LogInformation("Event {EventId} successfully consumed by {TargetCell}",
                        @event.EventId, targetCellId);
                }
                else
                {
                    _logger.LogWarning("Cell {TargetCell} has no active connection", targetCellId);
                }
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Failed to send event {EventId} to {TargetCell}",
                    @event.EventId, targetCellId);
                throw;
            }
        }
        else
        {
            _logger.LogWarning("Target cell {TargetCell} not found. Available cells: {Cells}",
                targetCellId, string.Join(", ", _connections.Keys));
            throw new KeyNotFoundException($"Cell {targetCellId} not found");
        }
    }

    /// <summary>
    /// Delivers an event to every registered cell, optionally restricted by a filter on the cell id.
    /// Per-cell failures are logged and do not affect the other deliveries.
    /// </summary>
    /// <param name="event">Event to broadcast.</param>
    /// <param name="filter">Optional predicate on the cell id; when null, all cells are targeted.</param>
    /// <exception cref="ArgumentNullException"><paramref name="event"/> is null.</exception>
    public async Task PublishToAllAsync(IApplicationEvent @event, Predicate<string>? filter = null)
    {
        ArgumentNullException.ThrowIfNull(@event);

        _logger.LogInformation("Broadcasting event {EventId} to all cells", @event.EventId);

        // Materialize both the target set and the delivery tasks exactly once. A deferred
        // query would be re-enumerated by any later Count(), re-running the async delivery
        // and sending the event to every cell a second time.
        var targets = _connections
            .Where(kvp => filter == null || filter(kvp.Key))
            .ToArray();
        var deliveries = targets
            .Select(kvp => BroadcastToCellAsync(kvp.Key, kvp.Value, @event))
            .ToArray();

        await Task.WhenAll(deliveries);
        _logger.LogInformation("Event {EventId} broadcasted to {Count} cells",
            @event.EventId, deliveries.Length);
    }

    /// <summary>Delivers one broadcast event to one cell, logging instead of throwing on failure.</summary>
    private async Task BroadcastToCellAsync(string cellId, CellConnection connection, IApplicationEvent @event)
    {
        try
        {
            if (connection.Channel != null)
            {
                await connection.Channel.Writer.WriteAsync(@event);
                _logger.LogDebug("Event {EventId} broadcasted to {Cell}",
                    @event.EventId, cellId);
            }
            else if (connection.Consumer != null)
            {
                await connection.Consumer(@event);
                _logger.LogDebug("Event {EventId} consumed by {Cell}",
                    @event.EventId, cellId);
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to broadcast to {Cell}", cellId);
        }
    }

    /// <summary>Returns a read-only snapshot of the currently registered cell ids.</summary>
    public IReadOnlyCollection<string> GetActiveCells()
    {
        var cells = _connections.Keys.ToList().AsReadOnly();
        _logger.LogDebug("Getting active cells: {Count} cells", cells.Count);
        return cells;
    }
}
-----------------------------------------------------
📄 22. Model/FractalEvent.cs
csharp
using FractalCell02.Core.Interfaces;
namespace FractalCell02;
/// <summary>
/// Immutable event exchanged between fractal cells; the concrete
/// <see cref="IApplicationEvent"/> that the TPL Dataflow cell and the hub operate on.
/// </summary>
/// <param name="EventId">Identifier of the event; used in log messages by the cell and the hub.</param>
/// <param name="Timestamp">Time associated with the event (NOTE(review): UTC vs. local is not established in this file section).</param>
/// <param name="SourceCellId">Id of the originating cell; logged by the worker handlers.</param>
/// <param name="TargetCellId">
/// Id of the destination cell. TplFractalCell forwards the event over the external bus only
/// when this value is non-empty, so an empty string appears to mean "no remote target"
/// (the parameter is declared non-nullable even though consumers test it with IsNullOrEmpty).
/// </param>
/// <param name="EventType">
/// Discriminator the worker stage switches on; "ProcessData" and "Heartbeat" are handled,
/// any other value is logged as unknown.
/// </param>
/// <param name="Payload">Arbitrary event data; written to the log by the "ProcessData" handler.</param>
public record FractalEvent(
string EventId,
DateTime Timestamp,
string SourceCellId,
string TargetCellId,
string EventType,
object Payload
) : IApplicationEvent;
Комментариев нет:
Отправить комментарий