BrowserAgentError
When I tried the browser agent I got this error: `'NoneType' object has no attribute 'strip'`

```
Traceback (most recent call last):
  File "/a0/agent.py", line 332, in monologue
    tools_result = await self.process_tools(agent_response)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/a0/agent.py", line 685, in process_tools
    await tool.after_execution(response)
  File "/a0/python/helpers/tool.py", line 36, in after_execution
    text = response.message.strip()
           ^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'NoneType' object has no attribute 'strip'
```

Please help.
same problem
yup me too
Same problem here as well.
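For anyone triaging: the crash is simply `after_execution` calling `.strip()` on a `response.message` that is `None`. A minimal sketch of the failure and the defensive form (names simplified from `tool.py`; this is not the full fix):

```python
from dataclasses import dataclass

@dataclass
class Response:
    message: str
    break_loop: bool

# the browser tool can hand back a Response whose message is None
response = Response(message=None, break_loop=False)  # type: ignore[arg-type]

try:
    text = response.message.strip()  # what tool.py line 36 effectively does
except AttributeError as e:
    print(e)  # 'NoneType' object has no attribute 'strip'

# defensive form: coalesce None to an empty string before stripping
text = (response.message or "").strip()
```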
I've applied the patch below to /a0/python/helpers/tool.py as a temporary measure. The underlying issue remains and requires a proper fix from the developers. To use this workaround, apply the changes to the file at that path inside the running Docker container.
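If you're unsure how to apply it: with the patched tool.py saved locally, and assuming your container is named `agent-zero` (check `docker ps` for the actual name), something like this should work:

```bash
# back up the original inside the container, then copy the patched file in
docker exec agent-zero cp /a0/python/helpers/tool.py /a0/python/helpers/tool.py.bak
docker cp tool.py agent-zero:/a0/python/helpers/tool.py
# restart so the running process picks up the change
docker restart agent-zero
```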
/a0/python/helpers/tool.py:

```python
from abc import abstractmethod  # required for the @abstractmethod decorator used below
from dataclasses import dataclass
from agent import Agent  # assuming Agent is correctly defined elsewhere
from python.helpers.print_style import PrintStyle
@dataclass
class Response:
message: str
break_loop: bool
class Tool:
def __init__(self, agent: Agent, name: str, method: str | None, args: dict[str,str], message: str, **kwargs) -> None:
self.agent = agent
self.name = name
self.method = method
self.args = args
self.message = message # This message seems to be the initial prompt/instruction to the tool, not its response
self.log = None # Initialize log attribute
@abstractmethod
async def execute(self,**kwargs) -> Response:
pass
async def before_execution(self, **kwargs):
PrintStyle(font_color="#1B4F72", padding=True, background_color="white", bold=True).print(f"{self.agent.agent_name}: Using tool '{self.name}'")
self.log = self.get_log_object() # self.log is set here
if self.args and isinstance(self.args, dict):
for key, value in self.args.items():
PrintStyle(font_color="#85C1E9", bold=True).stream(self.nice_key(key)+": ")
# Ensure value is a string before checking for newline
value_str = str(value) if value is not None else ""
PrintStyle(font_color="#85C1E9", padding=isinstance(value_str,str) and "\n" in value_str).stream(value_str)
PrintStyle().print()
async def after_execution(self, response: Response | None, **kwargs):
"""
Handles the response from the tool execution, making sure to
gracefully handle cases where the response or its message is None or not a string.
"""
processed_text_for_history = ""
message_for_display_and_log = f"Tool '{self.name}': Execution resulted in an unexpected state." # Default
status = "error_unknown" # Possible statuses: "success", "warning_null_message", "warning_non_string", "error_no_response", "error_invalid_type"
if response is not None and isinstance(response, Response):
if isinstance(response.message, str):
processed_text_for_history = response.message.strip()
message_for_display_and_log = response.message # Use original for display/log
status = "success"
elif response.message is None:
message_for_display_and_log = f"Tool '{self.name}': Returned a null message."
processed_text_for_history = "" # Empty string for history for null message
status = "warning_null_message"
else: # Message is not a string and not None
message_for_display_and_log = f"Tool '{self.name}': Returned non-string message content (type: {type(response.message)}): {str(response.message)}"
try:
processed_text_for_history = str(response.message).strip()
except Exception: # pylint: disable=broad-except
processed_text_for_history = str(response.message) # Fallback if strip fails
status = "warning_non_string"
elif response is None:
message_for_display_and_log = f"Tool '{self.name}': Failed to return a response object (tool execution may have failed internally)."
processed_text_for_history = "Error: Tool failed to return a response object."
status = "error_no_response"
else: # response is not None but not a Response object
message_for_display_and_log = f"Tool '{self.name}': Returned an invalid response object type ({type(response)})."
processed_text_for_history = f"Error: Tool returned invalid response object type {type(response)}."
status = "error_invalid_type"
self.agent.hist_add_tool_result(self.name, processed_text_for_history)
heading_text = f"{self.agent.agent_name}: Response from tool '{self.name}'"
heading_color = "#1B4F72" # Default for success (dark blue)
message_color = "#85C1E9" # Default for success (light blue)
if status == "success":
pass # Defaults are fine
elif status in ["warning_null_message", "warning_non_string"]:
heading_text = f"{self.agent.agent_name}: Notice from tool '{self.name}' (Warning)"
heading_color = "#E67E22" # Orange for warning
message_color = "#E67E22" # Orange for warning
elif status in ["error_no_response", "error_invalid_type", "error_unknown"]:
heading_text = f"{self.agent.agent_name}: Error from tool '{self.name}'"
heading_color = "#C0392B" # Red for error
message_color = "#C0392B" # Red for error
PrintStyle(font_color=heading_color, background_color="white", padding=True, bold=True).print(heading_text)
PrintStyle(font_color=message_color).print(message_for_display_and_log)
if self.log is not None:
self.log.update(content=message_for_display_and_log) # Log the same message that was displayed
else:
# This indicates self.log was not set in before_execution, which would be an issue.
PrintStyle(font_color="#C0392B", bold=True).print(f"CRITICAL WARNING: Log object not initialized for tool '{self.name}' before after_execution call. Log update skipped.")
def get_log_object(self):
if self.method:
heading = f"{self.agent.agent_name}: Using tool '{self.name}:{self.method}'"
else:
heading = f"{self.agent.agent_name}: Using tool '{self.name}'"
# Ensure self.args is a dictionary for logging, defaulting to empty if not.
kvps_for_log = self.args if isinstance(self.args, dict) else {}
return self.agent.context.log.log(type="tool", heading=heading, content="", kvps=kvps_for_log)
def nice_key(self, key:str):
words = key.split('_')
# Capitalize first word, lowercase others if they are part of the key
# Handle cases where a word might already be partly capitalized or fully uppercase
if words:
first_word = words[0]
# Heuristic: if the first word is all caps and more than 1 char, keep as is (e.g. API_Key)
# otherwise, capitalize it.
if first_word.isupper() and len(first_word) > 1:
processed_first_word = first_word
else:
processed_first_word = first_word.capitalize()
remaining_words = [word.lower() for word in words[1:]]
words = [processed_first_word] + remaining_words
result = ' '.join(words)
        return result
```
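A quick smoke test for the patched `after_execution`, as a minimal sketch: `DummyTool` is hypothetical, the `MagicMock` agent stands in for a real Agent instance, and it assumes the patched module above is importable.

```python
import asyncio
from unittest.mock import MagicMock

class DummyTool(Tool):
    async def execute(self, **kwargs) -> Response:
        # deliberately reproduce the failure mode: a None message
        return Response(message=None, break_loop=False)  # type: ignore[arg-type]

agent = MagicMock()
agent.agent_name = "Agent 0"
tool = DummyTool(agent=agent, name="browser_agent", method=None, args={}, message="")
tool.log = MagicMock()  # stand-in for the log object normally set in before_execution

# previously raised AttributeError; now logs a warning and continues
asyncio.run(tool.after_execution(Response(message=None, break_loop=False)))
```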
Thanks
Here are revised files. This code lets the LLM know that the browser agent failed, so it will resort to other tools or methods to achieve its goal. It's just a better temporary workaround:
Files to replace and their locations:
- a0/agent.py
- a0/tools/browser_agent.py
- a0/helpers/memory.py
agent.py:

```python
import asyncio
from collections import OrderedDict
from dataclasses import dataclass, field
from datetime import datetime
import json
from typing import Any, Awaitable, Coroutine, Optional, Dict, TypedDict
import uuid

import models
from python.helpers import extract_tools, rate_limiter, files, errors, history, tokens
from python.helpers import dirty_json  # keep this if used elsewhere
from python.helpers.print_style import PrintStyle
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.messages import HumanMessage, SystemMessage, AIMessage, BaseMessage
import python.helpers.log as Log
from python.helpers.dirty_json import DirtyJson
from python.helpers.defer import DeferredTask
from typing import Callable
from python.helpers.localization import Localization
class AgentContext:
_contexts: dict[str, "AgentContext"] = {}
_counter: int = 0
def __init__(
self,
config: "AgentConfig",
id: str | None = None,
name: str | None = None,
agent0: "Agent|None" = None,
log: Log.Log | None = None,
paused: bool = False,
streaming_agent: "Agent|None" = None,
created_at: datetime | None = None,
):
self.id = id or str(uuid.uuid4())
self.name = name
self.config = config
self.log = log or Log.Log()
self.agent0 = agent0 or Agent(0, self.config, self)
self.paused = paused
self.streaming_agent = streaming_agent
self.task: DeferredTask | None = None
self.created_at = created_at or datetime.now()
AgentContext._counter += 1
self.no = AgentContext._counter
existing = self._contexts.get(self.id, None)
if existing:
AgentContext.remove(self.id)
self._contexts[self.id] = self
@staticmethod
def get(id: str):
return AgentContext._contexts.get(id, None)
@staticmethod
def first():
if not AgentContext._contexts:
return None
return list(AgentContext._contexts.values())[0]
@staticmethod
def remove(id: str):
context = AgentContext._contexts.pop(id, None)
if context and context.task:
context.task.kill()
return context
def serialize(self):
return {
"id": self.id,
"name": self.name,
"created_at": (
Localization.get().serialize_datetime(self.created_at)
if self.created_at else Localization.get().serialize_datetime(datetime.fromtimestamp(0))
),
"no": self.no,
"log_guid": self.log.guid,
"log_version": len(self.log.updates),
"log_length": len(self.log.logs),
"paused": self.paused,
}
def get_created_at(self):
return self.created_at
def kill_process(self):
if self.task:
self.task.kill()
def reset(self):
self.kill_process()
self.log.reset()
self.agent0 = Agent(0, self.config, self)
self.streaming_agent = None
self.paused = False
def nudge(self):
self.kill_process()
self.paused = False
if self.streaming_agent:
current_agent = self.streaming_agent
else:
current_agent = self.agent0
self.task = self.run_task(current_agent.monologue)
return self.task
def communicate(self, msg: "UserMessage", broadcast_level: int = 1):
self.paused = False
if self.streaming_agent:
current_agent = self.streaming_agent
else:
current_agent = self.agent0
if self.task and self.task.is_alive():
intervention_agent = current_agent
while intervention_agent and broadcast_level != 0:
intervention_agent.intervention = msg
broadcast_level -= 1
intervention_agent = intervention_agent.data.get(
Agent.DATA_NAME_SUPERIOR, None
)
else:
self.task = self.run_task(self._process_chain, current_agent, msg)
return self.task
def run_task(
self, func: Callable[..., Coroutine[Any, Any, Any]], *args: Any, **kwargs: Any
):
if not self.task:
self.task = DeferredTask(
thread_name=self.__class__.__name__,
)
self.task.start_task(func, *args, **kwargs)
return self.task
async def _process_chain(self, agent: "Agent", msg: "UserMessage|str", user=True):
try:
_ = ( # Assign to _ to indicate not used directly after call
agent.hist_add_user_message(msg)
if user
else agent.hist_add_tool_result(
tool_name="call_subordinate", tool_result=msg
)
)
response = await agent.monologue()
superior = agent.data.get(Agent.DATA_NAME_SUPERIOR, None)
if superior:
response = await self._process_chain(superior, response, False)
return response
except Exception as e:
agent.handle_critical_exception(e)
@dataclass
class ModelConfig:
    provider: models.ModelProvider
    name: str
    ctx_length: int = 0
    limit_requests: int = 0
    limit_input: int = 0
    limit_output: int = 0
    vision: bool = False
    kwargs: dict = field(default_factory=dict)
@dataclass
class AgentConfig:
    chat_model: ModelConfig
    utility_model: ModelConfig
    embeddings_model: ModelConfig
    browser_model: ModelConfig
    prompts_subdir: str = ""
    memory_subdir: str = ""
    knowledge_subdirs: list[str] = field(default_factory=lambda: ["default", "custom"])
    code_exec_docker_enabled: bool = False
    code_exec_docker_name: str = "A0-dev"
    code_exec_docker_image: str = "frdel/agent-zero-run:development"
    code_exec_docker_ports: dict[str, int] = field(
        default_factory=lambda: {"22/tcp": 55022, "80/tcp": 55080}
    )
    code_exec_docker_volumes: dict[str, dict[str, str]] = field(
        default_factory=lambda: {
            files.get_base_dir(): {"bind": "/a0", "mode": "rw"},
            files.get_abs_path("work_dir"): {"bind": "/root", "mode": "rw"},
        }
    )
    code_exec_ssh_enabled: bool = True
    code_exec_ssh_addr: str = "localhost"
    code_exec_ssh_port: int = 55022
    code_exec_ssh_user: str = "root"
    code_exec_ssh_pass: str = ""
    additional: Dict[str, Any] = field(default_factory=dict)
@dataclass
class UserMessage:
    message: str
    attachments: list[str] = field(default_factory=list[str])
    system_message: list[str] = field(default_factory=list[str])
class LoopData:
    def __init__(self, **kwargs):
        self.iteration = -1
        self.system: list[str] = []
        self.user_message: Optional[history.Message] = None
        self.history_output: list[history.OutputMessage] = []
        self.extras_temporary: OrderedDict[str, history.MessageContent] = OrderedDict()
        self.extras_persistent: OrderedDict[str, history.MessageContent] = OrderedDict()
        self.last_response = ""
        for key, value in kwargs.items():
            setattr(self, key, value)
class InterventionException(Exception): pass
class RepairableException(Exception): pass
class HandledException(Exception): pass
class Agent:
DATA_NAME_SUPERIOR = "_superior"
DATA_NAME_SUBORDINATE = "_subordinate"
DATA_NAME_CTX_WINDOW = "ctx_window"
def __init__(
self, number: int, config: AgentConfig, context: AgentContext | None = None
):
self.config = config
self.context = context or AgentContext(config)
self.number = number
self.agent_name = f"Agent {self.number}"
self.history = history.History(self)
self.last_user_message: Optional[history.Message] = None # Use Optional
self.intervention: Optional[UserMessage] = None # Use Optional
self.data: Dict[str, Any] = {} # Ensure type hint
self.loop_data: LoopData = LoopData() # Initialize loop_data
async def monologue(self):
while True:
try:
self.loop_data = LoopData(user_message=self.last_user_message)
await self.call_extensions("monologue_start", loop_data=self.loop_data)
printer = PrintStyle(italic=True, font_color="#b3ffd9", padding=False)
while True:
self.context.streaming_agent = self
self.loop_data.iteration += 1
await self.call_extensions("message_loop_start", loop_data=self.loop_data)
try:
prompt = await self.prepare_prompt(loop_data=self.loop_data)
PrintStyle(
bold=True,
font_color="green",
padding=True,
background_color="white",
).print(f"{self.agent_name}: Generating")
log = self.context.log.log(
type="agent", heading=f"{self.agent_name}: Generating"
)
async def stream_callback(chunk: str, full: str):
if chunk:
printer.stream(chunk)
self.log_from_stream(full, log)
agent_response = await self.call_chat_model(
prompt, callback=stream_callback
)
await self.handle_intervention(agent_response)
if self.loop_data.last_response == agent_response:
self.hist_add_ai_response(agent_response)
warning_msg = self.read_prompt("fw.msg_repeat.md")
self.hist_add_warning(message=warning_msg)
PrintStyle(font_color="orange", padding=True).print(warning_msg)
self.context.log.log(type="warning", content=warning_msg)
else:
self.hist_add_ai_response(agent_response)
tools_result = await self.process_tools(agent_response)
if tools_result:
return tools_result
except InterventionException:
pass
except RepairableException as e:
error_message = errors.format_error(e)
self.hist_add_warning(error_message)
PrintStyle(font_color="red", padding=True).print(error_message)
self.context.log.log(type="error", content=error_message)
except Exception as e:
self.handle_critical_exception(e)
finally:
await self.call_extensions("message_loop_end", loop_data=self.loop_data)
except InterventionException:
pass
except Exception as e:
self.handle_critical_exception(e)
finally:
self.context.streaming_agent = None
await self.call_extensions("monologue_end", loop_data=self.loop_data)
async def prepare_prompt(self, loop_data: LoopData) -> ChatPromptTemplate:
await self.call_extensions("message_loop_prompts_before", loop_data=loop_data)
loop_data.system = await self.get_system_prompt(self.loop_data)
loop_data.history_output = self.history.output()
await self.call_extensions("message_loop_prompts_after", loop_data=loop_data)
extras_content = {}
if loop_data.extras_persistent: extras_content.update(loop_data.extras_persistent)
if loop_data.extras_temporary: extras_content.update(loop_data.extras_temporary)
extras_messages: list[history.OutputMessage] = []
if extras_content:
extras_prompt = self.read_prompt("agent.context.extras.md", extras=dirty_json.stringify(extras_content))
extras_messages = history.Message(False, content=extras_prompt).output()
loop_data.extras_temporary.clear()
history_langchain: list[BaseMessage] = history.output_langchain(
loop_data.history_output + extras_messages
)
system_text = "\n\n".join(loop_data.system)
prompt = ChatPromptTemplate.from_messages(
[
SystemMessage(content=system_text),
*history_langchain,
]
)
self.set_data(
Agent.DATA_NAME_CTX_WINDOW,
{
"text": prompt.format(),
"tokens": self.history.get_tokens()
+ tokens.approximate_tokens(system_text)
+ tokens.approximate_tokens(history.output_text(extras_messages)),
},
)
return prompt
def handle_critical_exception(self, exception: Exception):
if isinstance(exception, HandledException):
raise exception
elif isinstance(exception, asyncio.CancelledError):
PrintStyle(font_color="white", background_color="red", padding=True).print(
f"Context {self.context.id} terminated during message loop"
)
raise HandledException(exception)
else:
error_text = errors.error_text(exception)
error_message = errors.format_error(exception)
PrintStyle(font_color="red", padding=True).print(error_message)
self.context.log.log(
type="error",
heading="Error",
content=error_message,
kvps={"text": error_text},
)
raise HandledException(exception)
async def get_system_prompt(self, loop_data: LoopData) -> list[str]:
system_prompt_list: list[str] = [] # Ensure type hint
await self.call_extensions(
"system_prompt", system_prompt=system_prompt_list, loop_data=loop_data
)
return system_prompt_list
def parse_prompt(self, file: str, **kwargs):
prompt_dir = files.get_abs_path("prompts/default")
backup_dir = []
if self.config.prompts_subdir:
prompt_dir = files.get_abs_path("prompts", self.config.prompts_subdir)
backup_dir.append(files.get_abs_path("prompts/default"))
parsed_content = files.parse_file( # Renamed to avoid conflict
files.get_abs_path(prompt_dir, file), _backup_dirs=backup_dir, **kwargs
)
return parsed_content
def read_prompt(self, file: str, **kwargs) -> str:
prompt_dir = files.get_abs_path("prompts/default")
backup_dir = []
if self.config.prompts_subdir:
prompt_dir = files.get_abs_path("prompts", self.config.prompts_subdir)
backup_dir.append(files.get_abs_path("prompts/default"))
file_content = files.read_file( # Renamed to avoid conflict
files.get_abs_path(prompt_dir, file), _backup_dirs=backup_dir, **kwargs
)
file_content = files.remove_code_fences(file_content)
return file_content
def get_data(self, field: str):
return self.data.get(field, None)
def set_data(self, field: str, value):
self.data[field] = value
def hist_add_message(
self, ai: bool, content: history.MessageContent, tokens: int = 0
):
return self.history.add_message(ai=ai, content=content, tokens=tokens)
def hist_add_user_message(self, message: UserMessage, intervention: bool = False):
self.history.new_topic()
prompt_file = "fw.intervention.md" if intervention else "fw.user_message.md"
content = self.parse_prompt(
prompt_file,
message=message.message,
attachments=message.attachments,
system_message=message.system_message
)
if isinstance(content, dict):
content = {k: v for k, v in content.items() if v}
msg_obj = self.hist_add_message(False, content=content) # Renamed to avoid conflict
self.last_user_message = msg_obj
return msg_obj
def hist_add_ai_response(self, message: str):
self.loop_data.last_response = message
content = self.parse_prompt("fw.ai_response.md", message=message)
return self.hist_add_message(True, content=content)
def hist_add_warning(self, message: history.MessageContent):
content = self.parse_prompt("fw.warning.md", message=message)
return self.hist_add_message(False, content=content)
def hist_add_tool_result(self, tool_name: str, tool_result: str):
content = self.parse_prompt(
"fw.tool_result.md", tool_name=tool_name, tool_result=tool_result
)
return self.hist_add_message(False, content=content)
def concat_messages(self, messages):
return self.history.output_text(human_label="user", ai_label="assistant")
def get_chat_model(self):
return models.get_model(
models.ModelType.CHAT,
self.config.chat_model.provider,
self.config.chat_model.name,
**self.config.chat_model.kwargs,
)
def get_utility_model(self):
return models.get_model(
models.ModelType.CHAT,
self.config.utility_model.provider,
self.config.utility_model.name,
**self.config.utility_model.kwargs,
)
def get_embedding_model(self):
return models.get_model(
models.ModelType.EMBEDDING,
self.config.embeddings_model.provider,
self.config.embeddings_model.name,
**self.config.embeddings_model.kwargs,
)
async def call_utility_model(
self,
system: str,
message: str,
callback: Callable[[str], Awaitable[None]] | None = None,
background: bool = False,
):
prompt = ChatPromptTemplate.from_messages(
[SystemMessage(content=system), HumanMessage(content=message)]
)
response_text = "" # Renamed to avoid conflict
model = self.get_utility_model()
limiter = await self.rate_limiter(
self.config.utility_model, prompt.format(), background
)
async for chunk in (prompt | model).astream({}):
await self.handle_intervention()
content = models.parse_chunk(chunk)
limiter.add(output=tokens.approximate_tokens(content))
response_text += content
if callback:
await callback(content)
return response_text
async def call_chat_model(
self,
prompt: ChatPromptTemplate,
callback: Callable[[str, str], Awaitable[None]] | None = None,
):
response_text = "" # Renamed to avoid conflict
model = self.get_chat_model()
limiter = await self.rate_limiter(self.config.chat_model, prompt.format())
async for chunk in (prompt | model).astream({}):
await self.handle_intervention()
content = models.parse_chunk(chunk)
limiter.add(output=tokens.approximate_tokens(content))
response_text += content
if callback:
await callback(content, response_text)
return response_text
async def rate_limiter(
self, model_config: ModelConfig, input_str: str, background: bool = False # Renamed input to input_str
):
wait_log = None
async def wait_callback(msg_text: str, key: str, total: int, limit: int): # Renamed msg to msg_text
nonlocal wait_log
if not wait_log:
wait_log = self.context.log.log(
type="util",
update_progress="none",
heading=msg_text,
model=f"{model_config.provider.value}\\{model_config.name}",
)
wait_log.update(heading=msg_text, key=key, value=total, limit=limit)
if not background:
self.context.log.set_progress(msg_text, -1)
limiter = models.get_rate_limiter(
model_config.provider,
model_config.name,
model_config.limit_requests,
model_config.limit_input,
model_config.limit_output,
)
limiter.add(input=tokens.approximate_tokens(input_str))
limiter.add(requests=1)
await limiter.wait(callback=wait_callback)
return limiter
async def handle_intervention(self, progress: str = ""):
while self.context.paused:
await asyncio.sleep(0.1)
if self.intervention:
intervention_msg = self.intervention # Renamed to avoid conflict
self.intervention = None
if progress.strip():
self.hist_add_ai_response(progress)
self.hist_add_user_message(intervention_msg, intervention=True)
raise InterventionException(str(intervention_msg.message)) # Pass message content
async def wait_if_paused(self):
while self.context.paused:
await asyncio.sleep(0.1)
async def process_tools(self, msg: str): # msg is the LLM's response with tool request
tool_request = extract_tools.json_parse_dirty(msg)
if tool_request is not None:
tool_name_from_llm = tool_request.get("tool_name", "") # Renamed to avoid conflict
tool_method = None
tool_args = tool_request.get("tool_args", {})
if ":" in tool_name_from_llm:
tool_name_from_llm, tool_method = tool_name_from_llm.split(":", 1)
tool_instance = self.get_tool(name=tool_name_from_llm, method=tool_method, args=tool_args, message=msg) # Renamed
await self.handle_intervention()
await tool_instance.before_execution(**tool_args)
await self.handle_intervention()
# tool_response is a Response object (message: str, break_loop: bool)
tool_response = await tool_instance.execute(**tool_args)
await self.handle_intervention()
# tool_instance.after_execution adds tool_response.message to history via hist_add_tool_result
await tool_instance.after_execution(tool_response)
await self.handle_intervention()
# Check for critical tool failure signaled by the tool's response message
try:
# tool_response.message should be the JSON string from BrowserAgent if it failed critically
if isinstance(tool_response.message, str):
response_data = json.loads(tool_response.message)
if isinstance(response_data, dict) and "critical_error" in response_data:
failed_tool_name = response_data.get("tool_name", tool_name_from_llm) # Use tool_name from payload if available
error_details = response_data.get("details", "No specific details provided.")
original_task_for_tool = response_data.get("original_task_for_tool", "N/A")
self.context.log.log(
type="error",
heading=f"Critical failure in tool '{failed_tool_name}'",
content=error_details,
kvps={"original_task": original_task_for_tool}
)
PrintStyle(font_color="red", padding=True, bold=True).print(
f"{self.agent_name}: Critical failure reported by tool '{failed_tool_name}'. Details: {error_details}"
)
guidance_message_parts = [
f"ATTENTION: The tool '{failed_tool_name}' reported a CRITICAL FAILURE while attempting to perform the following task: '{original_task_for_tool}'.",
f"Failure details from the tool: {error_details}",
"You MUST NOT attempt to use the same tool ('{}') for this exact sub-task again in your immediate next step.".format(failed_tool_name),
"Instead, you MUST devise an ALTERNATIVE STRATEGY to achieve the original goal.",
"Consider the following alternatives:",
" - Use the 'code_interpreter' tool to write and execute a Python script. For web tasks, this could involve libraries like 'requests' for fetching page content and 'BeautifulSoup4' or 'lxml' for parsing HTML. You can install these libraries if needed.",
" - Use the 'shell_command' tool if a command-line utility can accomplish the task (e.g., 'curl' for simple GET requests, or other specific CLI tools).",
" - Re-evaluate if another available tool is more suitable for this task.",
" - If the task was information retrieval, consider reformulating it as a search query for a search tool if available, or use Python to access search APIs.",
"Analyze the failure and the original task, then select the most appropriate alternative method and provide the necessary arguments for the chosen tool."
]
guidance_message = "\n".join(guidance_message_parts)
# This adds the guidance as a new "warning" message in the history,
# which will be part of the next prompt to the LLM.
self.hist_add_warning(message=guidance_message)
PrintStyle(font_color="#FF8C00", padding=True, bold=True).print( # DarkOrange color for emphasis
f"{self.agent_name}: Instructing LLM to find alternatives for failed task of tool '{failed_tool_name}'."
)
except (json.JSONDecodeError, TypeError) as e:
# This means tool_response.message was not a JSON string or None, so no critical error signal from tool.
# Or, it was JSON but not matching the 'critical_error' structure. This is normal for successful tool calls.
# PrintStyle(font_color="grey").print(f"Debug: Tool response not a critical error signal or not JSON: {e} | Response: {tool_response.message[:100]}")
pass # Normal execution path if not a critical error JSON
if tool_response.break_loop:
return tool_response.message # Return the actual message from the tool
else:
# This 'else' block handles cases where the LLM's response (msg) was not a valid tool request JSON.
warning_msg_text = self.read_prompt("fw.msg_misformat.md") # Renamed
self.hist_add_warning(warning_msg_text)
PrintStyle(font_color="red", padding=True).print(warning_msg_text)
self.context.log.log(
type="error", content=f"{self.agent_name}: LLM message misformat - did not provide a valid tool request."
)
return None # Explicitly return None if no tool broke the loop with a message
def log_from_stream(self, stream: str, logItem: Log.LogItem):
try:
if len(stream) < 25: return
response_obj = dirty_json.DirtyJson.parse_string(stream) # Renamed
if isinstance(response_obj, dict):
logItem.update(content=stream, kvps=response_obj)
except Exception:
pass
def get_tool(self, name: str, method: str | None, args: dict, message: str, **kwargs):
from python.tools.unknown import Unknown
from python.helpers.tool import Tool
classes = extract_tools.load_classes_from_folder(
"python/tools", name + ".py", Tool
)
tool_class = classes[0] if classes else Unknown
return tool_class(agent=self, name=name, method=method, args=args, message=message, **kwargs)
async def call_extensions(self, folder: str, **kwargs) -> Any:
from python.helpers.extension import Extension
classes = extract_tools.load_classes_from_folder(
"python/extensions/" + folder, "*", Extension
)
for cls in classes:
# Ensure the extension class is instantiated correctly if it needs specific args
# For now, assuming Extension(agent=self) is standard
ext_instance = cls(agent=self)
if hasattr(ext_instance, 'execute') and asyncio.iscoroutinefunction(ext_instance.execute):
await ext_instance.execute(**kwargs)
elif hasattr(ext_instance, 'execute'): # For synchronous extensions, though async is preferred
                ext_instance.execute(**kwargs)
```
browser_agent.py:

```python
import asyncio
import json
import time
from agent import Agent, InterventionException
import models
from python.helpers.tool import Tool, Response
from python.helpers import files, defer, persist_chat  # , strings  # assuming strings.dict_to_text might be available
from python.helpers.browser_use import browser_use
from python.extensions.message_loop_start._10_iteration_no import get_iter_no
from pydantic import BaseModel
import uuid
from python.helpers.dirty_json import DirtyJson
from langchain_core.messages import SystemMessage


class State:
    @staticmethod
    async def create(agent: Agent):
        state = State(agent)
        return state
def __init__(self, agent: Agent):
self.agent = agent
self.context = None
self.task = None
self.use_agent = None
self.browser = None
self.iter_no = 0
def __del__(self):
self.kill_task()
async def _initialize(self):
if self.context:
return
self.browser = browser_use.Browser(
config=browser_use.BrowserConfig(
headless=True,
disable_security=True,
)
)
self.context = await self.browser.new_context()
self.override_hooks()
await self.context._initialize_session()
pw_context = self.context.session.context
js_override = files.get_abs_path("lib/browser/init_override.js")
await pw_context.add_init_script(path=js_override)
def start_task(self, task: str):
if self.task and self.task.is_alive():
self.kill_task()
if not self.task:
self.task = defer.DeferredTask(
thread_name="BrowserAgent" + self.agent.context.id
)
if self.agent.context.task:
self.agent.context.task.add_child_task(self.task, terminate_thread=True)
self.task.start_task(self._run_task, task)
return self.task
def kill_task(self):
if self.task:
self.task.kill(terminate_thread=True)
self.task = None
self.context = None
self.use_agent = None
self.browser = None
self.iter_no = 0
async def _run_task(self, task_str: str): # Renamed task to task_str to avoid conflict
agent = self.agent
await self._initialize()
class CustomSystemPrompt(browser_use.SystemPrompt):
def get_system_message(self) -> SystemMessage:
existing_rules = super().get_system_message().content # Use .content for SystemMessage
new_rules = agent.read_prompt("prompts/browser_agent.system.md")
return SystemMessage(content=f"{existing_rules}\n{new_rules}".strip())
class DoneResult(BaseModel):
title: str
response: str
page_summary: str
controller = browser_use.Controller()
@controller.registry.action("Done with task", param_model=DoneResult)
async def done(params: DoneResult):
result = browser_use.ActionResult(
is_done=True, extracted_content=params.model_dump_json()
)
return result
model = models.get_model(
type=models.ModelType.CHAT,
provider=self.agent.config.browser_model.provider,
name=self.agent.config.browser_model.name,
**self.agent.config.browser_model.kwargs,
)
self.use_agent = browser_use.Agent(
task=task_str, # Use renamed task_str
browser_context=self.context,
llm=model,
use_vision=self.agent.config.browser_model.vision,
system_prompt_class=CustomSystemPrompt,
controller=controller,
)
self.iter_no = get_iter_no(self.agent)
# The error "FW:Action error: Error code: 400 - {'error': "Invalid tool_choice type: 'object'..."
# suggests an issue within the browser_use.Agent's LLM interaction.
# We'll rely on its result to determine success/failure.
run_result = await self.use_agent.run()
return run_result # This is the browser_use.Agent's result object
def override_hooks(self):
def override_hook(func):
async def wrapper(*args, **kwargs):
await self.agent.wait_if_paused()
if self.iter_no != get_iter_no(self.agent):
raise InterventionException("Task cancelled")
return await func(*args, **kwargs)
return wrapper
if self.context:
self.context.get_state = override_hook(self.context.get_state)
self.context.get_session = override_hook(self.context.get_session)
self.context.remove_highlights = override_hook(self.context.remove_highlights)
async def get_page(self):
if self.use_agent:
return await self.use_agent.browser_context.get_current_page()
class BrowserAgent(Tool):
async def execute(self, message="", reset="", **kwargs): # 'message' is the task for the browser agent
self.guid = str(uuid.uuid4())
reset_flag = str(reset).lower().strip() == "true"
await self.prepare_state(reset=reset_flag)
# 'message' here is the specific instruction for the browser agent, e.g., "Open URL X and find Y"
current_browser_task_description = message
task = self.state.start_task(current_browser_task_description)
while not task.is_ready():
await self.agent.handle_intervention()
await asyncio.sleep(1)
try:
update = await self.get_update()
log_entries = update.get("log")
if log_entries: # Check if log_entries is not None
self.update_progress("\n".join(log_entries))
screenshot = update.get("screenshot", None)
if screenshot:
self.log.update(screenshot=screenshot)
except Exception as e:
# self.agent.context.log.log(type="warning", heading=f"{self.name}: Error during get_update", content=str(e))
pass # Silently pass for now, or log minimally
task_execution_result = await task.result() # This is the result from self.use_agent.run()
raw_final_content = None
# Placeholder for more specific error details from browser_use.Agent if available
# Example: if task_execution_result has an 'error_info' attribute or similar
internal_error_details = "The underlying browser automation engine did not complete successfully or returned no content."
if task_execution_result and hasattr(task_execution_result, 'final_result'):
raw_final_content = task_execution_result.final_result()
if hasattr(task_execution_result, 'error_message') and task_execution_result.error_message: # Hypothetical
internal_error_details = task_execution_result.error_message
elif raw_final_content is None and hasattr(task_execution_result, 'status_message') and task_execution_result.status_message: # Hypothetical
internal_error_details = task_execution_result.status_message
message_for_response: str
answer_text_for_log: str
if raw_final_content is None:
# CRITICAL FAILURE CASE
error_payload = {
"critical_error": "BrowserAgent_Execution_Failed",
"tool_name": self.name,
"details": f"The browser automation tool ('{self.name}') failed to produce a usable result. This often indicates an internal issue, such as a misconfiguration or an unrecoverable error during web interaction (e.g., LLM tool_choice problem, page load timeouts, or complex JavaScript interactions). Details: {internal_error_details}",
"original_task_for_tool": current_browser_task_description # Use the task description passed to this tool
}
message_for_response = json.dumps(error_payload)
answer_text_for_log = f"null (Critical Error reported by {self.name}: {error_payload['details']})"
elif isinstance(raw_final_content, str):
# SUCCESS CASE (or at least, content was returned)
message_for_response = raw_final_content
try:
answer_data = DirtyJson.parse_string(raw_final_content)
# Ensure 'strings.dict_to_text' or an equivalent is available and imported if used
# For now, using json.dumps for reliable string representation of dict/list for logging
if isinstance(answer_data, (dict, list)):
answer_text_for_log = json.dumps(answer_data, indent=2)
# elif 'strings' in globals() and hasattr(strings, 'dict_to_text'):
# answer_text_for_log = strings.dict_to_text(answer_data)
else:
answer_text_for_log = raw_final_content
except Exception:
answer_text_for_log = raw_final_content # Log raw content if parsing fails
else:
# UNEXPECTED RETURN TYPE
warning_message = f"Browser agent ('{self.name}') returned an unexpected data type: {type(raw_final_content).__name__}. Content: {str(raw_final_content)[:200]}..."
# Treat as a critical failure as it's not a usable string output
error_payload = {
"critical_error": "BrowserAgent_Invalid_Output",
"tool_name": self.name,
"details": warning_message,
"original_task_for_tool": current_browser_task_description
}
message_for_response = json.dumps(error_payload)
answer_text_for_log = f"null (Invalid Output Type from {self.name}: {warning_message})"
self.log.update(answer=answer_text_for_log)
return Response(message=message_for_response, break_loop=False)
def get_log_object(self):
return self.agent.context.log.log(
type="browser",
heading=f"{self.agent.agent_name}: Using tool '{self.name}'",
content="", # Initial content, will be updated by after_execution
kvps=self.args,
)
async def get_update(self):
await self.prepare_state() # Ensure state is prepared
result = {}
agent = self.agent
ua = self.state.use_agent
page = await self.state.get_page()
# ctx = self.state.context # Not used directly here
if ua and page:
try:
async def _get_update_internal(): # Renamed to avoid conflict
await agent.wait_if_paused()
log = []
for message_item in ua.message_manager.get_messages(): # Renamed message to message_item
if message_item.type == "system":
continue
if message_item.type == "ai":
try:
if isinstance(message_item.content, str): # Ensure content is a string
data = json.loads(message_item.content)
cs = data.get("current_state")
if cs:
if isinstance(cs.get("memory"), str): log.append("AI:" + cs["memory"])
if isinstance(cs.get("next_goal"), str): log.append("AI:" + cs["next_goal"])
except Exception:
pass # Could log this parsing error if needed
if message_item.type == "human":
content = str(message_item.content).strip()
part = content.split("\n", 1)[0].split(",", 1)[0]
if part:
if len(part) > 150:
part = part[:150] + "..."
log.append("FW:" + part)
result["log"] = log
# Ensure persist_chat and chat folder path logic is correct
screenshots_folder = files.get_abs_path(
persist_chat.get_chat_folder_path(agent.context.id), # Ensure agent.context.id is valid
"browser",
"screenshots"
)
path = files.get_abs_path(screenshots_folder, f"{self.guid}.png")
files.make_dirs(path) # make_dirs should handle existing dirs
await page.screenshot(path=path, full_page=False, timeout=3000)
result["screenshot"] = f"img://{path}&t={str(time.time())}"
if self.state.task and self.state.task.is_alive(): # Check if task is alive
await self.state.task.execute_inside(_get_update_internal)
elif not self.state.task or not self.state.task.is_alive(): # If task is not running, run directly
# This case might be tricky if _get_update_internal relies on task context
# For simplicity, assume it can run if task is not active or if it's called before/after task.
# However, get_update is usually called WHILE task is running.
# If task is dead, page might be None or closed.
# This part might need careful review based on when get_update is called.
# A simple approach: if task isn't running, this update might not be meaningful.
# self.agent.context.log.log(type="debug", heading=f"{self.name}: get_update called but task not active.")
pass
except Exception as e:
# self.agent.context.log.log(type="warning", heading=f"{self.name}: Exception in get_update", content=str(e))
pass # Silently pass for now
return result
async def prepare_state(self, reset=False):
self.state = self.agent.get_data("_browser_agent_state")
if not self.state or reset:
if self.state and hasattr(self.state, 'kill_task'): # Ensure old task is killed if resetting
self.state.kill_task()
self.state = await State.create(self.agent)
self.agent.set_data("_browser_agent_state", self.state)
def update_progress(self, text):
short = text.split("\n")[-1]
if len(short) > 50:
short = short[:50] + "..."
progress = f"Browser: {short}"
self.log.update(progress=text) # self.log is from Tool.before_execution
        self.agent.context.log.set_progress(progress)
```
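For reference, the failure signal that the patched `process_tools` in agent.py scans for is just this JSON structure in the tool's response message (keys taken from the `error_payload` built above; the values here are illustrative):

```json
{
  "critical_error": "BrowserAgent_Execution_Failed",
  "tool_name": "browser_agent",
  "details": "The browser automation tool failed to produce a usable result...",
  "original_task_for_tool": "Open example.com and find the pricing page"
}
```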
memory.py:

```python
from datetime import datetime
from typing import Any, List, Sequence
from langchain.storage import InMemoryByteStore, LocalFileStore
from langchain.embeddings import CacheBackedEmbeddings
from langchain_chroma import Chroma
from langchain_community.vectorstores import FAISS
import faiss
from langchain_community.docstore.in_memory import InMemoryDocstore
from langchain_community.vectorstores.utils import DistanceStrategy
from langchain_core.embeddings import Embeddings
import os, json
import numpy as np
from python.helpers.print_style import PrintStyle
from . import files
from langchain_core.documents import Document
import uuid
from python.helpers import knowledge_import
from python.helpers.log import Log, LogItem
from enum import Enum
from agent import Agent, ModelConfig
import models


class MyFaiss(FAISS):
    # override get_by_ids: return every self.docstore._dict[id] whose id is in ids
    def get_by_ids(self, ids: Sequence[str], /) -> List[Document]:
        return [
            self.docstore._dict[id]  # type: ignore
            for id in (ids if isinstance(ids, list) else [ids])
            if id in self.docstore._dict  # type: ignore
        ]
async def aget_by_ids(self, ids: Sequence[str], /) -> List[Document]:
return self.get_by_ids(ids)
def get_all_docs(self):
return self.docstore._dict # type: ignore
class Memory:
class Area(Enum):
MAIN = "main"
FRAGMENTS = "fragments"
SOLUTIONS = "solutions"
INSTRUMENTS = "instruments"
index: dict[str, "MyFaiss"] = {}
@staticmethod
async def get(agent: Agent):
memory_subdir = agent.config.memory_subdir or "default"
if Memory.index.get(memory_subdir) is None:
log_item = agent.context.log.log(
type="util",
heading=f"Initializing VectorDB in '/{memory_subdir}'",
)
db, created = Memory.initialize(
log_item,
agent.config.embeddings_model,
memory_subdir,
False,
)
Memory.index[memory_subdir] = db
wrap = Memory(agent, db, memory_subdir=memory_subdir)
if agent.config.knowledge_subdirs:
await wrap.preload_knowledge(
log_item, agent.config.knowledge_subdirs, memory_subdir
)
return wrap
else:
return Memory(
agent=agent,
db=Memory.index[memory_subdir],
memory_subdir=memory_subdir,
)
@staticmethod
async def reload(agent: Agent):
memory_subdir = agent.config.memory_subdir or "default"
if Memory.index.get(memory_subdir):
del Memory.index[memory_subdir]
return await Memory.get(agent)
@staticmethod
def initialize(
log_item: LogItem | None,
model_config: ModelConfig,
memory_subdir: str,
in_memory=False,
) -> tuple[MyFaiss, bool]:
PrintStyle.standard("Initializing VectorDB...")
if log_item:
log_item.stream(progress="\nInitializing VectorDB")
em_dir = files.get_abs_path(
"memory/embeddings"
) # just caching, no need to parameterize
db_dir = Memory._abs_db_dir(memory_subdir)
# make sure embeddings and database directories exist
os.makedirs(db_dir, exist_ok=True)
if in_memory:
store = InMemoryByteStore()
else:
os.makedirs(em_dir, exist_ok=True)
store = LocalFileStore(em_dir)
embeddings_model = models.get_model(
models.ModelType.EMBEDDING,
model_config.provider,
model_config.name,
**model_config.kwargs,
)
embeddings_model_id = files.safe_file_name(
model_config.provider.name + "_" + model_config.name
)
# here we setup the embeddings model with the chosen cache storage
embedder = CacheBackedEmbeddings.from_bytes_store(
embeddings_model, store, namespace=embeddings_model_id
)
# initial DB and docs variables
db: MyFaiss | None = None
docs: dict[str, Document] | None = None
created = False
# if db folder exists and is not empty:
if os.path.exists(db_dir) and files.exists(db_dir, "index.faiss"):
db = MyFaiss.load_local(
folder_path=db_dir,
embeddings=embedder,
allow_dangerous_deserialization=True,
distance_strategy=DistanceStrategy.COSINE,
# normalize_L2=True,
relevance_score_fn=Memory._cosine_normalizer,
) # type: ignore
# if there is a mismatch in embeddings used, re-index the whole DB
emb_ok = False
emb_set_file = files.get_abs_path(db_dir, "embedding.json")
if files.exists(emb_set_file):
embedding_set = json.loads(files.read_file(emb_set_file))
if (
embedding_set["model_provider"] == model_config.provider.name
and embedding_set["model_name"] == model_config.name
):
# model matches
emb_ok = True
# re-index - create new DB and insert existing docs
if db and not emb_ok:
docs = db.get_all_docs()
db = None
# DB not loaded, create one
if not db:
index = faiss.IndexFlatIP(len(embedder.embed_query("example")))
db = MyFaiss(
embedding_function=embedder,
index=index,
docstore=InMemoryDocstore(),
index_to_docstore_id={},
distance_strategy=DistanceStrategy.COSINE,
# normalize_L2=True,
relevance_score_fn=Memory._cosine_normalizer,
)
# insert docs if reindexing
if docs:
PrintStyle.standard("Indexing memories...")
if log_item:
log_item.stream(progress="\nIndexing memories")
db.add_documents(documents=list(docs.values()), ids=list(docs.keys()))
# save DB
Memory._save_db_file(db, memory_subdir)
# save meta file
meta_file_path = files.get_abs_path(db_dir, "embedding.json")
files.write_file(
meta_file_path,
json.dumps(
{
"model_provider": model_config.provider.name,
"model_name": model_config.name,
}
),
)
created = True
return db, created
def __init__(
self,
agent: Agent,
db: MyFaiss,
memory_subdir: str,
):
self.agent = agent
self.db = db
self.memory_subdir = memory_subdir
async def preload_knowledge(
self, log_item: LogItem | None, kn_dirs: list[str], memory_subdir: str
):
if log_item:
log_item.update(heading="Preloading knowledge...")
# db abs path
db_dir = Memory._abs_db_dir(memory_subdir)
# Load the index file if it exists
index_path = files.get_abs_path(db_dir, "knowledge_import.json")
# make sure directory exists
if not os.path.exists(db_dir):
os.makedirs(db_dir)
index: dict[str, knowledge_import.KnowledgeImport] = {}
if os.path.exists(index_path):
with open(index_path, "r") as f:
index = json.load(f)
# preload knowledge folders
index = self._preload_knowledge_folders(log_item, kn_dirs, index)
for file in index:
if index[file]["state"] in ["changed", "removed"] and index[file].get(
"ids", []
): # for knowledge files that have been changed or removed and have IDs
await self.delete_documents_by_ids(
index[file]["ids"]
) # remove original version
if index[file]["state"] == "changed":
index[file]["ids"] = await self.insert_documents(
index[file]["documents"]
) # insert new version
# remove index where state="removed"
index = {k: v for k, v in index.items() if v["state"] != "removed"}
# strip state and documents from index and save it
for file in index:
if "documents" in index[file]:
del index[file]["documents"] # type: ignore
if "state" in index[file]:
del index[file]["state"] # type: ignore
with open(index_path, "w") as f:
json.dump(index, f)
def _preload_knowledge_folders(
self,
log_item: LogItem | None,
kn_dirs: list[str],
index: dict[str, knowledge_import.KnowledgeImport],
):
# load knowledge folders, subfolders by area
for kn_dir in kn_dirs:
for area in Memory.Area:
index = knowledge_import.load_knowledge(
log_item,
files.get_abs_path("knowledge", kn_dir, area.value),
index,
{"area": area.value},
)
# load instruments descriptions
index = knowledge_import.load_knowledge(
log_item,
files.get_abs_path("instruments"),
index,
{"area": Memory.Area.INSTRUMENTS.value},
filename_pattern="**/*.md",
)
return index
async def search_similarity_threshold(
self, query: str, limit: int, threshold: float, filter: str = ""
):
comparator = Memory._get_comparator(filter) if filter else None
# rate limiter
await self.agent.rate_limiter(
model_config=self.agent.config.embeddings_model, input=query
)
return await self.db.asearch(
query,
search_type="similarity_score_threshold",
k=limit,
score_threshold=threshold,
filter=comparator,
)
async def delete_documents_by_query(
self, query: str, threshold: float, filter: str = ""
):
k = 100
tot = 0
removed = []
while True:
# Perform similarity search with score
docs = await self.search_similarity_threshold(
query, limit=k, threshold=threshold, filter=filter
)
removed += docs
# Extract document IDs and filter based on score
# document_ids = [result[0].metadata["id"] for result in docs if result[1] < score_limit]
document_ids = [result.metadata["id"] for result in docs]
# Delete documents with IDs over the threshold score
if document_ids:
# fnd = self.db.get(where={"id": {"$in": document_ids}})
# if fnd["ids"]: self.db.delete(ids=fnd["ids"])
# tot += len(fnd["ids"])
self.db.delete(ids=document_ids)
tot += len(document_ids)
# If fewer than K document IDs, break the loop
if len(document_ids) < k:
break
if tot:
self._save_db() # persist
return removed
async def delete_documents_by_ids(self, ids: list[str]):
# aget_by_ids is not yet implemented in faiss, need to do a workaround
rem_docs = self.db.get_by_ids(ids) # existing docs to remove (prevents error)
if rem_docs:
rem_ids = [doc.metadata["id"] for doc in rem_docs] # ids to remove
await self.db.adelete(ids=rem_ids)
if rem_docs:
self._save_db() # persist
return rem_docs
async def insert_text(self, text, metadata: dict = {}):
doc = Document(text, metadata=metadata)
ids = await self.insert_documents([doc])
return ids[0]
async def insert_documents(self, docs: list[Document]):
ids = [str(uuid.uuid4()) for _ in range(len(docs))]
timestamp = self.get_timestamp()
if ids:
for doc, id in zip(docs, ids):
doc.metadata["id"] = id # add ids to documents metadata
doc.metadata["timestamp"] = timestamp # add timestamp
if not doc.metadata.get("area", ""):
doc.metadata["area"] = Memory.Area.MAIN.value
# rate limiter
docs_txt = "".join(self.format_docs_plain(docs))
await self.agent.rate_limiter(
model_config=self.agent.config.embeddings_model, input=docs_txt
)
self.db.add_documents(documents=docs, ids=ids)
self._save_db() # persist
return ids
def _save_db(self):
Memory._save_db_file(self.db, self.memory_subdir)
@staticmethod
def _save_db_file(db: MyFaiss, memory_subdir: str):
abs_dir = Memory._abs_db_dir(memory_subdir)
db.save_local(folder_path=abs_dir)
@staticmethod
def _get_comparator(condition: str):
def comparator(data: dict[str, Any]):
try:
return eval(condition, {}, data)
except Exception as e:
# PrintStyle.error(f"Error evaluating condition: {e}")
return False
return comparator
@staticmethod
def _score_normalizer(val: float) -> float:
res = 1 - 1 / (1 + np.exp(val))
return res
@staticmethod
def _cosine_normalizer(val: float) -> float:
res = (1 + val) / 2
res = max(
0, min(1, res)
) # float precision can cause values like 1.0000000596046448
return res
@staticmethod
def _abs_db_dir(memory_subdir: str) -> str:
return files.get_abs_path("memory", memory_subdir)
@staticmethod
def format_docs_plain(docs: list[Document]) -> list[str]:
result = []
for doc in docs:
text = ""
for k, v in doc.metadata.items():
text += f"{k}: {v}\n"
text += f"Content: {doc.page_content}"
result.append(text)
return result
@staticmethod
def get_timestamp():
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
def get_memory_subdir_abs(agent: Agent) -> str:
    return files.get_abs_path("memory", agent.config.memory_subdir or "default")


def get_custom_knowledge_subdir_abs(agent: Agent) -> str:
    for dir in agent.config.knowledge_subdirs:
        if dir != "default":
            return files.get_abs_path("knowledge", dir)
    raise Exception("No custom knowledge subdir set")


def reload():
    # clear the memory index; this will force all DBs to reload
    Memory.index = {}
```
This seems like a major issue since it breaks all browser automation, yet there's been nothing from the dev on this one? It should be high priority; I guess I'll wait and see.
The browser tool just worked for me. I asked Agent Zero to install Playwright and Chromium, and it worked after that. It all started with the Playwright MCP, which did not work (no support for Kali Linux, by the way), but Agent Zero installed the dependencies related to it, which seems to have fixed the native browser_tool issue for me.
So it seems to be related to Kali Linux environment dependencies.
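If anyone wants to try the same fix manually inside the container, the standard Playwright setup commands are below. This is a guess at what Agent Zero ran; the exact dependency set that mattered on Kali isn't confirmed.

```bash
pip install playwright
playwright install chromium        # download the Chromium build Playwright drives
playwright install-deps chromium   # install the OS-level libraries it needs (Debian-based distros, incl. Kali)
```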
A new version of the browser agent is already in the development branch. It will be released any day now.