import numpy as np
import torch
import pytorchrl as prl
from pytorchrl.agent.storages.base import Storage as S
import gym
from torch.nn.functional import one_hot
[docs]def dim0_reshape(tensor, size):
"""
Reshapes tensor so indices are defined like this:
00, 01, 02, 03, 04, 05, 06, 07, 08, 09, size + 1, ..., self.max_size
10, 11, 12, 13, 14, 15, 16, 17, 18, 19, size + 1, ..., self.max_size
20, 21, 22, 23, 24, 25, 26, 27, 28, 29, size + 1, ..., self.max_size
"""
return np.moveaxis(tensor, [0, 1], [1, 0])[:, 0: size].reshape(-1, *tensor.shape[2:])
[docs]class MBReplayBuffer(S):
"""Storage class for Model Based algorithms.
Implements oll necessary functions to handle data storage and processing in model-based RL algorithms.
Attributes
----------
size : int
Storage capacity along time axis.
device : torch.device
CPU or specific GPU where data tensors will be placed and class
computations will take place. Should be the same device where the
actor model is located.
actor : Actor
Actor class instance.
algo : Algorithm
Algorithm class instance
"""
# Data fields to store in buffer and contained in the generated batches
storage_tensors = prl.MBDataKeys
def __init__(self, size, device, actor, algorithm, envs):
self.actor = actor
self.scaler = actor.dynamics_model.standard_scaler
self.device = device
self.algo = algorithm
self.max_size, self.size, self.step = size, 0, 0
self.data = {k: None for k in self.storage_tensors} # lazy init
self.reset()
[docs] @classmethod
def create_factory(cls, size):
"""
Returns a function that creates ReplayBuffer instances.
Parameters
----------
size : int
Storage capacity along time axis.
Returns
-------
create_buffer_instance : func
creates a new MBReplayBuffer class instance.
"""
def create_buffer(device, actor, algorithm, envs):
"""Create and return a ReplayBuffer instance."""
return cls(size, device, actor, algorithm, envs)
return create_buffer
[docs] def init_tensors(self, sample):
"""
Lazy initialization of data tensors from a sample.
Parameters
----------
sample : dict
Data sample (containing all tensors of an environment transition)
"""
for k, v in sample.items():
if k not in self.storage_tensors:
continue
if isinstance(v, dict):
self.data[k] = {}
for x, y in sample[k].items():
self.data[k][x] = np.zeros((self.max_size, *y.shape), dtype=np.float32)
else:
self.data[k] = np.zeros((self.max_size, *v.shape), dtype=np.float32)
[docs] def get_data_slice(self, start_pos, end_pos):
"""
Makes a copy of all tensors in the buffer between steps `start_pos`
and `end_pos`.
Parameters
----------
start_pos : int
initial slice position.
end_pos : int
final slice position.
Returns
-------
data : dict
data slice copied from the buffer.
"""
copied_data = {k: None for k in self.storage_tensors}
for k, v in self.data.items():
if v is None:
continue
if isinstance(self.data[k], dict):
copied_data[k] = {x: None for x in self.data[k]}
for x, y in v.items():
copied_data[k][x] = np.copy(y[start_pos:end_pos])
else:
copied_data[k] = np.copy(v[start_pos:end_pos])
return copied_data
[docs] def get_all_buffer_data(self, data_to_cpu=False):
"""
Return all currently stored data. If data_to_cpu, no need to do
anything since data tensors are already in cpu memory.
Parameters
----------
data_to_cpu : bool
Whether or not to move data tensors to cpu memory.
Returns
-------
data : dict
data currently stored in the buffer.
"""
# Define data structure
data = {k: None if not isinstance(self.data[k], dict) else
{x: None for x in self.data[k]} for k in self.data}
idx = self.step
# Fill up data
for k, v in self.data.items():
if v is None:
continue
if isinstance(self.data[k], dict):
for x, y in self.data[k].items():
data[k][x] = y[:idx]
else:
data[k] = v[:idx]
return data
[docs] def reset(self):
"""
Set class size and step to zero. If self.actor uses RNNs, add overlap
slice of last sequence before reset at the beginning of the storage.
"""
self.size -= self.step
self.step = 0
[docs] def insert_data_slice(self, new_data):
"""
Appends new_data to currently stored data.
Parameters
----------
new_data : dict
Dictionary of env transition samples to be added to self.data.
"""
lengths = []
for k, v in new_data.items():
if v is None:
continue
if isinstance(new_data[k], dict):
if self.data[k] is None:
self.data[k] = {i: None for i in new_data[k].keys()}
for x, y in new_data[k].items():
length = self.insert_single_tensor_slice(self.data[k], x, y)
lengths.append(length)
else:
length = self.insert_single_tensor_slice(self.data, k, v)
lengths.append(length)
assert len(set(lengths)) == 1
self.step = (self.step + length) % self.max_size
self.size = min(self.size + length, self.max_size)
[docs] def insert_single_tensor_slice(self, tensor_storage, tensor_key, tensor_values):
"""
Appends tensor_value to buffer dict using tensor_key as key.
Parameters
----------
tensor_storage :
tensor_key : str
key to use to store the tensor.
tensor_values : np.ndarray
tensor values.
Returns
-------
l : int
length (time axe) of the tensor added to the buffer.
"""
l = tensor_values.shape[0]
if tensor_storage[tensor_key] is None: # If not defined, initialize tensor
tensor_storage[tensor_key] = np.zeros((self.max_size, *tensor_values.shape[1:]), dtype=np.float32)
if self.step + l <= self.max_size: # If enough space, add tensor at the end
tensor_storage[tensor_key][self.step:self.step + l] = tensor_values
else: # Circular buffer
tensor_storage[tensor_key][
self.step:self.max_size] = tensor_values[0:self.max_size - self.step]
tensor_storage[tensor_key][0:l - self.max_size + self.step] = tensor_values[self.max_size - self.step:]
return l
[docs] def insert_transition(self, sample):
"""
Store new transition sample.
Parameters
----------
sample : dict
Data sample (containing all tensors of an environment transition)
"""
# Data tensors lazy initialization
if self.size == 0 and self.data[prl.OBS] is None:
self.init_tensors(sample)
# Insert
for k, v in sample.items():
if k in self.data.keys():
if isinstance(sample[k], dict):
for x, y in sample[k].items():
self.data[k][x][self.step] = y.cpu()
else:
self.data[k][self.step] = v.cpu()
# Update
self.step = (self.step + 1) % self.max_size
self.size = min(self.size + 1, self.max_size)
[docs] def before_gradients(self):
"""
Steps required before updating actor policy model.
"""
pass
[docs] def after_gradients(self, batch, info):
"""
Steps required after updating actor policy model validation_percentage
Parameters
----------
batch : dict
Data batch used to compute the gradients.
info : dict
Additional relevant info from gradient computation.
Returns
-------
info : dict
info dict updated with relevant info from Storage.
"""
return info
[docs] def generate_batches(self, num_mini_batch, mini_batch_size=256, num_epochs=1):
"""
Returns a batch iterator to update dynamics model.
Parameters
----------
num_mini_batch : int
Number mini batches per epoch. (not used since MB training)
mini_batch_size : int
Number of samples contained in each mini batch.
num_epochs : int
Number of epochs. (not used since MB training)
Yields
------
batch : dict
Generated data batches.
"""
for k, v in self.data.items():
if k == prl.OBS:
observations = v[:self.size]
elif k == prl.ACT:
actions = v[:self.size]
elif k == prl.OBS2:
next_observations = v[:self.size]
elif k == prl.REW:
rewards = v[:self.size]
else:
pass
if type(self.actor.action_space) == gym.spaces.discrete.Discrete:
actions = actions.astype(np.int64)
actions = np.eye(actions.max() + 1)[actions].squeeze(1)
assert actions.shape == (observations.shape[0], 1, self.actor.action_space.n)
inputs = np.concatenate((observations, actions), axis=-1)
delta_state = next_observations - observations
if self.actor.dynamics_model.reward_function is not None:
targets = delta_state
else:
targets = np.concatenate((delta_state, rewards), axis=-1)
# watch out inputs have shape (all data, 1, dim) not sure why this extra dim
inputs = torch.from_numpy(inputs).float().to(self.device).squeeze(1)
targets = torch.from_numpy(targets).float().to(self.device).squeeze(1)
self.scaler.fit(inputs=inputs, targets=targets)
norm_inputs, norm_targets = self.scaler.transform(inputs=inputs, targets=targets)
train_indices = np.arange(observations.shape[0])
np.random.shuffle(train_indices)
for batch_number, j in enumerate(range((norm_inputs.shape[0] // mini_batch_size) + 1)):
start_index = j * mini_batch_size
indices_shuffled = train_indices[start_index:start_index + mini_batch_size]
input_batch = norm_inputs[indices_shuffled, :]
target_batch = norm_targets[indices_shuffled, :]
batch = {"train_input": input_batch,
"train_label": target_batch,
"batch_number": batch_number}
yield batch
[docs] def update_storage_parameter(self, parameter_name, new_parameter_value):
"""
If `parameter_name` is an attribute of the algorithm, change its value
to `new_parameter_value value`.
Parameters
----------
parameter_name : str
Attribute name
new_parameter_value : int or float
New value for `parameter_name`.
"""
if hasattr(self, parameter_name):
if parameter_name == "max_size" and self.recurrent_actor:
new_parameter_value = (new_parameter_value // self.sequence_length) * self.sequence_length
new_parameter_value *= 2
setattr(self, parameter_name, new_parameter_value)