[Bug]: Batch Processing with MemoryAdaptiveDispatcher Missing Crawl Outputs
crawl4ai version
0.5.0
Expected Behavior
Each of the 680 product URLs passed to crawler.arun_many() should produce a corresponding result.extracted_content when crawling and extraction succeed, and each result should be saved to MongoDB as its own document. In short: 680 unique URLs should yield 680 unique MongoDB documents.
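A minimal sketch of that invariant, reusing browser_cfg, product_urls, crawl_config and dispatcher from the reproduction script further below; the set bookkeeping is added here only for illustration:

remaining = set(product_urls)
async with AsyncWebCrawler(config=browser_cfg) as crawler:
    async for result in await crawler.arun_many(
        urls=product_urls, config=crawl_config, dispatcher=dispatcher
    ):
        remaining.discard(result.url)
        # ... insert/update MongoDB as in the full script ...
# Every input URL should have been yielded back, successfully or not
assert not remaining, f"{len(remaining)} URLs never produced a result"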
Current Behavior
The 680 unique URLs produce only 540 MongoDB documents. The MemoryAdaptiveDispatcher reports no errors, and the monitor shows every URL as processed. Is this a concurrent-processing issue or a race condition?
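To narrow down where the 140 missing documents go, a check I can run against the collection afterwards (field names taken from the upsert logic in the script below) compares the total, distinct-source and distinct-key counts:

total_docs = collection.count_documents({})
distinct_sources = len(collection.distinct("source_url"))
distinct_links = len(collection.distinct("product_link"))
print(f"docs={total_docs}, source_url={distinct_sources}, product_link={distinct_links}")
# If distinct source_url stays below 680, results are lost before the DB write;
# if distinct product_link is lower than distinct source_url, the product_link-based
# upsert is collapsing several crawled URLs into one document.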
Is this reproducible?
Yes
Inputs Causing the Bug
- A list of 680 product URLs loaded from product_links.json
- The bug occurs with any sufficiently large URL set processed concurrently (a quick duplicate check on the input list follows below).
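A quick sanity check on the input file (using the same validated_links key as the reproduction script below) to rule out duplicates in the list itself:

import json

with open("product_links.json") as f:
    urls = json.load(f).get("validated_links", [])

# Both counts should print 680 if the list really is duplicate-free
print(len(urls), len(set(urls)))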
Steps to Reproduce
Code snippets
import os
import asyncio
import json
from pydantic import BaseModel
from datetime import datetime, UTC
from crawl4ai import (
    AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode,
    LLMConfig, CrawlerMonitor, DisplayMode
)
from crawl4ai.async_dispatcher import MemoryAdaptiveDispatcher
from crawl4ai.extraction_strategy import LLMExtractionStrategy
from pymongo import MongoClient

# MongoDB connection
client = MongoClient("YOUR_MONGO_URI")
collection = client["your_db"]["your_collection"]


class ProductMetadata(BaseModel):
    # Sample schema (fields omitted)
    ...


async def main():
    # Load URLs
    with open('product_links.json', 'r') as f:
        product_urls = json.load(f).get('validated_links', [])

    browser_cfg = BrowserConfig(
        browser_type="chromium",
        headless=True,
        java_script_enabled=True
    )

    llm_strategy = LLMExtractionStrategy(
        llm_config=LLMConfig(
            provider="openai/gpt-4o",
            api_token="YOUR_API_KEY",
            base_url="https://your-llm-api-url.com"
        ),
        schema=ProductMetadata.model_json_schema(),
        extraction_type="schema",
        instruction="(Sanitized prompt omitted for brevity)",
        chunk_token_threshold=1000,
        apply_chunking=False,
        input_format="markdown",
        extra_args={"temperature": 0.1, "max_tokens": 1000}
    )

    crawl_config = CrawlerRunConfig(
        extraction_strategy=llm_strategy,
        cache_mode=CacheMode.BYPASS,
        stream=True
    )

    dispatcher = MemoryAdaptiveDispatcher(
        memory_threshold_percent=80.0,
        check_interval=1.0,
        max_session_permit=10,
        monitor=CrawlerMonitor(enable_ui=True)
    )

    async with AsyncWebCrawler(config=browser_cfg) as crawler:
        async for result in await crawler.arun_many(
            urls=product_urls,
            config=crawl_config,
            dispatcher=dispatcher
        ):
            if result.success:
                try:
                    data = json.loads(result.extracted_content)
                    # If the LLM returns several candidate objects, keep the most complete one
                    if isinstance(data, list):
                        data = max(data, key=lambda x: sum(1 for v in x.values() if v and v != "N/A"))
                    data['extraction_date'] = datetime.now(UTC).isoformat()
                    data['source_url'] = result.url
                    # Upsert keyed on product_link
                    existing = collection.find_one({"product_link": data.get("product_link")})
                    if existing:
                        collection.update_one({"_id": existing["_id"]}, {"$set": data})
                        print(f"Updated: {data.get('product_name')}")
                    else:
                        collection.insert_one(data)
                        print(f"Inserted: {data.get('product_name')}")
                    print(f"Memory Usage: {result.dispatch_result.memory_usage:.1f}MB")
                except Exception as e:
                    print(f"Processing error: {e}")
            else:
                print(f"Failed to crawl: {result.error_message}")


if __name__ == "__main__":
    asyncio.run(main())
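As an additional data point, the same setup can be run in batch (non-streaming) mode to see whether the returned list is also short. This is a sketch reusing browser_cfg, llm_strategy, product_urls and dispatcher from the script above:

batch_config = CrawlerRunConfig(
    extraction_strategy=llm_strategy,
    cache_mode=CacheMode.BYPASS,
    stream=False  # arun_many returns a list of results instead of an async generator
)
async with AsyncWebCrawler(config=browser_cfg) as crawler:
    results = await crawler.arun_many(
        urls=product_urls, config=batch_config, dispatcher=dispatcher
    )
print(f"{len(results)} results returned for {len(product_urls)} URLs, "
      f"{sum(1 for r in results if r.success)} successful")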
OS
macOS
Python version
3.13.2
Browser
Chrome
Browser version
No response
Error logs & Screenshots (if applicable)
No error logs.