时间:2026-03-18 15:04
人气:
作者:admin
Unitree RL GYM 是一个开源的 基于 Unitree 机器人强化学习(Reinforcement Learning, RL)控制示例项目,用于训练、测试和部署四足机器人控制策略。该仓库支持多种 Unitree 机器人型号,包括 Go2、H1、H1_2 和 G1。仓库地址 
本系列将着手解析整个仓库的核心代码与算法实现和训练教程。此系列默认读者拥有一定的强化学习基础和代码基础,故在部分原理和基础代码逻辑不做解释,对强化学习基础感兴趣的读者可以阅读我的入门系列:
阅读本系列的前置知识:
python语法,明白面向对象的封装pytorch基础使用Policy Gradient、Actor-Critic 和 PPO本系列:
上一期我们从train.py开始解读,着重分析了LeggedRobotCfg和LeggedRobotCfgPPO两个类中各个参数调节对训练的作用。
然后实际训练中,PPO的参数基本上不需要进行更改,如果训练结果不好,多半都是奖励函数出了问题,那么这一期,我们就来看看legged_gym/envs/base/legged_robot.py,来看看奖励函数的具体实现。
同时我们也接着上一期提到的train.py,这一期讲讲对应的play.py
Isaac Gym 是 NVIDIA 推出的一个基于 GPU 的高性能物理仿真平台,专为强化学习(Reinforcement Learning)设计。MuJoCo / PyBullet)
Isaac Gym 做到了全部在GPU上进行仿真,就可以实现超大规模高速并行训练。1 ( 条件 ) = { 1 条件成立 0 条件不成立 \mathbf{1}(\text{条件}) = \begin{cases} 1 & \text{条件成立} \\ 0 & \text{条件不成立} \end{cases} 1(条件)={10条件成立条件不成立
LeggedRobot类位于legged_gym/envs/base/legged_robot.pyfrom legged_gym import LEGGED_GYM_ROOT_DIR, envs
import time
from warnings import WarningMessage
import numpy as np
import os
from isaacgym.torch_utils import *
from isaacgym import gymtorch, gymapi, gymutil
import torch
from torch import Tensor
from typing import Tuple, Dict
from legged_gym import LEGGED_GYM_ROOT_DIR
from legged_gym.envs.base.base_task import BaseTask
from legged_gym.utils.math import wrap_to_pi
from legged_gym.utils.isaacgym_utils import get_euler_xyz as get_euler_xyz_in_tensor
from legged_gym.utils.helpers import class_to_dict
from .legged_robot_config import LeggedRobotCfg
class LeggedRobot(BaseTask):
def __init__(self, cfg: LeggedRobotCfg, sim_params, physics_engine, sim_device, headless):
""" Parses the provided config file,
calls create_sim() (which creates, simulation and environments),
initilizes pytorch buffers used during training
Args:
cfg (Dict): Environment config file
sim_params (gymapi.SimParams): simulation parameters
physics_engine (gymapi.SimType): gymapi.SIM_PHYSX (must be PhysX)
device_type (string): 'cuda' or 'cpu'
device_id (int): 0, 1, ...
headless (bool): Run without rendering if True
"""
self.cfg = cfg
self.sim_params = sim_params
self.height_samples = None
self.debug_viz = False
self.init_done = False
self._parse_cfg(self.cfg)
super().__init__(self.cfg, sim_params, physics_engine, sim_device, headless)
if not self.headless:
self.set_camera(self.cfg.viewer.pos, self.cfg.viewer.lookat)
self._init_buffers()
self._prepare_reward_function()
self.init_done = True
def step(self, actions):
""" Apply actions, simulate, call self.post_physics_step()
Args:
actions (torch.Tensor): Tensor of shape (num_envs, num_actions_per_env)
"""
clip_actions = self.cfg.normalization.clip_actions
self.actions = torch.clip(actions, -clip_actions, clip_actions).to(self.device)
# step physics and render each frame
self.render()
for _ in range(self.cfg.control.decimation):
self.torques = self._compute_torques(self.actions).view(self.torques.shape)
self.gym.set_dof_actuation_force_tensor(self.sim, gymtorch.unwrap_tensor(self.torques))
self.gym.simulate(self.sim)
if self.cfg.env.test:
elapsed_time = self.gym.get_elapsed_time(self.sim)
sim_time = self.gym.get_sim_time(self.sim)
if sim_time-elapsed_time>0:
time.sleep(sim_time-elapsed_time)
if self.device == 'cpu':
self.gym.fetch_results(self.sim, True)
self.gym.refresh_dof_state_tensor(self.sim)
self.post_physics_step()
# return clipped obs, clipped states (None), rewards, dones and infos
clip_obs = self.cfg.normalization.clip_observations
self.obs_buf = torch.clip(self.obs_buf, -clip_obs, clip_obs)
if self.privileged_obs_buf is not None:
self.privileged_obs_buf = torch.clip(self.privileged_obs_buf, -clip_obs, clip_obs)
return self.obs_buf, self.privileged_obs_buf, self.rew_buf, self.reset_buf, self.extras
def post_physics_step(self):
""" check terminations, compute observations and rewards
calls self._post_physics_step_callback() for common computations
calls self._draw_debug_vis() if needed
"""
self.gym.refresh_actor_root_state_tensor(self.sim)
self.gym.refresh_net_contact_force_tensor(self.sim)
self.episode_length_buf += 1
self.common_step_counter += 1
# prepare quantities
self.base_pos[:] = self.root_states[:, 0:3]
self.base_quat[:] = self.root_states[:, 3:7]
self.rpy[:] = get_euler_xyz_in_tensor(self.base_quat[:])
self.base_lin_vel[:] = quat_rotate_inverse(self.base_quat, self.root_states[:, 7:10])
self.base_ang_vel[:] = quat_rotate_inverse(self.base_quat, self.root_states[:, 10:13])
self.projected_gravity[:] = quat_rotate_inverse(self.base_quat, self.gravity_vec)
self._post_physics_step_callback()
# compute observations, rewards, resets, ...
self.check_termination()
self.compute_reward()
env_ids = self.reset_buf.nonzero(as_tuple=False).flatten()
self.reset_idx(env_ids)
if self.cfg.domain_rand.push_robots:
self._push_robots()
self.compute_observations() # in some cases a simulation step might be required to refresh some obs (for example body positions)
self.last_actions[:] = self.actions[:]
self.last_dof_vel[:] = self.dof_vel[:]
self.last_root_vel[:] = self.root_states[:, 7:13]
def check_termination(self):
""" Check if environments need to be reset
"""
self.reset_buf = torch.any(torch.norm(self.contact_forces[:, self.termination_contact_indices, :], dim=-1) > 1., dim=1)
self.reset_buf |= torch.logical_or(torch.abs(self.rpy[:,1])>1.0, torch.abs(self.rpy[:,0])>0.8)
self.time_out_buf = self.episode_length_buf > self.max_episode_length # no terminal reward for time-outs
self.reset_buf |= self.time_out_buf
def reset_idx(self, env_ids):
""" Reset some environments.
Calls self._reset_dofs(env_ids), self._reset_root_states(env_ids), and self._resample_commands(env_ids)
[Optional] calls self._update_terrain_curriculum(env_ids), self.update_command_curriculum(env_ids) and
Logs episode info
Resets some buffers
Args:
env_ids (list[int]): List of environment ids which must be reset
"""
if len(env_ids) == 0:
return
# reset robot states
self._reset_dofs(env_ids)
self._reset_root_states(env_ids)
self._resample_commands(env_ids)
# reset buffers
self.actions[env_ids] = 0.
self.last_actions[env_ids] = 0.
self.last_dof_vel[env_ids] = 0.
self.feet_air_time[env_ids] = 0.
self.episode_length_buf[env_ids] = 0
self.reset_buf[env_ids] = 1
# fill extras
self.extras["episode"] = {}
for key in self.episode_sums.keys():
self.extras["episode"]['rew_' + key] = torch.mean(self.episode_sums[key][env_ids]) / self.max_episode_length_s
self.episode_sums[key][env_ids] = 0.
if self.cfg.commands.curriculum:
self.extras["episode"]["max_command_x"] = self.command_ranges["lin_vel_x"][1]
# send timeout info to the algorithm
if self.cfg.env.send_timeouts:
self.extras["time_outs"] = self.time_out_buf
def compute_reward(self):
""" Compute rewards
Calls each reward function which had a non-zero scale (processed in self._prepare_reward_function())
adds each terms to the episode sums and to the total reward
"""
self.rew_buf[:] = 0.
for i in range(len(self.reward_functions)):
name = self.reward_names[i]
rew = self.reward_functions[i]() * self.reward_scales[name]
self.rew_buf += rew
self.episode_sums[name] += rew
if self.cfg.rewards.only_positive_rewards:
self.rew_buf[:] = torch.clip(self.rew_buf[:], min=0.)
# add termination reward after clipping
if "termination" in self.reward_scales:
rew = self._reward_termination() * self.reward_scales["termination"]
self.rew_buf += rew
self.episode_sums["termination"] += rew
def compute_observations(self):
""" Computes observations
"""
self.obs_buf = torch.cat(( self.base_lin_vel * self.obs_scales.lin_vel,
self.base_ang_vel * self.obs_scales.ang_vel,
self.projected_gravity,
self.commands[:, :3] * self.commands_scale,
(self.dof_pos - self.default_dof_pos) * self.obs_scales.dof_pos,
self.dof_vel * self.obs_scales.dof_vel,
self.actions
),dim=-1)
# add perceptive inputs if not blind
# add noise if needed
if self.add_noise:
self.obs_buf += (2 * torch.rand_like(self.obs_buf) - 1) * self.noise_scale_vec
def create_sim(self):
""" Creates simulation, terrain and evironments
"""
self.up_axis_idx = 2 # 2 for z, 1 for y -> adapt gravity accordingly
self.sim = self.gym.create_sim(self.sim_device_id, self.graphics_device_id, self.physics_engine, self.sim_params)
self._create_ground_plane()
self._create_envs()
def set_camera(self, position, lookat):
""" Set camera position and direction
"""
cam_pos = gymapi.Vec3(position[0], position[1], position[2])
cam_target = gymapi.Vec3(lookat[0], lookat[1], lookat[2])
self.gym.viewer_camera_look_at(self.viewer, None, cam_pos, cam_target)
#------------- Callbacks --------------
def _process_rigid_shape_props(self, props, env_id):
""" Callback allowing to store/change/randomize the rigid shape properties of each environment.
Called During environment creation.
Base behavior: randomizes the friction of each environment
Args:
props (List[gymapi.RigidShapeProperties]): Properties of each shape of the asset
env_id (int): Environment id
Returns:
[List[gymapi.RigidShapeProperties]]: Modified rigid shape properties
"""
if self.cfg.domain_rand.randomize_friction:
if env_id==0:
# prepare friction randomization
friction_range = self.cfg.domain_rand.friction_range
num_buckets = 64
bucket_ids = torch.randint(0, num_buckets, (self.num_envs, 1))
friction_buckets = torch_rand_float(friction_range[0], friction_range[1], (num_buckets,1), device='cpu')
self.friction_coeffs = friction_buckets[bucket_ids]
for s in range(len(props)):
props[s].friction = self.friction_coeffs[env_id]
return props
def _process_dof_props(self, props, env_id):
""" Callback allowing to store/change/randomize the DOF properties of each environment.
Called During environment creation.
Base behavior: stores position, velocity and torques limits defined in the URDF
Args:
props (numpy.array): Properties of each DOF of the asset
env_id (int): Environment id
Returns:
[numpy.array]: Modified DOF properties
"""
if env_id==0:
self.dof_pos_limits = torch.zeros(self.num_dof, 2, dtype=torch.float, device=self.device, requires_grad=False)
self.dof_vel_limits = torch.zeros(self.num_dof, dtype=torch.float, device=self.device, requires_grad=False)
self.torque_limits = torch.zeros(self.num_dof, dtype=torch.float, device=self.device, requires_grad=False)
for i in range(len(props)):
self.dof_pos_limits[i, 0] = props["lower"][i].item()
self.dof_pos_limits[i, 1] = props["upper"][i].item()
self.dof_vel_limits[i] = props["velocity"][i].item()
self.torque_limits[i] = props["effort"][i].item()
# soft limits
m = (self.dof_pos_limits[i, 0] + self.dof_pos_limits[i, 1]) / 2
r = self.dof_pos_limits[i, 1] - self.dof_pos_limits[i, 0]
self.dof_pos_limits[i, 0] = m - 0.5 * r * self.cfg.rewards.soft_dof_pos_limit
self.dof_pos_limits[i, 1] = m + 0.5 * r * self.cfg.rewards.soft_dof_pos_limit
return props
def _process_rigid_body_props(self, props, env_id):
# if env_id==0:
# sum = 0
# for i, p in enumerate(props):
# sum += p.mass
# print(f"Mass of body {i}: {p.mass} (before randomization)")
# print(f"Total mass {sum} (before randomization)")
# randomize base mass
if self.cfg.domain_rand.randomize_base_mass:
rng = self.cfg.domain_rand.added_mass_range
props[0].mass += np.random.uniform(rng[0], rng[1])
return props
def _post_physics_step_callback(self):
""" Callback called before computing terminations, rewards, and observations
Default behaviour: Compute ang vel command based on target and heading, compute measured terrain heights and randomly push robots
"""
#
env_ids = (self.episode_length_buf % int(self.cfg.commands.resampling_time / self.dt)==0).nonzero(as_tuple=False).flatten()
self._resample_commands(env_ids)
if self.cfg.commands.heading_command:
forward = quat_apply(self.base_quat, self.forward_vec)
heading = torch.atan2(forward[:, 1], forward[:, 0])
self.commands[:, 2] = torch.clip(0.5*wrap_to_pi(self.commands[:, 3] - heading), -1., 1.)
def _resample_commands(self, env_ids):
""" Randommly select commands of some environments
Args:
env_ids (List[int]): Environments ids for which new commands are needed
"""
self.commands[env_ids, 0] = torch_rand_float(self.command_ranges["lin_vel_x"][0], self.command_ranges["lin_vel_x"][1], (len(env_ids), 1), device=self.device).squeeze(1)
self.commands[env_ids, 1] = torch_rand_float(self.command_ranges["lin_vel_y"][0], self.command_ranges["lin_vel_y"][1], (len(env_ids), 1), device=self.device).squeeze(1)
if self.cfg.commands.heading_command:
self.commands[env_ids, 3] = torch_rand_float(self.command_ranges["heading"][0], self.command_ranges["heading"][1], (len(env_ids), 1), device=self.device).squeeze(1)
else:
self.commands[env_ids, 2] = torch_rand_float(self.command_ranges["ang_vel_yaw"][0], self.command_ranges["ang_vel_yaw"][1], (len(env_ids), 1), device=self.device).squeeze(1)
# set small commands to zero
self.commands[env_ids, :2] *= (torch.norm(self.commands[env_ids, :2], dim=1) > 0.2).unsqueeze(1)
def _compute_torques(self, actions):
""" Compute torques from actions.
Actions can be interpreted as position or velocity targets given to a PD controller, or directly as scaled torques.
[NOTE]: torques must have the same dimension as the number of DOFs, even if some DOFs are not actuated.
Args:
actions (torch.Tensor): Actions
Returns:
[torch.Tensor]: Torques sent to the simulation
"""
#pd controller
actions_scaled = actions * self.cfg.control.action_scale
control_type = self.cfg.control.control_type
if control_type=="P":
torques = self.p_gains*(actions_scaled + self.default_dof_pos - self.dof_pos) - self.d_gains*self.dof_vel
elif control_type=="V":
torques = self.p_gains*(actions_scaled - self.dof_vel) - self.d_gains*(self.dof_vel - self.last_dof_vel)/self.sim_params.dt
elif control_type=="T":
torques = actions_scaled
else:
raise NameError(f"Unknown controller type: {control_type}")
return torch.clip(torques, -self.torque_limits, self.torque_limits)
def _reset_dofs(self, env_ids):
""" Resets DOF position and velocities of selected environmments
Positions are randomly selected within 0.5:1.5 x default positions.
Velocities are set to zero.
Args:
env_ids (List[int]): Environemnt ids
"""
self.dof_pos[env_ids] = self.default_dof_pos * torch_rand_float(0.5, 1.5, (len(env_ids), self.num_dof), device=self.device)
self.dof_vel[env_ids] = 0.
env_ids_int32 = env_ids.to(dtype=torch.int32)
self.gym.set_dof_state_tensor_indexed(self.sim,
gymtorch.unwrap_tensor(self.dof_state),
gymtorch.unwrap_tensor(env_ids_int32), len(env_ids_int32))
def _reset_root_states(self, env_ids):
""" Resets ROOT states position and velocities of selected environmments
Sets base position based on the curriculum
Selects randomized base velocities within -0.5:0.5 [m/s, rad/s]
Args:
env_ids (List[int]): Environemnt ids
"""
# base position
if self.custom_origins:
self.root_states[env_ids] = self.base_init_state
self.root_states[env_ids, :3] += self.env_origins[env_ids]
self.root_states[env_ids, :2] += torch_rand_float(-1., 1., (len(env_ids), 2), device=self.device) # xy position within 1m of the center
else:
self.root_states[env_ids] = self.base_init_state
self.root_states[env_ids, :3] += self.env_origins[env_ids]
# base velocities
self.root_states[env_ids, 7:13] = torch_rand_float(-0.5, 0.5, (len(env_ids), 6), device=self.device) # [7:10]: lin vel, [10:13]: ang vel
env_ids_int32 = env_ids.to(dtype=torch.int32)
self.gym.set_actor_root_state_tensor_indexed(self.sim,
gymtorch.unwrap_tensor(self.root_states),
gymtorch.unwrap_tensor(env_ids_int32), len(env_ids_int32))
def _push_robots(self):
""" Random pushes the robots. Emulates an impulse by setting a randomized base velocity.
"""
env_ids = torch.arange(self.num_envs, device=self.device)
push_env_ids = env_ids[self.episode_length_buf[env_ids] % int(self.cfg.domain_rand.push_interval) == 0]
if len(push_env_ids) == 0:
return
max_vel = self.cfg.domain_rand.max_push_vel_xy
self.root_states[:, 7:9] = torch_rand_float(-max_vel, max_vel, (self.num_envs, 2), device=self.device) # lin vel x/y
env_ids_int32 = push_env_ids.to(dtype=torch.int32)
self.gym.set_actor_root_state_tensor_indexed(self.sim,
gymtorch.unwrap_tensor(self.root_states),
gymtorch.unwrap_tensor(env_ids_int32), len(env_ids_int32))
def update_command_curriculum(self, env_ids):
""" Implements a curriculum of increasing commands
Args:
env_ids (List[int]): ids of environments being reset
"""
# If the tracking reward is above 80% of the maximum, increase the range of commands
if torch.mean(self.episode_sums["tracking_lin_vel"][env_ids]) / self.max_episode_length > 0.8 * self.reward_scales["tracking_lin_vel"]:
self.command_ranges["lin_vel_x"][0] = np.clip(self.command_ranges["lin_vel_x"][0] - 0.5, -self.cfg.commands.max_curriculum, 0.)
self.command_ranges["lin_vel_x"][1] = np.clip(self.command_ranges["lin_vel_x"][1] + 0.5, 0., self.cfg.commands.max_curriculum)
def _get_noise_scale_vec(self, cfg):
""" Sets a vector used to scale the noise added to the observations.
[NOTE]: Must be adapted when changing the observations structure
Args:
cfg (Dict): Environment config file
Returns:
[torch.Tensor]: Vector of scales used to multiply a uniform distribution in [-1, 1]
"""
noise_vec = torch.zeros_like(self.obs_buf[0])
self.add_noise = self.cfg.noise.add_noise
noise_scales = self.cfg.noise.noise_scales
noise_level = self.cfg.noise.noise_level
noise_vec[:3] = noise_scales.lin_vel * noise_level * self.obs_scales.lin_vel
noise_vec[3:6] = noise_scales.ang_vel * noise_level * self.obs_scales.ang_vel
noise_vec[6:9] = noise_scales.gravity * noise_level
noise_vec[9:12] = 0. # commands
noise_vec[12:12+self.num_actions] = noise_scales.dof_pos * noise_level * self.obs_scales.dof_pos
noise_vec[12+self.num_actions:12+2*self.num_actions] = noise_scales.dof_vel * noise_level * self.obs_scales.dof_vel
noise_vec[12+2*self.num_actions:12+3*self.num_actions] = 0. # previous actions
return noise_vec
#----------------------------------------
def _init_buffers(self):
""" Initialize torch tensors which will contain simulation states and processed quantities
"""
# get gym GPU state tensors
actor_root_state = self.gym.acquire_actor_root_state_tensor(self.sim)
dof_state_tensor = self.gym.acquire_dof_state_tensor(self.sim)
net_contact_forces = self.gym.acquire_net_contact_force_tensor(self.sim)
self.gym.refresh_dof_state_tensor(self.sim)
self.gym.refresh_actor_root_state_tensor(self.sim)
self.gym.refresh_net_contact_force_tensor(self.sim)
# create some wrapper tensors for different slices
self.root_states = gymtorch.wrap_tensor(actor_root_state)
self.dof_state = gymtorch.wrap_tensor(dof_state_tensor)
self.dof_pos = self.dof_state.view(self.num_envs, self.num_dof, 2)[..., 0]
self.dof_vel = self.dof_state.view(self.num_envs, self.num_dof, 2)[..., 1]
self.base_quat = self.root_states[:, 3:7]
self.rpy = get_euler_xyz_in_tensor(self.base_quat)
self.base_pos = self.root_states[:self.num_envs, 0:3]
self.contact_forces = gymtorch.wrap_tensor(net_contact_forces).view(self.num_envs, -1, 3) # shape: num_envs, num_bodies, xyz axis
# initialize some data used later on
self.common_step_counter = 0
self.extras = {}
self.noise_scale_vec = self._get_noise_scale_vec(self.cfg)
self.gravity_vec = to_torch(get_axis_params(-1., self.up_axis_idx), device=self.device).repeat((self.num_envs, 1))
self.forward_vec = to_torch([1., 0., 0.], device=self.device).repeat((self.num_envs, 1))
self.torques = torch.zeros(self.num_envs, self.num_actions, dtype=torch.float, device=self.device, requires_grad=False)
self.p_gains = torch.zeros(self.num_actions, dtype=torch.float, device=self.device, requires_grad=False)
self.d_gains = torch.zeros(self.num_actions, dtype=torch.float, device=self.device, requires_grad=False)
self.actions = torch.zeros(self.num_envs, self.num_actions, dtype=torch.float, device=self.device, requires_grad=False)
self.last_actions = torch.zeros(self.num_envs, self.num_actions, dtype=torch.float, device=self.device, requires_grad=False)
self.last_dof_vel = torch.zeros_like(self.dof_vel)
self.last_root_vel = torch.zeros_like(self.root_states[:, 7:13])
self.commands = torch.zeros(self.num_envs, self.cfg.commands.num_commands, dtype=torch.float, device=self.device, requires_grad=False) # x vel, y vel, yaw vel, heading
self.commands_scale = torch.tensor([self.obs_scales.lin_vel, self.obs_scales.lin_vel, self.obs_scales.ang_vel], device=self.device, requires_grad=False,) # TODO change this
self.feet_air_time = torch.zeros(self.num_envs, self.feet_indices.shape[0], dtype=torch.float, device=self.device, requires_grad=False)
self.last_contacts = torch.zeros(self.num_envs, len(self.feet_indices), dtype=torch.bool, device=self.device, requires_grad=False)
self.base_lin_vel = quat_rotate_inverse(self.base_quat, self.root_states[:, 7:10])
self.base_ang_vel = quat_rotate_inverse(self.base_quat, self.root_states[:, 10:13])
self.projected_gravity = quat_rotate_inverse(self.base_quat, self.gravity_vec)
# joint positions offsets and PD gains
self.default_dof_pos = torch.zeros(self.num_dof, dtype=torch.float, device=self.device, requires_grad=False)
for i in range(self.num_dofs):
name = self.dof_names[i]
angle = self.cfg.init_state.default_joint_angles[name]
self.default_dof_pos[i] = angle
found = False
for dof_name in self.cfg.control.stiffness.keys():
if dof_name in name:
self.p_gains[i] = self.cfg.control.stiffness[dof_name]
self.d_gains[i] = self.cfg.control.damping[dof_name]
found = True
if not found:
self.p_gains[i] = 0.
self.d_gains[i] = 0.
if self.cfg.control.control_type in ["P", "V"]:
print(f"PD gain of joint {name} were not defined, setting them to zero")
self.default_dof_pos = self.default_dof_pos.unsqueeze(0)
def _prepare_reward_function(self):
""" Prepares a list of reward functions, whcih will be called to compute the total reward.
Looks for self._reward_<REWARD_NAME>, where <REWARD_NAME> are names of all non zero reward scales in the cfg.
"""
# remove zero scales + multiply non-zero ones by dt
for key in list(self.reward_scales.keys()):
scale = self.reward_scales[key]
if scale==0:
self.reward_scales.pop(key)
else:
self.reward_scales[key] *= self.dt
# prepare list of functions
self.reward_functions = []
self.reward_names = []
for name, scale in self.reward_scales.items():
if name=="termination":
continue
self.reward_names.append(name)
name = '_reward_' + name
self.reward_functions.append(getattr(self, name))
# reward episode sums
self.episode_sums = {name: torch.zeros(self.num_envs, dtype=torch.float, device=self.device, requires_grad=False)
for name in self.reward_scales.keys()}
def _create_ground_plane(self):
""" Adds a ground plane to the simulation, sets friction and restitution based on the cfg.
"""
plane_params = gymapi.PlaneParams()
plane_params.normal = gymapi.Vec3(0.0, 0.0, 1.0)
plane_params.static_friction = self.cfg.terrain.static_friction
plane_params.dynamic_friction = self.cfg.terrain.dynamic_friction
plane_params.restitution = self.cfg.terrain.restitution
self.gym.add_ground(self.sim, plane_params)
def _create_envs(self):
""" Creates environments:
1. loads the robot URDF/MJCF asset,
2. For each environment
2.1 creates the environment,
2.2 calls DOF and Rigid shape properties callbacks,
2.3 create actor with these properties and add them to the env
3. Store indices of different bodies of the robot
"""
asset_path = self.cfg.asset.file.format(LEGGED_GYM_ROOT_DIR=LEGGED_GYM_ROOT_DIR)
asset_root = os.path.dirname(asset_path)
asset_file = os.path.basename(asset_path)
asset_options = gymapi.AssetOptions()
asset_options.default_dof_drive_mode = self.cfg.asset.default_dof_drive_mode
asset_options.collapse_fixed_joints = self.cfg.asset.collapse_fixed_joints
asset_options.replace_cylinder_with_capsule = self.cfg.asset.replace_cylinder_with_capsule
asset_options.flip_visual_attachments = self.cfg.asset.flip_visual_attachments
asset_options.fix_base_link = self.cfg.asset.fix_base_link
asset_options.density = self.cfg.asset.density
asset_options.angular_damping = self.cfg.asset.angular_damping
asset_options.linear_damping = self.cfg.asset.linear_damping
asset_options.max_angular_velocity = self.cfg.asset.max_angular_velocity
asset_options.max_linear_velocity = self.cfg.asset.max_linear_velocity
asset_options.armature = self.cfg.asset.armature
asset_options.thickness = self.cfg.asset.thickness
asset_options.disable_gravity = self.cfg.asset.disable_gravity
robot_asset = self.gym.load_asset(self.sim, asset_root, asset_file, asset_options)
self.num_dof = self.gym.get_asset_dof_count(robot_asset)
self.num_bodies = self.gym.get_asset_rigid_body_count(robot_asset)
dof_props_asset = self.gym.get_asset_dof_properties(robot_asset)
rigid_shape_props_asset = self.gym.get_asset_rigid_shape_properties(robot_asset)
# save body names from the asset
body_names = self.gym.get_asset_rigid_body_names(robot_asset)
self.dof_names = self.gym.get_asset_dof_names(robot_asset)
self.num_bodies = len(body_names)
self.num_dofs = len(self.dof_names)
feet_names = [s for s in body_names if self.cfg.asset.foot_name in s]
penalized_contact_names = []
for name in self.cfg.asset.penalize_contacts_on:
penalized_contact_names.extend([s for s in body_names if name in s])
termination_contact_names = []
for name in self.cfg.asset.terminate_after_contacts_on:
termination_contact_names.extend([s for s in body_names if name in s])
base_init_state_list = self.cfg.init_state.pos + self.cfg.init_state.rot + self.cfg.init_state.lin_vel + self.cfg.init_state.ang_vel
self.base_init_state = to_torch(base_init_state_list, device=self.device, requires_grad=False)
start_pose = gymapi.Transform()
start_pose.p = gymapi.Vec3(*self.base_init_state[:3])
self._get_env_origins()
env_lower = gymapi.Vec3(0., 0., 0.)
env_upper = gymapi.Vec3(0., 0., 0.)
self.actor_handles = []
self.envs = []
for i in range(self.num_envs):
# create env instance
env_handle = self.gym.create_env(self.sim, env_lower, env_upper, int(np.sqrt(self.num_envs)))
pos = self.env_origins[i].clone()
pos[:2] += torch_rand_float(-1., 1., (2,1), device=self.device).squeeze(1)
start_pose.p = gymapi.Vec3(*pos)
rigid_shape_props = self._process_rigid_shape_props(rigid_shape_props_asset, i)
self.gym.set_asset_rigid_shape_properties(robot_asset, rigid_shape_props)
actor_handle = self.gym.create_actor(env_handle, robot_asset, start_pose, self.cfg.asset.name, i, self.cfg.asset.self_collisions, 0)
dof_props = self._process_dof_props(dof_props_asset, i)
self.gym.set_actor_dof_properties(env_handle, actor_handle, dof_props)
body_props = self.gym.get_actor_rigid_body_properties(env_handle, actor_handle)
body_props = self._process_rigid_body_props(body_props, i)
self.gym.set_actor_rigid_body_properties(env_handle, actor_handle, body_props, recomputeInertia=True)
self.envs.append(env_handle)
self.actor_handles.append(actor_handle)
self.feet_indices = torch.zeros(len(feet_names), dtype=torch.long, device=self.device, requires_grad=False)
for i in range(len(feet_names)):
self.feet_indices[i] = self.gym.find_actor_rigid_body_handle(self.envs[0], self.actor_handles[0], feet_names[i])
self.penalised_contact_indices = torch.zeros(len(penalized_contact_names), dtype=torch.long, device=self.device, requires_grad=False)
for i in range(len(penalized_contact_names)):
self.penalised_contact_indices[i] = self.gym.find_actor_rigid_body_handle(self.envs[0], self.actor_handles[0], penalized_contact_names[i])
self.termination_contact_indices = torch.zeros(len(termination_contact_names), dtype=torch.long, device=self.device, requires_grad=False)
for i in range(len(termination_contact_names)):
self.termination_contact_indices[i] = self.gym.find_actor_rigid_body_handle(self.envs[0], self.actor_handles[0], termination_contact_names[i])
def _get_env_origins(self):
""" Sets environment origins. On rough terrain the origins are defined by the terrain platforms.
Otherwise create a grid.
"""
self.custom_origins = False
self.env_origins = torch.zeros(self.num_envs, 3, device=self.device, requires_grad=False)
# create a grid of robots
num_cols = np.floor(np.sqrt(self.num_envs))
num_rows = np.ceil(self.num_envs / num_cols)
xx, yy = torch.meshgrid(torch.arange(num_rows), torch.arange(num_cols))
spacing = self.cfg.env.env_spacing
self.env_origins[:, 0] = spacing * xx.flatten()[:self.num_envs]
self.env_origins[:, 1] = spacing * yy.flatten()[:self.num_envs]
self.env_origins[:, 2] = 0.
def _parse_cfg(self, cfg):
self.dt = self.cfg.control.decimation * self.sim_params.dt
self.obs_scales = self.cfg.normalization.obs_scales
self.reward_scales = class_to_dict(self.cfg.rewards.scales)
self.command_ranges = class_to_dict(self.cfg.commands.ranges)
self.max_episode_length_s = self.cfg.env.episode_length_s
self.max_episode_length = np.ceil(self.max_episode_length_s / self.dt)
self.cfg.domain_rand.push_interval = np.ceil(self.cfg.domain_rand.push_interval_s / self.dt)
#------------ reward functions----------------
def _reward_lin_vel_z(self):
# Penalize z axis base linear velocity
return torch.square(self.base_lin_vel[:, 2])
def _reward_ang_vel_xy(self):
# Penalize xy axes base angular velocity
return torch.sum(torch.square(self.base_ang_vel[:, :2]), dim=1)
def _reward_orientation(self):
# Penalize non flat base orientation
return torch.sum(torch.square(self.projected_gravity[:, :2]), dim=1)
def _reward_base_height(self):
# Penalize base height away from target
base_height = self.root_states[:, 2]
return torch.square(base_height - self.cfg.rewards.base_height_target)
def _reward_torques(self):
# Penalize torques
return torch.sum(torch.square(self.torques), dim=1)
def _reward_dof_vel(self):
# Penalize dof velocities
return torch.sum(torch.square(self.dof_vel), dim=1)
def _reward_dof_acc(self):
# Penalize dof accelerations
return torch.sum(torch.square((self.last_dof_vel - self.dof_vel) / self.dt), dim=1)
def _reward_action_rate(self):
# Penalize changes in actions
return torch.sum(torch.square(self.last_actions - self.actions), dim=1)
def _reward_collision(self):
# Penalize collisions on selected bodies
return torch.sum(1.*(torch.norm(self.contact_forces[:, self.penalised_contact_indices, :], dim=-1) > 0.1), dim=1)
def _reward_termination(self):
# Terminal reward / penalty
return self.reset_buf * ~self.time_out_buf
def _reward_dof_pos_limits(self):
# Penalize dof positions too close to the limit
out_of_limits = -(self.dof_pos - self.dof_pos_limits[:, 0]).clip(max=0.) # lower limit
out_of_limits += (self.dof_pos - self.dof_pos_limits[:, 1]).clip(min=0.)
return torch.sum(out_of_limits, dim=1)
def _reward_dof_vel_limits(self):
# Penalize dof velocities too close to the limit
# clip to max error = 1 rad/s per joint to avoid huge penalties
return torch.sum((torch.abs(self.dof_vel) - self.dof_vel_limits*self.cfg.rewards.soft_dof_vel_limit).clip(min=0., max=1.), dim=1)
def _reward_torque_limits(self):
# penalize torques too close to the limit
return torch.sum((torch.abs(self.torques) - self.torque_limits*self.cfg.rewards.soft_torque_limit).clip(min=0.), dim=1)
def _reward_tracking_lin_vel(self):
# Tracking of linear velocity commands (xy axes)
lin_vel_error = torch.sum(torch.square(self.commands[:, :2] - self.base_lin_vel[:, :2]), dim=1)
return torch.exp(-lin_vel_error/self.cfg.rewards.tracking_sigma)
def _reward_tracking_ang_vel(self):
# Tracking of angular velocity commands (yaw)
ang_vel_error = torch.square(self.commands[:, 2] - self.base_ang_vel[:, 2])
return torch.exp(-ang_vel_error/self.cfg.rewards.tracking_sigma)
def _reward_feet_air_time(self):
# Reward long steps
# Need to filter the contacts because the contact reporting of PhysX is unreliable on meshes
contact = self.contact_forces[:, self.feet_indices, 2] > 1.
contact_filt = torch.logical_or(contact, self.last_contacts)
self.last_contacts = contact
first_contact = (self.feet_air_time > 0.) * contact_filt
self.feet_air_time += self.dt
rew_airTime = torch.sum((self.feet_air_time - 0.5) * first_contact, dim=1) # reward only on first contact with the ground
rew_airTime *= torch.norm(self.commands[:, :2], dim=1) > 0.1 #no reward for zero command
self.feet_air_time *= ~contact_filt
return rew_airTime
def _reward_stumble(self):
# Penalize feet hitting vertical surfaces
return torch.any(torch.norm(self.contact_forces[:, self.feet_indices, :2], dim=2) >\
5 *torch.abs(self.contact_forces[:, self.feet_indices, 2]), dim=1)
def _reward_stand_still(self):
# Penalize motion at zero commands
return torch.sum(torch.abs(self.dof_pos - self.default_dof_pos), dim=1) * (torch.norm(self.commands[:, :2], dim=1) < 0.1)
def _reward_feet_contact_forces(self):
# penalize high contact forces
return torch.sum((torch.norm(self.contact_forces[:, self.feet_indices, :], dim=-1) - self.cfg.rewards.max_contact_force).clip(min=0.), dim=1)
step(self, actions):强化学习的入口函数,接收策略网络输出的 actions,推动物理仿真并返回obs(观测),reward(奖励), done(是否结束)post_physics_step(self):更新机器人状态(位置、速度、姿态),处理reset,其中调用:
check_termination()(是否摔倒/结束)compute_reward()(奖励计算)compute_observations()(生成观测)check_termination(self):判断是否结束 episodereset_idx(self, env_ids):重置环境compute_reward(self):遍历所有 _reward_xxx,按权重 scale 加权求和_prepare_reward_function(self):动态注册奖励函数_reward_orientation(保持身体水平)_reward_lin_vel_z(减少上下抖动)_reward_ang_vel_xy(减少翻滚)_reward_torques_reward_dof_vel_reward_dof_acc_reward_action_rate_reward_tracking_lin_vel_reward_tracking_ang_vel_reward_collision_reward_dof_pos_limits_reward_torque_limits_reward_stumble_reward_feet_air_timecompute_observations(self):拼接观测信息obs = [
线速度,
角速度,
重力方向,
指令,
关节位置,
关节速度,
上一帧动作
]
_compute_torques(self, actions):支持三种控制:
P:位置控制(最常用)V:速度控制T:直接力矩create_sim(self):创建物理仿真_create_envs(self):批量创建机器人_process_rigid_shape_props(摩擦)_process_rigid_body_props(质量)_push_robots(随机推一下)step()step() 是 策略网络与物理环境交互的入口函数,每调用一次,就完成一次完整的“决策 → 执行 → 反馈”闭环。def step(self, actions):
""" Apply actions, simulate, call self.post_physics_step()
Args:
actions (torch.Tensor): Tensor of shape (num_envs, num_actions_per_env)
"""
clip_actions = self.cfg.normalization.clip_actions
self.actions = torch.clip(actions, -clip_actions, clip_actions).to(self.device)
# step physics and render each frame
self.render()
for _ in range(self.cfg.control.decimation):
self.torques = self._compute_torques(self.actions).view(self.torques.shape)
self.gym.set_dof_actuation_force_tensor(self.sim, gymtorch.unwrap_tensor(self.torques))
self.gym.simulate(self.sim)
if self.cfg.env.test:
elapsed_time = self.gym.get_elapsed_time(self.sim)
sim_time = self.gym.get_sim_time(self.sim)
if sim_time-elapsed_time>0:
time.sleep(sim_time-elapsed_time)
if self.device == 'cpu':
self.gym.fetch_results(self.sim, True)
self.gym.refresh_dof_state_tensor(self.sim)
self.post_physics_step()
# return clipped obs, clipped states (None), rewards, dones and infos
clip_obs = self.cfg.normalization.clip_observations
self.obs_buf = torch.clip(self.obs_buf, -clip_obs, clip_obs)
if self.privileged_obs_buf is not None:
self.privileged_obs_buf = torch.clip(self.privileged_obs_buf, -clip_obs, clip_obs)
return self.obs_buf, self.privileged_obs_buf, self.rew_buf, self.reset_buf, self.extras
step()函数做了什么clip_actions = self.cfg.normalization.clip_actions
self.actions = torch.clip(actions, -clip_actions, clip_actions).to(self.device)
# step physics and render each frame
self.render()
for _ in range(self.cfg.control.decimation):
self.torques = self._compute_torques(self.actions).view(self.torques.shape)
self.gym.set_dof_actuation_force_tensor(self.sim, gymtorch.unwrap_tensor(self.torques))
self.gym.simulate(self.sim)
if self.cfg.env.test:
elapsed_time = self.gym.get_elapsed_time(self.sim)
sim_time = self.gym.get_sim_time(self.sim)
if sim_time-elapsed_time>0:
time.sleep(sim_time-elapsed_time)
if self.device == 'cpu':
self.gym.fetch_results(self.sim, True)
self.gym.refresh_dof_state_tensor(self.sim)
self.render():进行可视化渲染self.torques = self._compute_torques(self.actions):将神经网络输出转换为电机控制量self.gym.set_dof_actuation_force_tensor(...)然后把电机控制量交给Isaac Gymself.gym.simulate(self.sim):然后进一步推进仿真self.gym.refresh_dof_state_tensor(self.sim):刷新状态,这一步后,tensor才是“最新世界状态”self.post_physics_step()
def post_physics_step(self):
""" check terminations, compute observations and rewards
calls self._post_physics_step_callback() for common computations
calls self._draw_debug_vis() if needed
"""
# 刷新仿真数据
self.gym.refresh_actor_root_state_tensor(self.sim)
self.gym.refresh_net_contact_force_tensor(self.sim)
# 更新计数器
self.episode_length_buf += 1
self.common_step_counter += 1
# 状态解包
self.base_pos[:] = self.root_states[:, 0:3]
self.base_quat[:] = self.root_states[:, 3:7]
self.rpy[:] = get_euler_xyz_in_tensor(self.base_quat[:])
self.base_lin_vel[:] = quat_rotate_inverse(self.base_quat, self.root_states[:, 7:10])
self.base_ang_vel[:] = quat_rotate_inverse(self.base_quat, self.root_states[:, 10:13])
self.projected_gravity[:] = quat_rotate_inverse(self.base_quat, self.gravity_vec)
self._post_physics_step_callback()
# compute observations, rewards, resets, ...
self.check_termination()
self.compute_reward()
# 批量重置环境(并行关键)
env_ids = self.reset_buf.nonzero(as_tuple=False).flatten()
self.reset_idx(env_ids)
if self.cfg.domain_rand.push_robots:
self._push_robots()
self.compute_observations() # in some cases a simulation step might be required to refresh some obs (for example body positions)
self.last_actions[:] = self.actions[:]
self.last_dof_vel[:] = self.dof_vel[:]
self.last_root_vel[:] = self.root_states[:, 7:13]
self.base_pos[:]:机器人位置self.base_quat[:]:四元数姿态self.rpy:欧拉角base_lin_vel:线速度base_ang_vel:角速度projected_gravity:重力投影,也就是重力在机器人坐标系中的方向(用于判断倾斜和计算reward)self.check_termination():判断是否终止self.compute_reward():计算奖励(核心!!)env_ids = self.reset_buf.nonzero(as_tuple=False).flatten()self.reset_idx(env_ids)self._push_robots():增加随机扰动,推一下机器人self.compute_observations():计算观测self.obs_buf = torch.clip(self.obs_buf, -clip_obs, clip_obs)
return (
obs, # 当前观测
privileged_obs, # 特权观测(teacher用)
reward, # 奖励
done, # 是否结束
info # 额外信息
)
compute_reward()def compute_reward(self):
""" Compute rewards
Calls each reward function which had a non-zero scale (processed in self._prepare_reward_function())
adds each terms to the episode sums and to the total reward
"""
self.rew_buf[:] = 0.
for i in range(len(self.reward_functions)):
name = self.reward_names[i]
rew = self.reward_functions[i]() * self.reward_scales[name]
self.rew_buf += rew
self.episode_sums[name] += rew
if self.cfg.rewards.only_positive_rewards:
self.rew_buf[:] = torch.clip(self.rew_buf[:], min=0.)
# add termination reward after clipping
if "termination" in self.reward_scales:
rew = self._reward_termination() * self.reward_scales["termination"]
self.rew_buf += rew
self.episode_sums["termination"] += rew
self.rew_buf[:] = 0.:每一步先清空奖励self.reward_functions列表,加权遍历所有奖励函数经求和得到最终奖励rewself.rew_buf[:] = torch.clip(self.rew_buf[:], min=0.):限制奖励为正(可选)rew = self._reward_termination() * self.reward_scales["termination"]:设置终止奖励(单独处理)self.reward_functions列表self.reward_functions列表在LeggedRobot中初始化的时候调用_prepare_reward_function时候被填充def _prepare_reward_function(self):
""" Prepares a list of reward functions, whcih will be called to compute the total reward.
Looks for self._reward_<REWARD_NAME>, where <REWARD_NAME> are names of all non zero reward scales in the cfg.
"""
# remove zero scales + multiply non-zero ones by dt
for key in list(self.reward_scales.keys()):
scale = self.reward_scales[key]
if scale==0:
self.reward_scales.pop(key)
else:
self.reward_scales[key] *= self.dt
# prepare list of functions
self.reward_functions = []
self.reward_names = []
for name, scale in self.reward_scales.items():
if name=="termination":
continue
self.reward_names.append(name)
name = '_reward_' + name
self.reward_functions.append(getattr(self, name))
# reward episode sums
self.episode_sums = {name: torch.zeros(self.num_envs, dtype=torch.float, device=self.device, requires_grad=False)
for name in self.reward_scales.keys()}
_prepare_reward_function会根据LeggedRobotCfg配置类中rewards中scales出现的配置项的名字加上前缀进行动态添加奖励函数,具体格式就是:
self._reward_<REWARD_NAME>(REWARD_NAME来自如下)class LeggedRobotCfg(BaseConfig):
class rewards:
class scales:
termination = -0.0
tracking_lin_vel = 1.0
tracking_ang_vel = 0.5
lin_vel_z = -2.0
ang_vel_xy = -0.05
orientation = -0.
torques = -0.00001
dof_vel = -0.
dof_acc = -2.5e-7
base_height = -0.
feet_air_time = 1.0
collision = -1.
feet_stumble = -0.0
action_rate = -0.01
stand_still = -0.
class LeggedRobotCfg(BaseConfig):
class rewards:
class scales:
custom_rw
LeggedRobot类中添加对应格式self._reward_<REWARD_NAME>即可def _reward_custom_rw(self):
pass
reward_scales,我们区分reward为:
reward_scales>0:奖励reward_scales<0:惩罚_reward_lin_vel_zdef _reward_lin_vel_z(self):
# Penalize z axis base linear velocity
return torch.square(self.base_lin_vel[:, 2])
_reward_ang_vel_xydef _reward_ang_vel_xy(self):
# Penalize xy axes base angular velocity
return torch.sum(torch.square(self.base_ang_vel[:, :2]), dim=1)
r ang_vel_xy = ω x 2 + ω y 2 r_{\text{ang\_vel\_xy}} = \omega_x^2 + \omega_y^2 rang_vel_xy=ωx2+ωy2
_reward_orientationdef _reward_orientation(self):
# Penalize non flat base orientation
return torch.sum(torch.square(self.projected_gravity[:, :2]), dim=1)
r orientation = g x 2 + g y 2 r_{\text{orientation}} = g_x^2 + g_y^2 rorientation=gx2+gy2
_reward_base_heightdef _reward_base_height(self):
# Penalize base height away from target
base_height = self.root_states[:, 2]
return torch.square(base_height - self.cfg.rewards.base_height_target)
r base_height = ( h − h target ) 2 r_{\text{base\_height}} = (h - h_{\text{target}})^2 rbase_height=(h−htarget)2
_reward_torquesdef _reward_torques(self):
# Penalize torques
return torch.sum(torch.square(self.torques), dim=1)
_reward_dof_veldef _reward_dof_vel(self):
# Penalize dof velocities
return torch.sum(torch.square(self.dof_vel), dim=1)
_reward_dof_accdef _reward_dof_acc(self):
# Penalize dof accelerations
return torch.sum(torch.square((self.last_dof_vel - self.dof_vel) / self.dt), dim=1)
r dof_acc = ∑ i ( q ˙ i ( t ) − q ˙ i ( t − 1 ) Δ t ) 2 r_{\text{dof\_acc}} = \sum_{i} \left( \frac{\dot{q}_i^{(t)} - \dot{q}_i^{(t-1)}}{\Delta t} \right)^2 rdof_acc=i∑(Δtq˙i(t)−q˙i(t−1))2
_reward_action_ratedef _reward_action_rate(self):
# Penalize changes in actions
return torch.sum(torch.square(self.last_actions - self.actions), dim=1)
r action_rate = ∑ i ( a i ( t ) − a i ( t − 1 ) ) 2 r_{\text{action\_rate}} = \sum_{i} (a_i^{(t)} - a_i^{(t-1)})^2 raction_rate=i∑(ai(t)−ai(t−1))2
_reward_collisiondef _reward_collision(self):
# Penalize collisions on selected bodies
return torch.sum(1.*(torch.norm(self.contact_forces[:, self.penalised_contact_indices, :], dim=-1) > 0.1), dim=1)
collision = ∑ i 1 ( ∥ f i ∥ > ϵ ) _{\text{collision}} = \sum_{i} \mathbf{1}\left(\|\mathbf{f}_i\| > \epsilon\right) collision=i∑1(∥fi∥>ϵ)
_reward_dof_pos_limitsdef _reward_dof_pos_limits(self):
# Penalize dof positions too close to the limit
out_of_limits = -(self.dof_pos - self.dof_pos_limits[:, 0]).clip(max=0.) # lower limit
out_of_limits += (self.dof_pos - self.dof_pos_limits[:, 1]).clip(min=0.)
return torch.sum(out_of_limits, dim=1)
r pos_limit = ∑ i [ max ( 0 , q min , i − q i ) + max ( 0 , q i − q max , i ) ] r_{\text{pos\_limit}} = \sum_i \left[ \max(0, q_{\min,i} - q_i) + \max(0, q_i - q_{\max,i}) \right] rpos_limit=i∑[max(0,qmin,i−qi)+max(0,qi−qmax,i)]
_reward_dof_vel_limitsdef _reward_dof_vel_limits(self):
# Penalize dof velocities too close to the limit
# clip to max error = 1 rad/s per joint to avoid huge penalties
return torch.sum((torch.abs(self.dof_vel) - self.dof_vel_limits*self.cfg.rewards.soft_dof_vel_limit).clip(min=0
r vel_limit = ∑ i clip ( ∣ q ˙ i ∣ − q ˙ max , i ⋅ α , 0 , 1 ) r_{\text{vel\_limit}} = \sum_i \text{clip}\left(|\dot{q}_i| - \dot{q}_{\max,i} \cdot \alpha,\ 0,\ 1\right) rvel_limit=i∑clip(∣q˙i∣−q˙max,i⋅α, 0, 1)
_reward_torque_limitsdef _reward_torque_limits(self):
# penalize torques too close to the limit
return torch.sum((torch.abs(self.torques) - self.torque_limits*self.cfg.rewards.soft_torque_limit).clip(min=0.), dim=1)
r torque_limit = ∑ i max ( 0 , ∣ τ i ∣ − τ max , i ⋅ α ) r_{\text{torque\_limit}} = \sum_i \max(0, |\tau_i| - \tau_{\max,i} \cdot \alpha) rtorque_limit=i∑max(0,∣τi∣−τmax,i⋅α)
_reward_tracking_lin_veldef _reward_tracking_lin_vel(self):
# Tracking of linear velocity commands (xy axes)
lin_vel_error = torch.sum(torch.square(self.commands[:, :2] - self.base_lin_vel[:, :2]), dim=1)
return torch.exp(-lin_vel_error/self.cfg.rewards.tracking_sigma)
| r lin_track = exp ( − ∣ v c m d x y − v x y ∣ 2 σ ) r_{\text{lin\_track}} = \exp\left(-\frac{ |\mathbf{v}_{cmd}^{xy} - \mathbf{v}^{xy}|^2 }{\sigma}\right) rlin_track=exp(−σ∣vcmdxy−vxy∣2) |
|---|
_reward_tracking_ang_veldef _reward_tracking_ang_vel(self):
# Tracking of angular velocity commands (yaw)
ang_vel_error = torch.square(self.commands[:, 2] - self.base_ang_vel[:, 2])
return torch.exp(-ang_vel_error/self.cfg.rewards.tracking_sigma)
r ang_track = exp ( − ( ω c m d − ω z ) 2 σ ) r_{\text{ang\_track}} = \exp\left(-\frac{ (\omega_{cmd} - \omega_z)^2 }{\sigma}\right) rang_track=exp(−σ(ωcmd−ωz)2)
_reward_feet_air_timedef _reward_feet_air_time(self):
# Reward long steps
# Need to filter the contacts because the contact reporting of PhysX is unreliable on meshes
contact = self.contact_forces[:, self.feet_indices, 2] > 1.
contact_filt = torch.logical_or(contact, self.last_contacts)
self.last_contacts = contact
first_contact = (self.feet_air_time > 0.) * contact_filt
self.feet_air_time += self.dt
rew_airTime = torch.sum((self.feet_air_time - 0.5) * first_contact, dim=1) # reward only on first contact with the ground
rew_airTime *= torch.norm(self.commands[:, :2], dim=1) > 0.1 #no reward for zero command
self.feet_air_time *= ~contact_filt
return rew_airTime
r air_time = ∑ i ( t air ( i ) − t 0 ) ⋅ 1 ( first contact ) ⋅ 1 ( ∥ v c m d ∥ > ϵ ) r_{\text{air\_time}} = \sum_{i} \left( t^{(i)}_{\text{air}} - t_0 \right) \cdot \mathbf{1}(\text{first contact}) \cdot \mathbf{1}(\|\mathbf{v}_{cmd}\| > \epsilon) rair_time=i∑(tair(i)−t0)⋅1(first contact)⋅1(∥vcmd∥>ϵ)
_reward_stumbledef _reward_stumble(self):
# Penalize feet hitting vertical surfaces
return torch.any(torch.norm(self.contact_forces[:, self.feet_indices, :2], dim=2) >\
5 *torch.abs(self.contact_forces[:, self.feet_indices, 2]), dim=1)
r stumble = 1 ( ∥ f x y ∥ > 5 ⋅ ∣ f z ∣ ) r_{\text{stumble}} = \mathbf{1}\left( \|\mathbf{f}_{xy}\| > 5 \cdot |f_z| \right) rstumble=1(∥fxy∥>5⋅∣fz∣)
_reward_stand_stilldef _reward_stand_still(self):
# Penalize motion at zero commands
return torch.sum(torch.abs(self.dof_pos - self.default_dof_pos), dim=1) * (torch.norm(self.commands[:, :2], dim=1) < 0.1)
r stand = ∥ q − q d e f a u l t ∥ 1 ⋅ 1 ( ∥ v c m d ∥ < ϵ ) r_{\text{stand}} = \|\mathbf{q} - \mathbf{q}_{default}\|_1 \cdot \mathbf{1}(\|\mathbf{v}_{cmd}\| < \epsilon) rstand=∥q−qdefault∥1⋅1(∥vcmd∥<ϵ)其中:
- q \mathbf{q} q:当前关节位置向量
- q d e f a u l t \mathbf{q}_{default} qdefault:默认(初始)关节位置向量
- v c m d \mathbf{v}_{cmd} vcmd:线速度/角速度命令向量
- 1 1 1:指示函数,当机器人被要求静止(命令速度接近 0)时生效
_reward_feet_contact_forcesdef _reward_feet_contact_forces(self):
# penalize high contact forces
return torch.sum((torch.norm(self.contact_forces[:, self.feet_indices, :], dim=-1) - self.cfg.rewards.max_contact_force).clip(min=0.), dim=1)
r contact_force = ∑ i max ( 0 , ∥ f i ∥ − f max ) r_{\text{contact\_force}} = \sum_i \max(0, \|\mathbf{f}_i\| - f_{\max}) rcontact_force=i∑max(0,∥fi∥−fmax)其中:
- f i \mathbf{f}_i fi:第 iii 个脚部的接触力向量
- f max f_{\max} fmax:允许的最大接触力
- max ( 0 , ⋅ ) \max(0, \cdot) max(0,⋅):如果接触力超过阈值才惩罚,没超过则不惩罚
_reward_terminationdef _reward_termination(self):
# Terminal reward / penalty
return self.reset_buf * ~self.time_out_buf
r termination = 1 ( reset ) ⋅ 1 ( ¬ timeout ) r_{\text{termination}} = \mathbf{1}(\text{reset}) \cdot \mathbf{1}(\neg \text{timeout}) rtermination=1(reset)⋅1(¬timeout)其中:
- 1 ( reset ) \mathbf{1}(\text{reset}) 1(reset):如果该环境本步被重置,则为 1,否则为 0
- 1 ( ¬ timeout ) \mathbf{1}(\neg \text{timeout}) 1(¬timeout):如果重置不是因为超时,则为 1,否则为 0
python legged_gym/scripts/play.py --task=go2
load_run 和 checkpoint 指定其他模型。logs/{experiment_name}/exported/policies 中:
policy_1.ptpolicy_lstm_1.pt
import sys
from legged_gym import LEGGED_GYM_ROOT_DIR
import os
import sys
from legged_gym import LEGGED_GYM_ROOT_DIR
import isaacgym
from legged_gym.envs import *
from legged_gym.utils import get_args, export_policy_as_jit, task_registry, Logger
import numpy as np
import torch
def play(args):
env_cfg, train_cfg = task_registry.get_cfgs(name=args.task)
# override some parameters for testing
env_cfg.env.num_envs = min(env_cfg.env.num_envs, 100)
env_cfg.terrain.num_rows = 5
env_cfg.terrain.num_cols = 5
env_cfg.terrain.curriculum = False
env_cfg.noise.add_noise = False
env_cfg.domain_rand.randomize_friction = False
env_cfg.domain_rand.push_robots = False
env_cfg.env.test = True
# prepare environment
env, _ = task_registry.make_env(name=args.task, args=args, env_cfg=env_cfg)
obs = env.get_observations()
# load policy
train_cfg.runner.resume = True
ppo_runner, train_cfg = task_registry.make_alg_runner(env=env, name=args.task, args=args, train_cfg=train_cfg)
policy = ppo_runner.get_inference_policy(device=env.device)
# export policy as a jit module (used to run it from C++)
if EXPORT_POLICY:
path = os.path.join(LEGGED_GYM_ROOT_DIR, 'logs', train_cfg.runner.experiment_name, 'exported', 'policies')
export_policy_as_jit(ppo_runner.alg.actor_critic, path)
print('Exported policy as jit script to: ', path)
for i in range(10*int(env.max_episode_length)):
actions = policy(obs.detach())
obs, _, rews, dones, infos = env.step(actions.detach())
if __name__ == '__main__':
EXPORT_POLICY = True
RECORD_FRAMES = False
MOVE_CAMERA = False
args = get_args()
play(args)
env_cfg, train_cfg = task_registry.get_cfgs(name=args.task)
env_cfg.env.num_envs = min(env_cfg.env.num_envs, 100)
env_cfg.terrain.num_rows = 5
env_cfg.terrain.num_cols = 5
env_cfg.terrain.curriculum = False
env_cfg.noise.add_noise = False
env_cfg.domain_rand.randomize_friction = False
env_cfg.domain_rand.push_robots = False
env_cfg.env.test = True
env_cfg.env.test=True)env, _ = task_registry.make_env(name=args.task, args=args, env_cfg=env_cfg)
obs = env.get_observations()
train_cfg.runner.resume = True
ppo_runner, train_cfg = task_registry.make_alg_runner(env=env, name=args.task, args=args, train_cfg=train_cfg)
policy = ppo_runner.get_inference_policy(device=env.device)
runner.resume=True 表示加载最近训练的 checkpointget_inference_policy 获取可用于推理的策略网络# export policy as a jit module (used to run it from C++)
if EXPORT_POLICY:
path = os.path.join(LEGGED_GYM_ROOT_DIR, 'logs', train_cfg.runner.experiment_name, 'exported', 'policies')
export_policy_as_jit(ppo_runner.alg.actor_critic, path)
print('Exported policy as jit script to: ', path)
for i in range(10*int(env.max_episode_length)):
actions = policy(obs.detach())
obs, _, rews, dones, infos = env.step(actions.detach())
.pt) 模型,用于 C++ 调用或部署play.py训练与加载模型train.py和play.py均调用了legged_gym/utils/task_registry.py中的make_alg_runner函数def make_alg_runner(self, env, name=None, args=None, train_cfg=None, log_root="default") -> Tuple[OnPolicyRunner, LeggedRobotCfgPPO]:
# 省略其他代码
#save resume path before creating a new log_dir
resume = train_cfg.runner.resume
if resume:
# load previously trained model
resume_path = get_load_path(log_root, load_run=train_cfg.runner.load_run, checkpoint=train_cfg.runner.checkpoint)
print(f"Loading model from: {resume_path}")
runner.load(resume_path)
return runner, train_cfg
play.py时候指定train_cfg.runner.resume = True,这时候就会加载上一次保存的路径
legged_gym项目中的奖励函数系统,特别是如何通过对机器人动作、状态、环境的各种影响(如线速度、角速度、姿态、碰撞等)进行加权求和来计算总奖励。此外,我们还介绍了如何使用play.py来加载训练好的模型进行测试。