# noqa: D212, D415
"""
# Pistonball
```{figure} butterfly_pistonball.gif
:width: 200px
:name: pistonball
```
This environment is part of the <a href='..'>butterfly environments</a>. Please read that page first for general information.
| Import | `from pettingzoo.butterfly import pistonball_v6` |
|----------------------|------------------------------------------------------|
| Actions | Either |
| Parallel API | Yes |
| Manual Control | Yes |
| Agents | `agents= ['piston_0', 'piston_1', ..., 'piston_19']` |
| Agents | 20 |
| Action Shape | (1,) |
| Action Values | [-1, 1] |
| Observation Shape | (457, 120, 3) |
| Observation Values | (0, 255) |
| State Shape | (560, 880, 3) |
| State Values | (0, 255) |
This is a physics based cooperative game where the goal is to move the ball to the left-wall of the game border by activating the vertically moving pistons. To achieve an optimal policy for the environment, pistons must learn highly coordinated behavior.
**Observations**: Each piston-agent's observation is an RGB image encompassing the piston, its immediate neighbors (either two pistons or a piston and left/right-wall) and the space above them (which may show the ball).
**Actions**: Every piston can be acted on at each time step. In discrete mode, the action space is 0 to move down by 4 pixels, 1 to stay still, and 2 to move up by 4 pixels. In continuous mode, the value in the range [-1, 1] is proportional to the amount that the pistons
are lowered or raised by. Continuous actions are scaled by a factor of 4 to allow for matching the distance travelled in discrete mode, e.g. an action of -1 moves the piston down 4 pixels.
**Rewards**: The same reward is provided to each agent based on how much the ball moved left in the last time-step (moving right results in a negative reward) plus a constant time-penalty. The distance component is the percentage of the initial total distance (i.e. at game-start)
to the left-wall travelled in the past timestep. For example, if the ball began the game 300 pixels away from the wall, began the time-step 180 pixels away and finished the time-step 175 pixels away, the distance reward would be 100 * 5/300 = 1.7. There is also a configurable
time-penalty (default: -0.1) added to the distance-based reward at each time-step. For example, if the ball does not move in a time-step, the reward will be -0.1 not 0. This is to incentivize solving the game faster.
Pistonball uses the chipmunk physics engine, so the physics are about as realistic as in the game Angry Birds.
Keys *a* and *d* control which piston is selected to move (initially the rightmost piston is selected) and keys *w* and *s* move the piston in the vertical direction.
### Arguments
``` python
pistonball_v6.env(n_pistons=20, time_penalty=-0.1, continuous=True,
random_drop=True, random_rotate=True, ball_mass=0.75, ball_friction=0.3,
ball_elasticity=1.5, max_cycles=125)
```
`n_pistons`: The number of pistons (agents) in the environment.
`time_penalty`: Amount of reward added to each piston each time step. Higher values mean higher weight towards getting the ball across the screen to terminate the game.
`continuous`: If True, piston action is a real value between -1 and 1 which is scaled and added to the piston height. If False, the action is a discrete value (0, 1 or 2) that moves the piston one unit down, keeps it still, or moves it one unit up.
`random_drop`: If True, ball will initially spawn at a randomly offset x (and y) position. If False, ball will always spawn at the same fixed position near the right wall (x=770 with the default 20 pistons)
`random_rotate`: If True, ball will spawn with a random angular momentum
`ball_mass`: Sets the mass of the ball physics object
`ball_friction`: Sets the friction of the ball physics object
`ball_elasticity`: Sets the elasticity of the ball physics object
`max_cycles`: After max_cycles steps, all agents are truncated
### Version History
* v6: Fix ball bouncing off of left wall.
* v5: Ball moving into the left column due to physics engine imprecision no longer gives additional reward
* v4: Changed default arguments for `max_cycles` and `continuous`, bumped PyMunk version (1.6.0)
* v3: Refactor, added number of pistons argument, minor visual changes (1.5.0)
* v2: Misc fixes, bumped PyGame and PyMunk version (1.4.0)
* v1: Fix to continuous mode (1.0.1)
* v0: Initial versions release (1.0.0)
"""
import math
import gymnasium
import numpy as np
import pygame
import pymunk
import pymunk.pygame_util
from gymnasium.utils import EzPickle, seeding
from pettingzoo import AECEnv
from pettingzoo.butterfly.pistonball.manual_policy import ManualPolicy
from pettingzoo.utils import AgentSelector, wrappers
from pettingzoo.utils.conversions import parallel_wrapper_fn
# Frames per second: used for the physics time step (dt = 1 / FPS) and
# reported as metadata["render_fps"].
FPS = 20
# Public names exported by this module.
__all__ = ["ManualPolicy", "env", "parallel_env", "raw_env"]
def get_image(path):
    """Load an image stored next to this module onto an alpha-capable surface.

    Args:
        path: File name (or relative path) of the image, resolved against the
            directory that contains this module.

    Returns:
        A new ``pygame.Surface`` created with the ``SRCALPHA`` flag and the
        size of the loaded image, with the image blitted at (0, 0).
    """
    from os import path as os_path

    # Join instead of concatenating with "/": if dirname(__file__) is empty,
    # "" + "/" + path would wrongly point at the filesystem root.
    image_file = os_path.join(os_path.dirname(__file__), path)
    image = pygame.image.load(image_file)
    sfc = pygame.Surface(image.get_size(), flags=pygame.SRCALPHA)
    sfc.blit(image, (0, 0))
    return sfc
[docs]
def env(**kwargs):
    """Create a wrapped pistonball environment.

    All keyword arguments are forwarded to ``raw_env``. The raw environment is
    wrapped with ``ClipOutOfBoundsWrapper`` when it is in continuous mode and
    with ``AssertOutOfBoundsWrapper`` otherwise, and finally with
    ``OrderEnforcingWrapper``.
    """
    base = raw_env(**kwargs)
    bounds_wrapper = (
        wrappers.ClipOutOfBoundsWrapper
        if base.continuous
        else wrappers.AssertOutOfBoundsWrapper
    )
    return wrappers.OrderEnforcingWrapper(bounds_wrapper(base))
# Parallel-API constructor built from the AEC `env` factory defined above.
parallel_env = parallel_wrapper_fn(env)
[docs]
# Unwrapped pistonball environment: an AECEnv that also mixes in EzPickle.
class raw_env(AECEnv, EzPickle):
# Static description of the environment: supported render modes, versioned
# name, whether it can be parallelized, render rate (FPS) and whether a
# manual policy is available.
metadata = {
"render_modes": ["human", "rgb_array"],
"name": "pistonball_v6",
"is_parallelizable": True,
"render_fps": FPS,
"has_manual_policy": True,
}
def __init__(
    self,
    n_pistons=20,
    time_penalty=-0.1,
    continuous=True,
    random_drop=True,
    random_rotate=True,
    ball_mass=0.75,
    ball_friction=0.3,
    ball_elasticity=1.5,
    max_cycles=125,
    render_mode=None,
):
    """Configure geometry, agents, spaces and the drawing surface.

    Args:
        n_pistons: Number of pistons (one agent each); must be greater than 1.
        time_penalty: Value added to the reward on every non-terminal cycle.
        continuous: If True each agent gets a Box(-1, 1) action space,
            otherwise a Discrete(3) one.
        random_drop: Whether reset() randomises the ball's spawn offsets.
        random_rotate: Whether the ball gets a random angular velocity.
        ball_mass: Mass passed to the ball body on reset.
        ball_friction: Friction passed to the ball shape on reset.
        ball_elasticity: Elasticity passed to the ball shape on reset.
        max_cycles: Frame count at which the episode is flagged as truncated.
        render_mode: None, "human" or "rgb_array".
    """
    EzPickle.__init__(
        self,
        n_pistons=n_pistons,
        time_penalty=time_penalty,
        continuous=continuous,
        random_drop=random_drop,
        random_rotate=random_rotate,
        ball_mass=ball_mass,
        ball_friction=ball_friction,
        ball_elasticity=ball_elasticity,
        max_cycles=max_cycles,
        render_mode=render_mode,
    )

    # --- Timing and fixed geometry (pixels) ---
    self.dt = 1.0 / FPS
    self.n_pistons = n_pistons
    self.piston_head_height = 11
    self.piston_width = 40
    self.piston_height = 40
    self.piston_body_height = 23
    self.piston_radius = 5
    self.wall_width = 40
    self.ball_radius = 40
    self.screen_width = (2 * self.wall_width) + (self.piston_width * self.n_pistons)
    self.screen_height = 560

    # Vertical extent of an observation: from the inner edge of the top wall
    # down to the top of the piston bodies.
    obs_top = self.wall_width
    obs_bottom = self.screen_height - self.wall_width - self.piston_body_height
    obs_height = obs_bottom - obs_top

    assert (
        self.piston_width == self.wall_width
    ), "Wall width and piston width must be equal for observation calculation"
    assert self.n_pistons > 1, "n_pistons must be greater than 1"

    # --- Agent bookkeeping ---
    self.agents = [f"piston_{index}" for index in range(self.n_pistons)]
    self.possible_agents = list(self.agents)
    self.agent_name_mapping = {
        name: index for index, name in enumerate(self.agents)
    }
    self._agent_selector = AgentSelector(self.agents)

    # --- Spaces (one space object is shared by every agent) ---
    shared_observation_space = gymnasium.spaces.Box(
        low=0,
        high=255,
        shape=(obs_height, self.piston_width * 3, 3),
        dtype=np.uint8,
    )
    self.observation_spaces = dict.fromkeys(self.agents, shared_observation_space)

    self.continuous = continuous
    if self.continuous:
        shared_action_space = gymnasium.spaces.Box(low=-1, high=1, shape=(1,))
    else:
        shared_action_space = gymnasium.spaces.Discrete(3)
    self.action_spaces = dict.fromkeys(self.agents, shared_action_space)

    self.state_space = gymnasium.spaces.Box(
        low=0,
        high=255,
        shape=(self.screen_height, self.screen_width, 3),
        dtype=np.uint8,
    )

    # --- Rendering setup ---
    pygame.init()
    pymunk.pygame_util.positive_y_is_up = False

    self.render_mode = render_mode
    self.renderOn = False
    self.screen = pygame.Surface((self.screen_width, self.screen_height))
    self.max_cycles = max_cycles

    self.piston_sprite = get_image("piston.png")
    self.piston_body_sprite = get_image("piston_body.png")
    self.background = get_image("background.png")

    # --- Episode configuration and state ---
    self.random_drop = random_drop
    self.random_rotate = random_rotate

    self.pistonList = []
    self.pistonRewards = []  # Keeps track of individual rewards
    # Defines what "recent" means in terms of number of frames.
    self.recentFrameLimit = 20
    self.recentPistons = set()  # Set of pistons that have touched the ball recently
    self.time_penalty = time_penalty

    self.ball_mass = ball_mass
    self.ball_friction = ball_friction
    self.ball_elasticity = ball_elasticity

    self.terminate = False
    self.truncate = False

    self.pixels_per_position = 4
    self.n_piston_positions = 16

    self.screen.fill((0, 0, 0))
    self.draw_background()

    # Play area between the walls and above the piston bodies.
    play_width = self.screen_width - (2 * self.wall_width)
    play_height = (
        self.screen_height - (2 * self.wall_width) - self.piston_body_height
    )
    self.render_rect = pygame.Rect(
        self.wall_width,  # Left
        self.wall_width,  # Top
        play_width,  # Width
        play_height,  # Height
    )

    # Region in which the ball's centre keeps the whole ball inside the play
    # area; draw() repaints the background when the centre leaves it.
    self.valid_ball_position_rect = pygame.Rect(
        self.render_rect.left + self.ball_radius,  # Left
        self.render_rect.top + self.ball_radius,  # Top
        self.render_rect.width - (2 * self.ball_radius),  # Width
        self.render_rect.height - (2 * self.ball_radius),  # Height
    )

    self.frames = 0

    self._seed()
[docs]
def observation_space(self, agent):
    """Return the observation space registered for ``agent``."""
    spaces_by_agent = self.observation_spaces
    return spaces_by_agent[agent]
[docs]
def action_space(self, agent):
    """Return the action space registered for ``agent``."""
    spaces_by_agent = self.action_spaces
    return spaces_by_agent[agent]
def _seed(self, seed=None):
    """(Re)create ``self.np_random`` from ``seed`` via ``seeding.np_random``."""
    generator, _ = seeding.np_random(seed)
    self.np_random = generator
[docs]
def observe(self, agent):
    """Return the image crop around ``agent``'s piston.

    The crop spans one piston width to the left and right of the agent's
    piston, and vertically from the top wall to the top of the piston bodies.
    The pixel array taken from the surface is copied, then rotated and
    flipped before being returned.
    """
    pixels = pygame.surfarray.pixels3d(self.screen)
    index = self.agent_name_mapping[agent]

    # Horizontal window: 40px left and 40px right of the piston
    left = self.wall_width + self.piston_width * (index - 1)
    right = self.wall_width + self.piston_width * (index + 2)
    top = self.wall_width
    bottom = self.screen_height - self.wall_width - self.piston_body_height

    window = np.array(pixels[left:right, top:bottom, :])
    return np.fliplr(np.rot90(window, k=3))
[docs]
def state(self):
    """Returns an observation of the global environment."""
    full_frame = pygame.surfarray.pixels3d(self.screen).copy()
    # Same rotate-then-flip re-orientation that observe() applies.
    return np.fliplr(np.rot90(full_frame, k=3))
def enable_render(self):
    """Switch drawing to a pygame display window and paint the first frame."""
    window_size = (self.screen_width, self.screen_height)
    self.screen = pygame.display.set_mode(window_size)
    pygame.display.set_caption("Pistonball")

    self.renderOn = True
    self.draw_background()
    self.draw()
[docs]
def close(self):
    """Shut pygame down and drop the screen; does nothing if already closed."""
    if self.screen is None:
        return
    pygame.quit()
    self.screen = None
def add_walls(self):
    """Add the four static boundary segments to ``self.space``.

    The segments form a rectangle inset by ``wall_width`` from every edge of
    the screen; each has thickness 1 and friction 0.64.
    """
    left = top = self.wall_width
    right = self.screen_width - self.wall_width
    bottom = self.screen_height - self.wall_width

    top_left, top_right = (left, top), (right, top)
    bot_left, bot_right = (left, bottom), (right, bottom)

    edges = (
        (top_left, top_right),  # Top wall
        (top_left, bot_left),  # Left wall
        (bot_left, bot_right),  # Bottom wall
        (top_right, bot_right),  # Right
    )
    walls = [
        pymunk.Segment(self.space.static_body, start, end, 1)
        for start, end in edges
    ]
    for wall in walls:
        wall.friction = 0.64
        self.space.add(wall)
def add_ball(self, x, y, b_mass, b_friction, b_elasticity):
    """Create the ball body and shape and add them to ``self.space``.

    Args:
        x: Initial x position of the ball's centre.
        y: Initial y position of the ball's centre.
        b_mass: Mass of the ball body.
        b_friction: Friction of the ball's circle shape.
        b_elasticity: Elasticity of the ball's circle shape.

    Returns:
        The ``pymunk.Body`` of the ball.
    """
    mass = b_mass
    # Use the shared ball_radius attribute (also used for drawing and for the
    # ball-position computation) instead of a separate hard-coded 40.
    radius = self.ball_radius
    inertia = pymunk.moment_for_circle(mass, 0, radius, (0, 0))
    body = pymunk.Body(mass, inertia)
    body.position = x, y
    # radians per second
    if self.random_rotate:
        body.angular_velocity = self.np_random.uniform(-6 * math.pi, 6 * math.pi)
    shape = pymunk.Circle(body, radius, (0, 0))
    shape.friction = b_friction
    shape.elasticity = b_elasticity
    self.space.add(body, shape)
    return body
def add_piston(self, space, x, y):
    """Create a kinematic piston head at (x, y) and add it to ``space``.

    The head is a horizontal segment of length
    ``piston_width - 2 * piston_radius`` and radius ``piston_radius``.

    Returns:
        The ``pymunk.Body`` of the piston.
    """
    body = pymunk.Body(body_type=pymunk.Body.KINEMATIC)
    body.position = x, y

    head_length = self.piston_width - (2 * self.piston_radius)
    head = pymunk.Segment(body, (0, 0), (head_length, 0), self.piston_radius)
    head.friction = 0.64
    head.color = pygame.color.THECOLORS["blue"]

    space.add(body, head)
    return body
def move_piston(self, piston, v):
    """Shift ``piston`` vertically by ``v`` position units, within its travel.

    A positive ``v`` decreases the y coordinate by ``v * pixels_per_position``.
    The resulting y is clamped between the lowest resting position and
    ``n_piston_positions * pixels_per_position`` pixels above it.
    """
    lowest_y = (
        self.screen_height
        - self.wall_width
        - (self.piston_height - self.piston_head_height)
    )
    highest_y = lowest_y - (self.n_piston_positions * self.pixels_per_position)

    target_y = piston.position[1] - v * self.pixels_per_position
    # y grows downwards, so "lowest" is the numerically largest value.
    clamped_y = max(min(target_y, lowest_y), highest_y)
    piston.position = (piston.position[0], clamped_y)
[docs]
def reset(self, seed=None, options=None):
    """Start a new episode: rebuild the physics space, the pistons and the ball.

    Args:
        seed: Optional seed; when not None the RNG is re-seeded before any
            random value is drawn.
        options: Unused by this environment.
    """
    if seed is not None:
        self._seed(seed)

    self.space = pymunk.Space(threaded=False)
    self.add_walls()
    self.space.gravity = (0.0, 750.0)
    self.space.collision_bias = 0.0001
    self.space.iterations = 10  # 10 is default in PyMunk

    self.pistonList = []
    maximum_piston_y = (
        self.screen_height
        - self.wall_width
        - (self.piston_height - self.piston_head_height)
    )
    # Multiply by 0.5 to use only the lower half of possible positions.
    # The candidate offsets do not depend on the piston, so build them once.
    possible_y_displacements = np.arange(
        0,
        0.5 * self.pixels_per_position * self.n_piston_positions,
        self.pixels_per_position,
    )
    for i in range(self.n_pistons):
        piston = self.add_piston(
            self.space,
            self.wall_width + self.piston_radius + self.piston_width * i,  # x position
            # y position
            maximum_piston_y - self.np_random.choice(possible_y_displacements),
        )
        # Was misspelled `velociy`, which only created an unused attribute;
        # explicitly start each piston at rest instead.
        piston.velocity = (0, 0)
        self.pistonList.append(piston)

    self.horizontal_offset = 0
    self.vertical_offset = 0
    horizontal_offset_range = 30
    vertical_offset_range = 15
    if self.random_drop:
        self.vertical_offset = self.np_random.integers(
            -vertical_offset_range, vertical_offset_range + 1
        )
        self.horizontal_offset = self.np_random.integers(
            -horizontal_offset_range, horizontal_offset_range + 1
        )
    ball_x = (
        self.screen_width
        - self.wall_width
        - self.ball_radius
        - horizontal_offset_range
        + self.horizontal_offset
    )
    ball_y = (
        self.screen_height
        - self.wall_width
        - self.piston_body_height
        - self.ball_radius
        - (0.5 * self.pixels_per_position * self.n_piston_positions)
        - vertical_offset_range
        + self.vertical_offset
    )

    # Ensure ball starts somewhere right of the left wall
    ball_x = max(ball_x, self.wall_width + self.ball_radius + 1)

    self.ball = self.add_ball(
        ball_x, ball_y, self.ball_mass, self.ball_friction, self.ball_elasticity
    )
    self.ball.angle = 0
    self.ball.velocity = (0, 0)
    if self.random_rotate:
        # Kept as a second draw (add_ball already drew one) so the sequence
        # of random numbers consumed per reset is unchanged.
        self.ball.angular_velocity = self.np_random.uniform(
            -6 * math.pi, 6 * math.pi
        )

    self.ball_prev_pos = self._get_ball_position()
    # Normalisation constant for the distance part of the reward in step().
    self.distance_to_wall_at_game_start = self.ball_prev_pos - self.wall_width

    self.draw_background()
    self.draw()

    self.agents = self.possible_agents[:]

    self._agent_selector.reinit(self.agents)
    self.agent_selection = self._agent_selector.next()

    self.terminate = False
    self.truncate = False
    self.rewards = {agent: 0 for agent in self.agents}
    self._cumulative_rewards = {agent: 0 for agent in self.agents}
    self.terminations = {agent: False for agent in self.agents}
    self.truncations = {agent: False for agent in self.agents}
    self.infos = {agent: {} for agent in self.agents}

    self.frames = 0
def draw_background(self):
    """Paint the outer and inner wall rectangles, then the piston bodies."""
    outer_wall_color = (58, 64, 65)
    inner_wall_color = (68, 76, 77)

    # Outer wall covers the whole screen.
    full_screen = pygame.Rect(0, 0, self.screen_width, self.screen_height)
    pygame.draw.rect(self.screen, outer_wall_color, full_screen)

    # Inner wall is inset by half a wall width on every side.
    inset = self.wall_width / 2
    inner_area = pygame.Rect(
        inset,  # Left
        inset,  # Top
        self.screen_width - self.wall_width,  # Width
        self.screen_height - self.wall_width,  # Height
    )
    pygame.draw.rect(self.screen, inner_wall_color, inner_area)

    self.draw_pistons()
def draw_pistons(self):
    """Draw each piston's body sprite and the blue rod beneath its head."""
    piston_color = (65, 159, 221)
    body_top_y = self.screen_height - self.wall_width - self.piston_body_height

    for slot, piston in enumerate(self.pistonList):
        sprite_x = self.wall_width + slot * self.piston_width
        self.screen.blit(self.piston_body_sprite, (sprite_x, body_top_y))

        rod_top = piston.position[1] + self.piston_radius
        # Height is the size of the blue part of the piston. 6 is the piston
        # base height (the gray part at the bottom)
        rod_height = body_top_y - rod_top + (self.piston_body_height - 6)
        rod_rect = pygame.Rect(
            piston.position[0] + self.piston_radius + 1,  # +1 to match up to piston graphics
            rod_top + 1,
            18,
            rod_height,
        )
        pygame.draw.rect(self.screen, piston_color, rod_rect)
def draw(self):
    """Redraw the play area, ball and pistons onto ``self.screen``.

    Does nothing when ``render_mode`` is None.
    """
    # NOTE(review): observe() and state() read pixels from self.screen, so
    # with render_mode=None this early return means the surface is not
    # redrawn here - confirm that is intended.
    if self.render_mode is None:
        return

    # redraw the background image if ball goes outside valid position
    ball_in_bounds = self.valid_ball_position_rect.collidepoint(self.ball.position)
    if not ball_in_bounds:
        self.draw_background()

    center_x = int(self.ball.position[0])
    center_y = int(self.ball.position[1])
    reach = self.ball_radius - 1  # one short of the radius so the line stays inside

    white = (255, 255, 255)
    ball_color = (65, 159, 221)
    marker_color = (58, 64, 65)

    pygame.draw.rect(self.screen, white, self.render_rect)
    pygame.draw.circle(self.screen, ball_color, (center_x, center_y), self.ball_radius)

    # Line from the centre towards the rim showing the ball's rotation.
    marker_end = (
        center_x + reach * np.cos(self.ball.angle),
        center_y + reach * np.sin(self.ball.angle),
    )
    pygame.draw.line(self.screen, marker_color, (center_x, center_y), marker_end, 3)

    for piston in self.pistonList:
        head_origin = (
            piston.position[0] - self.piston_radius,
            piston.position[1] - self.piston_radius,
        )
        self.screen.blit(self.piston_sprite, head_origin)
    self.draw_pistons()
[docs]
def render(self):
    """Draw the current frame according to ``render_mode``.

    Returns:
        For "rgb_array", the screen pixels with the first two axes swapped;
        otherwise None. With no render mode a warning is logged and nothing
        is drawn.
    """
    mode = self.render_mode
    if mode is None:
        gymnasium.logger.warn(
            "You are calling render method without specifying any render mode."
        )
        return

    is_human = mode == "human"
    if is_human and not self.renderOn:
        # sets self.renderOn to true and initializes display
        self.enable_render()

    self.draw_background()
    self.draw()

    frame = np.array(pygame.surfarray.pixels3d(self.screen))
    if is_human:
        pygame.display.flip()

    if mode == "rgb_array":
        return np.transpose(frame, axes=(1, 0, 2))
    return None
def _get_ball_position(self) -> int:
    """Return the x coordinate of the ball's left edge, clipped to the wall.

    This left-edge coordinate is what the class treats as the ball's
    position. If the ball reaches into the left wall, the wall's inner edge
    (``wall_width``) is returned instead.
    """
    left_edge = int(self.ball.position[0] - self.ball_radius)
    # Never report a position further left than the wall itself.
    return max(self.wall_width, left_edge)
[docs]
def step(self, action):
    """Apply ``action`` for the currently selected agent and advance physics.

    The selected agent's piston is moved and the physics space is stepped by
    ``dt``. When the selected agent is the last one in the cycle, the win
    condition is checked, the scene is drawn, the shared reward is computed
    and the frame counter is incremented; otherwise rewards are cleared.
    Termination/truncation flags are published at the end of each cycle.
    """
    agent = self.agent_selection
    if self.terminations[agent] or self.truncations[agent]:
        self._was_dead_step(action)
        return

    action = np.asarray(action)
    piston = self.pistonList[self.agent_name_mapping[agent]]
    if self.continuous:
        # action is a 1 item numpy array, move_piston expects a scalar
        self.move_piston(piston, action[0])
    else:
        # discrete actions 0/1/2 map to -1/0/+1 position units
        self.move_piston(piston, action - 1)

    self.space.step(self.dt)

    if not self._agent_selector.is_last():
        self._clear_rewards()
    else:
        ball_curr_pos = self._get_ball_position()

        # A rough, first-order prediction (i.e. velocity-only) of the balls next position.
        # The physics environment may bounce the ball off the wall in the next time-step
        # without us first registering that win-condition.
        ball_predicted_next_pos = ball_curr_pos + self.ball.velocity[0] * self.dt
        # Include a single-pixel fudge-factor for the approximation.
        if ball_predicted_next_pos <= self.wall_width + 1:
            self.terminate = True

        self.draw()

        # x increases from left to right, so movement towards the left wall
        # (a smaller x) must yield a positive reward: previous minus current.
        pixels_moved_left = self.ball_prev_pos - ball_curr_pos
        reward = pixels_moved_left * (100 / self.distance_to_wall_at_game_start)
        if not self.terminate:
            reward += self.time_penalty

        self.rewards = dict.fromkeys(self.agents, reward)
        self.ball_prev_pos = ball_curr_pos
        self.frames += 1

    self.truncate = self.frames >= self.max_cycles

    # Clear the list of recent pistons for the next reward cycle
    if self.frames % self.recentFrameLimit == 0:
        self.recentPistons = set()

    if self._agent_selector.is_last():
        # Publish the episode-end flags to every agent once per cycle.
        self.terminations = {name: self.terminate for name in self.agents}
        self.truncations = {name: self.truncate for name in self.agents}

    self.agent_selection = self._agent_selector.next()
    self._cumulative_rewards[agent] = 0
    self._accumulate_rewards()

    if self.render_mode == "human":
        self.render()
# Game art created by J K Terry