[RLlib] Ape-X crashes for custom environment with Dict observation space
What happened + What you expected to happen
I am trying to run Ape-X following the example here for the Wordcraft environment. I have taken a look at a similar issue here, and the initial example provided there runs correctly, so the problem is probably not with the dependencies but with my environment.
Wordcraft is a fairly large environment split across several files, hence the long script. The main parts to look at are the run_apex function and the WordCraftEnvNoGoal class.
The error I get from running the script is related to the size of the observation space (Expected flattened obs shape of [..., 10680], got (?, 5607)).
The full error log:
2022-07-30 18:01:53,647 INFO services.py:1462 -- View the Ray dashboard at http://127.0.0.1:8265
2022-07-30 18:01:56,122 INFO trainer.py:2296 -- Your framework setting is 'tf', meaning you are using static-graph mode. Set framework='tf2' to enable eager execution with tf2.x. You may also then want to set eager_tracing=True in order to reach similar execution speed as with static-graph mode.
2022-07-30 18:01:56,127 INFO simple_q.py:162 -- In multi-agent mode, policies will be optimized sequentially by the multi-GPU optimizer. Consider setting simple_optimizer=True
if this doesn't work for you.
2022-07-30 18:01:56,127 INFO trainer.py:867 -- Current log_level is WARN. For more information, set 'log_level': 'INFO' / 'DEBUG' or use the -v and -vv flags.
(pid=26361)
(RolloutWorker pid=26770) 2022-07-30 18:02:01,875 WARNING rollout_worker.py:499 -- We've added a module for checking environments that are used in experiments. It will cause your environment to fail if your environment is not set upcorrectly. You can disable check env by setting disable_env_checking
to True in your experiment config dictionary. You can run the environment checking module standalone by calling ray.rllib.utils.check_env(env).
(RolloutWorker pid=26770) 2022-07-30 18:02:01,875 WARNING env.py:70 -- Env checking isn't implemented for VectorEnvs, RemoteBaseEnvs, ExternalMultiAgentEnv,or ExternalEnvs or Environments that are Ray actors
(RolloutWorker pid=26770) /home/elena/workspace/playground/ray/SAPIENS
(RolloutWorker pid=26770) 2022-07-30 18:02:05,516 ERROR worker.py:449 -- Exception raised in creation task: The actor died because of an error raised in its creation task, ray::RolloutWorker.init() (pid=26770, ip=192.168.1.36, repr=<ray.rllib.evaluation.rollout_worker.RolloutWorker object at 0x7f4c5a4cc7d0>)
(RolloutWorker pid=26770) File "/home/elena/anaconda3/envs/apex/lib/python3.7/site-packages/ray/rllib/evaluation/rollout_worker.py", line 630, in init
(RolloutWorker pid=26770) seed=seed,
(RolloutWorker pid=26770) File "/home/elena/anaconda3/envs/apex/lib/python3.7/site-packages/ray/rllib/evaluation/rollout_worker.py", line 1723, in _build_policy_map
(RolloutWorker pid=26770) name, orig_cls, obs_space, act_space, conf, merged_conf
(RolloutWorker pid=26770) File "/home/elena/anaconda3/envs/apex/lib/python3.7/site-packages/ray/rllib/policy/policy_map.py", line 141, in create_policy
(RolloutWorker pid=26770) observation_space, action_space, merged_config
(RolloutWorker pid=26770) File "/home/elena/anaconda3/envs/apex/lib/python3.7/site-packages/ray/rllib/policy/tf_policy_template.py", line 270, in init
(RolloutWorker pid=26770) get_batch_divisibility_req=get_batch_divisibility_req,
(RolloutWorker pid=26770) File "/home/elena/anaconda3/envs/apex/lib/python3.7/site-packages/ray/rllib/policy/dynamic_tf_policy.py", line 326, in init
(RolloutWorker pid=26770) is_training=in_dict.is_training,
(RolloutWorker pid=26770) File "/home/elena/anaconda3/envs/apex/lib/python3.7/site-packages/ray/rllib/agents/dqn/dqn_tf_policy.py", line 229, in get_distribution_inputs_and_class
(RolloutWorker pid=26770) policy, model, input_dict, state_batches=None, explore=explore
(RolloutWorker pid=26770) File "/home/elena/anaconda3/envs/apex/lib/python3.7/site-packages/ray/rllib/agents/dqn/dqn_tf_policy.py", line 390, in compute_q_values
(RolloutWorker pid=26770) model_out, state = model(input_batch, state_batches or [], seq_lens)
(RolloutWorker pid=26770) File "/home/elena/anaconda3/envs/apex/lib/python3.7/site-packages/ray/rllib/models/modelv2.py", line 240, in call
(RolloutWorker pid=26770) input_dict["obs"], self.obs_space, self.framework
(RolloutWorker pid=26770) File "/home/elena/anaconda3/envs/apex/lib/python3.7/site-packages/ray/rllib/models/modelv2.py", line 402, in restore_original_dimensions
(RolloutWorker pid=26770) return _unpack_obs(obs, original_space, tensorlib=tensorlib)
(RolloutWorker pid=26770) File "/home/elena/anaconda3/envs/apex/lib/python3.7/site-packages/ray/rllib/models/modelv2.py", line 437, in _unpack_obs
(RolloutWorker pid=26770) prep.shape[0], obs.shape
(RolloutWorker pid=26770) ValueError: Expected flattened obs shape of [..., 10680], got (?, 5607)
Traceback (most recent call last):
File "/home/elena/anaconda3/envs/apex/lib/python3.7/site-packages/ray/rllib/agents/trainer.py", line 896, in setup
self._init(self.config, self.env_creator)
File "/home/elena/anaconda3/envs/apex/lib/python3.7/site-packages/ray/rllib/agents/trainer.py", line 1035, in _init
raise NotImplementedError
NotImplementedError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/home/elena/anaconda3/envs/apex/lib/python3.7/site-packages/ray/worker.py", line 1811, in get
    raise value
ray.exceptions.RayActorError: The actor died because of an error raised in its creation task, ray::RolloutWorker.init() (pid=26770, ip=192.168.1.36, repr=<ray.rllib.evaluation.rollout_worker.RolloutWorker object at 0x7f4c5a4cc7d0>)
  File "/home/elena/anaconda3/envs/apex/lib/python3.7/site-packages/ray/rllib/evaluation/rollout_worker.py", line 630, in init
    seed=seed,
  File "/home/elena/anaconda3/envs/apex/lib/python3.7/site-packages/ray/rllib/evaluation/rollout_worker.py", line 1723, in _build_policy_map
    name, orig_cls, obs_space, act_space, conf, merged_conf
  File "/home/elena/anaconda3/envs/apex/lib/python3.7/site-packages/ray/rllib/policy/policy_map.py", line 141, in create_policy
    observation_space, action_space, merged_config
  File "/home/elena/anaconda3/envs/apex/lib/python3.7/site-packages/ray/rllib/policy/tf_policy_template.py", line 270, in init
    get_batch_divisibility_req=get_batch_divisibility_req,
  File "/home/elena/anaconda3/envs/apex/lib/python3.7/site-packages/ray/rllib/policy/dynamic_tf_policy.py", line 326, in init
    is_training=in_dict.is_training,
  File "/home/elena/anaconda3/envs/apex/lib/python3.7/site-packages/ray/rllib/agents/dqn/dqn_tf_policy.py", line 229, in get_distribution_inputs_and_class
    policy, model, input_dict, state_batches=None, explore=explore
  File "/home/elena/anaconda3/envs/apex/lib/python3.7/site-packages/ray/rllib/agents/dqn/dqn_tf_policy.py", line 390, in compute_q_values
    model_out, state = model(input_batch, state_batches or [], seq_lens)
  File "/home/elena/anaconda3/envs/apex/lib/python3.7/site-packages/ray/rllib/models/modelv2.py", line 240, in call
    input_dict["obs"], self.obs_space, self.framework
  File "/home/elena/anaconda3/envs/apex/lib/python3.7/site-packages/ray/rllib/models/modelv2.py", line 402, in restore_original_dimensions
    return _unpack_obs(obs, original_space, tensorlib=tensorlib)
  File "/home/elena/anaconda3/envs/apex/lib/python3.7/site-packages/ray/rllib/models/modelv2.py", line 437, in _unpack_obs
    prep.shape[0], obs.shape
ValueError: Expected flattened obs shape of [..., 10680], got (?, 5607)
python-BaseException
(RolloutWorker pid=26770)
Process finished with exit code 1
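For debugging, a minimal standalone check (a sketch only; it assumes the WordCraftEnvNoGoal class from the reproduction script below and the same env_config values used in run_apex) is to compare the flattened observation shape RLlib's preprocessor derives from the Dict space with the shapes of an observation actually returned by reset():

# Standalone debugging sketch (not part of the original training run).
from ray.rllib.models.preprocessors import get_preprocessor
import numpy as np

# env_config copied from run_apex() in the reproduction script below
env_config = {"log_path": "./example/tb_logs", "data_path": "example.json", "seed": None,
              "recipe_book_path": None, "feature_type": "one_hot", "shuffle_features": False,
              "random_feature_size": 300, "max_depth": 8, "split": "by_recipe",
              "train_ratio": 1.0, "num_distractors": 0, "uniform_distractors": False,
              "max_mix_steps": 8, "subgoal_rewards": True}
env = WordCraftEnvNoGoal(env_config)

# Flattened size RLlib's preprocessor derives from the Dict observation space;
# this should correspond to the "Expected flattened obs shape" number in the error above.
prep = get_preprocessor(env.observation_space)(env.observation_space)
print("RLlib preprocessor shape:", prep.shape)

# Shapes of a real observation returned by reset(), for comparison
obs = env.reset()
for key, value in obs.items():
    print(key, np.asarray(value).shape)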
Versions / Dependencies
gym Version: 0.18.0
ray Version: 1.12.0
python Version: 3.7.13
OS: Linux 5.4.0-122-generic
To run the script you need to have the file "example.json" in the same directory, with the following content:
{"entities": {"a_base_a_0": {"id": 0, "recipes": []}, "a_base_b_0": {"id": 1, "recipes": []}, "a_base_c_0": {"id": 2, "recipes": []}, "a_1": {"id": 3, "recipes": [["a_base_b_0", "a_base_b_0"]]}, "a_2": {"id": 4, "recipes": [["a_1", "a_base_a_0"]]}, "a_3": {"id": 5, "recipes": [["a_2", "a_base_a_0"]]}, "a_4": {"id": 6, "recipes": [["a_3", "a_base_c_0"]]}, "a_5": {"id": 7, "recipes": [["a_4", "a_base_c_0"]]}, "a_6": {"id": 8, "recipes": [["a_5", "a_base_c_0"]]}, "a_7": {"id": 9, "recipes": [["a_6", "a_base_b_0"]]}, "a_8": {"id": 10, "recipes": [["a_7", "a_base_a_0"]]}, "a_9": {"id": 11, "recipes": [["a_8", "a_base_b_0"]]}, "_dist_a_0": {"id": 12, "recipes": []}, "_dist_b_0": {"id": 13, "recipes": []}, "_dist_c_0": {"id": 14, "recipes": []}, "_dist_d_0": {"id": 15, "recipes": []}, "_dist_e_0": {"id": 16, "recipes": []}, "_dist_f_0": {"id": 17, "recipes": []}, "_dist_g_0": {"id": 18, "recipes": []}, "_dist_h_0": {"id": 19, "recipes": []}}}
Reproduction script
from ray import tune
import sys
sys.path.append(".")
import gym
import ray
import ray.rllib.agents.dqn.apex as apex
from ray.tune.logger import pretty_print
from ray.tune.registry import register_env
import os
import json
import pickle
import collections
import random
import timeit
import copy
import re
import numpy as np
from gym.utils import seeding
recipe_book_info = {"example": {"path": "example.json",
"best_paths": ["a_8"],
"n_paths": 1,
"best_reward": 36,
"max_steps": 1000000,
"early_step": 500000}}
DEBUG = False
class Recipe(collections.Counter):
"""A hashable recipe.
Allows for indexing into dictionaries.
"""
def __hash__(self):
return tuple(
sorted(self.items(), key=lambda x: x[0] if x[0] is not None else "")
).__hash__()
def __len__(self):
return len(list(self.elements()))
class Task:
"""
A hashable recipe task.
"""
def __init__(self, goal, base_entities, intermediate_entities, relevant_recipes):
self.goal = goal
self.base_entities = tuple(sorted(base_entities))
self.intermediate_entities = tuple(sorted(intermediate_entities))
self.relevant_recipes = tuple(relevant_recipes)
def __hash__(self):
return tuple(
(
self.goal,
self.base_entities,
self.intermediate_entities,
self.relevant_recipes,
)
).__hash__()
class RecipeBook:
def __init__(
self,
data_path="datasets/alchemy2.json",
max_depth=1,
split=None,
train_ratio=1.0,
seed=None,
):
self.test_mode = False
self.train_ratio = train_ratio
self.set_seed(seed)
self.data_path = data_path
self._rawdata = self._load_data(data_path)
self.max_depth = max_depth
self.entities = tuple(self._rawdata["entities"].keys())
self.entity2level = {
e: [int(s) for s in re.findall(r"\d+", e)][
0
] # Todo : use the recipe chaining tools from task instead of name based
for i, e in enumerate(self.entities)
}
self.entity2index = {e: i for i, e in enumerate(self.entities)}
self.entity2recipes = collections.defaultdict(list)
self.distractors = tuple(e for e in self.entities if "dist" in e)
for e in self.entities:
for r in self._rawdata["entities"][e]["recipes"]:
if e not in r:
self.entity2recipes[e].append(Recipe(r))
self.entity2recipes = dict(self.entity2recipes)
self.max_recipe_size = 0
self.recipe2entity = collections.defaultdict(str)
for entity, recipes in self.entity2recipes.items():
for r in recipes:
self.recipe2entity[r] = entity
self.max_recipe_size = max(len(r), self.max_recipe_size)
self.root_entities = set(
[e for e in self.entities if e not in self.entity2recipes]
)
self.init_neighbors_combineswith()
self.terminal_entities = set(
[e for e in self.entities if e not in self.neighbors_combineswith]
)
# self._init_tasks_for_depth(max_depth)
# self._init_recipe_weighted_entity_dist()
# self._init_data_split(split=split, train_ratio=train_ratio)
def _random_choice(self, options):
# Fast random choice
i = self.np_random.randint(0, len(options))
return options[i]
def _load_data(self, path):
print(os.getcwd())
f = open(path)
jsondata = json.load(f)
f.close()
return jsondata
def set_seed(self, seed):
self.np_random, self.seed = seeding.np_random(seed)
def save(self, path):
"""
Serialize to bytes and save to file
"""
path = os.path.expandvars(os.path.expanduser(path))
f = open(path, "wb+")
pickle.dump(self, f)
@staticmethod
def load(path):
"""
Returns a new RecipeBook object loaded from a binary file that is the output of save.
"""
path = os.path.expandvars(os.path.expanduser(path))
f = open(path, "rb")
return pickle.load(f)
def get_recipes(self, entity):
return self.entity2recipes[entity] if entity in self.entity2recipes else None
def evaluate_recipe(self, recipe):
e = self.recipe2entity[recipe]
return e if e != "" else None
def init_neighbors_combineswith(self):
self.neighbors_combineswith = collections.defaultdict(set)
for recipe in self.recipe2entity:
e1, e2 = recipe if len(recipe.keys()) == 2 else list(recipe.keys()) * 2
self.neighbors_combineswith[e1].add(e2)
self.neighbors_combineswith[e2].add(e1)
def sample_task(self, depth=None):
"""
Returns a task tuple (<goal>, <intermediate entities>, <base entities>)
"""
if depth is None:
depth = self.np_random.choice(range(1, self.max_depth + 1))
sample_space = (
self.depth2task_test
if self.test_mode and self.train_ratio < 1.0
else self.depth2task_train
)
return self._random_choice(sample_space[depth])
def sample_distractors(self, task, num_distractors=1, uniform=True):
base_e = set(task.base_entities)
intermediate_e = set(task.intermediate_entities)
def is_valid(e):
return e != task.goal and e not in base_e and e not in intermediate_e
options = [(i, e) for i, e in enumerate(self.entities) if is_valid(e)]
sample_index_space, sample_space = zip(*options)
if uniform:
return tuple(self._random_choice(sample_space, num_distractors).tolist())
else:
# sample according to recipe-weighted entity distribution
sample_index_space = set(sample_index_space)
dist = np.array(
[p for i, p in enumerate(self.entity_dist) if i in sample_index_space]
)
dist /= dist.sum()
return tuple(
self.np_random.choice(sample_space, num_distractors, p=dist).tolist()
)
def _generate_all_tasks_for_goal(self, goal, max_depth=3):
base_entities = [goal]
intermediate_entities = set()
print(DEBUG, f"Expanding tasks to goal {goal}")
self._expand_tasks_to_goal(
goal, max_depth, base_entities, intermediate_entities
)
print(DEBUG, "Done.")
def _expand_tasks_to_goal(
self,
goal,
max_depth=1,
base_entities=[],
intermediate_entities=set(),
relevant_recipes=[],
):
"""
DFS expansion of recipes for an entity to generate new tasks
"""
for b in base_entities:
if (
b not in self.root_entities
): # Can't expand if it's a root entity or cyclic
if b != goal:
intermediate_entities.add(b)
next_base_entities = base_entities[:]
next_base_entities.remove(b)
cur_depth = len(intermediate_entities) + 1
print(DEBUG, "--Expanding base entity", b)
# Expand each recipe for each base entity
for recipe in self.entity2recipes[b]:
print(DEBUG, f"----Trying recipe for {b}, {recipe}")
expanded_entities = [
e for e in recipe if e not in next_base_entities
]
is_cycle = False
for e in recipe:
if e in intermediate_entities or e == goal:
print(
DEBUG, f"------Cycle detected, skipping recipe {recipe}"
)
is_cycle = True
break
if is_cycle:
continue
old_base_entities = next_base_entities
next_base_entities = expanded_entities + next_base_entities
# Add task
relevant_recipes.append(recipe)
task = Task(
goal,
next_base_entities,
intermediate_entities,
relevant_recipes[:],
)
if task not in self.depth2task[cur_depth]:
self.depth2task[cur_depth].add(task)
print(DEBUG, f"------Adding task {task}")
if cur_depth < max_depth:
print(DEBUG, f"current depth is {cur_depth}")
self._expand_tasks_to_goal(
goal,
max_depth,
next_base_entities,
intermediate_entities,
relevant_recipes[:],
)
relevant_recipes.remove(recipe)
next_base_entities = old_base_entities
if b != goal:
intermediate_entities.remove(b)
def _init_tasks_for_depth(self, max_depth=2):
self.depth2task = collections.defaultdict(set) # depth to task tuples
total = 0
for e in self.entities:
# self._generate_all_tasks_for_goal(e)
s = timeit.timeit(
lambda: self._generate_all_tasks_for_goal(e, max_depth=max_depth),
number=1,
)
# print(f'Generated max-depth {max_depth} recipes for {e} in {s} s.')
total += s
print(
f"Generated all max-depth {max_depth} tasks for {len(self.entities)} entities in {total} s."
)
for d in self.depth2task:
self.depth2task[d] = tuple(self.depth2task[d])
print(f"Depth {d} tasks: {len(self.depth2task[d])}")
def _init_recipe_weighted_entity_dist(self):
entities_cnt = dict({e: 0 for e in self.entities})
for recipe in self.recipe2entity.keys():
for e in recipe:
entities_cnt[e] += 1
unnormalized = (
np.array(list(entities_cnt.values())) + 1
) # Even terminal entities have > 0 chance of being sampled
self.entity_dist = unnormalized / unnormalized.sum()
def _init_data_split(self, split, train_ratio):
self.split = split
depths = range(1, self.max_depth + 1)
self.goals_train = []
self.goals_test = []
self.depth2task_train = {d: [] for d in depths}
self.depth2task_test = {d: [] for d in depths}
if split in ["debug", "by_goal", "by_goal_train_terminals"]:
# Map goals --> depth --> tasks
self.goal2depth2task = {
goal: {depth: [] for depth in depths} for goal in self.entities
}
for depth in self.depth2task:
tasks = self.depth2task[depth]
for task in tasks:
self.goal2depth2task[task.goal][depth].append(task)
# Split goals into train and test
all_goals = list(self.entities)
self.np_random.shuffle(all_goals)
if split == "debug":
train_ratio = 1.0
train_size = int(np.ceil(train_ratio * len(all_goals)))
if split == "by_goal_train_terminals":
assert train_size > len(
self.terminal_entities
), "Train size must be > terminal entities"
all_goals = list(set(all_goals) - self.terminal_entities)
train_size = train_size - len(self.terminal_entities)
self.goals_train = all_goals[:train_size]
self.goals_test = all_goals[train_size:]
if split == "debug":
self.goals_test = list(self.goals_train)
for depth in depths:
for goal in self.goals_train:
self.depth2task_train[depth] += self.goal2depth2task[goal][depth]
for goal in self.goals_test:
self.depth2task_test[depth] += self.goal2depth2task[goal][depth]
elif split in ["by_recipe", "by_recipe_train_all_goals"]:
all_recipes = list(self.recipe2entity.keys())
self.np_random.shuffle(all_recipes)
train_size = int(np.ceil(train_ratio * len(all_recipes)))
self.recipes_train = set(all_recipes[:train_size])
self.recipes_test = set(all_recipes[train_size:])
if split == "by_recipe_train_all_goals":
self._fill_recipe_entity_support()
for depth in self.depth2task:
tasks = self.depth2task[depth]
for task in tasks:
is_test_task = False
for recipe in task.relevant_recipes:
if recipe in self.recipes_test:
self.depth2task_test[depth].append(task)
is_test_task = True
break
if not is_test_task:
self.depth2task_train[depth].append(task)
elif split == "by_task":
for depth in depths:
all_tasks_at_depth = list(self.depth2task[depth])
self.np_random.shuffle(all_tasks_at_depth)
train_size_at_depth = int(
np.ceil(train_ratio * len(all_tasks_at_depth))
)
self.depth2task_train[depth] = all_tasks_at_depth[:train_size_at_depth]
self.depth2task_test[depth] = all_tasks_at_depth[train_size_at_depth:]
else:
raise ValueError(f"Unsupported split {split}")
train_size = 0
test_size = 0
overlap = 0
for depth in depths:
train_tasks = set(self.depth2task_train[depth])
test_tasks = set(self.depth2task_test[depth])
train_size += len(train_tasks)
test_size += len(test_tasks)
overlap += len(train_tasks.intersection(test_tasks))
def _fill_recipe_entity_support(self):
# Make sure all entities are represented among self.recipes_train at depth=1 as either ingredient or goal
def make_entity2recipes(recipes):
entity2recipes = collections.defaultdict(set)
for recipe in recipes:
goal = self.recipe2entity[recipe]
entity2recipes[goal].add(recipe)
for e in recipe:
entity2recipes[e].add(recipe)
return entity2recipes
entity2recipes_train = make_entity2recipes(self.recipes_train)
entity2recipes_test = make_entity2recipes(self.recipes_test)
train_entities = set(entity2recipes_train.keys())
missing_entities = [e for e in self.entities if e not in train_entities]
aux_recipes = set()
for e in missing_entities:
aux_recipe = self._random_choice(list(entity2recipes_test[e]))
aux_recipes.add(aux_recipe)
for recipe in aux_recipes:
self.recipes_train.add(recipe)
self.recipes_test.remove(recipe)
NO_RECIPE_PENALTY = 0
IRRELEVANT_RECIPE_PENALTY = 0
GOAL_REWARD = 1.0
SUBGOAL_REWARD = 1.0
class FeatureMap:
"""
Used for initializing entity features.
"""
def __init__(
self,
words,
feature_type='glove',
random_feature_size=300, # only for random features
random_feature_std=0.4,
shuffle=False,
seed=None,
savedir='~/model_cache/spacy',
cache_id='latest'
):
self.feature_map = {}
self.feature_dim = 0
self.np_random, self.seed = seeding.np_random(seed)
self.savedir = savedir
self.cache_id = cache_id
# Initialize featurizer
if feature_type == 'one_hot':
one_hots = np.eye(len(words))
for i, e in enumerate(words):
self.feature_map[e] = one_hots[i]
self.feature_dim = one_hots[i].shape[-1]
elif feature_type == 'random':
b = random_feature_std/np.sqrt(2)
rand_features = self.np_random.laplace(
loc=0, scale=b, size=(len(words), random_feature_size)
)
for i, e in enumerate(words):
self.feature_map[e] = rand_features[i,:]
self.feature_dim = rand_features[i,:].shape[-1]
elif feature_type == 'glove':
loaded = self._load_feature_map()
if loaded is not None:
words, feature_vectors = loaded
else:
print('Loading glove model...')
word2vec = spacy.load('en_core_web_lg') # 300-dim GloVe embeddings
print('Loaded glove model.')
feature_vectors = []
for e in words:
tokens_embeddings = []
for token in e.split():
tokens_embeddings.append(word2vec(token).vector)
feature_vectors.append(np.array(tokens_embeddings).max(0))
if cache_id:
feature_vectors = np.array(feature_vectors)
self._save_feature_map(words, feature_vectors)
if shuffle:
self.np_random.shuffle(feature_vectors)
self.feature_map = dict(zip(words, feature_vectors))
self.feature_dim = self.feature_map[words[0]].shape[-1]
else:
raise ValueError(f'Feature type {feature_type} is not implemented.')
def seed(self, seed):
self.np_random, self.seed = seeding.np_random(seed)
def feature(self, word):
return self.feature_map[word]
def _basepath(self, savedir, cache_id):
basepath = os.path.expandvars(os.path.expanduser(savedir))
basepath = os.path.join(basepath, cache_id)
return basepath
def _save_feature_map(self, vocab, feature_vectors, savedir=None, cache_id=None):
if savedir is None:
savedir = self.savedir
if cache_id is None:
cache_id = self.cache_id
basepath = self._basepath(savedir, cache_id)
if not os.path.exists(basepath):
os.makedirs(basepath, exist_ok=True)
feature_bin_path = os.path.join(basepath, f'{self.cache_id}')
vocab_path = os.path.join(basepath, f'{self.cache_id}.vocab')
np.save(feature_bin_path, feature_vectors)
with open(vocab_path, 'w+') as vocab_file:
for w in vocab:
vocab_file.write(w+'\n')
def _load_feature_map(self, savedir=None, cache_id=None):
if savedir is None:
savedir = self.savedir
if cache_id is None:
cache_id = self.cache_id
basepath = self._basepath(savedir, cache_id)
feature_bin_path = os.path.join(basepath, f'{self.cache_id}.npy')
vocab_path = os.path.join(basepath, f'{self.cache_id}.vocab')
if os.path.exists(feature_bin_path) and os.path.exists(vocab_path):
with open(vocab_path) as vocab_file:
words = [w.strip() for w in vocab_file]
feature_vectors = np.load(feature_bin_path)
feature_map = dict(zip(words, feature_vectors))
return words, feature_vectors
else:
return None
class WordCraftEnvNoGoal(gym.Env):
"""
Simple text-only RL environment for crafting multi-step recipes.
At a high level, the state consists of a goal, the inventory, and the current selection.
"""
def __init__(
self, env_config
):
super().__init__()
self.eval_mode = False
if env_config["seed"] is None:
env_config["seed"] = int.from_bytes(os.urandom(4), byteorder="little")
self.set_seed(env_config["seed"])
# utils_seed(seed)
if env_config["recipe_book_path"] is not None:
self.recipe_book = RecipeBook.load(env_config["recipe_book_path"])
self.recipe_book.set_seed(env_config["seed"])
max_depth = self.recipe_book.max_depth
else:
self.recipe_book = RecipeBook(
data_path=env_config["data_path"],
max_depth=env_config["max_depth"],
split=env_config["split"],
train_ratio=env_config["train_ratio"],
seed=env_config["seed"],
)
self.feature_map = FeatureMap(
words=self.recipe_book.entities,
feature_type=env_config["feature_type"],
random_feature_size=env_config["random_feature_size"],
shuffle=env_config["shuffle_features"],
seed=env_config["seed"],
)
self.max_selection_size = self.recipe_book.max_recipe_size
self.max_mix_steps = max(env_config["max_mix_steps"] or env_config["max_depth"], env_config["max_depth"])
self.max_steps = self.max_selection_size * self.max_mix_steps
self.sample_depth = env_config["max_depth"]
self.subgoal_rewards = env_config["subgoal_rewards"]
self.max_depth = env_config["max_depth"]
self.num_distractors = env_config["num_distractors"]
self.uniform_distractors = env_config["uniform_distractors"]
if self.num_distractors:
self.distractors = np.random.choice(
self.recipe_book.distractors, self.num_distractors
)
else:
self.distractors = []
self.orig_table = [
entity
for entity in self.recipe_book.entity2level
if self.recipe_book.entity2level[entity] == 0
and entity not in self.recipe_book.distractors
]
# I think this assumes 2 base elements
self.max_table_size = (
2 ** env_config["max_depth"] + env_config["num_distractors"] + self.max_mix_steps + len(self.orig_table) - 2
)
self.task = None
self.goal_features = np.zeros(self.feature_map.feature_dim)
self._reset_table()
self._reset_selection()
self._reset_history()
self.episode_step = 0
self.episode_mix_steps = 0
self.episode_reward = 0
self.done = False
self.data_path = env_config["data_path"]
obs = self.reset()
num_entities = len(self.recipe_book.entities)
dspaces = {
"table_index": gym.spaces.MultiDiscrete(
self.max_table_size * [num_entities]
),
"table_features": gym.spaces.Box(
shape=self.table_features.shape, low=-1.0, high=1.0
),
"selection_index": gym.spaces.MultiDiscrete(
self.max_selection_size * [num_entities]
),
"selection_features": gym.spaces.Box(
shape=self.selection_features.shape, low=-1.0, high=1.0
),
}
self.observation_space = gym.spaces.Dict(dspaces)
self.action_space = gym.spaces.Discrete(
self.max_table_size
) # Actions correspond to choosing an entity in a table position
self.discovered = []
self.debug = {"actions": []}
"""
self.n_entities = len(self.recipe_book.entities)
self.action_space = Discrete(
self.n_entities
) # could be changed to only the elements than can be reached in the allowed steps (for now, too deep to be
# requires)
dspaces = {
"table_features": MultiDiscrete(self.n_entities * [self.n_entities]),
"selection_features": MultiDiscrete(self.n_entities * [self.n_entities]),
}
self.observation_space = Dict(dspaces)
self.proportional = proportional
self.discovered = []
"""
def reset(self):
self.discovered = []
self.episode_step = 0
self.episode_mix_steps = 0
self.episode_reward = 0
self.done = False
# self.task = self.recipe_book.sample_task(depth=self.sample_depth)
# self.distractors = self.recipe_book.sample_distractors(
# self.task, self.num_distractors, uniform=self.uniform_distractors
# )
# self.goal_features = self.feature_map.feature(self.task.goal)
# self.distractors = np.random.choice(
# self.recipe_book.distractors, self.num_distractors
# )
self._reset_selection()
self._reset_table()
self._reset_history()
return self._get_observation()
def eval(self, split="test"):
self.eval_mode = True
self.recipe_book.test_mode = split == "test"
def train(self):
self.eval_mode = False
self.recipe_book.test_mode = False
def set_seed(self, seed):
self.np_random, self.seed = seeding.np_random(seed)
def sample_depth(self, depth):
self.sample_depth = depth
def _reset_table(self):
self.distractors = np.random.choice(
self.recipe_book.distractors, self.num_distractors
)
self.table = list(
np.concatenate(
(
self.orig_table,
self.distractors,
)
)
)
self.np_random.shuffle(self.table)
self.table_index = -np.ones(self.max_table_size, dtype=int)
self.table_features = np.zeros(
(self.max_table_size, self.feature_map.feature_dim)
)
num_start_items = len(self.table)
self.table_index[:num_start_items] = np.array(
[self.recipe_book.entity2index[e] for e in self.table], dtype=int
)
# if self.task:
self.table_features[:num_start_items, :] = np.array(
[self.feature_map.feature(e) for e in self.table]
)
def _reset_selection(self):
self.selection = []
self.selection_index = -np.ones(self.max_selection_size, dtype=int)
self.selection_features = np.zeros(
(self.max_selection_size, self.feature_map.feature_dim)
)
def _reset_history(self):
self.subgoal_history = set()
def _get_observation(self):
"""
Note, includes indices for each inventory and selection item,
since torchbeast stores actions in a shared_memory tensor shared among actor processes
"""
return {
# "goal_index": [self.recipe_book.entity2index[self.task.goal]],
# "goal_features": self.goal_features,
"table_index": self.table_index,
"table_features": self.table_features,
"selection_index": self.selection_index,
"selection_features": self.selection_features,
}
def step(self, action):
reward = 0
if self.done: # no-op if env is done
return self._get_observation(), reward, self.done, {}
# Handle invalid actions
invalid_action = not (0 <= action < self.max_table_size)
if invalid_action:
self.episode_step += 1
if self.episode_step >= self.max_steps:
self.done = True
i = self.table_index[action]
e = self.recipe_book.entities[i]
selection_size = len(self.selection)
if selection_size < self.max_selection_size:
# Update selection
self.selection.append(e)
self.selection_index[selection_size] = i
self.selection_features[selection_size, :] = self.feature_map.feature(e)
selection_size = len(self.selection)
if selection_size == self.max_selection_size:
self.episode_mix_steps += 1
# Evaluate selection
recipe = Recipe(self.selection)
result = self.recipe_book.evaluate_recipe(recipe)
if result != None and result not in self.discovered:
reward = self.recipe_book.entity2level[result]
self.discovered.append(result)
self.episode_reward += reward
if result:
result_i = self.recipe_book.entity2index[result]
table_size = len(self.table)
self.table.append(result)
self.table_index[table_size] = result_i
self.table_features[table_size, :] = self.feature_map.feature(result)
# Clear selection
self._reset_selection()
self.episode_step += 1
if (
self.episode_mix_steps >= self.max_mix_steps
or self.episode_step >= self.max_steps
):
self.done = True
obs = self._get_observation()
return obs, reward, self.done, {}
gym.envs.registration.register(
id="wordcraft-multistep-no-goal-v0",
entry_point=f"{__name__}:WordCraftEnvNoGoal",
)
def run_apex():
top_dir = "."
ray.init()
config = apex.APEX_DEFAULT_CONFIG.copy()
config["num_gpus"] = 0
config["num_workers"] = 1
tasks = ["example"]
for task in tasks:
project_path = top_dir + "/" + task
config["env_config"] = {"log_path": project_path + "/tb_logs",
"data_path": recipe_book_info[task]["path"], "seed": None,
"recipe_book_path": None,
"feature_type": "one_hot",
"shuffle_features": False,
"random_feature_size": 300,
"max_depth": 8,
"split": "by_recipe",
"train_ratio": 1.0,
"num_distractors": 0,
"uniform_distractors": False,
"max_mix_steps": 8,
"subgoal_rewards": True}
select_env = "wordcraft-multistep-no-goal-v0"
register_env(select_env, lambda config: WordCraftEnvNoGoal(config))  # env creator should return an instance of the class defined above
trainer = apex.ApexTrainer(env=WordCraftEnvNoGoal, config=config)
for _ in range(1000):
# Perform one iteration of training the policy with Apex-DQN
result = trainer.train()
print(pretty_print(result))
if __name__ == "__main__":
run_apex()
Issue Severity
High: It blocks me from completing my task.
Hey @eleninisioti, thanks for posting this issue here!
I tried to reproduce it, but ran into another, seemingly related error message:
The observation collected from env.reset() was not contained within your env's observation space. It is possible that there was a type mismatch, or that one of the sub-observations was out of bounds:
reset_obs: {'table_index': array([ 1, 0, 2, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1]), 'table_features': array([[0., 1., 0., ..., 0., 0., 0.],
[1., 0., 0., ..., 0., 0., 0.],
[0., 0., 1., ..., 0., 0., 0.],
...,
[0., 0., 0., ..., 0., 0., 0.],
[0., 0., 0., ..., 0., 0., 0.],
[0., 0., 0., ..., 0., 0., 0.]]), 'selection_index': array([-1, -1]), 'selection_features': array([[0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
0., 0., 0., 0.],
[0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
0., 0., 0., 0.]])}
env.observation_space: Dict(selection_features:Box(-1.0, 1.0, (2, 20), float32), selection_index:MultiDiscrete([20 20]), table_features:Box(-1.0, 1.0, (265, 20), float32), table_index:MultiDiscrete([20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20
20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20
20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20
20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20
20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20
20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20
20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20
20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20
20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20
20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20
20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20
20]))
reset_obs's dtype: <class 'dict'>
env.observation_space's dtype: None
If you look at the value returned from your reset() method under the "table_index" key, you can see that even though the space for it is MultiDiscrete([20, 20, ...]), some of the values are -1.0 (only values between 0 and 19 are allowed here).
Could you make sure to fix that issue inside your environment or your observation space and repost a new repro script here?
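Just to illustrate one possible direction (a sketch only, untested, using a hypothetical EMPTY sentinel value): instead of filling unused slots with -1, you could widen the MultiDiscrete index spaces by one value per slot and use that extra value as the "empty" marker, e.g.:

# Sketch of a possible change inside WordCraftEnvNoGoal.__init__ (untested):
num_entities = len(self.recipe_book.entities)
EMPTY = num_entities  # hypothetical sentinel for "no entity in this slot", outside 0..num_entities-1

dspaces = {
    "table_index": gym.spaces.MultiDiscrete(self.max_table_size * [num_entities + 1]),
    "selection_index": gym.spaces.MultiDiscrete(self.max_selection_size * [num_entities + 1]),
    "table_features": gym.spaces.Box(shape=self.table_features.shape, low=-1.0, high=1.0),
    "selection_features": gym.spaces.Box(shape=self.selection_features.shape, low=-1.0, high=1.0),
}

# ...and wherever the index arrays are re-initialized (_reset_table / _reset_selection),
# fill them with the sentinel instead of -1:
self.table_index = np.full(self.max_table_size, EMPTY, dtype=int)
self.selection_index = np.full(self.max_selection_size, EMPTY, dtype=int)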
Since I was not able to fully reproduce your exact error, could you also post your Ray version and OS here? Maybe your particular issue has already been fixed on master.
Could be related to the ordering issue in the dict space: https://github.com/ray-project/ray/issues/33327