def __sub__(self, other):
    """Return a new StatMean with *other*'s samples removed from this one.

    Inverse of the parallel (Chan et al.) combine step: given aggregate
    ``self`` = A ∪ B and sub-aggregate ``other`` = B, recover A's stats.

    :param other: StatMean whose samples are a subset of this one's.
    :return: New StatMean for the remaining samples.
    """
    assert isinstance(other, StatMean)
    n_new = self.n - other.n
    # Removing everything leaves an empty accumulator.
    if n_new == 0:
        return StatMean(0, 0, 0)
    mu_new = (self.mu * self.n - other.mu * other.n) / n_new
    delta = other.mu - mu_new
    # Invert m2_c = m2_a + m2_b + delta^2 * n_a * n_b / n_c.
    m2_new = self.m2 - other.m2 - (delta ** 2) * n_new * other.n / self.n
    return StatMean(n_new, mu_new, m2_new)

def __iadd__(self, other):
    """Merge another sample set into this accumulator in place.

    :param other: A StatMean, a torch.Tensor of samples, or a scalar
        (treated as a single sample).
    :return: self, updated.
    """
    if isinstance(other, StatMean):
        other_n = other.n
        other_mu = other.mu
        other_m2 = other.m2
    elif isinstance(other, torch.Tensor):
        other_n = other.numel()
        other_mu = other.mean().item()
        other_m2 = ((other - other_mu) ** 2).sum().item()
    else:
        # Anything else is treated as one scalar sample.
        other_n = 1
        other_mu = other
        other_m2 = 0
    # Parallel Welford combine (Chan et al.):
    #   m2_c = m2_a + m2_b + delta^2 * n_a * n_b / n_c
    # where delta is taken between the ORIGINAL means.
    # BUGFIX: the previous code squared delta2 = other_mu - updated_mu,
    # which equals delta * n_a / n_c, under-counting the cross term by a
    # factor of (n_a / n_c)^2 and making the tracked variance too small.
    # (__sub__ above correctly inverts the delta**2 form.)
    new_n = other_n + self.n
    delta = other_mu - self.mu
    self.mu += delta * (other_n / max(new_n, 1))
    self.m2 += other_m2 + (delta ** 2) * (self.n * other_n / max(new_n, 1))
    self.n = new_n
    return self
[docs]classStatRecorder:""" Individual statistics tracking for each task. """def__init__(self,task_space:TaskSpace,calc_past_n=None,task_names=None):"""Initialize the StatRecorder"""self.task_space=task_spaceself.calc_past_n=calc_past_nself.task_names=task_namesiftask_namesisnotNoneelselambdatask,idx:idxassertisinstance(self.task_space,TaskSpace),f"task_space must be a TaskSpace object. Got {type(task_space)} instead."assertisinstance(self.task_space.gym_space,Discrete),f"Only Discrete task spaces are supported. Got {type(task_space.gym_space)}"self.tasks=self.task_space.tasksself.num_tasks=self.task_space.num_tasksself.episode_returns={task:StatMean()fortaskinself.tasks}self.episode_lengths={task:StatMean()fortaskinself.tasks}
[docs]defrecord(self,episode_return:float,episode_length:int,episode_task,env_id=None):""" Record the length and return of an episode for a given task. :param episode_length: Length of the episode, i.e. the total number of steps taken during the episode :param episodic_return: Total return for the episode :param episode_task: Identifier for the task """ifepisode_tasknotinself.tasks:raiseValueError(f"Stat recorder received unknown task {episode_task}.")self.episode_returns[episode_task]+=episode_returnself.episode_lengths[episode_task]+=episode_length
[docs]defget_metrics(self,log_n_tasks=1):"""Log the statistics of the first 5 tasks to the provided tensorboard writer. :param writer: Tensorboard summary writer. :param log_n_tasks: Number of tasks to log statistics for. Use -1 to log all tasks. """tasks_to_log=self.tasksiflen(self.tasks)>log_n_tasksandlog_n_tasks!=-1:warnings.warn(f"Too many tasks to log {len(self.tasks)}. Only logging stats for 1 task.",stacklevel=2)tasks_to_log=self.tasks[:log_n_tasks]logs=[]foridxintasks_to_log:ifself.episode_returns[idx].n>0:name=self.task_names(list(self.task_space.tasks)[idx],idx)logs.append((f"tasks/{name}_episode_return",self.episode_returns[idx].mean()))logs.append((f"tasks/{name}_episode_length",self.episode_lengths[idx].mean()))returnlogs
[docs]defnormalize(self,reward,task):""" Normalize reward by task. :param reward: Reward to normalize :param task: Task to normalize reward by """task_return_stats=self.episode_returns[task]reward_std=task_return_stats.std()normalized_reward=deque(maxlen=reward.maxlen)forrinreward:normalized_reward.append(r/max(0.01,reward_std))returnnormalized_reward
[docs]defsave_statistics(self,output_path):""" Write task-specific statistics to file. :param output_path: Path to save the statistics file. """defconvert_numpy(obj):ifisinstance(obj,np.generic):returnobj.item()# Use .item() to convert numpy types to native Python typesraiseTypeErrorstats=json.dumps(self.episode_returns,default=convert_numpy)withopen(os.path.join(output_path,'task_specific_stats.json'),"w",encoding="utf-8")asfile:file.write(stats)