GraphQL in Rust

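This article shows how to build a GraphQL server using Rust and its ecosystem, with implementation examples for the most common tasks in GraphQL API development. In the end, the APIs of three microservices are combined into a single access point using Apollo Server and Apollo Federation. This lets clients request data from several sources at once without knowing which service a given piece of data comes from.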





Introduction

Overview

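Functionally, the project described here is very similar to the one presented in the previous article, but this time it uses a Rust stack. Architecturally, the project looks as follows: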





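Each component of the architecture highlights some of the issues that can arise when implementing a GraphQL API. The domain model contains data about the planets of the Solar System and their satellites. The project has a multi-module structure (a monorepo) and consists of the following modules: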





  • planets-service (Rust)
  • satellites-service (Rust)
  • auth-service (Rust)
  • apollo-server (JS)





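There are two libraries for building a GraphQL backend in Rust: Juniper and Async-graphql. Only the latter supports Apollo Federation, so it was chosen for this project (Federation support in Juniper is an open issue). Both libraries follow the code-first approach.

PostgreSQL is used for data storage, JWT for authentication, and Kafka for asynchronous messaging.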





The main technologies used in the project:

  • Language: Rust
  • GraphQL server library: Async-graphql
  • GraphQL gateway: Apollo Server
  • Web framework: actix-web
  • Database: PostgreSQL
  • Message broker: Apache Kafka
  • Container orchestration: Docker Compose





Rust libraries used in the project:

  • ORM: Diesel
  • Kafka client: rust-rdkafka
  • Password hashing: argonautica
  • JWT: jsonwebtoken
  • Testing: Testcontainers-rs





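To run the project locally you only need Docker Compose. Without Docker, you will need the following installed:

  • Rust
  • Diesel CLI (install it with cargo install diesel_cli --no-default-features --features postgres)
  • LLVM (required by the argonautica crate)
  • CMake (required by the rust-rdkafka crate)
  • PostgreSQL
  • Apache Kafka
  • npm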





The services are combined into a single Cargo workspace by the root Cargo.toml:

Root Cargo.toml





[workspace]
members = [
    "auth-service",
    "planets-service",
    "satellites-service",
    "common-utils",
]

      
      



Let's start with planets-service. Its dependencies are declared in Cargo.toml:

Cargo.toml





[package]
name = "planets-service"
version = "0.1.0"
edition = "2018"

[dependencies]
common-utils = { path = "../common-utils" }
async-graphql = "2.4.3"
async-graphql-actix-web = "2.4.3"
actix-web = "3.3.2"
actix-rt = "1.1.1"
actix-web-actors = "3.0.0"
futures = "0.3.8"
async-trait = "0.1.42"
bigdecimal = { version = "0.1.2", features = ["serde"] }
serde = { version = "1.0.118", features = ["derive"] }
serde_json = "1.0.60"
diesel = { version = "1.4.5", features = ["postgres", "r2d2", "numeric"] }
diesel_migrations = "1.4.0"
dotenv = "0.15.0"
strum = "0.20.0"
strum_macros = "0.20.1"
rdkafka = { version = "0.24.0", features = ["cmake-build"] }
async-stream = "0.3.0"
lazy_static = "1.4.0"

[dev-dependencies]
jsonpath_lib = "0.2.6"
testcontainers = "0.9.1"

      
      



async-graphql is the GraphQL server library, actix-web is the web framework, and async-graphql-actix-web provides the integration between the two.

The entry point is main.rs:

main.rs





#[actix_rt::main]
async fn main() -> std::io::Result<()> {
    dotenv().ok();
    let pool = create_connection_pool();
    run_migrations(&pool);

    let schema = create_schema_with_context(pool);

    HttpServer::new(move || App::new()
        .configure(configure_service)
        .data(schema.clone())
    )
        .bind("0.0.0.0:8001")?
        .run()
        .await
}

      
      



Here the HTTP server is configured using functions defined in lib.rs:

lib.rs





pub fn configure_service(cfg: &mut web::ServiceConfig) {
    cfg
        .service(web::resource("/")
            .route(web::post().to(index))
            .route(web::get().guard(guard::Header("upgrade", "websocket")).to(index_ws))
            .route(web::get().to(index_playground))
        );
}

async fn index(schema: web::Data<Schema<Query, Mutation, Subscription>>, http_req: HttpRequest, req: Request) -> Response {
    let mut query = req.into_inner();

    let maybe_role = common_utils::get_role(http_req);
    if let Some(role) = maybe_role {
        query = query.data(role);
    }

    schema.execute(query).await.into()
}

async fn index_ws(schema: web::Data<Schema<Query, Mutation, Subscription>>, req: HttpRequest, payload: web::Payload) -> Result<HttpResponse> {
    WSSubscription::start(Schema::clone(&*schema), &req, payload)
}

async fn index_playground() -> HttpResponse {
    HttpResponse::Ok()
        .content_type("text/html; charset=utf-8")
        .body(playground_source(GraphQLPlaygroundConfig::new("/").subscription_endpoint("/")))
}

pub fn create_schema_with_context(pool: PgPool) -> Schema<Query, Mutation, Subscription> {
    let arc_pool = Arc::new(pool);
    let cloned_pool = Arc::clone(&arc_pool);
    // Async-graphql's DataLoader, matching the DetailsLoader defined later in the article
    let details_batch_loader = DataLoader::new(DetailsLoader {
        pool: cloned_pool
    }).max_batch_size(10);

    let kafka_consumer_counter = Mutex::new(0);

    Schema::build(Query, Mutation, Subscription)
        .data(arc_pool)
        .data(details_batch_loader)
        .data(kafka::create_producer())
        .data(kafka_consumer_counter)
        .finish()
}

      
      



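The functions are:

  • index: handles GraphQL queries and mutations
  • index_ws: handles GraphQL subscriptions
  • index_playground: serves the Playground GraphQL IDE
  • create_schema_with_context: creates the GraphQL schema with global context data that is accessible at runtime, such as the database connection pool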





GraphQL query and type definition

Let's define a query:









#[Object]
impl Query {
    async fn get_planets(&self, ctx: &Context<'_>) -> Vec<Planet> {
        repository::get_all(&get_conn_from_ctx(ctx)).expect("Can't get planets")
            .iter()
            .map(|p| { Planet::from(p) })
            .collect()
    }

    async fn get_planet(&self, ctx: &Context<'_>, id: ID) -> Option<Planet> {
        find_planet_by_id_internal(ctx, id)
    }

    #[graphql(entity)]
    async fn find_planet_by_id(&self, ctx: &Context<'_>, id: ID) -> Option<Planet> {
        find_planet_by_id_internal(ctx, id)
    }
}

fn find_planet_by_id_internal(ctx: &Context<'_>, id: ID) -> Option<Planet> {
    let id = id.to_string().parse::<i32>().expect("Can't get id from String");
    repository::get(id, &get_conn_from_ctx(ctx)).ok()
        .map(|p| { Planet::from(&p) })
}

      
      



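Each function fetches data from the database through the repository and converts the resulting entities into GraphQL DTOs (this keeps the responsibilities of each struct separate). The get_planets and get_planet queries can be called from any GraphQL IDE, for example: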









{
  getPlanets {
    name
    type
  }
}

      
      



The Planet type is defined as follows:

GraphQL type definition





#[derive(Serialize, Deserialize)]
struct Planet {
    id: ID,
    name: String,
    planet_type: PlanetType,
}

#[Object]
impl Planet {
    async fn id(&self) -> &ID {
        &self.id
    }

    async fn name(&self) -> &String {
        &self.name
    }

    /// From an astronomical point of view
    #[graphql(name = "type")]
    async fn planet_type(&self) -> &PlanetType {
        &self.planet_type
    }

    #[graphql(deprecation = "Now it is not in doubt. Do not use this field")]
    async fn is_rotating_around_sun(&self) -> bool {
        true
    }

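    async fn details(&self, ctx: &Context<'_>) -> Result<Details> {
        let data_loader = ctx.data::<DataLoader<DetailsLoader>>().expect("Can't get data loader");
        let planet_id = self.id.to_string().parse::<i32>().expect("Can't convert id");
        let details = data_loader.load_one(planet_id).await?;
        details.ok_or_else(|| "Not found".into())
    }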
}

      
      



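In the impl block, a resolver is defined for each field. For some fields you can also specify a description (as a Rust doc comment) and a deprecation reason; both are displayed in the GraphQL IDE.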





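The N+1 problem

A naive implementation of Planet.details would suffer from the N+1 problem, which shows up with a query like this:

GraphQL query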





{
  getPlanets {
    name
    details {
      meanRadius
    }
  }
}

      
      



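With a naive implementation, the details resolver would run a separate SQL query for every planet, because Details is a separate entity from Planet and is stored in its own table.

Using the DataLoader implemented in Async-graphql, the details resolver can instead be defined like this: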









async fn details(&self, ctx: &Context<'_>) -> Result<Details> {
    let data_loader = ctx.data::<DataLoader<DetailsLoader>>().expect("Can't get data loader");
    let planet_id = self.id.to_string().parse::<i32>().expect("Can't convert id");
    let details = data_loader.load_one(planet_id).await?;
    details.ok_or_else(|| "Not found".into())
}

      
      



data_loader is an object stored in the application context and is defined as follows:

DataLoader definition





let details_data_loader = DataLoader::new(DetailsLoader {
    pool: cloned_pool
}).max_batch_size(10);

      
      



DetailsLoader is implemented as follows:





DetailsLoader definition





pub struct DetailsLoader {
    pub pool: Arc<PgPool>
}

#[async_trait::async_trait]
impl Loader<i32> for DetailsLoader {
    type Value = Details;
    type Error = Error;

    async fn load(&self, keys: &[i32]) -> Result<HashMap<i32, Self::Value>, Self::Error> {
        let conn = self.pool.get().expect("Can't get DB connection");
        let details = repository::get_details(keys, &conn).expect("Can't get planets' details");

        Ok(details.iter()
            .map(|details_entity| (details_entity.planet_id, Details::from(details_entity)))
            .collect::<HashMap<_, _>>())
    }
}

      
      



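This approach avoids the N+1 problem, because each call to DetailsLoader.load executes a single SQL query that returns a whole batch of DetailsEntity rows.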
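The effect of batching can be illustrated without a database. The sketch below is not the project's code: FakeRepository, its methods, and the sample rows are hypothetical stand-ins that only count how many "queries" are issued when details are fetched one planet at a time versus in a single batch.

```rust
use std::cell::Cell;
use std::collections::HashMap;

/// Hypothetical stand-in for a database; it only counts the queries it receives.
struct FakeRepository {
    rows: HashMap<i32, &'static str>,
    queries: Cell<u32>,
}

impl FakeRepository {
    fn new() -> Self {
        let rows = HashMap::from([
            (1, "Mercury details"),
            (2, "Venus details"),
            (3, "Earth details"),
        ]);
        FakeRepository { rows, queries: Cell::new(0) }
    }

    /// One query per planet: the N+1 pattern.
    fn get_one(&self, planet_id: i32) -> Option<&'static str> {
        self.queries.set(self.queries.get() + 1);
        self.rows.get(&planet_id).copied()
    }

    /// One query for a whole batch of keys, as a batch loader would issue.
    fn get_batch(&self, planet_ids: &[i32]) -> HashMap<i32, &'static str> {
        self.queries.set(self.queries.get() + 1);
        planet_ids
            .iter()
            .filter_map(|id| self.rows.get(id).map(|row| (*id, *row)))
            .collect()
    }
}

/// Returns (queries issued one by one, queries issued as a batch).
fn compare_query_counts(planet_ids: &[i32]) -> (u32, u32) {
    let naive = FakeRepository::new();
    for id in planet_ids {
        let _ = naive.get_one(*id);
    }

    let batched = FakeRepository::new();
    let _ = batched.get_batch(planet_ids);

    (naive.queries.get(), batched.queries.get())
}
```

For three planets, the per-planet path issues three queries and the batched path issues one. A real DataLoader additionally collects the keys requested by concurrently running resolvers before calling load, which this sketch does not model.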





A GraphQL interface and its implementations can be defined as follows:

GraphQL interface definition





#[derive(Interface, Clone)]
#[graphql(
    field(name = "mean_radius", type = "&CustomBigDecimal"),
    field(name = "mass", type = "&CustomBigInt"),
)]
pub enum Details {
    InhabitedPlanetDetails(InhabitedPlanetDetails),
    UninhabitedPlanetDetails(UninhabitedPlanetDetails),
}

#[derive(SimpleObject, Clone)]
pub struct InhabitedPlanetDetails {
    mean_radius: CustomBigDecimal,
    mass: CustomBigInt,
    /// In billions
    population: CustomBigDecimal,
}

#[derive(SimpleObject, Clone)]
pub struct UninhabitedPlanetDetails {
    mean_radius: CustomBigDecimal,
    mass: CustomBigInt,
}

      
      



Note that if an object has no fields with "complex" resolvers, it can be defined using the SimpleObject derive macro.





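The project contains two examples of custom scalar definitions; both are wrappers around numeric types (a wrapper is needed because the orphan rule forbids implementing an external trait for an external type). The wrappers are implemented as follows:

Custom scalar: wrapper for BigInt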





#[derive(Clone)]
pub struct CustomBigInt(BigDecimal);

#[Scalar(name = "BigInt")]
impl ScalarType for CustomBigInt {
    fn parse(value: Value) -> InputValueResult<Self> {
        match value {
            Value::String(s) => {
                let parsed_value = BigDecimal::from_str(&s)?;
                Ok(CustomBigInt(parsed_value))
            }
            _ => Err(InputValueError::expected_type(value)),
        }
    }

    fn to_value(&self) -> Value {
        Value::String(format!("{:e}", &self))
    }
}

impl LowerExp for CustomBigInt {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let val = &self.0.to_f64().expect("Can't convert BigDecimal");
        LowerExp::fmt(val, f)
    }
}
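
As a rough illustration of what the LowerExp implementation above produces, the standard library's {:e} format renders an f64 in exponential notation. The helper name format_mass is hypothetical and not part of the project.

```rust
/// Hypothetical helper: formats a floating-point value in exponential
/// notation, the same `{:e}` formatting that `CustomBigInt::to_value` relies on.
fn format_mass(mass: f64) -> String {
    format!("{:e}", mass)
}
```

For example, format_mass(1234.5) yields "1.2345e3". Because the wrapper converts its BigDecimal to f64 before formatting, digits beyond f64 precision are not preserved in the serialized value.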

      
      



Custom scalar: wrapper for BigDecimal





#[derive(Clone)]
pub struct CustomBigDecimal(BigDecimal);

#[Scalar(name = "BigDecimal")]
impl ScalarType for CustomBigDecimal {
    fn parse(value: Value) -> InputValueResult<Self> {
        match value {
            Value::String(s) => {
                let parsed_value = BigDecimal::from_str(&s)?;
                Ok(CustomBigDecimal(parsed_value))
            }
            _ => Err(InputValueError::expected_type(value)),
        }
    }

    fn to_value(&self) -> Value {
        Value::String(self.0.to_string())
    }
}

      
      



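Note that the first wrapper, CustomBigInt, serializes its value in exponential notation, which suits very large numbers.

Mutations are defined as follows: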









pub struct Mutation;

#[Object]
impl Mutation {
    #[graphql(guard(RoleGuard(role = "Role::Admin")))]
    async fn create_planet(&self, ctx: &Context<'_>, planet: PlanetInput) -> Result<Planet> {
        let new_planet = NewPlanetEntity {
            name: planet.name,
            planet_type: planet.planet_type.to_string(),
        };

        let details = planet.details;
        let new_planet_details = NewDetailsEntity {
            mean_radius: details.mean_radius.0,
            mass: BigDecimal::from_str(&details.mass.0.to_string()).expect("Can't get BigDecimal from string"),
            population: details.population.map(|wrapper| { wrapper.0 }),
            planet_id: 0,
        };

        let created_planet_entity = repository::create(new_planet, new_planet_details, &get_conn_from_ctx(ctx))?;

        let producer = ctx.data::<FutureProducer>().expect("Can't get Kafka producer");
        let message = serde_json::to_string(&Planet::from(&created_planet_entity)).expect("Can't serialize a planet");
        kafka::send_message(producer, message).await;

        Ok(Planet::from(&created_planet_entity))
    }
}

      
      



To use an object as a mutation argument, define an input type as follows:

Input type definition





#[derive(InputObject)]
struct PlanetInput {
    name: String,
    #[graphql(name = "type")]
    planet_type: PlanetType,
    details: DetailsInput,
}

      
      



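The mutation is protected by RoleGuard, which ensures that only users with the Admin role can execute it. An example of such a mutation: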









mutation {
  createPlanet(
    planet: {
      name: "test_planet"
      type: TERRESTRIAL_PLANET
      details: { meanRadius: "10.5", mass: "8.8e24", population: "0.5" }
    }
  ) {
    id
  }
}

      
      



To execute it, you need to specify an Authorization header containing a JWT obtained from auth-service (described below).





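In the mutation definition above, a message is sent to Kafka after a new planet is created:

Sending a message to Kafka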





let producer = ctx.data::<FutureProducer>().expect("Can't get Kafka producer");
let message = serde_json::to_string(&Planet::from(&created_planet_entity)).expect("Can't serialize a planet");
kafka::send_message(producer, message).await;

      
      



An API client can be notified of this event through a subscription that listens to a Kafka consumer:









pub struct Subscription;

#[Subscription]
impl Subscription {
    async fn latest_planet<'ctx>(&self, ctx: &'ctx Context<'_>) -> impl Stream<Item = Planet> + 'ctx {
        let kafka_consumer_counter = ctx.data::<Mutex<i32>>().expect("Can't get Kafka consumer counter");
        let consumer_group_id = kafka::get_kafka_consumer_group_id(kafka_consumer_counter);
        let consumer = kafka::create_consumer(consumer_group_id);

        async_stream::stream! {
            let mut stream = consumer.start();

            while let Some(value) = stream.next().await {
                yield match value {
                    Ok(message) => {
                        let payload = message.payload().expect("Kafka message should contain payload");
                        let message = String::from_utf8_lossy(payload).to_string();
                        serde_json::from_str(&message).expect("Can't deserialize a planet")
                    }
                    Err(e) => panic!("Error while Kafka message processing: {}", e)
                };
            }
        }
    }
}
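
The helper kafka::get_kafka_consumer_group_id is not shown in this article. One plausible shape for it, assuming the goal is to give every subscriber its own consumer group so that each of them receives every message, is sketched below. The function name next_consumer_group_id, its body, and the "graphql-group-" prefix are assumptions.

```rust
use std::sync::Mutex;

/// Hypothetical sketch: increments a shared counter and derives a unique
/// consumer group id from it, so each subscription gets its own group.
fn next_consumer_group_id(counter: &Mutex<i32>) -> String {
    // Lock the counter; a poisoned lock means another thread panicked while holding it.
    let mut value = counter.lock().expect("Can't lock counter");
    *value += 1;
    format!("graphql-group-{}", *value)
}
```

The counter is the Mutex placed into the schema data in create_schema_with_context, so all subscriptions served by one process share it.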

      
      



A subscription can be used in the same way as queries and mutations:









subscription {
  latestPlanet {
    id
    name
    type
    details {
      meanRadius
    }
  }
}

      
      



The subscription endpoint is ws://localhost:8001.





Tests for queries and mutations can be written like this:









#[actix_rt::test]
async fn test_get_planets() {
    let docker = Cli::default();
    let (_pg_container, pool) = common::setup(&docker);

    let mut service = test::init_service(App::new()
        .configure(configure_service)
        .data(create_schema_with_context(pool))
    ).await;

    let query = "
        {
            getPlanets {
                id
                name
                type
                details {
                    meanRadius
                    mass
                    ... on InhabitedPlanetDetails {
                        population
                    }
                }
            }
        }
        ".to_string();

    let request_body = GraphQLCustomRequest {
        query,
        variables: Map::new(),
    };

    let request = test::TestRequest::post().uri("/").set_json(&request_body).to_request();

    let response: GraphQLCustomResponse = test::read_response_json(&mut service, request).await;

    fn get_planet_as_json(all_planets: &serde_json::Value, index: i32) -> &serde_json::Value {
        jsonpath::select(all_planets, &format!("$.getPlanets[{}]", index)).expect("Can't get planet by JSON path")[0]
    }

    let mercury_json = get_planet_as_json(&response.data, 0);
    common::check_planet(mercury_json, 1, "Mercury", "TERRESTRIAL_PLANET", "2439.7");

    let earth_json = get_planet_as_json(&response.data, 2);
    common::check_planet(earth_json, 3, "Earth", "TERRESTRIAL_PLANET", "6371.0");

    let neptune_json = get_planet_as_json(&response.data, 7);
    common::check_planet(neptune_json, 8, "Neptune", "ICE_GIANT", "24622.0");
}

      
      



If part of a query is reused in several queries, you can use fragments:









const PLANET_FRAGMENT: &str = "
    fragment planetFragment on Planet {
        id
        name
        type
        details {
            meanRadius
            mass
            ... on InhabitedPlanetDetails {
                population
            }
        }
    }
";

#[actix_rt::test]
async fn test_get_planet_by_id() {
    ...

    let query = "
        {
            getPlanet(id: 3) {
                ... planetFragment
            }
        }
        ".to_string() + PLANET_FRAGMENT;

    let request_body = GraphQLCustomRequest {
        query,
        variables: Map::new(),
    };

    ...
}

      
      



To use variables, pass them in the variables field of GraphQLCustomRequest:









#[actix_rt::test]
async fn test_get_planet_by_id_with_variable() {
    ...

    let query = "
        query testPlanetById($planetId: String!) {
            getPlanet(id: $planetId) {
                ... planetFragment
            }
        }".to_string() + PLANET_FRAGMENT;

    let jupiter_id = 5;
    let mut variables = Map::new();
    variables.insert("planetId".to_string(), jupiter_id.into());

    let request_body = GraphQLCustomRequest {
        query,
        variables,
    };

    ...
}

      
      



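The project uses the Testcontainers-rs library, which prepares the test environment, in this case by starting a temporary PostgreSQL database.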





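GraphQL API client

The code from the previous section can also be used to build a client for an external GraphQL API. There are dedicated crates for this purpose as well, for example graphql-client, though they are not used in this project.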





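API security

There are various security threats to a GraphQL API; let's consider some of them.

If the Satellite type contained a planet field, a client could make a query like the following: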









{
  getPlanet(id: "1") {
    satellites {
      planet {
        satellites {
          planet {
            satellites {
              ... # more deep nesting!
            }
          }
        }
      }
    }
  }
}

      
      



Such a query can be rejected by limiting query depth and complexity:









pub fn create_schema_with_context(pool: PgPool) -> Schema<Query, Mutation, Subscription> {
    ...

    Schema::build(Query, Mutation, Subscription)
        .limit_depth(3)
        .limit_complexity(15)

    ...
}

      
      



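Note that with these limits set, the documentation may stop being displayed in the GraphQL IDE. This is because the IDE sends an introspection query, which has considerable depth and complexity.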
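
To get a feel for what a depth limit measures, the sketch below estimates nesting depth by counting curly braces. It is only an approximation written for illustration: the name selection_depth is hypothetical, and the way Async-graphql computes depth internally may differ.

```rust
/// Hypothetical helper: estimates the maximum nesting depth of selection
/// sets by counting curly braces. Braces inside string literals or input
/// objects are counted too, and fragments are not expanded, so the result
/// is only an approximation.
fn selection_depth(query: &str) -> usize {
    let mut current = 0usize;
    let mut max = 0usize;
    for ch in query.chars() {
        match ch {
            '{' => {
                current += 1;
                max = max.max(current);
            }
            // saturating_sub keeps malformed input from underflowing.
            '}' => current = current.saturating_sub(1),
            _ => {}
        }
    }
    max
}
```

By this estimate, a query that selects getPlanets, then details, then meanRadius has depth 3, while each extra satellites/planet hop in the recursive query shown earlier adds another level.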





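Authentication is implemented in auth-service using the argonautica and jsonwebtoken crates. The former hashes users' passwords with the Argon2 algorithm. The authentication and authorization shown here are for demonstration purposes only; do more research before using them in production.

Sign-in is implemented as follows: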









pub struct Mutation;

#[Object]
impl Mutation {

    async fn sign_in(&self, ctx: &Context<'_>, input: SignInInput) -> Result<String> {
        let maybe_user = repository::get_user(&input.username, &get_conn_from_ctx(ctx)).ok();

        if let Some(user) = maybe_user {
            if let Ok(matching) = verify_password(&user.hash, &input.password) {
                if matching {
                    let role = AuthRole::from_str(user.role.as_str()).expect("Can't convert &str to AuthRole");
                    return Ok(common_utils::create_token(user.username, role));
                }
            }
        }

        Err(Error::new("Can't authenticate a user"))
    }
}

#[derive(InputObject)]
struct SignInInput {
    username: String,
    password: String,
}

      
      



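The verify_password function is implemented in the utils module, and create_token in the common_utils module. As you might expect, sign_in returns a JWT that can then be used for authorization in the other services.

To get a JWT, execute the following mutation:

Getting a JWT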





mutation {
  signIn(input: { username: "john_doe", password: "password" })
}

      
      



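Use the john_doe/password credentials. Include the obtained JWT in subsequent requests to access protected resources (see the next section).

To request protected data, add a header to the HTTP request in the format Authorization: Bearer $JWT. The index function extracts the user's role from the HTTP request and adds it to the data of the GraphQL query or mutation: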









async fn index(schema: web::Data<Schema<Query, Mutation, Subscription>>, http_req: HttpRequest, req: Request) -> Response {
    let mut query = req.into_inner();

    let maybe_role = common_utils::get_role(http_req);
    if let Some(role) = maybe_role {
        query = query.data(role);
    }

    schema.execute(query).await.into()
}
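
common_utils::get_role is not shown here. Presumably its first step is to pull the token out of the Authorization header before decoding it. A minimal sketch of that step follows; the function name bearer_token is hypothetical, and decoding and validating the JWT (done with jsonwebtoken in the project) is left out.

```rust
/// Hypothetical helper: returns the token from a header value of the form
/// "Bearer <token>", or None if the value has a different shape.
fn bearer_token(header_value: &str) -> Option<&str> {
    let token = header_value.strip_prefix("Bearer ")?.trim();
    if token.is_empty() {
        None
    } else {
        Some(token)
    }
}
```

Returning an Option fits the way index treats the role: when no usable token is present, nothing is added to the query data and the guard later rejects the request.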

      
      



The following attribute was applied to the create_planet mutation defined earlier:









#[graphql(guard(RoleGuard(role = "Role::Admin")))]

      
      



The guard itself is implemented as follows:









struct RoleGuard {
    role: Role,
}

#[async_trait::async_trait]
impl Guard for RoleGuard {
    async fn check(&self, ctx: &Context<'_>) -> Result<()> {
        if ctx.data_opt::<Role>() == Some(&self.role) {
            Ok(())
        } else {
            Err("Forbidden".into())
        }
    }
}

      
      



So if you do not specify a token, the server responds with a message containing "Forbidden".





A GraphQL enum can be defined as follows:









#[derive(SimpleObject)]
struct Satellite {
    ...
    life_exists: LifeExists,
}

#[derive(Copy, Clone, Eq, PartialEq, Debug, Enum, EnumString)]
#[strum(serialize_all = "SCREAMING_SNAKE_CASE")]
pub enum LifeExists {
    Yes,
    OpenQuestion,
    NoData,
}
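
The EnumString derive with serialize_all = "SCREAMING_SNAKE_CASE" lets the enum be parsed from strings such as "OPEN_QUESTION". A hand-written approximation of that conversion, using only the standard library, is sketched below; the String error type is a simplification, since strum uses its own error type.

```rust
use std::str::FromStr;

#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum LifeExists {
    Yes,
    OpenQuestion,
    NoData,
}

/// Hand-written approximation of the conversion that `EnumString` with
/// `serialize_all = "SCREAMING_SNAKE_CASE"` is expected to generate.
impl FromStr for LifeExists {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "YES" => Ok(LifeExists::Yes),
            "OPEN_QUESTION" => Ok(LifeExists::OpenQuestion),
            "NO_DATA" => Ok(LifeExists::NoData),
            other => Err(format!("Unknown LifeExists value: {}", other)),
        }
    }
}
```

With this in place, "OPEN_QUESTION".parse::<LifeExists>() returns Ok(LifeExists::OpenQuestion), which is convenient when the value is stored as text in the database.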

      
      



Async-graphql supports the date and time types from the chrono library, so such fields can be defined as usual:









#[derive(SimpleObject)]
struct Satellite {
    ...
    first_spacecraft_landing_date: Option<NaiveDate>,
}

      
      



Apollo Federation

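One purpose of satellites-service is to demonstrate how a distributed GraphQL entity (Planet) can be resolved in two (or more) services and then accessed through Apollo Server.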





The Planet type was defined earlier in planets-service as follows:

Definition of the Planet type in planets-service







#[derive(Serialize, Deserialize)]
struct Planet {
    id: ID,
    name: String,
    planet_type: PlanetType,
}

      
      



In planets-service, the Planet type is also declared as an entity:

Planet entity definition







#[Object]
impl Query {
    #[graphql(entity)]
    async fn find_planet_by_id(&self, ctx: &Context<'_>, id: ID) -> Option<Planet> {
        find_planet_by_id_internal(ctx, id)
    }
}

      
      



satellites-service extends the Planet entity by adding the satellites field:

Extension of the Planet type in satellites-service







struct Planet {
    id: ID
}

#[Object(extends)]
impl Planet {
    #[graphql(external)]
    async fn id(&self) -> &ID {
        &self.id
    }

    async fn satellites(&self, ctx: &Context<'_>) -> Vec<Satellite> {
        let id = self.id.to_string().parse::<i32>().expect("Can't get id from String");
        repository::get_by_planet_id(id, &get_conn_from_ctx(ctx)).expect("Can't get satellites of planet")
            .iter()
            .map(|e| { Satellite::from(e) })
            .collect()
    }
}

      
      



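You must also provide a lookup function for the extended type. In the example below it simply creates a new Planet instance:

Lookup function for the Planet type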







#[Object]
impl Query {

    #[graphql(entity)]
    async fn get_planet_by_id(&self, id: ID) -> Planet {
        Planet { id }
    }
}

      
      



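Async-graphql generates two additional queries (_service and _entities) that are used by Apollo Server. These queries are internal, that is, they are not exposed through the Apollo Server API. Of course, a service with Apollo Federation support can still work standalone.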





Apollo Server

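Apollo Server and Apollo Federation serve two main purposes:

  • create a single access point to several GraphQL APIs
  • build a unified data graph from distributed entities

So even if you do not use federated entities, it is more convenient for frontend developers to work with one access point rather than several.

There is another way to create a single GraphQL schema, schema stitching, but that approach is not covered here.

The module includes the following sources:

Meta information and dependencies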





{
  "name": "api-gateway",
  "main": "gateway.js",
  "scripts": {
    "start-gateway": "nodemon gateway.js"
  },
  "devDependencies": {
    "concurrently": "5.3.0",
    "nodemon": "2.0.6"
  },
  "dependencies": {
    "@apollo/gateway": "0.21.3",
    "apollo-server": "2.19.0",
    "graphql": "15.4.0"
  }
}

      
      



Apollo Server definition





const {ApolloServer} = require("apollo-server");
const {ApolloGateway, RemoteGraphQLDataSource} = require("@apollo/gateway");

class AuthenticatedDataSource extends RemoteGraphQLDataSource {
    willSendRequest({request, context}) {
        if (context.authHeaderValue) {
            request.http.headers.set('Authorization', context.authHeaderValue);
        }
    }
}

let node_env = process.env.NODE_ENV;

function get_service_url(service_name, port) {
    let host;
    switch (node_env) {
        case 'docker':
            host = service_name;
            break;
        case 'local': {
            host = 'localhost';
            break
        }
    }

    return "http://" + host + ":" + port;
}

const gateway = new ApolloGateway({
    serviceList: [
        {name: "planets-service", url: get_service_url("planets-service", 8001)},
        {name: "satellites-service", url: get_service_url("satellites-service", 8002)},
        {name: "auth-service", url: get_service_url("auth-service", 8003)},
    ],
    buildService({name, url}) {
        return new AuthenticatedDataSource({url});
    },
});

const server = new ApolloServer({
    gateway, subscriptions: false, context: ({req}) => ({
        authHeaderValue: req.headers.authorization
    })
});

server.listen({host: "0.0.0.0", port: 4000}).then(({url}) => {
    console.log(`Server ready at ${url}`);
});

      
      








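Authorization in apollo-server works as described earlier for the Rust services (you only need to specify the Authorization header and its value).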





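An application written in any language or framework can be added to Apollo Server as a downstream service, provided it implements the Federation specification; the documentation lists the libraries that offer such support.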





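While implementing this module, I ran into the following limitations:

  • Apollo Gateway does not support subscriptions (they still work in a standalone Rust GraphQL application)
  • a service that tries to extend a GraphQL interface needs to know its concrete implementations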





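The persistence layer is implemented using PostgreSQL and Diesel. If you do not use Docker locally, run diesel setup from each service's folder. This creates an empty database, to which the migrations that create the tables and insert the initial data are then applied.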





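Project launch and API testing

As mentioned earlier, the project can be launched in two ways:

  • Using Docker Compose (docker-compose.yml)

    Here there are again two options:

    • development mode (using locally built images)

      docker-compose up

    • production mode (using released images)

      docker-compose -f docker-compose.yml up

  • Without Docker

    Start each Rust service with cargo run, then start Apollo Server:

    • cd to the apollo-server folder
    • define the NODE_ENV variable, for example, set NODE_ENV=local (for Windows)
    • npm install
    • npm run start-gateway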







A successful launch of apollo-server should look like this:

Apollo Server startup log





[nodemon] 2.0.6
[nodemon] to restart at any time, enter `rs`
[nodemon] watching path(s): *.*
[nodemon] watching extensions: js,mjs,json
[nodemon] starting `node gateway.js`
Server ready at http://0.0.0.0:4000/

      
      



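You can navigate to http://localhost:4000 in your browser and use the built-in Playground IDE.

Here you can execute the queries, mutations, and subscriptions defined in the downstream services. Each of those services also has its own Playground IDE.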





To check that the subscription works, open two tabs of any GraphQL IDE; in the first one, subscribe as follows:









subscription {
  latestPlanet {
    name
    type
  }
}

      
      



In the second tab, specify the Authorization header as described above and execute a mutation like this:









mutation {
  createPlanet(
    planet: {
      name: "Pluto"
      type: DWARF_PLANET
      details: { meanRadius: "1188", mass: "1.303e22" }
    }
  ) {
    id
  }
}

      
      



The subscribed client is then notified of the event.





CI/CD

CI/CD is configured using GitHub Actions (workflow), which runs the applications' tests, builds their Docker images, and deploys them to Google Cloud Platform.





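The deployed API is available to try out.

Note: in the "production" environment the password differs from the one given earlier, to prevent the initial data from being changed.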





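In this article, I looked at how to solve the most common problems that can arise while developing a GraphQL API in Rust. I also showed how to combine the APIs of Rust GraphQL microservices to provide a unified GraphQL interface; in such an architecture an entity can be distributed among several microservices. This is achieved by using Apollo Server, Apollo Federation, and Async-graphql. The source code of the project is on GitHub. Feel free to contact me if you find any mistakes in the article or the source code. Thanks for reading!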





  • graphql.org





  • spec.graphql.org





  • graphql.org/learn/best-practices





  • howtographql.com





  • Async-graphql





  • Async-graphql book





  • Awesome GraphQL





  • Public GraphQL APIs





  • Apollo Federation demo







