# noqa: D212, D415
"""
# Cooperative Pong

```{figure} butterfly_cooperative_pong.gif
:width: 200px
:name: cooperative_pong
```

This environment is part of the <a href='..'>butterfly environments</a>. Please read that page first for general information.

| Import               | `from pettingzoo.butterfly import cooperative_pong_v5` |
|----------------------|--------------------------------------------------------|
| Actions              | Discrete                                               |
| Parallel API         | Yes                                                    |
| Manual Control       | Yes                                                    |
| Agents               | `agents=['paddle_0', 'paddle_1']`                      |
| Num Agents           | 2                                                      |
| Action Shape         | Discrete(3)                                            |
| Action Values        | [0, 2]                                                 |
| Observation Shape    | (280, 480, 3)                                          |
| Observation Values   | [0, 255]                                               |
| State Shape          | (280, 480, 3)                                          |
| State Values         | [0, 255]                                               |


Cooperative Pong is a simple game of pong where the objective is to keep the ball in play for as long as possible. The game ends when the ball goes out of bounds from either the left or right edge of the screen. There are two agents (paddles): one moves along the left edge and the other
along the right edge of the screen. All collisions of the ball are elastic. On each reset, the ball starts moving in a random direction from the center of the screen. To make learning a little more challenging, the right paddle is tiered cake-shaped by default.
Each agent observes the entire screen. Each agent has three possible actions (_do nothing, move up, move down_). As long as the ball stays within bounds, each agent receives a reward of `max_reward / max_cycles` (default 0.11) at each timestep. Otherwise, each agent receives a reward of
`off_screen_penalty` (default -10) and the game ends.
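
For orientation, here is a minimal interaction loop using the standard PettingZoo AEC API; the randomly sampled actions below are only a stand-in for a trained policy.

``` python
from pettingzoo.butterfly import cooperative_pong_v5

env = cooperative_pong_v5.env(render_mode="human")
env.reset(seed=42)

for agent in env.agent_iter():
    observation, reward, termination, truncation, info = env.last()

    if termination or truncation:
        action = None  # terminated/truncated agents must step with None
    else:
        action = env.action_space(agent).sample()  # replace with your policy

    env.step(action)
env.close()
```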


### Manual Control

Move the left paddle using the 'W' and 'S' keys. Move the right paddle using 'UP' and 'DOWN' arrow keys.
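
A minimal manual-control sketch using the `ManualPolicy` helper exported by this module. It assumes the usual PettingZoo manual-policy interface, where `manual_policy.agent` names the human-controlled paddle and calling the policy with the current observation and agent reads the keyboard:

``` python
from pettingzoo.butterfly import cooperative_pong_v5

env = cooperative_pong_v5.env(render_mode="human")  # a window is needed for key input
env.reset(seed=42)
manual_policy = cooperative_pong_v5.ManualPolicy(env)

for agent in env.agent_iter():
    observation, reward, termination, truncation, info = env.last()
    if termination or truncation:
        action = None
    elif agent == manual_policy.agent:
        action = manual_policy(observation, agent)  # keyboard-controlled paddle
    else:
        action = env.action_space(agent).sample()  # other paddle acts randomly
    env.step(action)
env.close()
```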

### Arguments

``` python
cooperative_pong_v5.env(ball_speed=9, left_paddle_speed=12,
right_paddle_speed=12, cake_paddle=True, max_cycles=900, bounce_randomness=False, max_reward=100, off_screen_penalty=-10)
```

`ball_speed`: Speed of ball (in pixels)

`left_paddle_speed`: Speed of left paddle (in pixels)

`right_paddle_speed`: Speed of right paddle (in pixels)

`cake_paddle`: If True, the right paddle takes the shape of a 4-tiered wedding cake

`max_cycles`: After max_cycles steps, all agents will return done

`bounce_randomness`: If True, each collision of the ball with a paddle adds a small random angle to the direction of the ball, with the speed of the ball remaining unchanged.

`max_reward`: Total reward given to each agent over max_cycles timesteps

`off_screen_penalty`: Negative reward penalty for each agent if the ball goes off the screen
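
Since the environment supports the Parallel API, the same arguments can be passed to `parallel_env` as well. A minimal sketch that swaps the cake paddle for a flat one and enables bounce randomness:

``` python
from pettingzoo.butterfly import cooperative_pong_v5

env = cooperative_pong_v5.parallel_env(
    cake_paddle=False,       # flat right paddle instead of the tiered default
    bounce_randomness=True,  # small random angle added on paddle bounces
    max_cycles=500,
)
observations, infos = env.reset(seed=42)

while env.agents:
    # one action per live agent; random actions stand in for a policy
    actions = {agent: env.action_space(agent).sample() for agent in env.agents}
    observations, rewards, terminations, truncations, infos = env.step(actions)
env.close()
```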

### Version History

* v5: Fixed ball teleporting bugs
* v4: Added `max_reward` and `off_screen_penalty` arguments with changed defaults; fixed a glitch where the ball would occasionally teleport; redesigned rewards (1.14.0)
* v3: Change observation space to include entire screen (1.10.0)
* v2: Misc fixes (1.4.0)
* v1: Fixed bug in how `dones` were computed (1.3.1)
* v0: Initial version release (1.0.0)

"""

import gymnasium
import numpy as np
import pygame
from gymnasium.utils import EzPickle, seeding

from pettingzoo import AECEnv
from pettingzoo.butterfly.cooperative_pong.ball import Ball
from pettingzoo.butterfly.cooperative_pong.cake_paddle import CakePaddle
from pettingzoo.butterfly.cooperative_pong.manual_policy import ManualPolicy
from pettingzoo.butterfly.cooperative_pong.paddle import Paddle
from pettingzoo.utils import wrappers
from pettingzoo.utils.agent_selector import agent_selector
from pettingzoo.utils.conversions import parallel_wrapper_fn

FPS = 15


__all__ = ["ManualPolicy", "env", "raw_env", "parallel_env"]


def deg_to_rad(deg):
    return deg * np.pi / 180


def get_flat_shape(width, height, kernel_window_length=2):
    return int(width * height / (kernel_window_length * kernel_window_length))


def original_obs_shape(screen_width, screen_height, kernel_window_length=2):
    return (
        int(screen_height * 2 / kernel_window_length),
        int(screen_width * 2 / kernel_window_length),
        1,
    )


def get_valid_angle(randomizer):
    # generates an angle in [0, 2*np.pi) that
    # excludes (90 +- ver_deg_range), (270 +- ver_deg_range), (0 +- hor_deg_range), (180 +- hor_deg_range)
    # (65, 115), (245, 295), (170, 190), (0, 10), (350, 360)
    ver_deg_range = 25
    hor_deg_range = 10
    a1 = deg_to_rad(90 - ver_deg_range)
    b1 = deg_to_rad(90 + ver_deg_range)
    a2 = deg_to_rad(270 - ver_deg_range)
    b2 = deg_to_rad(270 + ver_deg_range)
    c1 = deg_to_rad(180 - hor_deg_range)
    d1 = deg_to_rad(180 + hor_deg_range)
    c2 = deg_to_rad(360 - hor_deg_range)
    d2 = deg_to_rad(0 + hor_deg_range)

    angle = 0
    while (
        (a1 < angle < b1)
        or (a2 < angle < b2)
        or (c1 < angle < d1)
        or (angle > c2)
        or (angle < d2)
    ):
        angle = 2 * np.pi * randomizer.random()

    return angle


class CooperativePong:
    def __init__(
        self,
        randomizer,
        ball_speed=9,
        left_paddle_speed=12,
        right_paddle_speed=12,
        cake_paddle=True,
        max_cycles=900,
        bounce_randomness=False,
        max_reward=100,
        off_screen_penalty=-10,
        render_mode=None,
        render_ratio=2,
        kernel_window_length=2,
        render_fps=15,
    ):
        super().__init__()

        pygame.init()
        self.num_agents = 2

        self.render_ratio = render_ratio
        self.kernel_window_length = kernel_window_length

        # Display screen
        self.s_width, self.s_height = 960 // render_ratio, 560 // render_ratio
        self.area = pygame.Rect(0, 0, self.s_width, self.s_height)
        self.max_reward = max_reward
        self.off_screen_penalty = off_screen_penalty

        # define action and observation spaces
        self.action_space = [
            gymnasium.spaces.Discrete(3) for _ in range(self.num_agents)
        ]
        original_shape = original_obs_shape(
            self.s_width, self.s_height, kernel_window_length=kernel_window_length
        )
        original_color_shape = (original_shape[0], original_shape[1], 3)
        self.observation_space = [
            gymnasium.spaces.Box(
                low=0, high=255, shape=original_color_shape, dtype=np.uint8
            )
            for _ in range(self.num_agents)
        ]
        # define the global space of the environment or state
        self.state_space = gymnasium.spaces.Box(
            low=0, high=255, shape=(self.s_height, self.s_width, 3), dtype=np.uint8
        )

        self.render_mode = render_mode
        self.screen = None

        # set speed
        self.speed = [ball_speed, left_paddle_speed, right_paddle_speed]

        self.max_cycles = max_cycles

        # paddles
        self.p0 = Paddle((20 // render_ratio, 80 // render_ratio), left_paddle_speed)
        if cake_paddle:
            self.p1 = CakePaddle(right_paddle_speed, render_ratio=render_ratio)
        else:
            self.p1 = Paddle(
                (20 // render_ratio, 100 // render_ratio), right_paddle_speed
            )

        self.agents = ["paddle_0", "paddle_1"]  # list(range(self.num_agents))

        # ball
        self.ball = Ball(
            randomizer,
            (20 // render_ratio, 20 // render_ratio),
            ball_speed,
            bounce_randomness,
        )
        self.randomizer = randomizer

        self.reinit()

        self.render_fps = render_fps
        if self.render_mode == "human":
            self.clock = pygame.time.Clock()

    def reinit(self):
        self.rewards = dict(zip(self.agents, [0.0] * len(self.agents)))
        self.terminations = dict(zip(self.agents, [False] * len(self.agents)))
        self.truncations = dict(zip(self.agents, [False] * len(self.agents)))
        # build a fresh dict per agent; [{}] * n would make all agents share one dict
        self.infos = {agent: {} for agent in self.agents}
        self.score = 0

    def reset(self, seed=None, options=None):
        # reset ball and paddle init conditions
        self.ball.rect.center = self.area.center
        # set the direction to an angle between [0, 2*np.pi)
        angle = get_valid_angle(self.randomizer)
        # angle = deg_to_rad(89)
        self.ball.speed = [
            int(self.ball.speed_val * np.cos(angle)),
            int(self.ball.speed_val * np.sin(angle)),
        ]

        self.p0.rect.midleft = self.area.midleft
        self.p1.rect.midright = self.area.midright
        self.p0.reset()
        self.p1.reset()
        self.p0.speed = self.speed[1]
        self.p1.speed = self.speed[2]

        self.terminate = False
        self.truncate = False

        self.num_frames = 0

        self.reinit()

        # A pygame surface is required even when render_mode is None, as observations are taken from pixel values
        # Observe
        if self.render_mode != "human":
            self.screen = pygame.Surface((self.s_width, self.s_height))

        self.render()

    def close(self):
        if self.screen is not None:
            pygame.quit()
            self.screen = None

    def render(self):
        if self.render_mode is None:
            gymnasium.logger.warn(
                "You are calling render method without specifying any render mode."
            )
            return

        if self.screen is None:
            if self.render_mode == "human":
                self.screen = pygame.display.set_mode((self.s_width, self.s_height))
                pygame.display.set_caption("Cooperative Pong")
        self.draw()

        observation = np.array(pygame.surfarray.pixels3d(self.screen))
        if self.render_mode == "human":
            pygame.display.flip()
            self.clock.tick(self.render_fps)
        return (
            np.transpose(observation, axes=(1, 0, 2))
            if self.render_mode == "rgb_array"
            else None
        )

    def observe(self):
        observation = np.array(pygame.surfarray.pixels3d(self.screen))
        observation = np.rot90(
            observation, k=3
        )  # now the obs is laid out as H, W as rows and cols
        observation = np.fliplr(observation)  # laid out in the correct order
        return observation

    def state(self):
        """Returns an observation of the global environment."""
        state = pygame.surfarray.pixels3d(self.screen).copy()
        state = np.rot90(state, k=3)
        state = np.fliplr(state)
        return state

    def draw(self):
        pygame.draw.rect(self.screen, (0, 0, 0), self.area)
        self.p0.draw(self.screen)
        self.p1.draw(self.screen)
        self.ball.draw(self.screen)

    def step(self, action, agent):
        # update p0, p1 accordingly
        # action: 0: do nothing,
        # action: 1: p[i] move up
        # action: 2: p[i] move down
        if agent == self.agents[0]:
            self.rewards = {a: 0 for a in self.agents}
            self.p0.update(self.area, action)
        elif agent == self.agents[1]:
            self.p1.update(self.area, action)

            # do the rest if not terminated
            if not self.terminate:
                # update ball position
                self.terminate = self.ball.update2(self.area, self.p0, self.p1)

                # do the miscellaneous stuff after the last agent has moved
                # reward is the length of time ball is in play
                reward = 0
                # ball is out-of-bounds
                if self.terminate:
                    reward = self.off_screen_penalty
                    self.score += reward
                else:
                    self.num_frames += 1
                    reward = self.max_reward / self.max_cycles
                    self.score += reward
                    self.truncate = self.num_frames >= self.max_cycles

                for ag in self.agents:
                    self.rewards[ag] = reward
                    self.terminations[ag] = self.terminate
                    self.truncations[ag] = self.truncate
                    self.infos[ag] = {}

        self.render()


def env(**kwargs):
    env = raw_env(**kwargs)
    env = wrappers.AssertOutOfBoundsWrapper(env)
    env = wrappers.OrderEnforcingWrapper(env)
    return env


parallel_env = parallel_wrapper_fn(env)


class raw_env(AECEnv, EzPickle):
    # class env(MultiAgentEnv):
    metadata = {
        "render_modes": ["human", "rgb_array"],
        "name": "cooperative_pong_v5",
        "is_parallelizable": True,
        "render_fps": FPS,
        "has_manual_policy": True,
    }

    def __init__(self, **kwargs):
        EzPickle.__init__(self, **kwargs)
        self._kwargs = kwargs

        self._seed()

        self.agents = self.env.agents[:]
        self.possible_agents = self.agents[:]
        self._agent_selector = agent_selector(self.agents)
        self.agent_selection = self._agent_selector.reset()
        # spaces
        self.action_spaces = dict(zip(self.agents, self.env.action_space))
        self.observation_spaces = dict(zip(self.agents, self.env.observation_space))
        self.state_space = self.env.state_space
        # dicts
        self.observations = {}
        self.rewards = self.env.rewards
        self.terminations = self.env.terminations
        self.truncations = self.env.truncations
        self.infos = self.env.infos

        self.score = self.env.score

        self.render_mode = self.env.render_mode
        self.screen = None

    def observation_space(self, agent):
        return self.observation_spaces[agent]

    def action_space(self, agent):
        return self.action_spaces[agent]

    # def convert_to_dict(self, list_of_list):
    #     return dict(zip(self.agents, list_of_list))

    def _seed(self, seed=None):
        self.randomizer, seed = seeding.np_random(seed)
        self.env = CooperativePong(self.randomizer, **self._kwargs)

    def reset(self, seed=None, options=None):
        if seed is not None:
            self._seed(seed=seed)
        self.env.reset()
        self.agents = self.possible_agents[:]
        self.agent_selection = self._agent_selector.reset()
        self.rewards = self.env.rewards
        self._cumulative_rewards = {a: 0 for a in self.agents}
        self.terminations = self.env.terminations
        self.truncations = self.env.truncations
        self.infos = self.env.infos

    def observe(self, agent):
        obs = self.env.observe()
        return obs

    def state(self):
        state = self.env.state()
        return state

    def close(self):
        self.env.close()

    def render(self):
        return self.env.render()

    def step(self, action):
        if (
            self.terminations[self.agent_selection]
            or self.truncations[self.agent_selection]
        ):
            self._was_dead_step(action)
            return

        agent = self.agent_selection
        if not self.action_spaces[agent].contains(action):
            raise Exception(
                "Action for agent {} must be in Discrete({}). "
                "It is currently {}".format(agent, self.action_spaces[agent].n, action)
            )

        self.env.step(action, agent)
        # select next agent and observe
        self.agent_selection = self._agent_selector.next()
        self.rewards = self.env.rewards
        self.terminations = self.env.terminations
        self.truncations = self.env.truncations
        self.infos = self.env.infos
        self.score = self.env.score

        self._cumulative_rewards[agent] = 0
        self._accumulate_rewards()


# This was originally created, in full, by Ananth Hari in a different repo, and was
# added in by J K Terry (which is why they're shown as the creator in the git history)