Pythonでの非同期性

過去数年にわたって、async非同期プログラミングのキーワードとセマンティクスはJavaScriptRustC#などの多くの一般的なプログラミング言語に浸透してきましたもちろん、Pythonにもありますasync/await。Python3.5で導入されました。



この記事では、非同期コードの問題について説明し、代替案について推測し、同期アプリケーションと非同期アプリケーションの両方を同時にサポートするための新しいアプローチを提案します。



機能色



非同期関数がプログラミング言語に含まれている場合、それは本質的に2つに分割されます。赤い関数が表示され(または非同期)、一部の関数は青いままです(同期)。



主な問題は、青の関数が赤を呼び出すことができないことですが、赤は潜在的に青を引き起こす可能性があります。たとえば、Pythonでは、これは部分的に当てはまります。非同期関数は、同期の非ブロッキング関数のみを呼び出すことができます。ただし、説明から関数がブロックされているかどうかを判断することはできません。 Pythonはスクリプト言語です。



この分割により、言語が同期と非同期の2つのサブセットに分割されます。 Python 3.5は5年以上前にリリースされましたがasync、Pythonの同期機能ほどサポートされていません。



このすばらしい記事で関数の色についてもっと読むことができます



重複するコード



関数の色が異なるということは、実際にはコードが重複していることを意味します。



Webページのサイズを取得するためのCLIツールを開発していて、それを行うための同期と非同期の両方の方法を維持したいとします。たとえば、これは、ライブラリを作成していて、コードがどのように使用されるかわからない場合に必要です。また、PyPIライブラリだけでなく、Djangoやaiohttpなどで記述された、さまざまなサービスに共通のロジックを備えた独自のライブラリについても説明します。もちろん、独立したアプリケーションは、ほとんどの場合、同期的にのみ、または非同期的にのみ作成されます。



同期疑似コードから始めましょう:



def fetch_resource_size(url: str) -> int:
    response = client_get(url)
    return len(response.content)


いい感じ。次に、非同期アナログを見てみましょう。



async def fetch_resource_size(url: str) -> int:
    response = await client_get(url)
    return len(response.content)


一般に、これは同じコードですが、単語asyncawait。が追加されていますそして、私はそれを構成しませんでした-httpxのチュートリアルのコード例を比較してください:





まったく同じ絵があります。



抽象化と構成



それはあなたがすべての同期コードを書き換えると、ここで配置する必要があるとそこにいることが判明asyncし、awaitプログラムが非同期になるように。



2つの原則がこの問題の解決に役立ちます。まず、必須の疑似コードを機能に書き直してみましょう。これにより、画像をよりはっきりと見ることができます。



def fetch_resource_size(url: str) -> Abstraction[int]:
    return client_get(url).map(
        lambda response: len(response.content),
    )


あなたはこの方法が.map何であるか、それは何をするのかを尋ねます。これは、複雑な抽象化と純粋な関数の構成が機能的なスタイルで行われる方法です。これにより、既存の状態から新しい状態で新しい抽象化を作成できます。それを仮定client_get(url)最初に戻りますAbstraction[Response]、そして呼び出しが.map(lambda response: len(response.content))必要なインスタンスへの応答を変換Abstraction[int]



次に何をすべきかが明らかになります。いくつかの独立したステップから順次関数呼び出しに簡単に移行できたことに注目してください。さらに、応答タイプを変更しました。関数が抽象化を返すようになりました。



非同期バージョンで動作するようにコードを書き直してみましょう。



def fetch_resource_size(url: str) -> AsyncAbstraction[int]:
    return client_get(url).map(
        lambda response: len(response.content),
    )


唯一異なるのはリターンタイプです- AsyncAbstraction残りのコードはまったく同じです。キーワードasyncを使用する必要がなくなりましたawaitawaitはまったく使用されておらず(このためにすべてが開始されました)、それがないと意味がありませんasync



最後に、必要なクライアント(非同期または同期)を決定します。



def fetch_resource_size(
    client_get: Callable[[str], AbstactionType[Response]],
    url: str,
) -> AbstactionType[int]:
    return client_get(url).map(
        lambda response: len(response.content),
    )


client_getは、URL文字列を入力として受け取りAbstractionType、オブジェクトに対して何らかの型返す呼び出し可能な型引数になりましたResponseAbstractionType-AbstractionまたはAsyncAbstraction前の例から。



を渡すAbstractionと、コードは同期的にAsyncAbstraction実行されます。同じコードが自動的に非同期で実行を開始します。



IOResultとFutureResult



幸いなことに、dry-python/returns正しい抽象化はすでに実施されています。



タイプセーフで、mypyに対応し、フレームワークに依存しない、完全にPythonのツールを紹介します。それは絶対にどんなプロジェクトでも使用できる驚くべき、便利な、素晴らしい抽象化を持っています。



同期オプション



まず、再現可能な例を得るために依存関係を追加しましょう



pip install returns httpx anyio


次に、疑似コードを機能するPythonコードに変換しましょう。同期オプションから始めましょう。



from typing import Callable
 
import httpx
 
from returns.io import IOResultE, impure_safe
 
def fetch_resource_size(
    client_get: Callable[[str], IOResultE[httpx.Response]],
    url: str,
) -> IOResultE[int]:
    return client_get(url).map(
        lambda response: len(response.content),
    )
 
print(fetch_resource_size(
    impure_safe(httpx.get),
    'https://sobolevn.me',
))
# => <IOResult: <Success: 27972>>


動作するコードを取得するには、いくつかの変更が必要でした。



  • 使用IOResultEは、同期IOエラーを処理する機能的な方法です(例外が常に機能するとは限りません)。に基づくタイプをResult使用すると、例外をシミュレートできますが、値は異なりますFailure()成功した出口は、タイプにラップされSuccessます。通常、誰も例外を気にしませんが、私たちは気にします。
  • httpx同期および非同期の要求を処理できるものを使用します。
  • 関数impure_safeを使用して、戻り型httpx.getを抽象化に変換しIOResultEます。


非同期オプション



非同期コードでも同じことを試してみましょう。



from typing import Callable
 
import anyio
import httpx
 
from returns.future import FutureResultE, future_safe
 
def fetch_resource_size(
    client_get: Callable[[str], FutureResultE[httpx.Response]],
    url: str,
) -> FutureResultE[int]:
    return client_get(url).map(
        lambda response: len(response.content),
    )
 
page_size = fetch_resource_size(
    future_safe(httpx.AsyncClient().get),
    'https://sobolevn.me',
)
print(page_size)
print(anyio.run(page_size.awaitable))
# => <FutureResult: <coroutine object async_map at 0x10b17c320>>
# => <IOResult: <Success: 27972>>


ご覧のとおり、結果はまったく同じですが、コードは非同期で実行されています。ただし、その主要部分は変更されていません。ただし、次の点に注意する必要があります。



  • 同時はIOResultE非同期に変更されFutureResultEimpure_safe上- future_safeこれは同じように機能しますが、異なる抽象化を返しますFutureResultE
  • AsyncClientから使用さhttpxます。
  • FutureResult赤い関数は自分自身を呼び出すことができないため、結果の値を実行する必要があります。
  • ユーティリティは、anyioこのアプローチは、任意の非同期ライブラリと連動することを示すために使用されますasynciotriocurio


一石二鳥



同期バージョンと非同期バージョンを1つのタイプセーフAPIに組み合わせる方法を紹介します。IOを操作するための



より高い種類のタイプタイプクラスはまだリリースされていないため(0.15.0で表示されます)、通常の方法で説明します@overload



from typing import Callable, Union, overload
 
import anyio
import httpx
 
from returns.future import FutureResultE, future_safe
from returns.io import IOResultE, impure_safe
 
@overload
def fetch_resource_size(
    client_get: Callable[[str], IOResultE[httpx.Response]],
    url: str,
) -> IOResultE[int]:
    """Sync case."""
 
@overload
def fetch_resource_size(
    client_get: Callable[[str], FutureResultE[httpx.Response]],
    url: str,
) -> FutureResultE[int]:
    """Async case."""
 
def fetch_resource_size(
    client_get: Union[
        Callable[[str], IOResultE[httpx.Response]],
        Callable[[str], FutureResultE[httpx.Response]],
    ],
    url: str,
) -> Union[IOResultE[int], FutureResultE[int]]:
    return client_get(url).map(
        lambda response: len(response.content),
    )


デコレータを使用して、@overload許可される入力データと戻り値のタイプ記述します。デコレータの詳細については@overload、他の記事をご覧ください



同期または非同期クライアントでの関数呼び出しは次のようになります。



# Sync:
print(fetch_resource_size(
    impure_safe(httpx.get),
    'https://sobolevn.me',
))
# => <IOResult: <Success: 27972>>
 
# Async:
page_size = fetch_resource_size(
    future_safe(httpx.AsyncClient().get),
    'https://sobolevn.me',
)
print(page_size)
print(anyio.run(page_size.awaitable))
# => <FutureResult: <coroutine object async_map at 0x10b17c320>>
# => <IOResult: <Success: 27972>>


ご覧のとおりfetch_resource_size、同期バリアントではIOResultすぐに戻って実行します。一方、非同期バージョンでは、通常のコルーチンと同様に、イベントループが必要です。anyio結果を表示するために使用されます。



ではmypy、このコードはコメントはありません。



» mypy async_and_sync.py
Success: no issues found in 1 source file


何かが台無しになった場合に何が起こるか見てみましょう。



---lambda response: len(response.content),
+++lambda response: response.content,


mypy 新しいエラーを簡単に見つける:



» mypy async_and_sync.py
async_and_sync.py:33: error: Argument 1 to "map" of "IOResult" has incompatible type "Callable[[Response], bytes]"; expected "Callable[[Response], int]"
async_and_sync.py:33: error: Argument 1 to "map" of "FutureResult" has incompatible type "Callable[[Response], bytes]"; expected "Callable[[Response], int]"
async_and_sync.py:33: error: Incompatible return value type (got "bytes", expected "int")


巧妙で魔法はありません:正しい抽象化で非同期コードを書くには、古き良き時代の構成だけが必要です。しかし、異なるタイプに対して同じAPIを取得するという事実は本当に素晴らしいです。たとえば、HTTPリクエストの動作を同期的または非同期的に抽象化できます。



うまくいけば、この例は、非同期プログラムが実際にどれほど素晴らしいかを示しています。そして、dry-python / returnsを試してみると、さらに多くの興味深いものが見つかります。新しいバージョンでは、Higher KindedTypesと必要なすべてのインターフェイスを操作するために必要なプリミティブをすでに作成しています。上記のコードは、次のように書き直すことができます。



from typing import Callable, TypeVar

import anyio
import httpx

from returns.future import future_safe
from returns.interfaces.specific.ioresult import IOResultLike2
from returns.io import impure_safe
from returns.primitives.hkt import Kind2, kinded

_IOKind = TypeVar('_IOKind', bound=IOResultLike2)

@kinded
def fetch_resource_size(
    client_get: Callable[[str], Kind2[_IOKind, httpx.Response, Exception]],
    url: str,
) -> Kind2[_IOKind, int, Exception]:
    return client_get(url).map(
        lambda response: len(response.content),
    )


# Sync:
print(fetch_resource_size(
    impure_safe(httpx.get),
    'https://sobolevn.me',
))
# => <IOResult: <Success: 27972>>

# Async:
page_size = fetch_resource_size(
    future_safe(httpx.AsyncClient().get),
    'https://sobolevn.me',
)
print(page_size)
print(anyio.run(page_size.awaitable))
# => <FutureResult: <coroutine object async_map at 0x10b17c320>>
# => <IOResult: <Success: 27972>>


`master`ブランチを参照してください。すでにそこで機能しています。



その他のドライPython機能



これが私が最も誇りに思っている他のいくつかの便利なドライパイソン機能です。





from returns.curry import curry, partial
 
def example(a: int, b: str) -> float:
    ...
 
reveal_type(partial(example, 1))
# note: Revealed type is 'def (b: builtins.str) -> builtins.float'
 
reveal_type(curry(example))
# note: Revealed type is 'Overload(def (a: builtins.int) -> def (b: builtins.str) -> builtins.float, def (a: builtins.int, b: builtins.str) -> builtins.float)'


これにより@curry、たとえば次のように使用できます



@curry
def example(a: int, b: str) -> float:
    return float(a + len(b))
 
assert example(1, 'abc') == 4.0
assert example(1)('abc') == 4.0




カスタムmypyプラグインを使用して、型を返す機能パイプラインを構築できます。



from returns.pipeline import flow
assert flow(
    [1, 2, 3],
    lambda collection: max(collection),
    lambda max_number: -max_number,
) == -3


通常、型付きコードでは、ラムダの引数は常に型であるため、ラムダを操作するのは非常に不便Anyです。推論はmypyこの問題を解決します。



その助けを借りて、私たちは今、どのlambda collection: max(collection)タイプCallable[[List[int]], int]であるかを知っていますが、lambda max_number: -max_number単純Callable[[int], int]です。Inflowは任意の数の引数を渡すことができ、それらは正常に機能します。プラグインに感謝します。





前に説明した抽象化をFutureResult使用して、依存関係を機能的なスタイルで非同期プログラムに明示的に渡すことができます。



今後の計画



最終的にバージョン1.0をリリースする前に、解決すべきいくつかの重要なタスクがあります。



  • より高い種類のタイプまたはそれらのエミュレーションを実装します(問題)。
  • 適切なタイプクラスを追加して、必要な抽象化を実装します(問題)。
  • コンパイラを試してみてくださいmypyc。これにより、型付きの注釈付きPythonプログラムをバイナリにコンパイルできる可能性があります。そうすると、のコードdry-python/returnsは数倍速く動作します(問題)。
  • 「do-notation」など、Pythonで機能コードを作成する新しい方法を探ります


結論



構成と抽象化はあらゆる問題を解決できます。この記事では、関数の色の問題を解決し、機能するシンプルで読みやすく柔軟なコードを作成する方法について説明しました。そして、タイプチェックを行います。dry-python / returns



試してロシアのPython Weekに参加してください。カンファレンスでは、dry-pythonのコア開発者であるPablo Aguilarが、dry-pythonを使用してビジネスロジックを作成するワークショップを開催します。



All Articles