s3fs
Reading errors after upgrade from 0.2.0 to 0.4.0
Hi, we are running a Flask service in a Docker image. One of our main routes reads a file from S3 and returns its content. Recently we upgraded the package from version 0.2.0 to 0.4.0.
After the upgrade the route works fine, but after several hours it starts raising strange exceptions that contain only the file path (without any other information).
Our temporary workaround is to restart the container; after that it works fine again, but it fails again after several hours.
The strangest thing is that when I connect to the same container (the one with the issues), open a Python shell, and execute the same code, it works as expected, even while the Flask service on that container keeps raising exceptions for the same file.
Any ideas? Thanks.
My code:
import json

import s3fs

fs = s3fs.S3FileSystem()
with fs.open('PATH_TO_FILE.json', 'rb') as f:
    data = f.read()
json_content = json.loads(data.decode('utf-8'))
print(json_content)
Thanks for raising this.
It would be helpful if you could also share the errors you are seeing, especially as this takes several hours to reproduce.
Hi. As I said, the error is just the path to the file in the S3 bucket, nothing else :-(
I am trying a new solution for this use case: downloading the file into the /tmp directory with fs.get (instead of fs.open) and reading it from the local filesystem. I am hoping this will work around the issue for now.
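Roughly what I mean (the bucket name and paths here are just placeholders):

import json

import s3fs

fs = s3fs.S3FileSystem()

# Copy the object to local disk first, then read it from there
local_path = '/tmp/PATH_TO_FILE.json'
fs.get('s3://my-bucket/PATH_TO_FILE.json', local_path)

with open(local_path, 'rb') as f:
    json_content = json.loads(f.read().decode('utf-8'))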
Ah sorry I misunderstood. Strange that you are seeing only the path with no Python traceback.
You could try wrapping your whole with block in a very broad try-except statement and then print out any information you are able to gather in there. It may be that your exception is being suppressed somehow.
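Something along these lines, just to surface whatever information is attached to the exception (reusing the fs object and placeholder path from your snippet):

import traceback

import s3fs

fs = s3fs.S3FileSystem()

try:
    with fs.open('PATH_TO_FILE.json', 'rb') as f:
        data = f.read()
except Exception as e:
    # Dump everything we can about the failure, not only str(e)
    print('exception type:', type(e))
    print('exception args:', e.args)
    traceback.print_exc()
    raise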
That's exactly what I did. There is a try/except block, but the exception itself is just the S3 path :-( I am attaching the whole code in case it is relevant; it includes our storage interface.
@bp.route('/get_data', methods=('GET', 'POST'))
def get_data():
    storage = Storage(StorageTypes.S3, config['s3_bucket'])
    # initialise result so the except/finally branches always have something to return
    result = {}
    try:
        json_content = storage.read(f'PATH_TO_FILE.json', DataFormats.DICTIONARY)
        result = json_content
    except Exception as e:
        print(e)
        result['exception'] = str(e)
        # print boto3 debug data
        print('boto3 debug data:')
        import boto3
        client = boto3.client('sts')
        print(client.get_caller_identity()['Arn'])
    finally:
        return jsonify(result)
import json
from enum import Enum

import s3fs


class StorageTypes(Enum):
    S3 = 1
    NFS = 2


class Storage:
    def __init__(self, storage_type, root_path, params=None):
        self.root_path = root_path
        # set the storage type (default to S3)
        if storage_type is None:
            self.storage_type = StorageTypes.S3
        else:
            self.storage_type = storage_type
        # execute the constructors
        if self.storage_type == StorageTypes.S3:
            self.storage_object = S3(self.root_path, params)

    def read(self, path, data_format):
        return self.storage_object.read(path, data_format)
class S3:
    def __init__(self, root_path, params=None):
        key = None
        secret = None
        # The root_path is the bucket name
        self.bucket_name = root_path
        # check for given params
        if params is not None:
            if 'key' in params and 'secret' in params:
                key = params['key']
                secret = params['secret']
        # create the s3fs object
        if key is not None and secret is not None:
            self.key = key
            self.secret = secret
            fs = s3fs.S3FileSystem(
                key=key,
                secret=secret
            )
        else:
            fs = s3fs.S3FileSystem()
        self.fs = fs

    def extract_path(self, path):
        if path.lower().startswith('s3://'):
            return path
        else:
            return f's3://{self.bucket_name}/{path}'

    def read(self, path, data_format=None):
        path = self.extract_path(path)
        if data_format == DataFormats.DICTIONARY:
            with self.fs.open(path, 'rb') as f:
                data = f.read()
            return json.loads(data.decode('utf-8'))
        if data_format == DataFormats.TEXT:
            with self.fs.open(path, 'rb') as f:
                data = f.read()
            return data.decode('utf-8')
        if data_format == DataFormats.PANDAS_TABLE:
            import pyarrow.parquet as pq
            return pq.ParquetDataset(path, filesystem=self.fs).read_pandas()
        # default - read binary file
        with self.fs.open(path, 'rb') as f:
            data = f.read()
        return data
The exception message may just be that path but does the traceback show where it is being raised?
import traceback

...

except Exception as e:
    traceback.print_tb(e.__traceback__)
Hi, I added the traceback and I am getting a FileNotFoundError. As I said, the file exists :-( I'm not totally sure that it has to do with the file, since we have different files that really do not exist in S3.
File "/usr/local/lib/python3.7/site-packages/noos_utils/storage/__init__.py", line 32, in read
return self.storage_object.read(path, data_format)
File "/usr/local/lib/python3.7/site-packages/noos_utils/storage/S3.py", line 62, in read
self.fs.get(path, tmp)
File "/usr/local/lib/python3.7/site-packages/fsspec/spec.py", line 568, in get
with self.open(rpath, "rb", **kwargs) as f1:
File "/usr/local/lib/python3.7/site-packages/fsspec/spec.py", line 724, in open
**kwargs
File "/usr/local/lib/python3.7/site-packages/s3fs/core.py", line 315, in _open
autocommit=autocommit, requester_pays=requester_pays)
File "/usr/local/lib/python3.7/site-packages/s3fs/core.py", line 957, in __init__
cache_type=cache_type)
File "/usr/local/lib/python3.7/site-packages/fsspec/spec.py", line 956, in __init__
self.details = fs.info(path)
File "/usr/local/lib/python3.7/site-packages/s3fs/core.py", line 486, in info
return super().info(path)
File "/usr/local/lib/python3.7/site-packages/fsspec/spec.py", line 511, in info
raise FileNotFoundError(path)
Are you modifying the file on S3? Or is it just a static file?
Could there be a chance that the file potentially doesn't exist for a brief amount of time?
Nope, I am creating this file and reading it a few minutes later. After creation, the file won't change.
Hmm, ok. It appears that S3 is reporting that the file doesn't exist, which is what raises the error.
It's not exactly a solution, but you could catch that error and retry?
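As a rough sketch (read_with_retry is just an illustrative name, the retry count and delay are arbitrary, and the bucket/path are placeholders):

import time

import s3fs

def read_with_retry(fs, path, retries=3, delay=2):
    # Retry the read a few times before giving up, in case the
    # FileNotFoundError is transient.
    for attempt in range(retries):
        try:
            with fs.open(path, 'rb') as f:
                return f.read()
        except FileNotFoundError:
            if attempt == retries - 1:
                raise
            time.sleep(delay)

data = read_with_retry(s3fs.S3FileSystem(), 's3://my-bucket/PATH_TO_FILE.json')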
Duplicate of #253? @TomAugspurger, time to make dircache optional and off by default; or to introduce timed expiry?
I am trying to execute this line of code before any S3 read; hopefully it will help. My next step will be to go back to boto3 commands ... :-(
self.fs.invalidate_cache()
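Roughly what that looks like in a minimal standalone form (read_json is just an illustrative name; in our code the call sits at the top of the S3.read method shown earlier):

import json

import s3fs

fs = s3fs.S3FileSystem()

def read_json(path):
    # Drop any cached directory listings so s3fs asks S3 again
    # instead of answering from a stale listing.
    fs.invalidate_cache()
    with fs.open(path, 'rb') as f:
        return json.loads(f.read().decode('utf-8'))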
time to make dircache optional and off by default
Yes to making it optional. I'm not sure about changing the default. It's hard to say what the right one is since it's workload dependent. dircache=False would be the safer / more correct option, though.
Hi everyone. After two days it looks like this change [invalidate_cache] fixed the problem. Thanks!
Thanks for coming back to let us know!
I have a similar problem. In my case I'm querying Athena. With s3fs 0.2.0 I can run as many queries as I want, but with a newer version I only get a result for the first query and then get FileNotFoundError for the subsequent ones.
It's not quite clear to me how I should use s3fs.S3FileSystem.invalidate_cache().
from time import sleep

import boto3
import pandas


def wait_for_query(athena_client, execution_id):
    # TODO: add timeout
    while True:
        response = athena_client.get_query_execution(
            QueryExecutionId=execution_id)
        status = response['QueryExecution']['Status']['State']
        if status in ['FAILED', 'CANCELLED']:
            error = \
                response['QueryExecution']['Status']['StateChangeReason']
            raise Exception('Error doing query\n{}'.format(error))
        elif status == 'SUCCEEDED':
            break
        sleep(2)
    return response


def execute_athena_query(query, wait=True):
    athena_client = boto3.client('athena', 'us-east-1')
    athena_config = {
        'OutputLocation': 's3://my_bucket/athena_results',
        'EncryptionConfiguration': {'EncryptionOption': 'SSE_S3'}
    }
    athena_context = {'Database': 'my_database'}
    response = athena_client.start_query_execution(
        QueryString=query,
        QueryExecutionContext=athena_context,
        ResultConfiguration=athena_config)
    execution_id = response['QueryExecutionId']
    if wait:
        response = wait_for_query(athena_client, execution_id)
    return response


def get_df_athena(query, dtypes):
    response = execute_athena_query(query)
    location = response['QueryExecution']['ResultConfiguration']\
        ['OutputLocation']
    df = pandas.read_csv(location, dtype=dtypes)
    return df
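One thing I am considering, though I'm not sure it is the right fix, is to open the result file myself through an S3FileSystem I control and invalidate its cache first, rather than passing the S3 URL straight to pandas. Something like this, reusing execute_athena_query from above:

import pandas
import s3fs

fs = s3fs.S3FileSystem()

def get_df_athena(query, dtypes):
    response = execute_athena_query(query)
    location = response['QueryExecution']['ResultConfiguration']['OutputLocation']
    # Clear the cached listings and open the result file through our own
    # filesystem object, so a stale directory listing can't hide the new file.
    fs.invalidate_cache()
    with fs.open(location, 'rb') as f:
        return pandas.read_csv(f, dtype=dtypes)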
@TomAugspurger, revive the no-caching option? Or try a direct HEAD in the case that a file appears unfound?
I'm not sure, though it seems to be causing plenty of issues.
To start with, I'd like to see a keyword cache_instances added to S3FileSystem.__init__ to control this.
Do you want me to help with this?
If you're able to, that'd be great!
Note that you can already set S3FileSystem.cachable = False as a workaround.
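i.e. something like this, set before any S3FileSystem instances are created:

import s3fs

# Turn off fsspec's instance caching for S3FileSystem, so every call to
# S3FileSystem() builds a fresh object with an empty listings cache.
s3fs.S3FileSystem.cachable = False

fs = s3fs.S3FileSystem()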
@TomAugspurger, I actually meant the file cache, not the instance cache - you had a PR for this in fsspec?