"""Source code for pytorchrl.scheme.collection.c_worker_set."""

from pytorchrl.scheme.collection.c_worker import CWorker
from pytorchrl.scheme.base.worker_set import WorkerSet as WS
from pytorchrl.scheme.base.worker import default_remote_config


class CWorkerSet(WS):
    """
    Class to better handle the operations of ensembles of CWorkers.

    Parameters
    ----------
    num_workers : int
        Number of remote workers in the worker set.
    index_parent : int
        Worker index of parent gradient worker.
    algo_factory : func
        A function that creates an algorithm class.
    actor_factory : func
        A function that creates a policy.
    storage_factory : func
        A function that creates a rollouts storage.
    local_device : str
        "cpu" or specific GPU "cuda:number" to use for computation.
    initial_weights : ray object ID
        Initial model weights.
    fraction_samples : float
        Minimum fraction of samples required to stop if collection is
        synchronously coordinated and most workers have finished their
        collection task.
    total_parent_workers : int
        Total number of gradient workers in the training scheme.
    compress_data_to_send : bool
        Whether or not to compress data before sending it to grad worker.
    train_envs_factory : func
        A function to create train environments. Defaults to a placeholder
        that returns None.
    test_envs_factory : func
        A function to create test environments. Defaults to a placeholder
        that returns None.
    worker_remote_config : dict or None
        Ray resource specs for the remote workers. Its entries override
        those of ``default_remote_config``; the shared module-level default
        dict is never modified. None means "use the defaults unchanged".

    Attributes
    ----------
    worker_class : python class
        Worker class to be instantiated to create Ray remote actors.
    remote_config : dict
        Ray resource specs for the remote workers (a private copy of the
        defaults merged with ``worker_remote_config``).
    worker_params : dict
        Keyword arguments of the worker_class.
    num_workers : int
        Number of remote workers in the worker set.
    """

    def __init__(self,
                 num_workers,
                 index_parent,
                 algo_factory,
                 actor_factory,
                 storage_factory,
                 local_device=None,
                 initial_weights=None,
                 fraction_samples=1.0,
                 total_parent_workers=0,
                 compress_data_to_send=False,
                 train_envs_factory=lambda x, y, z: None,
                 test_envs_factory=lambda v, x, y, c: None,
                 worker_remote_config=default_remote_config):

        self.worker_class = CWorker

        # Merge the requested specs over a *copy* of the module-level
        # defaults. Calling ``default_remote_config.update(...)`` in place
        # would permanently rewrite the shared dict (which is also this
        # parameter's default value), so every worker set built afterwards
        # would silently inherit this one's resource specs.
        remote_config = dict(default_remote_config)
        remote_config.update(worker_remote_config or {})
        self.remote_config = remote_config

        # Keyword arguments forwarded to each CWorker instance.
        self.worker_params = {
            "index_parent": index_parent,
            "algo_factory": algo_factory,
            "actor_factory": actor_factory,
            "storage_factory": storage_factory,
            "test_envs_factory": test_envs_factory,
            "train_envs_factory": train_envs_factory,
            "fraction_samples": fraction_samples,
            "compress_data_to_send": compress_data_to_send,
        }

        self.num_workers = num_workers

        super(CWorkerSet, self).__init__(
            worker=self.worker_class,
            local_device=local_device,
            num_workers=self.num_workers,
            initial_weights=initial_weights,
            worker_params=self.worker_params,
            index_parent_worker=index_parent,
            worker_remote_config=self.remote_config,
            total_parent_workers=total_parent_workers)

    @classmethod
    def create_factory(cls,
                       num_workers,
                       algo_factory,
                       actor_factory,
                       storage_factory,
                       test_envs_factory,
                       train_envs_factory,
                       total_parent_workers=0,
                       col_fraction_samples=1.0,
                       compress_data_to_send=False,
                       col_worker_resources=default_remote_config):
        """
        Returns a function to create new CWorkerSet instances.

        Parameters
        ----------
        num_workers : int
            Number of remote workers in the worker set.
        algo_factory : func
            A function that creates an algorithm class.
        actor_factory : func
            A function that creates a policy.
        storage_factory : func
            A function that creates a rollouts storage.
        test_envs_factory : func
            A function to create test environments.
        train_envs_factory : func
            A function to create train environments.
        total_parent_workers : int
            Total number of gradient workers in the training scheme.
        col_fraction_samples : float
            Minimum fraction of samples required to stop if collection is
            synchronously coordinated and most workers have finished their
            collection task.
        compress_data_to_send : bool
            Whether or not to compress data before sending it to grad worker.
        col_worker_resources : dict
            Ray resource specs for the remote workers.

        Returns
        -------
        collection_worker_set_factory : func
            Creates a new CWorkerSet class instance.
        """

        def collection_worker_set_factory(device, initial_weights, index_parent):
            """
            Creates and returns a CWorkerSet class instance.

            Parameters
            ----------
            device : str
                "cpu" or specific GPU "cuda:number" to use for computation.
            initial_weights : ray object ID
                Initial model weights.
            index_parent : int
                Worker index of parent gradient worker.

            Returns
            -------
            CWorkerSet : CWorkerSet
                A new CWorkerSet class instance.
            """
            return cls(
                local_device=device,
                num_workers=num_workers,
                index_parent=index_parent,
                algo_factory=algo_factory,
                actor_factory=actor_factory,
                storage_factory=storage_factory,
                initial_weights=initial_weights,
                fraction_samples=col_fraction_samples,
                test_envs_factory=test_envs_factory,
                train_envs_factory=train_envs_factory,
                total_parent_workers=total_parent_workers,
                worker_remote_config=col_worker_resources,
                compress_data_to_send=compress_data_to_send)

        return collection_worker_set_factory