# noqa: D212, D415
"""
# Simple Spread
```{figure} mpe_simple_spread.gif
:width: 140px
:name: simple_spread
```
This environment is part of the <a href='..'>MPE environments</a>. Please read that page first for general information.
| Import | `from pettingzoo.mpe import simple_spread_v3` |
|----------------------|-----------------------------------------------|
| Actions | Discrete/Continuous |
| Parallel API | Yes |
| Manual Control | No |
| Agents | `agents= [agent_0, agent_1, agent_2]` |
| Agents | 3 |
| Action Shape | (5) |
| Action Values | Discrete(5)/Box(0.0, 1.0, (5)) |
| Observation Shape | (18) |
| Observation Values | (-inf,inf) |
| State Shape | (54,) |
| State Values | (-inf,inf) |
This environment has N agents, N landmarks (default N=3). At a high level, agents must learn to cover all the landmarks while avoiding collisions.
More specifically, all agents are globally rewarded based on how far the closest agent is to each landmark (sum of the minimum distances). Locally, the agents are penalized if they collide with other agents (-1 for each collision). The relative weights of these rewards can be controlled with the
`local_ratio` parameter.
Agent observations: `[self_vel, self_pos, landmark_rel_positions, other_agent_rel_positions, communication]`
Agent action space: `[no_action, move_left, move_right, move_down, move_up]`
### Arguments
``` python
simple_spread_v3.env(N=3, local_ratio=0.5, max_cycles=25, continuous_actions=False)
```
`N`: number of agents and landmarks
`local_ratio`: Weight applied to the local reward; the global reward is always weighted by `1 - local_ratio`.
`max_cycles`: number of frames (a step for each agent) until the game terminates
`continuous_actions`: Whether agent action spaces are discrete (default) or continuous
"""
import numpy as np
from gymnasium.utils import EzPickle
from pettingzoo.mpe._mpe_utils.core import Agent, Landmark, World
from pettingzoo.mpe._mpe_utils.scenario import BaseScenario
from pettingzoo.mpe._mpe_utils.simple_env import SimpleEnv, make_env
from pettingzoo.utils.conversions import parallel_wrapper_fn
# [docs]  (documentation-site link marker left over from extraction; not part of the module)
class raw_env(SimpleEnv, EzPickle):
    """Simple Spread environment: a ``SimpleEnv`` driven by this module's ``Scenario``.

    Args:
        N: Number of agents; ``Scenario.make_world`` creates the same number
            of landmarks.
        local_ratio: Proportion in ``[0, 1]``, forwarded to ``SimpleEnv``.
        max_cycles: Forwarded to ``SimpleEnv``.
        continuous_actions: Forwarded to ``SimpleEnv``.
        render_mode: Forwarded to ``SimpleEnv``.

    Raises:
        AssertionError: If ``local_ratio`` lies outside ``[0, 1]``.
    """

    def __init__(
        self,
        N=3,
        local_ratio=0.5,
        max_cycles=25,
        continuous_actions=False,
        render_mode=None,
    ):
        # Hand every constructor argument to EzPickle, unchanged
        # (presumably so the environment can be re-created when unpickled).
        EzPickle.__init__(
            self,
            N=N,
            local_ratio=local_ratio,
            max_cycles=max_cycles,
            continuous_actions=continuous_actions,
            render_mode=render_mode,
        )

        assert 0.0 <= local_ratio <= 1.0, "local_ratio is a proportion. Must be between 0 and 1."

        # The scenario object both builds the world and is handed to SimpleEnv.
        spread_scenario = Scenario()
        SimpleEnv.__init__(
            self,
            scenario=spread_scenario,
            world=spread_scenario.make_world(N),
            render_mode=render_mode,
            max_cycles=max_cycles,
            continuous_actions=continuous_actions,
            local_ratio=local_ratio,
        )

        self.metadata["name"] = "simple_spread_v3"
# Module-level constructors. `env` is the entry point shown in the module
# docstring (`simple_spread_v3.env(...)`); it is obtained by passing `raw_env`
# to `make_env`.
env = make_env(raw_env)
# `parallel_env` is derived from `env` through `parallel_wrapper_fn`
# (presumably the Parallel API variant listed in the docstring table --
# confirm against pettingzoo.utils.conversions).
parallel_env = parallel_wrapper_fn(env)
class Scenario(BaseScenario):
    """World construction, reset, reward and observation logic for Simple Spread."""

    def make_world(self, N=3):
        """Build a world containing ``N`` agents and ``N`` landmarks.

        Args:
            N: Number of agents; the same number of landmarks is created.

        Returns:
            The populated ``World``. Positions, velocities and colours are not
            set here; ``reset_world`` assigns them.
        """
        world = World()
        # set any world properties first
        world.dim_c = 2  # length of each agent's ``state.c`` vector (see reset_world)
        world.collaborative = True
        # add agents
        world.agents = [Agent() for _ in range(N)]
        for i, agent in enumerate(world.agents):
            agent.name = f"agent_{i}"
            agent.collide = True  # enables the collision penalty in reward()
            agent.silent = True
            agent.size = 0.15  # used by is_collision() as the agent's radius
        # add landmarks
        world.landmarks = [Landmark() for _ in range(N)]
        for i, landmark in enumerate(world.landmarks):
            landmark.name = f"landmark {i}"
            landmark.collide = False
            landmark.movable = False
        return world

    def reset_world(self, world, np_random):
        """Assign colours and fresh random initial states to every entity.

        Args:
            world: The world whose agents and landmarks are reset in place.
            np_random: Random generator providing ``uniform(low, high, size)``.

        Random draws happen for all agents first (in list order), then for all
        landmarks, so a given generator state always yields the same layout.
        """
        for agent in world.agents:
            agent.color = np.array([0.35, 0.35, 0.85])
            agent.state.p_pos = np_random.uniform(-1, +1, world.dim_p)
            agent.state.p_vel = np.zeros(world.dim_p)
            agent.state.c = np.zeros(world.dim_c)
        for landmark in world.landmarks:
            landmark.color = np.array([0.25, 0.25, 0.25])
            landmark.state.p_pos = np_random.uniform(-1, +1, world.dim_p)
            landmark.state.p_vel = np.zeros(world.dim_p)

    def _closest_agent_distance(self, landmark, world):
        """Return the Euclidean distance from ``landmark`` to its nearest agent."""
        return min(
            np.sqrt(np.sum(np.square(a.state.p_pos - landmark.state.p_pos)))
            for a in world.agents
        )

    def benchmark_data(self, agent, world):
        """Collect diagnostic statistics for ``agent`` in the current world state.

        Returns:
            A tuple ``(rew, collisions, min_dists, occupied_landmarks)`` where

            * ``rew`` is minus the summed nearest-agent distance over all
              landmarks, minus 1 per collision of ``agent`` with another agent;
            * ``collisions`` is the number of other agents ``agent`` currently
              collides with;
            * ``min_dists`` is the summed nearest-agent distance over all
              landmarks;
            * ``occupied_landmarks`` counts landmarks whose nearest agent is
              closer than 0.1.
        """
        rew = 0
        collisions = 0
        occupied_landmarks = 0
        min_dists = 0
        for lm in world.landmarks:
            closest = self._closest_agent_distance(lm, world)
            min_dists += closest
            rew -= closest
            if closest < 0.1:
                occupied_landmarks += 1
        if agent.collide:
            for other in world.agents:
                # An agent always overlaps itself, so it must be skipped;
                # otherwise every call would report one spurious collision.
                # This is the same exclusion reward() applies.
                if other is not agent and self.is_collision(other, agent):
                    rew -= 1
                    collisions += 1
        return (rew, collisions, min_dists, occupied_landmarks)

    def is_collision(self, agent1, agent2):
        """Return ``True`` when the two agents' centres are closer than the sum of their sizes."""
        delta_pos = agent1.state.p_pos - agent2.state.p_pos
        dist = np.sqrt(np.sum(np.square(delta_pos)))
        dist_min = agent1.size + agent2.size
        return bool(dist < dist_min)

    def reward(self, agent, world):
        """Return the local reward of ``agent``: -1 for each other agent it collides with.

        The landmark-coverage term is not part of this value; it is computed
        separately by ``global_reward``.
        """
        rew = 0
        if agent.collide:
            for other in world.agents:
                # Identity check first: skips self and avoids a needless distance computation.
                rew -= 1.0 * (other is not agent and self.is_collision(other, agent))
        return rew

    def global_reward(self, world):
        """Return minus the sum, over all landmarks, of the distance to the nearest agent."""
        rew = 0
        for lm in world.landmarks:
            rew -= self._closest_agent_distance(lm, world)
        return rew

    def observation(self, agent, world):
        """Build the observation vector for ``agent``.

        Returns:
            A 1-D array laid out as ``[own velocity, own position, landmark
            positions relative to the agent, other agents' positions relative
            to the agent, other agents' communication vectors]``.
        """
        own_pos = agent.state.p_pos
        # landmark positions in this agent's reference frame
        landmark_rel = [lm.state.p_pos - own_pos for lm in world.landmarks]
        # every agent except the observer, in world order
        others = [other for other in world.agents if other is not agent]
        other_rel = [other.state.p_pos - own_pos for other in others]
        comm = [other.state.c for other in others]
        return np.concatenate(
            [agent.state.p_vel, own_pos] + landmark_rel + other_rel + comm
        )