aioapiクローラー

こんにちは。さまざまなjsonapiからデータをプルするためライブラリの作業を開始しましたapiのテストにも使用できます。



Apiは、たとえばクラスとして記述されます



class Categories(JsonEndpoint):
    url = "http://127.0.0.1:8888/categories"
    params = {"page": range(100), "language": "en"}
    headers = {"User-Agent": get_user_agent}
    results_key = "*.slug"

categories = Categories()


class Posts(JsonEndpoint):
    url = "http://127.0.0.1:8888/categories/{category}/posts"
    params = {"page": range(100), "language": "en"}
    url_params = {"category": categories.iter_results()}
    results_key = "posts"

    async def comments(self, post):
        comments = Comments(
            self.session,
            url_params={"category": post.url.params["category"], "id": post["id"]},
        )
        return [comment async for comment in comments]

posts = Posts()


パラメータとurl_paramsには、関数(ここでは、get_user_agent-ランダムなユーザーエージェントを返します)、範囲、イテレーター、待機可能および非同期イテレーター(これらをリンクできます)を含めることができます。



パラメータヘッダーとCookieには、関数と待機可能オブジェクトを含めることもできます。



上記の例のカテゴリapiは、スラッグを持つオブジェクトの配列を返します。イテレータはそれらを正確に返します。このイテレーターを投稿のurl_paramsに挿入することにより、イテレーターはすべてのカテゴリーとそれぞれのすべてのページを再帰的に調べます。 404またはその他のエラーが発生すると中止され、次のカテゴリに進みます。



また、リポジトリには、すべてをテストできるように、これらのクラスのaiohttpサーバーの例があります。



パラメータを取得することに加えて、それらをdataまたはjsonとして渡し、別のメソッドを設定できます。



results_keyは点線で、結果からキーをプルしようとします。たとえば、「comments。*。Text」は、コメント内の配列から各コメントのテキストを返します。



結果は、urlプロパティとparamsプロパティを持つラッパーにラップされます。 urlは、paramsも含む文字列から派生します。したがって、この結果を得るためにどのパラメーターが使用されたかを知ることができます。これは、commentsメソッドで示されます。



結果を処理するための基本Sinkクラスもあります。たとえば、それらをmqまたはデータベースに折りたたむ。個別のタスクで機能し、asyncio.Queueを介してデータを受信します。



class LoggingSink(Sink):
    def transform(self, obj):
        return repr(obj)

    async def init(self):
        from loguru import logger

        self.logger = logger

    async def process(self, obj):
        self.logger.info(obj)
        return True

sink = LoggingSink(num_tasks=1)


最も単純なシンクの例。 transformメソッドを使用すると、オブジェクトを操作して、適切でない場合はNoneを返すことができます。それら。テーマでは、検証を行うこともできます。



Sinkは非同期コンテキストマネージャーであり、終了すると、理論的には、キュー内のすべてのオブジェクトが処理されるまで待機してから、タスクをキャンセルします。



そして最後に、それをすべて結び付けるために、私はワーカークラスを作りました。 1つのエンドポイントと複数のシンクを受け入れます。例えば、



worker = Worker(endpoint=posts, sinks=[loggingsink, mongosink])
worker.run()


runは、ワーカーパイプラインのasyncio.run_until_completeを実行します。また、変換メソッドもあります。



一度に複数のワーカーを作成し、それらのasyncio.gatherを作成できるWorkerGroupクラスもあります。



このコードには、エンドポイントのフェイカーとハンドラーを介してデータを生成するサーバーの例が含まれています。これが最も明白だと思います。



これはすべて開発の初期段階であり、これまで私はしばしばapiを変更しました。しかし、今ではそれがどのように見えるべきかということになっているようです。リクエストとコメントをコードにマージするだけです。



All Articles