MkItYs

MkItYs > 音楽・漫画・VR・自律制御 > 

images

ML-Agents の低レベルAPIを使う(トレーニング編):LLAPI, Python, Unity

images

ML-Agents のPython 向け低レベルAPI(LLAPI )を使って、独自のやり方で強化学習を行います。


images

関連


ML-Agents の低レベルAPIを使う(NNモデル適用編):LLAPI, Barracuda, ONNX
コンテナに強化学習のフレームワークを設置する:Docker, ML-Agents

概要


背景

ゲームエンジン「Unity 」には、強化学習のフレームワーク「ML-Agents 」がありますがーー

問題

ただそのフレームワークに沿ったやり方では、提供された強化学習のアルゴリズム(PPO やSAC など)しか使えません。また代表的な強化学習のフレームワーク(OpenAI Gymなど)のラッパも提供されていますが、その場合でも、フレームワークの作法に沿ったやり方しかできないわけです。

対応

そのためML-Agents は、Python 向けのより低レベルなAPI(LLAPI )も提供していますーーこれを使えば、より汎用的なやり方(独自のやり方ふくむ)で、Unity 環境と連携した強化学習を行えるようになります。

ここでは、この低レベルAPIを使って強化学習を行います。

実装


前提

エージェントとニューラルネットのモデル(NNモデル)は、ここでは「ゼロつく」の方策勾配法の実装を使いますーー強化学習のなかでも方策勾配法はシンプルなアルゴリズムですし、また「ゼロつく」では、その作り方が(フレームワークの実装ふくめ)詳細に解説されているので:[※1]

https://github.com/oreilly-japan/deep-learning-from-scratch-4
https://github.com/oreilly-japan/deep-learning-from-scratch-4/blob/master/pytorch/simple_pg.py

またこのコードは、OpenAI Gymの倒立振子(カートポール)の訓練用ですーーなのでいちおう、環境側の仕様も確かめておきます:[※2]

https://gymnasium.farama.org/environments/classic_control/cart_pole/
役割

なおPython 側とUnity (C#)側のコードは、それぞれ次の分担になります:

Unity (C#)側 …… 環境と、それを観察し〜行動する3Dモデルのエージェントを用意しますーー観察と報酬をPython 側に渡し、Python 側から返された行動でエージェントを動かします。
Python 側 …… NNモデルと、そのパラメータを更新するエージェントを用意しますーーUnity 側から返された観察と報酬から行動を決定し、Unty 側に渡します。

※1
ただし「ゼロつく」独自のフレームワーク(Dezero)ではなく、Pytorch による実装を採用しますーーこれは、学習したNNモデルをUnity で使えるよう、標準形式(ONNX)で保存するためです。
※2
つまり、ML-Agents が提供するOpenAI Gym向けのラッパも試せる、ということですーー独自の実装と動作を比べることもできますね。

構成


環境は、次の手順で設定します(ここではコンテナを使います):

プロジェクトのためのコンテナを用意し、ML-Agents のPython 向けのコードをダウンロードします(これにふくまれるUnity 向けの環境はここでは不要なので、削除します):

$ mkdir ${dir_docker}
$ cd ${dir_docker}
$ vi Dockerfile

$ mkdir ${dir_project_unity}
$ mkdir ${dir_project_python}

$ cd ${dir_project_python}
$ git clone --depth=1 --branch release_20 https://github.com/Unity-Technologies/ml-agents.git

$ cd ${dir_project_python}/ml-agents
$ rm -rf Project

Dockerfile には、最低限の内容だけ記述しています:

FROM ubuntu:22.04

RUN apt-get -y update
RUN apt-get -y upgrade
RUN apt-get -y install python3
RUN apt-get -y install pip
RUN ln -s /usr/bin/python3.10 /usr/bin/python

コンテナを作成〜起動します(Unity 側と通信するためのポート「5004」を開けて起動します):

# 作成
$ docker build --no-cache -t ${container_image} ${dir_docker}

# 反映
$ docker commit ${container} ${container_image}

# 起動
$ docker run -it --rm \
-p 5004:5004 \
-v ${dir_project_python}/ml-agents:/app/ml-agents \
--name ${container} \
${container_image}

コンテナを起動したら、ライブラリ群を設置します(適宜、イメージに反映させます):

$ cd /app/ml-agents
$ pip install -e ml-agents-envs
$ pip install -e ml-agents

ゲームエンジン(Unity )側では、プロジェクトを作り〜ML-Agents のパッケージを導入します:

project
- template: 3d
- editor: 2021.2.8f1
- name: <project_unity>
- path: <dir_project_unity>

window
> package manager
  > add package from git url: com.unity.ml-agents

作成:Unity


倒立振子(カートポール)の3Dモデルを作成します。[※1]

images

作成した3Dモデルに、次のコンポーネント群を取り付けます:

SCENE
> OBJECT
  > add component
    > behavior parameters
    > decision requester
    > <class_agent> ... クラス「Agent 」を継承するスクリプト

コンポーネント群には、次のような値を設定します(観測のサイズ、行動のサイズ、行動を決定する間隔、を指定します。そしてNNモデルを空白にすることで、訓練のモードにします):

behavior parameters
> behavior name: Cpl001
> vector observation
  > space size: 4
  > stacked vectors: 1
> actions
  continuous actions: 0
  discrete branches: 1
  > branch N size: 1
> model: <nil>

decision requester
> decision period: 5

※1
3Dモデルの作成は、次が参考になりましたーーカートの動く範囲を固定する、カートとポールの重量比を大きくして揺れを防ぐ、など、物理演算の影響をできるだけ小さくする工夫があります:
https://medium.com/@goncalorrc/unity-ml-intro-tutorial-super-cart-pole-part-1-413ce1879f02

実装:Unity


Unity 側のスクリプトを記述します。

必要なモジュール群を読み込み:

using static System.Math;
using UnityEngine;
using Unity.MLAgents;
using Unity.MLAgents.Sensors;
using Unity.MLAgents.Actuators;

名前空間とクラスを定義し、クラス内で使う変数群を定義〜初期化しますーーなおクラスは「Agent 」を継承します:

namespace <namespace_project> {
  public class <class_agent>: Agent {
    public float fcemov = 100.0f;
    public float radmax = 0.418f;
    public float posmax = 3.0f;
    private int cntstp;
    private GameObject objtgt;
    private GameObject objpol;
    private Rigidbody rgdtgt;
    private Rigidbody rgdpol;
    private float radpol_bgn;

ゲーム全体を初期化しますーーここでは、カート(objtgt)とポール(objpol)の実体と、それぞれの物理演算のためのコンポーネント(リジッドボディ)を得ます:

    public override void Initialize() {
      objtgt = this.gameObject;
      rgdtgt = objtgt.GetComponent<Rigidbody>();
      objpol = objtgt.transform.Find("<name_pole>").gameObject;
      rgdpol = objpol.GetComponent<Rigidbody>();
    }

エピソードごとに行う処理を記述しますーーここでは、カートとポールの位置・回転を、初期の状態に配置します:

    public override void OnEpisodeBegin() {
      cntstp = 0;
      objtgt.transform.position = Vector3.zero;
      objtgt.transform.localRotation = Quaternion.identity;
      objpol.transform.localPosition = new Vector3(0.0f, 1.5f, 0.0f);
      objpol.transform.localRotation = Quaternion.identity;
    }

観測が呼ばれたときの処理を記述しますーーここでは、カートの位置と速度、ポールの角度と速度(角速度)を取得します:

    public override void CollectObservations(VectorSensor sensor) {
      sensor.AddObservation(objtgt.transform.position.x);
      sensor.AddObservation(rgdtgt.velocity.x);
      sensor.AddObservation(SymmetricalRadian(objpol.transform.localEulerAngles.z * Mathf.Deg2Rad));
      sensor.AddObservation(rgdpol.angularVelocity.z);
      radpol_bgn = SymmetricalRadian(objpol.transform.localEulerAngles.z * Mathf.Deg2Rad);
    }

行動の決定が呼ばれたときの処理を記述しますーーここでは、エージェントが返した値により、カートを左右どちらかに動かします。また報酬を返し、終了条件を満たしていれば、エピソードの終了を返します:[※1]

    public override void OnActionReceived(ActionBuffers action) {
      int flgact;
      float postgt_end;
      float radpol_end;
      float rwdpol = 0.0f;
//行動
      flgact = action.DiscreteActions[0];
      if (flgact == 0) {
        rgdtgt.AddForce(new Vector3(+ fcemov,0.0f,0.0f));
      }
      if (flgact == 1) {
        rgdtgt.AddForce(new Vector3(- fcemov,0.0f,0.0f));
      }
//結果:報酬
      SetReward(1.0f);
//結果:終了
      if (radpol_end > + radmax || radpol_end < - radmax || postgt_end > + posmax || postgt_end < - posmax) {
        EndEpisode();
      }
    }

※1
ここでの報酬は、たんにステップを終えたら1単位を与えるだけですーーこれはOpenAI Gymのカートポールの報酬の与え方に準じていますーーようするに、エピソードを長く続ける(1エピソードのステップ数が大きい)ほど、報酬が大きくなる、ということをエージェントに示唆するものです。

次は、角度をラジアンに変換する関数です(これもOpenAI Gymに合わせていますが、じっさいこの方が終了条件を記述しやすいですしね):

    private float SymmetricalRadian(float radorg) {
      if (radorg > Mathf.PI) {
        radorg = radorg - Mathf.PI * 2;
      }
      return radorg;
    }

実装:Python


Python側のスクリプトを記述します。

NNモデルとエージェントのクラス定義は、元のコード(simple_pg.py)のものをそのまま使います(Policy, Agent )。ライブラリ群は次のものを使い(gym の代わりにUnityEnvironmentをを使う、など)、トレーニングの記述はすべての行を消しておきます(env = gym.make('CartPole-v0'〜以降は削除):

# 次のライブラリ群を使用:

import sys
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical
from mlagents_envs.environment import UnityEnvironment, ActionTuple

# 次のクラス定義は残す:

class Policy(nn.Module):
  ...
class Agent:
  ...

# ここから以降は削除

以降、追加するトレーニングの記述ですーー環境(Unity )とNNモデルおよびエージェントをインスタンス化し、環境をリセットします。学習する回数はスクリプトの引数から取得し、エージェントの名前は環境から取得します:[※1]

trials = sys.argv[1]

envuni = UnityEnvironment(file_name=None)
agtnnm = Agent()
envuni.reset()
agtnam = list(envuni.behavior_specs)[0]

※1
環境(UnityEnvironment)のfile_name をNoneとすることで、訓練をエディタから実行することができますーービルドした環境を使う場合は(この方がより早く訓練できます)、その実行ファイル名を指定します。

NNモデルのパラメータは、エピソードごとに更新します:

for cntepi in range(int(trials)):
  # ... エピソード内の記述
  agtnnm.update()

エピソード内では、環境をリセットし〜ステップごとの決定と終了のための配列を取得します。ステップ内のエージェントの状態を初期化したあと、ステップごとの反復に入ります:

# 初期(エピソード)
  envuni.reset()
  stpdcs, stptrm = envuni.get_steps(agtnam)
  agtstp = -1
  endepi = False

  while not endepi:
    # ... ステップ内の記述

ステップ内では、エージェントの状態を取得します:

    if len(stpdcs) >= 1:
      agtstp = stpdcs.agent_id[0]
    rwdstp = 0

環境(Unity )から観測の結果を得て:

    sensor = stpdcs[agtstp].obs[0]

観察から行動を決定し、その決定を環境(Unity )に返します:

    actnnm, prbnnm = agtnnm.get_action(sensor)
    #actnnm, prbnnm = agtnnm.get_action(np.array(sensor,dtype=float)) # Python のバージョンによってはこちらを使います

    action = ActionTuple()
    action.add_discrete(np.array([[int(actnnm)]]))
    envuni.set_actions(agtnam,action)

ひとつの行動を終えたら、環境(Unity )をリセットし〜ステップごとの決定と終了のための配列を取得します。このときエージェントの状態が決定の状態を満たしていれば、報酬を得ます。さらに終了の状態を満たしていれば、報酬を得て、ステップから抜けるようにします:

# 切替(ステップ)
    envuni.step()
    stpdcs, stptrm = envuni.get_steps(agtnam)

# 結果(報酬/終了)
    if agtstp in stpdcs:
      rwdstp = rwdstp + stpdcs[agtstp].reward
    if agtstp in stptrm:
      rwdstp = rwdstp + stptrm[agtstp].reward
      endepi = True

# 処理(ステップ)
    agtnnm.add(rwdstp,prbnnm)

最後に、環境(Unity )を終了させます:

envuni.close()

訓練


訓練は、Python のスクリプトを起動し、Unity のエディタからプロジェクトを実行することで始められます:

$ python ${script_train}.py 3000