R2R icon indicating copy to clipboard operation
R2R copied to clipboard

Add ScraperPipeline

Open emrgnt-cmplxty opened this issue 1 year ago • 1 comments
trafficstars

The task for this issue is to add a pipeline which facilitates scraping of provided URLs.

The class should conform to the typical pipeline format seen elsewhere in the codebase and should provide basic support for scraping target URLs.

emrgnt-cmplxty avatar Mar 26 '24 20:03 emrgnt-cmplxty

Example script to start from:

import json
import logging
import os

import fire
from scrapy.crawler import CrawlerProcess

from sciphi.database.content import (
    ShardContentWriter,
    ShardedContentReader,
    ShardedContentWriter,
)
from sciphi.database.search import ShardedSearchReader
from sciphi.database.utils import hash_to_shard
from sciphi.search.search_parser import (
    extract_answer_box,
    extract_knowledge_graph,
    extract_organic_results,
    extract_questions_and_answers,
    extract_related_questions,
)
from sciphi.search.splash_spider import SplashSpider

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Raise the threshold of the "pdfminer" logger to WARNING so that its
# DEBUG/INFO records are not emitted.
pdfminer_logger = logging.getLogger("pdfminer")
pdfminer_logger.setLevel(logging.WARNING)


def extract_all_urls_from_search_data(search_data):
    """Gather the entries produced by every search-result extractor.

    The extractors are applied to ``search_data`` in a fixed order (related
    questions, answer box, knowledge graph, organic results, questions and
    answers) and their results are concatenated into a single list.

    Args:
        search_data: Parsed search payload, passed unchanged to each
            extractor.

    Returns:
        A new list holding the items yielded by each extractor, in the
        order the extractors were applied.
    """
    extractors = (
        extract_related_questions,
        extract_answer_box,
        extract_knowledge_graph,
        extract_organic_results,
        extract_questions_and_answers,
    )
    collected = []
    for extractor in extractors:
        collected.extend(extractor(search_data))
    return collected


class Scraper:
    """Command-line entry point that scrapes URLs found in stored searches.

    The class reads search results from a sharded search database, works out
    which result URLs still need to be fetched for a given shard, and hands
    them to a Scrapy ``CrawlerProcess`` running ``SplashSpider``.
    """

    # User agent string placed in the crawler settings.
    _USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36"

    def __init__(
        self,
        root_path=None,
        scraped_db_rel_path="data/scraped_v2/",
        search_db_rel_path="data/search/",
        debug=False,
    ):
        """Resolve the database locations and configure root logging.

        Args:
            root_path: Directory the relative database paths are joined to.
                Defaults to the current user's home directory.
            scraped_db_rel_path: Location of the scraped-content database,
                relative to ``root_path``.
            search_db_rel_path: Location of the search database, relative to
                ``root_path``.
            debug: When true, root logging is configured at DEBUG instead of
                INFO.
        """
        if root_path is None:
            # expanduser honours $HOME when it is set and falls back to the
            # platform's lookup otherwise, so this also works where HOME is
            # not defined (e.g. Windows) instead of raising KeyError.
            root_path = os.path.expanduser("~")

        self.scraped_db_path = os.path.join(root_path, scraped_db_rel_path)
        self.search_db_path = os.path.join(root_path, search_db_rel_path)
        logging.basicConfig(level=logging.DEBUG if debug else logging.INFO)

    def scrape(
        self,
        pipeline="sciphi.search.splash_spider.CollectAllPipeline",
        splash_url="http://localhost:8050",
        tag="v0",
        shard_index=0,
        limit=1_000_000,
        skip_proc_queries=True,
    ):
        """Crawl the not-yet-scraped result URLs of one shard's queries.

        Args:
            pipeline: Dotted path of the Scrapy item pipeline to enable.
            splash_url: Value for the ``SPLASH_URL`` crawler setting.
            tag: Passed through to ``SplashSpider`` as ``tag``.
            shard_index: Only queries that ``hash_to_shard`` maps to this
                index are processed; also used to open the shard writer.
            limit: Stop selecting queries once this many have been chosen.
            skip_proc_queries: When true, queries already present in the
                scraped database are skipped.

        Raises:
            Exception: If the number of fetched searches differs from the
                number of selected queries.
        """
        logging.getLogger("scrapy").setLevel(logging.WARNING)

        with ShardedSearchReader(self.search_db_path) as search_db:
            all_queries = search_db.fetch_search_queries()
            logger.info(f"Loaded {len(all_queries)} queries to process.")

            # Create the scraped database
            with ShardedContentWriter(self.scraped_db_path) as scraped_db:
                scraped_db.create_table()

            with ShardedContentReader(self.scraped_db_path) as scraped_db:
                processed_queries = scraped_db.fetch_unique_queries()
                processed_urls = scraped_db.fetch_unique_urls()
            logger.info(f"Already processed {len(processed_queries)} queries.")

            # Build the lookup sets once; membership is tested for every
            # query and every extracted link below.
            processed_query_set = set(processed_queries)
            processed_url_set = set(processed_urls)

            with ShardContentWriter(
                self.scraped_db_path, shard_index
            ) as shard_db:
                queries_to_process = self._select_queries(
                    all_queries,
                    processed_query_set,
                    shard_db.num_shards,
                    shard_index,
                    limit,
                    skip_proc_queries,
                )
                logger.info(
                    f"Loaded {len(queries_to_process)} queries to process."
                )

                queries_to_results = search_db.fetch_searches_by_queries(
                    queries_to_process
                )
                if len(queries_to_results) != len(queries_to_process):
                    raise Exception(
                        "Mismatch in number of queries and searches"
                    )
                logger.info(
                    f"Loaded {len(queries_to_results)} searches to process."
                )

                urls_to_queries = self._map_urls_to_queries(
                    queries_to_results, processed_url_set
                )
                logger.info(f"Found {len(urls_to_queries)} URLs to process.")

                process = CrawlerProcess(
                    self._build_settings(pipeline, splash_url, shard_index)
                )
                process.crawl(
                    SplashSpider,
                    urls=list(urls_to_queries.keys()),
                    tag=tag,
                    url_to_query_map=urls_to_queries,
                )
                process.start()

    @staticmethod
    def _select_queries(
        all_queries,
        processed_queries,
        num_shards,
        shard_index,
        limit,
        skip_proc_queries,
    ):
        """Pick the queries belonging to ``shard_index``, up to ``limit``."""
        selected = []
        for query in all_queries:
            if skip_proc_queries and query in processed_queries:
                continue
            if hash_to_shard(query, num_shards) != shard_index:
                continue
            selected.append(query)
            if len(selected) >= limit:
                break
        return selected

    @staticmethod
    def _map_urls_to_queries(queries_to_results, processed_urls):
        """Map each new, non-empty result link to the first query yielding it."""
        urls_to_queries = {}
        for query, result in queries_to_results.items():
            search_data = json.loads(result)
            for extraction in extract_all_urls_from_search_data(search_data):
                link = extraction["link"]
                if not link:
                    # Nothing to crawl for an empty or missing link.
                    continue
                # The duplicate check must come before insertion and must not
                # be pre-filtered, otherwise the warning can never fire.
                if link in urls_to_queries:
                    logger.warning(
                        f"Found duplicate link {link} for query {query}."
                    )
                    continue
                if link in processed_urls:
                    logger.warning(
                        f"Found processed link {link} for query {query}."
                    )
                    continue
                urls_to_queries[link] = query
        return urls_to_queries

    def _build_settings(self, pipeline, splash_url, shard_index):
        """Assemble the settings dict handed to ``CrawlerProcess``."""
        return {
            "SPLASH_URL": splash_url,
            "ITEM_PIPELINES": {
                pipeline: 1,
            },
            "SCRAPED_DB_PATH": self.scraped_db_path,
            "SHARD_INDEX": shard_index,
            "AUTOTHROTTLE_ENABLED": True,
            "DOWNLOAD_DELAY": 2,
            "RETRY_ENABLED": True,
            "RETRY_TIMES": 10,
            "RETRY_HTTP_CODES": [
                500,
                502,
                503,
                504,
                522,
                524,
                429,
                403,
            ],
            # Scrapy's stock retry middleware is switched off (None) and the
            # project's CustomRetryMiddleware is registered in its place.
            "DOWNLOADER_MIDDLEWARES": {
                "scrapy.downloadermiddlewares.retry.RetryMiddleware": None,
                "sciphi.search.splash_spider.CustomRetryMiddleware": 550,
            },
            "COOKIES_ENABLED": True,
            "CONCURRENT_REQUESTS": 4_096,
            "LOG_LEVEL": "WARNING",
            "AJAXCRAWL_ENABLED": True,
            "USER_AGENT": self._USER_AGENT,
        }

    def report(
        self,
    ):
        """Log how many unique queries and URLs the scraped database holds."""
        with ShardedContentReader(self.scraped_db_path) as db:
            unique_queries = db.fetch_unique_queries()
            logger.info(f"Found {len(unique_queries)} unique queries.")
            unique_urls = db.fetch_unique_urls()
            logger.info(f"Found {len(unique_urls)} unique urls.")

# When run as a script, hand the Scraper class to python-fire to build the
# command-line interface.
if __name__ == "__main__":
    fire.Fire(Scraper)

emrgnt-cmplxty avatar Mar 31 '24 19:03 emrgnt-cmplxty