Crawl4AI in Streamlit
I'm trying to create markdown from a URL provided via an input in a Streamlit app, and to display that markdown in an expandable box afterwards.
Executing Crawl4AI inside Streamlit produces an error, and I can't create a dropdown box in Streamlit using the content. How can I use the output markdown in Streamlit?
I am unable to understand the exact issue. Please share the error that you are getting when using Crawl4AI. Thanks for trying Crawl4AI.
@kloskas To help you, please create a simple code snippet using Streamlit and Crawl4ai that gets the URL from a text input and returns the markdown. Use the same code you have used before. This way, it will be easy to identify what went wrong.
What is the expected outcome of this task? @aravindkarnam
I am unable to understand the requirement to create a dropdown box in Streamlit using the content. If it is to create a demo Streamlit app that shows the markdown of a scraped webpage, this sounds easy and I can take this on over the weekend.
Here's a code snippet shared by one of our community members on discord. I could verify that it works, so you could try this out. Closing this issue.
import streamlit as st
import asyncio
import nest_asyncio
import os
from pathlib import Path
import json
import xml.etree.ElementTree as ET
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
from crawl4ai import AsyncWebCrawler, CacheMode
from crawl4ai import CrawlerMonitor, DisplayMode
from crawl4ai.async_configs import BrowserConfig, CrawlerRunConfig
from crawl4ai.markdown_generation_strategy import DefaultMarkdownGenerator
from crawl4ai.content_filter_strategy import PruningContentFilter
from crawl4ai.async_dispatcher import MemoryAdaptiveDispatcher
# Patch asyncio so that asyncio.run() (used in main()) can be called even when
# an event loop is already running in this thread.
# NOTE(review): presumably required because of how Streamlit runs the script —
# confirm against the Streamlit version in use.
nest_asyncio.apply()
def create_pruning_filter():
    """Build the PruningContentFilter used by this app.

    Returns:
        A PruningContentFilter constructed with the app's fixed settings
        (threshold 0.1, "dynamic" threshold type, minimum word threshold 5).
    """
    filter_settings = {
        "threshold": 0.1,
        "threshold_type": "dynamic",
        "min_word_threshold": 5,
    }
    return PruningContentFilter(**filter_settings)
def create_markdown_generator(prune_filter):
    """Build the DefaultMarkdownGenerator used by this app.

    Args:
        prune_filter: Content filter handed to the generator as
            ``content_filter``.

    Returns:
        A DefaultMarkdownGenerator wired to ``prune_filter`` and the fixed
        rendering options below.
    """
    render_options = {
        "ignore_links": True,
        "escape_html": False,
        "body_width": 50,
        "ignore_images": True,
        "ignore_tables": True,
    }
    generator = DefaultMarkdownGenerator(
        content_filter=prune_filter,
        options=render_options,
    )
    return generator
def create_dispatcher():
    """Build the MemoryAdaptiveDispatcher used by this app.

    Returns:
        A MemoryAdaptiveDispatcher constructed with the fixed limits below.
    """
    # A ``monitor=CrawlerMonitor(display_mode=DisplayMode.DETAILED)`` argument
    # was left disabled by the original author; it is still not passed here.
    dispatcher_settings = {
        "memory_threshold_percent": 70.0,
        "check_interval": 1.0,
        "max_session_permit": 10,
    }
    return MemoryAdaptiveDispatcher(**dispatcher_settings)
def create_crawler_config(md_generator):
    """Build the CrawlerRunConfig passed to every page crawl.

    Args:
        md_generator: Markdown generator handed to the config as
            ``markdown_generator``.

    Returns:
        A CrawlerRunConfig carrying ``md_generator`` and the fixed crawl
        options below.
    """
    # ``cache_mode=CacheMode.ENABLED`` was left disabled by the original
    # author; it is still not passed here.
    run_options = {
        "markdown_generator": md_generator,
        "word_count_threshold": 10,
        "exclude_external_links": True,
        "process_iframes": True,
        "remove_overlay_elements": True,
        "exclude_social_media_links": True,
        "check_robots_txt": True,
        "semaphore_count": 3,
    }
    return CrawlerRunConfig(**run_options)
def create_output_directory(output_dir="markdown_outputs"):
    """Create the output directory if it doesn't exist.

    Args:
        output_dir: Directory to create. Defaults to ``"markdown_outputs"``
            (relative to the current working directory), which is what the
            app uses when no argument is given.

    Returns:
        The ``output_dir`` value that was passed in (or the default).
    """
    # exist_ok=True makes repeated calls (one per Streamlit rerun) harmless.
    os.makedirs(output_dir, exist_ok=True)
    return output_dir
async def fetch_sitemap(url):
    """Fetch ``url`` with a fresh AsyncWebCrawler and return its HTML.

    Args:
        url: Address to fetch (callers pass a sitemap URL).

    Returns:
        ``result.html`` when the crawl reports success, otherwise ``None``.
    """
    async with AsyncWebCrawler() as sitemap_crawler:
        crawl_result = await sitemap_crawler.arun(url)
        if crawl_result.success:
            return crawl_result.html
        return None
async def extract_urls_from_sitemap(sitemap_url):
    """Return the ``<loc>`` URLs listed in the sitemap at ``sitemap_url``.

    Args:
        sitemap_url: Address of the sitemap XML document.

    Returns:
        A list of URL strings in document order. The list is empty when the
        sitemap could not be fetched, is not well-formed XML, or contains no
        non-empty ``<loc>`` elements.
    """
    xml_content = await fetch_sitemap(sitemap_url)
    if not xml_content:
        return []

    # Only the parse itself can raise ParseError, so keep the try minimal.
    try:
        root = ET.fromstring(xml_content)
    except ET.ParseError:
        print(f"⚠️ Sitemap at {sitemap_url} is invalid. Skipping...")
        return []

    namespaces = {'ns': 'http://www.sitemaps.org/schemas/sitemap/0.9'}
    urls = []
    for loc in root.findall(".//ns:loc", namespaces):
        # An empty <loc/> has text None, and pretty-printed sitemaps pad the
        # URL with whitespace/newlines; neither should leak into the result.
        text = (loc.text or "").strip()
        if text:
            urls.append(text)
    return urls
def extract_internal_links(base_url, timeout=10):
    """Extract all relevant internal links in the order they appear.

    Downloads ``base_url``, collects every ``<a href>`` that resolves to the
    same host, drops links whose path/query/fragment contains one of the
    exclusion keywords, and de-duplicates while preserving page order.

    Args:
        base_url: Page to scan for links.
        timeout: Seconds to wait for the HTTP request before giving up.
            Without a timeout ``requests`` may block indefinitely.

    Returns:
        A list of absolute URLs, or an empty list if the page could not be
        fetched.
    """
    exclusion_keywords = ['#', 'signup', 'login', 'contact', 'help', 'terms', 'privacy', 'copyright', 'contrib', 'index']

    try:
        response = requests.get(base_url, timeout=timeout)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f"⚠️ Error fetching {base_url}: {e}")
        return []

    soup = BeautifulSoup(response.text, 'html.parser')
    base_domain = urlparse(base_url).netloc
    filtered_links = []
    seen_urls = set()

    for link in soup.find_all('a'):
        relative_url = link.get('href')
        if not relative_url:
            continue
        try:
            absolute_url = urljoin(base_url, relative_url)
            parsed_url = urlparse(absolute_url)
        except ValueError:
            # A malformed href (e.g. a broken IPv6 literal) should not abort
            # the whole scan; skip just that link.
            continue
        if parsed_url.netloc != base_domain or absolute_url in seen_urls:
            continue
        # Match keywords only against what follows the host, so a site whose
        # domain happens to contain e.g. "help" or "index" is not filtered
        # out entirely.
        after_host = absolute_url.partition(parsed_url.netloc)[2] if parsed_url.netloc else absolute_url
        if any(keyword in after_host for keyword in exclusion_keywords):
            continue
        filtered_links.append(absolute_url)
        seen_urls.add(absolute_url)

    return filtered_links
async def discover_urls(base_url):
    """Discover URLs using both sitemap.xml and ordered internal link crawling.

    The sitemap at ``<base_url>/sitemap.xml`` is tried first; if it yields no
    URLs, the links found on ``base_url`` itself are used instead.

    Args:
        base_url: Site (or section) root entered by the user.

    Returns:
        A list of discovered URL strings; may be empty.
    """
    # Strip trailing slashes so "https://site/" does not become
    # "https://site//sitemap.xml".
    sitemap_url = f"{base_url.rstrip('/')}/sitemap.xml"
    sitemap_links = await extract_urls_from_sitemap(sitemap_url)
    if not sitemap_links:
        print("⚠️ No sitemap found! Extracting internal links instead (ordered)...")
        sitemap_links = extract_internal_links(base_url)  # Now preserves order
    print(f"🔍 Total Pages Found: {len(sitemap_links)}")
    return sitemap_links
async def process_url(crawler, url, config, dispatcher, output_dir):
    """Crawl one URL and save its raw and filtered markdown under ``output_dir``.

    Args:
        crawler: An open crawler exposing an awaitable ``arun``.
        url: Page to crawl.
        config: Run configuration forwarded to ``arun``.
        dispatcher: Dispatcher forwarded to ``arun``.
        output_dir: Existing directory in which the two ``.md`` files are
            written.

    Returns:
        ``(raw_md_file, filtered_md_file)`` paths on success, or
        ``(None, None)`` when the crawl result reports failure.
    """
    import re  # local import: only needed for filename sanitising below

    # NOTE(review): ``dispatcher`` is forwarded exactly as before; confirm
    # that ``arun`` actually honours it in the Crawl4AI version in use.
    result = await crawler.arun(url=url, config=config, dispatcher=dispatcher)
    if not result.success:
        return None, None

    # Build the file name from the whole path (plus query) rather than just
    # the last segment: otherwise every URL ending in "/" maps to "homepage"
    # and pages sharing a final segment overwrite each other. Characters
    # that are unsafe in file names are collapsed to "_".
    parsed = urlparse(url)
    name_parts = [segment for segment in parsed.path.split("/") if segment]
    if parsed.query:
        name_parts.append(parsed.query)
    file_safe_url = re.sub(r"[^A-Za-z0-9._-]+", "_", "_".join(name_parts)).strip("_") or "homepage"

    raw_md_file = os.path.join(output_dir, f"result_raw_{file_safe_url}.md")
    filtered_md_file = os.path.join(output_dir, f"result_filtered_{file_safe_url}.md")

    raw_markdown = result.markdown
    # Guard against a missing filtered variant so len()/write() cannot fail.
    # NOTE(review): assumes fit_markdown may be None/empty in some
    # configurations — confirm against the Crawl4AI docs.
    filtered_markdown = result.markdown.fit_markdown or ""

    st.write(f"🔍 Processing {url}")
    st.write(f"📄 Raw Markdown Length: {len(raw_markdown)}")
    st.write(f"⚠️ Filtered Markdown Length: {len(filtered_markdown)}")

    with open(raw_md_file, "w", encoding="utf-8") as f:
        f.write(raw_markdown)
    with open(filtered_md_file, "w", encoding="utf-8") as f:
        f.write(filtered_markdown)

    return raw_md_file, filtered_md_file
def _parse_url_selection(selection, total):
    """Convert a 1-based selection such as ``"1-3, 5"`` into 0-based indices.

    Args:
        selection: Comma-separated numbers and ``start-end`` ranges.
        total: Number of selectable items; numbers outside ``1..total`` are
            ignored.

    Returns:
        0-based indices in the order first mentioned, without duplicates.

    Raises:
        ValueError: If a part is neither an integer nor a ``start-end`` pair.
    """
    indices = []
    seen = set()
    for part in selection.split(","):
        part = part.strip()
        if not part:
            continue  # tolerate blank input and stray commas
        if "-" in part:
            start, end = map(int, part.split("-"))
            # Clamp to the valid window so "1-999999999" stays cheap.
            numbers = range(max(start, 1), min(end, total) + 1)
        else:
            numbers = (int(part),)
        for number in numbers:
            if 1 <= number <= total and number not in seen:
                seen.add(number)
                indices.append(number - 1)
    return indices


# Streamlit App
def main():
    """Render the URL-to-markdown converter page.

    Flow: the user enters a base URL and fetches the list of discoverable
    pages; picks a subset by 1-based range; the selected pages are crawled
    and their raw and filtered markdown are shown with download buttons.
    Discovered and selected URLs are kept in ``st.session_state``.
    """
    st.title("🌐 URL to Markdown Converter")

    # User input for base URL
    base_url = st.text_input("Enter the base URL to fetch content from", "")

    # Button to fetch URLs
    if st.button("Fetch URLs"):
        if not base_url:
            st.error("Please enter a valid URL.")
            return
        st.info("Fetching URLs... This may take a few seconds.")
        urls = asyncio.run(discover_urls(base_url))
        if urls:
            st.session_state["urls"] = urls
            st.session_state["url_selection_made"] = False  # Reset selection
            st.success(f"✅ Found {len(urls)} URLs!")
        else:
            st.error("❌ No URLs found. Please check the website's structure.")

    # Show discovered URLs and allow selection
    if "urls" in st.session_state:
        urls = st.session_state["urls"]
        selected_range = st.text_input("Enter URL range to process (e.g., 1-3, 5, 7-9):", "")

        # Process range selection
        if st.button("Select URLs"):
            try:
                selected_indices = _parse_url_selection(selected_range, len(urls))
            except ValueError:
                st.error("Invalid input. Please enter numbers in the correct format.")
            else:
                if selected_indices:
                    st.session_state["selected_urls"] = [urls[i] for i in selected_indices]
                    st.session_state["url_selection_made"] = True
                    st.success(f"Selected {len(selected_indices)} URLs for processing.")
                else:
                    # Nothing in range: do not start an empty crawl.
                    st.session_state["url_selection_made"] = False
                    st.warning("No URLs matched the given range.")

    # Process selected URLs
    # NOTE(review): this stage runs on every script execution while the flag
    # is set, so any rerun repeats the crawl — consider caching the results
    # in st.session_state.
    if "url_selection_made" in st.session_state and st.session_state["url_selection_made"]:
        output_dir = create_output_directory()
        prune_filter = create_pruning_filter()
        md_generator = create_markdown_generator(prune_filter)
        dispatcher = create_dispatcher()
        config = create_crawler_config(md_generator)

        async def run_crawler():
            async with AsyncWebCrawler() as crawler:
                tasks = [
                    process_url(crawler, url, config, dispatcher, output_dir)
                    for url in st.session_state["selected_urls"]
                ]
                results = await asyncio.gather(*tasks)

            for position, (raw_md_path, filtered_md_path) in enumerate(results):
                if not (raw_md_path and filtered_md_path):
                    continue

                with open(raw_md_path, "r", encoding="utf-8") as f:
                    raw_content = f.read()
                st.markdown("### 🔹 Raw Markdown")
                st.code(raw_content, language="markdown")
                # Explicit keys keep the buttons distinct even when two
                # pages yield identical label/content/file name.
                st.download_button(
                    "Download Raw Markdown",
                    raw_content,
                    file_name=Path(raw_md_path).name,
                    mime="text/markdown",
                    key=f"download_raw_{position}",
                )

                with open(filtered_md_path, "r", encoding="utf-8") as f:
                    filtered_content = f.read()
                st.markdown("### 🔹 Filtered Markdown")
                st.code(filtered_content, language="markdown")
                st.download_button(
                    "Download Filtered Markdown",
                    filtered_content,
                    file_name=Path(filtered_md_path).name,
                    mime="text/markdown",
                    key=f"download_filtered_{position}",
                )

        asyncio.run(run_crawler())
if __name__ == "__main__":
    # nest_asyncio.apply() is already invoked at module top level, right
    # after the imports, so it is not repeated here.
    main()