ml-agents
ml-agents copied to clipboard
mlagents-env be able to query directly from the sim without using step() and reset()
Currently, the way the protocol buffers in the mlagents_env is designed, we can not send immediate queries to sim and get responses. For a project we had to add this feature, sending it as a pull request.
In current setup, even if you add side channels, the response does not arrive unless the request is pushed before step() or reset() and response is received after one of these.
To help us understand your goal of this feature request -- can you tell us a bit more about your use case, and why the is the current way of message communication a blocker for your project?
Since this feature would be useful to the rest of the community also, hence attempt to explain in more detail below:
Most of the simulators allow being queried directly. For example, the habitat-sim https://github.com/facebookresearch/habitat-sim. As an example if the sim has an agent or a map, you can directly call functions on the sim anytime, without involving the reinforcement learning loop of reset()
and step()
, such as the following query functions:
sim.get_agent_state()
sim.get_navgiable_map()
sim.sample_navigable_point()
sim.is_navigable_point()
Having said that, we tried implementing it with mlagents_env
using side channels. The process was convoluted as follows:
- First you have to send a message from python side channel, say get_navigable_map
- Then in the C# you put this map back n the message queue.
- Then in the python side you have to call another function to get this message, i.e. the map itself.
- The worst part of the above 3 step loop is that the message doesn't get back right away. It only gets back after the next step() or reset() function is executed.
The pull request that I have sent, adds a new kind of query called immediate along with step and reset. If this request is merged then the above loop will become as follows:
- Send a request from the python side, say get_navigable_map
- in C# send the map back.
- Get the response right away without waiting for step() or reset() methods.
The function call in python now looks like this :
def get_navigable_map(self, resolution_x=256, resolution_y=256,
cell_occupancy_threshold=0.5) -> np.ndarray:
self.uenv._process_immediate_message(
self.map_side_channel.build_immediate_request("binaryMap",
[resolution_x,
resolution_y,
cell_occupancy_threshold]))
return self.map_side_channel.requested_map
The map side-channel class:
class MapSideChannel(SideChannel):
"""This is the SideChannel for retrieving map data from Unity.
You can send map requests to Unity using send_request.
The arguments for a mapRequest are ("binaryMap", [RESOLUTION_X, RESOLUTION_Y, THRESHOLD])
"""
resolution = None
def __init__(self) -> None:
channel_id = uuid.UUID("24b099f1-b184-407c-af72-f3d439950bdb")
super().__init__(channel_id)
self.requested_map = None
def on_message_received(self, msg: IncomingMessage) -> np.ndarray:
if self.resolution is None:
return None
raw_bytes = msg.get_raw_bytes()
self.requested_map = np.unpackbits(raw_bytes)[
0:self.resolution[0] * self.resolution[1]]
self.requested_map = self.requested_map.reshape((self.resolution[1],
self.resolution[0]))
return self.requested_map
def send_request(self, key: str, value: List[float]) -> None:
"""Sends a request to Unity
The arguments for a mapRequest are ("binaryMap", [RESOLUTION_X, RESOLUTION_Y, THRESHOLD])
"""
self.resolution = value
msg = OutgoingMessage()
msg.write_string(key)
msg.write_float32_list(value)
super().queue_message_to_send(msg)
def build_immediate_request(self, key: str,
value: List[float]) -> bytearray:
self.resolution = value
msg = OutgoingMessage()
msg.write_string(key)
msg.write_float32_list(value)
result = bytearray()
result += self.channel_id.bytes_le
result += struct.pack("<i", len(msg.buffer))
result += msg.buffer
return result
However if the capability to send immediate messages is not there then the get_navigable_map would be divided into two functions like this:
def start_navigable_map(self, resolution_x=256, resolution_y=256,
cell_occupancy_threshold=0.5):
"""
Returns:
Nothing
"""
self.map_side_channel.send_request("binaryMap",
[resolution_x, resolution_y,
cell_occupancy_threshold])
def get_navigable_map(self) -> np.ndarray:
"""
Returns:
A numpy array having 0 for non-navigable and 1 for navigable cells
Note:
This only works if you have called ``reset()`` or ``step()`` on the
environment at least once after calling start_navigable_map() method.
"""
return self.map_side_channel.requested_map