Global Synchronization#

Syllabus’s multiprocessing infrastructure uses a bidirectional sender-receiver model: the curriculum sends tasks and receives environment metrics, while each environment receives tasks and sends metrics back. The environment runs the provided task in its next episode, and the curriculum uses the returned metrics to update its task distribution. You can also update the curriculum directly from the main learner process to incorporate training information.
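The exchange can be pictured with a pair of plain queues. The sketch below is illustrative only (the function names and metric shapes are hypothetical, not Syllabus's internals): the curriculum pushes tasks onto one queue and drains metrics from another, while the environment does the reverse.

```python
from queue import Queue

# Hypothetical minimal sketch of the sender-receiver model, NOT Syllabus's
# actual implementation: the curriculum writes to task_queue and reads from
# update_queue; the environment does the opposite.
task_queue: Queue = Queue()
update_queue: Queue = Queue()

def curriculum_step(tasks):
    """Curriculum side: send the next task, then drain any pending metrics."""
    task_queue.put(tasks[0])
    metrics = []
    while not update_queue.empty():
        metrics.append(update_queue.get())
    return metrics

def environment_episode():
    """Environment side: receive a task, run an episode, report a metric."""
    task = task_queue.get()
    episode_return = 1.0  # placeholder for the real episode return
    update_queue.put({"task": task, "return": episode_return})
    return task
```

In the real system the two sides live in different processes; here everything runs in one process purely to show the direction of data flow.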

Usage#

If your environments are vectorized with Python multiprocessing (true for most libraries, including CleanRL, Stable Baselines 3, and Torchbeast), you can wrap your curriculum and environment as follows:

from syllabus.core import make_multiprocessing_curriculum, MultiProcessingSyncWrapper
curriculum, task_queue, update_queue = make_multiprocessing_curriculum(curriculum)
env = MultiProcessingSyncWrapper(env, task_queue, update_queue)

If your environments are vectorized by Ray (e.g. RLlib), you can wrap your curriculum and environment as follows:

from syllabus.core import make_ray_curriculum, RaySyncWrapper
curriculum = make_ray_curriculum(curriculum)
env = RaySyncWrapper(env)

Now that you’ve applied these wrappers, the environment will automatically receive tasks from the curriculum at the start of each episode. Depending on your environment wrapper options and curriculum method, the curriculum might also receive metrics from the environment at each step or at the end of the episode.
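When per-step updates are enabled, sending one message per environment step would be expensive, so step results are typically batched before being sent. The class below is a hypothetical sketch of that buffering idea (mirroring the spirit of the `buffer_size` option on `MultiProcessingSyncWrapper`, not its actual code):

```python
# Hypothetical sketch of per-step update buffering, NOT Syllabus's actual
# implementation: step results accumulate locally and are flushed to the
# curriculum in batches, or immediately when the episode ends.
class StepUpdateBuffer:
    def __init__(self, send, buffer_size=4):
        self.send = send              # callable that delivers a batch
        self.buffer_size = buffer_size
        self._buffer = []

    def add(self, task, step, reward, term, trunc):
        self._buffer.append((task, step, reward, term, trunc))
        # Flush when the buffer is full or the episode terminates/truncates.
        if len(self._buffer) >= self.buffer_size or term or trunc:
            self.send(list(self._buffer))
            self._buffer.clear()
```

Flushing at episode boundaries keeps each batch's metrics attributable to a single task, which is what curricula like Prioritized Level Replay need.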

You can also update the curriculum directly from the main learner process to incorporate training information. The exact update metrics depend on your curriculum method, but the API is the same for all methods. You can see examples of how to do this in various RL libraries using Prioritized Level Replay.

update = {
   "update_type": "on_demand",
   "metrics": {
      ...
   },
}
curriculum.update(update)

syllabus.core.curriculum_sync_wrapper module#

class syllabus.core.curriculum_sync_wrapper.CurriculumWrapper(curriculum: Curriculum)#

Bases: object

Wrapper class for adding multiprocessing synchronization to a curriculum.

add_task(task, *, _ray_trace_ctx=None)#
batch_update(metrics, *, _ray_trace_ctx=None)#
count_tasks(task_space=None, *, _ray_trace_ctx=None)#
get_tasks(task_space=None, *, _ray_trace_ctx=None)#
log_metrics(writer, step=None, *, _ray_trace_ctx=None)#
property num_tasks#
sample(k=1, *, _ray_trace_ctx=None)#
property tasks#
update(metrics, *, _ray_trace_ctx=None)#
update_on_step(task, step, reward, term, trunc, *, _ray_trace_ctx=None)#
update_on_step_batch(step_results, *, _ray_trace_ctx=None)#
update_task_progress(task, progress, *, _ray_trace_ctx=None)#
class syllabus.core.curriculum_sync_wrapper.MultiProcessingCurriculumWrapper(curriculum: Curriculum, task_queue: SimpleQueue, update_queue: SimpleQueue, sequential_start: bool = True)#

Bases: CurriculumWrapper

Wrapper which sends tasks to and receives updates from environments wrapped in a corresponding MultiProcessingSyncWrapper.

add_task(task)#
log_metrics(writer, step=None)#
start()#

Start the background thread that reads updates from the update_queue and writes tasks to the task_queue.

stop()#

Stop the background thread that reads updates from the update_queue and writes tasks to the task_queue.

class syllabus.core.curriculum_sync_wrapper.RayCurriculumWrapper(curriculum, actor_name='curriculum')#

Bases: CurriculumWrapper

Curriculum wrapper that uses Ray to share tasks and receive feedback from the environment. The only change is the @ray.remote decorator on the class.

The @decorate_all_functions(remote_call) annotation automatically forwards all functions not explicitly overridden here to the remote curriculum. This is intended to forward private functions of Curriculum subclasses for convenience. # TODO: Implement the Curriculum methods explicitly

add_task(task)#
batch_update(metrics, *, _ray_trace_ctx=None)#
count_tasks(task_space=None, *, _ray_trace_ctx=None)#
get_tasks(task_space=None, *, _ray_trace_ctx=None)#
log_metrics(writer, step=None, *, _ray_trace_ctx=None)#
sample(k: int = 1)#
update(metrics, *, _ray_trace_ctx=None)#
update_on_step(task, step, reward, term, trunc, *, _ray_trace_ctx=None)#
update_on_step_batch(step_results: List[Tuple[int, int, int, int]]) → None#
update_task_progress(task, progress, *, _ray_trace_ctx=None)#
syllabus.core.curriculum_sync_wrapper.make_multiprocessing_curriculum(curriculum, **kwargs)#

Helper function for creating a MultiProcessingCurriculumWrapper.

syllabus.core.curriculum_sync_wrapper.make_ray_curriculum(curriculum, actor_name='curriculum', **kwargs)#

Helper function for creating a RayCurriculumWrapper.

syllabus.core.curriculum_sync_wrapper.remote_call(func)#

Decorator for automatically forwarding calls to the curriculum via ray remote calls.

Note that this causes functions to block, and should be only used for operations that do not require parallelization.
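The forwarding pattern behind remote_call can be sketched without Ray. In the hypothetical example below, the decorated method's body is ignored and the call is delegated to a target object, blocking for the result, analogous to `ray.get(actor.method.remote(...))`; all names here are illustrative, not Syllabus's code:

```python
import functools

# Hypothetical sketch of call forwarding (no Ray involved): the wrapper
# ignores the local method body and delegates to an attribute of self,
# blocking until the delegate returns.
def forward_to(target_attr):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            target = getattr(self, target_attr)
            return getattr(target, func.__name__)(*args, **kwargs)
        return wrapper
    return decorator

class LocalCurriculum:
    def sample(self, k=1):
        return ["task"] * k

class CurriculumProxy:
    def __init__(self, curriculum):
        self.curriculum = curriculum

    @forward_to("curriculum")
    def sample(self, k=1):
        ...  # body unused; the decorator forwards the call
```

Because the call blocks on the delegate, this pattern suits occasional queries (e.g. logging), not hot-loop operations.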

syllabus.core.environment_sync_wrapper module#

class syllabus.core.environment_sync_wrapper.MultiProcessingSyncWrapper(env, task_queue: SimpleQueue, update_queue: SimpleQueue, update_on_step: bool = True, buffer_size: int = 1, task_space: TaskSpace | None = None, global_task_completion: Callable[[Curriculum, ndarray, float, bool, Dict[str, Any]], bool] | None = None)#

Bases: Wrapper

This wrapper is used to set the task on reset for Gym environments running in parallel processes created using multiprocessing.Process. Meant to be used with a QueueLearningProgressCurriculum running on the main process.

add_task(task)#
reset(*args, **kwargs)#

Calls reset() on the wrapped env; can be overridden to change the returned data.

step(action)#

Calls step() on the wrapped env; can be overridden to change the returned data.

class syllabus.core.environment_sync_wrapper.RaySyncWrapper(env, update_on_step: bool = True, task_space: Space | None = None, global_task_completion: Callable[[Curriculum, ndarray, float, bool, Dict[str, Any]], bool] | None = None)#

Bases: Wrapper

This wrapper is used to set the task on reset for Gym environments running in parallel processes created using Ray. Meant to be used with a RayLearningProgressCurriculum running on the main process.

add_task(task)#
change_task(new_task)#

Changes the task of the existing environment to new_task.

Each environment will implement tasks differently. The easiest system would be to call a function or set an instance variable to change the task.

Some environments may need to be reset or even reinitialized to change the task. If you need to reset or re-init the environment here, make sure to check that it is not in the middle of an episode to avoid unexpected behavior.
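The simplest system described above, setting an instance variable and applying it on reset, can be sketched with a toy environment. Everything here is hypothetical (a stand-in for your own environment, not a Syllabus class):

```python
# Hypothetical Gym-style environment showing the simplest change_task
# pattern: store the task on an instance variable and apply it on reset.
class ToyTaskEnv:
    def __init__(self):
        self.task = None
        self._episode_active = False

    def change_task(self, new_task):
        # Guard against swapping tasks mid-episode, as the docs advise.
        assert not self._episode_active, "cannot change task mid-episode"
        self.task = new_task

    def reset(self):
        self._episode_active = True
        return {"task": self.task}

    def step(self, action):
        terminated = True  # one-step episodes keep the sketch short
        self._episode_active = not terminated
        return {"task": self.task}, 0.0, terminated, False, {}
```

An environment that must be reinitialized to change tasks would instead rebuild its internal state inside change_task, with the same between-episodes guard.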

reset(*args, **kwargs)#

Calls reset() on the wrapped env; can be overridden to change the returned data.

step(action)#

Calls step() on the wrapped env; can be overridden to change the returned data.

syllabus.core.multivariate_curriculum_wrapper module#

class syllabus.core.multivariate_curriculum_wrapper.MultitaskWrapper(*args, num_components: int, component_names: List[str] | None = None, **kwargs)#

Bases: CurriculumWrapper

Uniform sampling for task spaces with multiple subspaces (Tuple or Dict)

log_metrics(writer, step=None)#
sample(k: int = 1) → List | Any#

Sample k tasks from the curriculum.

syllabus.core.utils module#

exception syllabus.core.utils.UsageError#

Bases: Exception

syllabus.core.utils.decorate_all_functions(function_decorator)#
syllabus.core.utils.enumerate_axes(list_or_size: ndarray | int)#