Source code for syllabus.curricula.selfplay

""" Self play curricula for training agents against copies themselves. This is an experimental API and subject to change."""

import os
import time
from copy import deepcopy
from typing import List
from collections import OrderedDict
import joblib
import numpy as np
from gymnasium import spaces
from scipy.special import softmax
from queue import Queue

from syllabus.core import Agent, Curriculum  # noqa: E402
from syllabus.task_space import DiscreteTaskSpace  # noqa: E402


class WinrateBuffer:
    """
    Stores the winrate of each agent in a queue and provides a sampling distribution.
    """

    def __init__(
        self,
        max_agents: int,
        entropy_parameter: float,
        smoothing_constant: int,
        buffer_size: int = 128,
    ):
        self.max_agents = max_agents
        self.buffer_size = buffer_size
        self.buffer = {i: Queue(maxsize=buffer_size) for i in range(max_agents)}
        self.entropy_parameter = entropy_parameter
        self.smoothing_constant = smoothing_constant
        self.initialized_agents = np.zeros(max_agents)

    def update_winrate(self, agent_id: int, reward: float):
        reward = reward == 1  # converts rewards {-1;1} to winrate {0;1}
        self.buffer[agent_id].put(reward)
        if self.buffer[agent_id].full():
            self.buffer[agent_id].get()

        # mark agent as initialized
        # uninitialized agents will be masked from the sampling distribution
        if not self.initialized_agents[agent_id]:
            self.initialized_agents[agent_id] = 1

    def get_winrate(self, agent_id: int):
        # TODO: should we return a winrate if the queue is not full?
        if self.buffer[agent_id].empty():
            return 0.0
        return np.mean(self.buffer[agent_id].queue)

    def _apply_entropy(self, winrate: float):
        if np.isnan(winrate):
            return 0.0
        return winrate**self.entropy_parameter

    def get_sampling_distribution(self):
        """
        Return a sampling distribution reflecting the difficulty of each opponent.
        Uninitialized agents are masked and not included in the distribution.
        """
        loss_rates = np.array([1 - self.get_winrate(i) for i in range(self.max_agents)])

        # mask uninitialized agents
        masked_loss_rates = np.ma.masked_array(
            loss_rates, mask=self.initialized_agents == 0
        )

        # apply the entropy function, smoothing and normalization to all valid loss rates
        masked_loss_rates = np.ma.array(
            [self._apply_entropy(loss_rate) for loss_rate in masked_loss_rates]
        )
        masked_loss_rates += self.smoothing_constant
        masked_sampling_distribution = masked_loss_rates / masked_loss_rates.sum()

        # unmask and set masked values to 0
        sampling_distribution = np.where(
            masked_sampling_distribution.mask, 0, masked_sampling_distribution
        )

        # if no agents are initialized, sample the first agent
        # this happens when the first agent has not yet received a reward
        if sampling_distribution.sum() == 0:
            sampling_distribution = np.zeros(self.max_agents)
            sampling_distribution[0] = 1.0

        return sampling_distribution

    def __repr__(self):
        return {i: self.get_winrate(i) for i in range(self.max_agents)}.__repr__()

    def __getitem__(self, agent_id):
        return self.get_winrate(agent_id)

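# Illustrative sketch, not part of the original module: how the WinrateBuffer
# above turns per-opponent results into a sampling distribution. Because the
# name ``WinrateBuffer`` is redefined later in this module, the class is bound
# here via a default argument. ``_example_sampling_distribution`` is a
# hypothetical helper added for illustration only.
def _example_sampling_distribution(buffer_cls=WinrateBuffer):
    buf = buffer_cls(max_agents=3, entropy_parameter=1.0, smoothing_constant=1)
    buf.update_winrate(0, 1)   # learner beat opponent 0
    buf.update_winrate(0, -1)  # learner lost to opponent 0
    buf.update_winrate(1, -1)  # learner lost to opponent 1
    # Opponent 2 has no results yet, so it is masked and gets probability 0;
    # opponent 1 (higher loss rate) gets more probability mass than opponent 0.
    return buf.get_sampling_distribution()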

class FIFOAgentBuffer:
    """
    First-In-First-Out buffer implemented as an OrderedDict.
    """

    def __init__(
        self,
        max_agents: int,
        curriculum_name: str,
        device: str,
        storage_path: str,
        seed: int,
    ):
        self.max_agents = max_agents
        self.curriculum_name = curriculum_name
        self.device = device
        self.storage_path = storage_path
        self.seed = seed
        self.buffer = OrderedDict()

    def add_agent(self, agent_id: int, agent: Agent) -> None:
        # Remove first item so that buffer length does not exceed max_agents
        if len(self.buffer) >= self.max_agents:
            self.buffer.popitem(last=False)
        self.buffer[agent_id] = agent
        # Move recently accessed agent to end of buffer (last to be removed)
        self.buffer.move_to_end(agent_id)

    def get_agent(self, agent_id: int) -> Agent:
        if agent_id not in self.buffer:
            # Delete first so that buffer length does not exceed max_agents
            if len(self.buffer) >= self.max_agents:
                self.buffer.popitem(last=False)
            print(
                "load agent",
                agent_id,
                f"{self.storage_path}/{self.curriculum_name}_{self.seed}_agent_checkpoint_{agent_id}.pkl",
            )
            self.buffer[agent_id] = joblib.load(
                f"{self.storage_path}/{self.curriculum_name}_{self.seed}_agent_checkpoint_{agent_id}.pkl"
            ).to(self.device)

        return self.buffer[agent_id]

    def __getitem__(self, agent_id):
        return self.buffer.get(agent_id, None)

    def __contains__(self, key):
        return key in self.buffer

    def __len__(self):
        return len(self.buffer)

    def __repr__(self):
        return self.buffer.__repr__()

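# Illustrative sketch, not part of the original module: FIFO eviction in the
# buffer above. Plain strings stand in for Agent objects, so nothing is read
# from disk; checkpoint loading only happens in ``get_agent`` when an id is
# missing. ``_example_fifo_eviction`` is a hypothetical helper added for
# illustration only.
def _example_fifo_eviction():
    buf = FIFOAgentBuffer(
        max_agents=2, curriculum_name="Demo", device="cpu", storage_path="/tmp", seed=0
    )
    buf.add_agent(0, "agent-0")
    buf.add_agent(1, "agent-1")
    buf.add_agent(2, "agent-2")  # buffer is full, so agent 0 (the oldest) is evicted
    assert 0 not in buf and 1 in buf and 2 in buf
    return buf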

class FILOAgentBuffer:
    """
    First-In-Last-Out buffer implemented as an OrderedDict.
    """

    def __init__(
        self,
        max_agents: int,
        curriculum_name: str,
        device: str,
        storage_path: str,
        seed: int,
    ):
        self.max_agents = max_agents
        self.curriculum_name = curriculum_name
        self.device = device
        self.storage_path = storage_path
        self.seed = seed
        self.buffer = OrderedDict()

    def add_agent(self, agent_id: int, agent: Agent) -> None:
        if agent_id in self.buffer:
            del self.buffer[agent_id]
        elif len(self.buffer) >= self.max_agents:
            self.buffer.popitem(last=False)
        self.buffer[agent_id] = agent

    def get_agent(self, agent_id: int) -> Agent:
        if agent_id not in self.buffer:
            print(
                "load agent",
                agent_id,
                f"{self.storage_path}/{self.curriculum_name}_{self.seed}_agent_checkpoint_{agent_id}.pkl",
            )
            self.add_agent(
                agent_id,
                joblib.load(
                    f"{self.storage_path}/{self.curriculum_name}_{self.seed}_agent_checkpoint_{agent_id}.pkl"
                ).to(self.device),
            )

        return self.buffer[agent_id]

    def __getitem__(self, agent_id):
        return self.buffer.get(agent_id, None)

    def __contains__(self, key):
        return key in self.buffer

    def __len__(self):
        return len(self.buffer)

    def __repr__(self):
        return self.buffer.__repr__()

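# Illustrative sketch, not part of the original module: re-adding an existing id
# to the buffer above moves it to the most recently added position, so it is the
# last entry considered for eviction. ``_example_filo_refresh`` is a hypothetical
# helper added for illustration only.
def _example_filo_refresh():
    buf = FILOAgentBuffer(
        max_agents=2, curriculum_name="Demo", device="cpu", storage_path="/tmp", seed=0
    )
    buf.add_agent(0, "agent-0")
    buf.add_agent(1, "agent-1")
    buf.add_agent(0, "agent-0-updated")  # refreshes agent 0's position
    buf.add_agent(2, "agent-2")  # buffer is full: agent 1 is now the oldest and is evicted
    assert 0 in buf and 2 in buf and 1 not in buf
    return buf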

class WinrateBuffer:
    """
    Stores recent game results for each opponent in a queue and exposes the
    per-opponent winrates used to build the prioritized sampling distribution.

    Note: this definition shadows the ``WinrateBuffer`` defined earlier in this
    module; opponents are tracked lazily in a dict rather than pre-allocating
    ``max_agents`` queues.
    """

    def __init__(
        self,
        entropy_parameter: float,
        smoothing_constant: int,
        buffer_size: int = 128,
    ):
        self.buffer_size = buffer_size
        self.buffer = {}
        self.entropy_parameter = entropy_parameter
        self.smoothing_constant = smoothing_constant

    def update_winrate(self, agent_id: int, reward: float):
        if agent_id not in self.buffer:
            self.buffer[agent_id] = Queue(maxsize=self.buffer_size)

        reward = reward == 1  # converts rewards {-1;1} to winrate {0;1}
        self.buffer[agent_id].put(reward)
        if self.buffer[agent_id].full():
            self.buffer[agent_id].get()

    def get_winrate(self, agent_id: int):
        if agent_id not in self.buffer:
            return 0.0

        return np.mean(self.buffer[agent_id].queue)

    def _apply_entropy(self, winrate: float):
        if np.isnan(winrate):
            return 0.0
        return winrate**self.entropy_parameter

    def __repr__(self):
        return {i: self.get_winrate(i) for i in self.buffer}.__repr__()

    def __getitem__(self, agent_id):
        return self.get_winrate(agent_id)


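# Illustrative sketch, not part of the original module: tracking per-opponent
# winrates with the dict-based WinrateBuffer above. Rewards are the learner's
# game outcomes in {-1, 1}, keyed by opponent id. ``_example_winrate_tracking``
# is a hypothetical helper added for illustration only.
def _example_winrate_tracking():
    buf = WinrateBuffer(entropy_parameter=1.0, smoothing_constant=1, buffer_size=4)
    buf.update_winrate(7, 1)   # learner beat opponent 7
    buf.update_winrate(7, -1)  # learner lost to opponent 7
    buf.update_winrate(7, -1)
    assert abs(buf.get_winrate(7) - 1 / 3) < 1e-9
    assert buf.get_winrate(99) == 0.0  # unseen opponents default to 0.0
    return buf

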
class SelfPlay(Curriculum):
    """Self play curriculum for training agents against themselves."""

    def __init__(
        self,
        task_space: DiscreteTaskSpace,
        agent: Agent,
        device: str,
    ):
        """Initialize the self play curriculum.

        :param task_space: The task space of the environment
        :param agent: The initial agent to play against
        :param device: The device to run the agent on
        """
        # Self play can only return agent_id == 0
        assert (
            isinstance(task_space, DiscreteTaskSpace) and task_space.num_tasks == 1
        ), "Self play only supports DiscreteTaskSpaces with a single element."
        super().__init__(task_space)
        self.device = device
        self.agent = deepcopy(agent).to(self.device)
        self.task_space = DiscreteTaskSpace(1)  # SelfPlay can only return agent_id = 0
        self.history = {
            "winrate": 0,
            "n_games": 0,
        }

    def add_agent(self, agent: Agent) -> int:
        # TODO: Perform copy in RAM instead of VRAM
        self.agent = deepcopy(agent).to(self.device)
        return 0

    def get_agent(self, agent_id: int) -> Agent:
        assert agent_id == 0, (
            f"Self play only tracks the current agent. "
            f"Expected agent id 0, got {agent_id}"
        )
        return self.agent

    def _sample_distribution(self) -> List[float]:
        return [1.0]

    def sample(self, k=1):
        return [0 for _ in range(k)]

    def update_winrate(self, agent_id: int, reward: int) -> None:
        """
        Uses an incremental mean to update an agent's winrate.
        This assumes that reward is positive for a win and negative for a loss.
        Not used for sampling.

        :param agent_id: Identifier of the agent
        :param reward: Reward received by the agent
        """
        win = reward > 0  # converts the reward to 0 or 1
        self.history["n_games"] += 1
        old_winrate = self.history["winrate"]
        n = self.history["n_games"]  # number of games, including this one
        # Standard incremental mean, equivalent to ((old_winrate * (n - 1)) + win) / n
        self.history["winrate"] = old_winrate + (win - old_winrate) / n

    def log_metrics(self, writer, logs, step=None, log_n_tasks=1):
        """Log metrics for the curriculum."""
        logs.append("winrate", self.history["winrate"])
        logs.append("n_games", self.history["n_games"])
        super().log_metrics(writer, logs, step, log_n_tasks)
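

# Illustrative sketch, not part of the original module: the incremental mean used
# in SelfPlay.update_winrate matches the batch winrate over the same rewards.
# ``_DummyAgent`` is a minimal stand-in (only ``.to()`` is needed here) and
# ``_example_selfplay_winrate`` is a hypothetical helper; both are added for
# illustration only.
class _DummyAgent:
    def to(self, device):
        return self


def _example_selfplay_winrate():
    curriculum = SelfPlay(DiscreteTaskSpace(1), agent=_DummyAgent(), device="cpu")
    rewards = [1, -1, 1, 1, -1]
    for r in rewards:
        curriculum.update_winrate(agent_id=0, reward=r)
    batch_winrate = sum(r > 0 for r in rewards) / len(rewards)
    assert abs(curriculum.history["winrate"] - batch_winrate) < 1e-9
    return curriculum.history

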
class FictitiousSelfPlay(Curriculum):
    """Fictitious self play curriculum that samples opponents uniformly from past agent checkpoints."""

    def __init__(
        self,
        task_space: DiscreteTaskSpace,
        agent: Agent,
        device: str,
        storage_path: str,
        max_agents: int,
        seed: int = 0,
        max_loaded_agents: int = 10,
    ):
        super().__init__(task_space)
        self.uid = int(time.time())
        self.device = device
        self.storage_path = storage_path
        self.seed = seed
        if not os.path.exists(self.storage_path):
            os.makedirs(self.storage_path, exist_ok=True)

        self.current_agent_index = 0
        self.max_agents = max_agents
        self.task_space = DiscreteTaskSpace(self.max_agents)
        self.loaded_agents = FIFOAgentBuffer(
            max_loaded_agents, self.__class__.__name__, device, storage_path, seed
        )
        self.history = {
            "winrate": 0,
            "n_games": 0,
        }
        self.max_loaded_agents = max_loaded_agents
        self.add_agent(agent)  # creates the initial opponent

    def add_agent(self, agent):
        """
        Saves the current agent instance to a pickle file and adds it to the
        loaded agents buffer. When the `max_agents` limit is met, older agent
        checkpoints are overwritten.
        """
        # TODO: Check that this doesn't move the original agent to cpu
        agent = agent.to("cpu")
        joblib.dump(
            agent,
            filename=(
                f"{self.storage_path}/{self.__class__.__name__}_{self.seed}_agent_checkpoint_"
                f"{self.current_agent_index}.pkl"
            ),
        )
        agent = agent.to(self.device)
        self.loaded_agents.add_agent(self.current_agent_index, agent)
        self.current_agent_index += 1

    def get_agent(self, agent_id: int) -> Agent:
        """Loads an agent from the buffer of saved agents."""
        return self.loaded_agents.get_agent(agent_id)

    def _sample_distribution(self) -> List[float]:
        # Number of saved agents, up to max_agents
        n_agents = min(self.current_agent_index, self.max_agents)
        return [1.0 / n_agents for _ in range(n_agents)] + [
            0.0 for _ in range(self.max_agents - n_agents)
        ]

    def sample(self, k=1):
        probs = self._sample_distribution()
        # At most max_agents below the highest agent index, but not below 0
        min_agent_id = max(0, self.current_agent_index - self.max_agents)
        sample = list(
            np.random.choice(
                np.arange(min_agent_id, min_agent_id + self.max_agents),
                p=probs,
                size=k,
            )
        )
        return sample

    def log_metrics(self, writer, logs, step=None, log_n_tasks=1):
        """Log metrics for the curriculum."""
        logs.append("winrate", self.history["winrate"])
        logs.append("games_played", self.history["n_games"])
        logs.append("stored_agents", len(self.loaded_agents))
        super().log_metrics(writer, logs, step, log_n_tasks)
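

# Illustrative sketch, not part of the original module: with FictitiousSelfPlay,
# opponents are drawn uniformly from the saved checkpoints (up to ``max_agents``).
# Reuses the ``_DummyAgent`` stand-in above and writes checkpoints to a temporary
# directory; ``_example_fictitious_selfplay`` is a hypothetical helper added for
# illustration only.
def _example_fictitious_selfplay():
    import tempfile

    with tempfile.TemporaryDirectory() as tmp_dir:
        curriculum = FictitiousSelfPlay(
            task_space=DiscreteTaskSpace(4),
            agent=_DummyAgent(),
            device="cpu",
            storage_path=tmp_dir,
            max_agents=4,
            seed=0,
        )
        for _ in range(2):
            curriculum.add_agent(_DummyAgent())  # checkpoints agents 1 and 2
        # Three agents are stored, so each gets probability 1/3 and the last
        # slot is padded with 0.
        assert curriculum._sample_distribution() == [1 / 3, 1 / 3, 1 / 3, 0.0]
        return curriculum.sample(k=5)  # ids drawn from {0, 1, 2, 3} with the probabilities above

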
class PrioritizedFictitiousSelfPlay(Curriculum):
    """Fictitious self play curriculum that prioritizes opponents with a high winrate against the learner."""

    def __init__(
        self,
        task_space: DiscreteTaskSpace,
        agent: Agent,
        device: str,
        storage_path: str,
        max_agents: int,
        entropy_parameter: float,
        smoothing_constant: int,
        seed: int = 0,
        max_loaded_agents: int = 10,
    ):
        super().__init__(task_space)
        self.uid = int(time.time())
        self.device = device
        self.storage_path = storage_path
        self.seed = seed
        if not os.path.exists(self.storage_path):
            os.makedirs(self.storage_path, exist_ok=True)

        self.current_agent_index = 0
        self.max_agents = max_agents
        self.task_space = DiscreteTaskSpace(self.max_agents)
        self.winrate_buffer = WinrateBuffer(entropy_parameter, smoothing_constant)
        self.loaded_agents = FIFOAgentBuffer(
            max_loaded_agents, self.__class__.__name__, device, storage_path, seed
        )
        self.history = {
            "winrate": 0,
            "n_games": 0,
        }
        self.max_loaded_agents = max_loaded_agents
        self.add_agent(agent)  # creates the initial opponent

    def add_agent(self, agent):
        """
        Saves the current agent instance to a pickle file and adds it to the
        loaded agents buffer. When the `max_agents` limit is met, older agent
        checkpoints are overwritten.
        """
        # TODO: Check that this doesn't move the original agent to cpu
        agent = agent.to("cpu")
        joblib.dump(
            agent,
            filename=(
                f"{self.storage_path}/{self.__class__.__name__}_{self.seed}_agent_checkpoint_"
                f"{self.current_agent_index}.pkl"
            ),
        )
        agent = agent.to(self.device)
        self.loaded_agents.add_agent(self.current_agent_index, agent)
        self.current_agent_index += 1

    def update_winrate(self, opponent_id: int, learner_reward: int) -> None:
        self.winrate_buffer.update_winrate(opponent_id, learner_reward)

    def get_agent(self, agent_id: int) -> Agent:
        """
        Loads the requested agent from the buffer of saved agents, reading its
        checkpoint from disk if it is not already in memory.
        """
        # TODO: sample the agent id from the winrate-induced distribution instead of taking it as an argument
        if self.loaded_agents[agent_id] is None:
            print(
                "get agent",
                agent_id,
                f"{self.storage_path}/{self.__class__.__name__}_{self.seed}_agent_checkpoint_{agent_id}.pkl",
            )
            self.loaded_agents.add_agent(
                agent_id,
                joblib.load(
                    f"{self.storage_path}/{self.__class__.__name__}_{self.seed}_agent_checkpoint_{agent_id}.pkl"
                ).to(self.device),
            )

        return self.loaded_agents[agent_id]

    def sample(self, k=1):
        """
        Samples k agents from the buffer of saved agents, prioritizing opponents
        with a higher winrate against the learner (i.e. a higher learner loss rate).
        Uninitialized agents are masked and not included in the distribution.
        """
        loss_rates = np.array(
            [
                1 - self.winrate_buffer.get_winrate(i)
                for i in self.loaded_agents.buffer.keys()
            ]
        )
        n_stored_agents = len(loss_rates)
        loaded_agent_keys = list(self.loaded_agents.buffer.keys())

        # pad loss rates and agent keys if the buffer is not full
        if n_stored_agents < self.max_agents:

            def pad(x):
                return np.pad(
                    x,
                    pad_width=(0, self.max_agents - n_stored_agents),
                    constant_values=0.0,
                )

            loss_rates = pad(loss_rates)
            loaded_agent_keys = pad(loaded_agent_keys)

        # mask uninitialized agents
        # masked_loss_rates = np.ma.masked_array(
        #     loss_rates,
        #     mask=[self.winrate_buffer.initialized_agents == 0][:n_stored_agents],
        # )

        # apply the entropy function, smoothing and normalization to all valid loss rates
        masked_loss_rates = np.ma.array(
            [self.winrate_buffer._apply_entropy(loss_rate) for loss_rate in loss_rates]
        )
        masked_loss_rates += self.winrate_buffer.smoothing_constant
        masked_sampling_distribution = masked_loss_rates / masked_loss_rates.sum()

        # unmask and set masked values to 0
        sampling_distribution = np.where(
            masked_sampling_distribution.mask, 0, masked_sampling_distribution
        )

        # if no agents are initialized, sample the first agent
        # this happens when the first agent has not yet received a reward
        if sampling_distribution.sum() == 0:
            sampling_distribution = np.zeros(self.max_agents)
            sampling_distribution[0] = 1.0

        return list(
            np.random.choice(
                loaded_agent_keys,
                p=sampling_distribution,
                size=k,
            )
        )

    def log_metrics(self, writer, logs, step=None, log_n_tasks=1):
        """Log metrics for the curriculum."""
        logs.append("winrate", self.history["winrate"])
        logs.append("games_played", self.history["n_games"])
        logs.append("stored_agents", len(self.loaded_agents))
        super().log_metrics(writer, logs, step, log_n_tasks)
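

# Illustrative sketch, not part of the original module: end-to-end prioritized
# opponent sampling. After a few reported results, opponents the learner keeps
# losing to receive more probability mass in ``sample``. Reuses the
# ``_DummyAgent`` stand-in above; ``_example_prioritized_selfplay`` is a
# hypothetical helper added for illustration only.
def _example_prioritized_selfplay():
    import tempfile
    from collections import Counter

    with tempfile.TemporaryDirectory() as tmp_dir:
        curriculum = PrioritizedFictitiousSelfPlay(
            task_space=DiscreteTaskSpace(2),
            agent=_DummyAgent(),
            device="cpu",
            storage_path=tmp_dir,
            max_agents=2,
            entropy_parameter=1.0,
            smoothing_constant=1,
            seed=0,
        )
        curriculum.add_agent(_DummyAgent())  # checkpoints opponent 1
        curriculum.update_winrate(opponent_id=0, learner_reward=1)   # learner beats opponent 0
        curriculum.update_winrate(opponent_id=1, learner_reward=-1)  # learner loses to opponent 1
        # Opponent 1 has the higher learner loss rate, so it should be sampled
        # more often than opponent 0 (probabilities 2/3 vs 1/3 here).
        return Counter(curriculum.sample(k=100))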