問題を知る
商用開発では、多くの機械学習のユースケースはマルチテナントアーキテクチャを意味し、クライアントやユーザーごとに個別のモデルをトレーニングする必要があります。
例として、機械学習を使用して特定の製品の購入と需要を予測することを検討してください。小売店のチェーンを運営している場合は、顧客の購入履歴データとそれらの製品の総需要を使用して、各店舗のコストと購入量を個別に予測できます。
ほとんどの場合、このような場合、モデルをデプロイするには、Flaskサービスを作成し、それをDockerコンテナーに配置します。単一モデルの機械学習サーバーの例はたくさんありますが、複数のモデルをデプロイする場合、開発者は問題を解決するために利用できるオプションがほとんどありません。
マルチテナントアプリケーションでは、テナントの数は事前にわかっておらず、実質的に無制限にすることができます。ある時点ではクライアントが1つしかない場合もあれば、ユーザーごとに別々のモデルを数千人のユーザーに提供できる場合もあります。ここから、標準の展開アプローチの制限が明らかになり始めます。
クライアントごとにDockerコンテナーをデプロイすると、非常に大規模で高価なアプリケーションになり、管理が非常に困難になります。
何千ものモデルがサーバー上で動作し、実行時に新しいモデルが追加されるため、すべてのモデルがイメージに含まれる単一のコンテナーも機能しません。
決定
, . , Airflow S3, ML — .
ML — , : -> .
, :
Model — , ; SklearnModel, TensorFlowModel, MyCustomModel . .
ModelInfoRepository — , userid -> modelid. , SQAlchemyModelInfoRepository.
ModelRepository — , ID. FileSystemRepository, S3Repository .
from abc import ABC
class Model(ABC):
@abstractmethod
def predict(self, data: pd.DataFrame) -> np.ndarray:
raise NotImplementedError
class ModelInfoRepository(ABC):
@abstractmethod
def get_model_id_by_user_id(self, user_id: str) -> str:
raise NotImplementedError
class ModelRepository(ABC):
@abstractmethod
def get_model(self, model_id: str) -> Model:
raise NotImplementedError
, sklearn, Amazon S3 userid -> modelid, .
class SklearnModel(Model):
def __init__(self, model):
self.model = model
def predict(self, data: pd.DataFrame):
return self.model.predict(data)
class SQAlchemyModelInfoRepository(ModelInfoRepository):
def __init__(self, sqalchemy_session: Session):
self.session = sqalchemy_session
def get_model_id_by_user_id(user_id: str) -> str:
# implementation goes here, query a table in any Database
class S3ModelRepository(ModelRepository):
def __init__(self, s3_client):
self.s3_client = s3_client
def get_model(self, model_id: str) -> Model:
# load and deserialize pickle from S3, implementation goes here
:
def make_app(model_info_repository: ModelInfoRepository,
model_repsitory: ModelRepository) -> Flask:
app = Flask("multi-model-server")
@app.predict("/predict/<user_id>")
def predict(user_id):
model_id = model_info_repository.get_model_id_by_user_id(user_id)
model = model_repsitory.get_model(model_id)
data = pd.DataFrame(request.json())
predictions = model.predict(data)
return jsonify(predictions.tolist())
return app
, Flask ; sklearn TensorFlow S3 , Flask .
, , . , . cachetools:
from cachetools import Cache
class CachedModelRepository(ModelRepository):
def __init__(self, model_repository: ModelRepository, cache: Cache):
self.model_repository = model_repository
self.cache = cache
@abstractmethod
def get_model(self, model_id: str) -> Model:
if model_id not in self.cache:
self.cache[model_id] = self.model_repository.get_model(model_id)
return self.cache[model_id]
:
from cachetools import LRUCache
model_repository = CachedModelRepository(
S3ModelRepository(s3_client),
LRUCache(max_size=10)
)
- , . , , MLOps . . , . №4 Google: , - .