この記事では、キューを使用して無限タスクオーケストレーターを実装する方法について説明します。最終的な目標として、長寿命のタスクを管理できるシステム、タスクのグループが特定のサーバーでホストされる分散システムを実装する必要があります。このサーバーに障害が発生すると、タスクは自動的に無料のサーバーに再配布されます。
ほとんどの場合、すべてのエンタープライズ開発は同じ要件を満たすことになります。アプリケーションは、アプリケーションのタイプに応じて作成され、ある種のライフサイクルがあり、アプリケーションのライフサイクルの終わりに、必要なものを受け取ります(または受け取りません)。アプリケーションとは、製品のオンライン購入、金銭の注文、弾道ミサイルの軌道の計算など、あらゆるものを意味します。各アプリケーションには独自の生き方があり、注意すべき重要なことは 寿命です。今回は短いほど良いです。言い換えれば、私のワイヤー転送が完了するのが早いほど良いです。要件も同様で、 1秒あたりのRPC操作が多く 、待ち時間が短く 、システムは障害耐性があり、スケーラブルで、昨日準備ができている必要があります ..。数百万のツール、数百のデータベース、さまざまなアプローチとパターンがあります。そして、すべてが長い間書かれてきました、私たちはプロジェクトで既製の技術を正しく使用する必要があります。
タスクオーケストレーションのトピックは新しいものではありませんが、驚いたことに、アクティブなサーバー間でタスクを再配布する可能性のある、無限のタスク(寿命は無制限)を管理するための既製のソリューションはありません。したがって、独自のソリューションを実装します。しかし、まず最初に…。
, . — (Job), , . . , “”, . : , . , , , . “”- WebSocket , connected. , , , , . , “” Observer , , .
, , . :
, , .
, , .
, , . , , , , .
/, , ( , RAM ..), .
: N , . , , .
3 . #, . , C# .Net.
Task. . Task “”.
Schedulers. , . , , .
, , . , . RabbitMq, Framework - MassTransit, . .
Task
Task. , ( , ).
. , “Hello Word” :
public async Task SendEmailAsync(Email email, CancellationToken token)
{
//
}
, , await SendEmailAsync.
foreach (var email in emails
{
if(token.IsCancellationRequested)
break;
_emailSender.SendEmailAsync(email, token); // await
}
:
.
FireAndForget Exception .
.
, , .
await- , async/await .
email, , CancellationToken. , , , , . RetryPolicy , ?! , .
Schedulers
.NET , .
. , ( , — , , instance ) /Tasks. Hangfire, - UI, , . .
, Hangfire. BackgroundJob.Enqueue(Expression<Action> methodCall).
var jobIds = new List<string>();
foreach (var email in emails)
{
if(token.IsCancellationRequested)
break;
jobIds.Add(BackgroundJob.Enqueue(
async () => await _emailSender.SendEmailAsync(email, token)));
}
, , . RetryPolicy , . , , .
, . , “” :
_observer.DoWork(observerArg, new CancellationToken())
- , . BackgroundJobClient.
var client = new BackgroundJobClient(JobStorage.Current);
// , .
var state = new EnqueuedState(“unique-queue-name”);
client.Create(() =>_observer.DoWork(observerArg, new CancellationToken()), state);
, . - unique-queue-name.
// instance hangfire .
_server = new BackgroundJobServer(new BackgroundJobServerOptions()
{
WorkerCount = 10,
Queues = new[] { “unique-queue-name” },
ServerName = _serverOptions.ServerName
});
WorkerCount - , . , .
, , . : . , . Hangfire , , .
_monitoringApi = JobStorage.Current.GetMonitoringApi();
:
Observer-service - , , ( HangFilre WorkerCount ).
Observer-manager - , ... . , , . .
Scheduler common db – - , Hangfire MsSql, PostgreSql Redis.
— . “”.
, , , , , , .
, , . , . Hangfire. :
1) . , , .
2) . . , , , .
3) . custom-id, . - .
4) , “default” . , , . job-filters . , .
5) , , , . , , , framework .
, . , , , , .
, ,
, , , . . ? , — , , . , ? , . , , . , .
? “”. - PrefetchCount .
Ready.
Conumer , Unacked. Consumer .
, _Error .
acknowledged, Consumer.
- PrefetchCount , ( ), WorkerCount, Hangfire.
:
Observer-services, . PrefetchCount 1
. , . , , Unacked.
"”, :
Observer-services , , Round-robin.
msg1 . , “Observer 1”. Unacked , .
msg2 . “Observer 1” , , “Observer 2”.
, “Observer-service 1” , ( - “ ... ?”).
, , acknowledgement Unacked Ready. . , , .
- , . _Error, RetryPolicy. , .
RetryPolicy :
1000 .
5 1,4,10... .
int.MaxValue .
? “”, /. PrefetchCount, 10, 10 , . - , , . , 10 , 5 “”, , , 11- , .
? ? , , ... ?! , , "" , CancellationToken.
Manager. . , , . , . , , :
Id () - Guid .
Name (), , , .
CreatedAt/ModifyAt ( / ).
WorkersCount, PrefetchCount - , .
Manager .
Id |
Name |
WorkerCount |
CreatedAt |
ModifyAt |
IsDeleted |
{Unique id} |
Observer service 1 |
10 |
{some date} |
null |
false |
{Unique id} |
Observer service 2 |
10 |
{some date} |
null |
false |
{Unique id} |
Observer service 3 |
10 |
{some date} |
null |
false |
. , , 3 - .
, , , N . IsDeleted=true.
, (Kill –9, ). , Docker. , , . “”, , , . , - ….
“” API. ( , “State queue” ). “” , , , , - .
, , “”. , , , , .
, , “” Created.
Id |
Name |
CreatedAt |
ModifyAt |
ServiceId |
Status |
{Observer id} |
My_new_observer |
{created date} |
null |
null |
Created |
, , , Processing .
Id |
Name |
CreatedAt |
ModifyAt |
ServiceId |
Status |
{Observer id} |
My_new_observer |
{created date} |
{modify date} |
{Observer service 1 id} |
Processing |
“” .
:
Created
Processing
OnDeleting
Deleted
"", :
1) , CancellationToken.
2) , FanOut. , “” , .
, — , ... “ ”.
Observer-service , . , “” CancellationToken. “” .
“” . , id . , .
Created, “” . - , “”.
OnDeleting Deleted, - “” , .
Id |
Name |
CreatedAt |
ModifyAt |
ServiceId |
Status |
{Observer id} |
My_new_observer |
{created date} |
{modify date} |
{Observer service 1 id} |
Deleted |
:
1) .
, . , - MsSql, RabbitMq, Kafka, Kubernetes , , SLA . , . - , .
2) blackout, .
, - , , , , , “”, . “”, . ( , .)
3) .
, "”, . "”, , .
4) . , "”.
. - , , .
5) “”, , .
, , “” . . . , , , , , .
, . , , - Unacked, - Ready. , , polling , . - "”, . , , , PrefetchCount. , , .