この記事では、モバイルロボットアルゴリズム研究所と共同で取り組んだコースプロジェクトについて説明します。 JetBrains Research:私がGym-DuckietownEmulatorのために書いたIssueCheckerについて。テストシステムと、このシステムと、外部採点者テクノロジーを使用するオンライン教育プラットフォーム(Stepik.orgプラットフォームなど)との統合について説明します。
著者について
私の名前はダニイル・プラシェンコです。サンクトペテルブルクHSEのマスタープログラム「プログラミングとデータ分析」の1年目(2年目)の学生です。2019年に、同じ大学で応用数学とコンピューターサイエンスの学士号を取得しました。
ダッキータウンプラットフォーム
ダッキータウンは自律型車両研究プロジェクトです。プロジェクトの主催者は、人工知能とロボット工学の分野で教えるための新しいアプローチを導入するのに役立つプラットフォームを作成しました。すべては2016年にMITのコースとして始まりましたが、高校から修士課程まで、世界中のさまざまなレベルの教育に徐々に広がりました。
ダッキータウンプラットフォームには2つの部分があります。まず、これは、道路、建物、道路標識、障害物がある都市の輸送環境の縮小モデルです。第二に、それは輸送です。 Raspberry Piを実行している小型の移動ロボット(Duckiebots)は、カメラを介して周囲の世界に関する情報を受け取り、街の住民(黄色のゴム製のアヒル)を道路に沿って輸送します。
Duckietownエミュレーターを使用しました。というGym-Duckietown、そしてそれはPythonで書かれたオープンソースプロジェクトです。エミュレーターは、ボットを都市内に配置し、使用しているアルゴリズム(または押したボタン)に応じて位置を変更し、画像を再描画して、ボットの現在の位置をログに書き込みます。
試してみたい場合は、自分でリポジトリのクローンを作成し、manual_control.pyを実行することをお勧めします。この方法で、キーボードの矢印を使用してボットを制御できます。
エミュレータ
のスクリーンショットエミュレータは、タスクを実行するための環境として使用できます。次の問題を設定しましょう。1つの車線で構成される特定のマップで、1メートルを直線で運転する必要があります。
この問題を解決するには、次のアルゴリズムを使用できます。
for _ in range(25):
env.step([1, 0])
env.render()
この変数
env
は、環境の状態を格納します。
stepメソッドは
action
、ボットのアクションを説明する2つの要素のリストを入力として受け取ります。最初の要素は速度を設定し、2番目の要素は回転角度を設定します。このメソッドrender
は、ボットの新しい位置を考慮して、画像を再描画します。ステップ数と速度は経験的に選択されています:これらは、ボットが正確に1メートル直線で移動するために必要な値です。
このスニペットをmanual_control.pyで使用する場合は、ここに貼り付けてください。この時点までのコードは、環境のロードでビジーです。簡単にするために、それを再利用してから、上記の解決策を問題に追加できます。
テストシステム
そのようなタスクを自動的にチェックできるようにしたいと思います。ボット制御アルゴリズムの実装を取得し、トリップをシミュレートして、提案されたアルゴリズムがタスクを正しく実行するかどうかを報告します。このようなテストシステムは、自律輸送制御の競技会の準備中、および教育目的で環境を使用することを可能にします。つまり、学生に一連の問題を発行し、その解決策を自動的にチェックします。私はコースプロジェクトに取り組んでいる間その開発に従事しました、そして以下に私が得たものについてあなたに話します。
ソリューションをチェックするときの一連の手順は次のようになります。
自分でテストシステムにタスクを追加することも、私が行ったタスクを参照することもできます。各タスクには、条件ジェネレーター(環境状態を生成するクラス)があります。マップの名前と開始セル内のボットの開始位置が含まれています。目標座標も設定されます。この場合、開始位置から1メートルの地点です。
class Ride1MTaskGenerator(TaskGenerator):
def __init__(self, args):
super().__init__(args)
def generate_task(self):
env_loader = CVTaskEnv if self.args.is_cv_task else TrackingDuckietownEnv
env = env_loader(
map_name="straight_road",
position_on_initial_road_tile=PositionOnInitialRoadTile(
x_coefficient=0.5,
z_coefficient=0.5,
angle=0,
),
)
self.generated_task['target_coordinates'] = [[env.road_tile_size * 0.5 + 1, 0, env.road_tile_size * 0.5]]
self.generated_task['env'] = env
env.render()
return self.generated_task
ここ
TrackingDuckietownEnv
とCVTaskEnv
さらなる分析のための店の旅行情報に使用されているラッパークラスです。
class TrackingDuckietownEnv:
def __init__(self, **kwargs):
self.__wrapped = DuckietownEnv(**kwargs)
…
…
def step(self, action):
obs, reward, done, misc = self.__wrapped.step(action)
message = misc['Simulator']['msg']
if 'invalid pose' in message.lower():
raise InvalidPoseException(message)
for t in self.trackers:
t.track(self)
return obs, reward, done, misc
トラッカーは、ボットの位置など、現在の状態に関する情報を収集します。
CVTaskEnv
これは、エミュレーターの機能ではなく、カメラからの情報(「コンピュータービジョン」)のみを使用してソリューションが必要な場合に使用されます。たとえば、ボットがストリップの中心からどれだけ離れているか、または最も近い可視オブジェクトがどこにあるかを知る必要がある場合などです。CVTaskEnv
エミュレーター関数を呼び出すと、問題の解決が簡単になり、クラスはエミュレーターメソッドの呼び出しを制限します。フラグが表示されているときに使用されますis_cv_task
。
class CVTaskEnv:
def __init__(self, **kwargs):
self.__wrapped = TrackingDuckietownEnv(**kwargs)
def __getattr__(self, item):
if item in self.__wrapped.overriden_methods:
return self.__wrapped.__getattribute__(item)
ALLOWED_FOR_CV_TASKS = [
'render', '_CVTaskEnv__wrapped', '_TrackingDuckietownEnv__wrapped',
'road_tile_size', 'trip_statistics'
]
if item in ALLOWED_FOR_CV_TASKS:
return self.__wrapped.__getattr__(item)
else:
raise AttributeError(item + " call is not allowed in CV tasks")
決定の実行が完了した後(タイムアウトによって中断されていない場合)、トリップ情報は一連のチェッカーを通過します。すべてのチェッカーが正常に機能した場合、問題は正しく解決されたと見なされます。それ以外の場合は、説明的な評決が表示されます。たとえば、ボットがクラッシュしたり、道路から
追い出されたりします。これは、標準のチェッカーの1つです。彼は、ボットが旅行の終わりに開始点に戻ったことを確認します。これは、たとえば、タスクで特定のポイントに到達してから戻る必要がある場合に役立ちます。
class SameInitialAndFinalCoordinatesChecker(Checker):
def __init__(self, maximum_deviation=0.1, **kwargs):
super().__init__(**kwargs)
self.maximum_deviation = maximum_deviation
def check(self, generated_task, trackers, **kwargs):
trip_statistics = next(x for x in trackers if isinstance(x, TripStatistics))
trip_data = trip_statistics.trip_data
if len(trip_data) == 0:
return True
initial_coordinates = trip_data[0].position.coordinates
final_coordinates = trip_data[-1].position.coordinates
return np.linalg.norm(initial_coordinates - final_coordinates) < self.maximum_deviation
このようなテストシステムは、手動モードで使用できます。つまり、手動でテストを開始してから、評決を視覚的に調査します。たとえば、Stepik.orgで自律輸送に関するオンラインコースを開始したい場合は、プラットフォームとの統合が必要になります。これについては、記事の次の部分で説明します。
オンラインプラットフォームとの統合
テストタスクには、edXプラットフォームによって開発されたExternalGraderテクノロジーがよく使用されます。
外部グレーダーを使用する場合、教育プラットフォームはそれ自体でタスクをチェックしませんが、別のデバイスに送信されるパッケージのキューを生成します。キュー接続機能は、xqueue-watcherプロジェクトに実装されています。 Xqueue-watcherは小包をフェッチし、バリデーターによってテストされます(通常、テキスト/番号の比較よりも重要なことを行います)。その後、検証の評決は教育プラットフォームの側に送り返されます。
キューに接続する瞬間をさらに詳しく考えてみましょう。教育プラットフォームが接続データを提供した後、それらをに追加する必要があります構成ファイル、およびgradeメソッドで、検証起動を直接実装します。より詳細な手順は、こことここにあります。
Xqueue-watcherはエンドポイントget_submissionを呼び出します。これにより、可能であればキューからパッケージが取得されます。その後、彼女はテストに行きます。次に、xqueue-watcherはput_resultを呼び出して判定を返します。
次のようにxqueue-watcherを起動できます。
make requirements && python -m xqueue_watcher -d conf.d/
外部グレーダーテクノロジーを使用したいが、オンラインプラットフォームでコースを実行したくないとしましょう。Xqueue-watcherは、ソリューションを含むファイルがアップロードされるファイルストレージがあることを前提に実装されています(プラットフォームにはそのようなストレージがあります)。Xqueueを変更して、ファイルストレージが不要になるようにすることができます。このようなシステムは、通常、ラップトップでも実行できます。
まず、区画キュー自体を維持する方法を学ぶ必要があります。キュー機能は、xqueueプロジェクトによって提供されます。
ドキュメント から撮影した写真。
次のように実行できます。
apt-get install libaio1 libaio-dev
apt-get install libmysqlclient-dev
pip3 install -r requirements.txt
python3 manage.py migrate
python3 manage.py runserver $xqueue_address
ファイルの作成が必要になる場合があります〜/ edx / edx.log
デフォルトでは、xqueueはxqueue-watcherにパッケージの内容、つまり問題の解決策を含むファイルではなく、ファイルストレージ内のこれらのファイルへのリンクを提供します。ファイルストレージに依存しないように、ファイル自体を転送して、xqueue-watcherが実行されているのと同じマシンに保存することができます。
これを実現するためにソースコードを変更する必要がある
方法は次のとおりです。lms_interface.pyの_uploadメソッドの実装は次のように置き換えられます。
def _upload(file_to_upload, path, name):
'''
Upload file using the provided keyname.
Returns:
URL to access uploaded file
'''
full_path = os.path.join(path, name)
return default_storage.save(full_path, file_to_upload)
ファイルストレージが接続されていない場合、このメソッドはソリューションとともにファイルをパス$ queue_name / $ file_to_upload_hashに保存します。
ext_interface.pyファイルのget_sumbissionの実装では、次の行の代わりに次のように記述します。
xqueue_files = json.loads(submission.urls)
for xqueue_file in xqueue_files.keys():
with open(xqueue_files[xqueue_file], 'r') as f:
xqueue_files[xqueue_file] = f.readlines()
リンク(パス)をファイルに転送するのではなく、その内容を転送しましょう。
各ソリューションは、限られたリソースで「1回限りの」ドッカーコンテナで実行されます。つまり、各ソリューションの実行用に個別のコンテナが作成され、テストが終了すると削除されます。このようなコンテナを作成してコマンドを実行するには、portainer-apiを使用します(実際には、Docker APIのラッパーとして)。
結果
この記事では、自律輸送のテストシステムとタスクがどのように作成されたか、およびこのシステムと外部採点者テクノロジーを使用するオンライン教育プラットフォームとの統合について説明しました。このシステムを使用したコースがまもなく開始され、オンラインプラットフォームとの統合に関する部分が、独自のオフラインまたはオンラインコースを作成したい方に役立つことを願っています。