こんにちは、Habr!Scala、Idris、その他のFPからしばらく離れて、イベントストア(イベントをイベントストリームに保存できるデータベース)について少し話すことにしました。古き良き本のように、実際にはマスケット銃兵も4人いて、4人目はDDDです。まず、イベントストーミングを使用して、コマンド、イベント、およびそれらに関連付けられているエンティティを選択します。次に、それらに基づいて、オブジェクトの状態を保存して復元します。この記事では、通常のTodoListを実行します。詳しくは猫の下でようこそ。
コンテンツ
- 3人の銃士-イベントソーシング、イベントストーミング、イベントストア-バトルに参加:パート1-DBイベントストアを試す
リンク
ソース
画像dockerimageイベント
ストア
イベントソーシング
イベントストーミング
実際、イベントストアはイベントを保存するように設計されたデータベースです。彼女はまた、イベントのサブスクリプションを作成して、イベントを何らかの方法で処理できるようにする方法も知っています。イベントに反応し、それに基づいていくつかのデータを蓄積する予測もあります。たとえば、TodoCreatedイベント中に、プロジェクションである種のカウントカウンターを増やすことができます。今のところ、このパートでは、イベントストアを読み取りと書き込みの両方のDbとして使用します。さらに次の記事では、イベントストアに書き込むためにデータベースに保存されたイベントに基づいてデータが書き込まれる読み取り用の個別のデータベースを作成します。また、システムを過去の状態にロールバックして「タイムトラベル」を行う方法の例もあります。
それでは、イベントストロミングを始めましょう。通常、その実装では、ソフトウェアがシミュレートするサブジェクト領域のイベントを伝えるすべての関心のある人々と専門家が集まります。たとえば、プラントのソフトウェアの場合-製品製造。ゲームの場合-受けたダメージ。金融ソフトウェアの場合-アカウントにクレジットされるお金など。サブジェクトエリアはTodoListと同じくらい単純なので、イベントはほとんどありません。それでは、サブジェクトエリア(ドメイン)のイベントをボードに書き込みましょう。
次に、これらのイベントをトリガーするコマンドを追加しましょう。
次に、これらのイベントとコマンドを、関連付けられている状態を変更してエンティティの周囲にグループ化します。
私のコマンドは単にサービスメソッド名に変わります。実装に取り掛かりましょう。
まず、イベントをコードで説明しましょう。
public interface IDomainEvent
{
// . id Event Strore
Guid EventId { get; }
// . Event Store
long EventNumber { get; set; }
}
public sealed class TodoCreated : IDomainEvent
{
//Id Todo
public Guid Id { get; set; }
// Todo
public string Name { get; set; }
public Guid EventId => Id;
public long EventNumber { get; set; }
}
public sealed class TodoRemoved : IDomainEvent
{
public Guid EventId { get; set; }
public long EventNumber { get; set; }
}
public sealed class TodoCompleted: IDomainEvent
{
public Guid EventId { get; set; }
public long EventNumber { get; set; }
}
今、私たちのコアはエンティティです:
public sealed class Todo : IEntity<TodoId>
{
private readonly List<IDomainEvent> _events;
public static Todo CreateFrom(string name)
{
var id = Guid.NewGuid();
var e = new List<IDomainEvent>(){new TodoCreated()
{
Id = id,
Name = name
}};
return new Todo(new TodoId(id), e, name, false);
}
public static Todo CreateFrom(IEnumerable<IDomainEvent> events)
{
var id = Guid.Empty;
var name = String.Empty;
var completed = false;
var ordered = events.OrderBy(e => e.EventNumber).ToList();
if (ordered.Count == 0)
return null;
foreach (var @event in ordered)
{
switch (@event)
{
case TodoRemoved _:
return null;
case TodoCreated created:
name = created.Name;
id = created.Id;
break;
case TodoCompleted _:
completed = true;
break;
default: break;
}
}
if (id == default)
return null;
return new Todo(new TodoId(id), new List<IDomainEvent>(), name, completed);
}
private Todo(TodoId id, List<IDomainEvent> events, string name, bool isCompleted)
{
Id = id;
_events = events;
Name = name;
IsCompleted = isCompleted;
Validate();
}
public TodoId Id { get; }
public IReadOnlyList<IDomainEvent> Events => _events;
public string Name { get; }
public bool IsCompleted { get; private set; }
public void Complete()
{
if (!IsCompleted)
{
IsCompleted = true;
_events.Add(new TodoCompleted()
{
EventId = Guid.NewGuid()
});
}
}
public void Delete()
{
_events.Add(new TodoRemoved()
{
EventId = Guid.NewGuid()
});
}
private void Validate()
{
if (Events == null)
throw new ApplicationException(" ");
if (string.IsNullOrWhiteSpace(Name))
throw new ApplicationException(" ");
if (Id == default)
throw new ApplicationException(" ");
}
}
イベントストアに接続します。
services.AddSingleton(sp =>
{
// TCP .
// . .
var con = EventStoreConnection.Create(new Uri("tcp://admin:changeit@127.0.0.1:1113"), "TodosConnection");
con.ConnectAsync().Wait();
return con;
});
そして、主要部分。イベントストア自体からのイベントの保存と読み取り:
public sealed class EventsRepository : IEventsRepository
{
private readonly IEventStoreConnection _connection;
public EventsRepository(IEventStoreConnection connection)
{
_connection = connection;
}
public async Task<long> Add(Guid collectionId, IEnumerable<IDomainEvent> events)
{
var eventPayload = events.Select(e => new EventData(
//Id
e.EventId,
//
e.GetType().Name,
// Json (True|False)
true,
//
Encoding.UTF8.GetBytes(JsonSerializer.Serialize((object)e)),
//
Encoding.UTF8.GetBytes((string)e.GetType().FullName)
));
//
var res = await _connection.AppendToStreamAsync(collectionId.ToString(), ExpectedVersion.Any, eventPayload);
return res.NextExpectedVersion;
}
public async Task<List<IDomainEvent>> Get(Guid collectionId)
{
var results = new List<IDomainEvent>();
long start = 0L;
while (true)
{
var events = await _connection.ReadStreamEventsForwardAsync(collectionId.ToString(), start, 4096, false);
if (events.Status != SliceReadStatus.Success)
return results;
results.AddRange(Deserialize(events.Events));
if (events.IsEndOfStream)
return results;
start = events.NextEventNumber;
}
}
public async Task<List<T>> GetAll<T>() where T : IDomainEvent
{
var results = new List<IDomainEvent>();
Position start = Position.Start;
while (true)
{
var events = await _connection.ReadAllEventsForwardAsync(start, 4096, false);
results.AddRange(Deserialize(events.Events.Where(e => e.Event.EventType == typeof(T).Name)));
if (events.IsEndOfStream)
return results.OfType<T>().ToList();
start = events.NextPosition;
}
}
private List<IDomainEvent> Deserialize(IEnumerable<ResolvedEvent> events) =>
events
.Where(e => IsEvent(e.Event.EventType))
.Select(e =>
{
var result = (IDomainEvent)JsonSerializer.Deserialize(e.Event.Data, ToType(e.Event.EventType));
result.EventNumber = e.Event.EventNumber;
return result;
})
.ToList();
private static bool IsEvent(string eventName)
{
return eventName switch
{
nameof(TodoCreated) => true,
nameof(TodoCompleted) => true,
nameof(TodoRemoved) => true,
_ => false
};
}
private static Type ToType(string eventName)
{
return eventName switch
{
nameof(TodoCreated) => typeof(TodoCreated),
nameof(TodoCompleted) => typeof(TodoCompleted),
nameof(TodoRemoved) => typeof(TodoRemoved),
_ => throw new NotImplementedException(eventName)
};
}
}
エンティティストアは非常にシンプルに見えます。EventStoreからエンティティイベントを取得して復元するか、エンティティイベントを保存するだけです。
public sealed class TodoRepository : ITodoRepository
{
private readonly IEventsRepository _eventsRepository;
public TodoRepository(IEventsRepository eventsRepository)
{
_eventsRepository = eventsRepository;
}
public Task SaveAsync(Todo entity) => _eventsRepository.Add(entity.Id.Value, entity.Events);
public async Task<Todo> GetAsync(TodoId id)
{
var events = await _eventsRepository.Get(id.Value);
return Todo.CreateFrom(events);
}
public async Task<List<Todo>> GetAllAsync()
{
var events = await _eventsRepository.GetAll<TodoCreated>();
var res = await Task.WhenAll(events.Where(t => t != null).Where(e => e.Id != default).Select(e => GetAsync(new TodoId(e.Id))));
return res.Where(t => t != null).ToList();
}
}
リポジトリおよびエンティティとの作業が行われるサービス:
public sealed class TodoService : ITodoService
{
private readonly ITodoRepository _repository;
public TodoService(ITodoRepository repository)
{
_repository = repository;
}
public async Task<TodoId> Create(TodoCreateDto dto)
{
var todo = Todo.CreateFrom(dto.Name);
await _repository.SaveAsync(todo);
return todo.Id;
}
public async Task Complete(TodoId id)
{
var todo = await _repository.GetAsync(id);
todo.Complete();
await _repository.SaveAsync(todo);
}
public async Task Remove(TodoId id)
{
var todo = await _repository.GetAsync(id);
todo.Delete();
await _repository.SaveAsync(todo);
}
public async Task<List<TodoReadDto>> GetAll()
{
var todos = await _repository.GetAllAsync();
return todos.Select(t => new TodoReadDto()
{
Id = t.Id.Value,
Name = t.Name,
IsComplete = t.IsCompleted
}).ToList();
}
public async Task<List<TodoReadDto>> Get(IEnumerable<TodoId> ids)
{
var todos = await Task.WhenAll(ids.Select(i => _repository.GetAsync(i)));
return todos.Where(t => t != null).Select(t => new TodoReadDto()
{
Id = t.Id.Value,
Name = t.Name,
IsComplete = t.IsCompleted
}).ToList();
}
}
まあ、実際には、これまでのところ印象的なものは何もありません。次の記事では、読み取り用に別のデータベースを追加すると、すべてが異なる色で輝きます。これにより、時間の経過とともに一貫性がすぐに失われます。マスター上のイベントストアとSQLDB-スレーブの原則。データを読み取る1つの白いESと多くの黒いMSSQL。
叙情的な逸脱。最近の出来事に照らして、私はマスタースレーブとブラックホワイトについて冗談を言うのを避けられませんでした。ええ、時代は去っていきます。複製中の基地がマスターとスレーブと呼ばれていた時代に私たちが住んでいたことを孫たちに伝えます。
読み取りが多く、データの書き込みが少ないシステム(ほとんど)では、これにより作業速度が向上します。実際には、マスタースレーブ自体の複製は、(インデックスの場合と同様に)書き込みが遅くなるという事実を目的としていますが、その見返りとして、複数のデータベースに負荷を分散することで読み取りが高速化されます。