
web2parquet hangs

Open rajeshsirsikar-bq opened this issue 5 months ago • 13 comments

I am using DPK's web2parquet to download the HTML files for around 12,000 URLs and convert them to Parquet. I am running a batch script that picks 5 URLs per batch and runs them through web2parquet, but each time I run it, it hangs at the 6th iteration without any error message or log.

Can someone help me debug this?

rajeshsirsikar-bq avatar Jun 30 '25 15:06 rajeshsirsikar-bq

We will look into this, @rajeshsirsikar-bq, but let me see if I understand the problem: you call Web2Parquet(urls=[url1, url2, url3, url4, url5], other parameters).transform() in each iteration of the batch, and then on the 6th iteration of the batch it hangs? If not, please give more details about what your script does.
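
Something like this rough sketch is how I picture it (the names, folder, and batch layout here are my assumptions, not your actual script):

from dpk_web2parquet.transform import Web2Parquet

# Hypothetical batch loop: 5 URLs per iteration.
all_urls = ["https://example.com/a", "https://example.com/b"]  # ~12,000 URLs in practice

for i in range(0, len(all_urls), 5):
    batch = all_urls[i : i + 5]          # one iteration = one batch of 5 URLs
    Web2Parquet(
        urls=batch,                      # assumed: 5 URLs per call
        folder="downloads",              # assumed output folder
        depth=0,                         # assumed: no link following
    ).transform()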

cc: @touma-I @swith005 @hmtbr

shahrokhDaijavad avatar Jul 01 '25 15:07 shahrokhDaijavad

@shahrokhDaijavad You are right. The pipeline is: batch_json_files -> web2parquet -> html2parquet -> vectorize.

Each JSON has 5 URLs (I intend to increase this), but as of now the first 4 to 5 iterations work fine; the 6th iteration (sometimes the 7th, 9th, or 11th) halts at the web2parquet stage and the files don't get downloaded. If I run those URLs individually, they download fine, so I am assuming something is wrong with the crawler code.

It's not a system issue, as I am running this on a VM with a 22-core CPU and 64 GB of RAM.

rajeshsirsikar-bq avatar Jul 01 '25 17:07 rajeshsirsikar-bq

Thanks for the clarification, @rajeshsirsikar-bq. So, if you use a single URL in each iteration, it doesn't hang? Can you please share the snippet of code that uses Web2Parquet? It may help if I set up a call with you to see exactly how you do this and what happens.

shahrokhDaijavad avatar Jul 01 '25 17:07 shahrokhDaijavad

s03_crawl_site.py:

import os, shutil, json, logging
from dpk_web2parquet.transform import Web2Parquet
from utils.config import CONFIG
from functions.url_mapping_utils import create_url_filename_mapping

def run_content_crawler(urls_file, input_dir, max_downloads, mime_type, logger=logging.getLogger(__name__)):

    try:

        shutil.rmtree(input_dir, ignore_errors=True)
        os.makedirs(input_dir, exist_ok=True)
        logger.info("Cleared and created download directory: %s", input_dir)

        with open(urls_file, 'r') as f:
            urls_data = json.load(f)
            urls = [item['url'] for item in urls_data]

            Web2Parquet(
                urls=urls,
                folder=input_dir,
                depth=0,
                downloads=max_downloads,
                mime_type=mime_type
            ).transform()

        logger.info("Web crawl completed. Downloaded %d files into '%s'", len(os.listdir(input_dir)), input_dir)

        downloaded_files = os.listdir(input_dir)
        filename_mapping = create_url_filename_mapping(urls, downloaded_files)

        mapping_path = os.path.join(CONFIG.JSON_DIR, "url_to_file_mapping.json")
        with open(mapping_path, "w") as f:
            json.dump(filename_mapping, f, indent=2)

        logger.info(f"Saved URL-to-filename mapping to {mapping_path}")

    except Exception as e:
        logger.exception("Error occurred during web crawling: %s", str(e))

test.py:

import time
from s03_crawl_site import run_content_crawler
from utils.config import CONFIG
from functions.logging import LoggingConfig
import glob
import json
import os

logger = LoggingConfig.setup_logging()

def split_into_batches(input_file, output_dir, batch_size=5, prefix="urls_irs_batch"):
    os.makedirs(output_dir, exist_ok=True)

    with open(input_file, 'r') as f:
        urls_data = json.load(f)  # Assume format: [{"url": ...}, {"url": ...}, ...]

    total_batches = (len(urls_data) + batch_size - 1) // batch_size

    batch_files = []

    for i in range(total_batches):
        batch = urls_data[i * batch_size : (i + 1) * batch_size]
        batch_file = os.path.join(output_dir, f"{prefix}{i+1}.json")
        with open(batch_file, 'w') as bf:
            json.dump(batch, bf, indent=2)
        batch_files.append(batch_file)

    return batch_files

def test_crawler_batches(json_batch_files, delay_between_batches=5):
    for i, batch_file in enumerate(json_batch_files, start=1):
        logger.info(f"\n=== Test Batch {i} ===")
        try:
            run_content_crawler(
                urls_file=batch_file,
                input_dir=CONFIG.INPUT_DIR,
                max_downloads=1,
                mime_type='text/html',
                logger=logger
            )
            logger.info(f"Batch {i} completed successfully.")
        except Exception as e:
            logger.error(f"Batch {i} failed: {e}")
            break

        logger.info(f"Sleeping {delay_between_batches} seconds before next batch...")
        time.sleep(delay_between_batches)

if __name__ == "__main__":
    input_file = "json_output/test_data.json"  # Your original file
    output_dir = "json_output"
    
    batch_files = split_into_batches(input_file, output_dir, batch_size=5)
    
    test_crawler_batches(batch_files, delay_between_batches=2)

test_data.json

I have provided the web2parquet code (s03_crawl_site.py) and test.py, which calls that function after creating batches of 5 URLs each. I have also provided test_data.json, which you can use to replicate the issue.

rajeshsirsikar-bq avatar Jul 01 '25 18:07 rajeshsirsikar-bq

Thanks, @rajeshsirsikar-bq.

@touma-I Do we have enough information, or do we still need to have a call with @rajeshsirsikar-bq ?

shahrokhDaijavad avatar Jul 01 '25 19:07 shahrokhDaijavad

Hi! I was just going through the issue and thought I'd share my two cents; hope it helps. From what I understand, this might be a reactor-related issue. Once the Twisted reactor is shut down, it won't restart within the same process; a new process is usually required to initialize it again. If the reactor doesn't run, the crawler won't run either.
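
A tiny standalone sketch of the limitation I mean (pure Twisted, nothing DPK-specific; run it as its own script):

from twisted.internet import reactor

reactor.callLater(1, reactor.stop)  # schedule a stop one second after startup
reactor.run()                       # first run: starts, then stops cleanly

# Calling run() again in the same process raises
# twisted.internet.error.ReactorNotRestartable, which is why anything built on
# the reactor can only be started once per process.
reactor.run()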

I did have one question: what exactly does “iteration” refer to here? Is it one batch of 5 URLs (i.e., one JSON with 5 items), or is each individual URL considered a separate iteration?

ShiroYasha18 avatar Jul 01 '25 19:07 ShiroYasha18

One batch of 5 URLs (i.e., one JSON with 5 items) is correct; that's one iteration. I have options to increase or decrease this number as well, but whatever number I put, say 10 URLs per JSON, it still hangs after 2 to 3 iterations.

rajeshsirsikar-bq avatar Jul 01 '25 19:07 rajeshsirsikar-bq

@shahrokhDaijavad Any update? Do you need a call?

rajeshsirsikar-bq avatar Jul 02 '25 16:07 rajeshsirsikar-bq

@rajeshsirsikar-bq This week, our team is busy with a new release of DPK on PyPI, and Friday is a US holiday too. I will let you know early next week if we still need a call. For now, are you OK with doing this one URL at a time, until we look into it?

shahrokhDaijavad avatar Jul 02 '25 17:07 shahrokhDaijavad

@rajeshsirsikar-bq Hello, I was working on this issue and, as I predicted, it is a reactor issue. Basically, when you send 12k URLs in batches, after a few batches an internal bad state and timeout cause the reactor (the event loop) to stop; once it is stopped, no crawling happens and the process exits silently. Unfortunately, the Twisted framework does not let us start the reactor again once it is off. This really has nothing to do with the VM configuration; there is more compute there than required.

We have seen these issues with the Twisted framework before, and the team will need some more time to patch it properly. In the meantime, I recommend a temporary fix: instead of running all the batches inside one process, call a subprocess for each batch that crawls the URLs in that batch with the same desired parameters. The overall functionality remains the same; it is mostly a few tweaks in how the functions are called. This handles the hanging because each iteration gets a fresh subprocess, so even if the reactor shuts down after one batch, the next batch spins up a new process and won't fail. I have already tested this on my end with your test data and it works fine for me. Since calling a subprocess needed some tweaks, I am putting both scripts here:

s03_crawl_site.py:

import argparse
import os, shutil, json, logging
from dpk_web2parquet.transform import Web2Parquet
from utils.config import CONFIG
from functions.url_mapping_utils import create_url_filename_mapping


def run_content_crawler(urls_file, input_dir, max_downloads, mime_type, logger=logging.getLogger(__name__)):
    try:
        shutil.rmtree(input_dir, ignore_errors=True)
        os.makedirs(input_dir, exist_ok=True)
        logger.info("Cleared and created download directory: %s", input_dir)

        with open(urls_file, 'r') as f:
            urls_data = json.load(f)
            urls = [item['url'] for item in urls_data]

            Web2Parquet(
                urls=urls,
                folder=input_dir,
                depth=0,
                downloads=max_downloads,
                mime_types=[mime_type]
            ).transform()

        logger.info("Web crawl completed. Downloaded %d files into '%s'", len(os.listdir(input_dir)), input_dir)

        downloaded_files = os.listdir(input_dir)
        filename_mapping = create_url_filename_mapping(urls, downloaded_files)
        mapping_path = os.path.join(CONFIG.JSON_DIR, "url_to_file_mapping.json")
        os.makedirs(CONFIG.JSON_DIR, exist_ok=True) 
        with open(mapping_path, "w") as f:
            json.dump(filename_mapping, f, indent=2)
        logger.info(f"Saved URL-to-filename mapping to {mapping_path}")

    except Exception as e:
        logger.exception("Error occurred during web crawling: %s", str(e))

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--urls_file", required=True)
    parser.add_argument("--input_dir", required=True)
    parser.add_argument("--max_downloads", type=int, default=1)
    parser.add_argument("--mime_type", default="text/html")
    args = parser.parse_args()

    logger = logging.getLogger("crawler")
    logging.basicConfig(level=logging.INFO)
    run_content_crawler(
        urls_file=args.urls_file,
        input_dir=args.input_dir,
        max_downloads=args.max_downloads,
        mime_type=args.mime_type,
        logger=logger
    )

if __name__ == "__main__":
    main() 


test.py:

import json
import os
import subprocess
import sys
import time

BATCH_SIZE = 5
INPUT_FILE = "test_data.json"
BATCH_DIR = "batches"
CRAWL_SCRIPT = "s03_crawl_site.py"

os.makedirs(BATCH_DIR, exist_ok=True)

with open(INPUT_FILE, "r") as f:
    urls_data = json.load(f)

total_batches = (len(urls_data) + BATCH_SIZE - 1) // BATCH_SIZE
batch_files = []

for i in range(total_batches):
    batch = urls_data[i * BATCH_SIZE : (i + 1) * BATCH_SIZE]
    batch_file = os.path.join(BATCH_DIR, f"urls_batch{i+1}.json")
    with open(batch_file, "w") as bf:
        json.dump(batch, bf, indent=2)
    batch_files.append(batch_file)

for i, batch_file in enumerate(batch_files, 1):
    print(f"Processing batch {i}/{len(batch_files)}: {batch_file}")
    result = subprocess.run([
        sys.executable, CRAWL_SCRIPT,
        "--urls_file", batch_file,
        "--input_dir", "input",
        "--max_downloads", "1",
        "--mime_type", "text/html"
    ], capture_output=True, text=True)
    print("STDOUT:", result.stdout)
    print("STDERR:", result.stderr)
    if result.returncode != 0:
        print(f"Batch {i} failed!")
        break
    print(f"Batch {i} completed.\n")
    time.sleep(2) 

Kindly try running these scripts and let me know if any issues still exist :)

ShiroYasha18 avatar Jul 03 '25 18:07 ShiroYasha18

The issue should be due to a conflict of the event loop, as mentioned above. A workaround is to use the nest_asyncio library to allow running a nested event loop. Example: https://github.com/data-prep-kit/data-prep-kit/blob/dev/examples/rag-html-1/1_crawl_site.ipynb
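
A minimal sketch of the workaround (apply the patch once at the top of the script, before calling the transform; the URL and folder here are placeholders):

import nest_asyncio
nest_asyncio.apply()  # patch the running event loop so it can be re-entered

from dpk_web2parquet.transform import Web2Parquet

Web2Parquet(
    urls=["https://example.com"],  # placeholder URL
    folder="downloads",            # assumed output folder
    depth=0,
).transform()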

hmtbr avatar Jul 04 '25 01:07 hmtbr

@hmtbr I tried using nest_asyncio as well, but hit the same issue. I will try @ShiroYasha18's solution.

rajeshsirsikar-bq avatar Jul 08 '25 19:07 rajeshsirsikar-bq

Thanks, @rajeshsirsikar-bq. While the stop-gap solution by @ShiroYasha18 may work, let me set your expectations: the web2parquet transform was not designed as a web crawler that scales to a very large number of URLs. IBM has non-open-source, scalable solutions that are not in DPK.

shahrokhDaijavad avatar Jul 09 '25 22:07 shahrokhDaijavad