Source code for syllabus.curricula.learning_progress

import math
import warnings
from itertools import groupby
from typing import Any, List, Optional, Union

import numpy as np

from syllabus.core import Curriculum
from syllabus.core.evaluator import Evaluator
from syllabus.task_space import DiscreteTaskSpace, MultiDiscreteTaskSpace, StratifiedDiscreteTaskSpace


def compress_ranges(nums):
    """Summarize a collection of integers as a comma-separated list of runs.

    Duplicates are discarded and the values are sorted first. Each maximal run
    of consecutive values is rendered as ``"start-end"``, or as ``"start"``
    when the run has a single element.

    Example: ``compress_ranges([5, 1, 2, 3, 8, 7])`` returns ``"1-3, 5, 7-8"``.
    """
    ordered = sorted(set(nums))

    def _render(run):
        # ``run`` yields (position, value) pairs; only the values are needed.
        values = [value for _, value in run]
        first, last = values[0], values[-1]
        return f"{first}" if first == last else f"{first}-{last}"

    # Inside a run of consecutive integers, value - position is constant, so
    # grouping on that difference splits the sorted values into runs.
    runs = groupby(enumerate(ordered), key=lambda pair: pair[1] - pair[0])
    return ", ".join(_render(run) for _, run in runs)
class LearningProgress(Curriculum):
    """Sample tasks according to their learning progress.

    Implements the learning-progress curriculum from
    https://arxiv.org/abs/2106.14876. Per-task success rates are obtained by
    running ``evaluator.evaluate_agent``. A fast and a slow exponential moving
    average (EMA) of those rates are maintained, and the absolute gap between
    the (reweighted) fast and slow EMAs is the learning-progress signal that
    drives the sampling distribution.

    Only ``DiscreteTaskSpace`` and ``MultiDiscreteTaskSpace`` task spaces are
    accepted.

    TODO: Support task spaces aside from Discrete and MultiDiscrete.
    """

    def __init__(
        self,
        evaluator: Evaluator,
        *args,
        ema_alpha: float = 0.1,
        p_theta: float = 0.1,
        eval_interval: Optional[int] = None,
        eval_interval_steps: Optional[int] = None,
        eval_eps: float = 1,
        baseline_eval_eps: Optional[float] = None,
        normalize_success: bool = True,
        continuous_progress: bool = False,
        **kwargs
    ):
        """
        Args:
            evaluator: Object whose ``evaluate_agent`` supplies per-task success rates.
            ema_alpha: Weight given to the newest value in the fast/slow EMAs.
            p_theta: Parameter of the reweighting function in ``_reweight``.
            eval_interval: If set, re-evaluate every ``eval_interval`` completed episodes.
            eval_interval_steps: If set, re-evaluate once more than this many
                environment steps have accumulated. Mutually exclusive with
                ``eval_interval``.
            eval_eps: Value passed to ``evaluate_agent`` for periodic
                evaluations (presumably the number of evaluation episodes).
            baseline_eval_eps: Value passed to ``evaluate_agent`` when
                ``_sample_distribution`` runs before any evaluation has
                happened; defaults to ``eval_eps``.
            normalize_success: If True, the EMAs track success rates
                normalized against the random baseline instead of raw rates.
            continuous_progress: If True, use the evaluator's final success
                rates instead of its task success rates.
        """
        super().__init__(*args, **kwargs)
        self.evaluator = evaluator
        self.ema_alpha = ema_alpha
        self.p_theta = p_theta
        self.eval_interval = eval_interval
        assert eval_interval is None or eval_interval_steps is None, "Only one of eval_interval or eval_interval_steps can be set."
        self.eval_interval_steps = eval_interval_steps
        self.eval_eps = eval_eps
        self.completed_episodes = 0
        self.current_steps = 0
        self.normalize_success = normalize_success
        self.continuous_progress = continuous_progress
        self.normalized_task_success_rates = None
        assert isinstance(
            self.task_space, (DiscreteTaskSpace, MultiDiscreteTaskSpace)
        ), f"LearningProgressCurriculum only supports Discrete and MultiDiscrete task spaces. Got {self.task_space.__class__.__name__}."
        # All of the following stay None until the first call to eval_and_update.
        self.random_baseline = None
        self._p_fast = None
        self._p_slow = None
        self._p_true = None
        self.task_rates = None
        # Cached sampling distribution and its invalidation flag.
        self.task_dist = None
        self._stale_dist = True
        self._baseline_eval_eps = baseline_eval_eps if baseline_eval_eps is not None else eval_eps

    def eval_and_update(self, eval_eps=1):
        """Evaluate the agent and update the success-rate EMAs.

        The first evaluation also fixes ``self.random_baseline``, which is
        each task's success rate capped at 0.75. Every evaluation normalizes
        its success rates against that baseline.

        Args:
            eval_eps: Passed straight through to ``evaluator.evaluate_agent``.

        Returns:
            The per-task success rates from this evaluation (NumPy array).
        """
        _, task_success_rates, final_success_rates, _ = self.evaluator.evaluate_agent(eval_eps, verbose=True)
        # NOTE(review): ``.numpy()`` suggests the evaluator returns tensors - confirm.
        task_success_rates = final_success_rates.numpy() if self.continuous_progress else task_success_rates.numpy()

        if self.random_baseline is None:
            # Assume that any perfect success rate is actually 75% due to evaluation precision.
            # Prevents NaN probabilities (division by 1 - baseline below) and
            # prevents the task from being completely ignored.
            high_success_idxs = np.where(task_success_rates > 0.75)[0]
            high_success_rates = task_success_rates[high_success_idxs]
            if len(high_success_idxs) > 0:
                warnings.warn(
                    f"Tasks {high_success_idxs} had very high success rates {high_success_rates} for random baseline. Consider removing them from the training set of tasks.")
            self.random_baseline = np.minimum(task_success_rates, 0.75)

        # Rescale so that the baseline maps to 0 and perfect success maps to 1;
        # success below the baseline is clamped to 0.
        self.normalized_task_success_rates = np.maximum(
            task_success_rates - self.random_baseline, np.zeros(task_success_rates.shape)) / (1.0 - self.random_baseline)
        task_rates = self.normalized_task_success_rates if self.normalize_success else task_success_rates

        if self._p_fast is None:
            # First evaluation: seed every EMA with the current values.
            self._p_fast = task_rates
            self._p_slow = task_rates
            self._p_true = task_success_rates
        else:
            # The slow EMA follows the freshly updated fast EMA, not the raw rates.
            self._p_fast = (task_rates * self.ema_alpha) + (self._p_fast * (1.0 - self.ema_alpha))
            self._p_slow = (self._p_fast * self.ema_alpha) + (self._p_slow * (1.0 - self.ema_alpha))
            self._p_true = (task_success_rates * self.ema_alpha) + (self._p_true * (1.0 - self.ema_alpha))

        self.task_rates = task_success_rates  # Used for logging and OMNI
        # New data invalidates the cached sampling distribution.
        self._stale_dist = True
        self.task_dist = None
        return task_success_rates

    def update_task_progress(self, task: int, progress: Union[float, bool], env_id: Optional[int] = None):
        """Forward task progress to the base ``Curriculum``.

        This curriculum's EMAs are driven by ``eval_and_update``, not by this
        method. Note that ``env_id`` is accepted but not forwarded.
        """
        super().update_task_progress(task, progress)

    def update_on_episode(self, episode_return: float, length: int, task: Any, progress: Union[float, bool], env_id: Optional[int] = None) -> None:
        """Count a finished episode and re-evaluate when an interval is reached.

        With ``eval_interval`` set, evaluation runs every ``eval_interval``
        completed episodes. With ``eval_interval_steps`` set, it runs once the
        accumulated step count exceeds the threshold, after which the step
        counter is reset to 0.
        """
        self.completed_episodes += 1
        self.current_steps += length
        if self.eval_interval is not None and self.completed_episodes % self.eval_interval == 0:
            self.eval_and_update(eval_eps=self.eval_eps)
        if self.eval_interval_steps is not None and self.current_steps > self.eval_interval_steps:
            self.eval_and_update(eval_eps=self.eval_eps)
            self.current_steps = 0

    def _learning_progress(self, reweight: bool = True) -> np.ndarray:
        """Return per-task learning progress as ``|fast EMA - slow EMA|``.

        Both EMAs are passed through ``_reweight`` first when ``reweight`` is
        True. Requires at least one prior call to ``eval_and_update``.
        """
        fast = self._reweight(self._p_fast) if reweight else self._p_fast
        slow = self._reweight(self._p_slow) if reweight else self._p_slow
        return abs(fast - slow)

    def _reweight(self, p: np.ndarray) -> np.ndarray:
        """Reweight success rates with the reweighting function from the paper.

        Computes ``p * (1 - p_theta) / (p + p_theta * (1 - 2 * p))``. The
        function maps 0 to 0 and 1 to 1.
        """
        numerator = p * (1.0 - self.p_theta)
        denominator = p + self.p_theta * (1.0 - 2.0 * p)
        return numerator / denominator

    def _sigmoid(self, x: np.ndarray):
        """Elementwise logistic sigmoid ``1 / (1 + exp(-x))``."""
        return 1 / (1 + np.exp(-x))

    def _sample_distribution(self) -> List[float]:
        """Return the sampling distribution over tasks (NumPy array).

        Tasks with positive learning progress or a positive smoothed true
        success rate are candidates. Their learning progress is z-scored,
        passed through a sigmoid and normalized. All other tasks get
        probability 0. If there are no candidates, every task is scored the
        same way, which yields a uniform distribution when all progress is
        equal. The result is cached until the next evaluation.
        """
        if not self._stale_dist:
            # No changes since distribution was last computed
            return self.task_dist
        if self.task_rates is None:
            self.eval_and_update(self._baseline_eval_eps)

        learning_progress = self._learning_progress()
        posidxs = [i for i, lp in enumerate(learning_progress) if lp > 0 or self._p_true[i] > 0]
        any_progress = len(posidxs) > 0

        subprobs = learning_progress[posidxs] if any_progress else learning_progress
        std = np.std(subprobs)
        subprobs = (subprobs - np.mean(subprobs)) / (std if std else 1)  # z-score (guard against std == 0)
        subprobs = self._sigmoid(subprobs)
        subprobs = subprobs / np.sum(subprobs)  # normalize

        if any_progress:
            # If some tasks have nonzero progress, zero out the rest
            task_dist = np.zeros(len(learning_progress))
            task_dist[posidxs] = subprobs
        else:
            # If all tasks have 0 progress, return uniform distribution
            task_dist = subprobs

        self.task_dist = task_dist
        self._stale_dist = False
        return task_dist

    def log_metrics(self, writer, logs, step, log_n_tasks=-1):
        """Append learning-progress and success-rate metrics, then defer to the base class.

        Before the first evaluation there are no EMAs yet, so no metrics from
        this class are added. ``log_n_tasks == -1`` means per-task stats are
        logged for every task.
        """
        logs = [] if logs is None else logs
        if self._p_fast is None:
            # Nothing has been evaluated yet; computing progress would fail on None.
            return super().log_metrics(writer, logs, step=step, log_n_tasks=log_n_tasks)

        learning_progresses = self._learning_progress()
        logs.append(("curriculum/learning_progress", np.mean(learning_progresses)))
        if self.task_rates is not None:
            logs.append(("curriculum/mean_success_rate", np.mean(self.task_rates)))
            tasks = range(self.num_tasks)
            if self.num_tasks > log_n_tasks and log_n_tasks != -1:
                warnings.warn(f"Too many tasks to log {self.num_tasks}. Only logging stats for {log_n_tasks} tasks.", stacklevel=2)
                tasks = tasks[:log_n_tasks]
            for idx in tasks:
                name = self.task_names(self.tasks[idx], idx)
                logs.append((f"curriculum/{name}_success_rate", self.task_rates[idx]))
                logs.append((f"curriculum/{name}_lp", learning_progresses[idx]))
        return super().log_metrics(writer, logs, step=step, log_n_tasks=log_n_tasks)
class StratifiedLearningProgress(LearningProgress):
    """Learning-progress curriculum that favors one task per stratum.

    The learning-progress distribution from ``LearningProgress`` is reweighted.
    Within each stratum of the ``StratifiedDiscreteTaskSpace``, the task with
    the highest value of the selection metric keeps its probability, and every
    other task is scaled by 0.001. The result is renormalized.

    ``selection_metric`` is one of:
        - ``"success"``: the raw per-task success rate.
        - ``"score"``: the learning-progress sampling probability.
        - ``"learnability"``: ``p * (1 - p)`` for success rate ``p``.
    """

    def __init__(self, *args, selection_metric="success", **kwargs):
        super().__init__(*args, **kwargs)
        assert isinstance(self.task_space, StratifiedDiscreteTaskSpace)
        assert selection_metric in [
            "success", "score", "learnability"], f"Selection metric {selection_metric} not recognized. Use 'success', 'score', or 'learnability'."
        self.selection_metric = selection_metric

    def _sample_distribution(self) -> List[float]:
        """Return the learning-progress distribution biased toward one task per stratum."""
        # Prioritize tasks by learning progress first
        lp_dist = super()._sample_distribution()
        selection_weight = np.ones(len(lp_dist)) * 0.001

        if self.selection_metric == "learnability":
            metric = self.task_rates * (1.0 - self.task_rates)
        elif self.selection_metric == "score":
            metric = lp_dist
        else:
            metric = self.task_rates

        # Give full weight to the task with the highest metric in each stratum.
        # Materialize the stratum once so the same ordering is used both to look
        # up metric values and to map the winning position back to a task index
        # (strata may not be subscriptable, e.g. sets - presumably task indices).
        for stratum in self.task_space.strata:
            members = list(stratum)
            if not members:
                continue
            best = np.argsort(metric[np.array(members)])[-1]
            selection_weight[members[best]] = 1.0

        # Scale and normalize
        stratified_dist = lp_dist * selection_weight
        return stratified_dist / np.sum(stratified_dist)
class StratifiedDomainRandomization(LearningProgress):
    """Uniform curriculum that favors one task per stratum.

    Starting from a uniform distribution, the selected task in each stratum of
    the ``StratifiedDiscreteTaskSpace`` keeps full weight, and every other task
    is scaled by 0.0001. The result is renormalized and cached until the next
    evaluation.

    ``selection_metric`` is ``"success"`` (pick the highest success rate) or
    ``"progress"``.
    """

    def __init__(self, *args, selection_metric="success", **kwargs):
        super().__init__(*args, **kwargs)
        assert isinstance(self.task_space, StratifiedDiscreteTaskSpace)
        assert selection_metric in ["success", "progress"]
        self.selection_metric = selection_metric

    def _sample_distribution(self) -> List[float]:
        """Return a near-uniform distribution concentrated on one task per stratum."""
        if not self._stale_dist:
            # No changes since distribution was last computed
            return self.task_dist
        if self.task_rates is None:
            self.eval_and_update(self._baseline_eval_eps)

        # Prioritize tasks uniformly first
        uni_dist = np.ones(self.num_tasks) / self.num_tasks
        selection_weight = np.ones(len(uni_dist)) * 0.0001
        # NOTE(review): for "progress" the metric is the constant uniform
        # distribution, so the pick within each stratum depends only on
        # argsort's tie-breaking - confirm this is intended.
        metric = self.task_rates if self.selection_metric == "success" else uni_dist

        # Give full weight to the task with the highest metric in each stratum.
        # Use one materialized ordering for both the metric lookup and the
        # index mapping (strata may not be subscriptable, e.g. sets).
        for stratum in self.task_space.strata:
            members = list(stratum)
            if not members:
                continue
            best = np.argsort(metric[np.array(members)])[-1]
            selection_weight[members[best]] = 1.0

        # Scale and normalize
        stratified_dist = uni_dist * selection_weight
        stratified_dist = stratified_dist / np.sum(stratified_dist)
        self.task_dist = stratified_dist
        self._stale_dist = False
        return stratified_dist