import itertools
import numpy as np
from copy import deepcopy
import torch
import torch.nn as nn
import torch.optim as optim
import pytorchrl as prl
from pytorchrl.agent.algorithms.base import Algorithm
from pytorchrl.agent.algorithms.policy_loss_addons import PolicyLossAddOn
from pytorchrl.agent.algorithms.utils import get_gradients, set_gradients
[docs]class DDPG(Algorithm):
"""
Deep Deterministic Policy Gradient algorithm class.
Algorithm class to execute DDPG, from Timothy P. Lillicrap et al.
CONTINUOUS CONTROL WITH DEEP REINFORCEMENT LEARNING
(https://arxiv.org/pdf/1509.02971.pdf). Algorithms are modules generally
required by multiple workers, so DDPG.algo_factory(...) returns a function
that can be passed on to workers to instantiate their own DDPG module.
Parameters
----------
device : torch.device
CPU or specific GPU where class computations will take place.
envs : VecEnv
Vector of environments instance.
actor : Actor
Actor class instance.
lr_pi : float
Policy optimizer learning rate.
lr_q : float
Q-nets optimizer learning rate.
gamma : float
Discount factor parameter.
polyak : float
DDPG polyak averaging parameter.
num_updates : int
Num consecutive actor_critic updates before data collection continues.
update_every : int
Regularity of actor_critic updates in number environment steps.
start_steps : int
Num of initial random environment steps before learning starts.
mini_batch_size : int
Size of actor_critic update batches.
target_update_interval : float
regularity of target nets updates with respect to actor_critic Adam updates.
num_test_episodes : int
Number of episodes to complete in each test phase.
test_every : int
Regularity of test evaluations in actor_critic updates.
max_grad_norm : float
Gradient clipping parameter.
policy_loss_addons : list
List of PolicyLossAddOn components adding loss terms to the algorithm policy loss.
Examples
--------
>>> create_algo = DDPG.create_factory(
lr_q=1e-4, lr_pi=1e-4, gamma=0.99, polyak=0.995,
num_updates=50, update_every=50, test_every=5000, start_steps=20000,
mini_batch_size=64, num_test_episodes=0, target_update_interval=1)
"""
def __init__(self,
device,
envs,
actor,
lr_q=1e-4,
lr_pi=1e-4,
gamma=0.99,
polyak=0.995,
num_updates=1,
update_every=50,
test_every=1000,
max_grad_norm=0.5,
start_steps=20000,
mini_batch_size=64,
num_test_episodes=5,
target_update_interval=1,
policy_loss_addons=[]):
# ---- General algo attributes ----------------------------------------
# Discount factor
self._gamma = gamma
# Number of steps collected with initial random policy
self._start_steps = int(start_steps)
# Times data in the buffer is re-used before data collection proceeds
self._num_epochs = int(1) # Default to 1 for off-policy algorithms
# Number of data samples collected between network update stages
self._update_every = int(update_every)
# Number mini batches per epoch
self._num_mini_batch = int(num_updates)
# Size of update mini batches
self._mini_batch_size = int(mini_batch_size)
# Number of network updates between test evaluations
self._test_every = int(test_every)
# Number of episodes to complete when testing
self._num_test_episodes = int(num_test_episodes)
# ---- DDPG-specific attributes ----------------------------------------
self.iter = 0
self.envs = envs
self.actor = actor
self.polyak = polyak
self.device = device
self.max_grad_norm = max_grad_norm
self.target_update_interval = target_update_interval
assert hasattr(self.actor, "q1"), "DDPG requires q critic (num_critics=1)"
# Create target networks
self.actor_targ = deepcopy(actor)
# Freeze target networks with respect to optimizers (only update via polyak averaging)
for p in self.actor_targ.parameters():
p.requires_grad = False
# List of parameters for both Q-networks
q_params = itertools.chain(self.actor.q1.parameters())
# List of parameters for both Q-networks
p_params = itertools.chain(self.actor.policy_net.parameters())
# ----- Policy Loss Addons --------------------------------------------
# Sanity check, policy_loss_addons is a PolicyLossAddOn instance
# or a list of PolicyLossAddOn instances
assert isinstance(policy_loss_addons, (PolicyLossAddOn, list)),\
"DDPG policy_loss_addons parameter should be a PolicyLossAddOn instance " \
"or a list of PolicyLossAddOn instances"
if isinstance(policy_loss_addons, list):
for addon in policy_loss_addons:
assert isinstance(addon, PolicyLossAddOn), \
"DDPG policy_loss_addons parameter should be a PolicyLossAddOn " \
"instance or a list of PolicyLossAddOn instances"
else:
policy_loss_addons = [policy_loss_addons]
self.policy_loss_addons = policy_loss_addons
for addon in self.policy_loss_addons:
addon.setup(self.actor, self.device)
# ----- Optimizers ----------------------------------------------------
self.pi_optimizer = optim.Adam(p_params, lr=lr_pi)
self.q_optimizer = optim.Adam(q_params, lr=lr_q)
[docs] @classmethod
def create_factory(cls,
lr_q=1e-3,
lr_pi=1e-4,
gamma=0.99,
polyak=0.995,
num_updates=50,
test_every=5000,
update_every=50,
start_steps=1000,
max_grad_norm=0.5,
mini_batch_size=64,
num_test_episodes=5,
target_update_interval=1.0,
policy_loss_addons=[]):
"""
Returns a function to create new DDPG instances.
Parameters
----------
lr_pi : float
Policy optimizer learning rate.
lr_q : float
Q-nets optimizer learning rate.
gamma : float
Discount factor parameter.
polyak: float
DDPG polyak averaging parameter.
num_updates: int
Num consecutive actor_critic updates before data collection continues.
update_every: int
Regularity of actor_critic updates in number environment steps.
start_steps: int
Num of initial random environment steps before learning starts.
mini_batch_size: int
Size of actor_critic update batches.
target_update_interval: float
regularity of target nets updates with respect to actor_critic Adam updates.
num_test_episodes : int
Number of episodes to complete in each test phase.
test_every : int
Regularity of test evaluations in actor_critic updates.
max_grad_norm : float
Gradient clipping parameter.
policy_loss_addons : list
List of PolicyLossAddOn components adding loss terms to the algorithm policy loss.
Returns
-------
create_algo_instance : func
Function that creates a new DDPG class instance.
algo_name : str
Name of the algorithm.
"""
def create_algo_instance(device, actor, envs):
return cls(lr_q=lr_q,
lr_pi=lr_pi,
envs=envs,
actor=actor,
gamma=gamma,
device=device,
polyak=polyak,
test_every=test_every,
start_steps=start_steps,
num_updates=num_updates,
update_every=update_every,
max_grad_norm=max_grad_norm,
mini_batch_size=mini_batch_size,
num_test_episodes=num_test_episodes,
target_update_interval=target_update_interval,
policy_loss_addons=policy_loss_addons)
return create_algo_instance, prl.DDPG
@property
def gamma(self):
"""Returns discount factor gamma."""
return self._gamma
@property
def start_steps(self):
"""Returns the number of steps to collect with initial random policy."""
return self._start_steps
@property
def num_epochs(self):
"""
Returns the number of times the whole buffer is re-used before data
collection proceeds.
"""
return self._num_epochs
@property
def update_every(self):
"""
Returns the number of data samples collected between
network update stages.
"""
return self._update_every
@property
def num_mini_batch(self):
"""
Returns the number of times the whole buffer is re-used before data
collection proceeds.
"""
return self._num_mini_batch
@property
def mini_batch_size(self):
"""
Returns the number of mini batches per epoch.
"""
return self._mini_batch_size
@property
def test_every(self):
"""Number of network updates between test evaluations."""
return self._test_every
@property
def num_test_episodes(self):
"""
Returns the number of episodes to complete when testing.
"""
return self._num_test_episodes
[docs] def acting_step(self, obs, rhs, done, deterministic=False):
"""
DDPG acting function.
Parameters
----------
obs : torch.tensor
Current world observation
rhs : torch.tensor
RNN recurrent hidden state (if policy is not a RNN, rhs will contain zeroes).
done : torch.tensor
1.0 if current obs is the last one in the episode, else 0.0.
deterministic : bool
Whether to randomly sample action from predicted distribution or taking the mode.
Returns
-------
action : torch.tensor
Predicted next action.
clipped_action : torch.tensor
Predicted next action (clipped to be within action space).
rhs : torch.tensor
Policy recurrent hidden state (if policy is not a RNN, rhs will contain zeroes).
other : dict
Additional DDPG predictions, which are not used in other algorithms.
"""
with torch.no_grad():
(action, clipped_action, logp_action, rhs,
entropy_dist, dist) = self.actor.get_action(
obs, rhs, done, deterministic=deterministic)
return action, clipped_action, rhs, {}
[docs] def compute_loss_q(self, data, n_step=1, per_weights=1):
"""
Calculate DDPG Q-nets loss
Parameters
----------
data: dict
Data batch dict containing all required tensors to compute TD3 losses.
n_step : int or float
Number of future steps used to computed the truncated n-step return value.
per_weights :
Prioritized Experience Replay (PER) important sampling weights or 1.0.
Returns
-------
loss_q1 : torch.tensor
Q1-net loss.
loss_q2 : torch.tensor
Q2-net loss.
loss_q : torch.tensor
Weighted average of loss_q1 and loss_q2.
errors : torch.tensor
TD errors.
"""
o, rhs, d = data[prl.OBS], data[prl.RHS], data[prl.DONE]
a, r = data[prl.ACT], data[prl.REW]
o2, rhs2, d2 = data[prl.OBS2], data[prl.RHS2], data[prl.DONE2]
# Q-values for all actions
q = self.actor.get_q_scores(o, rhs, d, a).get("q1")
# Bellman backup for Q functions
with torch.no_grad():
# Target actions come from *current* policy
a2, _, _, _, _, dist = self.actor.get_action(o2, rhs2, d2)
# Target Q-values
q_targ = self.actor_targ.get_q_scores(o2, rhs2, d2, a2).get("q1")
backup = r + (self.gamma ** n_step) * (1 - d2) * q_targ
# MSE loss against Bellman backup
loss_q = 0.5 * (((q - backup) ** 2) * per_weights).mean()
# errors = (torch.min(q1, q2) - backup).abs()
errors = (q - backup).abs()
# reset Noise
self.actor.policy_net.dist.noise.reset()
return loss_q, errors
[docs] def compute_loss_pi(self, data, per_weights=1):
"""
Calculate DDPG policy loss.
Parameters
----------
data: dict
Data batch dict containing all required tensors to compute DDPG losses.
Returns
-------
loss_pi: torch.tensor
DDPG policy loss.
"""
o, rhs, d = data[prl.OBS], data[prl.RHS], data[prl.DONE]
pi, _, _, _, _, dist = self.actor.get_action(o, rhs, d)
q_pi = self.actor.get_q_scores(o, rhs, d, pi).get("q1")
loss_pi = - (q_pi * per_weights).mean()
# Extend policy loss with addons
addons_info = {}
for addon in self.policy_loss_addons:
addon_loss, addons_info = addon.compute_loss_term(data, dist, addons_info)
loss_pi += addon_loss
info.update(addons_info)
return loss_pi, addons_info
[docs] def compute_gradients(self, batch, grads_to_cpu=True):
"""
Compute loss and compute gradients but don't do optimization step,
return gradients instead.
Parameters
----------
batch: dict
data batch containing all required tensors to compute DDPG losses.
grads_to_cpu: bool
If gradient tensor will be sent to another node, need to be in CPU.
Returns
-------
grads: list of tensors
List of actor_critic gradients.
info: dict
Dict containing current DDPG iteration information.
"""
# Recurrent burn-in
if self.actor.is_recurrent:
batch = self.actor.burn_in_recurrent_states(batch)
# PER
per_weights = batch.pop("per_weights") if "per_weights" in batch else 1.0
# N-step returns
n_step = batch.pop("n_step") if "n_step" in batch else 1.0
# First run one gradient descent step for Q1 and Q2
loss_q, errors = self.compute_loss_q(batch, n_step, per_weights)
self.q_optimizer.zero_grad()
loss_q.backward(retain_graph=True)
nn.utils.clip_grad_norm_(self.actor.q1.parameters(), self.max_grad_norm)
q_grads = get_gradients(self.actor.q1, grads_to_cpu=grads_to_cpu)
# Freeze Q-network so you don't waste computational effort
# computing gradients for them during the policy learning step.
for p in itertools.chain(self.actor.q1.parameters()):
p.requires_grad = False
# Next run one gradient descent step for pi.
loss_pi, addons_info = self.compute_loss_pi(batch, per_weights)
self.pi_optimizer.zero_grad()
loss_pi.backward()
nn.utils.clip_grad_norm_(self.actor.policy_net.parameters(), self.max_grad_norm)
pi_grads = get_gradients(self.actor.policy_net, grads_to_cpu=grads_to_cpu)
for p in itertools.chain(self.actor.q1.parameters()):
p.requires_grad = True
info = {
"loss_pi": loss_pi.detach().item(),
"loss_q1": loss_q.detach().item(),
}
if "per_weights" in batch:
info.update({"errors": errors})
grads = {"q_grads": q_grads, "pi_grads": pi_grads}
return grads, info
[docs] def update_target_networks(self):
"""Update actor critic target networks with polyak averaging"""
if self.iter % self.target_update_interval == 0:
with torch.no_grad():
for p, p_targ in zip(self.actor.parameters(), self.actor_targ.parameters()):
p_targ.data.mul_(self.polyak)
p_targ.data.add_((1 - self.polyak) * p.data)
[docs] def apply_gradients(self, gradients=None):
"""
Take an optimization step, previously setting new gradients if provided.
Update also target networks.
Parameters
----------
gradients : list of tensors
List of actor gradients.
"""
if gradients is not None:
set_gradients(
self.actor.policy_net,
gradients=gradients["pi_grads"], device=self.device)
set_gradients(
self.actor.q1,
gradients=gradients["q_grads"], device=self.device)
self.q_optimizer.step()
self.pi_optimizer.step()
# Update target networks by polyak averaging.
self.iter += 1
self.update_target_networks()
[docs] def set_weights(self, actor_weights):
"""
Update actor with the given weights. Update also target networks.
Parameters
----------
actor_weights : dict of tensors
Dict containing actor weights to be set.
"""
self.actor.load_state_dict(actor_weights)
# Update target networks by polyak averaging.
self.iter += 1
self.update_target_networks()
[docs] def update_algorithm_parameter(self, parameter_name, new_parameter_value):
"""
If `parameter_name` is an attribute of the algorithm, change its value
to `new_parameter_value value`.
Parameters
----------
parameter_name : str
Worker.algo attribute name
new_parameter_value : int or float
New value for `parameter_name`.
"""
if hasattr(self, parameter_name):
setattr(self, parameter_name, new_parameter_value)
if parameter_name == "lr":
for param_group in self.pi_optimizer.param_groups:
param_group['lr'] = new_parameter_value
for param_group in self.q_optimizer.param_groups:
param_group['lr'] = new_parameter_value