""" Self play curricula for training agents against copies themselves. This is an experimental API and subject to change."""
import os
import time
from copy import deepcopy
from typing import List, TypeVar
import joblib
import numpy as np
from gymnasium import spaces
from scipy.special import softmax
from syllabus.core import Curriculum, Agent # noqa: E402
from syllabus.task_space import TaskSpace # noqa: E402
class SelfPlay(Curriculum):
"""Self play curriculum for training agents against themselves."""
def __init__(
self,
task_space: TaskSpace,
agent: Agent,
device: str,
):
""" Initialize the self play curriculum.
:param task_space: The task space of the environment
:param agent: The initial agent to play against
:param device: The device to run the agent on
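
        Example (illustrative sketch; `my_agent` and `env_task_space` are assumed
        to be an existing agent and task space)::

            curriculum = SelfPlay(env_task_space, my_agent, device="cpu")
            curriculum.sample(k=2)        # always returns [0, 0]
            opponent = curriculum.get_agent(0)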
"""
super().__init__(task_space)
self.device = device
self.agent = deepcopy(agent).to(self.device)
self.task_space = TaskSpace(
spaces.Discrete(1)
) # SelfPlay can only return agent_id = 0
self.history = {
"winrate": 0,
"n_games": 0,
}
    def add_agent(self, agent: Agent) -> int:
        """Replaces the stored opponent with a copy of the given agent and returns its id (always 0)."""
self.agent = deepcopy(agent).to(self.device)
return 0
    def get_agent(self, agent_id: int) -> Agent:
        """Returns the current opponent; only agent id 0 is valid."""
        if agent_id is None:
            agent_id = 0
        assert agent_id == 0, (
            "Self play only tracks the current agent. "
            f"Expected agent id 0, got {agent_id}"
        )
return self.agent
def _sample_distribution(self) -> List[float]:
return [1.0]
    def sample(self, k=1):
        """Returns k copies of agent id 0, the only opponent in self play."""
        return [0 for _ in range(k)]
    def update_winrate(self, agent_id: int, reward: int) -> None:
"""
Uses an incremental mean to update an agent's winrate. This assumes that reward
is positive for a win and negative for a loss. Not used for sampling.
:param agent_id: Identifier of the agent
:param reward: Reward received by the agent
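
        Example (illustrative; `curriculum` is assumed to be an initialized
        ``SelfPlay``). After one win and one loss the tracked winrate is 0.5::

            curriculum.update_winrate(0, reward=1.0)    # winrate: 0 + (1 - 0) / 1 = 1.0
            curriculum.update_winrate(0, reward=-1.0)   # winrate: 1 + (0 - 1) / 2 = 0.5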
"""
        win = reward > 0  # treat a positive reward as a win; bool acts as 0/1 in arithmetic
self.history["n_games"] += 1
old_winrate = self.history["winrate"]
n = self.history["n_games"]
self.history["winrate"] = old_winrate + (win - old_winrate) / n
    def log_metrics(self, writer, logs, step=None, log_n_tasks=1):
        """Log metrics for the curriculum."""
        # `logs` is assumed to be a list of (name, value) pairs.
        logs.append(("winrate", self.history["winrate"]))
        logs.append(("n_games", self.history["n_games"]))
        super().log_metrics(writer, logs, step, log_n_tasks)
class FictitiousSelfPlay(Curriculum):
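    """Fictitious self play curriculum.

    Trains the learner against opponents sampled uniformly from a buffer of its
    own past checkpoints, which are pickled to `storage_path`. Up to
    `max_agents` checkpoints are kept on disk, and at most `max_loaded_agents`
    are intended to be held in memory at once.

    Example (illustrative sketch; `my_agent` and `env_task_space` are assumed to
    be an existing agent and task space, and the agent must be picklable)::

        curriculum = FictitiousSelfPlay(
            env_task_space, my_agent, device="cpu",
            storage_path="./checkpoints", max_agents=4,
        )
        curriculum.add_agent(my_agent)            # snapshot the current policy
        opponent_id = curriculum.sample(k=1)[0]   # uniform over saved checkpoints
        opponent = curriculum.get_agent(opponent_id)
    """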
def __init__(
self,
task_space: TaskSpace,
agent: Agent,
device: str,
storage_path: str,
max_agents: int,
seed: int = 0,
max_loaded_agents: int = 1,
):
super().__init__(task_space)
self.uid = int(time.time())
self.device = device
self.storage_path = storage_path
self.seed = seed
if not os.path.exists(self.storage_path):
os.makedirs(self.storage_path, exist_ok=True)
self.current_agent_index = 0
self.max_agents = max_agents
self.task_space = TaskSpace(spaces.Discrete(self.max_agents))
self.add_agent(agent) # creates the initial opponent
self.history = {
i: {
"winrate": 0,
"n_games": 0,
}
for i in range(self.max_agents)
}
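        # In-memory cache of unpickled opponents; at most `max_loaded_agents`
        # are intended to be loaded at once.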
self.loaded_agents = {i: None for i in range(self.max_agents)}
self.n_loaded_agents = 0
self.max_loaded_agents = max_loaded_agents
    def add_agent(self, agent):
"""
Saves the current agent instance to a pickle file.
When the `max_agents` limit is met, older agent checkpoints are overwritten.
"""
agent = agent.to("cpu")
joblib.dump(
agent,
filename=(
f"{self.storage_path}/{self.name}_{self.seed}_agent_checkpoint_"
f"{self.current_agent_index % self.max_agents}.pkl"
),
)
agent = agent.to(self.device)
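        # `current_agent_index` doubles as the number of saved checkpoints and is
        # capped at `max_agents`, matching the range sampled in `sample`.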
if self.current_agent_index < self.max_agents:
self.current_agent_index += 1
    def update_winrate(self, opponent_id: int, opponent_reward: int) -> None:
        """
        Uses an incremental mean to update the opponent's winrate. This assumes the
        reward is positive for a win and negative for a loss. The winrate is tracked
        for logging only; this curriculum samples opponents uniformly.
        """
        win = opponent_reward > 0  # treat a positive reward as a win; bool acts as 0/1
        self.history[opponent_id]["n_games"] += 1
        old_winrate = self.history[opponent_id]["winrate"]
        n = self.history[opponent_id]["n_games"]
        self.history[opponent_id]["winrate"] = old_winrate + (win - old_winrate) / n
    def get_agent(self, agent_id: int) -> Agent:
        """Loads an agent from the buffer of saved agents."""
        if self.loaded_agents[agent_id] is None:
            if self.n_loaded_agents >= self.max_loaded_agents:
                # TODO: evict a previously loaded agent; eviction is not implemented yet.
                pass
            self.loaded_agents[agent_id] = joblib.load(
                f"{self.storage_path}/{self.__class__.__name__}_{self.seed}_agent_checkpoint_{agent_id}.pkl"
            ).to(self.device)
            self.n_loaded_agents += 1
        return self.loaded_agents[agent_id]
    def _sample_distribution(self) -> List[float]:
        # Uniform distribution over the checkpoints saved so far, matching the
        # range sampled in `sample`.
        n_saved = self.current_agent_index
        return [1.0 / n_saved for _ in range(n_saved)]
    def sample(self, k=1):
        """Samples k opponent ids uniformly from the saved checkpoints."""
        probs = self._sample_distribution()
        return list(np.random.choice(
            np.arange(self.current_agent_index),
            p=probs,
            size=k,
        ))
    def log_metrics(self, writer, logs, step=None, log_n_tasks=1):
        """Log metrics for the curriculum."""
        # `history` is keyed by opponent id, so aggregate over opponents that have
        # played at least one game. `logs` is assumed to be a list of (name, value) pairs.
        played = [h for h in self.history.values() if h["n_games"] > 0]
        mean_winrate = sum(h["winrate"] for h in played) / len(played) if played else 0.0
        logs.append(("winrate", mean_winrate))
        logs.append(("games_played", sum(h["n_games"] for h in self.history.values())))
        logs.append(("stored_agents", self.current_agent_index))
        super().log_metrics(writer, logs, step, log_n_tasks)
class PrioritizedFictitiousSelfPlay(Curriculum):
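    """Prioritized fictitious self play curriculum.

    Like `FictitiousSelfPlay`, but opponents are sampled from a softmax
    distribution over their winrates against the learner, so the most
    challenging past checkpoints are drawn more often. For example, with two
    stored opponents whose winrates are 0.8 and 0.2, the sampling probabilities
    are softmax([0.8, 0.2]), roughly [0.65, 0.35].

    Example (illustrative sketch; `my_agent` and `env_task_space` are assumed to
    be an existing agent and task space, and the agent must be picklable)::

        curriculum = PrioritizedFictitiousSelfPlay(
            env_task_space, my_agent, device="cpu",
            storage_path="./checkpoints", max_agents=4,
        )
        curriculum.add_agent(my_agent)             # store a second checkpoint
        curriculum.update_winrate(1, 1.0)          # opponent 1 beat the learner
        opponent_id = curriculum.sample(k=1)[0]    # opponent 1 is now favored
        opponent = curriculum.get_agent(opponent_id)
    """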
def __init__(
self,
task_space: TaskSpace,
agent: Agent,
device: str,
storage_path: str,
max_agents: int,
seed: int = 0,
max_loaded_agents: int = 1,
):
super().__init__(task_space)
self.uid = int(time.time())
self.device = device
self.storage_path = storage_path
self.seed = seed
if not os.path.exists(self.storage_path):
os.makedirs(self.storage_path, exist_ok=True)
self.current_agent_index = 0
self.max_agents = max_agents
self.task_space = TaskSpace(spaces.Discrete(self.max_agents))
self.add_agent(agent) # creates the initial opponent
self.history = {
i: {
"winrate": 0,
"n_games": 0,
}
for i in range(self.max_agents)
}
self.loaded_agents = {i: None for i in range(self.max_agents)}
self.n_loaded_agents = 0
self.max_loaded_agents = max_loaded_agents
    def add_agent(self, agent) -> None:
        """
        Saves the current agent instance to a pickle file.
        When the `max_agents` limit is met, older agent checkpoints are overwritten.
        """
agent = agent.to("cpu")
joblib.dump(
agent,
filename=(
f"{self.storage_path}/{self.__class__.__name__}_{self.seed}_agent_checkpoint_"
f"{self.current_agent_index % self.max_agents}.pkl"
),
)
agent = agent.to(self.device)
if self.current_agent_index < self.max_agents:
self.current_agent_index += 1
    def update_winrate(self, opponent_id: int, opponent_reward: int) -> None:
        """
        Uses an incremental mean to update the opponent's winrate, i.e. its priority.
        Sampling according to these winrates therefore favors the most challenging
        opponents.
        """
        win = opponent_reward > 0  # treat a positive reward as a win; bool acts as 0/1
        self.history[opponent_id]["n_games"] += 1
        old_winrate = self.history[opponent_id]["winrate"]
        n = self.history[opponent_id]["n_games"]
        self.history[opponent_id]["winrate"] = old_winrate + (win - old_winrate) / n
    def get_agent(self, agent_id: int) -> Agent:
        """
        Loads the requested agent from the buffer of saved agents. Sampling from the
        softmax distribution induced by winrates happens in `sample`.
        """
        if self.loaded_agents[agent_id] is None:
            if self.n_loaded_agents >= self.max_loaded_agents:
                # TODO: evict a previously loaded agent; eviction is not implemented yet.
                pass
            self.loaded_agents[agent_id] = joblib.load(
                f"{self.storage_path}/{self.__class__.__name__}_{self.seed}_agent_checkpoint_{agent_id}.pkl"
            ).to(self.device)
            self.n_loaded_agents += 1
        return self.loaded_agents[agent_id]
def _sample_distribution(self) -> List[float]:
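        # Winrates of the saved checkpoints act as softmax logits, so stronger
        # opponents (higher winrate against the learner) are sampled more often.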
logits = [
self.history[agent_id]["winrate"]
for agent_id in range(self.current_agent_index)
]
return softmax(logits)
    def sample(self, k=1):
""" Samples k agents from the buffer of saved agents, prioritizing opponents with higher winrates."""
probs = self._sample_distribution()
return list(np.random.choice(
np.arange(self.current_agent_index),
p=probs,
size=k,
))
    def log_metrics(self, writer, logs, step=None, log_n_tasks=1):
        """Log metrics for the curriculum."""
        # `history` is keyed by opponent id, so aggregate over opponents that have
        # played at least one game. `logs` is assumed to be a list of (name, value) pairs.
        played = [h for h in self.history.values() if h["n_games"] > 0]
        mean_winrate = sum(h["winrate"] for h in played) / len(played) if played else 0.0
        logs.append(("winrate", mean_winrate))
        logs.append(("games_played", sum(h["n_games"] for h in self.history.values())))
        logs.append(("stored_agents", self.current_agent_index))
        super().log_metrics(writer, logs, step, log_n_tasks)