четверг, 25 июня 2026 г.

Deepseek, FractalCell, FractalCellSln.slnx

Deepseek, FractalCell, FractalCellSln.slnx

https://chat.deepseek.com/share/45mer766poqry1usn1

https://giga.chat/link/gcsuyjVdky

D:\Projects\VS02\2606\FractalNet\DeepSeek\FractalCell\FractalCellSln\FractalCellSln.slnx

D:\Projects\VS02\2606\FractalNet\DeepSeek\FractalCell\FractalCellSln\FractalCell05\FractalCell05.csproj  -- Initial Project

D:\Projects\VS02\2606\FractalNet\DeepSeek\FractalCell\FractalCellSln\FractalCell08\FractalCell08.csproj

-------------------------------------------------------------------------

Структура проекта

text

FractalCellSln.FractalCell05/

├── Core/

│   ├── Common/

│   │   └── Unsubscriber.cs

│   ├── Configuration/

│   │   └── FractalCellConfiguration.cs

│   ├── Interfaces/

│   │   ├── IApplicationEvent.cs

│   │   ├── IExternalBus.cs

│   │   ├── IFractalCell.cs

│   │   ├── IFractalEventHub.cs

│   │   └── IInternalBus.cs

│   ├── Templates/

│   │   ├── ExternalBusTemplate.cs

│   │   ├── FractalCellTemplate.cs

│   │   └── InternalBusTemplate.cs

│   └── FractalCellFactory.cs

├── Implementations/

│   ├── Channels/

│   │   ├── ChannelExternalBus.cs

│   │   ├── ChannelFractalCell.cs

│   │   └── ChannelInternalBus.cs

│   ├── TplDataflow/

│   │   ├── TplExternalBus.cs

│   │   ├── TplFractalCell.cs

│   │   └── TplInternalBus.cs

│   └── FractalEventHub.cs

├── Model/

│   └── FractalEvent.cs

├── Worker.cs

├── Program.cs

└── FractalCellSln.FractalCell05.csproj

---------------------------------------------------

📄 1. FractalCellSln.FractalCell05.csproj

xml

<Project Sdk="Microsoft.NET.Sdk.Worker">


  <PropertyGroup>

    <TargetFramework>net10.0</TargetFramework>

    <Nullable>enable</Nullable>

    <ImplicitUsings>enable</ImplicitUsings>

    <UserSecretsId>dotnet-FractalCell02-68e347bc-63c0-4746-95b3-6558399b234e</UserSecretsId>

  </PropertyGroup>

  <ItemGroup>

    <PackageReference Include="Microsoft.Extensions.Hosting" Version="10.0.9" />

    <PackageReference Include="Microsoft.Extensions.Logging.Console" Version="10.0.9" />

    <PackageReference Include="System.Threading.Tasks.Dataflow" Version="10.0.9" />

  </ItemGroup>

</Project>

------------------------------------

📄 2. Program.cs

csharp

using FractalCell02;

using FractalCell02.Core.Configuration;

using FractalCell02.Implementations;

using Microsoft.Extensions.DependencyInjection;

using Microsoft.Extensions.Hosting;

using Microsoft.Extensions.Logging;


var builder = Host.CreateApplicationBuilder(args);


builder.Logging.ClearProviders();

builder.Logging.AddConsole();

builder.Logging.AddDebug();

builder.Logging.SetMinimumLevel(LogLevel.Information);


builder.Services.Configure<FractalCellConfiguration>(

    builder.Configuration.GetSection("FractalCell"));


builder.Services.AddSingleton<IFractalEventHub, InMemoryFractalEventHub>();

builder.Services.AddSingleton<HubSettings>(sp => new HubSettings

{

    ChannelCapacity = 1000,

    EnablePersistence = false,

    MessageTimeout = TimeSpan.FromSeconds(30)

});


builder.Services.AddHostedService<Worker>();


var host = builder.Build();


Console.WriteLine("🚀 Starting Fractal System...");

Console.WriteLine("ℹ️ Press Ctrl+C to stop");


await host.RunAsync();


Console.WriteLine("✅ Fractal System stopped");

----------------------------------------------------------------------------

📄 3. Worker.cs

csharp

using FractalCell02.Core;

using FractalCell02.Core.Configuration;

using FractalCell02.Core.Interfaces;

using Microsoft.Extensions.Hosting;

using Microsoft.Extensions.Logging;


namespace FractalCell02;


public class Worker : BackgroundService

{

    private readonly ILogger<Worker> _logger;

    private readonly IFractalEventHub _hub;

    private readonly ILoggerFactory _loggerFactory;

    private readonly List<IFractalCell> _cells = new();


    public Worker(

        ILogger<Worker> logger,

        IFractalEventHub hub,

        ILoggerFactory loggerFactory)

    {

        _logger = logger;

        _hub = hub;

        _loggerFactory = loggerFactory;

    }


    protected override async Task ExecuteAsync(CancellationToken stoppingToken)

    {

        _logger.LogInformation("🚀 Worker (Orchestrator) started");


        try

        {

            await InitializeSystemAsync(stoppingToken);


            _logger.LogInformation("✅ System initialized. Starting orchestration loop...");


            while (!stoppingToken.IsCancellationRequested)

            {

                try

                {

                    await OrchestrateAsync(stoppingToken);

                    await Task.Delay(TimeSpan.FromSeconds(3), stoppingToken);

                }

                catch (OperationCanceledException)

                {

                    _logger.LogInformation("⏹️ Orchestration loop canceled");

                    break;

                }

                catch (Exception ex)

                {

                    _logger.LogError(ex, "❌ Error in orchestration loop");

                    await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);

                }

            }

        }

        catch (OperationCanceledException)

        {

            _logger.LogInformation("👋 Worker stopping due to cancellation");

        }

        catch (Exception ex)

        {

            _logger.LogError(ex, "❌ Critical worker error");

        }

        finally

        {

            _logger.LogInformation("🏁 Worker (Orchestrator) finished");

        }

    }


    private async Task InitializeSystemAsync(CancellationToken ct)

    {

        _logger.LogInformation("🏗️ Initializing fractal system...");


        var rootCell = await CreateCellAsync("Root", 3, ct);

        _cells.Add(rootCell);


        var childCells = new[] { "Child-A", "Child-B", "Child-C" };

        foreach (var childId in childCells)

        {

            var child = await CreateCellAsync(childId, 2, ct);

            _cells.Add(child);

        }


        _logger.LogInformation("🔍 System cells: {Count}", _cells.Count);

        foreach (var cell in _cells)

        {

            _logger.LogInformation("🔍 Cell: {CellId}", cell.CellId);

        }


        _logger.LogInformation("▶️ Starting all cells...");

        await Task.WhenAll(_cells.Select(c => c.StartAsync(ct)));

        

        _logger.LogInformation("✅ System initialized with {Count} cells", _cells.Count);

    }


    private async Task<IFractalCell> CreateCellAsync(

        string cellId, 

        int workers, 

        CancellationToken ct)

    {

        var config = new FractalCellConfiguration

        {

            CellId = cellId,

            BackgroundServiceCount = workers,

            InternalBusType = BusType.Channels,

            ExternalBusType = BusType.Channels,

            BusSettings = new BusSettings

            {

                Capacity = 1000,

                MaxParallelism = 4

            }

        };


        var cell = await FractalCellFactory.CreateAsync(config, _hub, _loggerFactory, ct);

        _logger.LogInformation("📦 Cell {CellId} created with {Workers} workers", cellId, workers);

        return cell;

    }


    private async Task OrchestrateAsync(CancellationToken ct)

    {

        if (_cells.Count == 0) return;


        var random = new Random();

        var rootCell = _cells.First();

        var targetCell = _cells[random.Next(_cells.Count)];


        var @event = new FractalEvent(

            $"heartbeat-{DateTime.UtcNow.Ticks}",

            DateTime.UtcNow,

            rootCell.CellId,

            targetCell.CellId,

            "Heartbeat",

            new { 

                Timestamp = DateTime.UtcNow,

                Source = "Orchestrator",

                ActiveCells = _hub.GetActiveCells().Count

            }

        );


        _logger.LogInformation("📤 [ORCHESTRATOR] Sending heartbeat from {Source} to {Target}", 

            rootCell.CellId, targetCell.CellId);


        await rootCell.ExternalBus.SendToCellAsync(targetCell.CellId, @event);

    }


    public override async Task StopAsync(CancellationToken cancellationToken)

    {

        _logger.LogInformation("🛑 Stopping all cells...");

        

        await Task.WhenAll(_cells.Select(c => c.StopAsync(cancellationToken)));

        

        _logger.LogInformation("✅ All cells stopped");

        await base.StopAsync(cancellationToken);

    }

}

---------------------------------------------------------

📄 4. Core/Common/Unsubscriber.cs

csharp

namespace FractalCell02.Core.Common;


public class Unsubscriber : IDisposable

{

    private readonly Action _unsubscribeAction;

    private bool _disposed;


    public Unsubscriber(Action unsubscribeAction)

    {

        _unsubscribeAction = unsubscribeAction ?? throw new ArgumentNullException(nameof(unsubscribeAction));

    }


    public void Dispose()

    {

        if (!_disposed)

        {

            _unsubscribeAction?.Invoke();

            _disposed = true;

        }

    }

}

-----------------------------------------------------------------------------

📄 5. Core/Configuration/FractalCellConfiguration.cs

csharp

namespace FractalCell02.Core.Configuration;


public record FractalCellConfiguration

{

    public string CellId { get; init; } = string.Empty;

    public int BackgroundServiceCount { get; init; } = 2;

    public BusType InternalBusType { get; init; } = BusType.Channels;

    public BusType ExternalBusType { get; init; } = BusType.TplDataflow;

    

    public BusSettings BusSettings { get; init; } = new();

    public HubSettings HubSettings { get; init; } = new();

}


public record BusSettings

{

    public int Capacity { get; init; } = 1000;

    public int MaxParallelism { get; init; } = 4;

    public TimeSpan MessageTimeout { get; init; } = TimeSpan.FromSeconds(30);

    public bool EnablePersistence { get; init; } = false;

    public string? PersistencePath { get; init; }

}


public record HubSettings

{

    public int ChannelCapacity { get; init; } = 1000;

    public bool EnablePersistence { get; init; } = false;

    public string? PersistencePath { get; init; }

    public TimeSpan MessageTimeout { get; init; } = TimeSpan.FromSeconds(30);

}


public enum BusType

{

    Channels,

    TplDataflow

}

----------------------------------------------------------------

📄 6. Core/Interfaces/IApplicationEvent.cs

csharp

namespace FractalCell02.Core.Interfaces;


public interface IApplicationEvent

{

    string EventId { get; }

    DateTime Timestamp { get; }

}

----------------------------------------------------------------

📄 7. Core/Interfaces/IExternalBus.cs

csharp

namespace FractalCell02.Core.Interfaces;


public interface IExternalBus : IDisposable

{

    string BusId { get; }

    Task ConnectToHubAsync(IFractalEventHub hub, string cellId);

    Task SendToCellAsync(string targetCellId, IApplicationEvent @event);

    Task BroadcastAsync(IApplicationEvent @event, Predicate<string>? filter = null);

    IAsyncEnumerable<IApplicationEvent> ReadAllAsync(CancellationToken ct);

}

-------------------------------------------------------------------

📄 8. Core/Interfaces/IFractalCell.cs

csharp

using Microsoft.Extensions.Hosting;


namespace FractalCell02.Core.Interfaces;


public interface IFractalCell : IHostedService

{

    string CellId { get; }

    IInternalBus InternalBus { get; }

    IExternalBus ExternalBus { get; }

    Task InitializeAsync();

}

-----------------------------------------------------------------------------

📄 9. Core/Interfaces/IFractalEventHub.cs

csharp

using System.Threading.Channels;


namespace FractalCell02.Core.Interfaces;


public interface IFractalEventHub

{

    Task RegisterChannelAsync(string cellId, Channel<IApplicationEvent> incomingChannel);

    Task RegisterConsumerAsync(string cellId, Func<IApplicationEvent, Task> consumer);

    Task UnregisterCellAsync(string cellId);

    Task PublishAsync(string targetCellId, IApplicationEvent @event);

    Task PublishToAllAsync(IApplicationEvent @event, Predicate<string>? filter = null);

    IReadOnlyCollection<string> GetActiveCells();

}

-------------------------------------

📄 10. Core/Interfaces/IInternalBus.cs

csharp

namespace FractalCell02.Core.Interfaces;


public interface IInternalBus

{

    string BusId { get; }

    Task PublishAsync<TEvent>(TEvent @event) where TEvent : IApplicationEvent;

    IDisposable Subscribe<TEvent>(Func<TEvent, Task> handler) where TEvent : IApplicationEvent;

    Task StartAsync(CancellationToken ct);

    Task StopAsync();

}

---------------------------------------

📄 11. Core/Templates/ExternalBusTemplate.cs

csharp

using FractalCell02.Core.Configuration;

using FractalCell02.Core.Interfaces;

using Microsoft.Extensions.Logging;


namespace FractalCell02.Core.Templates;


public abstract class ExternalBusTemplate : IExternalBus

{

    protected readonly BusSettings Config;

    protected readonly ILogger? _logger;

    public string BusId { get; }


    protected ExternalBusTemplate(string busId, BusSettings config, ILogger? logger = null)

    {

        BusId = busId;

        Config = config;

        _logger = logger;

    }


    public abstract Task ConnectToHubAsync(IFractalEventHub hub, string cellId);

    public abstract Task SendToCellAsync(string targetCellId, IApplicationEvent @event);

    public abstract Task BroadcastAsync(IApplicationEvent @event, Predicate<string>? filter = null);

    public abstract IAsyncEnumerable<IApplicationEvent> ReadAllAsync(CancellationToken ct);


    public virtual void Dispose()

    {

        GC.SuppressFinalize(this);

    }

}

------------------------------------------------

📄 12. Core/Templates/FractalCellTemplate.cs

csharp

using FractalCell02.Core.Configuration;

using FractalCell02.Core.Interfaces;

using Microsoft.Extensions.Hosting;

using Microsoft.Extensions.Logging;


namespace FractalCell02.Core.Templates;


public abstract class FractalCellTemplate<TInternalBus, TExternalBus>

    : BackgroundService, IFractalCell

    where TInternalBus : IInternalBus

    where TExternalBus : IExternalBus

{

    protected readonly TInternalBus InternalBusField;

    protected readonly TExternalBus ExternalBusField;

    protected readonly FractalCellConfiguration Configuration;

    protected readonly ILogger Logger;

    protected readonly List<BackgroundService> _backgroundServices = new();


    public string CellId => Configuration.CellId;

    public IInternalBus InternalBus => InternalBusField;

    public IExternalBus ExternalBus => ExternalBusField;


    protected FractalCellTemplate(

        FractalCellConfiguration configuration,

        TInternalBus internalBus,

        TExternalBus externalBus,

        ILogger logger)

    {

        Configuration = configuration;

        InternalBusField = internalBus;

        ExternalBusField = externalBus;

        Logger = logger;

    }


    protected abstract IEnumerable<BackgroundService> CreateBackgroundServices();

    protected abstract void ConfigureHandlers();


    public async Task InitializeAsync()

    {

        ConfigureHandlers();

        await Task.CompletedTask;

    }


    public override async Task StartAsync(CancellationToken cancellationToken)

    {

        Logger.LogInformation("FractalCell {CellId} starting", Configuration.CellId);


        foreach (var bs in CreateBackgroundServices())

        {

            _backgroundServices.Add(bs);

            await bs.StartAsync(cancellationToken);

        }


        await InternalBusField.StartAsync(cancellationToken);


        // НЕ вызываем base.StartAsync! Запускаем ExecuteAsync вручную

        _ = ExecuteAsync(cancellationToken);


        Logger.LogInformation("FractalCell {CellId} started successfully", Configuration.CellId);

    }


    protected override async Task ExecuteAsync(CancellationToken stoppingToken)

    {

        Logger.LogInformation("FractalCell {CellId} execute loop started", Configuration.CellId);


        try

        {

            await foreach (var @event in ExternalBusField.ReadAllAsync(stoppingToken))

            {

                try

                {

                    await InternalBusField.PublishAsync(@event);

                }

                catch (Exception ex)

                {

                    Logger.LogError(ex, "Error processing event in cell {CellId}",

                        Configuration.CellId);

                }

            }

        }

        catch (OperationCanceledException)

        {

            Logger.LogInformation("FractalCell {CellId} execute loop canceled", Configuration.CellId);

        }

        catch (Exception ex)

        {

            Logger.LogError(ex, "FractalCell {CellId} execute loop error", Configuration.CellId);

        }

        

        Logger.LogInformation("FractalCell {CellId} execute loop finished", Configuration.CellId);

    }


    public override async Task StopAsync(CancellationToken cancellationToken)

    {

        Logger.LogInformation("FractalCell {CellId} stopping", Configuration.CellId);


        await InternalBusField.StopAsync();

        

        foreach (var bs in _backgroundServices)

        {

            await bs.StopAsync(cancellationToken);

        }


        Logger.LogInformation("FractalCell {CellId} stopped", Configuration.CellId);

    }

}

------------------------------------

📄 13. Core/Templates/InternalBusTemplate.cs

csharp

using FractalCell02.Core.Configuration;

using FractalCell02.Core.Interfaces;


namespace FractalCell02.Core.Templates;


public abstract class InternalBusTemplate : IInternalBus

{

    protected readonly BusSettings Config;

    public string BusId { get; }


    protected InternalBusTemplate(string busId, BusSettings config)

    {

        BusId = busId;

        Config = config;

    }


    public abstract Task PublishAsync<TEvent>(TEvent @event) where TEvent : IApplicationEvent;

    public abstract IDisposable Subscribe<TEvent>(Func<TEvent, Task> handler) where TEvent : IApplicationEvent;

    public abstract Task StartAsync(CancellationToken ct);

    public abstract Task StopAsync();

}

------------------------------------------------------------

📄 14. Core/FractalCellFactory.cs

csharp

using FractalCell02.Core.Configuration;

using FractalCell02.Core.Interfaces;

using FractalCell02.Core.Templates;

using FractalCell02.Implementations.Channels;

using FractalCell02.Implementations.TplDataflow;

using Microsoft.Extensions.Logging;


namespace FractalCell02.Core;


public static class FractalCellFactory

{

    public static async Task<IFractalCell> CreateAsync(

        FractalCellConfiguration config,

        IFractalEventHub hub,

        ILoggerFactory loggerFactory,

        CancellationToken ct = default)

    {

        IInternalBus internalBus = CreateInternalBus(config);

        IExternalBus externalBus = CreateExternalBus(config, loggerFactory);


        await externalBus.ConnectToHubAsync(hub, config.CellId);


        IFractalCell cell = CreateCellInstance(config, internalBus, externalBus, loggerFactory);


        await cell.InitializeAsync();


        return cell;

    }


    private static IInternalBus CreateInternalBus(FractalCellConfiguration config)

    {

        return config.InternalBusType switch

        {

            BusType.Channels => new ChannelInternalBus(

                $"{config.CellId}-internal",

                config.BusSettings),

            BusType.TplDataflow => new TplInternalBus(

                $"{config.CellId}-internal",

                config.BusSettings),

            _ => throw new NotSupportedException(

                $"Bus type {config.InternalBusType} not supported")

        };

    }


    private static IExternalBus CreateExternalBus(FractalCellConfiguration config, ILoggerFactory loggerFactory)

    {

        var logger = loggerFactory.CreateLogger($"ExternalBus-{config.CellId}");


        return config.ExternalBusType switch

        {

            BusType.Channels => new ChannelExternalBus(

                $"{config.CellId}-external",

                config.BusSettings,

                logger),

            BusType.TplDataflow => new TplExternalBus(

                $"{config.CellId}-external",

                config.BusSettings,

                logger),

            _ => throw new NotSupportedException(

                $"Bus type {config.ExternalBusType} not supported")

        };

    }


    private static IFractalCell CreateCellInstance(

        FractalCellConfiguration config,

        IInternalBus internalBus,

        IExternalBus externalBus,

        ILoggerFactory loggerFactory)

    {

        return (config.InternalBusType, config.ExternalBusType) switch

        {

            (BusType.Channels, BusType.Channels) =>

                new ChannelFractalCell(config,

                    (ChannelInternalBus)internalBus,

                    (ChannelExternalBus)externalBus,

                    loggerFactory.CreateLogger<ChannelFractalCell>()),


            (BusType.TplDataflow, BusType.TplDataflow) =>

                new TplFractalCell(config,

                    (TplInternalBus)internalBus,

                    (TplExternalBus)externalBus,

                    loggerFactory.CreateLogger<TplFractalCell>()),


            _ => throw new NotSupportedException(

                $"Combination {config.InternalBusType}/{config.ExternalBusType} not supported")

        };

    }

}

-----------------------------------------------------------------------------------------

📄 15. Implementations/Channels/ChannelExternalBus.cs

csharp

using System.Threading.Channels;

using FractalCell02.Core.Configuration;

using FractalCell02.Core.Interfaces;

using FractalCell02.Core.Templates;

using Microsoft.Extensions.Logging;


namespace FractalCell02.Implementations.Channels;


public class ChannelExternalBus : ExternalBusTemplate

{

    private readonly Channel<IApplicationEvent> _incomingChannel;

    private IFractalEventHub? _hub;

    private string? _cellId;


    public ChannelExternalBus(string busId, BusSettings config, ILogger? logger = null)

        : base(busId, config, logger)

    {

        _incomingChannel = Channel.CreateBounded<IApplicationEvent>(

            new BoundedChannelOptions(config.Capacity)

            {

                FullMode = BoundedChannelFullMode.Wait,

                SingleReader = false,

                SingleWriter = false

            });

        

        _logger?.LogInformation("ChannelExternalBus {BusId} created with capacity {Capacity}", 

            busId, config.Capacity);

    }


    public override async Task ConnectToHubAsync(IFractalEventHub hub, string cellId)

    {

        _hub = hub;

        _cellId = cellId;

        await hub.RegisterChannelAsync(cellId, _incomingChannel);

        _logger?.LogInformation("ChannelExternalBus {BusId} connected to hub for cell {CellId}", 

            BusId, cellId);

    }


    public override async Task SendToCellAsync(string targetCellId, IApplicationEvent @event)

    {

        if (_hub == null)

            throw new InvalidOperationException("Hub not connected");

        

        _logger?.LogInformation("ChannelExternalBus {BusId} sending event {EventId} to {TargetCell}", 

            BusId, @event.EventId, targetCellId);

            

        await _hub.PublishAsync(targetCellId, @event);

    }


    public override async Task BroadcastAsync(IApplicationEvent @event, Predicate<string>? filter = null)

    {

        if (_hub == null)

            throw new InvalidOperationException("Hub not connected");

        

        _logger?.LogInformation("ChannelExternalBus {BusId} broadcasting event {EventId}", 

            BusId, @event.EventId);

            

        await _hub.PublishToAllAsync(@event, filter);

    }


    public override async IAsyncEnumerable<IApplicationEvent> ReadAllAsync(CancellationToken ct)

    {

        _logger?.LogDebug("ChannelExternalBus {BusId} starting to read events", BusId);


        var reader = _incomingChannel.Reader;

        var buffer = Channel.CreateUnbounded<IApplicationEvent>();

        var writer = buffer.Writer;


        _ = Task.Run(async () =>

        {

            try

            {

                await foreach (var @event in reader.ReadAllAsync(ct))

                {

                    await writer.WriteAsync(@event, ct);

                }

            }

            catch (OperationCanceledException)

            {

                _logger?.LogInformation("ChannelExternalBus {BusId} read loop canceled", BusId);

            }

            catch (Exception ex)

            {

                _logger?.LogError(ex, "ChannelExternalBus {BusId} read loop error", BusId);

            }

            finally

            {

                writer.Complete();

            }

        }, ct);


        await foreach (var @event in buffer.Reader.ReadAllAsync(ct))

        {

            yield return @event;

        }

        

        _logger?.LogDebug("ChannelExternalBus {BusId} finished reading events", BusId);

    }


    public override void Dispose()

    {

        _logger?.LogInformation("ChannelExternalBus {BusId} disposing", BusId);

        _incomingChannel.Writer.Complete();

        base.Dispose();

    }

}

----------------------------------------------

📄 16. Implementations/Channels/ChannelFractalCell.cs

csharp

using FractalCell02.Core.Configuration;

using FractalCell02.Core.Templates;

using Microsoft.Extensions.Hosting;

using Microsoft.Extensions.Logging;


namespace FractalCell02.Implementations.Channels;


public class ChannelBackgroundService : BackgroundService

{

    private readonly string _serviceId;

    private readonly ChannelInternalBus _internalBus;

    private readonly ILogger _logger;

    private readonly FractalCellConfiguration _config;

    private IDisposable? _subscription;


    public ChannelBackgroundService(

        string serviceId,

        ChannelInternalBus internalBus,

        ILogger logger,

        FractalCellConfiguration config)

    {

        _serviceId = serviceId;

        _internalBus = internalBus;

        _logger = logger;

        _config = config;

    }


    protected override async Task ExecuteAsync(CancellationToken stoppingToken)

    {

        _logger.LogInformation("ChannelBackgroundService {ServiceId} started", _serviceId);


        _subscription = _internalBus.Subscribe<FractalEvent>(async @event =>

        {

            _logger.LogInformation(

                "Service {ServiceId} processing event {EventId} of type {EventType}",

                _serviceId, @event.EventId, @event.EventType);


            await ProcessEventAsync(@event);

        });


        try

        {

            while (!stoppingToken.IsCancellationRequested)

            {

                await Task.Delay(1000, stoppingToken);

            }

        }

        catch (OperationCanceledException)

        {

            _logger.LogInformation("ChannelBackgroundService {ServiceId} stopping", _serviceId);

        }

    }


    private Task ProcessEventAsync(FractalEvent @event)

    {

        switch (@event.EventType)

        {

            case "ProcessData":

                _logger.LogInformation(

                    "Channel worker {ServiceId} processing data: {Payload}",

                    _serviceId, @event.Payload);

                break;


            case "Heartbeat":

                _logger.LogDebug(

                    "Channel worker {ServiceId} heartbeat from {Source}",

                    _serviceId, @event.SourceCellId);

                break;


            default:

                _logger.LogWarning("Unknown event type: {EventType}", @event.EventType);

                break;

        }


        return Task.CompletedTask;

    }


    public override void Dispose()

    {

        _subscription?.Dispose();

        base.Dispose();

    }

}


public class ChannelFractalCell : FractalCellTemplate<ChannelInternalBus, ChannelExternalBus>

{

    private readonly List<ChannelBackgroundService> _services = new();


    public ChannelFractalCell(

        FractalCellConfiguration configuration,

        ChannelInternalBus internalBus,

        ChannelExternalBus externalBus,

        ILogger<ChannelFractalCell> logger)

        : base(configuration, internalBus, externalBus, logger)

    {

    }


    protected override IEnumerable<BackgroundService> CreateBackgroundServices()

    {

        for (int i = 0; i < Configuration.BackgroundServiceCount; i++)

        {

            var service = new ChannelBackgroundService(

                $"{Configuration.CellId}-worker-{i}",

                InternalBusField,

                Logger,

                Configuration);


            _services.Add(service);

            yield return service;

        }

    }


    protected override void ConfigureHandlers()

    {

        InternalBusField.Subscribe<FractalEvent>(async fractalEvent =>

        {

            Logger.LogInformation("Channel cell {CellId} received event: {EventType} from {Source}",

                Configuration.CellId, fractalEvent.EventType, fractalEvent.SourceCellId);


            if (!string.IsNullOrEmpty(fractalEvent.TargetCellId) &&

                fractalEvent.TargetCellId != Configuration.CellId)

            {

                Logger.LogInformation("Forwarding event {EventId} to {TargetCell}",

                    fractalEvent.EventId, fractalEvent.TargetCellId);


                await ExternalBusField.SendToCellAsync(

                    fractalEvent.TargetCellId, fractalEvent);

            }

        });

    }

}

----------------------------------------------------------------------

📄 17. Implementations/Channels/ChannelInternalBus.cs

csharp

using System.Collections.Concurrent;

using System.Threading.Channels;

using FractalCell02.Core.Common;

using FractalCell02.Core.Configuration;

using FractalCell02.Core.Interfaces;

using FractalCell02.Core.Templates;


namespace FractalCell02.Implementations.Channels;


public class ChannelInternalBus : InternalBusTemplate

{

    private readonly Channel<IApplicationEvent> _channel;

    private readonly ConcurrentDictionary<Type, List<Func<IApplicationEvent, Task>>> _handlers = new();


    public ChannelInternalBus(string busId, BusSettings config)

        : base(busId, config)

    {

        _channel = Channel.CreateBounded<IApplicationEvent>(

            new BoundedChannelOptions(config.Capacity)

            {

                FullMode = BoundedChannelFullMode.Wait,

                SingleReader = false,

                SingleWriter = false

            });

    }


    public override async Task PublishAsync<TEvent>(TEvent @event)

    {

        try

        {

            await _channel.Writer.WriteAsync(@event);

        }

        catch (ChannelClosedException)

        {

            // Канал закрыт, игнорируем

        }

    }


    public override IDisposable Subscribe<TEvent>(Func<TEvent, Task> handler)

    {

        var eventType = typeof(TEvent);

        

        var handlers = _handlers.GetOrAdd(eventType, _ => new List<Func<IApplicationEvent, Task>>());

        

        var wrappedHandler = new Func<IApplicationEvent, Task>(e => handler((TEvent)e));

        

        lock (handlers)

        {

            handlers.Add(wrappedHandler);

        }


        return new Unsubscriber(() =>

        {

            lock (handlers)

            {

                handlers.Remove(wrappedHandler);

                if (handlers.Count == 0)

                {

                    _handlers.TryRemove(eventType, out _);

                }

            }

        });

    }


    public override async Task StartAsync(CancellationToken ct)

    {

        _ = Task.Run(async () =>

        {

            try

            {

                await foreach (var @event in _channel.Reader.ReadAllAsync(ct))

                {

                    var eventType = @event.GetType();

                    

                    if (_handlers.TryGetValue(eventType, out var handlers))

                    {

                        var handlersCopy = handlers.ToList();

                        

                        if (handlersCopy.Count == 1)

                        {

                            await handlersCopy[0](@event);

                        }

                        else if (handlersCopy.Count > 1)

                        {

                            await Task.WhenAll(handlersCopy.Select(h => h(@event)));

                        }

                    }

                }

            }

            catch (OperationCanceledException)

            {

                // Нормальное завершение

            }

            catch (Exception ex)

            {

                // Логируем ошибку

            }

        }, ct);

        

        await Task.CompletedTask;

    }


    public override Task StopAsync()

    {

        _channel.Writer.Complete();

        return Task.CompletedTask;

    }

}

------------------------------------

📄 18. Implementations/TplDataflow/TplExternalBus.cs

csharp

using System.Runtime.CompilerServices;

using System.Threading.Channels;

using System.Threading.Tasks.Dataflow;

using FractalCell02.Core.Configuration;

using FractalCell02.Core.Interfaces;

using FractalCell02.Core.Templates;

using Microsoft.Extensions.Logging;


namespace FractalCell02.Implementations.TplDataflow;


public class TplExternalBus : ExternalBusTemplate

{

    private readonly BufferBlock<IApplicationEvent> _bufferBlock;

    private readonly Channel<IApplicationEvent> _incomingChannel;

    private IFractalEventHub? _hub;

    private string? _cellId;

    private CancellationTokenSource? _cts;


    public TplExternalBus(string busId, BusSettings config, ILogger? logger = null)

        : base(busId, config, logger)

    {

        _bufferBlock = new BufferBlock<IApplicationEvent>(

            new DataflowBlockOptions 

            { 

                BoundedCapacity = config.Capacity,

                EnsureOrdered = true

            });

            

        _incomingChannel = Channel.CreateBounded<IApplicationEvent>(

            new BoundedChannelOptions(config.Capacity)

            {

                FullMode = BoundedChannelFullMode.Wait

            });

    }


    public override async Task ConnectToHubAsync(IFractalEventHub hub, string cellId)

    {

        _hub = hub;

        _cellId = cellId;

        _cts = new CancellationTokenSource();


        await hub.RegisterConsumerAsync(cellId, async @event =>

        {

            await _bufferBlock.SendAsync(@event, _cts.Token);

        });


        _ = Task.Run(async () =>

        {

            try

            {

                await foreach (var @event in _incomingChannel.Reader.ReadAllAsync(_cts.Token))

                {

                    await _bufferBlock.SendAsync(@event, _cts.Token);

                }

            }

            catch (OperationCanceledException)

            {

                // Нормальное завершение

            }

            catch (Exception ex)

            {

                _logger?.LogError(ex, "Error in TplExternalBus forwarding task");

            }

        }, _cts.Token);

    }


    public override async Task SendToCellAsync(string targetCellId, IApplicationEvent @event)

    {

        if (_hub == null)

            throw new InvalidOperationException("Hub not connected");

            

        await _hub.PublishAsync(targetCellId, @event);

    }


    public override async Task BroadcastAsync(IApplicationEvent @event, Predicate<string>? filter = null)

    {

        if (_hub == null)

            throw new InvalidOperationException("Hub not connected");

            

        await _hub.PublishToAllAsync(@event, filter);

    }


    public override async IAsyncEnumerable<IApplicationEvent> ReadAllAsync(

        [EnumeratorCancellation] CancellationToken ct)

    {

        _logger?.LogDebug("TplExternalBus {BusId} starting to read events", BusId);

        

        var buffer = Channel.CreateUnbounded<IApplicationEvent>();

        var writer = buffer.Writer;


        _ = Task.Run(async () =>

        {

            try

            {

                while (!ct.IsCancellationRequested)

                {

                    var @event = await _bufferBlock.ReceiveAsync(ct);

                    await writer.WriteAsync(@event, ct);

                }

            }

            catch (OperationCanceledException)

            {

                _logger?.LogInformation("TplExternalBus {BusId} read loop canceled", BusId);

            }

            catch (Exception ex)

            {

                _logger?.LogError(ex, "TplExternalBus {BusId} read loop error", BusId);

            }

            finally

            {

                writer.Complete();

            }

        }, ct);


        await foreach (var @event in buffer.Reader.ReadAllAsync(ct))

        {

            yield return @event;

        }

    }


    public override void Dispose()

    {

        _cts?.Cancel();

        _cts?.Dispose();

        base.Dispose();

    }

}

--------------------------------------------

📄 19. Implementations/TplDataflow/TplFractalCell.cs

csharp

using FractalCell02.Core.Configuration;

using FractalCell02.Core.Interfaces;

using FractalCell02.Core.Templates;

using Microsoft.Extensions.Hosting;

using Microsoft.Extensions.Logging;

using System.Threading.Tasks.Dataflow;


namespace FractalCell02.Implementations.TplDataflow;


public class TplBackgroundService : BackgroundService

{

    private readonly ActionBlock<IApplicationEvent> _workerBlock;

    private readonly string _serviceId;

    private readonly ILogger _logger;


    public TplBackgroundService(

        ActionBlock<IApplicationEvent> workerBlock,

        string serviceId,

        ILogger logger)

    {

        _workerBlock = workerBlock;

        _serviceId = serviceId;

        _logger = logger;

    }


    protected override async Task ExecuteAsync(CancellationToken stoppingToken)

    {

        _logger.LogInformation("TplBackgroundService {ServiceId} started", _serviceId);


        try

        {

            await _workerBlock.Completion.WaitAsync(stoppingToken);

        }

        catch (OperationCanceledException)

        {

            // Нормальное завершение

        }

        catch (Exception ex)

        {

            _logger.LogError(ex, "Error in TplBackgroundService {ServiceId}", _serviceId);

        }

    }


    public override void Dispose()

    {

        _workerBlock.Complete();

        base.Dispose();

    }

}


public class TplFractalCell : FractalCellTemplate<TplInternalBus, TplExternalBus>

{

    private readonly BufferBlock<IApplicationEvent> _inputBuffer;

    private readonly TransformBlock<IApplicationEvent, IApplicationEvent> _mainProcessingBlock;

    private readonly BroadcastBlock<IApplicationEvent> _broadcastBlock;

    private readonly List<ActionBlock<IApplicationEvent>> _workerBlocks = new();

    private readonly List<IDisposable> _links = new();


    public TplFractalCell(

        FractalCellConfiguration configuration,

        TplInternalBus internalBus,

        TplExternalBus externalBus,

        ILogger<TplFractalCell> logger)

        : base(configuration, internalBus, externalBus, logger)

    {

        var options = new ExecutionDataflowBlockOptions

        {

            BoundedCapacity = configuration.BusSettings.Capacity,

            MaxDegreeOfParallelism = configuration.BusSettings.MaxParallelism,

            NameFormat = $"{configuration.CellId}-block-{{0}}"

        };


        _inputBuffer = new BufferBlock<IApplicationEvent>(options);


        _mainProcessingBlock = new TransformBlock<IApplicationEvent, IApplicationEvent>(

            async @event => await ProcessInMainBlockAsync(@event),

            options);


        _broadcastBlock = new BroadcastBlock<IApplicationEvent>(

            e => e,

            new DataflowBlockOptions

            {

                BoundedCapacity = configuration.BusSettings.Capacity

            });


        _links.Add(_inputBuffer.LinkTo(_mainProcessingBlock,

            new DataflowLinkOptions { PropagateCompletion = true }));


        _links.Add(_mainProcessingBlock.LinkTo(_broadcastBlock,

            new DataflowLinkOptions { PropagateCompletion = true }));

    }


    private async Task<IApplicationEvent> ProcessInMainBlockAsync(IApplicationEvent @event)

    {

        Logger.LogInformation("Main block processing event {EventId}", @event.EventId);


        try

        {

            await InternalBusField.PublishAsync(@event);


            if (@event is FractalEvent fractalEvent &&

                !string.IsNullOrEmpty(fractalEvent.TargetCellId))

            {

                await ExternalBusField.SendToCellAsync(

                    fractalEvent.TargetCellId, fractalEvent);

            }

        }

        catch (Exception ex)

        {

            Logger.LogError(ex, "Error in main block processing event {EventId}",

                @event.EventId);

        }


        return @event;

    }


    protected override IEnumerable<BackgroundService> CreateBackgroundServices()

    {

        for (int i = 0; i < Configuration.BackgroundServiceCount; i++)

        {

            var workerBlock = new ActionBlock<IApplicationEvent>(

                async @event => await ProcessInWorkerAsync(@event, i),

                new ExecutionDataflowBlockOptions

                {

                    BoundedCapacity = Configuration.BusSettings.Capacity,

                    MaxDegreeOfParallelism = 1,

                    NameFormat = $"{Configuration.CellId}-worker-{i}"

                });


            _links.Add(_broadcastBlock.LinkTo(workerBlock,

                new DataflowLinkOptions { PropagateCompletion = true }));


            _workerBlocks.Add(workerBlock);


            yield return new TplBackgroundService(workerBlock,

                $"{Configuration.CellId}-worker-{i}", Logger);

        }

    }


    protected override void ConfigureHandlers()

    {

        InternalBusField.Subscribe<FractalEvent>(async fractalEvent =>

        {

            await _inputBuffer.SendAsync(fractalEvent);

        });

    }


    private async Task ProcessInWorkerAsync(IApplicationEvent @event, int workerId)

    {

        Logger.LogInformation("Worker {WorkerId} processing event {EventId}",

            workerId, @event.EventId);


        if (@event is FractalEvent fractalEvent)

        {

            try

            {

                switch (fractalEvent.EventType)

                {

                    case "ProcessData":

                        await HandleProcessData(fractalEvent, workerId);

                        break;

                    case "Heartbeat":

                        await HandleHeartbeat(fractalEvent, workerId);

                        break;

                    default:

                        Logger.LogWarning("Unknown event type: {EventType}",

                            fractalEvent.EventType);

                        break;

                }

            }

            catch (Exception ex)

            {

                Logger.LogError(ex, "Worker {WorkerId} error processing event {EventId}",

                    workerId, @event.EventId);

            }

        }

    }


    private Task HandleProcessData(FractalEvent @event, int workerId)

    {

        Logger.LogInformation(

            "Worker {WorkerId} processing data from {Source}: {Payload}",

            workerId, @event.SourceCellId, @event.Payload);


        return Task.Delay(100);

    }


    private Task HandleHeartbeat(FractalEvent @event, int workerId)

    {

        Logger.LogDebug(

            "Worker {WorkerId} heartbeat from {Source}",

            workerId, @event.SourceCellId);


        return Task.CompletedTask;

    }


    public override async Task StopAsync(CancellationToken cancellationToken)

    {

        Logger.LogInformation("Stopping TPL blocks...");


        _inputBuffer.Complete();


        try

        {

            await Task.WhenAll(

                _inputBuffer.Completion,

                _mainProcessingBlock.Completion,

                _broadcastBlock.Completion,

                Task.WhenAll(_workerBlocks.Select(b => b.Completion))

            ).WaitAsync(cancellationToken);

        }

        catch (OperationCanceledException)

        {

            Logger.LogWarning("Stop operation canceled");

        }

        catch (Exception ex)

        {

            Logger.LogError(ex, "Error while stopping TPL blocks");

        }


        foreach (var link in _links)

        {

            link.Dispose();

        }

        _links.Clear();


        await base.StopAsync(cancellationToken);

    }

}

---------------------------------------

📄 20. Implementations/TplDataflow/TplInternalBus.cs

csharp

using System.Collections.Concurrent;

using System.Threading.Tasks.Dataflow;

using FractalCell02.Core.Common;

using FractalCell02.Core.Configuration;

using FractalCell02.Core.Interfaces;

using FractalCell02.Core.Templates;


namespace FractalCell02.Implementations.TplDataflow;


public class TplInternalBus : InternalBusTemplate

{

    private readonly ActionBlock<IApplicationEvent> _actionBlock;

    private readonly ConcurrentDictionary<Type, List<Func<IApplicationEvent, Task>>> _handlers = new();


    public TplInternalBus(string busId, BusSettings config)

        : base(busId, config)

    {

        _actionBlock = new ActionBlock<IApplicationEvent>(

            async @event => await ProcessEventAsync(@event),

            new ExecutionDataflowBlockOptions

            {

                BoundedCapacity = config.Capacity,

                MaxDegreeOfParallelism = config.MaxParallelism

            });

    }


    private async Task ProcessEventAsync(IApplicationEvent @event)

    {

        if (_handlers.TryGetValue(@event.GetType(), out var handlers))

        {

            var handlersCopy = handlers.ToList();

            

            if (handlersCopy.Count == 1)

            {

                await handlersCopy[0](@event);

            }

            else if (handlersCopy.Count > 1)

            {

                await Task.WhenAll(handlersCopy.Select(h => h(@event)));

            }

        }

    }


    public override async Task PublishAsync<TEvent>(TEvent @event)

    {

        await _actionBlock.SendAsync(@event);

    }


    public override IDisposable Subscribe<TEvent>(Func<TEvent, Task> handler)

    {

        var eventType = typeof(TEvent);

        var handlers = _handlers.GetOrAdd(eventType, _ => new List<Func<IApplicationEvent, Task>>());

        

        var wrappedHandler = new Func<IApplicationEvent, Task>(e => handler((TEvent)e));

        

        lock (handlers)

        {

            handlers.Add(wrappedHandler);

        }


        return new Unsubscriber(() =>

        {

            lock (handlers)

            {

                handlers.Remove(wrappedHandler);

                if (handlers.Count == 0)

                {

                    _handlers.TryRemove(eventType, out _);

                }

            }

        });

    }


    public override Task StartAsync(CancellationToken ct)

    {

        return Task.CompletedTask;

    }


    public override Task StopAsync()

    {

        _actionBlock.Complete();

        return _actionBlock.Completion;

    }

}

------------------------------------------------------

📄 21. Implementations/FractalEventHub.cs

csharp

using System.Collections.Concurrent;

using System.Threading.Channels;

using FractalCell02.Core.Configuration;

using FractalCell02.Core.Interfaces;

using Microsoft.Extensions.Logging;


namespace FractalCell02.Implementations;


public class InMemoryFractalEventHub : IFractalEventHub

{

    private readonly ConcurrentDictionary<string, CellConnection> _connections = new();

    private readonly ILogger<InMemoryFractalEventHub> _logger;

    private readonly HubSettings _settings;

    private readonly SemaphoreSlim _semaphore = new(1, 1);


    private record CellConnection(

        Channel<IApplicationEvent>? Channel,

        Func<IApplicationEvent, Task>? Consumer);


    public InMemoryFractalEventHub(

        ILogger<InMemoryFractalEventHub> logger,

        HubSettings? settings = null)

    {

        _logger = logger;

        _settings = settings ?? new HubSettings();

        _logger.LogInformation("InMemoryFractalEventHub created");

    }


    public Task RegisterChannelAsync(string cellId, Channel<IApplicationEvent> incomingChannel)

    {

        _logger.LogInformation("Registering channel for cell {CellId}", cellId);

        return RegisterInternalAsync(cellId, new CellConnection(incomingChannel, null));

    }


    public Task RegisterConsumerAsync(string cellId, Func<IApplicationEvent, Task> consumer)

    {

        _logger.LogInformation("Registering consumer for cell {CellId}", cellId);

        return RegisterInternalAsync(cellId, new CellConnection(null, consumer));

    }


    private async Task RegisterInternalAsync(string cellId, CellConnection connection)

    {

        await _semaphore.WaitAsync();

        try

        {

            if (_connections.TryAdd(cellId, connection))

            {

                _logger.LogInformation("Cell {CellId} registered successfully. Total cells: {Count}", 

                    cellId, _connections.Count);

            }

            else

            {

                _logger.LogWarning("Cell {CellId} already registered, updating", cellId);

                _connections[cellId] = connection;

            }

        }

        finally

        {

            _semaphore.Release();

        }

    }


    public Task UnregisterCellAsync(string cellId)

    {

        if (_connections.TryRemove(cellId, out _))

        {

            _logger.LogInformation("Cell {CellId} unregistered", cellId);

        }

        return Task.CompletedTask;

    }


    public async Task PublishAsync(string targetCellId, IApplicationEvent @event)

    {

        _logger.LogInformation("Publishing event {EventId} to target cell {TargetCell}. Active cells: {Count}", 

            @event.EventId, targetCellId, _connections.Count);


        if (_connections.TryGetValue(targetCellId, out var connection))

        {

            try

            {

                if (connection.Channel != null)

                {

                    _logger.LogDebug("Writing event {EventId} to channel of {TargetCell}", 

                        @event.EventId, targetCellId);

                    await connection.Channel.Writer.WriteAsync(@event);

                    _logger.LogInformation("Event {EventId} successfully sent to {TargetCell}", 

                        @event.EventId, targetCellId);

                }

                else if (connection.Consumer != null)

                {

                    _logger.LogDebug("Invoking consumer for event {EventId} in {TargetCell}", 

                        @event.EventId, targetCellId);

                    await connection.Consumer(@event);

                    _logger.LogInformation("Event {EventId} successfully consumed by {TargetCell}", 

                        @event.EventId, targetCellId);

                }

                else

                {

                    _logger.LogWarning("Cell {TargetCell} has no active connection", targetCellId);

                }

            }

            catch (Exception ex)

            {

                _logger.LogError(ex, "Failed to send event {EventId} to {TargetCell}", 

                    @event.EventId, targetCellId);

                throw;

            }

        }

        else

        {

            _logger.LogWarning("Target cell {TargetCell} not found. Available cells: {Cells}", 

                targetCellId, string.Join(", ", _connections.Keys));

            throw new KeyNotFoundException($"Cell {targetCellId} not found");

        }

    }


    public async Task PublishToAllAsync(IApplicationEvent @event, Predicate<string>? filter = null)

    {

        _logger.LogInformation("Broadcasting event {EventId} to all cells", @event.EventId);

        

        var tasks = _connections

            .Where(kvp => filter == null || filter(kvp.Key))

            .Select(async kvp =>

            {

                try

                {

                    if (kvp.Value.Channel != null)

                    {

                        await kvp.Value.Channel.Writer.WriteAsync(@event);

                        _logger.LogDebug("Event {EventId} broadcasted to {Cell}", 

                            @event.EventId, kvp.Key);

                    }

                    else if (kvp.Value.Consumer != null)

                    {

                        await kvp.Value.Consumer(@event);

                        _logger.LogDebug("Event {EventId} consumed by {Cell}", 

                            @event.EventId, kvp.Key);

                    }

                }

                catch (Exception ex)

                {

                    _logger.LogError(ex, "Failed to broadcast to {Cell}", kvp.Key);

                }

            });


        await Task.WhenAll(tasks);

        _logger.LogInformation("Event {EventId} broadcasted to {Count} cells", 

            @event.EventId, tasks.Count());

    }


    public IReadOnlyCollection<string> GetActiveCells()

    {

        var cells = _connections.Keys.ToList().AsReadOnly();

        _logger.LogDebug("Getting active cells: {Count} cells", cells.Count);

        return cells;

    }

}

-----------------------------------------------------

📄 22. Model/FractalEvent.cs

csharp

using FractalCell02.Core.Interfaces;


namespace FractalCell02;


public record FractalEvent(

    string EventId,

    DateTime Timestamp,

    string SourceCellId,

    string TargetCellId,

    string EventType,

    object Payload

) : IApplicationEvent;


Комментариев нет:

Отправить комментарий