Source code for pettingzoo.mpe.simple_push.simple_push

# noqa: D212, D415
"""
# Simple Push

```{figure} mpe_simple_push.gif
:width: 140px
:name: simple_push
```

This environment is part of the <a href='..'>MPE environments</a>. Please read that page first for general information.

| Import             | `from pettingzoo.mpe import simple_push_v3` |
|--------------------|---------------------------------------------|
| Actions            | Discrete/Continuous                         |
| Parallel API       | Yes                                         |
| Manual Control     | No                                          |
| Agents             | `agents= [adversary_0, agent_0]`            |
| Number of Agents   | 2                                           |
| Action Shape       | (5)                                         |
| Action Values      | Discrete(5)/Box(0.0, 1.0, (5,))             |
| Observation Shape  | (8),(19)                                    |
| Observation Values | (-inf,inf)                                  |
| State Shape        | (27,)                                       |
| State Values       | (-inf,inf)                                  |


This environment has 1 good agent, 1 adversary, and 2 landmarks, one of which is randomly chosen as the goal at reset. The good agent is rewarded with the negative of its distance to the goal landmark. The adversary is rewarded for being close to the goal landmark and for the good agent being far from it (the difference of the two distances), so the adversary must learn to push the good agent away from the goal landmark.
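
Concretely, with `dist` the Euclidean distance, the good agent receives `-dist(agent, goal)` and the adversary receives `dist(agent, goal) - dist(adversary, goal)`. A minimal sketch with illustrative positions (the positions below are made up; the actual logic lives in `Scenario.agent_reward` and `Scenario.adversary_reward` further down this page):

``` python
import numpy as np

def dist(a, b):
    # Euclidean distance between two 2D positions
    return np.sqrt(np.sum(np.square(a - b)))

# illustrative positions, not taken from the environment
goal = np.array([0.0, 0.0])
agent = np.array([0.5, 0.0])
adversary = np.array([-0.2, 0.1])

agent_reward = -dist(agent, goal)                              # closer to goal is better
adversary_reward = dist(agent, goal) - dist(adversary, goal)   # push agent away, stay close
```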

Agent observation space: `[self_vel, goal_rel_position, self_color, all_landmark_rel_positions, all_landmark_colors, other_agent_rel_positions]` (2 + 2 + 3 + 2×2 + 2×3 + 2 = 19 values; the goal's identity is encoded in the agent's own color and the landmark colors)

Adversary observation space: `[self_vel, all_landmark_rel_positions, other_agent_rel_positions]` (2 + 2×2 + 2 = 8 values)

Agent action space: `[no_action, move_left, move_right, move_down, move_up]`

Adversary action space: `[no_action, move_left, move_right, move_down, move_up]`
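
The spaces can be checked directly through the standard PettingZoo AEC API; a quick sanity check (the printed shapes should match the table above):

``` python
from pettingzoo.mpe import simple_push_v3

env = simple_push_v3.env()
env.reset(seed=42)
print(env.observation_space("agent_0"))      # expect shape (19,)
print(env.observation_space("adversary_0"))  # expect shape (8,)
print(env.action_space("agent_0"))           # expect Discrete(5)
```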

### Arguments

``` python
simple_push_v3.env(max_cycles=25, continuous_actions=False, dynamic_rescaling=False)
```

`max_cycles`: number of frames (a step for each agent) until the game terminates

`continuous_actions`: whether agent action spaces are discrete (default) or continuous

`dynamic_rescaling`: whether to rescale the size of agents and landmarks based on the screen size
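
A typical interaction loop, using the standard PettingZoo AEC API (random actions shown for illustration; substitute a trained policy):

``` python
from pettingzoo.mpe import simple_push_v3

env = simple_push_v3.env(max_cycles=25, continuous_actions=False)
env.reset(seed=42)
for agent in env.agent_iter():
    observation, reward, termination, truncation, info = env.last()
    if termination or truncation:
        action = None
    else:
        action = env.action_space(agent).sample()  # random policy for illustration
    env.step(action)
env.close()
```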


"""

import numpy as np
from gymnasium.utils import EzPickle

from pettingzoo.mpe._mpe_utils.core import Agent, Landmark, World
from pettingzoo.mpe._mpe_utils.scenario import BaseScenario
from pettingzoo.mpe._mpe_utils.simple_env import SimpleEnv, make_env
from pettingzoo.utils.conversions import parallel_wrapper_fn


class raw_env(SimpleEnv, EzPickle):
    def __init__(
        self,
        max_cycles=25,
        continuous_actions=False,
        render_mode=None,
        dynamic_rescaling=False,
    ):
        # pass all constructor arguments to EzPickle (including
        # dynamic_rescaling) so pickled copies reconstruct correctly
        EzPickle.__init__(
            self,
            max_cycles=max_cycles,
            continuous_actions=continuous_actions,
            render_mode=render_mode,
            dynamic_rescaling=dynamic_rescaling,
        )
        scenario = Scenario()
        world = scenario.make_world()
        SimpleEnv.__init__(
            self,
            scenario=scenario,
            world=world,
            render_mode=render_mode,
            max_cycles=max_cycles,
            continuous_actions=continuous_actions,
            dynamic_rescaling=dynamic_rescaling,
        )
        self.metadata["name"] = "simple_push_v3"
env = make_env(raw_env)
parallel_env = parallel_wrapper_fn(env)


class Scenario(BaseScenario):
    def make_world(self):
        world = World()
        # set any world properties first
        world.dim_c = 2
        num_agents = 2
        num_adversaries = 1
        num_landmarks = 2
        # add agents
        world.agents = [Agent() for i in range(num_agents)]
        for i, agent in enumerate(world.agents):
            agent.adversary = True if i < num_adversaries else False
            base_name = "adversary" if agent.adversary else "agent"
            base_index = i if i < num_adversaries else i - num_adversaries
            agent.name = f"{base_name}_{base_index}"
            agent.collide = True
            agent.silent = True
        # add landmarks
        world.landmarks = [Landmark() for i in range(num_landmarks)]
        for i, landmark in enumerate(world.landmarks):
            landmark.name = "landmark %d" % i
            landmark.collide = False
            landmark.movable = False
        return world

    def reset_world(self, world, np_random):
        # random properties for landmarks
        for i, landmark in enumerate(world.landmarks):
            landmark.color = np.array([0.1, 0.1, 0.1])
            landmark.color[i + 1] += 0.8
            landmark.index = i
        # pick one landmark as the shared goal
        goal = np_random.choice(world.landmarks)
        for i, agent in enumerate(world.agents):
            agent.goal_a = goal
            agent.color = np.array([0.25, 0.25, 0.25])
            if agent.adversary:
                agent.color = np.array([0.75, 0.25, 0.25])
            else:
                # tint the good agent with the goal landmark's color channel
                j = goal.index
                agent.color[j + 1] += 0.5
        # set random initial states
        for agent in world.agents:
            agent.state.p_pos = np_random.uniform(-1, +1, world.dim_p)
            agent.state.p_vel = np.zeros(world.dim_p)
            agent.state.c = np.zeros(world.dim_c)
        for i, landmark in enumerate(world.landmarks):
            landmark.state.p_pos = np_random.uniform(-1, +1, world.dim_p)
            landmark.state.p_vel = np.zeros(world.dim_p)

    def reward(self, agent, world):
        # dispatch to the adversary or good-agent reward
        return (
            self.adversary_reward(agent, world)
            if agent.adversary
            else self.agent_reward(agent, world)
        )

    def agent_reward(self, agent, world):
        # negative distance to the goal landmark
        return -np.sqrt(
            np.sum(np.square(agent.state.p_pos - agent.goal_a.state.p_pos))
        )

    def adversary_reward(self, agent, world):
        # rewarded for keeping the nearest good agent away from the goal
        # while staying close to the goal itself
        agent_dist = [
            np.sqrt(np.sum(np.square(a.state.p_pos - a.goal_a.state.p_pos)))
            for a in world.agents
            if not a.adversary
        ]
        pos_rew = min(agent_dist)
        neg_rew = np.sqrt(
            np.sum(np.square(agent.goal_a.state.p_pos - agent.state.p_pos))
        )
        return pos_rew - neg_rew

    def observation(self, agent, world):
        # positions of all landmarks in this agent's reference frame
        entity_pos = []
        for entity in world.landmarks:
            entity_pos.append(entity.state.p_pos - agent.state.p_pos)
        # landmark colors (these encode the landmark identities)
        entity_color = []
        for entity in world.landmarks:
            entity_color.append(entity.color)
        # communication and relative positions of all other agents
        comm = []
        other_pos = []
        for other in world.agents:
            if other is agent:
                continue
            comm.append(other.state.c)
            other_pos.append(other.state.p_pos - agent.state.p_pos)
        if not agent.adversary:
            # the good agent also sees the goal's relative position and its own color
            return np.concatenate(
                [agent.state.p_vel]
                + [agent.goal_a.state.p_pos - agent.state.p_pos]
                + [agent.color]
                + entity_pos
                + entity_color
                + other_pos
            )
        else:
            return np.concatenate([agent.state.p_vel] + entity_pos + other_pos)