# noqa: D212, D415
"""
# Pistonball
```{figure} butterfly_pistonball.gif
:width: 200px
:name: pistonball
```
This environment is part of the <a href='..'>butterfly environments</a>. Please read that page first for general information.
| Import | `from pettingzoo.butterfly import pistonball_v6` |
|----------------------|------------------------------------------------------|
| Actions | Either |
| Parallel API | Yes |
| Manual Control | Yes |
| Agents | `agents= ['piston_0', 'piston_1', ..., 'piston_19']` |
| Agents | 20 |
| Action Shape | (1,) |
| Action Values | [-1, 1] |
| Observation Shape | (457, 120, 3) |
| Observation Values | (0, 255) |
| State Shape | (560, 880, 3) |
| State Values | (0, 255) |
This is a simple physics based cooperative game where the goal is to move the ball to the left wall of the game border by activating the vertically moving pistons. Each piston agent's observation is an RGB image of the two pistons (or the wall) next to the agent and the space above them. Every
piston can be acted on in any given time. The action space in discrete mode is 0 to move down, 1 to stay still, and 2 to move up. In continuous mode, the value in the range [-1, 1] is proportional to the amount that the pistons are raised or lowered by. Continuous actions are scaled by a factor
of 4, so that in both the discrete and continuous action space, the action 1 will move a piston 4 pixels up, and -1 will move pistons 4 pixels down.
Accordingly, pistons must learn highly coordinated emergent behavior to achieve an optimal policy for the environment. Each agent gets a reward that is a combination of how much the ball moved left overall and how much the ball moved left if it was close to the piston (i.e. movement the piston
contributed to). A piston is considered close to the ball if it is directly below any part of the ball. Balancing the ratio between these local and global rewards appears to be critical to learning this environment, and as such is an environment parameter. The local reward applied is 0.5 times
the change in the ball's x-position. Additionally, the global reward is change in x-position divided by the starting position, times 100, plus the `time_penalty` (default -0.1). For each piston, the reward is `local_ratio` * local_reward + (1-`local_ratio`) * global_reward. The local reward is
applied to pistons surrounding the ball while the global reward is provided to all pistons.
Pistonball uses the chipmunk physics engine, and are thus the physics are about as realistic as in the game Angry Birds.
Keys *a* and *d* control which piston is selected to move (initially the rightmost piston is selected) and keys *w* and *s* move the piston in the vertical direction.
### Arguments
``` python
pistonball_v6.env(n_pistons=20, time_penalty=-0.1, continuous=True,
random_drop=True, random_rotate=True, ball_mass=0.75, ball_friction=0.3,
ball_elasticity=1.5, max_cycles=125)
```
`n_pistons`: The number of pistons (agents) in the environment.
`time_penalty`: Amount of reward added to each piston each time step. Higher values mean higher weight towards getting the ball across the screen to terminate the game.
`continuous`: If true, piston action is a real value between -1 and 1 which is added to the piston height. If False, then action is a discrete value to move a unit up or down.
`random_drop`: If True, ball will initially spawn in a random x value. If False, ball will always spawn at x=800
`random_rotate`: If True, ball will spawn with a random angular momentum
`ball_mass`: Sets the mass of the ball physics object
`ball_friction`: Sets the friction of the ball physics object
`ball_elasticity`: Sets the elasticity of the ball physics object
`max_cycles`: after max_cycles steps all agents will return done
### Version History
* v6: Fix ball bouncing off of left wall.
* v5: Ball moving into the left column due to physics engine imprecision no longer gives additional reward
* v4: Changed default arguments for `max_cycles` and `continuous`, bumped PyMunk version (1.6.0)
* v3: Refactor, added number of pistons argument, minor visual changes (1.5.0)
* v2: Misc fixes, bumped PyGame and PyMunk version (1.4.0)
* v1: Fix to continuous mode (1.0.1)
* v0: Initial versions release (1.0.0)
"""
import math
import gymnasium
import numpy as np
import pygame
import pymunk
import pymunk.pygame_util
from gymnasium.utils import EzPickle, seeding
from pettingzoo import AECEnv
from pettingzoo.butterfly.pistonball.manual_policy import ManualPolicy
from pettingzoo.utils import agent_selector, wrappers
from pettingzoo.utils.conversions import parallel_wrapper_fn
_image_library = {}
FPS = 20
__all__ = ["ManualPolicy", "env", "parallel_env", "raw_env"]
def get_image(path):
from os import path as os_path
cwd = os_path.dirname(__file__)
image = pygame.image.load(cwd + "/" + path)
sfc = pygame.Surface(image.get_size(), flags=pygame.SRCALPHA)
sfc.blit(image, (0, 0))
return sfc
[docs]
def env(**kwargs):
env = raw_env(**kwargs)
if env.continuous:
env = wrappers.ClipOutOfBoundsWrapper(env)
else:
env = wrappers.AssertOutOfBoundsWrapper(env)
env = wrappers.OrderEnforcingWrapper(env)
return env
parallel_env = parallel_wrapper_fn(env)
[docs]
class raw_env(AECEnv, EzPickle):
metadata = {
"render_modes": ["human", "rgb_array"],
"name": "pistonball_v6",
"is_parallelizable": True,
"render_fps": FPS,
"has_manual_policy": True,
}
def __init__(
self,
n_pistons=20,
time_penalty=-0.1,
continuous=True,
random_drop=True,
random_rotate=True,
ball_mass=0.75,
ball_friction=0.3,
ball_elasticity=1.5,
max_cycles=125,
render_mode=None,
):
EzPickle.__init__(
self,
n_pistons=n_pistons,
time_penalty=time_penalty,
continuous=continuous,
random_drop=random_drop,
random_rotate=random_rotate,
ball_mass=ball_mass,
ball_friction=ball_friction,
ball_elasticity=ball_elasticity,
max_cycles=max_cycles,
render_mode=render_mode,
)
self.dt = 1.0 / FPS
self.n_pistons = n_pistons
self.piston_head_height = 11
self.piston_width = 40
self.piston_height = 40
self.piston_body_height = 23
self.piston_radius = 5
self.wall_width = 40
self.ball_radius = 40
self.screen_width = (2 * self.wall_width) + (self.piston_width * self.n_pistons)
self.screen_height = 560
y_high = self.screen_height - self.wall_width - self.piston_body_height
y_low = self.wall_width
obs_height = y_high - y_low
assert (
self.piston_width == self.wall_width
), "Wall width and piston width must be equal for observation calculation"
assert self.n_pistons > 1, "n_pistons must be greater than 1"
self.agents = ["piston_" + str(r) for r in range(self.n_pistons)]
self.possible_agents = self.agents[:]
self.agent_name_mapping = dict(zip(self.agents, list(range(self.n_pistons))))
self._agent_selector = agent_selector(self.agents)
self.observation_spaces = dict(
zip(
self.agents,
[
gymnasium.spaces.Box(
low=0,
high=255,
shape=(obs_height, self.piston_width * 3, 3),
dtype=np.uint8,
)
]
* self.n_pistons,
)
)
self.continuous = continuous
if self.continuous:
self.action_spaces = dict(
zip(
self.agents,
[gymnasium.spaces.Box(low=-1, high=1, shape=(1,))] * self.n_pistons,
)
)
else:
self.action_spaces = dict(
zip(self.agents, [gymnasium.spaces.Discrete(3)] * self.n_pistons)
)
self.state_space = gymnasium.spaces.Box(
low=0,
high=255,
shape=(self.screen_height, self.screen_width, 3),
dtype=np.uint8,
)
pygame.init()
pymunk.pygame_util.positive_y_is_up = False
self.render_mode = render_mode
self.renderOn = False
self.screen = pygame.Surface((self.screen_width, self.screen_height))
self.max_cycles = max_cycles
self.piston_sprite = get_image("piston.png")
self.piston_body_sprite = get_image("piston_body.png")
self.background = get_image("background.png")
self.random_drop = random_drop
self.random_rotate = random_rotate
self.pistonList = []
self.pistonRewards = [] # Keeps track of individual rewards
self.recentFrameLimit = (
20 # Defines what "recent" means in terms of number of frames.
)
self.recentPistons = set() # Set of pistons that have touched the ball recently
self.time_penalty = time_penalty
# TODO: this was a bad idea and the logic this uses should be removed at some point
self.local_ratio = 0
self.ball_mass = ball_mass
self.ball_friction = ball_friction
self.ball_elasticity = ball_elasticity
self.terminate = False
self.truncate = False
self.pixels_per_position = 4
self.n_piston_positions = 16
self.screen.fill((0, 0, 0))
self.draw_background()
# self.screen.blit(self.background, (0, 0))
self.render_rect = pygame.Rect(
self.wall_width, # Left
self.wall_width, # Top
self.screen_width - (2 * self.wall_width), # Width
self.screen_height
- (2 * self.wall_width)
- self.piston_body_height, # Height
)
# Blit background image if ball goes out of bounds. Ball radius is 40
self.valid_ball_position_rect = pygame.Rect(
self.render_rect.left + self.ball_radius, # Left
self.render_rect.top + self.ball_radius, # Top
self.render_rect.width - (2 * self.ball_radius), # Width
self.render_rect.height - (2 * self.ball_radius), # Height
)
self.frames = 0
self._seed()
[docs]
def observation_space(self, agent):
return self.observation_spaces[agent]
[docs]
def action_space(self, agent):
return self.action_spaces[agent]
def _seed(self, seed=None):
self.np_random, seed = seeding.np_random(seed)
[docs]
def observe(self, agent):
observation = pygame.surfarray.pixels3d(self.screen)
i = self.agent_name_mapping[agent]
# Set x bounds to include 40px left and 40px right of piston
x_high = self.wall_width + self.piston_width * (i + 2)
x_low = self.wall_width + self.piston_width * (i - 1)
y_high = self.screen_height - self.wall_width - self.piston_body_height
y_low = self.wall_width
cropped = np.array(observation[x_low:x_high, y_low:y_high, :])
observation = np.rot90(cropped, k=3)
observation = np.fliplr(observation)
return observation
[docs]
def state(self):
"""Returns an observation of the global environment."""
state = pygame.surfarray.pixels3d(self.screen).copy()
state = np.rot90(state, k=3)
state = np.fliplr(state)
return state
def enable_render(self):
self.screen = pygame.display.set_mode((self.screen_width, self.screen_height))
pygame.display.set_caption("Pistonball")
self.renderOn = True
# self.screen.blit(self.background, (0, 0))
self.draw_background()
self.draw()
[docs]
def close(self):
if self.screen is not None:
pygame.quit()
self.screen = None
def add_walls(self):
top_left = (self.wall_width, self.wall_width)
top_right = (self.screen_width - self.wall_width, self.wall_width)
bot_left = (self.wall_width, self.screen_height - self.wall_width)
bot_right = (
self.screen_width - self.wall_width,
self.screen_height - self.wall_width,
)
walls = [
pymunk.Segment(self.space.static_body, top_left, top_right, 1), # Top wall
pymunk.Segment(self.space.static_body, top_left, bot_left, 1), # Left wall
pymunk.Segment(
self.space.static_body, bot_left, bot_right, 1
), # Bottom wall
pymunk.Segment(self.space.static_body, top_right, bot_right, 1), # Right
]
for wall in walls:
wall.friction = 0.64
self.space.add(wall)
def add_ball(self, x, y, b_mass, b_friction, b_elasticity):
mass = b_mass
radius = 40
inertia = pymunk.moment_for_circle(mass, 0, radius, (0, 0))
body = pymunk.Body(mass, inertia)
body.position = x, y
# radians per second
if self.random_rotate:
body.angular_velocity = self.np_random.uniform(-6 * math.pi, 6 * math.pi)
shape = pymunk.Circle(body, radius, (0, 0))
shape.friction = b_friction
shape.elasticity = b_elasticity
self.space.add(body, shape)
return body
def add_piston(self, space, x, y):
piston = pymunk.Body(body_type=pymunk.Body.KINEMATIC)
piston.position = x, y
segment = pymunk.Segment(
piston,
(0, 0),
(self.piston_width - (2 * self.piston_radius), 0),
self.piston_radius,
)
segment.friction = 0.64
segment.color = pygame.color.THECOLORS["blue"]
space.add(piston, segment)
return piston
def move_piston(self, piston, v):
def cap(y):
maximum_piston_y = (
self.screen_height
- self.wall_width
- (self.piston_height - self.piston_head_height)
)
if y > maximum_piston_y:
y = maximum_piston_y
elif y < maximum_piston_y - (
self.n_piston_positions * self.pixels_per_position
):
y = maximum_piston_y - (
self.n_piston_positions * self.pixels_per_position
)
return y
piston.position = (
piston.position[0],
cap(piston.position[1] - v * self.pixels_per_position),
)
[docs]
def reset(self, seed=None, options=None):
if seed is not None:
self._seed(seed)
self.space = pymunk.Space(threaded=False)
self.add_walls()
# self.space.threads = 2
self.space.gravity = (0.0, 750.0)
self.space.collision_bias = 0.0001
self.space.iterations = 10 # 10 is default in PyMunk
self.pistonList = []
maximum_piston_y = (
self.screen_height
- self.wall_width
- (self.piston_height - self.piston_head_height)
)
for i in range(self.n_pistons):
# Multiply by 0.5 to use only the lower half of possible positions
possible_y_displacements = np.arange(
0,
0.5 * self.pixels_per_position * self.n_piston_positions,
self.pixels_per_position,
)
piston = self.add_piston(
self.space,
self.wall_width
+ self.piston_radius
+ self.piston_width * i, # x position
maximum_piston_y
# y position
- self.np_random.choice(possible_y_displacements),
)
piston.velociy = 0
self.pistonList.append(piston)
self.horizontal_offset = 0
self.vertical_offset = 0
horizontal_offset_range = 30
vertical_offset_range = 15
if self.random_drop:
self.vertical_offset = self.np_random.integers(
-vertical_offset_range, vertical_offset_range + 1
)
self.horizontal_offset = self.np_random.integers(
-horizontal_offset_range, horizontal_offset_range + 1
)
ball_x = (
self.screen_width
- self.wall_width
- self.ball_radius
- horizontal_offset_range
+ self.horizontal_offset
)
ball_y = (
self.screen_height
- self.wall_width
- self.piston_body_height
- self.ball_radius
- (0.5 * self.pixels_per_position * self.n_piston_positions)
- vertical_offset_range
+ self.vertical_offset
)
# Ensure ball starts somewhere right of the left wall
ball_x = max(ball_x, self.wall_width + self.ball_radius + 1)
self.ball = self.add_ball(
ball_x, ball_y, self.ball_mass, self.ball_friction, self.ball_elasticity
)
self.ball.angle = 0
self.ball.velocity = (0, 0)
if self.random_rotate:
self.ball.angular_velocity = self.np_random.uniform(
-6 * math.pi, 6 * math.pi
)
self.lastX = int(self.ball.position[0] - self.ball_radius)
self.distance = self.lastX - self.wall_width
self.draw_background()
self.draw()
self.agents = self.possible_agents[:]
self._agent_selector.reinit(self.agents)
self.agent_selection = self._agent_selector.next()
self.terminate = False
self.truncate = False
self.rewards = dict(zip(self.agents, [0 for _ in self.agents]))
self._cumulative_rewards = dict(zip(self.agents, [0 for _ in self.agents]))
self.terminations = dict(zip(self.agents, [False for _ in self.agents]))
self.truncations = dict(zip(self.agents, [False for _ in self.agents]))
self.infos = dict(zip(self.agents, [{} for _ in self.agents]))
self.frames = 0
def draw_background(self):
outer_walls = pygame.Rect(
0, # Left
0, # Top
self.screen_width, # Width
self.screen_height, # Height
)
outer_wall_color = (58, 64, 65)
pygame.draw.rect(self.screen, outer_wall_color, outer_walls)
inner_walls = pygame.Rect(
self.wall_width / 2, # Left
self.wall_width / 2, # Top
self.screen_width - self.wall_width, # Width
self.screen_height - self.wall_width, # Height
)
inner_wall_color = (68, 76, 77)
pygame.draw.rect(self.screen, inner_wall_color, inner_walls)
self.draw_pistons()
def draw_pistons(self):
piston_color = (65, 159, 221)
x_pos = self.wall_width
for piston in self.pistonList:
self.screen.blit(
self.piston_body_sprite,
(x_pos, self.screen_height - self.wall_width - self.piston_body_height),
)
# Height is the size of the blue part of the piston. 6 is the piston base height (the gray part at the bottom)
height = (
self.screen_height
- self.wall_width
- self.piston_body_height
- (piston.position[1] + self.piston_radius)
+ (self.piston_body_height - 6)
)
body_rect = pygame.Rect(
piston.position[0]
+ self.piston_radius
+ 1, # +1 to match up to piston graphics
piston.position[1] + self.piston_radius + 1,
18,
height,
)
pygame.draw.rect(self.screen, piston_color, body_rect)
x_pos += self.piston_width
def draw(self):
if self.render_mode is None:
return
# redraw the background image if ball goes outside valid position
if not self.valid_ball_position_rect.collidepoint(self.ball.position):
# self.screen.blit(self.background, (0, 0))
self.draw_background()
ball_x = int(self.ball.position[0])
ball_y = int(self.ball.position[1])
color = (255, 255, 255)
pygame.draw.rect(self.screen, color, self.render_rect)
color = (65, 159, 221)
pygame.draw.circle(self.screen, color, (ball_x, ball_y), self.ball_radius)
line_end_x = ball_x + (self.ball_radius - 1) * np.cos(self.ball.angle)
line_end_y = ball_y + (self.ball_radius - 1) * np.sin(self.ball.angle)
color = (58, 64, 65)
pygame.draw.line(
self.screen, color, (ball_x, ball_y), (line_end_x, line_end_y), 3
) # 39 because it kept sticking over by 1 at 40
for piston in self.pistonList:
self.screen.blit(
self.piston_sprite,
(
piston.position[0] - self.piston_radius,
piston.position[1] - self.piston_radius,
),
)
self.draw_pistons()
def get_nearby_pistons(self):
# first piston = leftmost
nearby_pistons = []
ball_pos = int(self.ball.position[0] - self.ball_radius)
closest = abs(self.pistonList[0].position.x - ball_pos)
closest_piston_index = 0
for i in range(self.n_pistons):
next_distance = abs(self.pistonList[i].position.x - ball_pos)
if next_distance < closest:
closest = next_distance
closest_piston_index = i
if closest_piston_index > 0:
nearby_pistons.append(closest_piston_index - 1)
nearby_pistons.append(closest_piston_index)
if closest_piston_index < self.n_pistons - 1:
nearby_pistons.append(closest_piston_index + 1)
return nearby_pistons
def get_local_reward(self, prev_position, curr_position):
local_reward = 0.5 * (prev_position - curr_position)
return local_reward
[docs]
def render(self):
if self.render_mode is None:
gymnasium.logger.warn(
"You are calling render method without specifying any render mode."
)
return
if self.render_mode == "human" and not self.renderOn:
# sets self.renderOn to true and initializes display
self.enable_render()
self.draw_background()
self.draw()
observation = np.array(pygame.surfarray.pixels3d(self.screen))
if self.render_mode == "human":
pygame.display.flip()
return (
np.transpose(observation, axes=(1, 0, 2))
if self.render_mode == "rgb_array"
else None
)
[docs]
def step(self, action):
if (
self.terminations[self.agent_selection]
or self.truncations[self.agent_selection]
):
self._was_dead_step(action)
return
action = np.asarray(action)
agent = self.agent_selection
if self.continuous:
self.move_piston(self.pistonList[self.agent_name_mapping[agent]], action)
else:
self.move_piston(
self.pistonList[self.agent_name_mapping[agent]], action - 1
)
self.space.step(self.dt)
if self._agent_selector.is_last():
ball_min_x = int(self.ball.position[0] - self.ball_radius)
ball_next_x = (
self.ball.position[0]
- self.ball_radius
+ self.ball.velocity[0] * self.dt
)
if ball_next_x <= self.wall_width + 1:
self.terminate = True
# ensures that the ball can't pass through the wall
ball_min_x = max(self.wall_width, ball_min_x)
self.draw()
local_reward = self.get_local_reward(self.lastX, ball_min_x)
# Opposite order due to moving right to left
global_reward = (100 / self.distance) * (self.lastX - ball_min_x)
if not self.terminate:
global_reward += self.time_penalty
total_reward = [
global_reward * (1 - self.local_ratio)
] * self.n_pistons # start with global reward
local_pistons_to_reward = self.get_nearby_pistons()
for index in local_pistons_to_reward:
total_reward[index] += local_reward * self.local_ratio
self.rewards = dict(zip(self.agents, total_reward))
self.lastX = ball_min_x
self.frames += 1
else:
self._clear_rewards()
self.truncate = self.frames >= self.max_cycles
# Clear the list of recent pistons for the next reward cycle
if self.frames % self.recentFrameLimit == 0:
self.recentPistons = set()
if self._agent_selector.is_last():
self.terminations = dict(
zip(self.agents, [self.terminate for _ in self.agents])
)
self.truncations = dict(
zip(self.agents, [self.truncate for _ in self.agents])
)
self.agent_selection = self._agent_selector.next()
self._cumulative_rewards[agent] = 0
self._accumulate_rewards()
if self.render_mode == "human":
self.render()
# Game art created by J K Terry