Creating a New Multigrid Environment

This page walks through the steps for building a custom Multigrid environment: extending the core world, writing the environment class, and registering it with gymnasium.

import sys
from pathlib import Path
from IPython import display

# Make the local gym-multigrid checkout importable and set up an output directory
root_dir = Path(".").resolve()
sys.path.extend([str(root_dir.parents[2]/"tests/gym-multigrid")])
temp_dir = root_dir/"images"

Step 1: Create a new world in Multigrid

  • core/agent.py: if the existing actions classes do not meet your needs, add an actions class for the new world (see the sketch after this list).

  • core/constants.py:

    • If there are object-specific states, define a state_to_idx_{yourChosenName} dictionary.

    • If your environment needs objects that are not yet defined, add entries for the new objects to the OBJECT_TO_STR dictionary.

  • core/object.py

    • Define classes for your new objects, if any.

    • An object needs: a type, a color, a position, and encode, decode, and render methods.

    • Object properties are defined through: can_overlap, can_pickup, can_contain, see_behind.

    • Objects may also have: contains, toggle.

  • core/world.py

    • Add a new world if the existing worlds do not meet your needs.

    • A world defines the objects, the colors, and the encoding size.

    • Note: the encoding layers capture different things. Layer 1 holds the object type in a cell, layer 2 the color, and layer 3 the agent direction. Layers 4, 5, and 6 are not used yet, but they can hold additional features if needed.

  • core/grid.py

    • This is the underlying structure of a new environment.

    • Methods include: copy, get, set, rotate, slice, render, encode.

    • There are also: horz_wall, vert_wall, wall_rect.
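
To make the bullet points above concrete, here is a minimal sketch of what a new actions set and a new object might look like. PushActions and Crate are invented names, the import path and the (world, type, color) constructor of WorldObj are assumptions to verify against your checkout, and a new object type such as "crate" would still need to be added to the dictionaries in core/constants.py and core/world.py as described above.

from enum import IntEnum

from gym_multigrid.core.object import WorldObj  # assumed import path, see core/object.py


# core/agent.py: a hypothetical actions set for a new world.  The existing
# sets (DefaultActions, CollectActions) are enum classes, so a new one can
# follow the same pattern; member names beyond the usual
# still/left/right/forward are purely illustrative.
class PushActions(IntEnum):
    still = 0
    left = 1
    right = 2
    forward = 3
    push = 4


# core/object.py: a hypothetical new object.  The super().__init__ call
# assumes a (world, type, color) style constructor, which may differ in
# your version of WorldObj; check before copying.
class Crate(WorldObj):
    def __init__(self, world, color="brown"):
        super().__init__(world, "crate", color)

    def can_overlap(self):
        # Agents cannot walk through a crate.
        return False

    def can_pickup(self):
        # Crates are too heavy to carry around.
        return False

    def see_behind(self):
        # A crate blocks the agent's line of sight.
        return False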

Step 2: Create the environment in Multigrid

Create a file named {yourChosenName}.py in the gym_multigrid/gym_multigrid/envs/ directory and write an environment class that inherits from MultiGridEnv.

  • When calling the parent class's __init__ method, you should specify the following arguments:

    • The list of agents

    • The grid dimensions

    • Whether observability is full or partial

    • The maximum number of timesteps per episode

    • The actions and world classes defined above

from gym_multigrid.multigrid import MultiGridEnv
MultiGridEnv?
Init signature:
MultiGridEnv(
    agents: list[~AgentT],
    grid_size: int | None = None,
    width: int | None = None,
    height: int | None = None,
    max_steps: int = 100,
    see_through_walls: bool = False,
    partial_obs: bool = False,
    agent_view_size: int = 7,
    actions_set: Type[~ActionsT] = <enum 'DefaultActions'>,
    world: ~WorldT = World(encode_dim=6, normalize_obs=1, OBJECT_TO_IDX={'unseen': 0, 'empty': 1, 'wall': 2, 'floor': 3, 'door': 4, 'key': 5, 'ball': 6, 'box': 7, 'goal': 8, 'lava': 9, 'agent': 10, 'objgoal': 11, 'switch': 12}, COLORS={'red': array([228,   3,   3]), 'orange': array([255, 140,   0]), 'yellow': array([255, 237,   0]), 'green': array([  0, 128,  38]), 'blue': array([  0,  77, 255]), 'purple': array([117,   7, 135]), 'brown': array([120,  79,  23]), 'grey': array([100, 100, 100]), 'light_red': array([234, 153, 153]), 'light_blue': array([ 90, 170, 223])}, COLOR_TO_IDX={'red': 0, 'orange': 1, 'yellow': 2, 'green': 3, 'blue': 4, 'purple': 5, 'brown': 6, 'grey': 7, 'light_red': 8, 'light_blue': 9}, IDX_TO_COLOR={0: 'red', 1: 'orange', 2: 'yellow', 3: 'green', 4: 'blue', 5: 'purple', 6: 'brown', 7: 'grey', 8: 'light_red', 9: 'light_blue'}, IDX_TO_OBJECT={0: 'unseen', 1: 'empty', 2: 'wall', 3: 'floor', 4: 'door', 5: 'key', 6: 'ball', 7: 'box', 8: 'goal', 9: 'lava', 10: 'agent', 11: 'objgoal', 12: 'switch'}),
    render_mode: Literal['human', 'rgb_array'] = 'rgb_array',
    uncached_object_types: list[str] = [],
) -> None
Docstring:      2D grid world game environment
File:           /media/pc/data/lxw/ai/d2py/tests/gym-multigrid/gym_multigrid/multigrid.py
Type:           type
Subclasses:
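
For orientation, here is a bare-bones sketch of such a subclass. MyGameEnv is a hypothetical name, the agents, world instance, and actions class are assumed to come from step 1, and _gen_grid (described in the next section) still has to be implemented before the environment can actually be used.

from gym_multigrid.multigrid import MultiGridEnv


class MyGameEnv(MultiGridEnv):  # hypothetical environment class
    def __init__(self, agents, world, actions_set, size=10, max_steps=100):
        # Environment-specific bookkeeping would go here (compare the
        # private variables tracked by CollectGameEnv below).
        self.score = 0

        super().__init__(
            agents=agents,            # list of agent objects
            grid_size=size,           # square grid; use width/height otherwise
            max_steps=max_steps,      # timesteps before the episode is truncated
            partial_obs=False,        # agents observe the full grid
            actions_set=actions_set,  # actions class defined in step 1
            world=world,              # world instance defined in step 1
        )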

You may also need to initialize or define other private variables relevant to your environment. For example, in the collect game we need to keep track of the following:

from gym_multigrid.envs.collect_game import CollectGameEnv
CollectGameEnv.__init__?
Signature:
CollectGameEnv.__init__(
    self,
    *args,
    actions_set=<enum 'CollectActions'>,
    **kwargs,
)
Docstring:
Initialize the CollectGameEnv.

Parameters
----------
size : int
    Size of grid if square. Default 10
num_balls : list[int]
    Total number of balls present in environment.
agents_index : list[int]
    Colour index for each agent.
balls_index : list[int]
    Colour index for each ball type.
balls_reward : list[float]
    Reward given for collecting each ball type.
respawn : bool
    Whether or not balls respawn after being collected.
File:      /media/pc/data/lxw/ai/d2py/tests/gym-multigrid/gym_multigrid/envs/collect_game.py
Type:      function

_gen_grid()

You must implement this method, since it is not defined in the MultiGridEnv parent class. It is called by default during env.reset(), and this is where you place all the objects and agents that live in the grid world.

For example, in the collect game we build the four boundary walls, place the balls, and then place the agents.

CollectGameEnv._gen_grid??
Signature: CollectGameEnv._gen_grid(self, width: int, height: int)
Source:   
    def _gen_grid(self, width: int, height: int):
        """
        Generate grid and place all the balls and agents.

        Parameters
        ----------
        width : int
            width of grid
        height : int
            height of grid
        """
        self.grid = Grid(width, height, self.world)

        # Generate the surrounding walls
        self.grid.horz_wall(0, 0)
        self.grid.horz_wall(0, height - 1)
        self.grid.vert_wall(0, 0)
        self.grid.vert_wall(width - 1, 0)

        if not isinstance(self.num_balls, list):
            raise TypeError(f'Expected num balls to be of type list, \
            however type {type(self.num_balls)} was passed')

        for number, index, reward in zip(
            self.num_balls, self.balls_index, self.balls_reward
        ):
            for _ in range(number):
                self.place_obj(Ball(self.world, index, reward))

        # Randomize the player start position
        for a in self.agents:
            self.place_agent(a)
File:      /media/pc/data/lxw/ai/d2py/tests/gym-multigrid/gym_multigrid/envs/collect_game.py
Type:      function

The place_obj() method is defined by the parent class and has the following parameters:

MultiGridEnv.place_obj??
Signature:
MultiGridEnv.place_obj(
    self,
    obj: ~WorldObjT,
    top: tuple[int, int] | numpy.ndarray[typing.Any, numpy.dtype[numpy.int64]] | None = None,
    size: tuple[int, int] | None = None,
    reject_fn: Optional[Callable[[ForwardRef('MultiGridEnv'), numpy.ndarray[Any, numpy.dtype[+_ScalarType_co]]], bool]] = None,
    max_tries: float = inf,
)
Source:   
    def place_obj(
        self,
        obj: WorldObjT,
        top: Position | None = None,
        size: tuple[int, int] | None = None,
        reject_fn: Callable[["MultiGridEnv", NDArray], bool] | None = None,
        max_tries: float = math.inf,
    ):
        """
        Place an object at an empty position in the grid

        :param top: top-left position of the rectangle where to place
        :param size: size of the rectangle where to place
        :param reject_fn: function to filter out potential positions
        """

        if top is None:
            top = (0, 0)
        else:
            top = (max(top[0], 0), max(top[1], 0))

        if size is None:
            size = (self.grid.width, self.grid.height)

        num_tries = 0

        while True:
            # This is to handle with rare cases where rejection sampling
            # gets stuck in an infinite loop
            if num_tries > max_tries:
                raise RecursionError("rejection sampling failed in place_obj")

            num_tries += 1

            pos = np.array(
                (
                    self._rand_int(top[0], min(top[0] + size[0], self.grid.width - 1)),
                    self._rand_int(top[1], min(top[1] + size[1], self.grid.height - 1)),
                )
            )

            # Don't place the object on top of another object
            if self.grid.get(*pos) != None:
                continue

            # Check if there is a filtering criterion
            if reject_fn and reject_fn(self, pos):
                continue

            break

        self.grid.set(*pos, obj)

        if obj is not None:
            obj.init_pos = pos
            obj.pos = pos

        return pos
File:      /media/pc/data/lxw/ai/d2py/tests/gym-multigrid/gym_multigrid/multigrid.py
Type:      function

By default, the method tries to place the object in the grid by repeatedly sampling positions uniformly at random until it finds a free grid cell.

If you already know the object's coordinates, use this method instead:

MultiGridEnv.put_obj??
Signature: MultiGridEnv.put_obj(self, obj: ~WorldObjT, i: int, j: int)
Source:   
    def put_obj(self, obj: WorldObjT, i: int, j: int):
        """
        Put an object at a specific position in the grid
        """

        self.grid.set(i, j, obj)
        obj.init_pos = (i, j)
        obj.pos = (i, j)
File:      /media/pc/data/lxw/ai/d2py/tests/gym-multigrid/gym_multigrid/multigrid.py
Type:      function
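
Side by side, the two placement styles look like this inside an environment method. The helper name _place_balls, the Ball import path, and the coordinates are invented for illustration; the Ball arguments mirror the collect-game call above.

from gym_multigrid.core.object import Ball  # assumed import path
from gym_multigrid.multigrid import MultiGridEnv


class MyGameEnv(MultiGridEnv):  # hypothetical environment, continuing the sketch above
    def _place_balls(self):
        """Called from _gen_grid; colour index 0 and reward 1, as in the collect game."""
        # Random free cell anywhere in the grid (rejection sampling):
        self.place_obj(Ball(self.world, 0, 1))

        # Fixed cell (3, 4), when the coordinates are known in advance:
        self.put_obj(Ball(self.world, 0, 1), 3, 4)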

To place agents, call the following method; it uses either of the two methods above, depending on whether you pass an explicit position:

MultiGridEnv.place_agent??
Signature:
MultiGridEnv.place_agent(
    self,
    agent: ~AgentT,
    pos: tuple[int, int] | numpy.ndarray[typing.Any, numpy.dtype[numpy.int64]] | None = None,
    top: tuple[int, int] | numpy.ndarray[typing.Any, numpy.dtype[numpy.int64]] | None = None,
    size: tuple[int, int] | None = None,
    rand_dir: bool = False,
    max_tries: float = inf,
) -> tuple[int, int] | numpy.ndarray[typing.Any, numpy.dtype[numpy.int64]]
Source:   
    def place_agent(
        self,
        agent: AgentT,
        pos: Position | None = None,
        top: Position | None = None,
        size: tuple[int, int] | None = None,
        rand_dir: bool = False,
        max_tries: float = math.inf,
    ) -> Position:
        """
        Set the agent's starting point at an empty position in the grid
        """
        if pos is not None:
            agent.pos = pos
            self.put_obj(agent, i=pos[0], j=pos[1])
        else:
            agent.pos = None
            pos = self.place_obj(agent, top, size, max_tries=max_tries)
            agent.pos = pos
            agent.init_pos = pos

        if rand_dir:
            agent.dir = self._rand_int(0, 3)
        else:
            agent.dir = 3

        agent.init_dir = agent.dir

        return pos
File:      /media/pc/data/lxw/ai/d2py/tests/gym-multigrid/gym_multigrid/multigrid.py
Type:      function

_reward()

MultiGridEnv._reward??
Signature: MultiGridEnv._reward(self, current_agent, rewards, reward=1)
Source:   
    def _reward(self, current_agent, rewards, reward=1):
        """
        Compute the reward to be given upon success
        """
        rewards[current_agent] += reward - 0.9 * (self.step_count / self.max_steps)
        return rewards
File:      /media/pc/data/lxw/ai/d2py/tests/gym-multigrid/gym_multigrid/multigrid.py
Type:      function

This method is called when a goal state is reached. current_agent specifies which agent receives the reward.

If your environment has a different reward structure, you should override this method.
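
For example, here is a minimal sketch of an override that drops the time-decay term and hands out the full reward instead. The class name continues the hypothetical MyGameEnv from above; only the body of _reward changes.

from gym_multigrid.multigrid import MultiGridEnv


class MyGameEnv(MultiGridEnv):  # hypothetical environment, continuing the sketch above
    def _reward(self, current_agent, rewards, reward=1):
        # Give the full reward with no penalty for elapsed timesteps.
        rewards[current_agent] += reward
        return rewards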

step()

This method is central to your environment's dynamics. You should define it yourself, and you can also call MultiGridEnv's step() method if it already handles action execution the way your environment requires.

The only required argument of step() is the list of actions to execute. By default, the actions are executed in a random order, and the episode is truncated when the maximum number of timesteps is reached:

MultiGridEnv.step??
Signature:
MultiGridEnv.step(
    self,
    actions: list[int] | numpy.ndarray[typing.Any, numpy.dtype[numpy.int64]],
) -> tuple[numpy.ndarray[typing.Any, numpy.dtype[numpy.int64]], numpy.ndarray[typing.Any, numpy.dtype[numpy.float64]], bool, bool, dict]
Docstring:
Run one timestep of the environment's dynamics using the agent actions.

When the end of an episode is reached (``terminated or truncated``), it is necessary to call :meth:`reset` to
reset this environment's state for the next episode.

.. versionchanged:: 0.26

    The Step API was changed removing ``done`` in favor of ``terminated`` and ``truncated`` to make it clearer
    to users when the environment had terminated or truncated which is critical for reinforcement learning
    bootstrapping algorithms.

Args:
    action (ActType): an action provided by the agent to update the environment state.

Returns:
    observation (ObsType): An element of the environment's :attr:`observation_space` as the next observation due to the agent actions.
        An example is a numpy array containing the positions and velocities of the pole in CartPole.
    reward (SupportsFloat): The reward as a result of taking the action.
    terminated (bool): Whether the agent reaches the terminal state (as defined under the MDP of the task)
        which can be positive or negative. An example is reaching the goal state or moving into the lava from
        the Sutton and Barto Gridworld. If true, the user needs to call :meth:`reset`.
    truncated (bool): Whether the truncation condition outside the scope of the MDP is satisfied.
        Typically, this is a timelimit, but could also be used to indicate an agent physically going out of bounds.
        Can be used to end the episode prematurely before a terminal state is reached.
        If true, the user needs to call :meth:`reset`.
    info (dict): Contains auxiliary diagnostic information (helpful for debugging, learning, and logging).
        This might, for instance, contain: metrics that describe the agent's performance state, variables that are
        hidden from observations, or individual reward terms that are combined to produce the total reward.
        In OpenAI Gym <v26, it contains "TimeLimit.truncated" to distinguish truncation and termination,
        however this is deprecated in favour of returning terminated and truncated variables.
    done (bool): (Deprecated) A boolean value for if the episode has ended, in which case further :meth:`step` calls will
        return undefined results. This was removed in OpenAI Gym v26 in favor of terminated and truncated attributes.
        A done signal may be emitted for different reasons: Maybe the task underlying the environment was solved successfully,
        a certain timelimit was exceeded, or the physics simulation has entered an invalid state.
Source:   
    def step(
        self, actions: list[int] | NDArray[np.int_]
    ) -> tuple[NDArray[np.int_], NDArray[np.float_], bool, bool, dict]:
        self.step_count += 1

        order = np.random.permutation(len(actions))

        rewards = np.zeros(len(actions))
        terminated = False
        truncated = False

        for i in order:
            if (
                self.agents[i].terminated
                or self.agents[i].paused
                or not self.agents[i].started
                or actions[i] == self.actions.still
            ):
                continue

            # Get the position in front of the agent
            fwd_pos = self.agents[i].front_pos

            # Get the contents of the cell in front of the agent
            fwd_cell = self.grid.get(*fwd_pos)

            # Rotate left
            if actions[i] == self.actions.left:
                self.agents[i].dir -= 1
                if self.agents[i].dir < 0:
                    self.agents[i].dir += 4

            # Rotate right
            elif actions[i] == self.actions.right:
                self.agents[i].dir = (self.agents[i].dir + 1) % 4

            # Move forward
            elif actions[i] == self.actions.forward:
                if fwd_cell is not None:
                    if fwd_cell.type == "goal":
                        terminated = True
                        rewards = self._reward(i, rewards, 1)
                    elif fwd_cell.type == "switch":
                        self._handle_switch(i, rewards, fwd_pos, fwd_cell)
                elif fwd_cell is None or fwd_cell.can_overlap():
                    self.grid.set(*fwd_pos, self.agents[i])
                    self.grid.set(*self.agents[i].pos, None)
                    self.agents[i].pos = fwd_pos
                self._handle_special_moves(i, rewards, fwd_pos, fwd_cell)

            elif "build" in self.actions.available and actions[i] == self.actions.build:
                self._handle_build(i, rewards, fwd_pos, fwd_cell)

            # Pick up an object
            elif actions[i] == self.actions.pickup:
                self._handle_pickup(i, rewards, fwd_pos, fwd_cell)

            # Drop an object
            elif actions[i] == self.actions.drop:
                self._handle_drop(i, rewards, fwd_pos, fwd_cell)

            # Toggle/activate an object
            elif actions[i] == self.actions.toggle:
                if fwd_cell:
                    fwd_cell.toggle(self, fwd_pos)

            # Done action (not used by default)
            elif actions[i] == self.actions.done:
                pass

            else:
                assert False, "unknown action"

        if self.step_count >= self.max_steps:
            truncated = True

        if self.partial_obs:
            obs = self.gen_obs()
        else:
            obs = [
                self.grid.encode_for_agents(agent_pos=self.agents[i].pos)
                for i in range(len(actions))
            ]

        obs = [self.world.normalize_obs * ob for ob in obs]
        info = self._get_info()
        return obs, rewards, terminated, truncated, info
File:      /media/pc/data/lxw/ai/d2py/tests/gym-multigrid/gym_multigrid/multigrid.py
Type:      function
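
As a sketch, a custom step() can delegate the action handling to the parent and then layer its own dynamics on top. The score threshold below is an invented example of environment-specific logic, continuing the hypothetical MyGameEnv from above.

from gym_multigrid.multigrid import MultiGridEnv


class MyGameEnv(MultiGridEnv):  # hypothetical environment, continuing the sketch above
    def step(self, actions):
        # Let the parent handle movement, pickups, toggles, and truncation.
        obs, rewards, terminated, truncated, info = super().step(actions)

        # Environment-specific termination: end the episode once the
        # (hypothetical) score counter reaches a threshold.
        if getattr(self, "score", 0) >= 10:
            terminated = True

        return obs, rewards, terminated, truncated, info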

reset()

step 方法一样,你应该为你的环境实现一个方法,并且也可以调用 MultiGridEnvreset 方法,因为它重置了其他变量。

For example, in the collect game we reset the collected_balls counter and the info dictionary:

CollectGameEnv.reset??
Signature:
CollectGameEnv.reset(
    self,
    *,
    seed: int | None = None,
    options: dict | None = None,
)
Docstring:
Resets the environment to an initial internal state, returning an initial observation and info.

This method generates a new starting state often with some randomness to ensure that the agent explores the
state space and learns a generalised policy about the environment. This randomness can be controlled
with the ``seed`` parameter otherwise if the environment already has a random number generator and
:meth:`reset` is called with ``seed=None``, the RNG is not reset.

Therefore, :meth:`reset` should (in the typical use case) be called with a seed right after initialization and then never again.

For Custom environments, the first line of :meth:`reset` should be ``super().reset(seed=seed)`` which implements
the seeding correctly.

.. versionchanged:: v0.25

    The ``return_info`` parameter was removed and now info is expected to be returned.

Args:
    seed (optional int): The seed that is used to initialize the environment's PRNG (`np_random`) and
        the read-only attribute `np_random_seed`.
        If the environment does not already have a PRNG and ``seed=None`` (the default option) is passed,
        a seed will be chosen from some source of entropy (e.g. timestamp or /dev/urandom).
        However, if the environment already has a PRNG and ``seed=None`` is passed, the PRNG will *not* be reset
        and the env's :attr:`np_random_seed` will *not* be altered.
        If you pass an integer, the PRNG will be reset even if it already exists.
        Usually, you want to pass an integer *right after the environment has been initialized and then never again*.
        Please refer to the minimal example above to see this paradigm in action.
    options (optional dict): Additional information to specify how the environment is reset (optional,
        depending on the specific environment)

Returns:
    observation (ObsType): Observation of the initial state. This will be an element of :attr:`observation_space`
        (typically a numpy array) and is analogous to the observation returned by :meth:`step`.
    info (dictionary):  This dictionary contains auxiliary information complementing ``observation``. It should be analogous to
        the ``info`` returned by :meth:`step`.
Source:   
    def reset(self, *, seed: int | None = None, options: dict | None = None):
        self.collected_balls = 0
        self.info = {
            "agent1ball1": 0,
            "agent1ball2": 0,
            "agent1ball3": 0,
            "agent2ball1": 0,
            "agent2ball2": 0,
            "agent2ball3": 0,
        }
        super().reset(seed=seed)
        state = self.grid.encode()
        return state, self.info
File:      /media/pc/data/lxw/ai/d2py/tests/gym-multigrid/gym_multigrid/envs/collect_game.py
Type:      function

State/obs encoding

The default grid encoding is a numpy array of shape height x width x encode_dim. The method also accounts for partial observability. You may need to write a method that converts this default encoding into whatever format works best for your environment and your agents' algorithm.
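
For example, here is a minimal conversion that flattens the default full-observability encoding into a 1-D float vector; the function name and the target format are only an illustration of such a converter.

import numpy as np


def encode_flat(grid_obs: np.ndarray) -> np.ndarray:
    """Flatten a (height, width, encode_dim) grid encoding into a 1-D float32 vector."""
    return grid_obs.astype(np.float32).reshape(-1)


# A 10 x 10 grid with encode_dim = 6 becomes a vector of length 600.
dummy = np.zeros((10, 10, 6), dtype=np.int64)
assert encode_flat(dummy).shape == (600,)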

Step 3: Register the environment

Add a line to gym_multigrid/gym_multigrid/__init__.py to register the newly created environment with gymnasium.

# Collect game with 2 agents and 3 object types
# ----------------------------------------
register(
    id="multigrid-collect-v0",
    entry_point="gym_multigrid.envs:CollectGameEvenDist",
    max_episode_steps=100,
    kwargs={
        "size": 10,
        "num_balls": 15,
        "agents_index": [3, 5],  # green, purple
        "balls_index": [0, 1, 2],  # red, orange, yellow
        "balls_reward": [1, 1, 1],
        "respawn": False,
    },
)
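
Once registered, the environment can be created with the usual gymnasium factory. This is a sketch that assumes importing gym_multigrid runs the register() calls in its __init__.py; the passive env checker is disabled as a precaution, since multi-agent observations may not match what the single-agent checker expects.

import gymnasium as gym
import gym_multigrid  # importing the package runs the register() calls

env = gym.make("multigrid-collect-v0", disable_env_checker=True)
obs, info = env.reset(seed=0)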