import random
import numpy as np
from copy import deepcopy
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import pytorchrl as prl
from pytorchrl.agent.algorithms.base import Algorithm
from pytorchrl.agent.algorithms.utils import get_gradients, set_gradients
from pytorchrl.agent.algorithms.policy_loss_addons import PolicyLossAddOn
[docs]class DDQN(Algorithm):
"""
Deep Q Learning algorithm class.
Algorithm class to execute DQN, from Mnih et al.
(https://www.nature.com/articles/nature14236?wm=book_wap_0005) with
target network.
Parameters
----------
device : torch.device
CPU or specific GPU where class computations will take place.
envs : VecEnv
Vector of environments instance.
actor : ActorCritic
actor class instance.
lr : float
learning rate.
gamma : float
Discount factor parameter.
polyak : float
Polyak averaging parameter used when updating the target network.
num_updates: int
Num consecutive actor updates before data collection continues.
update_every: int
Regularity of actor updates in number environment steps.
start_steps: int
Num of initial random environment steps before learning starts.
mini_batch_size: int
Size of actor update batches.
num_test_episodes : int
Number of episodes to complete in each test phase.
test_every : int
Regularity of test evaluations in actor updates.
max_grad_norm : float
Gradient clipping parameter.
initial_epsilon : float
initial value for DQN epsilon parameter.
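epsilon_decay : float
Exponential decay rate for epsilon parameter.
target_update_interval : int
Regularity of target network updates with respect to actor Adam updates.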
policy_loss_addons : list
List of PolicyLossAddOn components adding loss terms to the algorithm policy loss.
"""
def __init__(self,
             device,
             envs,
             actor,
             lr=1e-4,
             gamma=0.99,
             polyak=0.995,
             num_updates=1,
             update_every=50,
             test_every=5000,
             max_grad_norm=0.5,
             start_steps=20000,
             mini_batch_size=64,
             num_test_episodes=5,
             initial_epsilon=1.0,
             epsilon_decay=0.999,
             target_update_interval=1,
             policy_loss_addons=None):
    """
    Store hyper-parameters, build the frozen target network, register the
    policy loss add-ons and create the Q-network optimizer.

    Parameters
    ----------
    device : torch.device
        CPU or specific GPU where class computations will take place.
    envs : VecEnv
        Vector of environments instance.
    actor : ActorCritic
        Actor class instance. Must expose a ``q1`` attribute.
    lr : float
        Learning rate of the Adam optimizer built over ``actor.q1``.
    gamma : float
        Discount factor parameter.
    polyak : float
        Polyak averaging parameter used when updating the target network.
    num_updates : int
        Num consecutive actor updates before data collection continues.
    update_every : int
        Regularity of actor updates in number environment steps.
    test_every : int
        Regularity of test evaluations in actor updates.
    max_grad_norm : float
        Gradient clipping parameter.
    start_steps : int
        Num of initial random environment steps before learning starts.
    mini_batch_size : int
        Size of actor update batches.
    num_test_episodes : int
        Number of episodes to complete in each test phase.
    initial_epsilon : float
        Initial value for DQN epsilon parameter.
    epsilon_decay : float
        Exponential decay rate for epsilon parameter.
    target_update_interval : int
        Regularity of target network updates with respect to actor updates.
    policy_loss_addons : PolicyLossAddOn or list of PolicyLossAddOn, optional
        Components adding loss terms to the algorithm policy loss.
        ``None`` (the default) means no add-ons.

    Raises
    ------
    AssertionError
        If ``actor`` has no ``q1`` attribute, or if ``policy_loss_addons``
        is neither a PolicyLossAddOn nor a list of PolicyLossAddOn.
    """

    # ---- General algo attributes ----------------------------------------

    # Discount factor
    self._gamma = gamma

    # Number of steps collected with initial random policy
    self._start_steps = int(start_steps)

    # Times data in the buffer is re-used before data collection proceeds
    self._num_epochs = 1  # Default to 1 for off-policy algorithms

    # Number of data samples collected between network update stages
    self._update_every = int(update_every)

    # Number mini batches per epoch
    self._num_mini_batch = int(num_updates)

    # Size of update mini batches
    self._mini_batch_size = int(mini_batch_size)

    # Number of network updates between test evaluations
    self._test_every = int(test_every)

    # Number of episodes to complete when testing
    self._num_test_episodes = int(num_test_episodes)

    # ---- DDQN-specific attributes ---------------------------------------

    self.iter = 0
    self.envs = envs
    self.actor = actor
    self.device = device
    self.polyak = polyak
    self.epsilon = initial_epsilon
    self.max_grad_norm = max_grad_norm
    self.epsilon_decay = epsilon_decay
    self.target_update_interval = target_update_interval

    assert hasattr(self.actor, "q1"), "DDQN requires q critic (num_critics=1)"

    # Create target network
    self.actor_targ = deepcopy(actor)

    # Freeze target networks with respect to optimizers
    for p in self.actor_targ.parameters():
        p.requires_grad = False

    # ----- Policy Loss Addons --------------------------------------------

    # A fresh list per instance: a shared mutable default would be the same
    # object stored on every DDQN created without explicit add-ons.
    if policy_loss_addons is None:
        policy_loss_addons = []

    # Sanity check, policy_loss_addons is a PolicyLossAddOn instance
    # or a list of PolicyLossAddOn instances
    assert isinstance(policy_loss_addons, (PolicyLossAddOn, list)), \
        "DDQN policy_loss_addons parameter should be a PolicyLossAddOn instance " \
        "or a list of PolicyLossAddOn instances"
    if isinstance(policy_loss_addons, list):
        for addon in policy_loss_addons:
            assert isinstance(addon, PolicyLossAddOn), \
                "DDQN policy_loss_addons parameter should be a PolicyLossAddOn" \
                " instance or a list of PolicyLossAddOn instances"
    else:
        policy_loss_addons = [policy_loss_addons]

    self.policy_loss_addons = policy_loss_addons
    for addon in self.policy_loss_addons:
        addon.setup(self.actor, self.device)

    # ----- Optimizer -----------------------------------------------------

    self.q_optimizer = optim.Adam(self.actor.q1.parameters(), lr=lr)
@classmethod
def create_factory(cls,
                   lr=1e-4,
                   gamma=0.99,
                   polyak=0.995,
                   num_updates=50,
                   update_every=50,
                   test_every=5000,
                   start_steps=20000,
                   max_grad_norm=0.5,
                   mini_batch_size=64,
                   num_test_episodes=5,
                   epsilon_decay=0.999,
                   initial_epsilon=1.0,
                   target_update_interval=1,
                   policy_loss_addons=None):
    """
    Returns a function to create new DDQN instances.

    Parameters
    ----------
    lr : float
        learning rate.
    gamma : float
        Discount factor parameter.
    polyak: float
        Polyak averaging parameter.
    num_updates: int
        Num consecutive actor updates before data collection continues.
    update_every: int
        Regularity of actor updates in number environment steps.
    start_steps: int
        Num of initial random environment steps before learning starts.
    mini_batch_size: int
        Size of actor update batches.
    target_update_interval: int
        regularity of target nets updates with respect to actor Adam updates.
    num_test_episodes : int
        Number of episodes to complete in each test phase.
    test_every : int
        Regularity of test evaluations in actor updates.
    max_grad_norm : float
        Gradient clipping parameter.
    initial_epsilon : float
        initial value for DQN epsilon parameter.
    epsilon_decay : float
        Exponential decay rate for epsilon parameter.
    policy_loss_addons : PolicyLossAddOn or list, optional
        PolicyLossAddOn components adding loss terms to the algorithm
        policy loss. ``None`` (the default) means no add-ons.

    Returns
    -------
    create_algo_instance : func
        Function that creates a new DDQN class instance.
    algo_name : str
        Name of the algorithm.
    """

    def create_algo_instance(device, actor, envs):
        # Hand every new instance its own empty list when no add-ons were
        # given, instead of one default list shared by all instances.
        addons = [] if policy_loss_addons is None else policy_loss_addons
        return cls(lr=lr,
                   envs=envs,
                   actor=actor,
                   gamma=gamma,
                   device=device,
                   polyak=polyak,
                   test_every=test_every,
                   start_steps=start_steps,
                   num_updates=num_updates,
                   update_every=update_every,
                   epsilon_decay=epsilon_decay,
                   max_grad_norm=max_grad_norm,
                   mini_batch_size=mini_batch_size,
                   initial_epsilon=initial_epsilon,
                   num_test_episodes=num_test_episodes,
                   target_update_interval=target_update_interval,
                   policy_loss_addons=addons)

    return create_algo_instance, prl.DDQN
def acting_step(self, obs, rhs, done, deterministic=False):
    """
    DDQN acting function (epsilon-greedy over the actor Q-values).

    Parameters
    ----------
    obs: torch.tensor
        Current world observation
    rhs: dict
        RNN recurrent hidden states. Returned unchanged.
    done: torch.tensor
        1.0 if current obs is the last one in the episode, else 0.0.
        Not used by this method.
    deterministic: bool
        If True, always take the greedy (arg-max Q) action and skip the
        epsilon-random exploration branch.

    Returns
    -------
    action: torch.tensor
        Predicted next action.
    clipped_action: torch.tensor
        Predicted next action (same tensor object as `action`).
    rhs: batch
        Actor recurrent hidden state, as received.
    other: dict
        Additional DDQN predictions, which are not used in other algorithms.
        Always empty here.
    """

    # Explore only when stochastic behaviour is allowed; a deterministic
    # call must never fall into the random branch, whatever epsilon is.
    explore = (not deterministic) and random.random() <= self.epsilon

    if explore:
        action = clipped_action = torch.tensor(
            [self.actor.action_space.sample()]).unsqueeze(0)
    else:
        with torch.no_grad():
            q = self.actor.get_q_scores(obs).get("q1")
            action = clipped_action = torch.argmax(q, dim=1).unsqueeze(0)

    other = {}
    return action, clipped_action, rhs, other
def compute_loss(self, batch, n_step=1, per_weights=1):
    """
    Calculate DDQN loss

    Parameters
    ----------
    batch: dict
        Data batch dict containing all required tensors to compute DDQN loss.
    n_step: int or float
        Exponent applied to the discount factor when building the target.
    per_weights: float or torch.tensor
        Per-sample weights multiplying the squared TD error before the
        mean is taken. The default of 1 gives a plain mean squared error.
        NOTE(review): assumed to broadcast against the gathered Q-values
        (one weight per sample) - confirm against the replay buffer.

    Returns
    -------
    loss : torch.tensor
        DDQN loss, including any policy loss add-on terms.
    errors : torch.tensor
        Absolute TD errors, detached and moved to CPU.
    addons_info : dict
        Information dict threaded through the policy loss add-ons
        (empty when there are no add-ons).
    """

    o, rhs, d = batch[prl.OBS], batch[prl.RHS], batch[prl.DONE]
    a, r = batch[prl.ACT], batch[prl.REW]
    o2, rhs2, d2 = batch[prl.OBS2], batch[prl.RHS2], batch[prl.DONE2]

    # The bootstrap target is a constant for the optimizer: no graph needed.
    # NOTE(review): `self.gamma` is not assigned in this class (only
    # `self._gamma`); presumably exposed by the Algorithm base - confirm.
    with torch.no_grad():
        # Get max predicted Q values (for next states) from target model
        q_targ_vals = self.actor_targ.get_q_scores(o2, rhs2, d2).get("q1")
        q_targ_next = q_targ_vals.max(dim=1)[0].unsqueeze(1)

        # Compute Q targets for current states
        q_targ = r + (self.gamma ** n_step) * (1 - d2) * q_targ_next

    # Get expected Q values from local model
    q_vals = self.actor.get_q_scores(o, rhs, d).get("q1")
    q_exp = q_vals.gather(1, a.long())

    # Weighted mean squared TD error (equals MSE when per_weights is 1)
    td_error = q_exp - q_targ
    loss = (per_weights * td_error.pow(2)).mean()
    errors = td_error.abs().detach().cpu()

    # Extend policy loss with addons
    addons_info = {}
    if self.policy_loss_addons:
        # NOTE(review): add-ons take an action distribution, which DDQN does
        # not produce natively; a Categorical with the Q-values as logits is
        # used as the implied policy - confirm this is what add-ons expect.
        dist = torch.distributions.Categorical(logits=q_vals)
        for addon in self.policy_loss_addons:
            addon_loss, addons_info = addon.compute_loss_term(
                batch, dist, addons_info)
            loss = loss + addon_loss

    return loss, errors, addons_info
def compute_gradients(self, batch, grads_to_cpu=True):
    """
    Run a backward pass on the DDQN loss and hand back the resulting
    gradients without taking an optimizer step.

    Parameters
    ----------
    batch: dict
        Data batch containing all required tensors to compute DQN loss.
        May also carry the optional "n_step" and "per_weights" entries.
    grads_to_cpu: bool
        If gradient tensor will be sent to another node, need to be in CPU.

    Returns
    -------
    grads: list of tensors
        List of actor gradients.
    info: dict
        Dict containing current DQN iteration information.
    """

    # Recurrent actors pre-process the batch first (burn-in)
    if self.actor.is_recurrent:
        batch = self.actor.burn_in_recurrent_states(batch)

    # Optional entries fall back to neutral values when missing
    n_step = batch.get("n_step", 1.0)          # N-step returns
    per_weights = batch.get("per_weights", 1.0)  # PER

    # Loss first, then a clean backward pass with clipped gradients
    loss, errors, addons_info = self.compute_loss(batch, n_step, per_weights)
    self.q_optimizer.zero_grad()
    loss.backward()
    nn.utils.clip_grad_norm_(self.actor.q1.parameters(), self.max_grad_norm)
    grads = get_gradients(self.actor.q1, grads_to_cpu=grads_to_cpu)

    info = {"loss_q": loss.detach().item(), "epsilon": self.epsilon}
    # TD errors are only reported when the batch carries PER weights
    if "per_weights" in batch:
        info["errors"] = errors
    info.update(addons_info)

    return grads, info
def update_target_networks(self):
    """
    Move the target network towards the actor with polyak averaging.

    Only acts when `self.iter` is a multiple of
    `self.target_update_interval`; otherwise the target is left untouched.
    """
    if self.iter % self.target_update_interval != 0:
        return

    rho = self.polyak
    with torch.no_grad():
        pairs = zip(self.actor.parameters(), self.actor_targ.parameters())
        for online, target in pairs:
            # target <- rho * target + (1 - rho) * online, done in place
            target.data.mul_(rho)
            target.data.add_((1 - rho) * online.data)
def update_epsilon(self):
    """Apply one multiplicative decay step to epsilon, clipped to [0.05, 1.0]."""
    decayed = self.epsilon * self.epsilon_decay
    self.epsilon = np.clip(decayed, 0.05, 1.0)
def apply_gradients(self, gradients=None):
    """
    Step the Q optimizer, loading externally computed gradients first when
    they are supplied, then refresh the target network and epsilon.

    Parameters
    ----------
    gradients: list of tensors
        List of actor gradients. When None, the gradients already stored
        on the Q network are used as they are.
    """
    received_gradients = gradients is not None
    if received_gradients:
        set_gradients(
            self.actor.q1, gradients=gradients, device=self.device)

    self.q_optimizer.step()

    # Count this update before the interval-gated target update runs
    self.iter = self.iter + 1
    self.update_target_networks()
    self.update_epsilon()
def set_weights(self, weights):
    """
    Load the given weights into the actor, then refresh the target
    network and epsilon as after a regular update.

    Parameters
    ----------
    weights: dict of tensors
        Dict containing actor weights to be set.
    """
    self.actor.load_state_dict(weights)

    # Loading weights counts as one iteration for the target update gate
    self.iter = self.iter + 1
    self.update_target_networks()
    self.update_epsilon()
def update_algo_parameter(self, parameter_name, new_parameter_value):
    """
    Overwrite the attribute called `parameter_name` with
    `new_parameter_value` when the algorithm already has such an attribute.
    The name "lr" is additionally written into every parameter group of
    the Q optimizer.

    Parameters
    ----------
    parameter_name : str
        Worker.algo attribute name
    new_parameter_value : int or float
        New value for `parameter_name`.
    """
    # Only existing attributes are touched; unknown names create nothing
    if hasattr(self, parameter_name):
        setattr(self, parameter_name, new_parameter_value)

    if parameter_name != "lr":
        return

    # The learning rate lives in the optimizer, not on the algorithm
    for group in self.q_optimizer.param_groups:
        group['lr'] = new_parameter_value