Source code for pytorchrl.agent.storages.off_policy.ere_buffer

import numpy as np
import torch
from collections import deque
import pytorchrl as prl
from pytorchrl.agent.storages.off_policy.per_buffer import PERBuffer as B


def dim0_reshape(tensor, size1, size2):
    """
    Flatten the two leading axes of ``tensor`` after swapping them and
    cropping the (original) first axis to ``[size1, size2)``.

    The first two axes are exchanged, so the original axis 1 becomes the
    outer one, the original axis 0 is sliced to ``size1:size2``, and both
    are then merged into a single leading axis. Rows of the result are
    therefore grouped by original axis-1 index, and within each group they
    follow the cropped axis-0 order:

    00, 01, 02, ..., (first axis-1 index, positions size1 .. size2 - 1)
    10, 11, 12, ..., (second axis-1 index, positions size1 .. size2 - 1)
    20, 21, 22, ..., (third axis-1 index, positions size1 .. size2 - 1)

    Parameters
    ----------
    tensor : np.ndarray
        Array with at least two dimensions.
    size1 : int
        First position (inclusive) kept along the original axis 0.
    size2 : int
        Last position (exclusive) kept along the original axis 0.

    Returns
    -------
    np.ndarray
        Array of shape ``(-1, *tensor.shape[2:])``.

    Notes
    -----
    NOTE(review): callers in this module assign into the returned array.
    ``reshape`` only returns a view when the memory layout allows it and a
    copy otherwise, so such writes may not reach ``tensor`` -- confirm for
    inputs whose second axis is larger than 1.
    """
    trailing_dims = tensor.shape[2:]
    swapped = np.moveaxis(tensor, [0, 1], [1, 0])
    window = swapped[:, size1:size2]
    return window.reshape(-1, *trailing_dims)
class EREBuffer(B):
    """
    Storage class for Off-Policy algorithms with Emphasizing Recent Experience
    buffer (https://arxiv.org/abs/1906.04009).

    This component extends PERBuffer, allowing to combine ERE with Prioritized
    Experience Replay (PER) if required. Nonetheless PER parameters, epsilon,
    alpha and beta, are set by default to values that make PER equivalent to a
    vanilla replay buffer, allowing to use only ERE. Also n step learning can
    be combined with PER and ERE using this component, but default n_step
    value is 1.

    Parameters
    ----------
    size : int
        Storage capacity along time axis.
    device : torch.device
        CPU or specific GPU where data tensors will be placed and class
        computations will take place. Should be the same device where the
        actor model is located.
    actor : Actor
        Actor class instance.
    algorithm : Algorithm
        Algorithm class instance.
    envs : VecEnv
        Vector of environments instance.
    n_step : int or float
        Number of future steps used to compute the truncated n-step return
        value.
    epsilon : float
        PER epsilon parameter.
    alpha : float
        PER alpha parameter.
    beta : float
        PER beta parameter.
    default_error : int or float
        Default TD error value to use for newly added data samples.
    eta : float
        ERE eta parameter.
    cmin : int
        ERE cmin parameter.
    """

    # Data fields to store in buffer and contained in the generated batches
    storage_tensors = prl.OffPolicyDataKeys

    def __init__(self, size, device, actor, algorithm, envs, n_step=1, epsilon=0.0,
                 alpha=0.0, beta=1.0, default_error=1000000, eta=1.0, cmin=5000):

        super(EREBuffer, self).__init__(
            size=size, device=device, actor=actor, algorithm=algorithm,
            envs=envs, n_step=n_step, epsilon=epsilon, alpha=alpha, beta=beta,
            default_error=default_error)

        self.eta = eta
        self.cmin = cmin
        # Lower end of the eta range explored by update_eta
        self.initial_eta = eta
        # Sliding window with the latest episode training rewards
        self.eps_reward = deque(maxlen=20)
        # Running extremes of the reward gradient, used to normalise it.
        # `np.inf` is used because the `np.Inf` alias was removed in NumPy 2.0.
        self.max_grad_rew, self.min_grad_rew = - np.inf, 0.0

    @classmethod
    def create_factory(cls, size, n_step=1, epsilon=0.0, alpha=0.0, beta=1.0,
                       default_error=1000000, eta=1.0, cmin=5000):
        """
        Returns a function that creates EREBuffer instances.

        Parameters
        ----------
        size : int
            Storage capacity along time axis.
        n_step : int or float
            Number of future steps used to compute the truncated n-step
            return value.
        epsilon : float
            PER epsilon parameter.
        alpha : float
            PER alpha parameter.
        beta : float
            PER beta parameter.
        default_error : int or float
            Default TD error value to use for newly added data samples.
        eta : float
            ERE eta parameter.
        cmin : int
            ERE cmin parameter.

        Returns
        -------
        create_buffer_instance : func
            creates a new EREBuffer class instance.
        """

        def create_buffer(device, actor, algorithm, envs):
            """Create and return a EREBuffer instance."""
            return cls(size, device, actor, algorithm, envs, n_step,
                       epsilon, alpha, beta, default_error, eta, cmin)

        return create_buffer

    def after_gradients(self, batch, info):
        """
        Steps required after updating actor policy model.

        When the batch carries a tensor of PER weights, the priorities of the
        sampled transitions are refreshed from the TD errors found in
        ``info[prl.ALGORITHM]["errors"]`` ("ck" and "idxs" are popped from
        ``batch`` in that case). Afterwards eta is re-estimated.

        Parameters
        ----------
        batch : dict
            Data batch used to compute the gradients.
        info : dict
            Additional relevant info from gradient computation.

        Returns
        -------
        info : dict
            info dict updated with relevant info from Storage.
        """

        if "per_weights" in batch.keys() and isinstance(batch["per_weights"], torch.Tensor):

            num_proc = self.data[prl.DONE].shape[1]

            if self.recurrent_actor:

                N = self.size // self.sequence_length
                ck = batch.pop("ck")
                # Same window used when the batch was generated, extended by
                # one extra sequence (see generate_batches, PER + ERE branch)
                start = int((N - ck) * self.sequence_length)
                end = int(self.size + self.sequence_length)

                # Get data indices and td errors
                idxs = np.array(batch.pop("idxs")).reshape(-1, self.sequence_length)
                errors = info[prl.ALGORITHM]["errors"].reshape(-1, self.non_overlap_length)

                # Since sequences overlap, update both current sequence and
                # start of the next overlapping sequence
                idxs = np.concatenate(
                    [idxs, idxs[:, :self.overlap_length] + self.overlap_length], axis=1)
                errors = torch.cat([errors, errors[:, :self.overlap_length]], dim=1)

                # NOTE(review): the assignments below only reach self.data if
                # dim0_reshape returns a view of it -- confirm for num_proc > 1.
                for i, e in zip(idxs, errors):

                    # Assign priorities to both end of current sequence
                    # and start of next sequence
                    dim0_reshape(self.data["priority"], start, end)[i] = \
                        self.get_priority(e.unsqueeze(1))

                    # Update current sequence average priority
                    sequence = dim0_reshape(
                        self.data["priority"], start, end)[i - self.overlap_length]
                    pri = self.get_sequence_priority(sequence)
                    dim0_reshape(self.data["priority"], start, end)[
                        i - self.overlap_length] = pri * np.ones(sequence.shape)

                    # Update next sequence average if some overlap
                    if self.overlap_length > 0:
                        sequence = dim0_reshape(
                            self.data["priority"], start, end)[i + self.non_overlap_length]
                        pri = self.get_sequence_priority(sequence)
                        dim0_reshape(self.data["priority"], start, end)[
                            i + self.non_overlap_length] = pri * np.ones(sequence.shape)

            else:
                N = self.size * num_proc
                ck = batch.pop("ck")
                # Flatten (time, proc) into one axis, keep the ck most recent
                # entries and overwrite the priorities of the sampled ones
                self.data["priority"][0:self.size].reshape(
                    -1, *self.data["priority"].shape[2:])[N - ck: N][batch.pop("idxs")] = \
                    self.get_priority(info[prl.ALGORITHM]["errors"])

        # Eta is adapted on every call, whether or not PER is in use
        info = self.update_eta(info)

        return info

    def update_eta(self, info):
        """
        Adjust eta parameter based on how fast or slow the agent is learning
        in recent episodes.

        The mean gradient of the last ``self.eps_reward.maxlen`` training
        rewards is normalised with running extremes and mapped linearly onto
        the interval ``[1.0, self.initial_eta]``: a normalised gradient of
        -1 selects eta = 1.0 and a normalised gradient of +1 selects
        ``self.initial_eta``.

        Parameters
        ----------
        info : dict
            Info dict. Must contain ``prl.ALGORITHM``; eta is only
            re-estimated when ``info[prl.EPISODES]["TrainReward"]`` exists.

        Returns
        -------
        info : dict
            Same dict, with "eta" (and "RewardGradient" when re-estimated)
            added under ``prl.ALGORITHM``.
        """

        if prl.EPISODES in info.keys() and "TrainReward" in info[prl.EPISODES].keys():
            self.eps_reward.append(info[prl.EPISODES]["TrainReward"])

            # Wait until the reward window is full before estimating a trend
            if len(self.eps_reward) == self.eps_reward.maxlen:

                reward_grad = np.gradient(self.eps_reward).mean()

                # Track largest positive gradient, slowly forgetting old peaks
                if reward_grad > self.max_grad_rew:
                    self.max_grad_rew = reward_grad
                else:
                    self.max_grad_rew *= 0.9999

                # Track largest negative gradient (stored as a magnitude)
                if reward_grad < - self.min_grad_rew:
                    self.min_grad_rew = abs(reward_grad)
                else:
                    self.min_grad_rew *= 0.9999

                # Normalise to roughly [-1, 1]
                if np.sign(reward_grad) == 1:
                    reward_grad /= self.max_grad_rew
                elif self.min_grad_rew > 0:
                    reward_grad /= self.min_grad_rew
                # else: gradient is zero and no negative gradient has been
                # seen yet; dividing would give 0 / 0 = NaN, so keep it at 0.

                # Lookup tables mapping normalised slope -> eta
                slopes = np.linspace(-1.0, 1.0, 1000)
                etas = np.linspace(1.0, self.initial_eta, 1000)
                idx = (np.abs(slopes - reward_grad)).argmin()
                self.eta = etas[idx]

                info[prl.ALGORITHM].update({"RewardGradient": reward_grad})

        info[prl.ALGORITHM].update({"eta": self.eta})

        return info

    def generate_batches(self, num_mini_batch, mini_batch_size, num_epochs=1):
        """
        Returns a batch iterator to update actor.

        For mini batch ``k`` only the ``ck`` most recent entries are eligible
        for sampling, with
        ``ck = max(N * eta ** (1000 * k / num_mini_batch), cmin)``. While the
        buffer holds fewer than ``cmin`` transitions the whole buffer is
        sampled uniformly. If ``alpha`` is not 0.0, sampling inside the window
        is prioritized and importance weights are added to the batch.

        Parameters
        ----------
        num_mini_batch : int
            Number mini batches per epoch.
        mini_batch_size : int
            Number of samples contained in each mini batch.
        num_epochs : int
            Number of epochs. Not used by this method.

        Yields
        ------
        batch : dict
            Generated data batches. Contains also extra information relevant
            to ERE ("ck", "idxs", "n_step" and, when available,
            "per_weights").
        """

        num_proc = self.data[prl.DONE].shape[1]

        if self.recurrent_actor:  # Batches to a feed recurrent actor

            for k in range(num_mini_batch):

                # Define batch structure
                batch = {
                    key: {} if not isinstance(self.data[key], dict)
                    else {x: {} for x in self.data[key]}
                    for key in self.data.keys()}

                sequences_x_batch = mini_batch_size // self.sequence_length + 1
                sequences_x_proc = int(self.size / self.sequence_length)
                N = sequences_x_proc

                assert self.size % self.sequence_length == 0, \
                    "Buffer does not contain an integer number of complete rollout sequences"

                if num_proc * self.size < self.cmin:  # Standard
                    seq_idxs = np.random.randint(
                        0, num_proc * sequences_x_proc, size=sequences_x_batch)
                    per_weigths, ck = 1.0, N
                    start, end = int((N - ck) * self.sequence_length), int(self.size)

                elif self.alpha == 0.0:  # ERE
                    cmin = self.cmin // self.sequence_length // num_proc
                    ck = int(max(N * self.eta ** ((1000 * k) / num_mini_batch), cmin))
                    # reshape between N - ck, N
                    start, end = int((N - ck) * self.sequence_length), int(self.size)
                    seq_idxs = np.random.randint(0, ck * num_proc, size=sequences_x_batch)
                    per_weigths = 1.0

                else:  # PER + ERE
                    cmin = self.cmin // self.sequence_length // num_proc
                    # NOTE(review): unlike the ERE branch, ck is not truncated
                    # to int here -- confirm this is intentional.
                    ck = max(N * self.eta ** ((1000 * k) / num_mini_batch), cmin)
                    # reshape between N - ck, N
                    start = int((N - ck) * self.sequence_length)
                    end = int(self.size)

                    priors = dim0_reshape(self.data["priority"], start, end)
                    probs = priors / priors.sum()
                    per_weigths = np.power(num_proc * self.size * probs, - self.beta)
                    per_weigths = per_weigths / per_weigths.max()

                    # Trick to allow updating priorities of next overlapping
                    # sequences after gradient computation. Insert some "0.0"
                    # values in per_weights for end-of-row + 1 sequences.
                    per_weigths = np.split(per_weigths, num_proc)
                    per_weigths = [
                        np.concatenate([chunk, np.zeros((self.sequence_length, 1))])
                        for chunk in per_weigths]
                    per_weigths = np.concatenate(per_weigths)

                    # Trick to allow updating priorities of next overlapping
                    # sequences after gradient computation. Insert some "0.0"
                    # values in probs for end-of-row + 1 sequences.
                    probs = probs[int((N - ck + 1) * self.sequence_length) - 1
                                  ::self.sequence_length] * self.sequence_length
                    probs = np.split(probs, num_proc)
                    probs = [np.concatenate([chunk, np.zeros((1, 1))]) for chunk in probs]
                    ext_probs = np.concatenate(probs).squeeze(1)
                    end += self.sequence_length

                    seq_idxs = np.random.choice(
                        range(len(ext_probs)), size=sequences_x_batch, p=ext_probs)

                # Get data indexes
                idxs = []
                for idx in seq_idxs:
                    idxs += range(idx * self.sequence_length, (idx + 1) * self.sequence_length)

                if not isinstance(per_weigths, float):
                    per_weigths = torch.as_tensor(
                        per_weigths[idxs], dtype=torch.float32).to(self.device)

                # Fill up batch with data
                for key, value in self.data.items():

                    # Only first recurrent state in each sequence needed
                    if key in (prl.RHS, prl.RHS2):
                        positions = seq_idxs * self.sequence_length
                    else:
                        positions = idxs

                    if isinstance(value, dict):
                        for x, y in value.items():
                            t = dim0_reshape(y, start, end)[positions]
                            batch[key][x] = torch.as_tensor(
                                t, dtype=torch.float32).to(self.device)
                    else:
                        t = dim0_reshape(value, start, end)[positions]
                        batch[key] = torch.as_tensor(
                            t, dtype=torch.float32).to(self.device)

                batch.update({
                    "per_weights": per_weigths, "n_step": self.n_step,
                    "idxs": idxs, "ck": ck})

                yield batch

        else:  # Batches for a feed forward actor

            for k in range(num_mini_batch):

                batch = {key: {} for key in self.data.keys()}
                N = num_proc * self.size
                per_weigths = None

                if num_proc * self.size < self.cmin:  # Standard
                    samples = np.random.randint(0, num_proc * self.size, size=mini_batch_size)
                    ck = N

                elif self.alpha == 0.0:  # ERE
                    ck = int(max(N * self.eta ** ((1000 * k) / num_mini_batch), self.cmin))
                    samples = np.random.randint(ck, size=mini_batch_size)

                else:  # PER + ERE
                    ck = int(max(N * self.eta ** ((1000 * k) / num_mini_batch), self.cmin))
                    priors = self.data["priority"][0:self.size].reshape(-1, 1)[N - ck: N]
                    probs = priors / priors.sum()
                    samples = np.random.choice(
                        range(ck), size=mini_batch_size, p=probs.squeeze(1))
                    per_weigths = np.power(ck * probs, - self.beta)
                    per_weigths /= per_weigths.max()
                    per_weigths = per_weigths[samples]
                    per_weigths = torch.as_tensor(
                        per_weigths, dtype=torch.float32).to(self.device)

                for key, value in self.data.items():

                    # Recurrent state entries only contribute their first
                    # element; every other field is read at the sampled slots
                    if key in (prl.RHS, prl.RHS2):
                        positions = np.array([0])
                    else:
                        positions = samples

                    if isinstance(value, dict):
                        for x, y in value.items():
                            batch[key][x] = torch.as_tensor(
                                y[0:self.size].reshape(-1, *y.shape[2:])[N - ck: N][positions],
                                dtype=torch.float32).to(self.device)
                    else:
                        batch[key] = torch.as_tensor(
                            value[0:self.size].reshape(-1, *value.shape[2:])[N - ck: N][positions],
                            dtype=torch.float32).to(self.device)

                # Always report the sampled indices: after_gradients uses
                # "idxs" to write the new priorities back, so it must not
                # depend on which data field happened to be iterated last.
                batch.update({"n_step": self.n_step, "idxs": samples, "ck": ck})

                if per_weigths is not None:
                    batch.update({"per_weights": per_weigths})

                yield batch