Source code for pytorchrl.agent.algorithms.model_based.mpc_pddm

import gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

import pytorchrl as prl
from pytorchrl.agent.algorithms.base import Algorithm
from pytorchrl.agent.algorithms.utils import get_gradients, set_gradients


class MPC_PDDM(Algorithm):
    """
    Model-Based MPC Planning with Deep Dynamics Models (PDDM) class.

    Trains a model of the environment and uses PDDM to select actions.

    Parameters
    ----------
    lr : float
        Dynamics model learning rate.
    envs : VecEnv
        Vector of environments instance.
    actor : Actor
        Actor class instance.
    device : torch.device
        CPU or specific GPU where class computations will take place.
    start_steps : int
        Number of steps collected with initial random policy.
    update_every : int
        Amount of data collected in between dynamics model updates.
    mb_epochs : int
        Training epochs for the dynamics model.
    action_noise : bool
        If truthy, zero-mean Gaussian noise (std 0.005) is added to the
        action selected by the planner.
    mini_batch_size : int
        Size of dynamics model update batches.
    gamma : float
        Reward-weighting factor.
    beta : float
        Action filtering coefficient.
    max_grad_norm : float
        Gradient clipping parameter.
    test_every : int
        Regularity of test evaluations.
    num_test_episodes : int
        Number of episodes to complete in each test phase.
    """

    def __init__(self,
                 lr,
                 envs,
                 actor,
                 device,
                 start_steps,
                 update_every,
                 mb_epochs,
                 action_noise,
                 mini_batch_size,
                 gamma=1.0,
                 beta=0.5,
                 max_grad_norm=0.5,
                 test_every=10,
                 num_test_episodes=3):

        # ---- General algo attributes ----------------------------------------

        # Number of steps collected with initial random policy
        self._start_steps = int(start_steps)

        # Times data in the buffer is re-used before data collection proceeds
        self._num_epochs = int(mb_epochs)

        # Tracks the number of times data is reused
        self.mb_train_epochs = 0

        # Number of data samples collected between network update stages
        self._update_every = int(update_every)

        # Number mini batches per epoch (depends on how much data is available)
        self._num_mini_batch = 1

        # Size of update mini batches
        self._mini_batch_size = int(mini_batch_size)

        # Number of network updates between test evaluations
        self._test_every = int(test_every)

        # Number of episodes to complete when testing
        self._num_test_episodes = num_test_episodes

        # ---- PDDM-specific attributes ---------------------------------------

        # Incremented on every `apply_gradients` and `set_weights` call
        self.iter = 0

        # Stored so that `update_algorithm_parameter("lr", ...)` finds the
        # attribute and can propagate the new value to the optimizer.
        self.lr = lr

        self.envs = envs
        self.actor = actor
        self.device = device
        self.reuse_data = False
        self.action_noise = action_noise
        self.max_grad_norm = max_grad_norm

        assert isinstance(self.actor.dynamics_model.action_space, gym.spaces.Box),\
            "PDDM requires a continuous action space!"

        # Action filtering coefficient
        self.beta = beta

        # Reward-weighting factor
        self._gamma = gamma

        # Mean of the action sampling distribution, one row per planning step
        self.mu = np.zeros((self.actor.horizon, self.actor.action_dims))

        # ----- Optimizers ----------------------------------------------------

        self.dynamics_optimizer = optim.Adam(
            self.actor.dynamics_model.parameters(), lr=lr)
        self.loss_func = torch.nn.MSELoss()

    @classmethod
    def create_factory(cls,
                       lr,
                       start_steps,
                       update_every,
                       mb_epochs,
                       action_noise,
                       mini_batch_size,
                       gamma=1.0,
                       beta=0.5,
                       max_grad_norm=0.5,
                       test_every=10,
                       num_test_episodes=3):
        """
        Returns a function to create a new Model-Based MPC instance.

        Parameters
        ----------
        lr : float
            Dynamics model learning rate.
        start_steps : int
            Number of steps collected with initial random policy.
        update_every : int
            Amount of data collected in between dynamics model updates.
        mb_epochs : int
            Training epochs for the dynamics model.
        action_noise : bool
            Whether exploration noise is added to the selected action.
        mini_batch_size : int
            Size of dynamics model update batches.
        gamma : float
            Reward-weighting factor.
        beta : float
            Action filtering coefficient.
        max_grad_norm : float
            Gradient clipping parameter.
        test_every : int
            Regularity of test evaluations.
        num_test_episodes : int
            Number of episodes to complete in each test phase.

        Returns
        -------
        create_algo_instance : func
            Function that creates a new MPC_PDDM class instance.
        algo_name : str
            Name of the algorithm.
        """

        def create_algo_instance(device, actor, envs):
            return cls(lr=lr,
                       beta=beta,
                       envs=envs,
                       actor=actor,
                       gamma=gamma,
                       device=device,
                       # Forward the caller's value; `update_every` used to be
                       # passed here, which silently discarded `mb_epochs`.
                       mb_epochs=mb_epochs,
                       start_steps=start_steps,
                       update_every=update_every,
                       action_noise=action_noise,
                       mini_batch_size=mini_batch_size,
                       num_test_episodes=num_test_episodes,
                       max_grad_norm=max_grad_norm,
                       test_every=test_every)

        # NOTE(review): the returned name is the MPC_RS constant although this
        # factory builds MPC_PDDM instances -- confirm whether a dedicated
        # PDDM constant exists in `pytorchrl` and should be returned instead.
        return create_algo_instance, prl.MPC_RS

    @property
    def gamma(self):
        """Returns the reward-weighting factor gamma."""
        return self._gamma

    @property
    def start_steps(self):
        """Returns the number of steps to collect with initial random policy."""
        return self._start_steps

    @property
    def num_epochs(self):
        """
        Returns the number of times the whole buffer is re-used before data
        collection proceeds.
        """
        return self._num_epochs

    @property
    def update_every(self):
        """
        Returns the number of data samples collected between
        network update stages.
        """
        return self._update_every

    @property
    def num_mini_batch(self):
        """Returns the number of mini batches per epoch."""
        return self._num_mini_batch

    @property
    def mini_batch_size(self):
        """Returns the size of the update mini batches."""
        return self._mini_batch_size

    @property
    def test_every(self):
        """Number of network updates between test evaluations."""
        return self._test_every

    @property
    def num_test_episodes(self):
        """Returns the number of episodes to complete when testing."""
        return self._num_test_episodes

    def update_mu(self, action_hist, returns):
        """
        Updates the mean value for the action sampling distribution.

        The new mean is the average of the planned action sequences, each
        weighted by ``exp(gamma * (return - max_return))``.

        Parameters
        ----------
        action_hist : np.array
            Action history of the planned trajectories, with shape
            (n_planner, horizon, action_dims).
        returns : np.array
            Returns of the planned trajectories, with shape (n_planner, 1).

        Returns
        -------
        mu : np.array
            Updated mean of the first planning step, shape (action_dims,).
            This is a copy, so callers can modify it without altering the
            stored sampling mean.
        """
        n_planner = self.actor.n_planner
        horizon = self.actor.horizon
        action_dims = self.actor.action_dims

        assert action_hist.shape == (n_planner, horizon, action_dims)
        assert returns.shape == (n_planner, 1)

        # Subtracting the maximum keeps the exponent <= 0 (no overflow).
        c = np.exp(self.gamma * (returns - np.max(returns)))
        # Small constant guards against a division by zero.
        d = np.sum(c) + 1e-10
        assert c.shape == (n_planner, 1)
        assert d.shape == (), "Has shape {}".format(d.shape)

        c_expanded = c[:, :, None]
        assert c_expanded.shape == (n_planner, 1, 1)

        weighted_actions = c_expanded * action_hist
        self.mu = weighted_actions.sum(0) / d
        assert self.mu.shape == (horizon, action_dims)

        # `self.mu[0]` is a view; return a copy so that in-place edits by the
        # caller (e.g. exploration noise) do not leak into `self.mu`.
        return self.mu[0].copy()

    def sample_actions(self, past_action):
        """
        Samples action trajectories.

        Each step is a blend, controlled by `beta`, of the noisy mean for that
        step and the action of the previous step (`past_action` for the first
        step). The result is clipped to the actor's action bounds.

        Parameters
        ----------
        past_action : np.array
            Previous action mean value.

        Returns
        -------
        actions : np.array
            Sampled action trajectories, with shape
            (n_planner, horizon, action_dims).
        """
        expected_shape = (
            self.actor.n_planner, self.actor.horizon, self.actor.action_dims)

        u = np.random.normal(loc=0, scale=1.0, size=expected_shape)
        actions = np.empty_like(u)

        # Recursive filter: every step depends on the (unclipped) previous one.
        previous = past_action
        for t in range(self.actor.horizon):
            actions[:, t, :] = (
                self.beta * (self.mu[t, :] + u[:, t, :])
                + (1 - self.beta) * previous)
            previous = actions[:, t, :]

        assert actions.shape == expected_shape, \
            "Has shape {} but should have shape {}".format(
                actions.shape, expected_shape)

        actions = np.clip(
            actions, self.actor.action_low, self.actor.action_high)
        return actions

    def get_pred_trajectories(self, states, model):
        """
        Calculates the returns when planning given a state and a model.

        Parameters
        ----------
        states : torch.Tensor
            Initial states that are used for the planning.
        model : dynamics model nn.Module
            The dynamics model that is used to predict the next state and
            reward. Its `predict(states, actions)` method is expected to
            return a `(next_states, rewards)` pair.

        Returns
        -------
        actions : np.array
            Action history of the sampled trajectories used for planning.
        returns : np.array
            Returns of the action trajectories, with shape (n_planner, 1).
        """
        returns = np.zeros((self.actor.n_planner, 1))

        # NOTE(review): re-seeds NumPy's global RNG on every planning call,
        # which makes runs non-reproducible -- presumably meant to
        # de-correlate parallel workers; confirm before removing.
        np.random.seed()

        past_action = self.mu[0].copy()
        actions = self.sample_actions(past_action)
        torch_actions = torch.from_numpy(actions).float().to(self.device)

        # Roll the model forward one step at a time, accumulating rewards.
        with torch.no_grad():
            for t in range(self.actor.horizon):
                actions_t = torch_actions[:, t, :]
                assert actions_t.shape == (
                    self.actor.n_planner, self.actor.action_dims)
                states, rewards = model.predict(states, actions_t)
                returns += rewards.cpu().numpy()

        return actions, returns

    def acting_step(self, obs, rhs, done, deterministic=False):
        """
        Does the MPC search with PDDM action planning process.

        Parameters
        ----------
        obs : torch.tensor
            Current world observation.
        rhs : dict
            RNN recurrent hidden states. Returned unchanged.
        done : torch.tensor
            1.0 if current obs is the last one in the episode, else 0.0.
            Not used by this method.
        deterministic : bool
            Not used by this method; exploration noise is controlled by
            the `action_noise` attribute instead.

        Returns
        -------
        action : torch.tensor
            Predicted next action.
        clipped_action : torch.tensor
            Same tensor values as `action`.
        rhs : dict
            Actor recurrent hidden state.
        other : dict
            Additional MPC predictions, which are not used in other
            algorithms. Always empty here.
        """
        with torch.no_grad():
            # Plan from the same observation with every candidate sequence.
            initial_states = obs.repeat(
                self.actor.n_planner, 1).to(self.device)
            actions, returns = self.get_pred_trajectories(
                initial_states, self.actor.dynamics_model)
            optimal_action = self.update_mu(actions, returns)

        if self.action_noise:
            # Out-of-place add: the exploration noise must only affect the
            # executed action, never the stored sampling mean `self.mu`.
            optimal_action = optimal_action + np.random.normal(
                0, 0.005, size=optimal_action.shape)

        action = torch.from_numpy(optimal_action).float().to(self.device)

        # NOTE(review): no clipping is applied here, so with `action_noise`
        # enabled the action may slightly exceed the action bounds.
        clipped_action = action

        return action.unsqueeze(0), clipped_action.unsqueeze(0), rhs, {}

    def training_step(self, batch):
        """
        Does the forward pass and loss calculation of the dynamics model
        given the training data.

        Parameters
        ----------
        batch : dict
            Training data with inputs ("train_input") and
            labels ("train_label").

        Returns
        -------
        loss : torch.Tensor
            The training loss (mean squared error).
        """
        train_inputs = batch["train_input"]
        train_labels = batch["train_label"]

        self.actor.train()
        prediction = self.actor.dynamics_model.model(train_inputs)
        return self.loss_func(prediction, train_labels)

    def compute_gradients(self, batch, grads_to_cpu=True):
        """
        Compute loss and compute gradients but don't do optimization step,
        return gradients instead.

        Parameters
        ----------
        batch : dict
            Data batch containing all required tensors to compute dynamics
            model losses, plus a "batch_number" entry.
        grads_to_cpu : bool
            If gradient tensor will be sent to another node, need to be in CPU.

        Returns
        -------
        grads : dict
            Dynamics model gradients, stored under the "dyna_grads" key.
        info : dict
            Dict containing current dynamics model iteration information
            (the "train_loss" value).
        """
        # Batch number 0 marks the start of a new pass over the data.
        if batch["batch_number"] == 0:
            # TODO: re-initialize the dynamics model (and its optimizer)
            # when a new round of training starts.
            self.reuse_data = True
            self.mb_train_epochs += 1

            # Last epoch reached: stop re-using data and reset the counter.
            if self.mb_train_epochs == self.num_epochs:
                self.reuse_data = False
                self.mb_train_epochs = 0

        train_loss = self.training_step(batch)

        self.dynamics_optimizer.zero_grad()
        train_loss.backward()
        nn.utils.clip_grad_norm_(
            self.actor.dynamics_model.parameters(), self.max_grad_norm)
        dyna_grads = get_gradients(
            self.actor.dynamics_model, grads_to_cpu=grads_to_cpu)

        info = {"train_loss": train_loss.item()}
        grads = {"dyna_grads": dyna_grads}

        return grads, info

    def apply_gradients(self, gradients=None):
        """
        Take an optimization step, previously setting new gradients
        if provided.

        Parameters
        ----------
        gradients : dict
            Dict holding the dynamics model gradients under the "dyna_grads"
            key. If None, the gradients already stored in the model are used.
        """
        if gradients is not None:
            set_gradients(
                self.actor.dynamics_model,
                gradients=gradients["dyna_grads"], device=self.device)

        self.dynamics_optimizer.step()
        self.iter += 1

    def set_weights(self, actor_weights):
        """
        Update actor with the given weights.

        Parameters
        ----------
        actor_weights : dict of tensors
            Dict containing actor weights to be set.
        """
        self.actor.load_state_dict(actor_weights)
        self.iter += 1

    def update_algorithm_parameter(self, parameter_name, new_parameter_value):
        """
        If `parameter_name` is an attribute of the algorithm, change its value
        to `new_parameter_value`.

        When the parameter is "lr", the new value is also written to every
        parameter group of the dynamics model optimizer.

        Parameters
        ----------
        parameter_name : str
            Worker.algo attribute name.
        new_parameter_value : int or float
            New value for `parameter_name`.
        """
        if hasattr(self, parameter_name):
            setattr(self, parameter_name, new_parameter_value)
            if parameter_name == "lr":
                for param_group in self.dynamics_optimizer.param_groups:
                    param_group['lr'] = new_parameter_value