[Bug]: Playwright Async Python: "target page context or browser has been closed
crawl4ai version
Crawl4AI 0.5.0.post8
Expected Behavior
i was using your pip package perfectly fine but after the Crawl4AI 0.5.0.post8 update after the every 5 ferquest its giving a error "Playwright Async Python: "target page context or browser has been closed"
Current Behavior
"import asyncio import json import logging import aiohttp import requests import os import warnings import urllib3 from bs4 import BeautifulSoup import openai import random import time from typing import List, Dict, Any, Optional, Union from crawl4ai import AsyncWebCrawler from app.utils.proxy_manager import proxy_manager from app.schemas.product_crawl import ProductCrawlItem, ProductInfo
SSL uyarılarını devre dışı bırak
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
User Agent listesi
USER_AGENTS = [ "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36", "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/117.0", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.5 Safari/605.1.15", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36 OPR/107.0.0.0" ]
Kabul Edilebilir Dil Başlıkları
ACCEPT_LANGUAGE_HEADERS = [ "tr-TR,tr;q=0.9,en-US;q=0.8,en;q=0.7", "tr;q=0.9,en-US;q=0.8,en;q=0.7", "en-US,en;q=0.9,tr;q=0.8", "tr,en-US;q=0.9,en;q=0.8" ]
async def crawl_website(url: str, browser_path: str, headless: bool = True, user_agent: str = None, viewport: Dict = None, timeout: int = 30, proxy: Dict = None, session = None) -> Optional[str]: """ URL'yi crawl eder ve HTML içeriğini döndürür
Args:
url: Crawl edilecek URL
browser_path: Tarayıcı yolu
headless: Pencersiz mod aktif/pasif
user_agent: Kullanılacak User-Agent
viewport: Tarayıcı görünüm boyutları {'width': 1920, 'height': 1080}
timeout: Zaman aşımı süresi (saniye)
proxy: Proxy ayarları {'http': 'http://user:pass@host:port', 'https': 'http://user:pass@host:port'}
session: aiohttp ClientSession nesnesi
Returns:
HTML içeriği veya None
"""
logger = logging.getLogger(__name__)
# JavaScript snippets - sayfa yüklendikten sonra çalıştırılacak kodlar
js_snippets = [
# Sayfa title'ını al ve konsola yazdır (debugging için)
"console.log('Sayfa başlığı:', document.title);",
# Rastgele scroll davranışı
"window.scrollTo(0, Math.floor(Math.random() * 100));",
"await new Promise(r => setTimeout(r, Math.floor(Math.random() * 500) + 800));",
# Sayfayı insan gibi yavaşça kaydır
"for (let i = 0; i < document.body.scrollHeight; i += Math.floor(Math.random() * 100) + 50) {",
" window.scrollTo(0, i);",
" await new Promise(r => setTimeout(r, Math.floor(Math.random() * 100) + 20));",
"}",
# Sayfanın alt kısmına kaydır
"window.scrollTo(0, document.body.scrollHeight);",
"await new Promise(r => setTimeout(r, Math.floor(Math.random() * 500) + 500));",
# Sayfanın ortasına kaydır
"window.scrollTo(0, document.body.scrollHeight / 2);",
"await new Promise(r => setTimeout(r, Math.floor(Math.random() * 500) + 500));",
# Ürün detayı için tipik tıklama simülasyonu (varsa)
"const detailsButtons = Array.from(document.querySelectorAll('button, a')).filter(el => el.textContent.includes('Detay') || el.textContent.includes('Göster'));",
"if (detailsButtons.length > 0) { detailsButtons[0].click(); }",
"await new Promise(r => setTimeout(r, Math.floor(Math.random() * 1000) + 1000));"
]
# HTTP başlıkları
extra_headers = {
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
"Accept-Language": random.choice(ACCEPT_LANGUAGE_HEADERS),
"Accept-Encoding": "gzip, deflate, br",
"Referer": "https://www.google.com/",
"DNT": "1",
"Connection": "keep-alive",
"Upgrade-Insecure-Requests": "1",
"Sec-Fetch-Dest": "document",
"Sec-Fetch-Mode": "navigate",
"Sec-Fetch-Site": "none",
"Sec-Fetch-User": "?1",
"Pragma": "no-cache",
"Cache-Control": "no-cache"
}
# AsyncWebCrawler için parametreler
crawler_params = {
"browser_path": browser_path,
"headless": headless,
"user_agent": user_agent,
"viewport": viewport,
"timeout": timeout,
"stealth": True,
"javascript": True
}
# Session ekle (eğer verilmişse)
if session:
crawler_params["client"] = session
try:
# AsyncWebCrawler ile URL'yi crawl et - retry mekanizması ile
max_retries = 3
retry_count = 0
while retry_count < max_retries:
try:
# AsyncWebCrawler ile crawl işlemi
async with AsyncWebCrawler(**crawler_params) as crawler:
logger.info(f"Crawling başladı: {url}")
# Sayfayı ziyaret et
result = await crawler.arun(
url=url,
javascript_snippets=js_snippets,
extra_headers=extra_headers,
wait_until="networkidle",
proxy=proxy
)
# Başarılı sonuç alındıysa döngüden çık
if result is not None and hasattr(result, 'html') and result.html:
return result.html
# Başarısız sonuç veya boş içerik - yeniden dene
retry_count += 1
logger.warning(f"İçerik alınamadı, yeniden deneniyor ({retry_count}/{max_retries})")
await asyncio.sleep(random.uniform(2.0, 5.0)) # Artan bekleme süreleri
except Exception as retry_error:
retry_count += 1
logger.warning(f"Crawl hatası, yeniden deneniyor ({retry_count}/{max_retries}): {str(retry_error)}")
if retry_count < max_retries:
await asyncio.sleep(random.uniform(2.0, 5.0)) # Artan bekleme süreleri
else:
raise # Son denemede de hata alınırsa, hatayı yeniden yükselt
# Tüm denemeler başarısız oldu
logger.error(f"Crawl işlemi {max_retries} deneme sonucunda başarısız oldu: {url}")
return None
except Exception as e:
logger.error(f"Crawl işlemi sırasında hata: {str(e)}")
return None
class ProductCrawlService: """ Ürün crawl ve AI analiz servisi """
def __init__(self, custom_proxy_manager=None, api_key=None):
self.logger = logging.getLogger(__name__)
# Eğer özel bir proxy manager verilmişse onu kullan
self.proxy_manager = custom_proxy_manager or proxy_manager
# SambaNova API için anahtarlar
self.api_keys = self._load_api_keys()
self.current_api_index = 0
# Eğer özel API key verilmişse onu kullan
if api_key:
self.specific_api_key = api_key
self.logger.info(f"Özel API key kullanılıyor")
else:
self.specific_api_key = None
# OpenAI/SambaNova client kurulumu
self.setup_ai()
# Aktif API gösterge değişkeni
self.active_api_var = None
# Browser yolu
current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(os.path.dirname(current_dir))
self.browser_path = os.path.join(project_root, "browsers")
self.logger.info(f"Browser yolu: {self.browser_path}")
# Playwright ortam değişkenini ayarla
os.environ["PLAYWRIGHT_BROWSERS_PATH"] = self.browser_path
def _load_api_keys(self) -> List[str]:
"""API anahtarlarını yükle"""
try:
# Dosya yolunu belirle
current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(os.path.dirname(current_dir))
keys_path = os.path.join(project_root, "api_keys.json")
# Dosya var mı kontrol et
if not os.path.exists(keys_path):
self.logger.warning(f"API anahtarı dosyası bulunamadı: {keys_path}")
return []
# Dosyadan API anahtarlarını yükle
with open(keys_path, "r") as f:
api_keys_data = json.load(f)
# Doğrudan string listesi yerine, yapılandırmayı kontrol et
if isinstance(api_keys_data, dict) and "sambanova" in api_keys_data:
# Yeni API yapılandırma formatı
sambanova_keys = []
for key_data in api_keys_data["sambanova"]:
if key_data.get("enabled", True):
sambanova_keys.append(key_data["api_key"])
# Anahtarları maskele ve logla (güvenlik için)
self.logger.info(f"{len(sambanova_keys)} Sambanova API anahtarı yüklendi")
for i, key in enumerate(sambanova_keys):
if len(key) > 16: # Anahtarın yeterince uzun olduğundan emin ol
masked_key = f"{key[:8]}...{key[-8:]}"
self.logger.debug(f"API #{i+1}: {masked_key}")
return sambanova_keys
else:
# Eski API yapılandırma formatı (doğrudan liste)
self.logger.info(f"{len(api_keys_data)} API anahtarı yüklendi")
return api_keys_data
except Exception as e:
self.logger.error(f"API anahtarları yüklenemedi: {str(e)}")
return []
@classmethod
def _get_all_api_keys(cls) -> List[str]:
"""Tüm SambaNova API anahtarlarını döndürür. Paralel işlemler için kullanılır."""
api_keys = []
try:
# Dosya yolunu belirle
current_file = os.path.abspath(__file__)
current_dir = os.path.dirname(current_file)
project_root = os.path.dirname(os.path.dirname(current_dir))
keys_path = os.path.join(project_root, "api_keys.json")
# Dosya var mı kontrol et
if not os.path.exists(keys_path):
return []
# Dosyadan API anahtarlarını yükle
with open(keys_path, "r") as f:
api_keys_data = json.load(f)
# Format kontrolü yap
if isinstance(api_keys_data, dict) and "sambanova" in api_keys_data:
# Yeni API yapılandırma formatı
for key_data in api_keys_data["sambanova"]:
if key_data.get("enabled", True):
api_keys.append(key_data["api_key"])
else:
# Eski API yapılandırma formatı (doğrudan liste)
api_keys = api_keys_data
except Exception as e:
# Loglamadan sadece boş liste döndür
pass
return api_keys
def setup_ai(self):
"""AI API istemcisini yapılandır"""
try:
# Yeni OpenAI versiyonu için
self.client = openai.OpenAI(
api_key=self.api_keys[self.current_api_index],
base_url="https://api.sambanova.ai/v1"
)
except AttributeError:
# Eski OpenAI versiyonu için
openai.api_key = self.api_keys[self.current_api_index]
openai.api_base = "https://api.sambanova.ai/v1"
self.client = openai
def switch_api(self):
"""API anahtarları arasında geçiş yap"""
try:
if not self.api_keys:
self.logger.warning("API anahtarı bulunamadı, geçiş yapılamıyor")
return
# Sonraki API anahtarına geç
self.current_api_index = (self.current_api_index + 1) % len(self.api_keys)
# OpenAI/SambaNova client'ı yeni anahtar ile oluştur
try:
# Yeni OpenAI versiyonu için
self.client = openai.OpenAI(
api_key=self.api_keys[self.current_api_index],
base_url="https://api.sambanova.ai/v1"
)
except AttributeError:
# Eski OpenAI versiyonu için
openai.api_key = self.api_keys[self.current_api_index]
openai.api_base = "https://api.sambanova.ai/v1"
self.client = openai
# Anahtarı maskele ve logla
current_key = self.api_keys[self.current_api_index]
if len(current_key) > 16:
masked_key = f"{current_key[:8]}...{current_key[-8:]}"
self.logger.info(f"API #{self.current_api_index + 1}'e geçiş yapıldı: {masked_key}")
else:
self.logger.info(f"API #{self.current_api_index + 1}'e geçiş yapıldı")
except Exception as e:
self.logger.error(f"API anahtarı geçişi sırasında hata: {str(e)}")
# Yedek plan: client'ı mümkün olduğunca korumaya çalış
if not hasattr(self, 'client') or self.client is None:
self.setup_ai()
async def initialize_session(self) -> bool:
"""Aiohttp session'ı başlat"""
try:
# Eğer mevcut bir session varsa ve kapalı değilse, önce kapat
if hasattr(self, 'session') and self.session is not None and not self.session.closed:
await self.session.close()
self.logger.info("Mevcut session kapatıldı.")
# Cookie jar oluştur
cookie_jar = aiohttp.CookieJar(unsafe=True)
# Aiohttp için gelişmiş TCP bağlantı limitleri
connector = aiohttp.TCPConnector(
ssl=False, # SSL doğrulamasını devre dışı bırak (self-signed sertifikalar için)
limit=10, # Maksimum eşzamanlı bağlantı sayısı
ttl_dns_cache=300, # DNS önbellek süresi (saniye)
force_close=False, # Bağlantıları tekrar kullan
enable_cleanup_closed=True # Kapatılan soketleri temizle
)
# Daha gelişmiş aiohttp oturumu başlat
self.session = aiohttp.ClientSession(
connector=connector,
cookie_jar=cookie_jar,
timeout=aiohttp.ClientTimeout(
total=60, # Toplam zaman aşımı
connect=20, # Bağlantı kurma zaman aşımı
sock_connect=20, # Soket bağlantı zaman aşımı
sock_read=40 # Soket okuma zaman aşımı
),
headers={
"User-Agent": random.choice(USER_AGENTS),
"Accept-Language": random.choice(ACCEPT_LANGUAGE_HEADERS),
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8"
}
)
self.logger.info("Yeni aiohttp session başarıyla başlatıldı.")
return True
except Exception as e:
self.logger.error(f"Session başlatma hatası: {str(e)}")
# Hata olsa bile bir session oluşturmayı dene (basit yapılandırma ile)
try:
self.session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=30))
self.logger.info("Basit yapılandırma ile session başlatıldı.")
return True
except Exception as backup_error:
self.logger.error(f"Yedek session başlatma hatası: {str(backup_error)}")
return False
def check_proxy_connection(self, silent=False) -> bool:
"""Proxy bağlantısını kontrol et"""
try:
# Rastgele user agent seç
headers = {
"User-Agent": random.choice(USER_AGENTS),
"Accept-Language": random.choice(ACCEPT_LANGUAGE_HEADERS),
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
"Connection": "keep-alive"
}
# Proxy bağlantısını test et
test_url = "https://api.ipify.org?format=json"
with requests.Session() as session:
response = session.get(
test_url,
proxies=self.proxy_manager.get_requests_proxies(),
timeout=10,
verify=False,
headers=headers
)
ip_data = json.loads(response.text)
if not silent:
self.logger.info(f"Proxy bağlantısı başarılı. IP: {ip_data['ip']}")
return True
except Exception as e:
if not silent:
self.logger.error(f"Proxy bağlantı hatası: {str(e)}")
return False
def get_random_browser_settings(self):
"""Rastgele tarayıcı ayarları oluştur"""
# Rastgele user agent
user_agent = random.choice(USER_AGENTS)
# Rastgele viewport boyutu
viewports = [
{'width': 1366, 'height': 768},
{'width': 1920, 'height': 1080},
{'width': 1536, 'height': 864},
{'width': 1440, 'height': 900},
{'width': 1280, 'height': 720}
]
viewport = random.choice(viewports)
# Rastgele zaman aşımı süresi
timeout = random.randint(30, 50)
return {
'user_agent': user_agent,
'viewport': viewport,
'timeout': timeout,
'locale': 'tr-TR'
}
async def crawl_single_url(self, url_data: Union[ProductCrawlItem, Dict, str], use_proxy: bool = True) -> Optional[ProductCrawlItem]:
"""
Tek bir URL'yi crawl et
Args:
url_data: URL verisi (ProductCrawlItem, Dict veya str olabilir)
use_proxy: Proxy kullanılsın mı?
Returns:
ProductCrawlItem sonucu veya None
"""
try:
# url_data'nın tipini kontrol et ve uygun şekilde URL'yi çıkar
if isinstance(url_data, str):
# Doğrudan string URL verilmiş
url = url_data
item = ProductCrawlItem(url=url)
elif hasattr(url_data, 'url'):
# ProductCrawlItem objesi
url = url_data.url
item = url_data
elif isinstance(url_data, dict) and "url" in url_data:
# Dict formatında veri
url = url_data["url"]
item = ProductCrawlItem(**url_data)
else:
raise ValueError(f"Geçersiz URL verisi formatı: {type(url_data)}")
# Session'ı kontrol et
if not hasattr(self, 'session') or self.session is None or self.session.closed:
await self.initialize_session()
# Proxy kullanımı ve rotasyonu
if use_proxy:
# Her istek için yeni bir proxy seç ve doğrula
proxy_success = await self.switch_and_verify_proxy(max_retries=3, retry_delay=2)
self.logger.info(f"Proxy rotasyonu: {self.proxy_manager.proxy_username} - Başarılı: {proxy_success}")
if proxy_success:
self.session._default_proxy = self.proxy_manager.get_httpx_proxy_config()
self.logger.info(f"URL {url} için aktif proxy: {self.proxy_manager.proxy_username}")
else:
self.logger.warning("Proxy doğrulanamadı, mevcut proxy ile devam ediliyor")
else:
# Proxy kullanımını devre dışı bırak
self.session._default_proxy = None
# Rastgele bekleme ekle - antibot tespitini engellemek için (5-15 saniye arası)
wait_time = random.uniform(5.0, 15.0)
self.logger.info(f"URL {url} için {wait_time:.2f} saniye bekleniyor...")
await asyncio.sleep(wait_time)
# Tarayıcı ayarlarını oluştur
browser_settings = self.get_random_browser_settings()
# Tarayıcı başlatma ve crawling işlemi için maksimum deneme sayısı
max_retries = 3
for retry in range(max_retries):
try:
# Tarayıcı yolunu al
browser_path = os.path.join(os.getcwd(), 'browsers')
self.logger.info(f"Browser yolu: {browser_path}")
# Crawl4AI ile crawling yap
html_content = await crawl_website(
url=url,
browser_path=browser_path,
headless=True,
user_agent=browser_settings['user_agent'],
viewport=browser_settings['viewport'],
timeout=browser_settings['timeout'],
proxy=self.proxy_manager.get_httpx_proxy_config() if use_proxy else None,
session=self.session
)
if html_content:
self.logger.info(f"İçerik başarıyla alındı: {url} ({len(html_content)} karakter)")
return html_content
# İçerik alınamadı, bekle ve tekrar dene
self.logger.warning(f"HTML içeriği alınamadı ({retry+1}/{max_retries}), yeniden deneniyor: {url}")
# Proxy değiştir ve tekrar dene
if use_proxy and retry < max_retries - 1:
await self.switch_and_verify_proxy()
self.logger.info(f"Proxy değiştirildi: {self.proxy_manager.proxy_username}")
# Artan bekleme süresi ekle
await asyncio.sleep(5 * (retry + 1))
except Exception as e:
self.logger.error(f"Tarayıcı hatası ({retry+1}/{max_retries}): {str(e)}")
# Son deneme değilse, tekrar dene
if retry < max_retries - 1:
# Proxy değiştir ve tekrar dene
if use_proxy:
await self.switch_and_verify_proxy()
self.logger.info(f"Proxy değiştirildi: {self.proxy_manager.proxy_username}")
# Artan bekleme süresi ekle
await asyncio.sleep(5 * (retry + 1))
else:
# Basit HTTP isteği ile deneme yap
self.logger.warning(f"Tarayıcı tüm denemelerde başarısız oldu, basit HTTP ile deneniyor: {url}")
try:
simple_html = await self.fetch_url_with_aiohttp(url, use_proxy=use_proxy)
if simple_html:
self.logger.info(f"İçerik basit HTTP ile alındı: {url} ({len(simple_html)} karakter)")
return simple_html
except Exception as http_error:
self.logger.error(f"Basit HTTP isteği hatası: {str(http_error)}")
# Tüm yöntemler başarısız oldu
raise Exception(f"URL crawl edilemedi (tüm yöntemler başarısız): {url}")
# Maksimum deneme sayısına ulaşıldı ve başarısız oldu
raise Exception(f"URL crawl edilemedi (maximum retry sayısına ulaşıldı): {url}")
except Exception as e:
self.logger.error(f"Crawl single URL hatası: {str(e)}")
return None
async def _simple_http_scrape(self, url: str) -> Optional[str]:
"""Basit HTTP isteği ile bir URL'yi scrape et"""
self.logger.info(f"Basit HTTP scraping: {url}")
# Bot algılama sistemlerini atlatmak için rastgele bekleme ekle
await asyncio.sleep(random.uniform(2.0, 5.0))
try:
# Önce senkron requests ile dene (Brotli desteği genellikle varsayılan olarak var)
try:
self.logger.info(f"Requests ile deneniyor: {url}")
# Rastgele user agent ve dil başlıkları
user_agent = random.choice(USER_AGENTS)
accept_language = random.choice(ACCEPT_LANGUAGE_HEADERS)
# Daha gerçekçi HTTP başlıkları
headers = {
"User-Agent": user_agent,
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
"Accept-Language": accept_language,
"Accept-Encoding": "gzip, deflate, br",
"Referer": "https://www.google.com/",
"DNT": "1",
"Connection": "keep-alive",
"Upgrade-Insecure-Requests": "1",
"Sec-Fetch-Dest": "document",
"Sec-Fetch-Mode": "navigate",
"Sec-Fetch-Site": "none",
"Sec-Fetch-User": "?1",
"Pragma": "no-cache",
"Cache-Control": "no-cache",
# Modern tarayıcı başlıkları
"Sec-CH-UA": "\"Chromium\";v=\"122\", \"Google Chrome\";v=\"122\", \"Not:A-Brand\";v=\"99\"",
"Sec-CH-UA-Mobile": "?0",
"Sec-CH-UA-Platform": "\"Windows\"",
"Sec-CH-UA-Arch": "\"x86\"",
"Sec-CH-UA-Full-Version-List": "\"Chromium\";v=\"122.0.6261.112\", \"Google Chrome\";v=\"122.0.6261.112\", \"Not:A-Brand\";v=\"99.0.0.0\""
}
# Oturum oluştur (çerezleri saklamak için)
session = requests.Session()
# Proxy kullanarak isteği yap
proxies = self.proxy_manager.get_requests_proxies()
# Önce ana sayfayı ziyaret et - çerezleri almak için
main_domain = url.split('/')[2] # örn. www.akakce.com
main_url = f"https://{main_domain}/"
# Ana sayfaya git
session.get(
main_url,
headers=headers,
proxies=proxies if proxies else None,
timeout=20,
verify=False
)
# Kısa bir bekleme süresi - insan davranışı simülasyonu
time.sleep(random.uniform(1.0, 3.0))
# Asıl ürün sayfasına git
response = session.get(
url,
headers=headers,
proxies=proxies if proxies else None,
timeout=30,
verify=False
)
if response.status_code == 200:
return response.text
elif response.status_code == 403:
self.logger.error(f"Requests HTTP isteği engellendi (403 Forbidden). URL: {url}")
self.logger.info("aiohttp ile alternatif yöntem denenecek...")
else:
self.logger.error(f"Requests HTTP isteği başarısız: {response.status_code}")
except Exception as req_e:
self.logger.error(f"Requests HTTP isteği hatası: {str(req_e)}")
# CloudScraper ile dene
try:
import cloudscraper
self.logger.info(f"CloudScraper ile deneniyor: {url}")
scraper = cloudscraper.create_scraper()
response = scraper.get(
url,
proxies=proxies if proxies else None,
timeout=30,
headers=headers
)
if response.status_code == 200:
return response.text
else:
self.logger.error(f"CloudScraper isteği başarısız: {response.status_code}")
except ImportError:
self.logger.warning("CloudScraper kütüphanesi bulunamadı. 'pip install cloudscraper' ile yükleyebilirsiniz.")
except Exception as cs_e:
self.logger.error(f"CloudScraper hatası: {str(cs_e)}")
# Requests başarısız olursa aiohttp ile dene
# Brotli desteği kontrol et
try:
import brotli
has_brotli = True
except ImportError:
has_brotli = False
self.logger.warning("Brotli kütüphanesi eksik. pip install brotli ile yükleyebilirsiniz.")
# Rastgele user agent ve dil başlıkları
user_agent = random.choice(USER_AGENTS)
accept_language = random.choice(ACCEPT_LANGUAGE_HEADERS)
# Varsayılan başlıklar
headers = {
"User-Agent": user_agent,
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
"Accept-Language": accept_language,
"Referer": "https://www.google.com/",
"Connection": "keep-alive",
"Upgrade-Insecure-Requests": "1",
"Sec-Fetch-Dest": "document",
"Sec-Fetch-Mode": "navigate",
"Sec-Fetch-Site": "none",
"Sec-Fetch-User": "?1",
"Pragma": "no-cache",
"Cache-Control": "no-cache",
"DNT": "1"
}
# Brotli desteği varsa ekle, yoksa kaldır
if has_brotli:
headers["Accept-Encoding"] = "gzip, deflate, br"
else:
headers["Accept-Encoding"] = "gzip, deflate"
# Proxy ayarlarını string formatında al
proxies = self.proxy_manager.get_httpx_proxy_config()
proxy_str = None
if proxies:
# İlk proxy'yi al ve string formatında kullan
proxy_values = list(proxies.values())
if proxy_values:
proxy_str = proxy_values[0]
# URL'yi temizle (% işaretlerini düzgün işlemek için)
from urllib.parse import unquote, quote
clean_url = quote(unquote(url), safe=':/?&=')
# Session'ı kontrol et ve gerekirse oluştur
if not hasattr(self, 'session') or self.session is None or self.session.closed:
await self.initialize_session()
try:
# Ana sayfaya git
async with self.session.get(main_url, proxy=proxy_str, ssl=False, headers=headers) as main_response:
await main_response.text()
# Kısa bir bekleme süresi
await asyncio.sleep(random.uniform(1.0, 3.0))
# Asıl ürün sayfasına git
async with self.session.get(clean_url, proxy=proxy_str, ssl=False, headers=headers) as response:
if response.status == 200:
return await response.text()
else:
self.logger.error(f"Aiohttp HTTP isteği başarısız: {response.status}")
return None
except Exception as aio_e:
self.logger.error(f"Aiohttp HTTP isteği hatası: {str(aio_e)}")
return None
except Exception as e:
self.logger.error(f"Simple HTTP scraping hatası: {str(e)}")
return None
def normalize_url(self, url: str) -> str:
"""URL'yi normalize et"""
# URL normalizasyon işlemleri (gerekirse burada yapılabilir)
return url
def extract_product_div(self, html_content: str) -> Optional[str]:
"""HTML içeriğinden ürün div'ini ayıkla"""
try:
soup = BeautifulSoup(html_content, 'html.parser')
# Ana ürün div'ini ara
product_div = soup.find('div', id='pd_v8')
if product_div:
return str(product_div)
# Alternatif div'ler var mı kontrol et
self.logger.warning("Ana ürün div'i bulunamadı, alternatifler aranıyor...")
# Alternatif 1: Data-category içeren div
alt_div = soup.find('div', attrs={'data-category': True})
if alt_div:
return str(alt_div)
# Alternatif 2: Product class'ına sahip div
alt_div = soup.find('div', class_=lambda c: c and 'product' in c.lower())
if alt_div:
return str(alt_div)
# Alternatif 3: Ürün bilgilerini içerebilecek içerik div'i
alt_div = soup.find('div', id=lambda x: x and ('content' in x.lower() or 'main' in x.lower()))
if alt_div:
return str(alt_div)
# Alternatif 4: Fiyat içeren div
price_container = soup.find('div', class_=lambda c: c and ('price' in c.lower() or 'fiyat' in c.lower()))
if price_container:
# Fiyat container'ın en yakın parent elementini bul
parent_div = price_container.find_parent('div')
if parent_div:
return str(parent_div)
# Alternatif 5: Ana içerik container'ı
main_container = soup.find('main') or soup.find('div', id='main') or soup.find('div', id='content')
if main_container:
return str(main_container)
# Alternatif 6: Body etiketinin içeriğini kullan
body = soup.find('body')
if body:
return str(body)
# Hiçbir div bulunamadı
self.logger.error("Hiçbir ürün div'i bulunamadı!")
return None
except Exception as e:
self.logger.error(f"HTML ayıklama hatası: {str(e)}")
return None
async def analyze_with_ai(self, content: str, max_retries=10) -> Dict[str, Any]:
"""SambaNova API ile HTML içeriğini analiz et"""
prompt = f"""
Aşağıdaki HTML içeriğinden ürün bilgilerini çıkar ve JSON formatında döndür.
HTML içeriği ürün detay sayfasının ana div'inden alınmıştır.
Tüm HTML etiketlerini ve yapısını dikkate alarak aşağıdaki bilgileri bul:
ÖNEMLİ: Eğer HTML içeriğinde "Fiyat bulunamadı" veya "Bu ürün satışta değil" gibi ifadeler varsa,
tüm alanları "PROBLEMLİ LİNK" olarak doldur ve hata_aciklamasi'na durumu yaz.
Diğer durumlarda aşağıdaki bilgileri bul:
1. Ürünün tam adı (h1 veya başlık etiketlerinde olabilir)
2. Fiyat bilgisi (sadece rakam, TL ve diğer karakterler olmadan)
3. Kargo durumu (ücretsiz/ücretli - kargo ile ilgili metinleri ara)
4. Satıcı bilgisi (aşağıdaki durumları kontrol et):
a) Eğer şu formatta bir yapı varsa: <span class="v_v8">Satıcı: <img ... alt="X">/Y</span>
- X: img etiketinin alt özelliğindeki değer
- Y: img etiketinden sonra gelen metin (/ işaretinden sonraki kısım)
- Bu durumda "X/Y" formatında birleştir (örn: "Hepsiburada/expinda")
b) Eğer sadece <img ... alt="X"> şeklinde ise:
- Sadece alt özelliğindeki değeri al (örn: "Amazon Türkiye")
c) Eğer sadece düz metin olarak satıcı adı varsa:
- Metni olduğu gibi al
5. Son güncelleme tarihi (güncelleme ile ilgili metinleri ara)
6. Takipçi sayısı:
- Eğer "X+ takip" formatında bir metin varsa, onu al (takip, takipçi gibi metinleri ara, örn: "100+ takip")
- Eğer "Takip et, tüm satıcılardaki indirimlerden haberdar ol." gibi bir metin varsa veya takipçi bilgisi yoksa, "Takipçi yok" yaz
HTML İçeriği:
{content}
SADECE aşağıdaki formatta JSON yanıt ver, başka hiçbir şey yazma:
{{
"urun_adi": "Ürün adı buraya veya PROBLEMLİ LİNK",
"fiyat": "1234.56 veya PROBLEMLİ LİNK",
"kargo": "ücretsiz veya PROBLEMLİ LİNK",
"satici": "Satıcı adı buraya veya PROBLEMLİ LİNK",
"son_guncelleme": "güncelleme tarihi buraya veya PROBLEMLİ LİNK",
"takipci": "X+ takip veya PROBLEMLİ LİNK",
"hata_aciklamasi": "Eğer fiyat bulunamadı veya ürün satışta değil ise bunu belirt"
}}
"""
for attempt in range(max_retries):
try:
response = await asyncio.to_thread(
lambda: self.client.chat.completions.create(
model='Meta-Llama-3.1-8B-Instruct',
messages=[
{"role": "system", "content": "Sen bir e-ticaret ürün analiz asistanısın. SADECE JSON formatında yanıt ver, başka hiçbir şey yazma."},
{"role": "user", "content": prompt}
],
temperature=0.1,
top_p=0.1
)
)
# AI'dan gelen yanıtı al
ai_response = response.choices[0].message.content.strip()
# Yanıtın başında ve sonunda fazladan metin varsa temizle
ai_response = ai_response.strip('`') # Markdown backtick'leri temizle
# JSON başlangıcını ve sonunu bul
start_idx = ai_response.find('{')
end_idx = ai_response.rfind('}') + 1
if start_idx != -1 and end_idx != 0:
json_str = ai_response[start_idx:end_idx]
try:
return json.loads(json_str)
except json.JSONDecodeError:
# JSON format hatasını düzeltmeye çalış
try:
# Gereksiz virgülleri temizle
json_str = json_str.replace('",\n "', '",\n ')
# Boş değerleri düzelt
json_str = json_str.replace('""', '"-"')
# Tekrar dene
return json.loads(json_str)
except json.JSONDecodeError:
# Regex ile değerleri çıkarmayı dene
import re
try:
urun_adi = re.search(r'"urun_adi":\s*"([^"]+)"', json_str)
fiyat = re.search(r'"fiyat":\s*"([^"]+)"', json_str)
kargo = re.search(r'"kargo":\s*"([^"]+)"', json_str)
satici = re.search(r'"satici":\s*"([^"]+)"', json_str)
son_guncelleme = re.search(r'"son_guncelleme":\s*"([^"]+)"', json_str)
takipci = re.search(r'"takipci":\s*"([^"]+)"', json_str)
takipci_value = takipci.group(1) if takipci else "Takipçi yok"
# Eğer "Takip et" içeriyorsa "Takipçi yok" olarak değiştir
if "Takip et" in takipci_value:
takipci_value = "Takipçi yok"
return {
"urun_adi": urun_adi.group(1) if urun_adi else "HATA",
"fiyat": fiyat.group(1) if fiyat else "HATA",
"kargo": kargo.group(1) if kargo else "HATA",
"satici": satici.group(1) if satici else "HATA",
"son_guncelleme": son_guncelleme.group(1) if son_guncelleme else "HATA",
"takipci": takipci_value,
"hata_aciklamasi": "JSON düzeltildi"
}
except Exception as e:
self.logger.warning(f"JSON düzeltme başarısız: {str(e)}")
self.logger.warning(f"Ham veri: {json_str}")
# Diğer API'ye geç ve tekrar dene
self.switch_api()
continue
self.logger.warning(f"JSON formatı bulunamadı: {ai_response}")
# Diğer API'ye geç ve tekrar dene
self.switch_api()
continue
except Exception as e:
error_str = str(e)
if "rate_limit_exceeded" in error_str or "error" in error_str.lower():
# Herhangi bir hata durumunda diğer API'ye geç
self.switch_api()
continue
self.logger.error(f"SambaNova API hatası: {error_str}")
# Diğer API'ye geç ve tekrar dene
self.switch_api()
continue
# Tüm denemeler başarısız oldu
return {
"urun_adi": "HATA",
"fiyat": "HATA",
"kargo": "HATA",
"satici": "HATA",
"son_guncelleme": "HATA",
"takipci": "HATA",
"hata_aciklamasi": "Tüm API'ler denendi fakat sonuç alınamadı"
}
async def crawl_urls(self, urls: List[ProductCrawlItem], use_proxy: bool = True, batch_size: int = 5) -> List[ProductInfo]:
"""URL'leri crawl et ve AI ile analiz et"""
# Batch boyutunu sınırla (çok fazla paralel istek yapmamak için)
batch_size = min(batch_size, 3) # En fazla 3 URL'yi paralel işle
# Session'ı başlat
session_ok = await self.initialize_session()
if not session_ok:
self.logger.error("Session başlatılamadı, işlem durduruluyor!")
return []
# İşlem başlamadan önce proxy kontrolü yap - yeniden deneme mekanizması ile
if use_proxy:
proxy_ready = False
for attempt in range(3): # 3 deneme yap
if await self.switch_and_verify_proxy(): # Her zaman rotasyon için
proxy_ready = True
break
self.logger.warning(f"Proxy bağlantısı kurulamadı. {attempt+1}/3 deneme...")
await asyncio.sleep(5) # 5 saniye bekle
if not proxy_ready:
self.logger.warning("Proxy bağlantısı kurulamadı, proxy olmadan devam ediliyor!")
results = []
total_urls = len(urls)
processed = 0
failed_urls = []
consecutive_errors = 0
max_consecutive_errors = 5 # Maksimum ardışık hata sayısı
# URL'leri normalize et
normalized_urls = []
for url_data in urls:
normalized_url = self.normalize_url(url_data.url)
# Eğer URL değiştiyse, log kaydet
if normalized_url != url_data.url:
self.logger.info(f"URL normalize edildi: {url_data.url} -> {normalized_url}")
new_url_data = url_data.copy()
new_url_data.url = normalized_url
normalized_urls.append(new_url_data)
self.logger.info(f"Toplam {total_urls} URL işlenecek. Batch boyutu: {batch_size}")
try:
# URL'leri batch'lere böl
batches = [normalized_urls[i:i + batch_size] for i in range(0, len(normalized_urls), batch_size)]
# Her batch için crawl işlemi yap
for batch_index, batch in enumerate(batches):
self.logger.info(f"Batch {batch_index+1}/{len(batches)} işleniyor...")
# Batch başında uzun bir bekleme süresi ekle (bu süreci daha "insansı" yapar)
batch_wait = random.uniform(20.0, 40.0)
self.logger.info(f"Yeni batch başlatılıyor, {batch_wait:.2f} saniye bekleniyor...")
await asyncio.sleep(batch_wait)
# Batch içindeki URL'leri işle
for url_data in batch:
# İlerleme bilgisi
processed += 1
progress_percent = (processed / total_urls) * 100
self.logger.info(f"İşlenen URL: {processed}/{total_urls} (%{progress_percent:.1f})")
# URL işlem başlangıcı
start_time = time.time()
try:
# Her URL için yeni bir proxy rotasyonu
if use_proxy:
proxy_success = await self.switch_and_verify_proxy(max_retries=3, retry_delay=2)
if not proxy_success:
consecutive_errors += 1
self.logger.warning(f"Proxy doğrulanamadı ({consecutive_errors}/{max_consecutive_errors}), mevcut proxy ile devam ediliyor: {url_data.url}")
if consecutive_errors >= max_consecutive_errors:
self.logger.error("Maksimum ardışık hata sayısına ulaşıldı!")
# Bir süre bekle ve tekrar dene
await asyncio.sleep(60) # 60 saniye bekle
consecutive_errors = 0 # Hata sayacını sıfırla
else:
consecutive_errors = 0 # Proxy doğrulama başarılı, sayacı sıfırla
# URL'yi crawl et
html_content = await self.crawl_single_url(url_data, use_proxy=use_proxy)
if not html_content:
self.logger.error(f"HTML içeriği alınamadı: {url_data.url}")
failed_urls.append(url_data)
results.append(ProductInfo(
arama_terimi=url_data.arama_terimi,
url=url_data.url,
urun_adi="HTML YOK",
fiyat="HTML YOK",
kargo="HTML YOK",
satici="HTML YOK",
son_guncelleme="HTML YOK",
takipci="HTML YOK",
hata_aciklamasi="HTML içeriği alınamadı"
))
continue
# HTML içeriğinden ürün div'ini ayıkla
product_html = self.extract_product_div(html_content)
if not product_html:
# Ürün div'i bulunamadı
results.append(ProductInfo(
arama_terimi=url_data.arama_terimi,
url=url_data.url,
urun_adi="DIV YOK",
fiyat="DIV YOK",
kargo="DIV YOK",
satici="DIV YOK",
son_guncelleme="DIV YOK",
takipci="DIV YOK",
hata_aciklamasi="Ürün div'i bulunamadı"
))
continue
# HTML içeriğini AI ile analiz et
ai_analysis = await self.analyze_with_ai(product_html)
# Problemli link kontrolü
if ai_analysis.get("urun_adi") == "PROBLEMLİ LİNK" or ai_analysis.get("fiyat") == "PROBLEMLİ LİNK":
self.logger.warning(f"Problemli link tespit edildi: {url_data.url}")
results.append(ProductInfo(
arama_terimi=url_data.arama_terimi,
url=url_data.url,
urun_adi="PROBLEMLİ LİNK",
fiyat="PROBLEMLİ LİNK",
kargo="PROBLEMLİ LİNK",
satici="PROBLEMLİ LİNK",
son_guncelleme="PROBLEMLİ LİNK",
takipci="PROBLEMLİ LİNK",
hata_aciklamasi=ai_analysis.get("hata_aciklamasi", "Problemli ürün sayfası")
))
continue
# Analiz sonuçlarını sonuç listesine ekle
results.append(ProductInfo(
arama_terimi=url_data.arama_terimi,
url=url_data.url,
urun_adi=ai_analysis.get("urun_adi", ""),
fiyat=ai_analysis.get("fiyat", ""),
kargo=ai_analysis.get("kargo", ""),
satici=ai_analysis.get("satici", ""),
son_guncelleme=ai_analysis.get("son_guncelleme", ""),
takipci=ai_analysis.get("takipci", ""),
hata_aciklamasi=ai_analysis.get("hata_aciklamasi", "")
))
# İşlem süresini hesapla
end_time = time.time()
processing_time = end_time - start_time
self.logger.info(f"Başarıyla tamamlandı: {url_data.url} ({processing_time:.2f} saniye)")
# URL'ler arasında rastgele bekleme süresi (8-15 saniye)
if url_data != batch[-1]: # Son URL değilse
url_wait = random.uniform(8.0, 15.0)
self.logger.info(f"Sonraki URL için {url_wait:.2f} saniye bekleniyor...")
await asyncio.sleep(url_wait)
except Exception as e:
if 'Executable doesn\'t exist' in str(e):
self.logger.error("Tarayıcı bulunamadı hatası! Program 'browsers' klasöründe Chrome bulamıyor.")
return []
else:
self.logger.error(f"Crawl sırasında hata: {str(e)}")
# Hata bilgisini sonuçlara ekle
results.append(ProductInfo(
arama_terimi=url_data.arama_terimi,
url=url_data.url,
urun_adi="HATA",
fiyat="HATA",
kargo="HATA",
satici="HATA",
son_guncelleme="HATA",
takipci="HATA",
hata_aciklamasi=f"Crawl hatası: {str(e)}"
))
# Batch sonunda uzun bir bekleme ekle (antibot koruma için)
if batch_index < len(batches) - 1: # Son batch değilse
between_batch_wait = random.uniform(30.0, 60.0)
self.logger.info(f"Batch {batch_index+1} tamamlandı. Sonraki batch için {between_batch_wait:.2f} saniye bekleniyor...")
await asyncio.sleep(between_batch_wait)
return results
except Exception as e:
self.logger.error(f"Crawl URLs genel hatası: {str(e)}")
return results
async def switch_and_verify_proxy(self, max_retries=3, retry_delay=2) -> bool:
"""Proxy bağlantısını kontrol et ve gerekirse değiştir"""
# Her zaman proxy değişimini zorla
if len(self.proxy_manager.active_proxies) > 1:
self.proxy_manager.rotate_proxy()
self.logger.info(f"Proxy değiştirildi: {self.proxy_manager.proxy_username}")
else:
self.logger.warning("Sadece 1 aktif proxy bulunduğu için rotasyon yapılamadı")
# Geçerli proxy'yi test et
for attempt in range(1, max_retries + 1):
try:
# Test URL'leri - farklı alternatifler kullan
test_urls = [
"https://api.ipify.org?format=json",
"https://ifconfig.me/ip",
"https://checkip.amazonaws.com"
]
# Rastgele bir test URL'si seç
test_url = random.choice(test_urls)
# Rastgele user agent seç
headers = {
"User-Agent": random.choice(USER_AGENTS),
"Accept-Language": random.choice(ACCEPT_LANGUAGE_HEADERS),
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
"Connection": "keep-alive"
}
with requests.Session() as session:
proxies = self.proxy_manager.get_requests_proxies()
self.logger.info(f"Proxy kontrol ediliyor: {proxies['http']}")
response = session.get(
test_url,
proxies=proxies,
timeout=10,
verify=False,
headers=headers
)
if response.status_code == 200:
# API yanıtı servisine göre işle
if "api.ipify.org" in test_url:
ip_data = json.loads(response.text)
self.logger.info(f"Aktif Proxy IP: {ip_data.get('ip', 'Bilinmiyor')}")
else:
# Diğer hizmetler sadece IP döndürür
self.logger.info(f"Aktif Proxy IP: {response.text.strip()}")
return True
else:
self.logger.warning(f"Proxy testi başarısız: HTTP {response.status_code}")
# Yeni bir proxy dene
if len(self.proxy_manager.active_proxies) > 1 and attempt < max_retries:
self.proxy_manager.rotate_proxy()
self.logger.info(f"Yeni proxy deneniyor: {self.proxy_manager.proxy_username}")
except Exception as e:
self.logger.error(f"Proxy kontrolü hatası (Deneme {attempt}/{max_retries}): {str(e)}")
# Yeni bir proxy dene
if len(self.proxy_manager.active_proxies) > 1 and attempt < max_retries:
self.proxy_manager.rotate_proxy()
self.logger.info(f"Hata sonrası yeni proxy: {self.proxy_manager.proxy_username}")
# Son deneme değilse bekle ve tekrar dene
if attempt < max_retries:
self.logger.info(f"{retry_delay} saniye beklenip yeniden denenecek...")
await asyncio.sleep(retry_delay)
retry_delay *= 1.5 # Her denemede bekleme süresini artır
self.logger.error("Tüm proxy denemeleri başarısız oldu!")
return False
def get_api_key(self):
"""Sambanova API anahtarını döndür (sıradaki key'e geç)"""
# Eğer belirli bir API key verilmişse daima onu kullan
if hasattr(self, 'specific_api_key') and self.specific_api_key:
return self.specific_api_key
# API anahtarları yüklenmemişse veya boşsa None döndür
if not self.api_keys or len(self.api_keys) == 0:
return None
# Sıradaki API anahtarını al
api_key = self.api_keys[self.current_api_index]
# Sonraki anahtar için indeksi güncelle (döngüye girsin)
self.current_api_index = (self.current_api_index + 1) % len(self.api_keys)
return api_key
@classmethod
async def parallel_crawl_urls(cls, urls: List[ProductCrawlItem], worker_count: int = 3) -> List[ProductInfo]:
"""
URL listesini paralel olarak crawl eder. Her bir worker ayrı bir proxy kullanır.
Args:
urls: Crawl edilecek URL'ler
worker_count: Paralel çalışacak worker sayısı (varsayılan: 3)
Returns:
Crawl edilen ürün bilgileri listesi
"""
logger = logging.getLogger(__name__)
if not urls:
logger.warning("Crawl edilecek URL bulunamadı")
return []
logger.info(f"Paralel crawling başlatılıyor. {len(urls)} URL, {worker_count} worker ile işlenecek.")
# API anahtarlarını al
api_keys = cls._get_all_api_keys()
if not api_keys:
logger.error("API anahtarları yüklenemedi. Crawling yapılamayacak.")
return []
logger.info(f"{len(api_keys)} API anahtarı yüklendi")
# Worker proxy'leri oluştur
from app.utils.proxy_manager import ProxyManager
worker_proxies = ProxyManager.create_worker_proxies(worker_count=worker_count, logger=logger)
if not worker_proxies:
logger.error("Worker proxy'leri oluşturulamadı. Crawling yapılamayacak.")
return []
# URL'leri worker'lara dağıt
worker_count = len(worker_proxies)
urls_per_worker = len(urls) // worker_count + (1 if len(urls) % worker_count > 0 else 0)
# Her worker için bir task oluştur
tasks = []
for i in range(worker_count):
# Bu worker'a atanacak URL'leri belirle
start_idx = i * urls_per_worker
end_idx = min(start_idx + urls_per_worker, len(urls))
worker_urls = urls[start_idx:end_idx]
if not worker_urls:
continue
# Bu worker için API key seç
api_key = api_keys[i % len(api_keys)]
# Worker için crawler servisi oluştur
worker_service = cls(
custom_proxy_manager=worker_proxies[i],
api_key=api_key
)
# Worker'ın açıklayıcı ismi
worker_name = f"Worker-{i+1}/{worker_count}"
logger.info(f"{worker_name}: {len(worker_urls)} URL için crawl başlatılıyor (Proxy: {worker_proxies[i].proxy_username}, API: {api_key[:8]}...)")
# Task oluştur
task = asyncio.create_task(worker_service.crawl_urls(
urls=worker_urls,
use_proxy=True,
batch_size=5 # Her worker için batch boyutu
))
tasks.append(task)
# Tüm worker'ları bekle
results = []
try:
# Tüm worker'ları paralel çalıştır
worker_results = await asyncio.gather(*tasks, return_exceptions=True)
# Worker sonuçlarını işle
for i, result in enumerate(worker_results):
if isinstance(result, Exception):
logger.error(f"Worker-{i+1} hata ile sonlandı: {str(result)}")
else:
logger.info(f"Worker-{i+1}: {len(result)} sonuç döndürdü")
results.extend(result)
except Exception as e:
logger.error(f"Paralel crawling işlemi sırasında hata: {str(e)}")
logger.info(f"Paralel crawling tamamlandı. Toplam {len(results)} sonuç.")
return results
async def fetch_url_with_aiohttp(self, url: str, use_proxy: bool = True) -> Optional[str]:
"""Aiohttp kullanarak URL'nin HTML içeriğini alır"""
self.logger.info(f"Aiohttp ile isteniyor: {url}")
try:
# Session'ı kontrol et ve gerekirse oluştur
if not hasattr(self, 'session') or self.session is None or self.session.closed:
await self.initialize_session()
# Proxy ayarını belirle
proxy = None
if use_proxy:
proxy_config = self.proxy_manager.get_httpx_proxy_config()
if proxy_config and "http" in proxy_config:
proxy = proxy_config["http"]
# Rastgele user agent ve başlıklar
headers = {
"User-Agent": random.choice(USER_AGENTS),
"Accept-Language": random.choice(ACCEPT_LANGUAGE_HEADERS),
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
"Referer": "https://www.google.com/",
"Connection": "keep-alive",
"Upgrade-Insecure-Requests": "1",
"DNT": "1"
}
# URL'yi temizle
from urllib.parse import unquote, quote
clean_url = quote(unquote(url), safe=':/?&=')
# İlk olarak ana domain sayfasını ziyaret et (çerezleri almak için)
try:
main_domain = url.split('/')[2] # örn. www.akakce.com
main_url = f"https://{main_domain}/"
async with self.session.get(
main_url,
proxy=proxy,
ssl=False,
headers=headers,
timeout=20
) as main_response:
await main_response.text()
# Kısa bir bekleme süresi
await asyncio.sleep(random.uniform(1.0, 3.0))
except Exception as main_error:
self.logger.warning(f"Ana domain ziyareti sırasında hata: {str(main_error)}")
# Asıl URL'yi ziyaret et
async with self.session.get(
clean_url,
proxy=proxy,
ssl=False,
headers=headers,
timeout=30
) as response:
if response.status == 200:
html_content = await response.text()
self.logger.info(f"Aiohttp ile içerik alındı: {url} ({len(html_content)} karakter)")
return html_content
else:
self.logger.error(f"Aiohttp isteği başarısız: HTTP {response.status}")
return None
except asyncio.TimeoutError:
self.logger.error(f"Aiohttp zaman aşımı: {url}")
return None
except Exception as e:
self.logger.error(f"Aiohttp isteği sırasında hata: {str(e)}")
return None
Singleton ürün crawl servisi
product_crawl_service = ProductCrawlService()
Is this reproducible?
Yes
Inputs Causing the Bug
Steps to Reproduce
Code snippets
OS
windows
Python version
3.12.2
Browser
chrome
Browser version
No response
Error logs & Screenshots (if applicable)
No response