"""Source code for pytorchrl.scheme.collection.c_worker: the data collection worker (CWorker)."""

import time
import torch
import numpy as np
from collections import defaultdict

import pytorchrl as prl
from pytorchrl.scheme.utils import check_message, pack
from pytorchrl.scheme.base.worker import Worker as W


class CWorker(W):
    """
    Worker class handling data collection.

    This class wraps an actor instance, a storage class instance and a train
    and a test vector environments. It collects data samples, sends them and
    evaluates network versions.

    Parameters
    ----------
    index_worker : int
        Worker index.
    index_parent : int
        Index of gradient worker in charge of this data collection worker.
    algo_factory : func
        A function that creates an algorithm class.
    actor_factory : func
        A function that creates a policy.
    storage_factory : func
        A function that create a rollouts storage.
    fraction_samples : float
        Minimum fraction of samples required to stop if collection is
        synchronously coordinated and most workers have finished their
        collection task.
    compress_data_to_send : bool
        Whether or not to compress data before sending it to grad worker.
    train_envs_factory : func
        A function to create train environments.
    test_envs_factory : func
        A function to create test environments.
    initial_weights : ray object ID
        Initial model weights.
    device : str
        "cpu" or specific GPU "cuda:number" to use for computation. When not
        provided, "cuda" is used if available and "cpu" otherwise.

    Attributes
    ----------
    index_worker : int
        Index assigned to this worker.
    index_parent : int
        Index of the gradient worker in charge of this worker.
    fraction_samples : float
        Minimum fraction of samples required to stop if collection is
        synchronously coordinated and most workers have finished their
        collection task.
    device : torch.device
        CPU or specific GPU to use for computation.
    compress_data_to_send : bool
        Whether or not to compress data before sending it to grad worker.
    actor : Actor
        An actor class instance.
    algo : Algo
        An algorithm class instance.
    envs_train : VecEnv
        A VecEnv class instance with the train environments.
    envs_test : VecEnv
        A VecEnv class instance with the test environments.
    storage : Storage
        A Storage class instance.
    iter : int
        Number of times samples have been collected and sent.
    actor_version : int
        Number of times the current actor version has been updated.
    samples_collected : int
        Number of train environment samples collected since the last time
        data was sent.
    update_every : int
        Number of data samples to collect between network update stages.
    updates_per_iter : int
        Number of actor model updates performed per collected batch
        (num_mini_batch * num_epochs).
    obs : torch.tensor
        Latest train environment observation.
    rhs : torch.tensor
        Latest policy recurrent hidden state.
    done : torch.tensor
        Latest train environment done flag.
    """

    def __init__(self,
                 index_worker,
                 index_parent,
                 algo_factory,
                 actor_factory,
                 storage_factory,
                 fraction_samples=1.0,
                 compress_data_to_send=False,
                 train_envs_factory=lambda x, y, z: None,
                 test_envs_factory=lambda v, x, y, z: None,
                 initial_weights=None,
                 device=None):

        super(CWorker, self).__init__(index_worker)
        self.index_worker = index_worker
        # Stored so replace_agent_component can rebuild environments with
        # the same factory arguments used here.
        self.index_parent = index_parent
        self.fraction_samples = fraction_samples

        # Computation device. The parentheses matter: without them the
        # expression parses as `(device or "cuda") if cuda else "cpu"`, which
        # silently discards an explicitly requested device when CUDA is
        # unavailable.
        dev = device or ("cuda" if torch.cuda.is_available() else "cpu")
        self.device = torch.device(dev)
        self.compress_data_to_send = compress_data_to_send

        # Create train environments
        self.envs_train = train_envs_factory(self.device, index_worker, index_parent)

        # Create test environments (if creation function available)
        self.envs_test = test_envs_factory(self.device, index_worker, index_parent, "test")

        # Create Actor Critic instance
        self.actor = actor_factory(self.device)

        # Create Algorithm instance
        self.algo = algo_factory(self.device, self.actor, self.envs_train)

        # Create Storage instance and set world initial state
        self.storage = storage_factory(self.device, self.actor, self.algo, self.envs_train)

        # Try to load weight from previous checkpoint to actor
        self.actor.try_load_from_checkpoint()

        # Define counters and other attributes
        self.iter, self.actor_version, self.samples_collected = 0, 0, 0
        self.update_every = self.algo.update_every or self.storage.max_size
        self.updates_per_iter = self.algo.num_mini_batch * self.algo.num_epochs

        if initial_weights:  # if remote worker
            # Set initial weights
            self.set_weights(initial_weights)

        if self.envs_train:
            # Define initial train states
            self.obs, self.rhs, self.done = self.actor.actor_initial_states(
                self.envs_train.reset())

            # Collect initial samples
            print("Collecting initial samples...")
            self.collect_train_data(self.algo.start_steps)

        # Print worker information
        self.print_worker_info()

    def collect_data(self, listen_to=None, data_to_cpu=True):
        """
        Perform a data collection operation, returning rollouts and other
        relevant information about the process.

        Parameters
        ----------
        listen_to : list or None
            List of keywords to listen to trigger early stopping during
            collection. None means no keywords.
        data_to_cpu : bool
            Passed to storage.get_all_buffer_data when extracting the buffer.

        Returns
        -------
        data_to_send : tuple or packed tuple
            (data, info) where data is the collected train data and info holds
            episode statistics, timings, actor version and number of samples.
            Packed with `pack` if self.compress_data_to_send is True.
        """
        listen_to = [] if listen_to is None else listen_to

        # Define dictionary to record relevant information
        info = {prl.EPISODES: {}, prl.TIME: {}, prl.VERSION: {}}

        # Collect train data
        t = time.time()
        train_info = self.collect_train_data(listen_to=listen_to)
        col_time = time.time() - t

        # Add information to info dict
        info[prl.EPISODES] = train_info
        info[prl.TIME][prl.COLLECTION] = col_time
        info[prl.VERSION][prl.COLLECTION] = self.actor_version
        info[prl.NUMSAMPLES] = self.samples_collected

        # Get new data collected
        data = self.storage.get_all_buffer_data(data_to_cpu)

        # Evaluate current network on test environments
        if self.envs_test and self.algo.num_test_episodes > 0:
            # Actor model updates performed so far, and after using the
            # current train data.
            updates_so_far = self.iter * self.updates_per_iter
            updates_next_time = (self.iter + 1) * self.updates_per_iter

            # Evaluate if a multiple of self.algo.test_every will be reached
            # while training on the current train data.
            updates = np.arange(updates_so_far, updates_next_time)
            if (updates % self.algo.test_every == 0).any():
                test_perf = self.evaluate()
                info[prl.EPISODES]["TestReward"] = test_perf

        # Encode data if self.compress_data_to_send is True
        data_to_send = pack((data, info)) if self.compress_data_to_send else (data, info)

        # Reset storage and number of collected samples
        self.storage.reset()
        self.samples_collected = 0

        # Update counter
        self.iter += 1

        return data_to_send

    def collect_train_data(self, num_steps=None, listen_to=None):
        """
        Collect train data from interactions with the environments.

        Parameters
        ----------
        num_steps : int or None
            Target number of train environment steps to take. Defaults to
            self.update_every.
        listen_to : list or None
            Keywords whose "stop" message ends collection early (only on
            remote workers, and only after the minimum fraction of steps
            defined by self.fraction_samples). None means no keywords.

        Returns
        -------
        info : dict
            Mean value of each numeric episode statistic over the episodes
            finished during collection ("r" is reported as "TrainReward").
            Empty if no episode finished.
        """
        listen_to = [] if listen_to is None else listen_to
        info = defaultdict(list)

        # Define number of collections steps to perform
        num_steps = int(num_steps) if num_steps is not None else int(self.update_every)
        min_steps = int(num_steps * self.fraction_samples)

        self.actor.eval()
        try:
            for step in range(num_steps):

                # Predict next action, next rnn hidden state and
                # algo-specific outputs
                act, clip_act, rhs2, algo_data = self.algo.acting_step(
                    self.obs, self.rhs, self.done)

                # Interact with env with predicted action (clipped
                # within action space)
                obs2, reward, done2, episode_infos = self.envs_train.step(clip_act)

                # Define transition sample
                transition = prl.DataTransition(
                    self.obs, self.rhs, self.done, act, reward, obs2,
                    rhs2, done2, episode_infos)._asdict()
                transition.update(algo_data)

                # Store transition in buffer
                self.storage.insert_transition(transition)

                # Handle end of episode - collect episode info
                done_positions = torch.nonzero(done2, as_tuple=False)[:, 0].tolist()
                for i in done_positions:
                    if "episode" in episode_infos[i]:  # gym envs should have it
                        for k, v in episode_infos[i]["episode"].items():
                            if isinstance(v, (float, int)):
                                key = "TrainReward" if k == "r" else k
                                info[key].append(v)

                # Update current world state
                self.obs, self.rhs, self.done = obs2, rhs2, done2

                # Keep track of num collected samples
                self.samples_collected += self.envs_train.num_envs

                # Check if stop message sent. A flat any() is used so the
                # break exits the collection loop itself; a break nested in
                # a `for` over listen_to would only leave that inner loop.
                if self.is_remote and step >= min_steps and any(
                        check_message(key) == b"stop" for key in listen_to):
                    break
        finally:
            # Restore training mode even if collection raised.
            self.actor.train()

        # Average episodes infos
        return {k: np.mean(v) for k, v in info.items()}

    def evaluate(self):
        """
        Test current actor version in self.envs_test.

        Actions are selected with deterministic=True until at least
        self.algo.num_test_episodes episodes have finished.

        Returns
        -------
        mean_test_perf : float
            Average accumulated reward over all tested episodes.
        """
        completed_episodes = []
        obs = self.envs_test.reset()
        rewards = np.zeros(obs.shape[0])
        obs, rhs, done = self.actor.actor_initial_states(obs)

        self.actor.eval()
        try:
            while len(completed_episodes) < self.algo.num_test_episodes:
                # Predict next action and rnn hidden state
                _, clip_act, rhs, _ = self.algo.acting_step(
                    obs, rhs, done, deterministic=True)

                # Interact with env with predicted action (clipped
                # within action space)
                obs2, reward, done, _ = self.envs_test.step(clip_act)

                # Keep track of episode rewards and completed episodes
                rewards += reward.cpu().squeeze(-1).numpy()
                finished = done.cpu().squeeze(-1).numpy() == 1.0
                completed_episodes.extend(rewards[finished].tolist())
                rewards[finished] = 0.0

                obs = obs2
        finally:
            # Restore training mode even if evaluation raised.
            self.actor.train()

        return np.mean(completed_episodes)

    def set_weights(self, actor_weights):
        """
        Update the worker actor version with provided weights.

        Parameters
        ----------
        actor_weights : dict of tensors
            Dict containing actor weights (under prl.WEIGHTS) and their
            version (under prl.VERSION).
        """
        self.actor_version = actor_weights[prl.VERSION]
        self.actor.load_state_dict(actor_weights[prl.WEIGHTS])

    def update_algorithm_parameter(self, parameter_name, new_parameter_value):
        """
        If `parameter_name` is an attribute of self.algo, change its value to
        `new_parameter_value value`.

        Parameters
        ----------
        parameter_name : str
            Algorithm attribute name
        new_parameter_value : float
            Algorithm new parameter value.
        """
        self.algo.update_algorithm_parameter(parameter_name, new_parameter_value)

    def update_storage_parameter(self, parameter_name, new_parameter_value):
        """
        If `parameter_name` is an attribute of self.storage, change its value
        to `new_parameter_value value`.

        Parameters
        ----------
        parameter_name : str
            Storage attribute name
        new_parameter_value : float
            Storage new parameter value.
        """
        self.storage.update_storage_parameter(parameter_name, new_parameter_value)

    def replace_agent_component(self, component_name, new_component_factory):
        """
        If `component_name` is an attribute of c_worker, replaces it with the
        component created by `new_component_factory`.

        The factory is called with the same arguments __init__ uses for the
        corresponding component, so any factory valid at construction time
        can also be used here.

        Parameters
        ----------
        component_name : str
            Worker component name
        new_component_factory : func
            Function to create an instance of the new component.
        """
        # TODO. get component name from component itself
        if not hasattr(self, component_name):
            return

        if component_name == prl.ALGORITHM:
            new_component = new_component_factory(
                self.device, self.actor, self.envs_train)
        elif component_name == prl.ENV_TRAIN:
            # NOTE(review): self.obs / self.rhs / self.done still hold the
            # state of the previous train envs; confirm whether callers
            # expect a reset here.
            new_component = new_component_factory(
                self.device, self.index_worker, self.index_parent)
        elif component_name == prl.ENV_TEST:
            new_component = new_component_factory(
                self.device, self.index_worker, self.index_parent, "test")
        elif component_name == "storage":
            new_component = new_component_factory(
                self.device, self.actor, self.algo, self.envs_train)
        else:
            new_component = new_component_factory(self.device)

        setattr(self, component_name, new_component)

    def stop(self):
        """Stop all processes"""
        if self.envs_train is not None:
            self.envs_train.close()
        if self.envs_test is not None:
            try:
                self.envs_test.close()
            except Exception:
                # Best effort: a failure closing test envs must not block
                # the shutdown of the worker.
                pass