grobid_client_python

UnicodeEncodeError ... surrogates not allowed

Open WolfgangFahl opened this issue 3 years ago • 5 comments

see also http://ceur-ws.bitplan.com/index.php/Grobid#Test

The test input is the set of PDF files of the http://ceur-ws.org/ Workshop Proceedings publishing site. The site had 53133 PDF files to be processed at the time of the issue:

find . -name "*.pdf" | grep Vol| wc -l
53133

The service was installed using Docker (see the documentation at http://ceur-ws.bitplan.com/index.php/Grobid). The environment is Ubuntu 20 and holds a complete copy of the original CEUR-WS files, which are hosted at RWTH Aachen University.

python --version
Python 3.7.5

What I tried

date;python grobid_client.py --input /hd/luxio/CEUR-WS/www processFulltextDocument;date
Thu 07 Jan 2021 08:22:16 PM CET
GROBID server is up and running
...
www2004-weesa.pdf
Traceback (most recent call last):
  File "grobid_client.py", line 208, in <module>
    force=force)
  File "grobid_client.py", line 66, in process
    print(filename)
UnicodeEncodeError: 'utf-8' codec can't encode character '\udc96' in position 21: surrogates not allowed
Thu 07 Jan 2021 11:40:18 PM CET

At this point some 1/3 of all files had been processed

find . -name "*.tei.xml" | grep Vol| wc -l
17714

What I had expected: that the processing would continue and that, at the end, there would be a list of the errors that happened during processing.
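One way to get that behaviour, sketched under assumptions: every file is submitted as its own task, exceptions are caught per file, and the failures are reported once at the end. `process_all` and `fake_worker` are hypothetical names; `fake_worker` stands in for the real GROBID request, and a `ThreadPoolExecutor` is used here only to keep the example self-contained.

```python
import concurrent.futures


def process_all(paths, worker, max_workers=4):
    """Run worker(path) for every path and return (results, errors).

    A failing file is recorded in errors instead of stopping the run.
    """
    results, errors = {}, {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # remember which path belongs to which future
        futures = {executor.submit(worker, path): path for path in paths}
        for future in concurrent.futures.as_completed(futures):
            path = futures[future]
            try:
                results[path] = future.result()
            except Exception as ex:
                errors[path] = ex
    return results, errors


def fake_worker(path):
    # hypothetical stand-in for the call to the GROBID service
    if "bad" in path:
        raise ValueError("cannot process " + path)
    return path + ".tei.xml"


results, errors = process_all(["a.pdf", "bad.pdf", "c.pdf"], fake_worker)
print("%d processed, %d failed" % (len(results), len(errors)))
for path, ex in errors.items():
    # ascii() keeps the report printable even for odd file names
    print("FAILED", ascii(path), ex)
```

The error report uses `ascii()` so that the report itself cannot fail on a file name the terminal cannot encode.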

WolfgangFahl, Jan 08 '21 06:01

see also https://stackoverflow.com/questions/27366479/python-3-os-walk-file-paths-unicodeencodeerror-utf-8-codec-cant-encode-s
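The linked question is about file names that contain bytes which are not valid UTF-8. Here is a minimal sketch of how such a name produces the error in the traceback, and of one way to print it anyway. The file name `paper\x96.pdf` and the helper `printable` are made up for illustration; the actual offending file name is not known from the output above.

```python
# A file name as raw bytes, as a POSIX file system may store it.
# 0x96 on its own is not valid UTF-8 (hypothetical example name).
raw_name = b"paper\x96.pdf"

# Python maps each undecodable byte to a lone surrogate (PEP 383,
# error handler "surrogateescape"), here 0x96 -> U+DC96.
name = raw_name.decode("utf-8", errors="surrogateescape")


def printable(text):
    """Return a copy of text that a UTF-8 stream can always encode."""
    return text.encode("utf-8", errors="backslashreplace").decode("utf-8")


try:
    name.encode("utf-8")  # what print() has to do on a UTF-8 terminal
except UnicodeEncodeError as ex:
    print("strict encoding fails:", ex.reason)

print(printable(name))

# The original bytes survive the round trip, so the file can still be opened.
assert name.encode("utf-8", errors="surrogateescape") == raw_name
```

The name is only a problem for output: the path itself remains usable for `open()` because the surrogate is turned back into the original byte.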

WolfgangFahl, Jan 08 '21 06:01

Here is a proposal for an improvement

import sys
import os
import io
import json
import argparse
import time
import concurrent.futures
from client import ApiClient
import ntpath
import requests

'''
This version uses the standard ProcessPoolExecutor for parallelizing the concurrent calls to the GROBID services. 
Given the limits of ThreadPoolExecutor (input stored in memory, blocking Executor.map until the whole input
is acquired), it works with batches of PDF of a size indicated in the config.json file (default is 1000 entries). 
We are moving from first batch to the second one only when the first is entirely processed - which means it is
slightly sub-optimal, but should scale better. Working without batches would mean acquiring a list of millions of
files in directories, which would require something scalable too (e.g. done in a separate thread) and is not
implemented for the moment.
'''
class grobid_client(ApiClient):
    """
    Command line client for the RESTful API of the GROBID service

    see config.json for the configuration of the service
    """

    def __init__(self, config_path='./config.json'):
        """
        construct me from the given config.json file

        Args:
            config_path(str): the file to use for configuration
        """
        self.config = None
        self._load_config(config_path)

    def _load_config(self, path='./config.json'):
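        """
        Load the json configuration and check that the GROBID server is reachable

        Args:
            path(str): the path to the json configuration file
        """
        # the context manager closes the file handle after reading
        with open(path) as config_file:
            self.config = json.load(config_file)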

        # test if the server is up and running...
        the_url = 'http://'+self.config['grobid_server']
        if len(self.config['grobid_port'])>0:
            the_url += ":"+self.config['grobid_port']
        the_url += "/api/isalive"
        r = requests.get(the_url)
        status = r.status_code

        if status != 200:
            print('GROBID server does not appear up and running ' + str(status))
        else:
            print("GROBID server is up and running")
        # treat "debug" as optional in config.json and default to quiet output
        self.debug=self.config.get('debug', False)

    def getPDFs(self,input_path):
        '''
        get PDFs for the given input_path

        Args:
            input_path(str): the path to the input PDF files

        Returns:
            list: the paths of all files with a pdf extension found in the directory tree below input_path
        '''
        pdf_files = []
        for (dirpath, dirnames, filenames) in os.walk(input_path):
            if self.debug:
                print(dirpath, dirnames, filenames)
            for filename in filenames:
                if filename.endswith('.pdf') or filename.endswith('.PDF'):
                    if self.debug:
                        try:
                            print(filename)
                        except UnicodeEncodeError:
                            # may happen on linux when a file name contains bytes that are not valid utf-8, see
                            # https://stackoverflow.com/questions/27366479/python-3-os-walk-file-paths-unicodeencodeerror-utf-8-codec-cant-encode-s
                            pass
                    pdf_files.append(os.path.join(dirpath, filename))
        return pdf_files

    def process(self, service, input_path, 
            output=None, 
            n=10, 
            generateIDs=False, 
            consolidate_header=True, 
            consolidate_citations=False, 
            include_raw_citations=False, 
            include_raw_affiliations=False, 
            teiCoordinates=False,
            force=True):
        batch_size_pdf = self.config['batch_size']

        print("Processing files from %s" % input_path)
        pdf_files = self.getPDFs(input_path)
        total=len(pdf_files)
        print("Processing %d PDF files" % total)
        for fromIndex in range(0,total,batch_size_pdf):
            toIndex=fromIndex+batch_size_pdf
            if toIndex>total: toIndex=total
            pdf_files_batch=pdf_files[fromIndex:toIndex]
            print("Processing %5d - %5d " % (fromIndex+1,toIndex))
            self.process_batch(service, pdf_files_batch, output, n, generateIDs, consolidate_header, consolidate_citations, include_raw_citations, include_raw_affiliations, teiCoordinates, force)

    def process_batch(self, service, pdf_files, output, n, generateIDs, consolidate_header, consolidate_citations, include_raw_citations, include_raw_affiliations, teiCoordinates, force):
        print(len(pdf_files), "PDF files to process")
        #with concurrent.futures.ThreadPoolExecutor(max_workers=n) as executor:
        with concurrent.futures.ProcessPoolExecutor(max_workers=n) as executor:
            for pdf_file in pdf_files:
                executor.submit(self.process_pdf, service, pdf_file, output, generateIDs, consolidate_header, consolidate_citations, include_raw_citations, include_raw_affiliations, teiCoordinates, force)

    def process_pdf(self, service, pdf_file, output, generateIDs, consolidate_header, consolidate_citations, include_raw_citations, include_raw_affiliations, teiCoordinates, force):
        # check if TEI file is already produced 
        # we use ntpath here to be sure it will work on Windows too
        pdf_file_name = ntpath.basename(pdf_file)
        if output is not None:
            filename = os.path.join(output, os.path.splitext(pdf_file_name)[0] + '.tei.xml')
        else:
            filename = os.path.join(ntpath.dirname(pdf_file), os.path.splitext(pdf_file_name)[0] + '.tei.xml')

        if not force and os.path.isfile(filename):
            print(filename, "already exist, skipping... (use --force to reprocess pdf input files)")
            return

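        try:
            print(pdf_file)
        except UnicodeEncodeError:
            # same failure as in the traceback of this issue: fall back to an escaped form of the path
            print(ascii(pdf_file))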
        files = {
            'input': (
                pdf_file,
                open(pdf_file, 'rb'),
                'application/pdf',
                {'Expires': '0'}
            )
        }
        
        the_url = 'http://'+self.config['grobid_server']
        if len(self.config['grobid_port'])>0:
            the_url += ":"+self.config['grobid_port']
        the_url += "/api/"+service

        # set the GROBID parameters
        the_data = {}
        if generateIDs:
            the_data['generateIDs'] = '1'
        if consolidate_header:
            the_data['consolidateHeader'] = '1'
        if consolidate_citations:
            the_data['consolidateCitations'] = '1'   
        if include_raw_citations:
            the_data['includeRawCitations'] = '1'
        if include_raw_affiliations:
            the_data['includeRawAffiliations'] = '1'
        if teiCoordinates:
            the_data['teiCoordinates'] = self.config['coordinates'] 

        res, status = self.post(
            url=the_url,
            files=files,
            data=the_data,
            headers={'Accept': 'text/plain'}
        )

        if status == 503:
            time.sleep(self.config['sleep_time'])
            # retry with the arguments in the same order as in the method signature
            return self.process_pdf(service, pdf_file, output, generateIDs, consolidate_header, consolidate_citations, include_raw_citations, include_raw_affiliations, teiCoordinates, force)
        elif status != 200:
            print('Processing failed with error ' + str(status))
        else:
            # writing TEI file
            try:
                with io.open(filename,'w',encoding='utf8') as tei_file:
                    tei_file.write(res.text)
            except OSError:
                print("Writing resulting TEI XML file %s failed" % filename)
 
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description = "Client for GROBID services")
    parser.add_argument("service", help="one of [processFulltextDocument, processHeaderDocument, processReferences]")
    parser.add_argument("--input", default=None, help="path to the directory containing PDF to process") 
    parser.add_argument("--output", default=None, help="path to the directory where to put the results (optional)") 
    parser.add_argument("--config", default="./config.json", help="path to the config file, default is ./config.json") 
    parser.add_argument("--n", default=10, help="concurrency for service usage") 
    parser.add_argument("--generateIDs", action='store_true', help="generate random xml:id to textual XML elements of the result files") 
    parser.add_argument("--consolidate_header", action='store_true', help="call GROBID with consolidation of the metadata extracted from the header") 
    parser.add_argument("--consolidate_citations", action='store_true', help="call GROBID with consolidation of the extracted bibliographical references") 
    parser.add_argument("--include_raw_citations", action='store_true', help="call GROBID requesting the extraction of raw citations") 
    parser.add_argument("--include_raw_affiliations", action='store_true', help="call GROBID requesting the extraction of raw affiliations") 
    parser.add_argument("--force", action='store_true', help="force re-processing pdf input files when tei output files already exist")
    parser.add_argument("--teiCoordinates", action='store_true', help="add the original PDF coordinates (bounding boxes) to the extracted elements")

    args = parser.parse_args()

    input_path = args.input
    config_path = args.config
    output_path = args.output
    
    # default concurrency, kept when --n is not a valid integer
    n = 10
    if args.n is not None:
        try:
            n = int(args.n)
        except ValueError:
            print("Invalid concurrency parameter n:", args.n, "n = 10 will be used by default")

    # if output path does not exist, we create it
    if output_path is not None and not os.path.isdir(output_path):
        try:  
            print("output directory does not exist but will be created:", output_path)
            os.makedirs(output_path)
        except OSError:  
            print ("Creation of the directory %s failed" % output_path)
        else:  
            print ("Successfully created the directory %s" % output_path)

    service = args.service
    generateIDs = args.generateIDs
    consolidate_header = args.consolidate_header
    consolidate_citations = args.consolidate_citations
    include_raw_citations = args.include_raw_citations
    include_raw_affiliations = args.include_raw_affiliations
    force = args.force
    teiCoordinates = args.teiCoordinates

    client = grobid_client(config_path=config_path)

    start_time = time.time()

    client.process(service, input_path, 
            output=output_path, 
            n=n, 
            generateIDs=generateIDs, 
            consolidate_header=consolidate_header, 
            consolidate_citations=consolidate_citations, 
            include_raw_citations=include_raw_citations, 
            include_raw_affiliations=include_raw_affiliations, 
            teiCoordinates=teiCoordinates,
            force=force)

    runtime = round(time.time() - start_time, 3)
    print("runtime: %s seconds " % (runtime))

WolfgangFahl, Jan 08 '21 06:01

Hi @WolfgangFahl

Thanks a lot for the issue! I have opened PR #19 based on your proposal to catch the error when printing a file name with invalid unicode bytes. I've added a "verbose" option rather than "debug", which I think describes more clearly what is done.

However, I think we don't want to build a list of files before processing, because if we have millions of files (which is actually my use case!), we might blow out the memory - this is why I introduced the processing by batch while walking in the subdirectories of PDF files to be processed.
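The batching-while-walking idea can be sketched with a generator, so that at most one batch of paths is held in memory at a time. This is an illustration under assumptions, not the code of the PR: `iter_pdf_batches` is a hypothetical name, and the temporary directory exists only to make the example runnable.

```python
import os
import tempfile


def iter_pdf_batches(input_path, batch_size=1000):
    """Yield lists of at most batch_size PDF paths while walking input_path."""
    batch = []
    for dirpath, _dirnames, filenames in os.walk(input_path):
        for filename in filenames:
            if filename.lower().endswith(".pdf"):
                batch.append(os.path.join(dirpath, filename))
                if len(batch) == batch_size:
                    yield batch
                    batch = []
    # last, possibly incomplete batch
    if batch:
        yield batch


# small demonstration on a throwaway directory with five PDFs and one other file
with tempfile.TemporaryDirectory() as root:
    for i in range(5):
        open(os.path.join(root, "p%d.pdf" % i), "wb").close()
    open(os.path.join(root, "notes.txt"), "wb").close()
    batch_sizes = [len(batch) for batch in iter_pdf_batches(root, batch_size=2)]

print(batch_sizes)  # [2, 2, 1]
```

Each yielded batch could then be handed to something like `process_batch` before the walk continues.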

kermitt2, Jan 10 '21 17:01

Hi @WolfgangFahl - thanks for looking into this. There are more advantages to changing the logic than just processing the files in advance. A proper progress bar will be possible, and errors like the utf-8 file name problem can be checked in advance, so that files can be excluded from processing, e.g. if they are inaccessible, too big or don't fit certain other criteria.

The memory problem can be avoided by using a generator, but I also think that even with millions of files the memory footprint will not be too bad: 1 million file paths with a length of 100 bytes on average will need just about 100 MB of main memory, which is not a lot for computers these days. Given that, e.g. in my case, processing 50,000 files takes some 10 hours, it's probably a good idea to talk about memory per process, and there the limitation is IMHO more in the time realm than in the memory realm.

Tracking the process and being able to select files is IMHO crucial again in this context. Even creating a small SQLite table that tracks things might then be better than worrying about memory. The limitation would then be disk space, and having a GB of disk space available is much easier than a GB of RAM. Whether this is all worthwhile and relevant IMHO also depends on how often the tool needs to be used. In my case it was just this one error and the second run was swift.
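The SQLite idea could look roughly like the following sketch. The table name `pdf_task`, its columns and all function names are assumptions made up for this example. Paths are stored as bytes via `os.fsencode`, so that file names with undecodable bytes can be tracked as well.

```python
import os
import sqlite3


def open_tracker(db_path=":memory:"):
    """Open (or create) the tracking database; ':memory:' is only for the demo."""
    conn = sqlite3.connect(db_path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS pdf_task ("
        " path BLOB PRIMARY KEY,"
        " status TEXT NOT NULL DEFAULT 'pending',"
        " message TEXT)"
    )
    return conn


def add_paths(conn, paths):
    """Register paths; paths that are already known keep their status."""
    conn.executemany(
        "INSERT OR IGNORE INTO pdf_task (path) VALUES (?)",
        [(os.fsencode(path),) for path in paths],
    )
    conn.commit()


def mark(conn, path, status, message=None):
    """Record the outcome for one path."""
    conn.execute(
        "UPDATE pdf_task SET status = ?, message = ? WHERE path = ?",
        (status, message, os.fsencode(path)),
    )
    conn.commit()


def pending(conn):
    """Return the paths that still need processing."""
    rows = conn.execute(
        "SELECT path FROM pdf_task WHERE status = 'pending' ORDER BY path"
    )
    return [os.fsdecode(row[0]) for row in rows]


conn = open_tracker()
add_paths(conn, ["a.pdf", "b.pdf", "c.pdf"])
mark(conn, "a.pdf", "done")
mark(conn, "b.pdf", "failed", "HTTP 500")
print(pending(conn))  # ['c.pdf']
```

With a file-based database instead of `:memory:`, a second run could restart from `pending(conn)`, and the failed rows would serve as the error list at the end.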

WolfgangFahl, Jan 11 '21 05:01

Thanks for the feedback @WolfgangFahl ! The idea with this client was to provide an example of usage of the GROBID REST API for parallel processing that is easy to adapt to the needs of different users (like the clients in Java or JavaScript), so I tried to keep it simple. The exact workflow certainly differs between users; for instance, I have other projects where I am using LMDB to manage files and tasks at scale, combining PDF harvesting and metadata enrichment (https://github.com/kermitt2/article-dataset-builder) - so it's like the SQLite you mention, and I simply reused and adapted this client. But I agree that for a small set of files like 50K PDF, having a progress bar for instance is more relevant than saving a bit of memory. We could maybe enrich this client for this scenario.

kermitt2, Jan 14 '21 05:01