R2R
Add ScraperPipeline
The task for this issue is to add a pipeline that facilitates scraping of provided URLs.
The class should conform to the typical pipeline format seen elsewhere in the codebase and should provide basic support for scraping target URLs (a rough sketch of such a pipeline follows the example script below).
Example script to start from:
import json
import logging
import os

import fire
from scrapy.crawler import CrawlerProcess

from sciphi.database.content import (
    ShardContentWriter,
    ShardedContentReader,
    ShardedContentWriter,
)
from sciphi.database.search import ShardedSearchReader
from sciphi.database.utils import hash_to_shard
from sciphi.search.search_parser import (
    extract_answer_box,
    extract_knowledge_graph,
    extract_organic_results,
    extract_questions_and_answers,
    extract_related_questions,
)
from sciphi.search.splash_spider import SplashSpider

logger = logging.getLogger(__name__)
pdfminer_logger = logging.getLogger("pdfminer")
pdfminer_logger.setLevel(logging.WARNING)


def extract_all_urls_from_search_data(search_data):
    urls = []
    urls.extend(extract_related_questions(search_data))
    urls.extend(extract_answer_box(search_data))
    urls.extend(extract_knowledge_graph(search_data))
    urls.extend(extract_organic_results(search_data))
    urls.extend(extract_questions_and_answers(search_data))
    return urls


class Scraper:
    def __init__(
        self,
        root_path=None,
        scraped_db_rel_path="data/scraped_v2/",
        search_db_rel_path="data/search/",
        debug=False,
    ):
        if root_path is None:
            root_path = os.environ["HOME"]  # On Unix and Linux systems
        self.scraped_db_path = os.path.join(root_path, scraped_db_rel_path)
        self.search_db_path = os.path.join(root_path, search_db_rel_path)
        logging.basicConfig(level=logging.DEBUG if debug else logging.INFO)

    def scrape(
        self,
        pipeline="sciphi.search.splash_spider.CollectAllPipeline",
        splash_url="http://localhost:8050",
        tag="v0",
        shard_index=0,
        limit=1_000_000,
        skip_proc_queries=True,
    ):
        logging.getLogger("scrapy").setLevel(logging.WARNING)

        with ShardedSearchReader(self.search_db_path) as search_db:
            all_queries = search_db.fetch_search_queries()
            logger.info(f"Loaded {len(all_queries)} queries to process.")

            # Create the scraped database
            with ShardedContentWriter(self.scraped_db_path) as scraped_db:
                scraped_db.create_table()

            with ShardedContentReader(self.scraped_db_path) as scraped_db:
                processed_queries = scraped_db.fetch_unique_queries()
                processed_urls = scraped_db.fetch_unique_urls()
            logger.info(f"Already processed {len(processed_queries)} queries.")

            with ShardContentWriter(
                self.scraped_db_path, shard_index
            ) as shard_db:
                queries_to_process = []
                for query in all_queries:
                    if query in processed_queries and skip_proc_queries:
                        continue
                    if (
                        hash_to_shard(query, shard_db.num_shards)
                        != shard_index
                    ):
                        continue
                    queries_to_process.append(query)
                    if len(queries_to_process) >= limit:
                        break
                logger.info(
                    f"Loaded {len(queries_to_process)} queries to process."
                )

                queries_to_results = search_db.fetch_searches_by_queries(
                    queries_to_process
                )
                if len(queries_to_results) != len(queries_to_process):
                    raise Exception(
                        "Mismatch in number of queries and searches"
                    )
                logger.info(
                    f"Loaded {len(queries_to_results)} searches to process."
                )

                urls_to_queries = {}
                for query, result in queries_to_results.items():
                    search_data = json.loads(result)
                    query_extraction = extract_all_urls_from_search_data(
                        search_data
                    )
                    for extraction in query_extraction:
                        link = extraction["link"]
                        # TODO - Implement redundancy check later
                        # and link not in scraped_urls
                        if link == "":
                            continue
                        if link in urls_to_queries:
                            logger.warning(
                                f"Found duplicate link {link} for query {query}."
                            )
                            continue
                        if link in processed_urls:
                            logger.warning(
                                f"Found processed link {link} for query {query}."
                            )
                            continue
                        urls_to_queries[link] = query
                logger.info(f"Found {len(urls_to_queries)} URLs to process.")

                USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36"
                settings = {
                    "SPLASH_URL": splash_url,
                    "ITEM_PIPELINES": {
                        pipeline: 1,
                    },
                    "SCRAPED_DB_PATH": self.scraped_db_path,
                    "SHARD_INDEX": shard_index,
                    "AUTOTHROTTLE_ENABLED": True,
                    "DOWNLOAD_DELAY": 2,
                    "RETRY_ENABLED": True,
                    "RETRY_TIMES": 10,
                    "RETRY_HTTP_CODES": [
                        500,
                        502,
                        503,
                        504,
                        522,
                        524,
                        429,
                        403,
                    ],
                    "DOWNLOADER_MIDDLEWARES": {
                        "scrapy.downloadermiddlewares.retry.RetryMiddleware": None,
                        "sciphi.search.splash_spider.CustomRetryMiddleware": 550,
                    },
                    "COOKIES_ENABLED": True,
                    "CONCURRENT_REQUESTS": 4_096,
                    "LOG_LEVEL": "WARNING",
                    "AJAXCRAWL_ENABLED": True,
                    "USER_AGENT": USER_AGENT,
                }

                process = CrawlerProcess(settings)
                process.crawl(
                    SplashSpider,
                    urls=list(urls_to_queries.keys()),
                    tag=tag,
                    url_to_query_map=urls_to_queries,
                )
                process.start()

    def report(
        self,
    ):
        with ShardedContentReader(self.scraped_db_path) as db:
            unique_queries = db.fetch_unique_queries()
            logger.info(f"Found {len(unique_queries)} unique queries.")
            unique_urls = db.fetch_unique_urls()
            logger.info(f"Found {len(unique_urls)} unique urls.")


if __name__ == "__main__":
    fire.Fire(Scraper)
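
For reference, here is a rough sketch of what the requested ScraperPipeline could look like, wired up through the standard Scrapy item-pipeline hooks (from_crawler, open_spider, close_spider, process_item) and the SCRAPED_DB_PATH / SHARD_INDEX settings already passed by the script above. The ShardContentWriter usage and the item fields (url, query, text) are assumptions for illustration only and should be adapted to the actual interfaces in the codebase:

from sciphi.database.content import ShardContentWriter


class ScraperPipeline:
    """Persist items scraped by SplashSpider into the sharded content DB."""

    def __init__(self, scraped_db_path, shard_index):
        self.scraped_db_path = scraped_db_path
        self.shard_index = shard_index
        self.writer = None

    @classmethod
    def from_crawler(cls, crawler):
        # Keys mirror the settings dict built in the example script above.
        return cls(
            scraped_db_path=crawler.settings.get("SCRAPED_DB_PATH"),
            shard_index=crawler.settings.get("SHARD_INDEX"),
        )

    def open_spider(self, spider):
        # ShardContentWriter is used as a context manager in the script,
        # so enter it when the spider opens and exit it when it closes.
        self.writer = ShardContentWriter(
            self.scraped_db_path, self.shard_index
        )
        self.writer.__enter__()

    def close_spider(self, spider):
        self.writer.__exit__(None, None, None)

    def process_item(self, item, spider):
        # Hypothetical item fields and writer method; adjust to match the
        # actual SplashSpider items and ShardContentWriter interface.
        self.writer.write(item["url"], item["query"], item["text"])
        return item

Once added alongside CollectAllPipeline, it could be selected at run time via the existing pipeline argument of Scraper.scrape.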