import os
import glob
import copy
import uuid
import torch
import numpy as np
from collections import defaultdict
import pytorchrl as prl
from pytorchrl.agent.storages.on_policy.gae_buffer import GAEBuffer as B
[docs]class PPODBuffer(B):
"""
Storage class for the PPO+D algorithm.

On top of the GAE on-policy buffer, it keeps two demo buffers:

- reward demos: initially loaded demos plus finished episodes whose total
  reward reaches the current reward threshold;
- value demos: other finished episodes, kept according to the highest value
  prediction observed during the episode ("MaxValue").

When an environment episode or a demo replay ends, the next episode is
replayed from a reward demo with probability ``rho``, from a value demo with
probability ``phi``, or collected from the environment otherwise.

Parameters
----------
size : int
    Storage capacity along time axis.
device: torch.device
    CPU or specific GPU where data tensors will be placed and class
    computations will take place. Should be the same device where the
    actor model is located.
envs : VecEnv
    Vector of environments instance.
actor : Actor
    Actor class instance.
algorithm : Algorithm
    Algorithm class instance.
initial_human_demos_dir : str
    Path to directory containing human initial demonstrations (``*.npz`` files).
initial_agent_demos_dir : str
    Path to directory containing other agent initial demonstrations (``*.npz`` files).
supplementary_demos_dir : str
    Path to a directory where additional demos can be added after training has started.
    These demos will be incorporated into the buffer as bonus reward demos.
target_agent_demos_dir : str
    Path to directory where best reward demonstrations should be saved.
rho : float
    PPO+D rho parameter: probability of replaying a reward demo.
phi : float
    PPO+D phi parameter: probability of replaying a value demo.
alpha : float
    PPO+D alpha parameter: value demos are sampled with probability
    proportional to their ``MaxValue`` raised to the power ``alpha``.
gae_lambda : float
    GAE lambda parameter.
total_buffer_demo_capacity : int
    Maximum number of demos to keep between reward and value demos.
save_demos_every : int
    Save top demos every ``save_demos_every``-th call to ``before_gradients``.
num_agent_demos_to_save : int
    Number of top reward demos to save.
initial_reward_threshold : float
    Initial value to use as reward threshold for new demos.
demo_dtypes : dict
    Numpy data types to use for the demos, keyed by ``prl.OBS``, ``prl.ACT``
    and ``prl.REW``.
"""
# Accepted data fields. Inserting other fields will raise AssertionError
# (NOTE(review): the check is presumably done by the parent storage class — confirm)
on_policy_data_fields = prl.OnPolicyDataKeys
# Data tensors to collect for each demos
demos_data_fields = prl.DemosDataKeys
def __init__(self, size, device, actor, algorithm, envs, rho=0.1, phi=0.3, gae_lambda=0.95, alpha=10,
             total_buffer_demo_capacity=51, initial_human_demos_dir=None, initial_agent_demos_dir=None,
             supplementary_demos_dir=None, target_agent_demos_dir=None, num_agent_demos_to_save=10,
             initial_reward_threshold=None, save_demos_every=10, demo_dtypes=None):
    """
    Create a PPO+D storage and load the initial demonstrations.

    See the class docstring for the meaning of every parameter.

    ``demo_dtypes`` defaults to float32 for ``prl.OBS``, ``prl.ACT`` and ``prl.REW``
    when None. ``initial_reward_threshold`` defaults to ``-np.inf`` when None; any
    other value, including 0.0, is used as given.
    """
    super(PPODBuffer, self).__init__(
        size=size,
        envs=envs,
        actor=actor,
        device=device,
        algorithm=algorithm,
        gae_lambda=gae_lambda,
    )

    # PPO + D parameters
    self.rho = rho
    self.phi = phi
    self.iter = 0
    self.alpha = alpha
    self.initial_rho = rho
    self.initial_phi = phi
    self.save_demos_every = save_demos_every
    self.max_demos = total_buffer_demo_capacity
    self.num_agent_demos_to_save = num_agent_demos_to_save
    self.initial_human_demos_dir = initial_human_demos_dir
    self.initial_agent_demos_dir = initial_agent_demos_dir
    self.supplementary_demos_dir = supplementary_demos_dir
    self.target_agent_demos_dir = target_agent_demos_dir

    # Data parameters. A None default avoids sharing one mutable dict across every call.
    if demo_dtypes is None:
        demo_dtypes = {prl.OBS: np.float32, prl.ACT: np.float32, prl.REW: np.float32}
    self.demo_dtypes = demo_dtypes
    self.num_channels_obs = None  # Lazy initialization
    self.inserted_samples = 0

    # Reward and Value buffers
    self.reward_demos = []
    self.value_demos = []

    # Load initial demos (appended to self.reward_demos)
    self.load_initial_demos()

    # Define reward_threshold. Compare with None explicitly: a user threshold of 0.0
    # is valid and must not be replaced by -inf (which `x or -np.inf` would do).
    self.reward_threshold = -np.inf if initial_reward_threshold is None else initial_reward_threshold
    if self.reward_demos:
        demo_rewards = [d["TotalReward"] for d in self.reward_demos]
        self.reward_threshold = max(self.reward_threshold, min(demo_rewards))
        self.max_demo_reward = max(demo_rewards)
    else:
        self.max_demo_reward = -np.inf

    # Define variables to track potential demos, one entry per environment
    env_keys = ["env{}".format(i + 1) for i in range(self.num_envs)]
    self.potential_demos_val = {key: -np.inf for key in env_keys}
    self.potential_demos = {key: defaultdict(list) for key in env_keys}

    # Define variable to track demos in progress
    self.demos_in_progress = {
        key: {
            "ID": None,
            "Demo": None,
            "Step": 0,
            "DemoLength": -1,
            "MaxValue": -np.inf,
            prl.RHS: None,
        } for key in env_keys}

    # To keep track of supplementary demos loaded
    self.supplementary_demos_loaded = []
@classmethod
def create_factory(cls, size, rho=0.1, phi=0.3, gae_lambda=0.95, alpha=10, total_buffer_demo_capacity=51,
                   initial_human_demos_dir=None, initial_agent_demos_dir=None, supplementary_demos_dir=None,
                   target_agent_demos_dir=None, num_agent_demos_to_save=10, initial_reward_threshold=None,
                   save_demos_every=10, demo_dtypes=None):
    """
    Returns a function that creates PPODBuffer instances.

    Parameters
    ----------
    size : int
        Storage capacity along time axis.
    initial_human_demos_dir : str
        Path to directory containing human initial demonstrations.
    initial_agent_demos_dir : str
        Path to directory containing other agent initial demonstrations.
    supplementary_demos_dir : str
        Path to a directory where additional demos can be added after training has started.
        These demos will be incorporated into the buffer as bonus reward demos.
    target_agent_demos_dir : str
        Path to directory where best reward demonstrations should be saved.
    rho : float
        PPO+D rho parameter (probability of replaying a reward demo).
    phi : float
        PPO+D phi parameter (probability of replaying a value demo).
    alpha : float
        PPO+D alpha parameter.
    gae_lambda : float
        GAE lambda parameter.
    total_buffer_demo_capacity : int
        Maximum number of demos to keep between reward and value demos.
    save_demos_every : int
        Save top demos every ``save_demos_every``-th data collection.
    num_agent_demos_to_save : int
        Number of top reward demos to save.
    initial_reward_threshold : float
        Initial value to use as reward threshold for new demos.
    demo_dtypes : dict or None
        Data types to use for the demos. When None, float32 is used for
        ``prl.OBS``, ``prl.ACT`` and ``prl.REW``.

    Returns
    -------
    create_buffer_instance : func
        Creates a new PPODBuffer class instance.
    """
    # Avoid a mutable default argument shared across every call of create_factory
    if demo_dtypes is None:
        demo_dtypes = {prl.OBS: np.float32, prl.ACT: np.float32, prl.REW: np.float32}

    def create_buffer_instance(device, actor, algorithm, envs):
        """Create and return a PPODBuffer instance."""
        # Keyword arguments keep this call correct even if the constructor's parameter order changes
        return cls(
            size=size, device=device, actor=actor, algorithm=algorithm, envs=envs,
            rho=rho, phi=phi, gae_lambda=gae_lambda, alpha=alpha,
            total_buffer_demo_capacity=total_buffer_demo_capacity,
            initial_human_demos_dir=initial_human_demos_dir,
            initial_agent_demos_dir=initial_agent_demos_dir,
            supplementary_demos_dir=supplementary_demos_dir,
            target_agent_demos_dir=target_agent_demos_dir,
            num_agent_demos_to_save=num_agent_demos_to_save,
            initial_reward_threshold=initial_reward_threshold,
            save_demos_every=save_demos_every,
            demo_dtypes=demo_dtypes)

    return create_buffer_instance
[docs] def before_gradients(self):
"""
Before updating actor policy model, compute returns and advantages
(delegated to the parent buffer class).

Also prints a summary of the demo buffers (number of reward and value demos,
rho, phi, reward threshold and max demo reward), increments ``self.iter``,
calls ``self.save_demos()`` every ``self.save_demos_every`` calls, and loads
supplementary demos when ``self.supplementary_demos_dir`` is set.
"""
# NOTE(review): indentation is lost in this copy, so it is unclear whether the
# supplementary-demo load below runs on every call or only together with
# save_demos — confirm against the original file.
print("\nREWARD DEMOS {}, VALUE DEMOS {}, RHO {}, PHI {}, REWARD THRESHOLD {}, MAX DEMO REWARD {}\n".format(
len(self.reward_demos), len(self.value_demos), self.rho, self.phi, self.reward_threshold, self.max_demo_reward))
super(PPODBuffer, self).before_gradients()
self.iter += 1
if self.iter % self.save_demos_every == 0:
self.save_demos()
if self.supplementary_demos_dir:
self.load_supplementary_demos()
def after_gradients(self, batch, info):
    """
    Run the parent post-update hook, then reset the counter of demo samples
    inserted since the last policy update.

    Parameters
    ----------
    batch : dict
        Data batch used to compute the gradients.
    info : dict
        Additional relevant info from gradient computation.

    Returns
    -------
    info : dict
        The info dict that was passed in (the parent hook may update it in place).
    """
    parent = super(PPODBuffer, self)
    parent.after_gradients(batch, info)
    # Demo transitions are counted per rollout in insert_transition
    self.inserted_samples = 0
    return info
[docs] def insert_transition(self, sample):
"""
Store a new transition sample and overwrite it with demo data where a demo is replayed.

The sample tensors are first copied into the buffer. Then, for every environment
with a demo in progress:

- the stored action and reward at ``self.step`` are replaced by the demo's;
- the log-probability is set to 0.0;
- IREW / VAL / IVAL are taken from a ``self.algo.acting_step`` forward pass, when
  the algorithm returns them.

Unless the demo just finished, the done flag, the next observation and (for
recurrent actors) the hidden state at ``self.step + 1`` are also overwritten with
demo data. When a demo finishes, or when an environment without a demo in progress
finishes an episode, ``self.sample_demo`` chooses what happens next.

Parameters
----------
sample : dict
    Data sample (containing all tensors of an environment transition)
"""
# Data tensors lazy initialization, only executed the first time
if self.size == 0 and self.data[prl.OBS] is None:
self.init_tensors(sample)
self.get_num_channels_obs(sample)
# Insert sample data
for k in sample:
if k not in self.storage_tensors:
continue
if not self.recurrent_actor and k in (prl.RHS, prl.RHS2):
continue
# We use the same tensor to store obs and obs2
# We also use single tensors for rhs and rhs2,
# and done and done2
# Hence OBS / RHS / DONE are written one position ahead (self.step + 1) from the
# sample key "Next" + k (presumably prl.OBS2 / prl.RHS2 / prl.DONE2 — TODO confirm)
if k in (prl.OBS, prl.RHS, prl.DONE):
pos = self.step + 1
sample_k = "Next" + k
else:
pos = self.step
sample_k = k
# Copy sample tensor to buffer target position
if isinstance(sample[k], dict):
for x, v in sample[k].items():
self.data[k][x][pos].copy_(sample[sample_k][x])
else:
self.data[k][pos].copy_(sample[sample_k])
# Track episodes for potential demos
self.track_potential_demos(sample)
# Overwrite demo data in environments where a demo being replayed
all_envs, all_obs, all_done, all_rhs = [], [], [], {k: [] for k in sample[prl.RHS].keys()}
# Step 1: prepare tensors
for i in range(self.num_envs):
# If demo replay is in progress
if self.demos_in_progress["env{}".format(i + 1)]["Demo"]:
# Get demo obs, rhs and done tensors to run forward pass
obs = self.data[prl.OBS][self.step][i].unsqueeze(0)
# Reuse the hidden state from the previous demo step; on the first demo step it is None
# (reset by sample_demo) and initial states are requested from the actor instead.
# NOTE(review): an empty rhs dict is also falsy and would take the initial-state path.
if self.demos_in_progress["env{}".format(i + 1)][prl.RHS]:
rhs = self.demos_in_progress["env{}".format(i + 1)][prl.RHS]
done = torch.zeros(1, 1).to(self.device)
else:
obs, rhs, done = self.actor.actor_initial_states(obs)
all_envs.append(i)
all_obs.append(obs)
all_done.append(done)
for k in rhs:
all_rhs[k].append(rhs[k])
# Otherwise check if end of episode reached and randomly start new demo
elif sample[prl.DONE2][i] == 1.0:
self.sample_demo(env_id=i)
# Step 2: perform acting step and overwrite data
if len(all_envs) > 0:
# Cast obs, rhs, dones into the right format
all_obs = torch.cat(all_obs)
all_done = torch.cat(all_done)
for k in all_rhs:
all_rhs[k] = torch.cat(all_rhs[k])
# Run forward pass (one batched call for all environments replaying a demo)
_, _, rhs2, algo_data = self.algo.acting_step(all_obs, all_rhs, all_done)
for num, i in enumerate(all_envs):
demo_step = self.demos_in_progress["env{}".format(i + 1)]["Step"]
# Insert demo act and rew tensors to self.step
for tensor in (prl.ACT, prl.REW):
self.data[tensor][self.step][i].copy_(
torch.FloatTensor(self.demos_in_progress["env{}".format(i + 1)]["Demo"][tensor][demo_step]))
# Insert demo logprob to self.step. Demo action prob is 1.0, so logprob is 0.0
self.data[prl.LOGP][self.step][i].copy_(torch.zeros(1))
# Insert other tensors predicted in the forward pass
for tensor in (prl.IREW, prl.VAL, prl.IVAL):
if tensor in algo_data.keys():
self.data[tensor][self.step][i].copy_(algo_data[tensor][num])
# Update demo_in_progress variables (step counter, hidden state for the next
# demo step, and the highest value prediction seen during this replay)
self.demos_in_progress["env{}".format(i + 1)]["Step"] += 1
self.demos_in_progress["env{}".format(i + 1)][prl.RHS] = {
k: rhs2[k][num].reshape(1, -1) for k in rhs2.keys()}
self.demos_in_progress["env{}".format(i + 1)]["MaxValue"] = max(
[algo_data[prl.VAL][num].item(), self.demos_in_progress["env{}".format(i + 1)]["MaxValue"]])
self.inserted_samples += 1
# Handle end of demos
if demo_step == self.demos_in_progress["env{}".format(i + 1)]["DemoLength"] - 1:
# If value demo (only value demos carry a "MaxValue" key)
if "MaxValue" in self.demos_in_progress["env{}".format(i + 1)]["Demo"].keys():
# The replayed demo is a deep copy, so the stored original is matched by ID
for value_demo in self.value_demos:
# If demo still in buffer, update MaxValue
if self.demos_in_progress["env{}".format(i + 1)]["Demo"]["ID"] == value_demo["ID"]:
value_demo["MaxValue"] = self.demos_in_progress["env{}".format(i + 1)]["MaxValue"]
# Randomly sample new demo if last demo has finished
self.sample_demo(env_id=i)
else:
# Mark the next step as not done (the demo continues)
self.data[prl.DONE][self.step + 1][i].copy_(torch.zeros(1))
# Insert demo obs2 tensor to self.step + 1: shift the stacked frames by one frame
# (num_channels_obs channels along dim 1) and write the next demo frame last.
# Assumes frames are stacked along the channel dim with the newest last — TODO confirm.
obs2 = torch.roll(all_obs[num:num+1], -self.num_channels_obs, dims=1).squeeze(0)
obs2[-self.num_channels_obs:].copy_(torch.FloatTensor(
self.demos_in_progress["env{}".format(i + 1)]["Demo"][prl.OBS][demo_step + 1]))
self.data[prl.OBS][self.step + 1][i].copy_(obs2)
# Insert demo rhs2 tensor to self.step + 1
if self.recurrent_actor:
for k in self.data[prl.RHS]:
self.data[prl.RHS][k][self.step + 1][i].copy_(rhs2[k][num].squeeze())
self.step = (self.step + 1) % self.max_size
self.size = min(self.size + 1, self.max_size)
def track_potential_demos(self, sample):
    """
    Track the ongoing episode of every environment and turn finished episodes into demo candidates.

    For each environment, the newest observation frame and the other
    ``demos_data_fields`` tensors are appended to ``self.potential_demos``. The
    highest value prediction of the episode is kept in ``self.potential_demos_val``.

    When an episode ends (``DONE2`` equals 1.0), it is truncated after its last
    positive reward and then:

    - if its total reward reaches ``self.reward_threshold``, it becomes a reward
      demo. All previous reward demos, human ones included, are dropped first when
      it beats ``self.max_demo_reward``;
    - otherwise it becomes a value demo, when its max value reaches the lowest
      stored ``MaxValue`` or the demo buffers are not full.

    Parameters
    ----------
    sample : dict
        Data sample (containing all tensors of an environment transition).
    """
    for i in range(self.num_envs):
        env_key = "env{}".format(i + 1)
        episode = self.potential_demos[env_key]

        # Copy transition. Only the newest frame of the stacked observation is kept.
        # astype() always returns a new array, so the stored data never aliases the sample.
        for tensor in self.demos_data_fields:
            data = sample[tensor][i, -self.num_channels_obs:] if tensor == prl.OBS else sample[tensor][i]
            episode[tensor].append(data.detach().cpu().numpy().astype(self.demo_dtypes[tensor]))

        # Track highest value prediction (always under the same per-env key that is
        # read and reset below)
        self.potential_demos_val[env_key] = max(self.potential_demos_val[env_key], sample[prl.VAL][i].item())

        # Handle end of episode
        if sample[prl.DONE2][i] != 1.0:
            continue

        # Get candidate demo
        potential_demo = {tensor: np.stack(episode[tensor]) for tensor in self.demos_data_fields}

        # Cut off data after last positive reward (keep everything if there is none)
        positive_steps = np.flatnonzero(potential_demo[prl.REW] > 0.0)
        last_reward = len(potential_demo[prl.REW]) if len(positive_steps) == 0 else np.max(positive_steps)
        for tensor in self.demos_data_fields:
            potential_demo[tensor] = potential_demo[tensor][0:last_reward + 1]

        # Compute accumulated reward
        episode_reward = potential_demo[prl.REW].sum()
        potential_demo["ID"] = str(uuid.uuid4())
        potential_demo["TotalReward"] = episode_reward
        potential_demo["DemoLength"] = potential_demo[prl.ACT].shape[0]

        if episode_reward >= self.reward_threshold:
            # Consider candidate demo for reward demos
            if episode_reward > self.max_demo_reward:
                # A new best demo makes every previous reward demo obsolete, human ones included.
                # Reset the human-demo offset used by save_demos / check_demo_buffer_capacity;
                # otherwise they would skip (and never save) the first agent demos.
                self.reward_demos = []
                self.num_loaded_human_demos = 0
            self.reward_demos.append(potential_demo)
            # Check if buffers are full
            self.check_demo_buffer_capacity()
            # Anneal rho and phi
            self.anneal_parameters()
            # Update reward_threshold and max demo reward
            demo_rewards = [d["TotalReward"] for d in self.reward_demos]
            self.reward_threshold = max(self.reward_threshold, min(demo_rewards))
            self.max_demo_reward = max(demo_rewards)
        else:
            # Consider candidate demo for value demos
            max_value = self.potential_demos_val[env_key]
            potential_demo["MaxValue"] = max_value
            total_demos = len(self.reward_demos) + len(self.value_demos)
            value_thresh = min((d["MaxValue"] for d in self.value_demos), default=-np.inf)
            if max_value >= value_thresh or total_demos < self.max_demos:
                self.value_demos.append(potential_demo)
                # Check if buffers are full
                self.check_demo_buffer_capacity()

        # Reset potential demos tracking for this environment
        for tensor in self.demos_data_fields:
            episode[tensor] = []
        self.potential_demos_val[env_key] = -np.inf
def load_demo(self, demo_path):
    """
    Load and return an environment demonstration stored in a ``.npz`` file.

    Parameters
    ----------
    demo_path : str
        Path to the ``.npz`` demo file. It must contain ``FrameSkip`` and the
        ``prl.ACT``, ``prl.OBS`` and ``prl.REW`` arrays.

    Returns
    -------
    new_demo : dict
        Demo arrays plus a fresh ``ID``, its ``DemoLength`` (number of actions)
        and ``TotalReward`` (sum of rewards).

    Raises
    ------
    ValueError
        If the demo frame skip differs from ``self.frame_skip``.
    """
    new_demo = {k: {} for k in self.demos_data_fields}

    # The context manager closes the npz file handle once the arrays have been read.
    # Indexing an NpzFile reads the whole array into memory, so the arrays stay valid.
    with np.load(demo_path) as demo:
        if int(demo["FrameSkip"]) != self.frame_skip:
            raise ValueError(
                "Env and demo with different frame skip!")

        # Add action, obs, rew
        new_demo[prl.ACT] = demo[prl.ACT]
        new_demo[prl.OBS] = demo[prl.OBS]
        new_demo[prl.REW] = demo[prl.REW]

    new_demo.update({
        "ID": str(uuid.uuid4()),
        "DemoLength": new_demo[prl.ACT].shape[0],
        "TotalReward": new_demo[prl.REW].sum()})

    return new_demo
def load_initial_demos(self):
    """
    Load initial demonstrations into the reward demos buffer.

    Human demos (``*.npz`` files in ``self.initial_human_demos_dir``) are loaded
    first, followed by agent demos from ``self.initial_agent_demos_dir``. The first
    ``self.num_loaded_human_demos`` reward demos are therefore the human ones.
    Files are processed in sorted order for reproducibility. Files that fail to
    load are reported, with the reason, and skipped.

    Warning: make sure the environment frame_skip and frame_stack hyperparameters are
    the same as those used to record the demonstrations!

    Raises
    ------
    ValueError
        If both directories together contain more files than ``self.max_demos``.
    """
    initial_human_demos = sorted(glob.glob(os.path.join(self.initial_human_demos_dir, '*.npz'))) \
        if self.initial_human_demos_dir else []
    initial_reward_demos = sorted(glob.glob(os.path.join(self.initial_agent_demos_dir, '*.npz'))) \
        if self.initial_agent_demos_dir else []

    if len(initial_human_demos) + len(initial_reward_demos) > self.max_demos:
        raise ValueError("demo dir contains more than ´total_buffer_demo_capacity´")

    num_loaded_human_demos = 0
    for demo_file in initial_human_demos:
        try:
            new_demo = self.load_demo(demo_file)
        except Exception as error:
            # Best effort: skip the file but say why (e.g. a frame skip mismatch)
            print("Failed to load human demo {}: {}".format(demo_file, error))
            continue
        self.reward_demos.append(new_demo)
        num_loaded_human_demos += 1

    num_loaded_reward_demos = 0
    for demo_file in initial_reward_demos:
        try:
            new_demo = self.load_demo(demo_file)
        except Exception as error:
            print("Failed to load agent demo {}: {}".format(demo_file, error))
            continue
        self.reward_demos.append(new_demo)
        num_loaded_reward_demos += 1

    self.num_loaded_human_demos = num_loaded_human_demos
    self.num_loaded_reward_demos = num_loaded_reward_demos

    print("\nLOADED {} HUMAN DEMOS AND {} REWARD DEMOS".format(num_loaded_human_demos, num_loaded_reward_demos))
def load_supplementary_demos(self):
    """
    Load new demonstrations found in ``self.supplementary_demos_dir`` (if any).

    Each ``.npz`` file is processed only once: its path is remembered in
    ``self.supplementary_demos_loaded``, even when loading fails. Demos whose total
    reward is at least ``self.reward_threshold`` are added to the reward demos.
    The reward threshold and the maximum demo reward are then updated, as is done
    for agent episodes.

    Warning: make sure the environment frame_skip and frame_stack hyperparameters are
    the same as those used in the demonstrations!
    """
    if not self.supplementary_demos_dir:
        return

    # Create supp dir if necessary
    if not os.path.exists(self.supplementary_demos_dir):
        os.makedirs(self.supplementary_demos_dir, exist_ok=True)

    num_loaded_supplementary_demos = 0
    for demo_file in sorted(glob.glob(os.path.join(self.supplementary_demos_dir, '*.npz'))):
        if demo_file in self.supplementary_demos_loaded:
            continue
        self.supplementary_demos_loaded.append(demo_file)

        try:
            new_demo = self.load_demo(demo_file)
        except Exception as error:
            print("Failed to load supplementary demo {}: {}".format(demo_file, error))
            continue

        if new_demo["TotalReward"] >= self.reward_threshold:
            self.reward_demos.append(new_demo)
            num_loaded_supplementary_demos += 1
            # Check if buffer is full and if so, handle it
            self.check_demo_buffer_capacity()
            # Keep the threshold and best reward consistent with the buffer. Otherwise a
            # worse agent episode could later exceed a stale max_demo_reward and wipe
            # this better demo from the reward buffer.
            demo_rewards = [d["TotalReward"] for d in self.reward_demos]
            self.reward_threshold = max(self.reward_threshold, min(demo_rewards))
            self.max_demo_reward = max(demo_rewards)

    print("\nLOADED {} SUPPLEMENTARY DEMOS\n".format(num_loaded_supplementary_demos))
def sample_demo(self, env_id):
    """
    Choose how environment ``env_id`` continues after an episode or a demo ends.

    With probability ``self.rho`` a reward demo (chosen uniformly) is replayed.
    With probability ``self.phi`` a value demo is replayed, chosen with probability
    proportional to ``MaxValue ** self.alpha``, or uniformly when those weights are
    not a valid distribution. Otherwise, or when the chosen buffer is empty, the
    environment is reset.

    The done flag at ``self.step + 1`` is set to 1 and recurrent hidden states
    there are zeroed. The observation at ``self.step + 1`` is set either to the
    first demo frame (repeated ``self.frame_stack`` times) or to the reset
    observation.

    Parameters
    ----------
    env_id : int
        Index of the environment.
    """
    env_key = "env{}".format(env_id + 1)
    demo_state = self.demos_in_progress[env_key]

    # Reset demos tracking variables
    demo_state["Step"] = 0
    demo_state[prl.RHS] = None

    # Sample episode type. The env probability is clamped at 0 so that floating point
    # drift in rho + phi (modified by anneal_parameters) cannot make it slightly negative.
    episode_source = np.random.choice(["reward_demo", "value_demo", "env"],
                                      p=[self.rho, self.phi, max(0.0, 1.0 - self.rho - self.phi)])

    if episode_source == "reward_demo" and len(self.reward_demos) > 0:
        # Randomly select a reward demo
        selected = np.random.choice(range(len(self.reward_demos)))
        demo = copy.deepcopy(self.reward_demos[selected])
    elif episode_source == "value_demo" and len(self.value_demos) > 0:
        # Select a value demo with probability proportional to MaxValue ** alpha
        probs = np.array([d["MaxValue"] for d in self.value_demos]) ** self.alpha
        total = probs.sum()
        if np.isfinite(total) and total > 0.0 and not np.any(probs < 0.0):
            probs = probs / total
        else:
            # All-zero, negative, NaN or overflowing weights would make np.random.choice
            # raise; fall back to uniform sampling instead.
            probs = None
        selected = np.random.choice(range(len(self.value_demos)), p=probs)
        demo = copy.deepcopy(self.value_demos[selected])
    else:
        demo = None

    # Set demos to demos_in_progress
    demo_state["Demo"] = demo

    # Set done to True
    self.data[prl.DONE][self.step + 1][env_id].copy_(torch.ones(1).to(self.device))

    # Set initial rhs to zeros
    if self.recurrent_actor:
        for k in self.data[prl.RHS]:
            self.data[prl.RHS][k][self.step + 1][env_id].fill_(0.0)

    if demo is not None:
        # Set demos length
        demo_state["DemoLength"] = demo["DemoLength"]
        # Set demos MaxValue (np.inf: the np.Inf alias was removed in NumPy 2.0)
        demo_state["MaxValue"] = -np.inf
        # Set next buffer obs to be the starting demo obs, repeated for every stacked frame
        first_frame = torch.FloatTensor(demo[prl.OBS][0])
        for k in range(self.frame_stack):
            self.data[prl.OBS][self.step + 1][env_id][
                k * self.num_channels_obs:(k + 1) * self.num_channels_obs].copy_(first_frame)
    else:
        # Reset `i-th` environment and set next buffer obs to be the starting episode obs
        self.data[prl.OBS][self.step + 1][env_id].copy_(self.envs.reset_single_env(env_id=env_id).squeeze())
        # Reset potential demos dict, since a fresh environment episode starts now
        for tensor in self.demos_data_fields:
            self.potential_demos[env_key][tensor] = []
        self.potential_demos_val[env_key] = -np.inf
def anneal_parameters(self):
    """
    Shift replay probability from value demos to reward demos, following the PPO+D paper.

    When at least one value demo is stored, each call moves
    ``initial_phi / len(self.value_demos)`` from ``phi`` to ``rho``. Each parameter
    is only updated while it lies strictly between 0 and 1, and both are clipped
    to ``[0, initial_rho + initial_phi]``.
    """
    num_value_demos = len(self.value_demos)
    if num_value_demos == 0:
        return
    delta = self.initial_phi / num_value_demos
    upper_bound = self.initial_rho + self.initial_phi
    if 0.0 < self.rho < 1.0:
        self.rho = np.clip(self.rho + delta, 0.0, upper_bound)
    if 0.0 < self.phi < 1.0:
        self.phi = np.clip(self.phi - delta, 0.0, upper_bound)
def check_demo_buffer_capacity(self):
    """
    Enforce the ``self.max_demos`` limit on the total number of stored demos.

    Value demos with the lowest ``MaxValue`` are discarded first. If the reward
    demos alone still exceed the limit, the longest agent reward demos are
    discarded. Agent demos are those after the first ``self.num_loaded_human_demos``
    entries, so human demos are never discarded here.
    """
    excess = len(self.reward_demos) + len(self.value_demos) - self.max_demos
    if excess > 0:
        for _ in range(min(excess, len(self.value_demos))):
            # Discard the value demo with the lowest MaxValue
            max_values = np.array([demo["MaxValue"] for demo in self.value_demos])
            del self.value_demos[max_values.argmin()]

    # If the reward demos alone are still over capacity, discard the longest agent demos
    reward_excess = len(self.reward_demos) - self.max_demos
    for _ in range(max(reward_excess, 0)):
        offset = self.num_loaded_human_demos
        lengths = np.array([demo[prl.OBS].shape[0] for demo in self.reward_demos[offset:]])
        del self.reward_demos[offset + lengths.argmax()]
def save_demos(self):
    """
    Save the best agent reward demos to ``self.target_agent_demos_dir``.

    Reward demos after the first ``self.num_loaded_human_demos`` entries are ranked
    by ``TotalReward``, highest first. Up to ``self.num_agent_demos_to_save`` of them
    are written as ``reward_demo_<rank>.npz``, with arrays cast to
    ``self.demo_dtypes``, together with ``FrameSkip``. Nothing is done when no target
    directory is configured.
    """
    if not self.target_agent_demos_dir:
        return

    # Create target dir for reward demos if necessary
    if not os.path.exists(self.target_agent_demos_dir):
        os.makedirs(self.target_agent_demos_dir, exist_ok=True)

    offset = self.num_loaded_human_demos
    agent_rewards = np.array([demo["TotalReward"] for demo in self.reward_demos[offset:]])
    best_first = np.flip(agent_rewards.argsort())[:self.num_agent_demos_to_save]

    for rank, position in enumerate(best_first, start=1):
        demo = self.reward_demos[position + offset]
        np.savez(
            os.path.join(self.target_agent_demos_dir, "reward_demo_{}".format(rank)),
            Observation=np.array(demo[prl.OBS]).astype(self.demo_dtypes[prl.OBS]),
            Reward=np.array(demo[prl.REW]).astype(self.demo_dtypes[prl.REW]),
            Action=np.array(demo[prl.ACT]).astype(self.demo_dtypes[prl.ACT]),
            FrameSkip=self.frame_skip)