ValueError: DistributedVariable.handle is not available outside the replica context or a `tf.distribute.Strategy.update()` call.
Hi, I tried to implement a DDPG-based actor-critic framework using MirroredStrategy. Without MirroredStrategy the code runs perfectly fine. The actor network is created without problems, but the error appears on the tf.gradients(...) line. I tried looking this up online, but nothing worked for me and I'm not able to move forward. I would really appreciate it if anyone could take a look and offer some guidance.
The complete code and error message are provided below:
import tensorflow.compat.v1 as tf
tf.disable_v2_behavior()
import numpy as np
import gym
from gym import wrappers
import tflearn
import argparse
import pprint as pp
from replay_buffer import ReplayBuffer
class ActorNetwork(object):
    def __init__(self, sess, state_dim, action_dim, action_bound, learning_rate, tau, batch_size):
        self.sess = sess
        self.s_dim = state_dim
        self.a_dim = action_dim
        self.action_bound = action_bound
        self.learning_rate = learning_rate
        self.tau = tau
        self.batch_size = batch_size

        # Actor Network
        self.inputs, self.out, self.scaled_out = self.create_actor_network()
        self.network_params = tf.trainable_variables()

        # Target Network
        self.target_inputs, self.target_out, self.target_scaled_out = self.create_actor_network()
        self.target_network_params = tf.trainable_variables()[
            len(self.network_params):]

        # Op for periodically updating target network with online network
        # weights
        self.update_target_network_params = \
            [self.target_network_params[i].assign(tf.multiply(self.network_params[i], self.tau) +
                                                  tf.multiply(self.target_network_params[i], 1. - self.tau))
             for i in range(len(self.target_network_params))]

        # This gradient will be provided by the critic network
        self.action_gradient = tf.placeholder(tf.float32, [None, self.a_dim])

        # Combine the gradients here
        self.unnormalized_actor_gradients = tf.gradients(
            self.scaled_out, self.network_params, -self.action_gradient)
        self.actor_gradients = list(map(lambda x: tf.div(x, self.batch_size), self.unnormalized_actor_gradients))

        # Optimization Op
        self.optimize = tf.train.AdamOptimizer(self.learning_rate).\
            apply_gradients(zip(self.actor_gradients, self.network_params))

        self.num_trainable_vars = len(
            self.network_params) + len(self.target_network_params)

    def create_actor_network(self):
        inputs = tflearn.input_data(shape=[None, self.s_dim])
        net = tf.keras.layers.Dense(400)(inputs)
        net = tf.keras.layers.BatchNormalization()(net)
        net = tf.keras.layers.ReLU()(net)
        net = tf.keras.layers.Dense(300)(net)
        net = tf.keras.layers.BatchNormalization()(net)
        net = tf.keras.layers.ReLU()(net)
        # Final layer weights are init to Uniform[-3e-3, 3e-3]
        w_init = tf.keras.initializers.RandomUniform(minval=-0.003, maxval=0.003)
        out = tf.keras.layers.Dense(self.a_dim, activation='linear', kernel_initializer=w_init)(net)
        # Scale output to -action_bound to action_bound
        scaled_out = tf.multiply(out, self.action_bound)
        return inputs, out, scaled_out

    def train(self, inputs, a_gradient):
        self.sess.run(self.optimize, feed_dict={
            self.inputs: inputs,
            self.action_gradient: a_gradient
        })

    def predict(self, inputs):
        return self.sess.run(self.scaled_out, feed_dict={
            self.inputs: inputs
        })

    def predict_target(self, inputs):
        return self.sess.run(self.target_scaled_out, feed_dict={
            self.target_inputs: inputs
        })

    def update_target_network(self):
        self.sess.run(self.update_target_network_params)

    def get_num_trainable_vars(self):
        return self.num_trainable_vars
class CriticNetwork(object):
    def __init__(self, sess, state_dim, action_dim, learning_rate, tau, gamma, num_actor_vars):
        self.sess = sess
        self.s_dim = state_dim
        self.a_dim = action_dim
        self.learning_rate = learning_rate
        self.tau = tau
        self.gamma = gamma

        # Create the critic network
        self.inputs, self.action, self.out = self.create_critic_network()
        self.network_params = tf.trainable_variables()[num_actor_vars:]

        # Target Network
        self.target_inputs, self.target_action, self.target_out = self.create_critic_network()
        self.target_network_params = tf.trainable_variables()[(len(self.network_params) + num_actor_vars):]

        # Op for periodically updating target network with online network
        # weights with regularization
        self.update_target_network_params = \
            [self.target_network_params[i].assign(tf.multiply(self.network_params[i], self.tau)
                                                  + tf.multiply(self.target_network_params[i], 1. - self.tau))
             for i in range(len(self.target_network_params))]

        # Network target (y_i)
        self.predicted_q_value = tf.placeholder(tf.float32, [None, 1])

        # Define loss and optimization Op
        self.loss = tflearn.mean_square(self.predicted_q_value, self.out)
        self.optimize = tf.train.AdamOptimizer(
            self.learning_rate).minimize(self.loss)

        # Get the gradient of the net w.r.t. the action.
        # For each action in the minibatch (i.e., for each x in xs),
        # this will sum up the gradients of each critic output in the minibatch
        # w.r.t. that action. Each output is independent of all
        # actions except for one.
        self.action_grads = tf.gradients(self.out, self.action)

    def create_critic_network(self):
        inputs = tflearn.input_data(shape=[None, self.s_dim])
        action = tflearn.input_data(shape=[None, self.a_dim])
        net = tf.keras.layers.Dense(400)(inputs)
        net = tf.keras.layers.BatchNormalization()(net)
        net = tf.keras.layers.ReLU()(net)

        # Add the action tensor in the 2nd hidden layer
        # Use two temp layers to get the corresponding weights and biases
        # t1 = tflearn.fully_connected(net, 300)
        # t2 = tflearn.fully_connected(action, 300)
        # net = tflearn.activation(
        #     tf.matmul(net, t1.W) + tf.matmul(action, t2.W) + t2.b, activation='relu')

        # linear layer connected to 1 output representing Q(s,a)
        # Weights are init to Uniform[-3e-3, 3e-3]
        w_init = tf.keras.initializers.RandomUniform(minval=-0.003, maxval=0.003)
        out = tf.keras.layers.Dense(1, kernel_initializer=w_init)(net)
        return inputs, action, out

    def train(self, inputs, action, predicted_q_value):
        return self.sess.run([self.out, self.optimize], feed_dict={
            self.inputs: inputs,
            self.action: action,
            self.predicted_q_value: predicted_q_value
        })

    def predict(self, inputs, action):
        return self.sess.run(self.out, feed_dict={
            self.inputs: inputs,
            self.action: action
        })

    def predict_target(self, inputs, action):
        return self.sess.run(self.target_out, feed_dict={
            self.target_inputs: inputs,
            self.target_action: action
        })

    def action_gradients(self, inputs, actions):
        return self.sess.run(self.action_grads, feed_dict={
            self.inputs: inputs,
            self.action: actions
        })

    def update_target_network(self):
        self.sess.run(self.update_target_network_params)
class OrnsteinUhlenbeckActionNoise:
    def __init__(self, mu, sigma=0.3, theta=.15, dt=1e-2, x0=None):
        self.theta = theta
        self.mu = mu
        self.sigma = sigma
        self.dt = dt
        self.x0 = x0
        self.reset()

    def __call__(self):
        x = self.x_prev + self.theta * (self.mu - self.x_prev) * self.dt + \
            self.sigma * np.sqrt(self.dt) * np.random.normal(size=self.mu.shape)
        self.x_prev = x
        return x

    def reset(self):
        self.x_prev = self.x0 if self.x0 is not None else np.zeros_like(self.mu)

    def __repr__(self):
        return 'OrnsteinUhlenbeckActionNoise(mu={}, sigma={})'.format(self.mu, self.sigma)
def build_summaries():
    episode_reward = tf.Variable(0.)
    tf.summary.scalar("Reward", episode_reward)
    episode_ave_max_q = tf.Variable(0.)
    tf.summary.scalar("Qmax Value", episode_ave_max_q)

    summary_vars = [episode_reward, episode_ave_max_q]
    summary_ops = tf.summary.merge_all()

    return summary_ops, summary_vars
def train(sess, env, args, actor, critic, actor_noise):
    # Set up summary Ops
    summary_ops, summary_vars = build_summaries()

    sess.run(tf.global_variables_initializer())
    writer = tf.summary.FileWriter(args['summary_dir'], sess.graph)

    # Initialize target network weights
    actor.update_target_network()
    critic.update_target_network()

    # Initialize replay memory
    replay_buffer = ReplayBuffer(int(args['buffer_size']), int(args['random_seed']))

    # Needed to enable BatchNorm.
    # This hurts the performance on Pendulum but could be useful
    # in other environments.
    # tflearn.is_training(True)

    for i in range(int(args['max_episodes'])):

        s = env.reset()

        ep_reward = 0
        ep_ave_max_q = 0

        for j in range(int(args['max_episode_len'])):

            if args['render_env']:
                env.render()

            # Added exploration noise
            # a = actor.predict(np.reshape(s, (1, 3))) + (1. / (1. + i))
            a = actor.predict(np.reshape(s, (1, actor.s_dim))) + actor_noise()

            s2, r, terminal, info = env.step(a[0])

            replay_buffer.add(np.reshape(s, (actor.s_dim,)), np.reshape(a, (actor.a_dim,)), r,
                              terminal, np.reshape(s2, (actor.s_dim,)))

            # Keep adding experience to the memory until
            # there are at least minibatch size samples
            if replay_buffer.size() > int(args['minibatch_size']):
                s_batch, a_batch, r_batch, t_batch, s2_batch = \
                    replay_buffer.sample_batch(int(args['minibatch_size']))

                # Calculate targets
                target_q = critic.predict_target(
                    s2_batch, actor.predict_target(s2_batch))

                y_i = []
                for k in range(int(args['minibatch_size'])):
                    if t_batch[k]:
                        y_i.append(r_batch[k])
                    else:
                        y_i.append(r_batch[k] + critic.gamma * target_q[k])

                # Update the critic given the targets
                predicted_q_value, _ = critic.train(
                    s_batch, a_batch, np.reshape(y_i, (int(args['minibatch_size']), 1)))

                ep_ave_max_q += np.amax(predicted_q_value)

                # Update the actor policy using the sampled gradient
                a_outs = actor.predict(s_batch)
                grads = critic.action_gradients(s_batch, a_outs)
                actor.train(s_batch, grads[0])

                # Update target networks
                actor.update_target_network()
                critic.update_target_network()

            s = s2
            ep_reward += r

            if terminal:
                summary_str = sess.run(summary_ops, feed_dict={
                    summary_vars[0]: ep_reward,
                    summary_vars[1]: ep_ave_max_q / float(j)
                })

                writer.add_summary(summary_str, i)
                writer.flush()

                print('| Reward: {:d} | Episode: {:d} | Qmax: {:.4f}'.format(int(ep_reward),
                                                                             i, (ep_ave_max_q / float(j))))
                break
def main(args):
    device_type = 'GPU'
    devices = tf.config.experimental.list_physical_devices(device_type)
    devices_names = [d.name.split("e:")[1] for d in devices]
    strategy = tf.distribute.MirroredStrategy(devices=devices_names[:int(args['ngpus'])])

    with strategy.scope():
        with tf.Session() as sess:

            env = gym.make(args['env'])
            np.random.seed(int(args['random_seed']))
            tf.set_random_seed(int(args['random_seed']))
            env.seed(int(args['random_seed']))

            state_dim = env.observation_space.shape[0]
            action_dim = env.action_space.shape[0]
            action_bound = env.action_space.high
            # Ensure action bound is symmetric
            assert (env.action_space.high == -env.action_space.low)

            actor = ActorNetwork(sess, state_dim, action_dim, action_bound,
                                 float(args['actor_lr']), float(args['tau']),
                                 int(args['minibatch_size']))

            critic = CriticNetwork(sess, state_dim, action_dim,
                                   float(args['critic_lr']), float(args['tau']),
                                   float(args['gamma']),
                                   actor.get_num_trainable_vars())

            actor_noise = OrnsteinUhlenbeckActionNoise(mu=np.zeros(action_dim))

            if args['use_gym_monitor']:
                if not args['render_env']:
                    env = wrappers.Monitor(
                        env, args['monitor_dir'], video_callable=False, force=True)
                else:
                    env = wrappers.Monitor(env, args['monitor_dir'], force=True)

            train(sess, env, args, actor, critic, actor_noise)

            if args['use_gym_monitor']:
                env.monitor.close()
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='provide arguments for DDPG agent')

    # agent parameters
    parser.add_argument('--actor-lr', help='actor network learning rate', default=0.0001)
    parser.add_argument('--critic-lr', help='critic network learning rate', default=0.001)
    parser.add_argument('--gamma', help='discount factor for critic updates', default=0.99)
    parser.add_argument('--tau', help='soft target update parameter', default=0.001)
    parser.add_argument('--buffer-size', help='max size of the replay buffer', default=1000000)
    parser.add_argument('--minibatch-size', help='size of minibatch for minibatch-SGD', default=64)
    parser.add_argument('--ngpus', help='Number of GPUs', default=2)

    # run parameters
    parser.add_argument('--env', help='choose the gym env- tested on {Pendulum-v0}', default='Pendulum-v1')
    parser.add_argument('--random-seed', help='random seed for repeatability', default=1234)
    parser.add_argument('--max-episodes', help='max num of episodes to do while training', default=50000)
    parser.add_argument('--max-episode-len', help='max length of 1 episode', default=1000)
    parser.add_argument('--render-env', help='render the gym env', action='store_true')
    parser.add_argument('--use-gym-monitor', help='record gym results', action='store_true')
    parser.add_argument('--monitor-dir', help='directory for storing gym results', default='./results/gym_ddpg')
    parser.add_argument('--summary-dir', help='directory for storing tensorboard info', default='./results/tf_ddpg')

    parser.set_defaults(render_env=False)
    parser.set_defaults(use_gym_monitor=True)

    args = vars(parser.parse_args())

    pp.pprint(args)

    main(args)
OUTPUT:
WARNING:tensorflow:From /usr/local/lib/python3.8/dist-packages/tensorflow/python/compat/v2_compat.py:107: disable_resource_variables (from tensorflow.python.ops.variable_scope) is deprecated and will be removed in a future version.
Instructions for updating:
non-resource variables are not supported in the long term
Scipy not supported!
{'actor_lr': 0.0001, 'buffer_size': 1000000, 'critic_lr': 0.001, 'env': 'Pendulum-v1', 'gamma': 0.99, 'max_episode_len': 1000, 'max_episodes': 50000, 'minibatch_size': 64, 'monitor_dir': './results/gym_ddpg', 'ngpus': 2, 'random_seed': 1234, 'render_env': False, 'summary_dir': './results/tf_ddpg', 'tau': 0.001, 'use_gym_monitor': True}
2022-04-11 19:58:20.286219: I tensorflow/core/platform/cpu_feature_guard.cc:151] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations: AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2022-04-11 19:58:21.117294: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1525] Created device /device:GPU:0 with 10795 MB memory: -> device: 0, name: Tesla K80, pci bus id: 0000:04:00.0, compute capability: 3.7
2022-04-11 19:58:21.118538: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1525] Created device /device:GPU:1 with 10795 MB memory: -> device: 1, name: Tesla K80, pci bus id: 0000:05:00.0, compute capability: 3.7
2022-04-11 19:58:21.136084: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1525] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 10795 MB memory: -> device: 0, name: Tesla K80, pci bus id: 0000:04:00.0, compute capability: 3.7
2022-04-11 19:58:21.137010: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1525] Created device /job:localhost/replica:0/task:0/device:GPU:1 with 10795 MB memory: -> device: 1, name: Tesla K80, pci bus id: 0000:05:00.0, compute capability: 3.7
WARNING:tensorflow:From ddpg.py:77: The name tf.keras.initializers.RandomUniform is deprecated. Please use tf.compat.v1.keras.initializers.RandomUniform instead.
WARNING:tensorflow:From /usr/local/lib/python3.8/dist-packages/keras/initializers/initializers_v1.py:277: calling RandomUniform.__init__ (from tensorflow.python.ops.init_ops) with dtype is deprecated and will be removed in a future version.
Instructions for updating:
Call initializer instance with the dtype argument instead of passing it to the constructor
Traceback (most recent call last):
  File "ddpg.py", line 406, in ...
ValueError: DistributedVariable.handle is not available outside the replica context or a `tf.distribute.Strategy.update()` call.
Note that I also tried running it without BatchNormalization; however, that did not work for me either. Any help will be greatly appreciated. Thanks.
I still haven't been able to figure this out. Is there any update on the above issue, please? Thanks.
I would recommend using TF v2 and leveraging the Learner API, which handles the distribution strategies for you; in TF2 the actor update above can also be written with a GradientTape, as in the sketch below.
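For reference, here is a minimal TF2 sketch of the same actor update that the posted code builds with tf.gradients/placeholders. This is only an illustrative sketch, not the Learner API and not your exact networks: state_dim, action_dim, action_bound and the make_actor/make_critic helpers are made-up stand-ins that roughly mirror the 400/300-unit layers in the post.

import tensorflow as tf

# Hypothetical dimensions; in practice take them from the gym env.
state_dim, action_dim, action_bound = 3, 1, 2.0

def make_actor():
    s_in = tf.keras.Input(shape=(state_dim,))
    x = tf.keras.layers.Dense(400, activation='relu')(s_in)
    x = tf.keras.layers.Dense(300, activation='relu')(x)
    out = tf.keras.layers.Dense(action_dim)(x)
    # Scale the output by action_bound, as in the posted create_actor_network.
    return tf.keras.Model(s_in, out * action_bound)

def make_critic():
    s_in = tf.keras.Input(shape=(state_dim,))
    a_in = tf.keras.Input(shape=(action_dim,))
    x = tf.keras.layers.Concatenate()([s_in, a_in])
    x = tf.keras.layers.Dense(400, activation='relu')(x)
    x = tf.keras.layers.Dense(300, activation='relu')(x)
    q = tf.keras.layers.Dense(1)(x)
    return tf.keras.Model([s_in, a_in], q)

actor_model, critic_model = make_actor(), make_critic()
actor_optimizer = tf.keras.optimizers.Adam(1e-4)

@tf.function
def actor_train_step(states):
    with tf.GradientTape() as tape:
        actions = actor_model(states, training=True)
        # Maximizing Q(s, pi(s)) (minimizing its negative) is the same update the
        # posted code assembles with tf.gradients(scaled_out, params, -action_gradient).
        actor_loss = -tf.reduce_mean(critic_model([states, actions], training=True))
    grads = tape.gradient(actor_loss, actor_model.trainable_variables)
    actor_optimizer.apply_gradients(zip(grads, actor_model.trainable_variables))
    return actor_loss

Because the gradient tape differentiates through the critic automatically, there is no need for the separate action-gradient placeholder or for a Session, which is what runs into the DistributedVariable.handle restriction.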
That would require revamping the code completely. Is there any way to use Session-based code (as above) with MirroredStrategy? Thanks for your help.
I'm not sure whether MirroredStrategy is compatible with TF1 and tf.Session. Take a look at the MirroredStrategy documentation: you need to make sure that all the ops are created under the strategy scope and that you execute the train step with strategy.run() (roughly the pattern sketched below).
I would also recommend taking a look at DDPGAgent.py, which works with TF2.
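For what it's worth, here is a minimal sketch of that TF2 pattern: variables are created inside strategy.scope() and the per-replica step runs through strategy.run(). The tiny Sequential model and the toy data are placeholders, not your DDPG networks.

import tensorflow as tf

strategy = tf.distribute.MirroredStrategy()

# All variables (models, optimizers) must be created inside strategy.scope().
with strategy.scope():
    model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(3,))])
    optimizer = tf.keras.optimizers.Adam(1e-3)

def train_step(x, y):
    with tf.GradientTape() as tape:
        loss = tf.reduce_mean(tf.square(model(x, training=True) - y))
    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))
    return loss

@tf.function
def distributed_train_step(x, y):
    # strategy.run executes train_step once per replica (GPU) and returns
    # per-replica values, which are reduced to a single scalar here.
    per_replica_loss = strategy.run(train_step, args=(x, y))
    return strategy.reduce(tf.distribute.ReduceOp.MEAN, per_replica_loss, axis=None)

# Example call with hypothetical toy data:
# loss = distributed_train_step(tf.random.normal([64, 3]), tf.random.normal([64, 1]))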
In the code above, all of the ops are already created under the MirroredStrategy scope. I will consider your recommendation and try the Learner API. Meanwhile, if you find a way to run tf.Session-based code with MirroredStrategy, please let me know. Thanks for your time.
I also hit this problem when developing a user-defined custom variable.