
Reading errors after upgrade from 0.2.0 to 0.4.0

Open ohadmata opened this issue 5 years ago • 20 comments

Hi. We are running a Flask service in a Docker image. One of our main routes reads a file from S3 and returns its content. Recently we upgraded the package from version 0.2.0 to 0.4.0.

After the upgrade the route works fine, but after several hours it starts raising strange exceptions that contain only the file path (without any other information).

Our temporary solution is to restart the container; after that it works fine again, but it fails again after several hours.

The strangest thing is that when I connect to the same container (the one with the issue), open a Python shell, and execute the code, it works as expected, even while the Flask service on that same container raises exceptions for the same file.

Any ideas? Thanks

my code:

fs = s3fs.S3FileSystem()
with fs.open('PATH_TO_FILE.json', 'rb') as f:
    data = f.read()
json_content = json.loads(data.decode('utf-8'))

print(json_content)

ohadmata avatar Dec 02 '19 08:12 ohadmata

Thanks for raising this.

It would be helpful if you could also share the errors you are seeing, especially as this takes several hours to reproduce.

jacobtomlinson avatar Dec 02 '19 09:12 jacobtomlinson

Hi. As I said, the error is just the path to the file in the S3 bucket. Nothing else :-(

  • I am trying a new solution for this use case: I am downloading the file into the /tmp directory using fs.get (instead of fs.open) and reading it from the local filesystem. I hope this will solve the issue for now.

ohadmata avatar Dec 02 '19 09:12 ohadmata
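
A minimal sketch of that workaround, assuming default credentials and a writable /tmp (the bucket and key names are placeholders):

import json
import os
import tempfile

import s3fs

fs = s3fs.S3FileSystem()

# Download the object to local disk with fs.get(), then read it from the
# local filesystem instead of streaming it through fs.open().
local_path = os.path.join(tempfile.gettempdir(), 'data.json')
fs.get('s3://MY_BUCKET/PATH_TO_FILE.json', local_path)

with open(local_path, encoding='utf-8') as f:
    json_content = json.load(f)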

Ah sorry I misunderstood. Strange that you are seeing only the path with no Python traceback.

You could try wrapping your whole with block in a very broad try/except and then print out any information you can gather in there. It may be that your exception is being suppressed somehow.

jacobtomlinson avatar Dec 02 '19 09:12 jacobtomlinson

That's exactly what I did. There is a try...except block, but the exception itself is just the S3 path :-( I am attaching the whole code in case it is relevant; it includes the storage interface.

@bp.route('/get_data', methods=('GET', 'POST'))
def get_data():
    storage = Storage(StorageTypes.S3, config['s3_bucket'])
    result = {}  # make sure result exists even if the read fails
    try:
        json_content = storage.read(f'PATH_TO_FILE.json', DataFormats.DICTIONARY)
        result = json_content
    except Exception as e:
        print(e)
        result['exception'] = str(e)

        # print boto3 debug data
        print('boto3 debug data:')
        import boto3
        client = boto3.client('sts')
        print(client.get_caller_identity()['Arn'])
    finally:
        return jsonify(result)


class StorageTypes(Enum):
    S3 = 1
    NFS = 2


class Storage:
    def __init__(self, storage_type, root_path, params=None):

        self.root_path = root_path

        # set the storage type (Default to S3)
        if storage_type is None:
            self.storage_type = StorageTypes.S3
        else:
            self.storage_type = storage_type

        # execute the constructors
        if self.storage_type == StorageTypes.S3:
            self.storage_object = S3(self.root_path, params)

    def read(self, path, data_format):
        return self.storage_object.read(path, data_format)


class S3:
    def __init__(self, root_path, params=None):
        key = None
        secret = None

        # The root_path is the bucket name
        self.bucket_name = root_path

        # check for given params
        if params is not None:
            if 'key' in params and 'secret' in params:
                key = params['key']
                secret = params['secret']

        # create the s3fs object
        if key is not None and secret is not None:

            self.key = key
            self.secret = secret

            fs = s3fs.S3FileSystem(
                key=key,
                secret=secret
            )
        else:
            fs = s3fs.S3FileSystem()

        self.fs = fs

    def extract_path(self, path):
        if path.lower().startswith('s3://'):
            return path
        else:
            return f's3://{self.bucket_name}/{path}'

    def read(self, path, data_format=None):
        path = self.extract_path(path)

        if data_format == DataFormats.DICTIONARY:
            with self.fs.open(path, 'rb') as f:
                data = f.read()
            return json.loads(data.decode('utf-8'))

        if data_format == DataFormats.TEXT:
            with self.fs.open(path, 'rb') as f:
                data = f.read()
            return data.decode('utf-8')

        if data_format == DataFormats.PANDAS_TABLE:
            import pyarrow.parquet as pq
            return pq.ParquetDataset(path, filesystem=self.fs).read_pandas()

        # default - read binary file
        with self.fs.open(path, 'rb') as f:
            data = f.read()
        return data

ohadmata avatar Dec 02 '19 11:12 ohadmata

The exception message may just be that path, but does the traceback show where it is being raised?

import traceback

...

except Exception as e:
    traceback.print_tb(e.__traceback__)

jacobtomlinson avatar Dec 02 '19 11:12 jacobtomlinson
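
For reference, a slightly fuller version of that pattern using only the standard library (the storage call mirrors the handler code above):

import logging
import traceback

logger = logging.getLogger(__name__)

try:
    json_content = storage.read('PATH_TO_FILE.json', DataFormats.DICTIONARY)
except Exception:
    # Print the full traceback (message plus call stack) to stderr,
    # which usually pinpoints where the exception is raised.
    traceback.print_exc()
    # Or keep the stack trace in the application logs instead:
    logger.exception('failed to read PATH_TO_FILE.json from S3')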

Hi, I added the traceback and I am getting a FileNotFoundError. As I said, the file exists :-( I'm not totally sure that it has to do with the file, since we have different files that really do not exist in S3.

  File "/usr/local/lib/python3.7/site-packages/noos_utils/storage/__init__.py", line 32, in read
    return self.storage_object.read(path, data_format)
  File "/usr/local/lib/python3.7/site-packages/noos_utils/storage/S3.py", line 62, in read
    self.fs.get(path, tmp)
  File "/usr/local/lib/python3.7/site-packages/fsspec/spec.py", line 568, in get
    with self.open(rpath, "rb", **kwargs) as f1:
  File "/usr/local/lib/python3.7/site-packages/fsspec/spec.py", line 724, in open
    **kwargs
  File "/usr/local/lib/python3.7/site-packages/s3fs/core.py", line 315, in _open
    autocommit=autocommit, requester_pays=requester_pays)
  File "/usr/local/lib/python3.7/site-packages/s3fs/core.py", line 957, in __init__
    cache_type=cache_type)
  File "/usr/local/lib/python3.7/site-packages/fsspec/spec.py", line 956, in __init__
    self.details = fs.info(path)
  File "/usr/local/lib/python3.7/site-packages/s3fs/core.py", line 486, in info
    return super().info(path)
  File "/usr/local/lib/python3.7/site-packages/fsspec/spec.py", line 511, in info
    raise FileNotFoundError(path)


ohadmata avatar Dec 03 '19 13:12 ohadmata

Are you modifying the file on S3? Or is it just a static file?

Could there be a chance that the file potentially doesn't exist for a brief amount of time?

jacobtomlinson avatar Dec 03 '19 13:12 jacobtomlinson

Nope. I am creating this file and reading it a few minutes later. After the creation, the file won't change.

ohadmata avatar Dec 03 '19 13:12 ohadmata

Hmm, ok. It appears that S3 is reporting that the file doesn't exist, which is what raises the error.

It's not exactly a solution, but you could catch that error and retry?

jacobtomlinson avatar Dec 03 '19 13:12 jacobtomlinson
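
A minimal retry sketch along those lines, using the Storage class from the earlier comment (the attempt count and delay are arbitrary):

import time

def read_with_retry(storage, path, data_format, attempts=3, delay=2):
    # Retry reads that intermittently raise FileNotFoundError; re-raise
    # once the final attempt has failed.
    for attempt in range(attempts):
        try:
            return storage.read(path, data_format)
        except FileNotFoundError:
            if attempt == attempts - 1:
                raise
            time.sleep(delay)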

Duplicate of #253? @TomAugspurger, time to make dircache optional and off by default, or to introduce timed expiry?

martindurant avatar Dec 03 '19 15:12 martindurant

I am trying to execute this line of code before every S3 read; hopefully it will help. My next step will be to go back to boto3 commands ... :-(

self.fs.invalidate_cache()

ohadmata avatar Dec 03 '19 15:12 ohadmata
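
For anyone following along, the change amounts to clearing s3fs's directory-listing cache at the top of the read method shown earlier. Roughly (only the dictionary and binary branches are shown):

    def read(self, path, data_format=None):
        path = self.extract_path(path)

        # Drop cached directory listings so the following open() asks S3
        # again instead of reusing a stale listing that may not contain
        # the recently written key.
        self.fs.invalidate_cache()

        with self.fs.open(path, 'rb') as f:
            data = f.read()

        if data_format == DataFormats.DICTIONARY:
            return json.loads(data.decode('utf-8'))
        return data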

time to make dircache optional and off by default

Yes to making it optional. I'm not sure about changing the default. It's hard to say what the right one is since it's workload dependent. dircache=False would be the safer / more correct option though.

TomAugspurger avatar Dec 03 '19 20:12 TomAugspurger

Hi everyone. After two days it looks like this change (invalidate_cache) fixed the problem. Thanks!

ohadmata avatar Dec 05 '19 11:12 ohadmata

Thanks for coming back to let us know!

jacobtomlinson avatar Dec 05 '19 11:12 jacobtomlinson

I have a similar problem. In my case I'm querying Athena. With s3fs 0.2.0 I can run as many queries as I want, but with a newer version I only get a result for the first query and then get FileNotFoundError for subsequent ones. It's not quite clear to me how I should use s3fs.S3FileSystem.invalidate_cache().

from time import sleep

import boto3
import pandas


def wait_for_query(athena_client, execution_id):
    # TODO: add timeout
    while True:
        response = athena_client.get_query_execution(
            QueryExecutionId=execution_id)
        status = response['QueryExecution']['Status']['State']
        if status in ['FAILED', 'CANCELLED']:
            error = \
                response['QueryExecution']['Status']['StateChangeReason']
            raise Exception('Error doing query\n{}'.format(error))
        elif status == 'SUCCEEDED':
            break
        sleep(2)
    return response

def execute_athena_query(query, wait=True):
    athena_client = boto3.client('athena', 'us-east-1')
    athena_config = {
        'OutputLocation': 's3://my_bucket/athena_results',
        'EncryptionConfiguration': {'EncryptionOption': 'SSE_S3'}
    }
    athena_context = {'Database': 'my_database'}
    response = athena_client.start_query_execution(
        QueryString=query,
        QueryExecutionContext=athena_context,
        ResultConfiguration=athena_config)
    execution_id = response['QueryExecutionId']
    if wait:
        response = wait_for_query(athena_client, execution_id)
    return response

def get_df_athena(query, dtypes):
    response = execute_athena_query(query)
    location = response['QueryExecution']['ResultConfiguration']\
        ['OutputLocation']
    df = pandas.read_csv(location, dtype=dtypes)
    return df

rpanai avatar Feb 04 '20 21:02 rpanai
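
One way to apply the same workaround here, assuming pandas resolves the s3:// URL through s3fs and that s3fs's instance cache hands back the same filesystem object, would be:

import pandas
import s3fs

def get_df_athena(query, dtypes):
    response = execute_athena_query(query)
    location = response['QueryExecution']['ResultConfiguration']['OutputLocation']

    # S3FileSystem instances are cached, so constructing one here should
    # return the instance pandas will use for the s3:// URL; clearing its
    # listing cache forces a fresh lookup of the new result object.
    s3fs.S3FileSystem().invalidate_cache()

    return pandas.read_csv(location, dtype=dtypes)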

@TomAugspurger, revive the no-caching option? Or try a direct HEAD in the case that a file appears to be missing?

martindurant avatar Feb 04 '20 21:02 martindurant

I'm not sure, though it seems to be causing plenty of issues.

To start with, I'd like to see a keyword cache_instances added to S3FileSystem.__init__ to control this.

TomAugspurger avatar Feb 04 '20 23:02 TomAugspurger

I'm not sure, though it seems to be causing plenty of issues.

To start with, I'd like to see a keyword cache_instances added to S3FileSystem.__init__ to control this.

Do you want me to help with this?

rpanai avatar Feb 05 '20 17:02 rpanai

If you're able to, that'd be great!

TomAugspurger avatar Feb 05 '20 17:02 TomAugspurger

Note that you can already set S3FileSystem.cachable = False as a workaround. @TomAugspurger, I actually meant the file cache, not the instance cache; you had a PR for this in fsspec?

martindurant avatar Feb 06 '20 14:02 martindurant
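
A short sketch of that workaround: set the class attribute before constructing the filesystem, so every construction yields a fresh, uncached instance (and therefore an empty directory cache). The bucket and key are placeholders:

import s3fs

# Disable fsspec's instance caching for S3FileSystem; each call to
# S3FileSystem() now builds a new object with an empty listing cache.
s3fs.S3FileSystem.cachable = False

fs = s3fs.S3FileSystem()
with fs.open('s3://MY_BUCKET/PATH_TO_FILE.json', 'rb') as f:
    data = f.read()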