DelayedQueueキュー

数年前、私たちのプロジェクトの1つで、アクションの実行を一定期間延期する必要に直面しました。たとえば、3時間以内に支払いのステータスを確認したり、45分後に通知を再送信したりできます。ただし、その時点では、「延期」でき、構成と操作に追加の時間を必要としない適切なライブラリは見つかりませんでした。可能なオプションを分析し、Redisをリポジトリとして使用して、Javaで独自の少し遅延したキューライブラリを作成しました。この記事では、ライブラリの機能、その代替手段、およびその過程で遭遇した「レーキ」について説明します。



機能性



では、遅延キューは何をするのでしょうか?保留中のキューに追加されたイベントは、指定された時間間隔でハンドラーに配信されます。処理が失敗した場合、イベントは後で再度配信されます。さらに、最大試行回数は制限されています。Redisは安全性を保証するものではなく、イベントの損失に備える必要がありますただし、クラスターバージョンでは、Redisはかなり高い信頼性を示しており、1年半の運用でこれに遭遇したことはありません。



API



キューにイベントを追加します



eventService.enqueueWithDelayNonBlocking(new DummyEvent("1"), Duration.ofHours(1)).subscribe();


メソッドはを返すMonoため、実行するには次のいずれかを実行する必要があることに注意してください



  • subscribe(...)
  • block()


より詳細な説明は、のドキュメントに記載されていますProject Reactorコンテキストは次のようにイベントに追加されます。



eventService.enqueueWithDelayNonBlocking(new DummyEvent("1"), Duration.ofHours(1), Map.of("key", "value")).subscribe();


イベントハンドラーの登録



eventService.addHandler(DummyEvent.class, e -> Mono.just(true), 1);


, :



eventService.addHandler(
        DummyEvent.class,
        e -> Mono
            .subscriberContext()
            .doOnNext(ctx -> {
                Map<String, String> eventContext = ctx.get("eventContext");
                log.info("context key {}", eventContext.get("key"));
            })
            .thenReturn(true),
        1
);




eventService.removeHandler(DummyEvent.class);




"-":



import static com.github.fred84.queue.DelayedEventService.delayedEventService;

var eventService = delayedEventService().client(redisClient).build();


:



import static com.github.fred84.queue.DelayedEventService.delayedEventService;

var eventService = delayedEventService()
        .client(redisClient)
        .mapper(objectMapper)
        .handlerScheduler(Schedulers.fromExecutorService(executor))
        .schedulingInterval(Duration.ofSeconds(1))
        .schedulingBatchSize(SCHEDULING_BATCH_SIZE)
        .enableScheduling(false)
        .pollingTimeout(POLLING_TIMEOUT)
        .eventContextHandler(new DefaultEventContextHandler())
        .dataSetPrefix("")
        .retryAttempts(10)
        .metrics(new NoopMetrics())
        .refreshSubscriptionsInterval(Duration.ofMinutes(5))
        .build();


( Redis) eventService.close() , @javax.annotation.PreDestroy.





- , . :



  • , Redis;
  • , ( "delayed.queue.ready.for.handling.count" )




, delayed queue. 2018

Amazon Web Services.

, . : " , Amazon-, ".





:





- , JMS . SQS , 15 .





" " . , Redis :





, Netflix dyno-queues

. , , .



, " " sorted set list, ( ):



var events = redis.zrangebyscore("delayed_events", Range.create(-1, System.currentTimeMillis()), 100);
events.forEach(key -> {
  var payload = extractPayload(key);
  var listName = extractType(key);
  redis.lpush(listName, payload);
  redis.zrem("delayed_events", key);
});


Spring Integration, :



redis.brpop(listName)


.





"list" (, ), list . Redis , 2 .



events.forEach(key -> {
  ...
  redis.multi();
  redis.zrem("delayed_events", key);
  redis.lpush(listName, payload);
  redis.exec();
});




list-a . , . "sorted_set" .



events.forEach(key -> {
  ...
  redis.multi();
  redis.zadd("delayed_events", nextAttempt(key))
  redis.zrem("delayed_events", key);
  redis.lpush(listName, payload);
  redis.exec();
});




, , " " "delayed queue" . "sorted set"

metadata;payload, payload , metadata - . . , metadata payload Redis hset "sorted set" .



var envelope = metadata + SEPARATOR + payload;
redis.zadd(envelope, scheduledAt);




var envelope = metadata + SEPARATOR + payload;
var key = eventType + SEPARATOR + eventId;

redis.multi();
redis.zadd(key, scheduledAt);
redis.hset("metadata", key, envelope)
redis.exec();




, . , list . TTL :



redis.set(lockKey, "value", ex(lockTimeout.toMillis() * 1000).nx());




Spring, . " " :





Lettuce , . Project Reactor , " ".

, Subscriber



redis
  .reactive()
  .brpop(timeout, queue)
  .map(e -> deserialize(e))
  .subscribe(new InnerSubscriber<>(handler, ... params ..))




class InnerSubscriber<T extends Event> extends BaseSubscriber<EventEnvelope<T>> {

    @Override
    protected void hookOnNext(@NotNull EventEnvelope<T> envelope) {
        Mono<Boolean> promise = handler.apply(envelope.getPayload());
        promise.subscribe(r -> request(1));
    }
}


, ( Netflix dyno queue, poll- ).



?



  • Kotlin DSL. Kotlin suspend fun API Project Reactor


リンク






All Articles