Cでの協調マルチタスクの実装#







.Netでのマルチタスクに関しては、ほとんどの場合、オペレーティングシステムスレッドに基づくプリエンプティブマルチタスクを想定しています。ただし、この記事では、協調マルチタスクの実装に焦点を当てます。これにより、1つのスレッドだけを使用して複数のメソッドの同時操作の外観を作成できます。







これが私たちの簡単なテンプレートです:







static void Main()
{
    DoWork("A", 4);
    DoWork("B", 3);
    DoWork("C", 2);
    DoWork("D", 1);
}

static void DoWork(string name, int num)
{
    for (int i = 1; i <= num; i++)
    {
        Console.WriteLine($"Work {name}: {i}");
    }
    Console.WriteLine($"Work {name} is completed");
}
      
      





4 A, B, C, D . , , , , , .







, , , : “ , - , , , , , ”.







: " C# - ?" ! await! await :







static async ValueTask DoWork(string name, int num)
{
    for (int i = 1; i <= num; i++)
    {
        Console.WriteLine($"Work {name}: {i}");
        await /*Something*/
    }
    Console.WriteLine($"Work {name} is completed");
}
      
      





await , , . await-? Task.Delay(), Task.Yield() , . , await, . CooperativeBroker:







private class CooperativeBroker : ICooperativeBroker
{
    private Action? _continuation;

    public void GetResult() 
        => this._continuation = null;

    public bool IsCompleted 
        => false;//Preventing sync completion in async method state machine

    public void OnCompleted(Action continuation)
    {
        this._continuation = continuation;
        this.InvokeContinuation();
    }

    public ICooperativeBroker GetAwaiter() 
        => this;

    public void InvokeContinuation() 
        => this._continuation?.Invoke();
}
      
      





C# , await . continuation OnCompleted. , continuation , , continuation , , . . CooperativeContext:







private class CooperativeBroker
{
    private readonly CooperativeContext _cooperativeContext;

    private Action? _continuation;

    public CooperativeBroker(CooperativeContext cooperativeContext)
        => this._cooperativeContext = cooperativeContext;

    ...

    public void OnCompleted(Action continuation)
    {
        this._continuation = continuation;
        this._cooperativeContext.OnCompleted(this);
    }

}

public class CooperativeContext
{
    private readonly List<CooperativeBroker> _brokers = 
        new List<CooperativeBroker>();

    void OnCompleted(CooperativeBroker broker)
    {
        ...
    }
}
      
      





OnCompleted :







private void OnCompleted(CooperativeBroker broker)
{
    //       .
    if (this._targetBrokersCount == this._brokers.Count)
    {
        var nextIndex = this._brokers.IndexOf(broker) + 1;
        if (nextIndex == this._brokers.Count)
        {
            nextIndex = 0;
        }

        this._brokers[nextIndex].InvokeContinuation();
    }
}
      
      





– , (_targetBrokersCount — ). , , "" .







, :







static void Main()
{
    CooperativeContext.Run(
        b => DoWork(b, "A", 4),
        b => DoWork(b, "B", 3),
        b => DoWork(b, "C", 2),
        b => DoWork(b, "D", 1)
    );
}

static async ValueTask DoWork(CooperativeBroker broker, string name, int num, bool extraWork = false)
{
    for (int i = 1; i <= num; i++)
    {
        Console.WriteLine($"Work {name}: {i}, Thread: {Thread.CurrentThread.ManagedThreadId}");
        await broker;
    }

    Console.WriteLine($"Work {name} is completed, Thread: {Thread.CurrentThread.ManagedThreadId}");
}

public class CooperativeContext
{
    public static void Run(params Func<CooperativeBroker, ValueTask>[] tasks)
    {
        CooperativeContext context = new CooperativeContext(tasks.Length);
        foreach (var task in tasks)
        {
            task(context.CreateBroker());
        }

        ...
    }

    ...

    private int _targetBrokersCount;

    private CooperativeContext(int maxCooperation)
    {
        this._threadId = Thread.CurrentThread.ManagedThreadId;
        this._targetBrokersCount = maxCooperation;
    }

    ...
}
      
      





, , – , OnCompleted . "", :







public class CooperativeContext
{
    public static void Run(params Func<ICooperativeBroker, ValueTask>[] tasks)
    {
        CooperativeContext context = new CooperativeContext(tasks.Length);
        foreach (var task in tasks)
        {
            task(context.CreateBroker());
        }

        //        ,  
        //   
        while (context._brokers.Count > 0)
        {
            context.ReleaseFirstFinishedBrokerAndInvokeNext();
        }
    }

    ...
    private void ReleaseFirstFinishedBrokerAndInvokeNext()
    {
        // IsNoAction     
        var completedBroker = this._brokers.Find(i => i.IsNoAction)!;

        var index = this._brokers.IndexOf(completedBroker);
        this._brokers.RemoveAt(index);
        this._targetBrokersCount--;

        if (index == this._brokers.Count)
        {
            index = 0;
        }

        if (this._brokers.Count > 0)
        {
            this._brokers[index].InvokeContinuation();
        }
    }    
}

private class CooperativeBroker : ICooperativeBroker
{
    ...
    public bool IsNoAction
        => this._continuation == null;
    ...
}
      
      





( ):







static void Main()
{
    CooperativeContext.Run(
        b => DoWork(b, "A", 4),
        b => DoWork(b, "B", 3, extraWork: true),
        b => DoWork(b, "C", 2),
        b => DoWork(b, "D", 1)
    );
}

static async ValueTask DoWork(
    ICooperativeBroker broker, 
    string name, 
    int num, 
    bool extraWork = false)
{
    for (int i = 1; i <= num; i++)
    {
        Console.WriteLine(
               $"Work {name}: {i}, Thread: {Thread.CurrentThread.ManagedThreadId}");
        await broker;
        if (extraWork)
        {
            Console.WriteLine(
                   $"Work {name}: {i} (Extra), Thread: {Thread.CurrentThread.ManagedThreadId}");
            await broker;
        }
    }

    Console.WriteLine(
           $"Work {name} is completed, Thread: {Thread.CurrentThread.ManagedThreadId}");
}
      
      





:







Work A: 1, Thread: 1
Work B: 1, Thread: 1
Work C: 1, Thread: 1
Work D: 1, Thread: 1
Work A: 2, Thread: 1
Work B: 1 (Extra), Thread: 1
Work C: 2, Thread: 1
Work D is completed, Thread: 1
Work A: 3, Thread: 1
Work B: 2, Thread: 1
Work C is completed, Thread: 1
Work A: 4, Thread: 1
Work B: 2 (Extra), Thread: 1
Work A is completed, Thread: 1
Work B: 3, Thread: 1
Work B: 3 (Extra), Thread: 1
Work B is completed, Thread: 1
      
      





, , , , .










, , , , , C# .







github.







[Update] DistortNeo、彼はこの問題に対するより実用的な解決策を提供します。








All Articles