単一のサーバーに複数の機械学習モデルをデプロイする

問題を知る

商用開発で​​は、多くの機械学習のユースケースはマルチテナントアーキテクチャを意味し、クライアントやユーザーごとに個別のモデルをトレーニングする必要があります。





例として、機械学習を使用して特定の製品の購入と需要を予測することを検討してください。小売店のチェーンを運営している場合は、顧客の購入履歴データとそれらの製品の総需要を使用して、各店舗のコストと購入量を個別に予測できます。





ほとんどの場合、このような場合、モデルをデプロイするには、Flaskサービスを作成し、それをDockerコンテナーに配置します。単一モデルの機械学習サーバーの例はたくさんありますが、複数のモデルをデプロイする場合、開発者は問題を解決するために利用できるオプションがほとんどありません。





マルチテナントアプリケーションでは、テナントの数は事前にわかっておらず、実質的に無制限にすることができます。ある時点ではクライアントが1つしかない場合もあれば、ユーザーごとに別々のモデルを数千人のユーザーに提供できる場合もあります。ここから、標準の展開アプローチの制限が明らかになり始めます。





  • クライアントごとにDockerコンテナーをデプロイすると、非常に大規模で高価なアプリケーションになり、管理が非常に困難になります。





  • 何千ものモデルがサーバー上で動作し、実行時に新しいモデルが追加されるため、すべてのモデルがイメージに含まれる単一のコンテナーも機能しません。





決定

, . , Airflow S3, ML — .





ML — , : -> .





, :





  • Model — , ; SklearnModel, TensorFlowModel, MyCustomModel . .





  • ModelInfoRepository — , userid -> modelid. , SQAlchemyModelInfoRepository.





  • ModelRepository — , ID. FileSystemRepository, S3Repository .





from abc import ABC


class Model(ABC):
    @abstractmethod
    def predict(self, data: pd.DataFrame) -> np.ndarray:
        raise NotImplementedError
 

class ModelInfoRepository(ABC):
    @abstractmethod
    def get_model_id_by_user_id(self, user_id: str) -> str:
        raise NotImplementedError
 

class ModelRepository(ABC):
    @abstractmethod
    def get_model(self, model_id: str) -> Model:
        raise NotImplementedError
      
      



, sklearn, Amazon S3 userid -> modelid, .





class SklearnModel(Model):
    def __init__(self, model):
        self.model = model
 

    def predict(self, data: pd.DataFrame):
        return self.model.predict(data)
 

class SQAlchemyModelInfoRepository(ModelInfoRepository):
    def __init__(self, sqalchemy_session: Session):
        self.session = sqalchemy_session
 

    def get_model_id_by_user_id(user_id: str) -> str:
        # implementation goes here, query a table in any Database

      
class S3ModelRepository(ModelRepository):
    def __init__(self, s3_client):
        self.s3_client = s3_client
 

    def get_model(self, model_id: str) -> Model:
        # load and deserialize pickle from S3, implementation goes here
      
      



:





def make_app(model_info_repository: ModelInfoRepository,
    				 model_repsitory: ModelRepository) -> Flask:
    app = Flask("multi-model-server")
    
    @app.predict("/predict/<user_id>")
    def predict(user_id):
        model_id = model_info_repository.get_model_id_by_user_id(user_id)
 
        model = model_repsitory.get_model(model_id)
 
        data = pd.DataFrame(request.json())
 
        predictions = model.predict(data)
 
        return jsonify(predictions.tolist())
 
    return app
      
      



, Flask ; sklearn TensorFlow S3 , Flask .





, , . , . cachetools:





from cachetools import Cache
 
class CachedModelRepository(ModelRepository):
    def __init__(self, model_repository: ModelRepository, cache: Cache):
        self.model_repository = model_repository
        self.cache = cache
 
    @abstractmethod
    def get_model(self, model_id: str) -> Model:
        if model_id not in self.cache:
            self.cache[model_id] = self.model_repository.get_model(model_id)
        return self.cache[model_id]
      
      



:





from cachetools import LRUCache
 
model_repository = CachedModelRepository(
    S3ModelRepository(s3_client),
    LRUCache(max_size=10)
)
      
      



- , . , , MLOps . . , . №4 Google: , - .








All Articles