[Question] How to implement RNN policy with SKRL
Hi, I cannot find an implementation for skrl that uses an LSTM or GRU in the policy. Is it possible to use a recurrent network with the current skrl library and Isaac Lab, and is there any similar example to use as a guide?
Hi @ParlitsisG
Please check this discussion: https://github.com/Toni-SM/skrl/discussions/276#discussioncomment-12257542.
Config (.yaml) support is planned for the upcoming skrl release; meanwhile, you can use skrl's standalone scripts for it.
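For reference, here is a minimal sketch (simplified, not the linked discussion verbatim) of what a GRU policy for a standalone skrl script can look like. The key points are that the model declares the shape of its hidden states in `get_specification()` and exchanges them through the `"rnn"` entry of the `inputs`/outputs dictionaries. The `num_envs`, `num_layers`, `hidden_size` and `sequence_length` values are illustrative choices, and the training branch below omits the reset of hidden states at terminations inside a sequence (see the linked discussion for the full handling).

```python
import torch
import torch.nn as nn

from skrl.models.torch import GaussianMixin, Model


# minimal GRU policy sketch: hidden states are declared in get_specification()
# and passed in/out through the "rnn" entry of the inputs/outputs dictionaries
class RNNPolicy(GaussianMixin, Model):
    def __init__(self, observation_space, action_space, device,
                 clip_actions=False, clip_log_std=True, min_log_std=-20, max_log_std=2,
                 num_envs=1, num_layers=1, hidden_size=64, sequence_length=16):
        Model.__init__(self, observation_space, action_space, device)
        GaussianMixin.__init__(self, clip_actions, clip_log_std, min_log_std, max_log_std)

        self.num_envs = num_envs
        self.num_layers = num_layers
        self.hidden_size = hidden_size          # Hout
        self.sequence_length = sequence_length  # L

        self.gru = nn.GRU(input_size=self.num_observations,
                          hidden_size=self.hidden_size,
                          num_layers=self.num_layers,
                          batch_first=True)  # batch_first -> (batch, sequence, features)
        self.net = nn.Sequential(nn.Linear(self.hidden_size, 64),
                                 nn.ELU(),
                                 nn.Linear(64, self.num_actions))
        self.log_std_parameter = nn.Parameter(torch.zeros(self.num_actions))

    def get_specification(self):
        # hidden states: (D * num_layers, N, Hout), with N = number of environments
        return {"rnn": {"sequence_length": self.sequence_length,
                        "sizes": [(self.num_layers, self.num_envs, self.hidden_size)]}}

    def compute(self, inputs, role):
        states = inputs["states"]
        hidden_states = inputs["rnn"][0]

        if self.training:
            # (N * L, Hin) -> (N, L, Hin) and keep the hidden state that opens each sequence
            rnn_input = states.view(-1, self.sequence_length, states.shape[-1])
            hidden_states = hidden_states.view(self.num_layers, -1, self.sequence_length,
                                               hidden_states.shape[-1])[:, :, 0, :].contiguous()
            # simplified: the full example also resets hidden states at terminations
            # that occur inside a sequence (see the linked discussion)
            rnn_output, hidden_states = self.gru(rnn_input, hidden_states)
        else:
            # rollout: one step per environment (L = 1)
            rnn_input = states.view(-1, 1, states.shape[-1])
            rnn_output, hidden_states = self.gru(rnn_input, hidden_states)

        rnn_output = torch.flatten(rnn_output, start_dim=0, end_dim=1)  # (N * L, Hout)
        return self.net(rnn_output), self.log_std_parameter, {"rnn": [hidden_states]}
```

A model like this is then paired with the recurrent agent variant (e.g. `PPO_RNN` from `skrl.agents.torch.ppo`), since the plain agents do not collect or pass hidden states.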
Hello, I am trying to do this to add a GRU and I am running into an issue. I followed the https://github.com/Toni-SM/skrl/discussions/276 thread, but I get this error:
Traceback (most recent call last):
File "/home/shiven/IsaacLab/source/isaaclab_tasks/isaaclab_tasks/direct/aod_float/skrl_discrete_train.py", line 228, in <module>
trainer.train()
File "/home/shiven/env_isaaclab/lib/python3.10/site-packages/skrl/trainers/torch/sequential.py", line 86, in train
self.single_agent_train()
File "/home/shiven/env_isaaclab/lib/python3.10/site-packages/skrl/trainers/torch/base.py", line 193, in single_agent_train
actions = self.agents.act(states, timestep=timestep, timesteps=self.timesteps)[0]
File "/home/shiven/env_isaaclab/lib/python3.10/site-packages/skrl/agents/torch/ppo/ppo.py", line 243, in act
actions, log_prob, outputs = self.policy.act({"states": self._state_preprocessor(states)}, role="policy")
File "/home/shiven/IsaacLab/source/isaaclab_tasks/isaaclab_tasks/direct/aod_float/skrl_discrete_train.py", line 96, in act
return CategoricalMixin.act(self, inputs, role)
File "/home/shiven/env_isaaclab/lib/python3.10/site-packages/skrl/models/torch/categorical.py", line 89, in act
net_output, outputs = self.compute(inputs, role)
File "/home/shiven/IsaacLab/source/isaaclab_tasks/isaaclab_tasks/direct/aod_float/skrl_discrete_train.py", line 103, in compute
hidden_states = inputs["rnn"][0]
KeyError: 'rnn'
[skrl:INFO] Closing environment
[skrl:INFO] Environment closed
I am using a dict observation space and a discrete action space. What is the issue here? My full script is below.
import torch
import torch.nn as nn
# import the skrl components to build the RL system
from skrl.agents.torch.ppo import PPO, PPO_DEFAULT_CONFIG
from skrl.envs.loaders.torch import load_isaaclab_env
from skrl.envs.wrappers.torch import wrap_env
from skrl.memories.torch import RandomMemory
from skrl.models.torch import DeterministicMixin, CategoricalMixin, Model
from skrl.resources.preprocessors.torch import RunningStandardScaler
from skrl.resources.schedulers.torch import KLAdaptiveRL
from skrl.trainers.torch import SequentialTrainer
from skrl.utils import set_seed
# Helper function to extract dimensions from gym spaces
import gym
def get_space_dim(space):
# Handle Dict spaces by summing the dimensions of all subspaces
if hasattr(space, "spaces") and isinstance(space.spaces, dict):
return sum(get_space_dim(subspace) for subspace in space.spaces.values())
elif hasattr(space, "shape") and space.shape is not None and len(space.shape) > 0:
return int(torch.prod(torch.tensor(space.shape)))
elif hasattr(space, "n"):
return int(space.n)
else:
raise ValueError(f"Unknown space type: {space}")
# seed for reproducibility
set_seed(42) # e.g. `set_seed(42)` for fixed seed
# define shared model (stochastic and deterministic models) using mixins
class Shared(CategoricalMixin, DeterministicMixin, Model):
def __init__(
self,
observation_space,
action_space,
device,
unnormalized_log_prob=True,
clip_actions=False,
sequence_length=128,
):
Model.__init__(self, observation_space, action_space, device)
CategoricalMixin.__init__(self, unnormalized_log_prob)
DeterministicMixin.__init__(self, clip_actions)
# Extract dimensions
self.num_observations = get_space_dim(observation_space)
self.num_actions = get_space_dim(action_space)
self.sequence_length = sequence_length
self.num_layers = 1
self.net = nn.Sequential(
nn.Linear(self.num_observations, 256),
nn.ELU(),
nn.Linear(256, 128),
nn.ELU(),
nn.Linear(128, 64),
nn.ELU(),
)
self.gru = nn.GRU(
input_size=self.num_observations,
hidden_size=64,
num_layers=self.num_layers,
batch_first=True,
) # batch_first -> (batch, sequence, features)
self.mean_layer = nn.Linear(64, self.num_actions)
self.value_layer = nn.Linear(64, 1)
# For RNN specification
self.hidden_size = 64
self.num_layers = 1
def get_specification(self):
# batch size (N) is the number of envs
return {
"rnn": {
"sequence_length": self.sequence_length,
"sizes": [(self.num_layers, self.num_envs, self.hidden_size)],
}
} # hidden states (D ∗ num_layers, N, Hout)
def act(self, inputs, role):
if role == "policy":
return CategoricalMixin.act(self, inputs, role)
elif role == "value":
return DeterministicMixin.act(self, inputs, role)
def compute(self, inputs, role):
states = inputs["states"]
terminated = inputs.get("terminated", None)
hidden_states = inputs["rnn"][0]
rnn_output, rnn_dict = self.rnn_rollout(states, terminated, hidden_states)
if role == "policy":
return self.mean_layer(rnn_output), rnn_dict
elif role == "value":
return self.value_layer(rnn_output), rnn_dict
def rnn_rollout(self, states, terminated, hidden_states):
if self.training:
rnn_input = states.view(
-1, self.sequence_length, states.shape[-1]
) # (N, L, Hin): N=batch_size, L=sequence_length
hidden_states = hidden_states.view(
self.num_layers, -1, self.sequence_length, hidden_states.shape[-1]
) # (D * num_layers, N, L, Hout)
# get the hidden states corresponding to the initial sequence
hidden_states = hidden_states[
:, :, 0, :
].contiguous() # (D * num_layers, N, Hout)
# reset the RNN state in the middle of a sequence
if terminated is not None and torch.any(terminated):
rnn_outputs = []
terminated = terminated.view(-1, self.sequence_length)
indexes = (
[0]
+ (
terminated[:, :-1].any(dim=0).nonzero(as_tuple=True)[0] + 1
).tolist()
+ [self.sequence_length]
)
for i in range(len(indexes) - 1):
i0, i1 = indexes[i], indexes[i + 1]
rnn_output, hidden_states = self.gru(
rnn_input[:, i0:i1, :], hidden_states
)
hidden_states[:, (terminated[:, i1 - 1]), :] = 0
rnn_outputs.append(rnn_output)
rnn_output = torch.cat(rnn_outputs, dim=1)
# no need to reset the RNN state in the sequence
else:
rnn_output, hidden_states = self.gru(rnn_input, hidden_states)
# rollout
else:
rnn_input = states.view(
-1, 1, states.shape[-1]
) # (N, L, Hin): N=num_envs, L=1
rnn_output, hidden_states = self.gru(rnn_input, hidden_states)
# flatten the RNN output
rnn_output = torch.flatten(
rnn_output, start_dim=0, end_dim=1
) # (N, L, D ∗ Hout) -> (N * L, D ∗ Hout)
# action_space
return self.net(rnn_output), {"rnn": [hidden_states]}
# load and wrap the Isaac Lab environment
env = load_isaaclab_env(task_name="AOD-Float-Discrete-v0")
env = wrap_env(env)
device = env.device
# instantiate a memory as rollout buffer (any memory can be used for this)
memory = RandomMemory(memory_size=16, num_envs=env.num_envs, device=device)
# instantiate the agent's models (function approximators).
# PPO requires 2 models, visit its documentation for more details
# https://skrl.readthedocs.io/en/latest/api/agents/ppo.html#models
models = {}
models["policy"] = Shared(env.observation_space, env.action_space, device)
models["value"] = models["policy"] # same instance: shared model
# configure and instantiate the agent (visit its documentation to see all the options)
# https://skrl.readthedocs.io/en/latest/api/agents/ppo.html#configuration-and-hyperparameters
cfg = PPO_DEFAULT_CONFIG.copy()
cfg["rollouts"] = 16 # memory_size
cfg["learning_epochs"] = 8
cfg["mini_batches"] = 4 # 16 * 1024 / 4096
cfg["discount_factor"] = 0.99
cfg["lambda"] = 0.95
cfg["learning_rate"] = 3e-4
cfg["learning_rate_scheduler"] = KLAdaptiveRL
cfg["learning_rate_scheduler_kwargs"] = {"kl_threshold": 0.008}
cfg["random_timesteps"] = 0
cfg["learning_starts"] = 0
cfg["grad_norm_clip"] = 1.0
cfg["ratio_clip"] = 0.2
cfg["value_clip"] = 0.2
cfg["clip_predicted_values"] = True
cfg["entropy_loss_scale"] = 0.0
cfg["value_loss_scale"] = 1.0
cfg["kl_threshold"] = 0
cfg["rewards_shaper"] = lambda rewards, *args, **kwargs: rewards * 0.1
cfg["time_limit_bootstrap"] = True
cfg["state_preprocessor"] = RunningStandardScaler
cfg["state_preprocessor_kwargs"] = {
"size": get_space_dim(env.observation_space),
"device": device,
}
cfg["value_preprocessor"] = RunningStandardScaler
cfg["value_preprocessor_kwargs"] = {"size": 1, "device": device}
# logging to TensorBoard and write checkpoints (in timesteps)
cfg["experiment"]["write_interval"] = 40
cfg["experiment"]["checkpoint_interval"] = 400
cfg["experiment"]["directory"] = "runs/torch/AOD-Float-Discrete-v0"
agent = PPO(
models=models,
memory=memory,
cfg=cfg,
observation_space=env.observation_space,
action_space=env.action_space,
device=device,
)
# configure and instantiate the RL trainer
cfg_trainer = {"timesteps": 20000, "headless": True}
trainer = SequentialTrainer(cfg=cfg_trainer, env=env, agents=agent)
# start training
trainer.train()
# # ---------------------------------------------------------
# # comment the code above: `trainer.train()`, and...
# # uncomment the following lines to evaluate a trained agent
# # ---------------------------------------------------------
# from skrl.utils.huggingface import download_model_from_huggingface
# # download the trained agent's checkpoint from Hugging Face Hub and load it
# path = download_model_from_huggingface("skrl/IsaacOrbit-Isaac-Ant-v0-PPO", filename="agent.pt")
# agent.load(path)
# # start evaluation
# trainer.eval()
Hi @shiven2001
Answered in https://github.com/isaac-sim/IsaacLab/issues/2940#issuecomment-3073752154
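In case the link goes stale, an educated guess at the cause (an assumption here, not a restatement of the linked answer): the script instantiates the standard `PPO` agent, which never calls the model's `get_specification()` and never passes an `"rnn"` entry in `inputs`, hence the `KeyError: 'rnn'`. skrl ships a recurrent variant, `PPO_RNN`, for models that carry hidden states. A sketch of the change, with the rest of the script unchanged:

```python
# hypothetical fix sketch, not the linked answer verbatim: use the recurrent PPO variant
from skrl.agents.torch.ppo import PPO_RNN, PPO_DEFAULT_CONFIG

# ... env, models, memory and cfg defined exactly as in the script above ...

agent = PPO_RNN(
    models=models,
    memory=memory,
    cfg=cfg,
    observation_space=env.observation_space,
    action_space=env.action_space,
    device=device,
)
```

Two hedged follow-up notes on the posted model: `get_specification()` references `self.num_envs`, so if the base `Model` does not provide that attribute, pass the number of environments to the model explicitly (skrl's RNN examples take it as a constructor argument). Also, `rnn_rollout` applies `self.net` to the 64-dimensional GRU output while its first layer expects `num_observations` features, which would surface as a shape error once the hidden states are wired up (unless the flattened observation happens to be 64-dimensional).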