Asyncio + DependencyInjectorの監視デーモン-依存関係インジェクションガイド

こんにちは、



私はDependencyInjectorの作成者ですこれは、Pythonの依存関係インジェクションフレームワークです。



これは、DependencyInjectorを使用してアプリケーションを構築するための別のチュートリアルです。



今日は、モジュールに基づいて非同期デーモンを構築する方法を示したいと思いますasyncio



マニュアルは次の部分で構成されています。



  1. 何を構築しますか?
  2. ツールチェック
  3. プロジェクト構造
  4. 環境の準備
  5. ロギングと構成
  6. 発車係
  7. example.comの監視
  8. httpbin.orgの監視
  9. テスト
  10. 結論


完成したプロジェクトはGithubにあります。



まず、次のことが望ましいです。



  • の初期知識 asyncio
  • 依存関係の注入の原理を理解する


何を構築しますか?



Webサービスへのアクセスを監視する監視デーモンを構築します。



デーモンは、数秒ごとexample.comhttpbin.orgにリクエストを送信します応答を受信すると、次のデータがログに書き込まれます。



  • 応答コード
  • 応答のバイト数
  • リクエストの完了にかかった時間






ツールチェック



Dockerdocker-compose を使用ます。それらがインストールされていることを確認しましょう:



docker --version
docker-compose --version


出力は次のようになります。



Docker version 19.03.12, build 48a66213fe
docker-compose version 1.26.2, build eefe0d31


Dockerまたはdocker-composeがインストールされていない場合は、続行する前にインストールする必要があります。次のガイドに従ってください。





ツールの準備ができました。プロジェクトの構造に移りましょう。



プロジェクト構造



プロジェクトフォルダを作成し、それに移動します。



mkdir monitoring-daemon-tutorial
cd monitoring-daemon-tutorial


次に、初期プロジェクト構造を作成する必要があります。以下の構造に従ってファイルとフォルダを作成します。今のところ、すべてのファイルは空になります。後で記入します。



初期プロジェクト構造:



./
├── monitoringdaemon/
│   ├── __init__.py
│   ├── __main__.py
│   └── containers.py
├── config.yml
├── docker-compose.yml
├── Dockerfile
└── requirements.txt


初期のプロジェクト構造は準備ができています。次のセクションで拡張します。



次に、環境の準備を待っています。



環境の準備



このセクションでは、デーモンを起動するための環境を準備します。



まず、依存関係を定義する必要があります。次のようなパッケージを使用します。



  • dependency-injector -依存関係注入フレームワーク
  • aiohttp -Webフレームワーク(httpクライアントのみが必要です)
  • pyyaml -構成の読み取りに使用されるYAMLファイルを解析するためのライブラリ
  • pytest -テストフレームワーク
  • pytest-asyncio- asyncioアプリケーションをテストするためのヘルパーライブラリ
  • pytest-cov -テストによってコードカバレッジを測定するためのヘルパーライブラリ


次の行をファイルに追加しましょうrequirements.txt



dependency-injector
aiohttp
pyyaml
pytest
pytest-asyncio
pytest-cov


そして、ターミナルで実行します。



pip install -r requirements.txt


次に、を作成しますDockerfileデーモンを構築して起動するプロセスについて説明します。python:3.8-busterベース画像として使用します。



次の行をファイルに追加しましょうDockerfile



FROM python:3.8-buster

ENV PYTHONUNBUFFERED=1

WORKDIR /code
COPY . /code/

RUN apt-get install openssl \
 && pip install --upgrade pip \
 && pip install -r requirements.txt \
 && rm -rf ~/.cache

CMD ["python", "-m", "monitoringdaemon"]


最後のステップは、設定を定義することdocker-composeです。



次の行をファイルに追加しましょうdocker-compose.yml



version: "3.7"

services:

  monitor:
    build: ./
    image: monitoring-daemon
    volumes:
      - "./:/code"


すべての準備が整いました。イメージの作成を開始し、環境が正しく構成されていることを確認しましょう。



ターミナルで実行してみましょう:



docker-compose build


ビルドプロセスには数分かかる場合があります。最後に、次のように表示されます。



Successfully built 5b4ee5e76e35
Successfully tagged monitoring-daemon:latest


ビルドプロセスが完了したら、コンテナを起動します。



docker-compose up


次のように表示されます。



Creating network "monitoring-daemon-tutorial_default" with the default driver
Creating monitoring-daemon-tutorial_monitor_1 ... done
Attaching to monitoring-daemon-tutorial_monitor_1
monitoring-daemon-tutorial_monitor_1 exited with code 0


環境の準備ができています。コンテナはコードで開始および終了します0



次のステップは、ロギングを設定し、構成ファイルを読み取ることです。



ロギングと構成



このセクションでは、ロギングを構成し、構成ファイルを読み取ります。



アプリケーションの主要部分である依存関係コンテナー(さらにはコンテナーのみ)を追加することから始めましょう。コンテナには、アプリケーションのすべてのコンポーネントが含まれます。



最初の2つのコンポーネントを追加しましょう。これは、構成オブジェクトであり、ロギングを構成するための関数です。



編集しましょうcontainers.py



"""Application containers module."""

import logging
import sys

from dependency_injector import containers, providers


class ApplicationContainer(containers.DeclarativeContainer):
    """Application container."""

    config = providers.Configuration()

    configure_logging = providers.Callable(
        logging.basicConfig,
        stream=sys.stdout,
        level=config.log.level,
        format=config.log.format,
    )


値を設定する前に、構成パラメーターを使用しました。これは、プロバイダーが機能する原則ですConfiguration



最初にを使用し、次に値を設定します。



ロギング設定は構成ファイルに含まれます。



編集しましょうconfig.yml



log:
  level: "INFO"
  format: "[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s"


次に、デーモンを起動する関数を定義しましょう。彼女は通常呼ばれmain()ます。コンテナを作成します。コンテナは、構成ファイルを読み取り、ロギング設定関数を呼び出すために使用されます。



編集しましょう__main__.py



"""Main module."""

from .containers import ApplicationContainer


def main() -> None:
    """Run the application."""
    container = ApplicationContainer()

    container.config.from_yaml('config.yml')
    container.configure_logging()


if __name__ == '__main__':
    main()


コンテナは、アプリケーションの最初のオブジェクトです。他のすべてのオブジェクトを取得するために使用されます。


構成のロギングと読み取りが構成されます。次のセクションでは、監視タスクマネージャを作成します。



発車係



監視タスクマネージャを追加する時が来ました。



ディスパッチャには、監視タスクのリストが含まれ、それらの実行を制御します。彼はスケジュールに従って各タスクを実行します。クラスMonitor-タスクを監視するための基本クラス。特定のタスクを作成するには、子クラスを追加してメソッドを実装する必要がありますcheck()





監視タスクのディスパッチャと基本クラスを追加しましょう。



さんが作成してみましょうdispatcher.pymonitors.py、パッケージ内monitoringdaemon



./
├── monitoringdaemon/
│   ├── __init__.py
│   ├── __main__.py
│   ├── containers.py
│   ├── dispatcher.py
│   └── monitors.py
├── config.yml
├── docker-compose.yml
├── Dockerfile
└── requirements.txt


次の行をファイルに追加しましょうmonitors.py



"""Monitors module."""

import logging


class Monitor:

    def __init__(self, check_every: int) -> None:
        self.check_every = check_every
        self.logger = logging.getLogger(self.__class__.__name__)

    async def check(self) -> None:
        raise NotImplementedError()


そしてファイルにdispatcher.py



""""Dispatcher module."""

import asyncio
import logging
import signal
import time
from typing import List

from .monitors import Monitor


class Dispatcher:

    def __init__(self, monitors: List[Monitor]) -> None:
        self._monitors = monitors
        self._monitor_tasks: List[asyncio.Task] = []
        self._logger = logging.getLogger(self.__class__.__name__)
        self._stopping = False

    def run(self) -> None:
        asyncio.run(self.start())

    async def start(self) -> None:
        self._logger.info('Starting up')

        for monitor in self._monitors:
            self._monitor_tasks.append(
                asyncio.create_task(self._run_monitor(monitor)),
            )

        asyncio.get_event_loop().add_signal_handler(signal.SIGTERM, self.stop)
        asyncio.get_event_loop().add_signal_handler(signal.SIGINT, self.stop)

        await asyncio.gather(*self._monitor_tasks, return_exceptions=True)

        self.stop()

    def stop(self) -> None:
        if self._stopping:
            return

        self._stopping = True

        self._logger.info('Shutting down')
        for task, monitor in zip(self._monitor_tasks, self._monitors):
            task.cancel()
        self._logger.info('Shutdown finished successfully')

    @staticmethod
    async def _run_monitor(monitor: Monitor) -> None:
        def _until_next(last: float) -> float:
            time_took = time.time() - last
            return monitor.check_every - time_took

        while True:
            time_start = time.time()

            try:
                await monitor.check()
            except asyncio.CancelledError:
                break
            except Exception:
                monitor.logger.exception('Error executing monitor check')

            await asyncio.sleep(_until_next(last=time_start))


ディスパッチャをコンテナに追加する必要があります。



編集しましょうcontainers.py



"""Application containers module."""

import logging
import sys

from dependency_injector import containers, providers

from . import dispatcher


class ApplicationContainer(containers.DeclarativeContainer):
    """Application container."""

    config = providers.Configuration()

    configure_logging = providers.Callable(
        logging.basicConfig,
        stream=sys.stdout,
        level=config.log.level,
        format=config.log.format,
    )

    dispatcher = providers.Factory(
        dispatcher.Dispatcher,
        monitors=providers.List(
            # TODO: add monitors
        ),
    )


各コンポーネントがコンテナに追加されます。


最後に、関数を更新する必要がありますmain()コンテナからディスパッチャを取得し、そのメソッドを呼び出しますrun()



編集しましょう__main__.py



"""Main module."""

from .containers import ApplicationContainer


def main() -> None:
    """Run the application."""
    container = ApplicationContainer()

    container.config.from_yaml('config.yml')
    container.configure_logging()

    dispatcher = container.dispatcher()
    dispatcher.run()


if __name__ == '__main__':
    main()


それでは、デーモンを起動して、その動作を確認しましょう。



ターミナルで実行してみましょう:



docker-compose up


出力は次のようになります。



Starting monitoring-daemon-tutorial_monitor_1 ... done
Attaching to monitoring-daemon-tutorial_monitor_1
monitor_1  | [2020-08-08 16:12:35,772] [INFO] [Dispatcher]: Starting up
monitor_1  | [2020-08-08 16:12:35,774] [INFO] [Dispatcher]: Shutting down
monitor_1  | [2020-08-08 16:12:35,774] [INFO] [Dispatcher]: Shutdown finished successfully
monitoring-daemon-tutorial_monitor_1 exited with code 0


すべてが正しく機能します。監視タスクがないため、ディスパッチャは開始および停止します。



このセクションの終わりまでに、私たちの悪魔の骨格は準備ができています。次のセクションでは、最初の監視タスクを追加します。



example.comの監視



このセクションでは、http://example.comへのアクセスを監視する監視タスクを追加します



まず、新しいタイプの監視タスクでクラスモデルを拡張することから始めますHttpMonitor



HttpMonitor子クラスMonitorです。check()メソッドを実装します。HTTP要求を送信し、受信した応答をログに記録します。HTTPリクエストの詳細はクラスに委任されHttpClientます。





最初に追加しましょうHttpClientパッケージに



ファイルhttp.py作成しましょうmonitoringdaemon



./
├── monitoringdaemon/
│   ├── __init__.py
│   ├── __main__.py
│   ├── containers.py
│   ├── dispatcher.py
│   ├── http.py
│   └── monitors.py
├── config.yml
├── docker-compose.yml
├── Dockerfile
└── requirements.txt


そして、それに次の行を追加します。



"""Http client module."""

from aiohttp import ClientSession, ClientTimeout, ClientResponse


class HttpClient:

    async def request(self, method: str, url: str, timeout: int) -> ClientResponse:
        async with ClientSession(timeout=ClientTimeout(timeout)) as session:
            async with session.request(method, url) as response:
                return response


次に、HttpClientコンテナに追加する必要があります



編集しましょうcontainers.py



"""Application containers module."""

import logging
import sys

from dependency_injector import containers, providers

from . import http, dispatcher


class ApplicationContainer(containers.DeclarativeContainer):
    """Application container."""

    config = providers.Configuration()

    configure_logging = providers.Callable(
        logging.basicConfig,
        stream=sys.stdout,
        level=config.log.level,
        format=config.log.format,
    )

    http_client = providers.Factory(http.HttpClient)

    dispatcher = providers.Factory(
        dispatcher.Dispatcher,
        monitors=providers.List(
            # TODO: add monitors
        ),
    )


これで、を追加する準備ができましたHttpMonitorモジュールに追加しましょうmonitors



編集しましょうmonitors.py



"""Monitors module."""

import logging
import time
from typing import Dict, Any

from .http import HttpClient


class Monitor:

    def __init__(self, check_every: int) -> None:
        self.check_every = check_every
        self.logger = logging.getLogger(self.__class__.__name__)

    async def check(self) -> None:
        raise NotImplementedError()


class HttpMonitor(Monitor):

    def __init__(
            self,
            http_client: HttpClient,
            options: Dict[str, Any],
    ) -> None:
        self._client = http_client
        self._method = options.pop('method')
        self._url = options.pop('url')
        self._timeout = options.pop('timeout')
        super().__init__(check_every=options.pop('check_every'))

    @property
    def full_name(self) -> str:
        return '{0}.{1}(url="{2}")'.format(__name__, self.__class__.__name__, self._url)

    async def check(self) -> None:
        time_start = time.time()

        response = await self._client.request(
            method=self._method,
            url=self._url,
            timeout=self._timeout,
        )

        time_end = time.time()
        time_took = time_end - time_start

        self.logger.info(
            'Response code: %s, content length: %s, request took: %s seconds',
            response.status,
            response.content_length,
            round(time_took, 3)
        )


http://example.com のチェックを追加する準備が整いましたコンテナに2つの変更を加える必要があります。



  • 工場を追加しexample_monitorます。
  • example_monitorディスパッチャに転送します。


編集しましょうcontainers.py



"""Application containers module."""

import logging
import sys

from dependency_injector import containers, providers

from . import http, monitors, dispatcher


class ApplicationContainer(containers.DeclarativeContainer):
    """Application container."""

    config = providers.Configuration()

    configure_logging = providers.Callable(
        logging.basicConfig,
        stream=sys.stdout,
        level=config.log.level,
        format=config.log.format,
    )

    http_client = providers.Factory(http.HttpClient)

    example_monitor = providers.Factory(
        monitors.HttpMonitor,
        http_client=http_client,
        options=config.monitors.example,
    )

    dispatcher = providers.Factory(
        dispatcher.Dispatcher,
        monitors=providers.List(
            example_monitor,
        ),
    )


プロバイダーexample_monitorは構成値に依存しています。これらの値を追加しましょう:



編集config.yml



log:
  level: "INFO"
  format: "[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s"

monitors:

  example:
    method: "GET"
    url: "http://example.com"
    timeout: 5
    check_every: 5


すべての準備が整いました。デーモンを起動し、動作を確認します。



ターミナルで実行します:



docker-compose up


そして、同様の結論が見られます。



Starting monitoring-daemon-tutorial_monitor_1 ... done
Attaching to monitoring-daemon-tutorial_monitor_1
monitor_1  | [2020-08-08 17:06:41,965] [INFO] [Dispatcher]: Starting up
monitor_1  | [2020-08-08 17:06:42,033] [INFO] [HttpMonitor]: Check
monitor_1  |     GET http://example.com
monitor_1  |     response code: 200
monitor_1  |     content length: 648
monitor_1  |     request took: 0.067 seconds
monitor_1  |
monitor_1  | [2020-08-08 17:06:47,040] [INFO] [HttpMonitor]: Check
monitor_1  |     GET http://example.com
monitor_1  |     response code: 200
monitor_1  |     content length: 648
monitor_1  |     request took: 0.073 seconds


私たちのデーモンは、http://example.comへのアクセスの可用性を監視できます



監視https://httpbin.orgを追加しましょう



httpbin.orgの監視



このセクションでは、http://example.comへのアクセスを監視する監視タスクを追加しますすべてのコンポーネントの準備ができている



ため、https: //httpbin.orgの監視タスクを追加する方が簡単です。コンテナに新しいプロバイダーを追加して、構成を更新するだけです。



編集しましょうcontainers.py



"""Application containers module."""

import logging
import sys

from dependency_injector import containers, providers

from . import http, monitors, dispatcher


class ApplicationContainer(containers.DeclarativeContainer):
    """Application container."""

    config = providers.Configuration()

    configure_logging = providers.Callable(
        logging.basicConfig,
        stream=sys.stdout,
        level=config.log.level,
        format=config.log.format,
    )

    http_client = providers.Factory(http.HttpClient)

    example_monitor = providers.Factory(
        monitors.HttpMonitor,
        http_client=http_client,
        options=config.monitors.example,
    )

    httpbin_monitor = providers.Factory(
        monitors.HttpMonitor,
        http_client=http_client,
        options=config.monitors.httpbin,
    )

    dispatcher = providers.Factory(
        dispatcher.Dispatcher,
        monitors=providers.List(
            example_monitor,
            httpbin_monitor,
        ),
    )


編集しましょうconfig.yml



log:
  level: "INFO"
  format: "[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s"

monitors:

  example:
    method: "GET"
    url: "http://example.com"
    timeout: 5
    check_every: 5

  httpbin:
    method: "GET"
    url: "https://httpbin.org/get"
    timeout: 5
    check_every: 5


デーモンを起動してログを確認しましょう。



ターミナルで実行してみましょう:



docker-compose up


そして、同様の結論が見られます。



Starting monitoring-daemon-tutorial_monitor_1 ... done
Attaching to monitoring-daemon-tutorial_monitor_1
monitor_1  | [2020-08-08 18:09:08,540] [INFO] [Dispatcher]: Starting up
monitor_1  | [2020-08-08 18:09:08,618] [INFO] [HttpMonitor]: Check
monitor_1  |     GET http://example.com
monitor_1  |     response code: 200
monitor_1  |     content length: 648
monitor_1  |     request took: 0.077 seconds
monitor_1  |
monitor_1  | [2020-08-08 18:09:08,722] [INFO] [HttpMonitor]: Check
monitor_1  |     GET https://httpbin.org/get
monitor_1  |     response code: 200
monitor_1  |     content length: 310
monitor_1  |     request took: 0.18 seconds
monitor_1  |
monitor_1  | [2020-08-08 18:09:13,619] [INFO] [HttpMonitor]: Check
monitor_1  |     GET http://example.com
monitor_1  |     response code: 200
monitor_1  |     content length: 648
monitor_1  |     request took: 0.066 seconds
monitor_1  |
monitor_1  | [2020-08-08 18:09:13,681] [INFO] [HttpMonitor]: Check
monitor_1  |     GET https://httpbin.org/get
monitor_1  |     response code: 200
monitor_1  |     content length: 310
monitor_1  |     request took: 0.126 seconds


機能部分が完成しました。デーモンは、http://example.comおよびhttps://httpbin.orgへのアクセスの可用性を監視します



次のセクションでは、いくつかのテストを追加します。



テスト



いくつかのテストを追加するとよいでしょう。そうしよう。パッケージに



ファイルtests.py作成しますmonitoringdaemon



./
├── monitoringdaemon/
│   ├── __init__.py
│   ├── __main__.py
│   ├── containers.py
│   ├── dispatcher.py
│   ├── http.py
│   ├── monitors.py
│   └── tests.py
├── config.yml
├── docker-compose.yml
├── Dockerfile
└── requirements.txt


それに次の行を追加します。



"""Tests module."""

import asyncio
import dataclasses
from unittest import mock

import pytest

from .containers import ApplicationContainer


@dataclasses.dataclass
class RequestStub:
    status: int
    content_length: int


@pytest.fixture
def container():
    container = ApplicationContainer()
    container.config.from_dict({
        'log': {
            'level': 'INFO',
            'formant': '[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s',
        },
        'monitors': {
            'example': {
                'method': 'GET',
                'url': 'http://fake-example.com',
                'timeout': 1,
                'check_every': 1,
            },
            'httpbin': {
                'method': 'GET',
                'url': 'https://fake-httpbin.org/get',
                'timeout': 1,
                'check_every': 1,
            },
        },
    })
    return container


@pytest.mark.asyncio
async def test_example_monitor(container, caplog):
    caplog.set_level('INFO')

    http_client_mock = mock.AsyncMock()
    http_client_mock.request.return_value = RequestStub(
        status=200,
        content_length=635,
    )

    with container.http_client.override(http_client_mock):
        example_monitor = container.example_monitor()
        await example_monitor.check()

    assert 'http://fake-example.com' in caplog.text
    assert 'response code: 200' in caplog.text
    assert 'content length: 635' in caplog.text


@pytest.mark.asyncio
async def test_dispatcher(container, caplog, event_loop):
    caplog.set_level('INFO')

    example_monitor_mock = mock.AsyncMock()
    httpbin_monitor_mock = mock.AsyncMock()

    with container.example_monitor.override(example_monitor_mock), \
            container.httpbin_monitor.override(httpbin_monitor_mock):

        dispatcher = container.dispatcher()
        event_loop.create_task(dispatcher.start())
        await asyncio.sleep(0.1)
        dispatcher.stop()

    assert example_monitor_mock.check.called
    assert httpbin_monitor_mock.check.called


テストを実行するには、ターミナルで実行します。



docker-compose run --rm monitor py.test monitoringdaemon/tests.py --cov=monitoringdaemon


同様の結果が得られるはずです。



platform linux -- Python 3.8.3, pytest-6.0.1, py-1.9.0, pluggy-0.13.1
rootdir: /code
plugins: asyncio-0.14.0, cov-2.10.0
collected 2 items

monitoringdaemon/tests.py ..                                    [100%]

----------- coverage: platform linux, python 3.8.3-final-0 -----------
Name                             Stmts   Miss  Cover
----------------------------------------------------
monitoringdaemon/__init__.py         0      0   100%
monitoringdaemon/__main__.py         9      9     0%
monitoringdaemon/containers.py      11      0   100%
monitoringdaemon/dispatcher.py      43      5    88%
monitoringdaemon/http.py             6      3    50%
monitoringdaemon/monitors.py        23      1    96%
monitoringdaemon/tests.py           37      0   100%
----------------------------------------------------
TOTAL                              129     18    86%


テストtest_example_monitorHttpClient、メソッドを使用してモックを置き換える方法に注目してください.override()このようにして、任意のプロバイダーの戻り値をオーバーライドできます。



テストでtest_dispatcherは、監視タスクをモックに置き換えるために同じアクションが実行されます。





結論



asyncio依存関係インジェクションの原理に 基づいて監視デーモンを構築しました依存関係インジェクターを依存関係インジェクションフレームワークとして使用しました。



Dependency Injectorで得られる利点は、コンテナーです。



アプリケーションの構造を理解または変更する必要がある場合、コンテナは成果を上げ始めます。コンテナを使用すると、アプリケーションのすべてのコンポーネントとその依存関係が1か所にあるため、簡単です。



"""Application containers module."""

import logging
import sys

from dependency_injector import containers, providers

from . import http, monitors, dispatcher


class ApplicationContainer(containers.DeclarativeContainer):
    """Application container."""

    config = providers.Configuration()

    configure_logging = providers.Callable(
        logging.basicConfig,
        stream=sys.stdout,
        level=config.log.level,
        format=config.log.format,
    )

    http_client = providers.Factory(http.HttpClient)

    example_monitor = providers.Factory(
        monitors.HttpMonitor,
        http_client=http_client,
        options=config.monitors.example,
    )

    httpbin_monitor = providers.Factory(
        monitors.HttpMonitor,
        http_client=http_client,
        options=config.monitors.httpbin,
    )

    dispatcher = providers.Factory(
        dispatcher.Dispatcher,
        monitors=providers.List(
            example_monitor,
            httpbin_monitor,
        ),
    )




アプリケーションのマップとしてのコンテナー。あなたは常に何が何に依存するかを知っています。



次は何ですか?






All Articles