Tutorial: Environment Logic#
Introduction#
Now that we have a basic understanding of the structure of environment repositories, we can start thinking about the fun part - environment logic!
For this tutorial, we will be creating a two-player game consisting of a prisoner, trying to escape, and a guard, trying to catch the prisoner. This game will be played on a 7x7 grid, where:
The prisoner starts in the top left corner,
The guard starts in the bottom right corner,
The escape door is randomly placed in the middle of the grid
Both the prisoner and the guard can move in any of the four cardinal directions (up, down, left, right).
Code#
import functools
import random
from copy import copy
import numpy as np
from gymnasium.spaces import Discrete, MultiDiscrete
from pettingzoo import ParallelEnv
class CustomEnvironment(ParallelEnv):
"""The metadata holds environment constants.
The "name" metadata allows the environment to be pretty printed.
"""
metadata = {
"name": "custom_environment_v0",
}
def __init__(self):
"""The init method takes in environment arguments.
Should define the following attributes:
- escape x and y coordinates
- guard x and y coordinates
- prisoner x and y coordinates
- timestamp
- possible_agents
Note: as of v1.18.1, the action_spaces and observation_spaces attributes are deprecated.
Spaces should be defined in the action_space() and observation_space() methods.
If these methods are not overridden, spaces will be inferred from self.observation_spaces/action_spaces, raising a warning.
These attributes should not be changed after initialization.
"""
self.escape_y = None
self.escape_x = None
self.guard_y = None
self.guard_x = None
self.prisoner_y = None
self.prisoner_x = None
self.timestep = None
self.possible_agents = ["prisoner", "guard"]
def reset(self, seed=None, options=None):
"""Reset set the environment to a starting point.
It needs to initialize the following attributes:
- agents
- timestamp
- prisoner x and y coordinates
- guard x and y coordinates
- escape x and y coordinates
- observation
- infos
And must set up the environment so that render(), step(), and observe() can be called without issues.
"""
self.agents = copy(self.possible_agents)
self.timestep = 0
self.prisoner_x = 0
self.prisoner_y = 0
self.guard_x = 6
self.guard_y = 6
self.escape_x = random.randint(2, 5)
self.escape_y = random.randint(2, 5)
observations = {
a: (
self.prisoner_x + 7 * self.prisoner_y,
self.guard_x + 7 * self.guard_y,
self.escape_x + 7 * self.escape_y,
)
for a in self.agents
}
# Get dummy infos. Necessary for proper parallel_to_aec conversion
infos = {a: {} for a in self.agents}
return observations, infos
def step(self, actions):
"""Takes in an action for the current agent (specified by agent_selection).
Needs to update:
- prisoner x and y coordinates
- guard x and y coordinates
- terminations
- truncations
- rewards
- timestamp
- infos
And any internal state used by observe() or render()
"""
# Execute actions
prisoner_action = actions["prisoner"]
guard_action = actions["guard"]
if prisoner_action == 0 and self.prisoner_x > 0:
self.prisoner_x -= 1
elif prisoner_action == 1 and self.prisoner_x < 6:
self.prisoner_x += 1
elif prisoner_action == 2 and self.prisoner_y > 0:
self.prisoner_y -= 1
elif prisoner_action == 3 and self.prisoner_y < 6:
self.prisoner_y += 1
if guard_action == 0 and self.guard_x > 0:
self.guard_x -= 1
elif guard_action == 1 and self.guard_x < 6:
self.guard_x += 1
elif guard_action == 2 and self.guard_y > 0:
self.guard_y -= 1
elif guard_action == 3 and self.guard_y < 6:
self.guard_y += 1
# Check termination conditions
terminations = {a: False for a in self.agents}
rewards = {a: 0 for a in self.agents}
if self.prisoner_x == self.guard_x and self.prisoner_y == self.guard_y:
rewards = {"prisoner": -1, "guard": 1}
terminations = {a: True for a in self.agents}
elif self.prisoner_x == self.escape_x and self.prisoner_y == self.escape_y:
rewards = {"prisoner": 1, "guard": -1}
terminations = {a: True for a in self.agents}
# Check truncation conditions (overwrites termination conditions)
truncations = {a: False for a in self.agents}
if self.timestep > 100:
rewards = {"prisoner": 0, "guard": 0}
truncations = {"prisoner": True, "guard": True}
self.timestep += 1
# Get observations
observations = {
a: (
self.prisoner_x + 7 * self.prisoner_y,
self.guard_x + 7 * self.guard_y,
self.escape_x + 7 * self.escape_y,
)
for a in self.agents
}
# Get dummy infos (not used in this example)
infos = {a: {} for a in self.agents}
if any(terminations.values()) or all(truncations.values()):
self.agents = []
return observations, rewards, terminations, truncations, infos
def render(self):
"""Renders the environment."""
grid = np.full((7, 7), " ")
grid[self.prisoner_y, self.prisoner_x] = "P"
grid[self.guard_y, self.guard_x] = "G"
grid[self.escape_y, self.escape_x] = "E"
print(f"{grid} \n")
# Observation space should be defined here.
# lru_cache allows observation and action spaces to be memoized, reducing clock cycles required to get each agent's space.
# If your spaces change over time, remove this line (disable caching).
@functools.lru_cache(maxsize=None)
def observation_space(self, agent):
# gymnasium spaces are defined and documented here: https://gymnasium.farama.org/api/spaces/
return MultiDiscrete([7 * 7] * 3)
# Action space should be defined here.
# If your spaces change over time, remove this line (disable caching).
@functools.lru_cache(maxsize=None)
def action_space(self, agent):
return Discrete(4)