Curriculum Synchronization#

The curriculum synchronization wrapper routes update messages to the proper Curriculum methods and requests tasks from the Curriculum to send back to the environments. You can create a synchronized curriculum by calling the make_multiprocessing_curriculum or make_ray_curriculum functions. The Ray method wraps the curriculum in a Ray actor, allowing it to communicate with environments running as separate Ray actors. The native multiprocessing method wraps the curriculum in a wrapper class with a components property, which contains everything that needs to be passed to the environment synchronization wrapper: the task and update queues, methods for generating unique environment identifiers, and a method indicating whether a given environment should send step updates. Passing these components to the environment synchronization wrapper allows the curriculum to communicate with the environments. It is possible to have multiple separate curricula and sets of environments communicating via different components, but that behavior is not officially supported.
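
Below is a minimal sketch of the native multiprocessing setup. It assumes make_multiprocessing_curriculum returns the wrapper described above; the curriculum class, environment, and environment synchronization wrapper names are placeholders.

```python
from syllabus.core.curriculum_sync_wrapper import make_multiprocessing_curriculum

# Any Curriculum subclass (placeholder name and constructor).
curriculum = MyCurriculum(task_space)

# Wrap the curriculum for multiprocessing synchronization.
curriculum = make_multiprocessing_curriculum(curriculum)

# The components property holds the task and update queues, the environment
# identifier helpers, and the step-update flag described above.
components = curriculum.components

def make_env():
    # Placeholder environment and environment synchronization wrapper; passing
    # the shared components lets each worker communicate with the curriculum.
    env = MyEnv()
    return MyEnvSyncWrapper(env, components)
```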

The make_multiprocessing_curriculum method has several options that control the queue behavior. The timeout option controls how long each environment will wait for a new task before raising an error. The remaining communication methods do not block and instead raise an error if a value is not immediately available. Updates are only requested after checking that the update queue is not empty, so reading updates should never raise an error. Placing updates or tasks into their respective queues will only raise an error if the queue is full. If that happens, you can increase the maxsize argument to enlarge both queues, though a full queue usually indicates that the curriculum is processing updates too slowly. In that case, you can limit the number of environments used to update the curriculum with the max_envs option, which causes the environment synchronization wrapper to send step updates for at most max_envs environments. Task updates and environment updates are still sent from every environment because they are used to request new tasks from the curriculum, and they are sent at a much slower rate than step updates.
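
As a sketch, these options can be passed as keyword arguments (values here are illustrative; the keyword names follow the MultiProcessingComponents signature documented below and are assumed to be forwarded through the helper's **kwargs):

```python
curriculum = make_multiprocessing_curriculum(
    curriculum,
    timeout=60,                # seconds an environment waits for a new task
    max_queue_size=1_000_000,  # capacity of the task and update queues
    max_envs=16,               # only 16 environments send step updates
)
```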

class syllabus.core.curriculum_sync_wrapper.CurriculumSyncWrapper(curriculum: Curriculum, **kwargs)[source]#

Bases: CurriculumWrapper

log_metrics(writer, logs, step=None, log_n_tasks=1)[source]#
route_update(update_data: Dict[str, tuple])[source]#

Update the curriculum with the specified update type. TODO: Change method header to not use dictionary, use enums?

Parameters:

update_data (Dict[str, tuple]) – Dictionary with an "update_type" key that maps to one of ["step", "step_batch", "episode", "on_demand", "task_progress", "noop"], and an "args" key containing a tuple of the appropriate arguments for the given "update_type".

Raises:

NotImplementedError
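
For illustration, an update message placed on the update queue might look like the following before being routed by this method; the "args" tuple shown here is hypothetical and depends on the update type:

```python
update_data = {
    "update_type": "task_progress",  # one of the update types listed above
    "args": (task, progress),        # hypothetical arguments for this update type
}
```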

start()[source]#

Start the thread that reads the complete_queue and the task_queue.

stop()[source]#

Stop the thread that reads the complete_queue and the task_queue.

class syllabus.core.curriculum_sync_wrapper.CurriculumWrapper(curriculum: Curriculum)[source]#

Bases: object

Wrapper class for adding multiprocessing synchronization to a curriculum.

count_tasks(task_space=None, *, _ray_trace_ctx=None)[source]#
get_tasks(task_space=None, *, _ray_trace_ctx=None)[source]#
log_metrics(writer, logs, step=None, log_n_tasks=1, *, _ray_trace_ctx=None)[source]#
normalize(rewards, task, *, _ray_trace_ctx=None)[source]#
property num_tasks#
property requires_step_updates#
sample(k=1, *, _ray_trace_ctx=None)[source]#
property tasks#
update_on_episode(episode_return, length, task, progress, env_id=None, *, _ray_trace_ctx=None)[source]#
update_on_step(task, obs, reward, term, trunc, info, progress, *, _ray_trace_ctx=None)[source]#
update_on_step_batch(step_results, env_id=None, *, _ray_trace_ctx=None)[source]#
update_task_progress(task, progress, *, _ray_trace_ctx=None)[source]#
class syllabus.core.curriculum_sync_wrapper.MultiProcessingComponents(requires_step_updates, max_queue_size=1000000, timeout=60, max_envs=None)[source]#

Bases: object

close()[source]#
get_id()[source]#
get_metrics(log_n_tasks=1)[source]#
get_task()[source]#
get_update()[source]#
peek_id()[source]#
put_task(task)[source]#
put_update(update)[source]#
should_sync(env_id)[source]#
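
To show how these methods fit together, here is a rough sketch of what an environment-side synchronization wrapper might do; only the method names come from this class, and the control flow and update payload are illustrative.

```python
env_id = components.get_id()  # unique identifier for this environment

# Request the next task; blocks for up to `timeout` seconds.
task = components.get_task()

# When max_envs is set, only a subset of environments send step updates.
if components.should_sync(env_id):
    components.put_update(step_update)  # step_update assembled by the wrapper (illustrative)
```
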
class syllabus.core.curriculum_sync_wrapper.RayCurriculumSyncWrapper(curriculum, actor_name='curriculum')[source]#

Bases: CurriculumWrapper

Curriculum wrapper that uses Ray to share tasks and receive feedback from the environments. The only change to the underlying curriculum is the @ray.remote decorator on its class.

The @decorate_all_functions(remote_call) annotation automatically forwards all functions not explicitly overridden here to the remote curriculum. This is intended to forward private functions of Curriculum subclasses for convenience. # TODO: Implement the Curriculum methods explicitly

count_tasks(task_space=None, *, _ray_trace_ctx=None)#
get_tasks(task_space=None, *, _ray_trace_ctx=None)#
log_metrics(writer, logs, step=None, log_n_tasks=1, *, _ray_trace_ctx=None)#
normalize(rewards, task, *, _ray_trace_ctx=None)#
sample(k: int = 1)[source]#
update_on_episode(episode_return, length, task, progress, env_id=None, *, _ray_trace_ctx=None)#
update_on_step(task, obs, reward, term, trunc, info, progress, *, _ray_trace_ctx=None)#
update_on_step_batch(step_results, env_id=None) → None[source]#
update_task_progress(task, progress, *, _ray_trace_ctx=None)#
syllabus.core.curriculum_sync_wrapper.make_multiprocessing_curriculum(curriculum, start=True, **kwargs)[source]#

Helper function for creating a CurriculumSyncWrapper.

syllabus.core.curriculum_sync_wrapper.make_ray_curriculum(curriculum, actor_name='curriculum', **kwargs)[source]#

Helper function for creating a RayCurriculumSyncWrapper.
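
A sketch of the Ray setup, assuming Ray has already been initialized and that the helper returns the wrapper described above; environments running as Ray actors can then reach the curriculum actor by its actor_name.

```python
import ray
from syllabus.core.curriculum_sync_wrapper import make_ray_curriculum

ray.init()
curriculum = make_ray_curriculum(curriculum, actor_name="curriculum")
```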

syllabus.core.curriculum_sync_wrapper.remote_call(func)[source]#

Decorator for automatically forwarding calls to the curriculum via ray remote calls.

Note that this causes functions to block, and it should only be used for operations that do not require parallelization.

Multiagent Curriculum Synchronization#

There are multiple valid ways to use curricula with multiagent environments. Each agent might be given a separate curriculum, or all agents may update the same curriculum. A shared curriculum may be explicitly designed for multiagent environments, or it may be a single-agent curriculum that simply uses all of the agents' data to update a single sampling distribution. In cases where the curriculum tracks environments individually, there is also a question of whether every agent in a given environment should be treated as the same data stream, or whether each agent should be treated as a separate data stream, similar to a separate single-agent environment.

By default, Syllabus sends updates in the PettingZoo format, and therefore assumes that a curriculum is explicitly designed for multiagent environments. If you want to use a single-agent curriculum instead, you can wrap it in the MultiagentSharedCurriculumWrapper, which separates each agent's data and calls the curriculum's update_on_step method. The joint_policy argument controls whether the wrapper separates observations into individual agent observations or keeps the global observation so that a joint policy can select actions for all agents at once. The wrapper also creates a unique environment identifier for each agent in each environment. These details only matter if you plan to use an Evaluator with the observations, or if your curriculum uses the env_id argument of update_on_step.
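
Below is a sketch of wrapping a single-agent curriculum for a PettingZoo environment; curriculum and env are placeholders, and possible_agents follows the PettingZoo convention.

```python
from syllabus.core.curriculum_sync_wrapper import make_multiprocessing_curriculum
from syllabus.core.multiagent_curriculum_wrappers import MultiagentSharedCurriculumWrapper

# `curriculum` is any single-agent Curriculum; `env` is a PettingZoo environment.
curriculum = MultiagentSharedCurriculumWrapper(
    curriculum,
    env.possible_agents,
    joint_policy=False,  # split observations per agent rather than keeping the joint observation
)
curriculum = make_multiprocessing_curriculum(curriculum)
```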

class syllabus.core.multiagent_curriculum_wrappers.MultiagentIndependentCurriculumWrapper(curriculum, possible_agents, *args, **kwargs)[source]#

Bases: CurriculumWrapper

class syllabus.core.multiagent_curriculum_wrappers.MultiagentSharedCurriculumWrapper(curriculum, possible_agents, *args, joint_policy=False, **kwargs)[source]#

Bases: CurriculumWrapper

update_on_episode(episode_return, length, task, progress, env_id=None) → None[source]#

Update the curriculum with episode results from the environment.

update_on_step(task, obs, reward, term, trunc, info, progress, env_id: int | None = None) → None[source]#

Update the curriculum with the current step results from the environment.

update_on_step_batch(step_results, env_id: int | None = None) → None[source]#
update_task_progress(task, progress, env_id=None)[source]#

Additionally, if you have a competitive multiagent, multitask environment and want to design a curriculum over both opponents and tasks, you can use the DualCurriculumWrapper to combine an agent-based curriculum and a task-based curriculum into a single curriculum that samples from both. For example, you can use Self Play with Prioritized Level Replay to individually sample an opponent and a seed for each episode. Internally, the wrapper sends all episode and task updates to both curricula, and it sends step updates to any curriculum that requires them. It samples tasks from each curriculum and then concatenates them into an (agent, task) tuple.
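
A sketch of combining an environment curriculum with an opponent curriculum is shown below; the two curriculum instances are placeholders for any Curriculum subclasses.

```python
from syllabus.core.dual_curriculum_wrapper import DualCurriculumWrapper

dual_curriculum = DualCurriculumWrapper(
    env_curriculum=plr_curriculum,  # e.g. Prioritized Level Replay over seeds (placeholder)
    agent_curriculum=self_play,     # e.g. Self Play over stored opponents (placeholder)
)

# Each sampled task pairs an environment task with an agent task.
joint_task = dual_curriculum.sample()
```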

class syllabus.core.dual_curriculum_wrapper.DualCurriculumWrapper(env_curriculum: Curriculum, agent_curriculum: Curriculum, batch_agent_tasks: bool = False, batch_size: int = 32, *args, **kwargs)[source]#

Bases: CurriculumWrapper

Curriculum wrapper containing both an agent and environment-based curriculum.

get_agent(agent: AgentTask) → Agent[source]#
sample(k=1) → Tuple[EnvTask, AgentTask][source]#

Sets new tasks for the environment and agent curricula.

update_agent(agent: Agent) → int[source]#
update_on_episode(episode_return, length, task, progress, env_id=None)[source]#
update_on_step(task, obs, reward, term, trunc, info, env_id=None)[source]#
update_on_step_batch(step_results, env_id=None)[source]#
update_task_progress(task, progress)[source]#