Source code for pytorchrl.agent.algorithms.on_policy.rnd_ppo

import torch
import torch.nn as nn
import torch.optim as optim

import pytorchrl as prl
from pytorchrl.utils import RunningMeanStd, clip_grad_norm_
from pytorchrl.agent.algorithms.base import Algorithm
from pytorchrl.agent.algorithms.policy_loss_addons import PolicyLossAddOn
from pytorchrl.agent.algorithms.utils import get_gradients, set_gradients
from pytorchrl.agent.actors.feature_extractors import default_feature_extractor


[docs]class RND_PPO(Algorithm): """ Exploration by Random Network Distillation with Proximal Policy Optimization algorithm class. Algorithm class to execute RND PPO, from Burda et al., 2018 (https://arxiv.org/abs/1810.12894). Algorithms are modules generally required by multiple workers, so RND_PPO.algo_factory(...) returns a function that can be passed on to workers to instantiate their own RND_PPO module. Parameters ---------- device: torch.device CPU or specific GPU where class computations will take place. envs : VecEnv Vector of environments instance. actor : Actor Actor class instance. lr : float Optimizer learning rate. eps : float Optimizer epsilon parameter. num_epochs : int Number of PPO epochs. gamma : float Discount factor parameter. clip_param : float PPO clipping parameter. num_mini_batch : int Number of batches to create from collected data for actor updates. num_test_episodes : int Number of episodes to complete in each test phase. test_every : int Regularity of test evaluations. max_grad_norm : float Gradient clipping parameter. entropy_coef : float PPO entropy coefficient parameter. value_loss_coef : float PPO value coefficient parameter. use_clipped_value_loss : bool Prevent value loss from shifting too fast. policy_loss_addons : list List of PolicyLossAddOn components adding loss terms to the algorithm policy loss. gamma_intrinsic: float Discount factor parameter for intrinsic rewards. ext_adv_coeff: float Extrinsic advantage coefficient. int_adv_coeff: float Intrinsic advantage coefficient. predictor_proportion: float Proportion of buffer sample to use to train the predictor network. pre_normalization_steps: int Number of obs running average normalization steps to take before starting to train. pre_normalization_length: int Length of each pre normalization steps (in environment steps). intrinsic_rewards_network : nn.Module PyTorch nn.Module used for target and predictor networks. intrinsic_rewards_target_network_kwargs : dict Keyword arguments for the target network. intrinsic_rewards_predictor_network_kwargs : dict Keyword arguments for the predictor network. Examples -------- >>> create_algo = RND_PPO.create_factory( lr=0.01, eps=1e-5, num_epochs=4, clip_param=0.2, entropy_coef=0.01, value_loss_coef=0.5, max_grad_norm=0.5, num_mini_batch=4, use_clipped_value_loss=True, gamma=0.99) """ def __init__(self, envs, actor, device, lr=1e-4, eps=1e-8, gamma=0.99, num_epochs=4, clip_param=0.2, num_mini_batch=1, test_every=1000, max_grad_norm=2.0, entropy_coef=0.01, value_loss_coef=0.5, num_test_episodes=5, gamma_intrinsic=0.99, ext_adv_coeff=2.0, int_adv_coeff=1.0, predictor_proportion=2.0, pre_normalization_steps=50, pre_normalization_length=128, use_clipped_value_loss=False, intrinsic_rewards_network=None, intrinsic_rewards_target_network_kwargs={}, intrinsic_rewards_predictor_network_kwargs={}, policy_loss_addons=[]): # ---- General algo attributes ---------------------------------------- # Discount factor self._gamma = gamma # Number of steps collected with initial random policy self._start_steps = 0 # Default to 0 for On-policy algos # Times data in the buffer is re-used before data collection proceeds self._num_epochs = int(num_epochs) # Number of data samples collected between network update stages self._update_every = None # Depends on storage capacity # Number mini batches per epoch self._num_mini_batch = int(num_mini_batch) # Size of update mini batches self._mini_batch_size = None # Depends on storage capacity # Number of network updates between test evaluations self._test_every = int(test_every) # Number of episodes to complete when testing self._num_test_episodes = int(num_test_episodes) # ---- RND-PPO-specific attributes ---------------------------------------- self.envs = envs self.actor = actor self.device = device self.clip_param = clip_param self.entropy_coef = entropy_coef self.max_grad_norm = max_grad_norm self.value_loss_coef = value_loss_coef self.use_clipped_value_loss = use_clipped_value_loss self.ext_adv_coeff = ext_adv_coeff self.int_adv_coeff = int_adv_coeff self.gamma_intrinsic = gamma_intrinsic self.predictor_proportion = predictor_proportion self.pre_normalization_steps = pre_normalization_steps self.pre_normalization_length = pre_normalization_length assert hasattr(self.actor, "value_net1"), "RND_PPO requires value critic" assert hasattr(self.actor, "ivalue_net1"), "RND_PPO requires ivalue critic" # Get observation shape obs_space = self.envs.observation_space.shape # Get frame stack value frame_stack = 1 if "frame_stack" in self.envs.env_kwargs.keys(): frame_stack = self.envs.env_kwargs["frame_stack"] # Get number of obs channels self.obs_channels = int(obs_space[0] / frame_stack) # Define network type int_net = intrinsic_rewards_network or default_feature_extractor(self.envs.observation_space) # Create target model setattr( self.actor, "rnd_target_net", int_net((self.obs_channels,) + obs_space[1:], **intrinsic_rewards_target_network_kwargs).to(self.device)) # Freeze target model parameters for param in self.actor.rnd_target_net.parameters(): param.requires_grad = False # Create predictor model setattr( self.actor, "rnd_predictor_net", int_net((self.obs_channels,) + obs_space[1:], **intrinsic_rewards_predictor_network_kwargs).to(self.device)) # Define running means for int reward and obs self.state_rms = RunningMeanStd(shape=(1, ) + obs_space[1:], device=self.device) print("---Pre_normalization started.---") obs, rhs, done = self.actor.actor_initial_states(envs.reset()) total_obs = torch.zeros( (self.pre_normalization_length, obs.shape[0], self.obs_channels) + obs.shape[2:]).to(self.device) for i in range(self.pre_normalization_steps * self.pre_normalization_length): _, clipped_action, rhs, _ = self.acting_step(obs, rhs, done) obs, _, _, _ = envs.step(clipped_action) total_obs[i % self.pre_normalization_length].copy_(obs[:, -self.obs_channels:, ...]) if i % self.pre_normalization_length == 0 and i != 0: self.state_rms.update(total_obs.reshape(-1, *total_obs.shape[2:])) print("{}/{}".format(i//self.pre_normalization_length, self.pre_normalization_steps)) envs.reset() print("---Pre_normalization is done.---") # ----- Policy Loss Addons -------------------------------------------- # Sanity check, policy_loss_addons is a PolicyLossAddOn instance # or a list of PolicyLossAddOn instances assert isinstance(policy_loss_addons, (PolicyLossAddOn, list)),\ "RND PPO policy_loss_addons parameter should be a PolicyLossAddOn instance " \ "or a list of PolicyLossAddOn instances" if isinstance(policy_loss_addons, list): for addon in policy_loss_addons: assert isinstance(addon, PolicyLossAddOn), \ "RND PPO policy_loss_addons parameter should be a PolicyLossAddOn " \ "instance or a list of PolicyLossAddOn instances" else: policy_loss_addons = [policy_loss_addons] self.policy_loss_addons = policy_loss_addons for addon in self.policy_loss_addons: addon.setup(self.actor, self.device) # ----- Optimizers ---------------------------------------------------- self.optimizer = optim.Adam(self.actor.parameters(), lr=lr, eps=eps)
[docs] @classmethod def create_factory(cls, lr=1e-4, eps=1e-8, gamma=0.99, num_epochs=4, clip_param=0.2, num_mini_batch=1, test_every=1000, max_grad_norm=0.5, entropy_coef=0.01, value_loss_coef=0.5, num_test_episodes=5, gamma_intrinsic=0.99, ext_adv_coeff=2.0, int_adv_coeff=1.0, predictor_proportion=2.0, pre_normalization_steps=50, pre_normalization_length=128, use_clipped_value_loss=True, intrinsic_rewards_network=None, intrinsic_rewards_target_network_kwargs={}, intrinsic_rewards_predictor_network_kwargs={}, policy_loss_addons=[]): """ Returns a function to create new RND PPO instances. Parameters ---------- lr : float Optimizer learning rate. eps : float Optimizer epsilon parameter. num_epochs : int Number of PPO epochs. gamma : float Discount factor parameter. clip_param : float PPO clipping parameter. num_mini_batch : int Number of batches to create from collected data for actor update. num_test_episodes : int Number of episodes to complete in each test phase. test_every : int Regularity of test evaluations. max_grad_norm : float Gradient clipping parameter. entropy_coef : float PPO entropy coefficient parameter. value_loss_coef : float PPO value coefficient parameter. use_clipped_value_loss : bool Prevent value loss from shifting too fast. gamma_intrinsic: float Discount factor parameter for intrinsic rewards. ext_adv_coeff: float Extrinsic advantage coefficient. int_adv_coeff: float Intrinsic advantage coefficient. predictor_proportion: float Proportion of buffer sample to use to train the predictor network. pre_normalization_steps: int Number of obs running average normalization steps to take before starting to train. pre_normalization_length: int Length of each pre normalization steps (in environment steps). intrinsic_rewards_network : nn.Module PyTorch nn.Module used for target and predictor networks. intrinsic_rewards_target_network_kwargs : dict Keyword arguments for the target network. intrinsic_rewards_predictor_network_kwargs : dict Keyword arguments for the predictor network. policy_loss_addons : list List of PolicyLossAddOn components adding loss terms to the algorithm policy loss. Returns ------- create_algo_instance : func Function that creates a new PPO class instance. algo_name : str Name of the algorithm. """ def create_algo_instance(device, actor, envs): return cls(lr=lr, eps=eps, envs=envs, actor=actor, gamma=gamma, device=device, test_every=test_every, num_epochs=num_epochs, clip_param=clip_param, entropy_coef=entropy_coef, ext_adv_coeff=ext_adv_coeff, int_adv_coeff=int_adv_coeff, max_grad_norm=max_grad_norm, num_mini_batch=num_mini_batch, value_loss_coef=value_loss_coef, gamma_intrinsic=gamma_intrinsic, num_test_episodes=num_test_episodes, predictor_proportion=predictor_proportion, pre_normalization_length=pre_normalization_length, pre_normalization_steps=pre_normalization_steps, use_clipped_value_loss=use_clipped_value_loss, intrinsic_rewards_network=intrinsic_rewards_network, intrinsic_rewards_target_network_kwargs=intrinsic_rewards_target_network_kwargs, intrinsic_rewards_predictor_network_kwargs=intrinsic_rewards_predictor_network_kwargs, policy_loss_addons=policy_loss_addons) return create_algo_instance, prl.RND_PPO
@property def gamma(self): """Returns discount factor gamma.""" return self._gamma @property def start_steps(self): """Returns the number of steps to collect with initial random policy.""" return self._start_steps @property def num_epochs(self): """ Returns the number of times the whole buffer is re-used before data collection proceeds. """ return self._num_epochs @property def update_every(self): """ Returns the number of data samples collected between network update stages. """ return self._update_every @property def num_mini_batch(self): """ Returns the number of times the whole buffer is re-used before data collection proceeds. """ return self._num_mini_batch @property def mini_batch_size(self): """ Returns the number of mini batches per epoch. """ return self._mini_batch_size @property def test_every(self): """Number of network updates between test evaluations.""" return self._test_every @property def num_test_episodes(self): """ Returns the number of episodes to complete when testing. """ return self._num_test_episodes
[docs] def acting_step(self, obs, rhs, done, deterministic=False): """ PPO acting function. Parameters ---------- obs: torch.tensor Current world observation rhs: torch.tensor RNN recurrent hidden state (if policy is not a RNN, rhs will contain zeroes). done: torch.tensor 1.0 if current obs is the last one in the episode, else 0.0. deterministic: bool Whether to randomly sample action from predicted distribution or take the mode. Returns ------- action: torch.tensor Predicted next action. clipped_action: torch.tensor Predicted next action (clipped to be within action space). rhs: torch.tensor Policy recurrent hidden state (if policy is not a RNN, rhs will contain zeroes). other: dict Additional PPO predictions, value score and action log probability. """ with torch.no_grad(): (action, clipped_action, logp_action, rhs, entropy_dist, dist) = self.actor.get_action( obs, rhs, done, deterministic) value_dict = self.actor.get_value(obs, rhs, done) ext_value = value_dict.pop("value_net1") int_value = value_dict.pop("ivalue_net1") rhs = value_dict.pop("rhs") # predict intrinsic reward obs = obs[:, -self.obs_channels:, ...] obs = torch.clamp((obs - self.state_rms.mean.float()) / (self.state_rms.var.float() ** 0.5), -5, 5) # obs = torch.clamp((obs - self.state_rms.mean) / (self.state_rms.var ** 0.5), -5, 5).float() predictor_encoded_features = self.actor.rnd_predictor_net(obs) target_encoded_features = self.actor.rnd_target_net(obs) int_reward = (predictor_encoded_features - target_encoded_features).pow(2).mean(1).unsqueeze(1) other = {prl.VAL: ext_value, prl.IVAL: int_value, prl.LOGP: logp_action, prl.IREW: int_reward} return action, clipped_action, rhs, other
[docs] def compute_loss(self, data): """ Compute PPO loss from data batch. Parameters ---------- data: dict Data batch dict containing all required tensors to compute PPO loss. Returns ------- value_loss: torch.tensor value term of PPO loss. action_loss: torch.tensor policy term of PPO loss. dist_entropy: torch.tensor policy term of PPO loss. loss: torch.tensor PPO loss. """ o, rhs, a, old_v = data[prl.OBS], data[prl.RHS], data[prl.ACT], data[prl.VAL] r, d, old_logp, adv = data[prl.RET], data[prl.DONE], data[prl.LOGP], data[prl.ADV] # RDN PPO ir, old_iv, iadv = data[prl.IRET], data[prl.IVAL], data[prl.IADV] mask = data[prl.MASK] if prl.MASK in data.keys() else None advs = adv * self.ext_adv_coeff + iadv * self.int_adv_coeff new_logp, dist_entropy, dist = self.actor.evaluate_actions(o, rhs, d, a) new_vs = self.actor.get_value(o, rhs, d) new_v = new_vs.get("value_net1") new_iv = new_vs.get("ivalue_net1") # Policy loss ratio = torch.exp(new_logp - old_logp) surr1 = ratio * advs surr2 = torch.clamp(ratio, 1.0 - self.clip_param, 1.0 + self.clip_param) * advs action_loss = - torch.min(surr1, surr2).mean() # Ext value loss if self.use_clipped_value_loss: # Ext value value_losses = (new_v - r).pow(2) value_pred_clipped = old_v + (new_v - old_v).clamp(-self.clip_param, self.clip_param) value_losses_clipped = (value_pred_clipped - r).pow(2) value_loss = 0.5 * torch.max(value_losses, value_losses_clipped).mean() # Int value ivalue_losses = (new_iv - ir).pow(2) ivalue_pred_clipped = old_iv + (new_iv - old_iv).clamp(-self.clip_param, self.clip_param) ivalue_losses_clipped = (ivalue_pred_clipped - ir).pow(2) ivalue_loss = 0.5 * torch.max(ivalue_losses, ivalue_losses_clipped).mean() else: # Ext value value_loss = 0.5 * (r - new_v).pow(2).mean() * self.value_loss_coef # Int value ivalue_loss = 0.5 * (ir - new_iv).pow(2).mean() * self.value_loss_coef total_value_loss = value_loss + ivalue_loss o = o[:, -self.obs_channels:, ...] o = torch.clamp((o - self.state_rms.mean.float()) / (self.state_rms.var.float() ** 0.5), -5, 5) # Rnd loss encoded_target_features = self.actor.rnd_target_net(o) encoded_predictor_features = self.actor.rnd_predictor_net(o) loss = (encoded_predictor_features - encoded_target_features).pow(2).mean(-1) mask2 = torch.rand(loss.size(), device=self.device) mask2 = (mask2 <= self.predictor_proportion).float() mask = mask.squeeze() * mask2 if mask is not None else mask2 rnd_loss = (mask * loss).sum() / torch.max(mask.sum(), torch.Tensor([1]).to(self.device)) entropy_loss = self.entropy_coef * dist_entropy loss = total_value_loss + action_loss - entropy_loss + rnd_loss # Extend policy loss with addons addons_info = {} for addon in self.policy_loss_addons: addon_loss, addons_info = addon.compute_loss_term(data, dist, addons_info) loss += addon_loss return value_loss, ivalue_loss, action_loss, rnd_loss, entropy_loss, loss, addons_info
[docs] def compute_gradients(self, batch, grads_to_cpu=True): """ Compute loss and compute gradients but don't do optimization step, return gradients instead. Parameters ---------- data: dict data batch containing all required tensors to compute PPO loss. grads_to_cpu: bool If gradient tensor will be sent to another node, need to be in CPU. Returns ------- grads: list of tensors List of actor gradients. info: dict Dict containing current PPO iteration information. """ value_loss, ivalue_loss, action_loss, rnd_loss, dist_entropy, loss, addons_info = self.compute_loss(batch) self.optimizer.zero_grad() loss.backward() clip_grad_norm_(self.actor.parameters()) # nn.utils.clip_grad_norm_(self.actor.parameters(), self.max_grad_norm) pi_grads = get_gradients(self.actor.policy_net, grads_to_cpu=grads_to_cpu) v_grads = get_gradients(self.actor.value_net1, grads_to_cpu=grads_to_cpu) iv_grads = get_gradients(self.actor.ivalue_net1, grads_to_cpu=grads_to_cpu) pred_grads = get_gradients(self.actor.rnd_predictor_net, grads_to_cpu=grads_to_cpu) grads = {"pi_grads": pi_grads, "v_grads": v_grads, "iv_grads": iv_grads, "pred_grads": pred_grads} info = { "loss": loss.item(), "value_loss": value_loss.item(), "ivalue_loss": ivalue_loss.item(), "rnd_loss": rnd_loss.item(), "action_loss": action_loss.item(), "entropy_loss": dist_entropy.item(), "mean_intrinsic_rewards": batch[prl.IREW].mean().cpu().item(), "min_intrinsic_rewards": batch[prl.IREW].min().cpu().item(), "max_intrinsic_rewards": batch[prl.IREW].max().cpu().item(), } info.update(addons_info) return grads, info
[docs] def apply_gradients(self, gradients=None): """ Take an optimization step, previously setting new gradients if provided. Parameters ---------- gradients: list of tensors List of actor gradients. """ if gradients is not None: set_gradients( self.actor.policy_net, gradients=gradients["pi_grads"], device=self.device) set_gradients( self.actor.value_net1, gradients=gradients["v_grads"], device=self.device) set_gradients( self.actor.ivalue_net1, gradients=gradients["iv_grads"], device=self.device) set_gradients( self.actor.rnd_predictor_net, gradients=gradients["pred_grads"], device=self.device) self.optimizer.step()
[docs] def set_weights(self, actor_weights): """ Update actor with the given weights Parameters ---------- actor_weights: dict of tensors Dict containing actor weights to be set. """ self.actor.load_state_dict(actor_weights)
[docs] def update_algorithm_parameter(self, parameter_name, new_parameter_value): """ If `parameter_name` is an attribute of the algorithm, change its value to `new_parameter_value value`. Parameters ---------- parameter_name : str Worker.algo attribute name new_parameter_value : int or float New value for `parameter_name`. """ if hasattr(self, parameter_name): setattr(self, parameter_name, new_parameter_value) if parameter_name == "lr": for param_group in self.optimizer.param_groups: param_group['lr'] = new_parameter_value