# noqa: D212, D415
"""
# Connect Four
```{figure} classic_connect_four.gif
:width: 140px
:name: connect_four
```
This environment is part of the <a href='..'>classic environments</a>. Please read that page first for general information.
| Import | `from pettingzoo.classic import connect_four_v3` |
|--------------------|--------------------------------------------------|
| Actions | Discrete |
| Parallel API | Yes |
| Manual Control | No |
| Agents | `agents= ['player_0', 'player_1']` |
| Num Agents         | 2                                                |
| Action Shape | (1,) |
| Action Values | Discrete(7) |
| Observation Shape | (6, 7, 2) |
| Observation Values | [0,1] |
Connect Four is a 2-player turn based game, where players must connect four of their tokens vertically, horizontally or diagonally. The players drop their respective token in a column of a standing grid, where each token will fall until it reaches the bottom of the column or reaches an existing
token. Players cannot place a token in a full column, and the game ends when either a player has made a sequence of 4 tokens, or when all 7 columns have been filled.
### Observation Space
The observation is a dictionary which contains an `'observation'` element which is the usual RL observation described below, and an `'action_mask'` which holds the legal moves, described in the Legal Actions Mask section.
The main observation space is 2 planes of a 6x7 grid. Each plane represents a specific agent's tokens, and each location in the grid represents the placement of the corresponding agent's token. 1 indicates that the agent has a token placed in that cell, and 0 indicates they do not have a token in
that cell. A 0 means that either the cell is empty, or the other agent has a token in that cell.
#### Legal Actions Mask
The legal moves available to the current agent are found in the `action_mask` element of the dictionary observation. The `action_mask` is a binary vector where each index of the vector represents whether the action is legal or not. The `action_mask` will be all zeros for any agent except the one
whose turn it is. Taking an illegal move ends the game with a reward of -1 for the illegally moving agent and a reward of 0 for all other agents.
### Action Space
The action space is the set of integers from 0 to 6 (inclusive), where the action represents which column a token should be dropped in.
### Rewards
If an agent successfully connects four of their tokens, they will be rewarded 1 point. At the same time, the opponent agent will be awarded -1 points. If the game ends in a draw, both players are rewarded 0.
### Version History
* v3: Fixed bug in arbitrary calls to observe() (1.8.0)
* v2: Legal action mask in observation replaced illegal move list in infos (1.5.0)
* v1: Bumped version of all environments due to adoption of new agent iteration scheme where all agents are iterated over after they are done (1.4.0)
* v0: Initial versions release (1.0.0)
"""
from __future__ import annotations
import os
import gymnasium
import numpy as np
import pygame
from gymnasium import spaces
from gymnasium.utils import EzPickle
from pettingzoo import AECEnv
from pettingzoo.utils import wrappers
from pettingzoo.utils.agent_selector import agent_selector
def get_image(path):
    """Load the image at *path*, resolved relative to this file's directory.

    Returns a new ``SRCALPHA`` surface with the image blitted onto it, so the
    result always carries a per-pixel alpha channel regardless of the source
    image format.
    """
    # Resolve relative to the package directory, not the process cwd, and use
    # os.path.join instead of manual "/" concatenation for portability.
    cwd = os.path.dirname(__file__)
    image = pygame.image.load(os.path.join(cwd, path))
    sfc = pygame.Surface(image.get_size(), flags=pygame.SRCALPHA)
    sfc.blit(image, (0, 0))
    return sfc
def env(**kwargs):
    """Build a Connect Four environment wrapped with the standard AEC wrappers.

    TerminateIllegalWrapper ends the game with reward -1 for an agent that
    plays an illegal move, AssertOutOfBoundsWrapper validates that actions lie
    in the action space, and OrderEnforcingWrapper enforces the correct
    reset/step/observe call order.
    """
    env = raw_env(**kwargs)
    env = wrappers.TerminateIllegalWrapper(env, illegal_reward=-1)
    env = wrappers.AssertOutOfBoundsWrapper(env)
    env = wrappers.OrderEnforcingWrapper(env)
    return env
class raw_env(AECEnv, EzPickle):
    """Connect Four AEC environment.

    Two players (``player_0``, ``player_1``) alternately drop tokens into a
    6x7 grid; the first to line up four of their tokens horizontally,
    vertically or diagonally wins (+1 reward, opponent gets -1).  A full
    board with no winner is a draw (both rewarded 0).
    """

    metadata = {
        "render_modes": ["human", "rgb_array"],
        "name": "connect_four_v3",
        "is_parallelizable": False,
        "render_fps": 2,
    }

    def __init__(self, render_mode: str | None = None, screen_scaling: int = 9):
        EzPickle.__init__(self, render_mode, screen_scaling)
        super().__init__()
        # The board is a flat, row-major 6x7 grid:
        #   0 = empty cell, 1 = player_0's token, 2 = player_1's token
        # Row 0 is the top of the board, so index 0..6 are the column tops.
        self.screen = None
        self.render_mode = render_mode
        self.screen_scaling = screen_scaling
        self.board = [0] * (6 * 7)
        self.agents = ["player_0", "player_1"]
        self.possible_agents = self.agents[:]
        self.action_spaces = {i: spaces.Discrete(7) for i in self.agents}
        self.observation_spaces = {
            i: spaces.Dict(
                {
                    "observation": spaces.Box(
                        low=0, high=1, shape=(6, 7, 2), dtype=np.int8
                    ),
                    "action_mask": spaces.Box(low=0, high=1, shape=(7,), dtype=np.int8),
                }
            )
            for i in self.agents
        }
        if self.render_mode == "human":
            self.clock = pygame.time.Clock()

    def observe(self, agent):
        """Return the observation dict for *agent*.

        ``observation`` is a (6, 7, 2) int8 array: plane 0 marks the observing
        agent's tokens, plane 1 the opponent's.  ``action_mask`` is a length-7
        binary vector of legal columns; it is all zeros for any agent other
        than the one whose turn it is.
        """
        board_vals = np.array(self.board).reshape(6, 7)
        cur_player = self.possible_agents.index(agent)
        opp_player = (cur_player + 1) % 2
        # Boolean planes for "my tokens" and "opponent tokens".
        cur_p_board = np.equal(board_vals, cur_player + 1)
        opp_p_board = np.equal(board_vals, opp_player + 1)
        observation = np.stack([cur_p_board, opp_p_board], axis=2).astype(np.int8)
        legal_moves = self._legal_moves() if agent == self.agent_selection else []
        action_mask = np.zeros(7, "int8")
        for i in legal_moves:
            action_mask[i] = 1
        return {"observation": observation, "action_mask": action_mask}

    def observation_space(self, agent):
        """Return the Dict observation space for *agent*."""
        return self.observation_spaces[agent]

    def action_space(self, agent):
        """Return the Discrete(7) action space for *agent*."""
        return self.action_spaces[agent]

    def _legal_moves(self):
        """Columns that are not full, i.e. whose top cell (index 0..6) is empty."""
        return [i for i in range(7) if self.board[i] == 0]

    def step(self, action):
        """Drop the current agent's token into column *action* (0..6).

        On a winning move, both agents are terminated and rewarded +1/-1; on a
        draw (full board, no winner) both are terminated with reward 0.  Turn
        then passes to the other agent and rewards are accumulated.
        """
        if (
            self.truncations[self.agent_selection]
            or self.terminations[self.agent_selection]
        ):
            return self._was_dead_step(action)
        # A column is playable iff its top cell (flat index == column) is empty.
        assert self.board[action] == 0, "played illegal move."
        piece = self.agents.index(self.agent_selection) + 1
        # The token falls to the lowest empty cell of the chosen column.
        for row in range(5, -1, -1):
            if self.board[row * 7 + action] == 0:
                self.board[row * 7 + action] = piece
                break
        next_agent = self._agent_selector.next()
        winner = self.check_for_winner()
        if winner:
            self.rewards[self.agent_selection] += 1
            self.rewards[next_agent] -= 1
            self.terminations = {i: True for i in self.agents}
        elif all(x in [1, 2] for x in self.board):
            # Once either player wins or the board fills up with no winner,
            # the game is over and both players are done.
            self.terminations = {i: True for i in self.agents}
        self.agent_selection = next_agent
        self._accumulate_rewards()
        if self.render_mode == "human":
            self.render()

    def reset(self, seed=None, options=None):
        """Clear the board and reset all per-agent bookkeeping and turn order."""
        self.board = [0] * (6 * 7)
        self.agents = self.possible_agents[:]
        self.rewards = {i: 0 for i in self.agents}
        self._cumulative_rewards = {name: 0 for name in self.agents}
        self.terminations = {i: False for i in self.agents}
        self.truncations = {i: False for i in self.agents}
        self.infos = {i: {} for i in self.agents}
        self._agent_selector = agent_selector(self.agents)
        self.agent_selection = self._agent_selector.reset()

    def render(self):
        """Draw the board; return an (H, W, 3) array in ``rgb_array`` mode."""
        if self.render_mode is None:
            gymnasium.logger.warn(
                "You are calling render method without specifying any render mode."
            )
            return
        screen_width = 99 * self.screen_scaling
        # Keep the source image's 86:99 aspect ratio; pygame surface sizes
        # must be ints, so truncate like the later int(screen_height) use.
        screen_height = int(86 / 99 * screen_width)
        if self.screen is None:
            pygame.init()
            if self.render_mode == "human":
                pygame.display.set_caption("Connect Four")
                self.screen = pygame.display.set_mode((screen_width, screen_height))
            elif self.render_mode == "rgb_array":
                self.screen = pygame.Surface((screen_width, screen_height))
        # Load and scale all of the necessary images.
        tile_size = (screen_width * (91 / 99)) / 7
        chip_size = (int(tile_size * (9 / 13)), int(tile_size * (9 / 13)))
        red_chip = pygame.transform.scale(
            get_image(os.path.join("img", "C4RedPiece.png")), chip_size
        )
        black_chip = pygame.transform.scale(
            get_image(os.path.join("img", "C4BlackPiece.png")), chip_size
        )
        board_img = pygame.transform.scale(
            get_image(os.path.join("img", "Connect4Board.png")),
            (int(screen_width), int(screen_height)),
        )
        self.screen.blit(board_img, (0, 0))
        # Blit each placed chip at its grid position.
        offset = tile_size * (6 / 13)
        for i in range(0, 42):
            if self.board[i] == 0:
                continue
            chip = red_chip if self.board[i] == 1 else black_chip
            self.screen.blit(
                chip,
                ((i % 7) * tile_size + offset, (i // 7) * tile_size + offset),
            )
        if self.render_mode == "human":
            pygame.event.pump()
            pygame.display.update()
            self.clock.tick(self.metadata["render_fps"])
        observation = np.array(pygame.surfarray.pixels3d(self.screen))
        return (
            np.transpose(observation, axes=(1, 0, 2))
            if self.render_mode == "rgb_array"
            else None
        )

    def close(self):
        """Shut down pygame if a screen was ever created."""
        if self.screen is not None:
            pygame.quit()
            self.screen = None

    def check_for_winner(self):
        """Return True if the currently selected agent has four in a row."""
        board = np.array(self.board).reshape(6, 7)
        piece = self.agents.index(self.agent_selection) + 1
        # Direction vectors (row_step, col_step): horizontal, vertical,
        # down-right diagonal, and up-right diagonal.  Every 4-cell window on
        # the board is one of these starting from some (r, c).
        for dr, dc in ((0, 1), (1, 0), (1, 1), (-1, 1)):
            for r in range(6):
                for c in range(7):
                    # Skip windows that would run off the board.
                    if not (0 <= r + 3 * dr <= 5 and c + 3 * dc <= 6):
                        continue
                    if all(board[r + i * dr][c + i * dc] == piece for i in range(4)):
                        return True
        return False