from machin.frame.buffers.prioritized_buffer import PrioritizedBuffer
from machin.utils.logging import default_logger
# pylint: disable=wildcard-import, unused-wildcard-import
from .ddpg import *


class DDPGPer(DDPG):
"""
DDPG with prioritized experience replay.
Warning:
Your criterion must return a tensor of shape ``[batch_size,1]``
when given two tensors of shape ``[batch_size,1]``, since we
need to multiply the loss with importance sampling weight
element-wise.
If you are using loss modules given by pytorch. It is always
safe to use them without any modification.
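
    Example:
        A minimal sketch of a custom criterion satisfying this contract
        (an illustrative assumption, not part of machin)::

            def per_safe_mse(value, target):
                # Per-sample squared error, shape [batch_size, 1]; no
                # reduction applied, so it can be weighted element-wise.
                return (value - target) ** 2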
"""
def __init__(
self,
actor: Union[NeuralNetworkModule, nn.Module],
actor_target: Union[NeuralNetworkModule, nn.Module],
critic: Union[NeuralNetworkModule, nn.Module],
critic_target: Union[NeuralNetworkModule, nn.Module],
optimizer: Callable,
criterion,
*_,
lr_scheduler: Callable = None,
lr_scheduler_args: Tuple[Tuple, Tuple] = None,
lr_scheduler_kwargs: Tuple[Dict, Dict] = None,
batch_size: int = 100,
update_rate: float = 0.005,
update_steps: Union[int, None] = None,
actor_learning_rate: float = 0.0005,
critic_learning_rate: float = 0.001,
discount: float = 0.99,
gradient_max: float = np.inf,
replay_size: int = 500000,
replay_device: Union[str, t.device] = "cpu",
replay_buffer: Buffer = None,
visualize: bool = False,
visualize_dir: str = "",
**__
):
# DOC INHERITED
super().__init__(
actor,
actor_target,
critic,
critic_target,
optimizer,
criterion,
lr_scheduler=lr_scheduler,
lr_scheduler_args=lr_scheduler_args,
lr_scheduler_kwargs=lr_scheduler_kwargs,
batch_size=batch_size,
update_rate=update_rate,
update_steps=update_steps,
actor_learning_rate=actor_learning_rate,
critic_learning_rate=critic_learning_rate,
discount=discount,
gradient_max=gradient_max,
replay_size=replay_size,
replay_device=replay_device,
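            # Fall back to a prioritized buffer when the user supplies none.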
replay_buffer=(
PrioritizedBuffer(replay_size, replay_device)
if replay_buffer is None
else replay_buffer
),
visualize=visualize,
visualize_dir=visualize_dir,
)
        # The criterion must expose a ``reduction`` attribute set to "none".
        if not hasattr(self.criterion, "reduction"):
            raise RuntimeError(
                "Criterion does not have the 'reduction' attribute. "
                "Are you using a custom criterion?"
            )
        else:
            # A loss module defined in ``torch.nn.modules.loss``
            if self.criterion.reduction != "none":
                default_logger.warning(
                    "The reduction property of the criterion is not 'none'; "
                    "it has been corrected automatically."
                )
                self.criterion.reduction = "none"
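        # For example, ``nn.MSELoss(reduction="none")`` applied to two
        # ``[batch_size, 1]`` tensors yields a ``[batch_size, 1]`` tensor of
        # per-sample losses, which the importance-sampling weighting in
        # ``update()`` relies on.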

    def update(
self,
update_value=True,
update_policy=True,
update_target=True,
concatenate_samples=True,
**__
):
# DOC INHERITED
self.actor.train()
self.critic.train()
(
batch_size,
(state, action, reward, next_state, terminal, others),
index,
is_weight,
) = self.replay_buffer.sample_batch(
self.batch_size,
concatenate_samples,
sample_attrs=["state", "action", "reward", "next_state", "terminal", "*"],
)
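        # In addition to the sampled batch, the prioritized buffer returns the
        # buffer indices of the sampled transitions and their
        # importance-sampling weights; the weights de-bias the value loss and
        # the indices are used to write the updated priorities back.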
        # Update the critic network first.
        # Generate the value reference ``y_i`` with the target actor and the
        # target critic; with the default ``reward_function`` this is
        # y_i = r_i + discount * (1 - terminal) * Q_target(s', actor_target(s')).
with t.no_grad():
next_action = self.action_transform_function(
self._act(next_state, True), next_state, others
)
next_value = self._criticize(next_state, next_action, True)
next_value = next_value.view(batch_size, -1)
y_i = self.reward_function(
reward, self.discount, next_value, terminal, others
)
        # Critic loss, weighted element-wise by the importance-sampling
        # weights to correct the bias of prioritized sampling.
cur_value = self._criticize(state, action)
value_loss = self.criterion(cur_value, y_i.type_as(cur_value))
value_loss = value_loss * t.from_numpy(is_weight).view([batch_size, 1]).type_as(
value_loss
)
value_loss = value_loss.mean()
if self.visualize:
self.visualize_model(value_loss, "critic", self.visualize_dir)
if update_value:
self.critic.zero_grad()
self._backward(value_loss)
nn.utils.clip_grad_norm_(self.critic.parameters(), self.gradient_max)
self.critic_optim.step()
# actor loss
cur_action = self.action_transform_function(self._act(state), state, others)
act_value = self._criticize(state, cur_action)
# "-" is applied because we want to maximize J_b(u),
# but optimizer workers by minimizing the target
act_policy_loss = -act_value.mean()
        # Update transition priorities with the absolute error between the
        # critic estimate of the current policy action and the value target.
abs_error = (
t.sum(t.abs(act_value - y_i.type_as(act_value)), dim=1)
.flatten()
.detach()
.cpu()
.numpy()
)
self.replay_buffer.update_priority(abs_error, index)
if self.visualize:
self.visualize_model(act_policy_loss, "actor", self.visualize_dir)
if update_policy:
self.actor.zero_grad()
self._backward(act_policy_loss)
nn.utils.clip_grad_norm_(self.actor.parameters(), self.gradient_max)
self.actor_optim.step()
        # Update target networks: soft update mixes online parameters into
        # the targets at rate ``update_rate``; hard update copies them every
        # ``update_steps`` updates.
if update_target:
if self.update_rate is not None:
soft_update(self.actor_target, self.actor, self.update_rate)
soft_update(self.critic_target, self.critic, self.update_rate)
else:
self._update_counter += 1
if self._update_counter % self.update_steps == 0:
hard_update(self.actor_target, self.actor)
hard_update(self.critic_target, self.critic)
self.actor.eval()
self.critic.eval()
# use .item() to prevent memory leakage
return -act_policy_loss.item(), value_loss.item()

    @classmethod
def generate_config(cls, config: Union[Dict[str, Any], Config]):
config = DDPG.generate_config(config)
config["frame"] = "DDPGPer"
return config
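    # Usage sketch (an assumption about the config workflow, not verified
    # against every machin version): ``DDPGPer.generate_config({})`` yields a
    # default config whose ``frame`` key is "DDPGPer"; after filling in
    # ``frame_config`` (models, optimizer, criterion, ...), pass it to
    # ``init_from_config`` below to construct the framework.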

    @classmethod
def init_from_config(
cls,
config: Union[Dict[str, Any], Config],
model_device: Union[str, t.device] = "cpu",
):
f_config = config["frame_config"]
models = assert_and_get_valid_models(f_config["models"])
model_args = f_config["model_args"]
model_kwargs = f_config["model_kwargs"]
models = [
m(*arg, **kwarg).to(model_device)
for m, arg, kwarg in zip(models, model_args, model_kwargs)
]
optimizer = assert_and_get_valid_optimizer(f_config["optimizer"])
criterion = assert_and_get_valid_criterion(f_config["criterion"])(
*f_config["criterion_args"], **f_config["criterion_kwargs"]
)
criterion.reduction = "none"
lr_scheduler = f_config["lr_scheduler"] and assert_and_get_valid_lr_scheduler(
f_config["lr_scheduler"]
)
f_config["optimizer"] = optimizer
f_config["criterion"] = criterion
f_config["lr_scheduler"] = lr_scheduler
frame = cls(*models, **f_config)
return frame
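

# A minimal usage sketch (illustrative assumptions: a toy 4-dim state,
# 2-dim action task and small MLP models; none of this ships with machin).
# It shows the default fallback to ``PrioritizedBuffer`` and a PyTorch loss
# module with ``reduction="none"`` plugged in as the criterion.
if __name__ == "__main__":

    class _Actor(nn.Module):
        def __init__(self):
            super().__init__()
            self.net = nn.Sequential(
                nn.Linear(4, 16), nn.ReLU(), nn.Linear(16, 2), nn.Tanh()
            )

        def forward(self, state):
            return self.net(state)

    class _Critic(nn.Module):
        def __init__(self):
            super().__init__()
            self.net = nn.Sequential(
                nn.Linear(4 + 2, 16), nn.ReLU(), nn.Linear(16, 1)
            )

        def forward(self, state, action):
            # Q(s, a): concatenate state and action along the feature axis.
            return self.net(t.cat([state, action], dim=1))

    ddpg_per = DDPGPer(
        _Actor(),
        _Actor(),
        _Critic(),
        _Critic(),
        t.optim.Adam,
        nn.MSELoss(reduction="none"),
        batch_size=4,
    )
    # Store a few dummy transitions, then run one update; the returned pair
    # is the (negated) policy loss and the value loss.
    for _ in range(10):
        ddpg_per.store_transition(
            {
                "state": {"state": t.rand(1, 4)},
                "action": {"action": t.rand(1, 2) * 2 - 1},
                "next_state": {"state": t.rand(1, 4)},
                "reward": float(t.rand(1)),
                "terminal": False,
            }
        )
    print(ddpg_per.update())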