# Code heavily based on the original Prioritized Level Replay implementation from https://github.com/facebookresearch/level-replay
# If you use this code, please cite the above codebase and original PLR paper: https://arxiv.org/abs/2010.03934
import threading
import time
import gymnasium as gym
import numpy as np
import torch
from collections import defaultdict, deque
from syllabus.core.evaluator import Evaluator
from syllabus.task_space import TaskSpace
INT32_MAX = 2147483647
np.seterr(all='raise')
[docs]
class TaskSampler:
""" Task sampler for Prioritized Level Replay (PLR)
Args:
tasks (list): List of tasks to sample from
action_space (gym.spaces.Space): Action space of the environment
num_actors (int): Number of actors/processes
strategy (str): Strategy for sampling tasks. Some possible values include
"random", "sequential", "policy_entropy", "least_confidence",
"min_margin", "gae", "value_l1", "one_step_td_error", "signed_value_loss",
"positive_value_loss", "grounded_signed_value_loss", "grounded_positive_value_loss",
"alt_advantage_abs", "uniform", "off".
replay_schedule (str): Schedule for sampling replay levels. One of "fixed" or "proportionate".
score_transform (str): Transform to apply to task scores. One of "constant", "max", "eps_greedy",
"rank", "power", "softmax", etc.
temperature (float): Temperature for score transform. Increasing temperature makes the sampling
distribution more uniform.
eps (float): Epsilon for eps-greedy score transform.
rho (float): Proportion of seen tasks before replay sampling is allowed.
nu (float): Probability of sampling a replay level if using a fixed replay_schedule.
alpha (float): Linear interpolation weight for score updates (0.0 uses old scores only, 1.0 uses new).
staleness_coef (float): Linear interpolation weight for task staleness vs. task score.
staleness_transform (str): Transform to apply to task staleness (e.g. "power").
staleness_temperature (float): Temperature for staleness transform.
max_score_coef (float): Interpolation weight for combining max_score and mean_score.
sample_full_distribution (bool): If True, treat the task space as unbounded and manage a buffer.
task_buffer_size (int): Size of that buffer if sample_full_distribution is True.
task_buffer_priority (str): Criterion (e.g. "replay_support") for picking replacement in the buffer.
use_dense_rewards (bool): If True, uses dense rewards in certain grounded strategies.
gamma (float): Discount factor for one-step TD-error calculations.
"""
def __init__(
self,
tasks: list,
num_steps: int,
action_space: gym.spaces.Space = None,
task_space: TaskSpace = None,
num_actors: int = 1,
strategy: str = "value_l1",
replay_schedule: str = "proportionate",
score_transform: str = "rank",
temperature: float = 0.1,
eps: float = 0.05,
rho: float = 1.0,
replay_prob: float = 0.5,
alpha: float = 1.0,
staleness_coef: float = 0.1,
staleness_transform: str = "power",
staleness_temperature: float = 1.0,
max_score_coef=0.0,
sample_full_distribution=False,
task_buffer_size=0,
task_buffer_priority="replay_support",
use_dense_rewards=False,
gamma=0.999,
gae_lambda=0.95,
robust_plr: bool = False,
evaluator: Evaluator = None,
):
if robust_plr:
assert task_space is not None, "Task space must be provided for robust PLR."
assert evaluator is not None, "Evaluator must be provided for robust PLR."
self.evaluator = evaluator
self.num_eval_envs = self.evaluator.eval_envs.num_envs if self.evaluator.eval_envs is not None else 0
self.num_actors = num_actors + self.num_eval_envs
self.task_space = task_space
self.action_space = action_space
self.tasks = tasks
self.num_tasks = len(self.tasks)
self.num_steps = num_steps
self.num_actors = num_actors
self.num_train_actors = num_actors
self.strategy = strategy
self.replay_schedule = replay_schedule
self.score_transform = score_transform
self.temperature = temperature
self.eps = eps
self.rho = rho
self.replay_prob = replay_prob
self.alpha = float(alpha)
self.staleness_coef = staleness_coef
self.staleness_transform = staleness_transform
self.staleness_temperature = staleness_temperature
self.max_score_coef = max_score_coef
self.sample_full_distribution = sample_full_distribution
self.task_buffer_size = task_buffer_size
self.task_buffer_priority = task_buffer_priority
self.use_dense_rewards = use_dense_rewards
self.gamma = gamma
self.gae_lambda = gae_lambda
self.score_function = self._get_score_function()
self._init_task_index(tasks if tasks else [])
self.num_tasks = len(self.tasks)
N = self.task_buffer_size if self.sample_full_distribution else self.num_tasks
self.unseen_task_weights = np.array([1.0] * N)
self.task_scores = np.array([0.0] * N, dtype=np.float32)
self.partial_task_scores = np.zeros((self.num_actors, N), dtype=np.float32)
self.partial_task_max_scores = np.ones((self.num_actors, N), dtype=np.float32) * float("-inf")
self.partial_task_steps = np.zeros((self.num_actors, N), dtype=np.int32)
self.task_staleness = np.array([0.0] * N, dtype=np.float32)
self.running_sample_count = 0
self.next_task_index = 0 # Only used for sequential strategy
self.track_solvable = False
self.grounded_values = None
if self.strategy.startswith("grounded"):
self.grounded_values = np.array([np.NINF] * N, dtype=np.float32)
if self.sample_full_distribution:
self.task2actor = defaultdict(set)
self.working_task_buffer_size = 0
self.staging_task_set = set()
self.working_task_set = set()
self.task2timestamp_buffer = {}
self.partial_task_scores_buffer = [defaultdict(float) for _ in range(self.num_actors)]
self.partial_task_max_scores_buffer = [defaultdict(lambda: float("-inf")) for _ in range(self.num_actors)]
self.partial_task_steps_buffer = [defaultdict(int) for _ in range(self.num_actors)]
self._last_score = 0.0
# Offline evaluation
self.robust_plr = robust_plr
if self.robust_plr:
self.evaluate_thread = threading.Thread(name='robustplr-evaluate', target=self._evaluate_tasks, daemon=True)
self.evaluate_thread.start()
def _init_task_index(self, tasks):
if tasks:
self.tasks = np.array(tasks, dtype=np.int64)
self.task2index = {t: i for i, t in enumerate(tasks)}
else:
self.tasks = np.zeros(self.task_buffer_size, dtype=np.int64) - 1
self.task2index = {}
def _init_solvable_tracking(self):
self.track_solvable = True
self.staging_task2solvable = {}
n = self.task_buffer_size if self.sample_full_distribution else self.num_tasks
self.task_solvable = np.ones(n, dtype=bool)
@property
def _proportion_filled(self):
if self.sample_full_distribution:
return self.working_task_buffer_size / self.task_buffer_size if self.task_buffer_size > 0 else 0.0
else:
num_unseen = (self.unseen_task_weights > 0).sum()
proportion_seen = (len(self.tasks) - num_unseen) / len(self.tasks)
return proportion_seen
@property
def requires_value_buffers(self):
return self.strategy in [
"gae",
"value_l1",
"signed_value_loss",
"positive_value_loss",
"grounded_signed_value_loss",
"grounded_positive_value_loss",
"one_step_td_error",
"alt_advantage_abs",
]
def _get_score_function(self):
if self.strategy in ["random", "off"]:
return
if self.strategy == "uniform":
score_function = self._uniform
elif self.strategy == "policy_entropy":
score_function = self._average_entropy
elif self.strategy == "least_confidence":
score_function = self._average_least_confidence
elif self.strategy == "min_margin":
score_function = self._average_min_margin
elif self.strategy == "gae":
score_function = self._average_gae
elif self.strategy == "value_l1":
score_function = self._average_value_l1
elif self.strategy == "signed_value_loss":
score_function = self._average_signed_value_loss
elif self.strategy == "positive_value_loss":
score_function = self._average_positive_value_loss
elif self.strategy == "grounded_signed_value_loss":
score_function = self._average_grounded_signed_value_loss
elif self.strategy == "grounded_positive_value_loss":
score_function = self._average_grounded_positive_value_loss
elif self.strategy == "one_step_td_error":
score_function = self._one_step_td_error
elif self.strategy == "alt_advantage_abs":
score_function = self._average_alt_advantage_abs
else:
raise ValueError(f"Unsupported strategy, {self.strategy}")
return score_function
[docs]
def update_with_rollouts(self, rollouts, actor_id=None, external_scores=None):
self._update_with_rollouts(rollouts, actor_index=actor_id, external_scores=external_scores)
[docs]
def update_task_score(self, actor_index, task, score, max_score, num_steps, running_mean=True):
if self.sample_full_distribution and task in self.staging_task_set:
score_out, task_idx = self._partial_update_task_score_buffer(
actor_index, task, score, num_steps, done=True, running_mean=running_mean
)
else:
score_out, task_idx = self._partial_update_task_score(
actor_index, task, score, max_score, num_steps, done=True, running_mean=running_mean
)
return score_out, task_idx
def _partial_update_task_score(self, actor_index, task, score, max_score, num_steps, done=False, running_mean=True):
task_idx = self.task2index.get(task, -1)
old_partial_score = self.partial_task_scores[actor_index][task_idx]
old_partial_max = self.partial_task_max_scores[actor_index][task_idx]
old_steps = self.partial_task_steps[actor_index][task_idx]
new_steps = old_steps + num_steps
merged_score = old_partial_score + (score - old_partial_score) * num_steps
if running_mean:
merged_score /= float(new_steps)
merged_max = max(old_partial_max, max_score)
if done:
self.partial_task_scores[actor_index][task_idx] = 0.0
self.partial_task_max_scores[actor_index][task_idx] = float("-inf")
self.partial_task_steps[actor_index][task_idx] = 0
self.unseen_task_weights[task_idx] = 0.0
old_score = self.task_scores[task_idx]
total_score = self.max_score_coef * merged_max + (1.0 - self.max_score_coef) * merged_score
self.task_scores[task_idx] = (1.0 - self.alpha) * old_score + self.alpha * total_score
self._last_score = total_score
else:
self.partial_task_scores[actor_index][task_idx] = merged_score
self.partial_task_max_scores[actor_index][task_idx] = merged_max
self.partial_task_steps[actor_index][task_idx] = new_steps
return merged_score, task_idx
def _partial_update_task_score_buffer(self, actor_index, task, score, num_steps, done=False, running_mean=True):
task_idx = -1
self.task2actor[task].add(actor_index)
old_partial_score = self.partial_task_scores_buffer[actor_index].get(task, 0.0)
old_steps = self.partial_task_steps_buffer[actor_index].get(task, 0)
new_steps = old_steps + num_steps
merged_score = old_partial_score + (score - old_partial_score) * num_steps
if running_mean:
merged_score /= float(new_steps)
if done:
task_idx = self._next_buffer_index
if self.task_scores[task_idx] <= merged_score or self.unseen_task_weights[task_idx] > 0:
self.unseen_task_weights[task_idx] = 0.0
if self.tasks[task_idx] in self.working_task_set:
self.working_task_set.remove(self.tasks[task_idx])
self.working_task_set.add(task)
self.tasks[task_idx] = task
self.task2index[task] = task_idx
self.task_scores[task_idx] = merged_score
self.partial_task_scores[:, task_idx] = 0.0
self.partial_task_steps[:, task_idx] = 0
self.task_staleness[task_idx] = self.running_sample_count - self.task2timestamp_buffer[task]
self.working_task_buffer_size = min(self.working_task_buffer_size + 1, self.task_buffer_size)
if self.track_solvable:
self.task_solvable[task_idx] = self.staging_task2solvable.get(task, True)
else:
task_idx = None
for a in self.task2actor[task]:
self.partial_task_scores_buffer[a].pop(task, None)
self.partial_task_steps_buffer[a].pop(task, None)
self.partial_task_max_scores_buffer[a].pop(task, None)
del self.task2timestamp_buffer[task]
del self.task2actor[task]
self.staging_task_set.remove(task)
if self.track_solvable:
del self.staging_task2solvable[task]
else:
self.partial_task_scores_buffer[actor_index][task] = merged_score
self.partial_task_steps_buffer[actor_index][task] = new_steps
return merged_score, task_idx
def _uniform(self, **kwargs):
return 1.0, 1.0
def _average_entropy(self, **kwargs):
episode_logits = kwargs["episode_logits"]
num_actions = self.action_space.n
max_entropy = -(1.0 / num_actions) * np.log(1.0 / num_actions) * num_actions
score_tensor = (-torch.exp(episode_logits) * episode_logits).sum(-1) / max_entropy
return score_tensor.mean().item(), score_tensor.max().item()
def _average_least_confidence(self, **kwargs):
episode_logits = kwargs["episode_logits"]
score_tensor = 1.0 - torch.exp(episode_logits.max(-1, keepdim=True)[0])
return score_tensor.mean().item(), score_tensor.max().item()
def _average_min_margin(self, **kwargs):
episode_logits = kwargs["episode_logits"]
top2_confidence = torch.exp(episode_logits.topk(2, dim=-1)[0])
gap = top2_confidence[:, 0] - top2_confidence[:, 1]
mean_score = 1.0 - gap.mean().item()
max_score = 1.0 - gap.min().item()
return mean_score, max_score
def _average_gae(self, **kwargs):
returns = kwargs["returns"]
value_preds = kwargs["value_preds"]
advantages = returns - value_preds
return advantages.mean().item(), advantages.max().item()
def _average_value_l1(self, **kwargs):
returns = kwargs["returns"]
value_preds = kwargs["value_preds"]
abs_adv = (returns - value_preds).abs()
return abs_adv.mean().item(), abs_adv.max().item()
def _average_signed_value_loss(self, **kwargs):
returns = kwargs["returns"]
value_preds = kwargs["value_preds"]
advantages = returns - value_preds
return advantages.mean().item(), advantages.max().item()
def _average_positive_value_loss(self, **kwargs):
returns = kwargs["returns"]
value_preds = kwargs["value_preds"]
clipped_adv = (returns - value_preds).clamp(0)
return clipped_adv.mean().item(), clipped_adv.max().item()
def _average_grounded_signed_value_loss(self, **kwargs):
# TODO: Make this work when called from offline eval
task = kwargs["task"]
actor_idx = kwargs["actor_index"]
done = kwargs["done"]
value_preds = kwargs["value_preds"]
grounded_val = kwargs.get("grounded_value", None)
if self.sample_full_distribution and task in self.partial_task_steps_buffer[actor_idx]:
partial_steps = self.partial_task_steps_buffer[actor_idx][task]
else:
task_idx = self.task2index.get(task, None)
partial_steps = self.partial_task_steps[actor_idx][task_idx] if task_idx is not None else 0
new_steps = len(kwargs["value_preds"])
total_steps = partial_steps + new_steps
if done and grounded_val is not None:
if self.use_dense_rewards:
adv = grounded_val - value_preds[0]
else:
adv = grounded_val - value_preds
mean_score = (total_steps / new_steps) * adv.mean().item()
max_score = adv.max().item()
else:
mean_score, max_score = 0.0, 0.0
return mean_score, max_score
def _average_external_score(self, **kwargs):
done = kwargs["done"]
external_scores = kwargs["external_scores"]
if done:
ms = external_scores.item()
return ms, ms
return 0.0, 0.0
def _average_grounded_positive_value_loss(self, **kwargs):
# TODO: Make this work when called from offline eval
task = kwargs["task"]
actor_idx = kwargs["actor_index"]
done = kwargs["done"]
value_preds = kwargs["value_preds"]
grounded_val = kwargs.get("grounded_value", None)
if self.sample_full_distribution and task in self.partial_task_steps_buffer[actor_idx]:
partial_steps = self.partial_task_steps_buffer[actor_idx][task]
else:
task_idx = self.task2index.get(task, None)
partial_steps = self.partial_task_steps[actor_idx][task_idx] if task_idx is not None else 0
new_steps = len(kwargs["value_preds"])
total_steps = partial_steps + new_steps
if done and grounded_val is not None:
if self.use_dense_rewards:
adv = grounded_val - value_preds[0]
else:
adv = grounded_val - value_preds
adv = adv.clamp(0)
mean_score = (total_steps / new_steps) * adv.mean().item()
max_score = adv.max().item()
else:
mean_score, max_score = 0.0, 0.0
return mean_score, max_score
def _one_step_td_error(self, **kwargs):
rewards = kwargs["rewards"]
value_preds = kwargs["value_preds"]
max_t = len(rewards)
if max_t > 1:
td_errors = (rewards[:-1] + self.gamma * value_preds[1:max_t] - value_preds[: max_t - 1]).abs()
else:
td_errors = (rewards[0] - value_preds[0]).abs()
return td_errors.mean().item(), td_errors.max().item()
def _average_alt_advantage_abs(self, **kwargs):
returns = kwargs["alt_returns"]
value_preds = kwargs["value_preds"]
abs_adv = (returns - value_preds).abs()
return abs_adv.mean().item(), abs_adv.max().item()
@property
def _next_buffer_index(self):
if self._proportion_filled < 1.0:
return self.working_task_buffer_size
else:
if self.task_buffer_priority == "replay_support":
return self.sample_weights().argmin()
return self.task_scores.argmin()
@property
def _has_working_task_buffer(self):
return (not self.sample_full_distribution) or (
self.sample_full_distribution and self.task_buffer_size > 0
)
def _update_with_rollouts(self, rollouts, actor_index=None, external_scores=None):
if not self._has_working_task_buffer:
return
tasks = rollouts.tasks
if not self.requires_value_buffers:
policy_logits = rollouts.action_log_dist
done = ~(rollouts.masks > 0)
score_function = self.score_function if external_scores is None else self._average_external_score
actors = [actor_index] if actor_index is not None else range(self.num_train_actors)
for actor_index in actors:
done_steps = done[:, actor_index].nonzero()[:self.num_steps, 0]
start_t = 0
for t in done_steps:
if not start_t < self.num_steps:
break
if t == 0:
continue
task_t = tasks[start_t, actor_index].item()
kwargs_ = {
"actor_index": actor_index,
"done": True,
"task": task_t,
}
if not self.requires_value_buffers:
ep_logits = policy_logits[start_t:t, actor_index]
kwargs_["episode_logits"] = torch.log_softmax(ep_logits, -1)
if external_scores is not None:
kwargs_["external_scores"] = external_scores[actor_index]
if self.requires_value_buffers:
kwargs_["returns"] = rollouts.returns[start_t:t, actor_index]
kwargs_["rewards"] = rollouts.rewards[start_t:t, actor_index]
if self.strategy == "alt_advantage_abs":
kwargs_["alt_returns"] = rollouts.alt_returns[start_t:t, actor_index]
# if rollouts.use_popart:
# kwargs_["value_preds"] = rollouts.denorm_value_preds[start_t:t, actor_index]
# else:
kwargs_["value_preds"] = rollouts.value_preds[start_t:t, actor_index]
if self.grounded_values is not None:
task_idx_ = self.task2index.get(task_t, None)
ret_ = rollouts.rewards[start_t:t].sum(0)[actor_index]
if task_idx_ is not None:
gv_ = max(self.grounded_values[task_idx_], ret_)
else:
gv_ = ret_
kwargs_["grounded_value"] = gv_
score, max_score = score_function(**kwargs_)
num_steps = len(rollouts.tasks[start_t:t, actor_index])
_, final_task_idx = self.update_task_score(
actor_index, task_t, score, max_score, num_steps, running_mean=(external_scores is not None)
)
if (
self.grounded_values is not None
and final_task_idx is not None
and "grounded_value" in kwargs_
and kwargs_["grounded_value"] is not None
):
self.grounded_values[final_task_idx] = kwargs_["grounded_value"]
start_t = t.item()
if start_t < self.num_steps:
task_t = tasks[start_t, actor_index].item()
kwargs_ = {
"actor_index": actor_index,
"done": False,
"task": task_t,
}
if not self.requires_value_buffers:
ep_logits = policy_logits[start_t:, actor_index]
kwargs_["episode_logits"] = torch.log_softmax(ep_logits, -1)
if external_scores is not None:
kwargs_["external_scores"] = external_scores[actor_index]
if self.requires_value_buffers:
kwargs_["returns"] = rollouts.returns[start_t:, actor_index]
kwargs_["rewards"] = rollouts.rewards[start_t:, actor_index]
if self.strategy == "alt_advantage_abs":
kwargs_["alt_returns"] = rollouts.alt_returns[start_t:, actor_index]
# if rollouts.use_popart:
# kwargs_["value_preds"] = rollouts.denorm_value_preds[start_t:, actor_index]
# else:
kwargs_["value_preds"] = rollouts.value_preds[start_t:, actor_index]
if self.grounded_values is not None:
kwargs_["grounded_value"] = self.grounded_values[task_t]
score, max_score = score_function(**kwargs_)
self._last_score = score
num_steps = len(rollouts.tasks[start_t:, actor_index])
if self.sample_full_distribution and task_t in self.staging_task_set:
self._partial_update_task_score_buffer(
actor_index, task_t, score, num_steps, running_mean=(external_scores is not None))
else:
self._partial_update_task_score(actor_index, task_t, score, max_score,
num_steps, running_mean=(external_scores is not None))
def _update_actor_with_data(self, actor_index, tasks, dones, returns, rewards, value_preds, policy_logits=None, alt_returns=None, external_scores=None):
if not self._has_working_task_buffer:
return
score_function = self.score_function if external_scores is None else self._average_external_score
done_steps = dones.nonzero()[:self.num_steps, 0]
start_t = 0
for t in done_steps:
if not start_t < self.num_steps:
break
if t == 0:
continue
task_t = tasks[start_t].item()
kwargs_ = {
"actor_index": actor_index,
"done": True,
"task": task_t,
}
if not self.requires_value_buffers:
ep_logits = policy_logits[start_t:t]
kwargs_["episode_logits"] = torch.log_softmax(ep_logits, -1)
if external_scores is not None:
kwargs_["external_scores"] = external_scores
if self.requires_value_buffers:
kwargs_["returns"] = returns[start_t:t]
kwargs_["rewards"] = rewards[start_t:t]
if self.strategy == "alt_advantage_abs":
kwargs_["alt_returns"] = alt_returns[start_t:t]
# if rollouts.use_popart:
# kwargs_["value_preds"] = rollouts.denorm_value_preds[start_t:t, actor_index]
# else:
kwargs_["value_preds"] = value_preds[start_t:t]
if self.grounded_values is not None:
task_idx_ = self.task2index.get(task_t, None)
ret_ = rewards[start_t:t].sum(0)
if task_idx_ is not None:
gv_ = max(self.grounded_values[task_idx_], ret_)
else:
gv_ = ret_
kwargs_["grounded_value"] = gv_
score, max_score = score_function(**kwargs_)
num_steps = len(tasks[start_t:t])
_, final_task_idx = self.update_task_score(
actor_index, task_t, score, max_score, num_steps, running_mean=(external_scores is not None)
)
if (
self.grounded_values is not None
and final_task_idx is not None
and "grounded_value" in kwargs_
and kwargs_["grounded_value"] is not None
):
self.grounded_values[final_task_idx] = kwargs_["grounded_value"]
[docs]
def after_update(self, actor_indices=None):
if not self._has_working_task_buffer:
return
actor_indices = range(self.num_actors) if actor_indices is None else actor_indices
for actor_index in actor_indices:
for task_idx in range(self.partial_task_scores.shape[1]):
if self.partial_task_scores[actor_index][task_idx] != 0.0:
self.update_task_score(
actor_index,
self.tasks[task_idx],
0.0,
float("-inf"),
0,
)
self.partial_task_scores.fill(0.0)
self.partial_task_steps.fill(0.0)
if self.sample_full_distribution:
for actor_index in actor_indices:
staging_list = list(self.partial_task_scores_buffer[actor_index].keys())
for t_ in staging_list:
if self.partial_task_scores_buffer[actor_index][t_] > 0.0:
self.update_task_score(actor_index, t_, 0.0, float("-inf"), 0)
def _update_staleness(self, selected_idx):
if self.staleness_coef > 0:
self.task_staleness = self.task_staleness + 1
self.task_staleness[selected_idx] = 0
[docs]
def sample_replay_decision(self):
proportion_seen = self._proportion_filled
if self.sample_full_distribution:
if self.task_buffer_size > 0:
if self.replay_schedule == "fixed":
if proportion_seen >= self.rho and np.random.rand() < self.replay_prob:
return True
return False
else:
if proportion_seen >= self.rho and np.random.rand() < min(proportion_seen, self.replay_prob):
return True
return False
return False
elif self.replay_schedule == "fixed":
# Sample random level until we have seen enough tasks
if proportion_seen >= self.rho:
# Sample replay level with fixed replay_prob OR if all levels seen
if np.random.rand() < self.replay_prob or proportion_seen >= 1.0:
return True
return False
else:
if proportion_seen >= self.rho and np.random.rand() < min(proportion_seen, self.replay_prob):
return True
return False
[docs]
def observe_external_unseen_sample(self, tasks, solvable=None):
for i, t_ in enumerate(tasks):
self.running_sample_count += 1
if not (t_ in self.staging_task_set or t_ in self.working_task_set):
self.task2timestamp_buffer[t_] = self.running_sample_count
self.staging_task_set.add(t_)
if solvable is not None:
if not self.track_solvable:
self._init_solvable_tracking()
self.staging_task2solvable[t_] = solvable[i]
else:
task_idx = self.task2index.get(t_, None)
if task_idx is not None:
self._update_staleness(task_idx)
def _sample_replay_level(self, update_staleness=True):
sample_weights = self.sample_weights()
total = np.sum(sample_weights)
if np.isclose(total, 0):
sample_weights = np.ones_like(self.tasks, dtype=np.float32) / len(self.tasks)
sample_weights *= (1 - self.unseen_task_weights)
sample_weights /= np.sum(sample_weights)
elif total != 1.0:
sample_weights /= total
task_idx = np.random.choice(range(len(self.tasks)), 1, p=sample_weights)[0]
if update_staleness:
self._update_staleness(task_idx)
return int(self.tasks[task_idx])
def _sample_unseen_level(self):
if self.sample_full_distribution:
t_val = int(np.random.randint(1, INT32_MAX))
while t_val in self.staging_task_set or t_val in self.working_task_set:
t_val = int(np.random.randint(1, INT32_MAX))
self.task2timestamp_buffer[t_val] = self.running_sample_count
self.staging_task_set.add(t_val)
return t_val
else:
sample_weights = self.unseen_task_weights / self.unseen_task_weights.sum()
task_idx = np.random.choice(range(len(self.tasks)), 1, p=sample_weights)[0]
self._update_staleness(task_idx)
return int(self.tasks[task_idx])
def _sample_random_level(self):
if self.sample_full_distribution:
# TODO: Fix this, stolen from unseen level sampling
t_val = int(np.random.randint(1, INT32_MAX))
while t_val in self.staging_task_set or t_val in self.working_task_set:
t_val = int(np.random.randint(1, INT32_MAX))
self.task2timestamp_buffer[t_val] = self.running_sample_count
self.staging_task_set.add(t_val)
return t_val
else:
task_idx = np.random.choice(range(len(self.tasks)), 1)[0]
self._update_staleness(task_idx)
return int(self.tasks[task_idx])
[docs]
def compute_gae_returns(self,
rewards,
value_preds,
masks,
returns_buffer,
next_value,
gamma,
gae_lambda):
value_preds[-1, :] = next_value
gae = 0
value_preds = value_preds
# if self.use_proper_time_limits:
# # Get truncated value preds
# self._compute_truncated_value_preds()
# value_preds = self.truncated_value_preds
# if self.use_popart:
# self.denorm_value_preds = self.model.popart.denormalize(value_preds) # denormalize all value predictions
# value_preds = self.denorm_value_preds
for step in reversed(range(rewards.size(0))):
delta = rewards[step] + \
gamma*value_preds[step + 1]*masks[step + 1] - value_preds[step]
gae = delta + gamma * gae_lambda * masks[step + 1] * gae
returns_buffer[step] = gae + value_preds[step]
return returns_buffer
[docs]
def compute_discounted_returns(self,
rewards,
value_preds,
masks,
returns_buffer,
next_value,
gamma):
value_preds[-1] = next_value
value_preds = value_preds
# if self.use_proper_time_limits:
# self._compute_truncated_value_preds()
# value_preds = self.truncated_value_preds
# if self.use_popart:
# self.denorm_value_preds = self.model.popart.denormalize(value_preds) # denormalize all value predictions
returns_buffer[-1] = value_preds[-1]
for step in reversed(range(rewards.size(0))):
returns_buffer[step] = returns_buffer[step + 1] * \
gamma * masks[step + 1] + rewards[step]
return returns_buffer
[docs]
def compute_returns(self,
rewards,
value_preds,
masks,
next_value,
use_gae,
gamma,
gae_lambda):
returns = np.zeros_like(value_preds)
if use_gae:
return self.compute_gae_returns(
rewards, value_preds, masks, returns, next_value, gamma, gae_lambda)
else:
return self.compute_discounted_returns(
rewards, value_preds, masks, returns, next_value, gamma)
def _evaluate_tasks(self):
# TODO: Force DummyEvaluator to have dummy eval envs
obs, _ = self.evaluator.eval_envs.reset() if self.num_eval_envs > 0 else (None, None)
recurrent_state = self.evaluator._initial_recurrent_state(self.num_eval_envs)
rewards = torch.zeros((self.num_steps, self.num_eval_envs), dtype=torch.float32)
dones = torch.zeros((self.num_steps + 1, self.num_eval_envs), dtype=torch.float32)
tasks = torch.zeros((self.num_steps, self.num_eval_envs), dtype=torch.float32)
value_preds = torch.zeros((self.num_steps + 1, self.num_eval_envs), dtype=torch.float32)
while True:
# TODO: Make sure queue is being emptied at reasonable rate
obs, recurrent_state, rewards, dones, tasks, value_preds = self.evaluator.evaluate_batch(
self.num_steps, obs, recurrent_state=recurrent_state, rewards=rewards, dones=dones, tasks=tasks, value_preds=value_preds
)
next_value, _, _ = self.evaluator.get_value(obs, recurrent_state)
returns = self.compute_returns(rewards, value_preds, 1 - dones, torch.squeeze(next_value.cpu()),
True, self.gamma, self.gae_lambda)
# Iterate over eval actor indices
for actor_index in range(self.num_eval_envs):
self._update_actor_with_data(
self.num_train_actors + actor_index,
tasks[:, actor_index],
dones[:, actor_index],
returns[:, actor_index],
rewards[:, actor_index],
value_preds[:, actor_index],
)
time.sleep(0.01)
[docs]
def sample(self, strategy=None):
if strategy == "full_distribution":
raise ValueError("One-off sampling via full_distribution strategy is not supported.")
self.running_sample_count += 1
if not strategy:
strategy = self.strategy
if not self.sample_full_distribution:
if strategy == "random":
task_idx = np.random.choice(range(self.num_tasks))
return int(self.tasks[task_idx])
if strategy == "sequential":
task_idx = self.next_task_index
self.next_task_index = (self.next_task_index + 1) % self.num_tasks
return int(self.tasks[task_idx])
replay_decision = self.sample_replay_decision()
# If we have seen enough tasks to sample a replay level, stop training on them and only evaluate
if self._proportion_filled >= self.rho:
# Add random levels to an evaluation queue until we sample a replay level
while not replay_decision:
level = self._sample_unseen_level() if self._proportion_filled < 1.0 else self._sample_random_level()
self.evaluator.eval_curriculum.send_task(level) # Send task to evaluator environments
replay_decision = self.sample_replay_decision()
return self._sample_replay_level()
else:
return self._sample_unseen_level()
[docs]
def sample_weights(self):
weights = self._score_transform(self.score_transform, self.temperature, self.task_scores)
weights *= (1 - self.unseen_task_weights)
z = np.sum(weights)
if z > 0:
weights /= z
else:
weights = np.ones_like(weights, dtype=np.float32) / len(weights)
weights *= (1 - self.unseen_task_weights)
weights /= np.sum(weights)
if self.staleness_coef > 0:
staleness_w = self._score_transform(
self.staleness_transform, self.staleness_temperature, self.task_staleness)
staleness_w *= (1 - self.unseen_task_weights)
z_s = np.sum(staleness_w)
if z_s > 0:
staleness_w /= z_s
else:
staleness_w = (1.0 / len(staleness_w)) * (1 - self.unseen_task_weights)
weights = (1.0 - self.staleness_coef) * weights + self.staleness_coef * staleness_w
weights = weights / weights.sum() if weights.sum() > 0 else weights
return weights
def _score_transform(self, transform, temperature, scores):
if transform == "constant":
weights = np.ones_like(scores)
elif transform == "max":
weights = np.zeros_like(scores)
scores_ = scores[:]
scores_[self.unseen_task_weights > 0] = -float("inf")
argmax = np.random.choice(np.flatnonzero(np.isclose(scores_, scores_.max())))
weights[argmax] = 1.0
elif transform == "eps_greedy":
weights = np.zeros_like(scores)
weights[scores.argmax()] = 1.0 - self.eps
weights += self.eps / len(self.tasks)
elif transform == "rank":
temp = np.flip(scores.argsort())
ranks = np.empty_like(temp)
ranks[temp] = np.arange(len(temp)) + 1
weights = 1 / (ranks ** (1.0 / temperature))
elif transform == "power":
eps_ = 0 if self.staleness_coef > 0 else 1e-3
weights = (np.array(scores).clip(0) + eps_) ** (1.0 / temperature)
elif transform == "softmax":
weights = np.exp(np.array(scores) / temperature)
elif transform == "match":
w_ = np.array([(1.0 - s) * s for s in scores])
weights = w_ ** (1.0 / temperature)
elif transform == "match_rank":
w_ = np.array([(1.0 - s) * s for s in scores])
temp = np.flip(w_.argsort())
ranks = np.empty_like(temp)
ranks[temp] = np.arange(len(temp)) + 1
weights = 1 / (ranks ** (1.0 / temperature))
else:
weights = np.ones_like(scores)
return weights
[docs]
def metrics(self):
""" Return sampling metrics for logging. """
n = self.task_buffer_size if self.sample_full_distribution else self.num_tasks
proportion_seen = (n - (self.unseen_task_weights > 0).sum()) / float(n) if n > 0 else 0.0
return {
"task_scores": self.task_scores,
"unseen_task_weights": self.unseen_task_weights,
"task_staleness": self.task_staleness,
"proportion_seen": proportion_seen,
"score": self._last_score,
}
@property
def solvable_mass(self):
if self.track_solvable:
sw = self.sample_weights()
return np.sum(sw[self.task_solvable])
return 1.0
@property
def max_score(self):
return max(self.task_scores)