
Read compressed data from s3 in parallel

Open alejandro-ponder opened this issue 3 years ago • 7 comments


If I try to read compressed data (gzip, in my case) from S3, Modin doesn't read it in parallel.

Can reproduce with the following:

import modin.pandas as pd
pd.read_csv("https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Wireless_v1_00.tsv.gz", compression="gzip", header=0, sep="\t")

alejandro-ponder, Jun 10 '22 02:06

@alejandro-ponder thank you for reporting this issue! Modin is actually defaulting to pandas for a different reason: it thinks that the file doesn't exist. pandas can at least start reading the file from the HTTPS URL (I stopped waiting after several minutes, so I haven't seen it finish), but Modin doesn't allow that.

We are tracking support for reading data from an HTTPS URL in #3170.
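For reference, a path-style S3 HTTPS URL like the one in the report maps onto an s3:// path by taking the first path segment as the bucket and the rest as the key. The helper below is only a sketch of that mapping: `https_to_s3_uri` is a hypothetical name, not part of Modin or pandas, and it assumes the host is exactly `s3.amazonaws.com` (virtual-hosted-style URLs are not handled).

```python
from urllib.parse import urlparse


def https_to_s3_uri(url: str) -> str:
    """Convert a path-style S3 HTTPS URL to an s3:// URI.

    Hypothetical helper for illustration. Assumes path-style addressing,
    i.e. https://s3.amazonaws.com/<bucket>/<key>.
    """
    parsed = urlparse(url)
    if parsed.scheme != "https" or parsed.netloc != "s3.amazonaws.com":
        raise ValueError(f"not a path-style S3 HTTPS URL: {url!r}")
    # The first path segment is the bucket; everything after it is the key.
    bucket, _, key = parsed.path.lstrip("/").partition("/")
    if not bucket or not key:
        raise ValueError(f"missing bucket or key in: {url!r}")
    return f"s3://{bucket}/{key}"


s3_path = https_to_s3_uri(
    "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Wireless_v1_00.tsv.gz"
)
```

The resulting string can then be passed to `read_csv` in place of the HTTPS URL.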

But even when I change your path to the s3:// format, I get a different error ending in ConnectTimeoutError: Connect timeout on endpoint URL: "http://169.254.169.254/latest/api/token" (shown below). The error originates in FileDispatcher.file_exists, and we need to investigate it. When I catch the ConnectTimeoutError there, I get another error when we actually open the file (second stack trace below), also ending in ConnectTimeoutError: Connect timeout on endpoint URL: "http://169.254.169.254/latest/api/token".
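To make the "catch the ConnectTimeoutError there" experiment concrete, here is a minimal, dependency-free sketch of an existence check that treats a chosen set of exceptions as "file not found". This is not Modin's actual `file_exists` code: `exists_or_false` is a hypothetical helper, and the stand-in exception below plays the role that `botocore.exceptions.NoCredentialsError` and `botocore.exceptions.ConnectTimeoutError` would play in a real setup.

```python
from typing import Callable, Tuple, Type


def exists_or_false(
    exists: Callable[[str], bool],
    path: str,
    ignored: Tuple[Type[BaseException], ...],
) -> bool:
    """Return exists(path), or False if one of the ignored exceptions is raised.

    Hypothetical helper: `exists` stands in for a filesystem check such as
    the s3fs.exists(file_path) call visible in the traceback.
    """
    try:
        return bool(exists(path))
    except ignored:
        # Credential lookup failed or timed out; report the file as missing.
        return False


class FakeConnectTimeout(Exception):
    """Stand-in for a credential-lookup timeout (assumption for this sketch)."""


def flaky_exists(path: str) -> bool:
    # Simulates the metadata-endpoint timeout seen in the traceback.
    raise FakeConnectTimeout(path)


found = exists_or_false(flaky_exists, "s3://bucket/key.tsv.gz", (FakeConnectTimeout,))
```

As the second stack trace shows, swallowing the exception at this point only moves the failure: the same credential lookup runs again when the file is actually opened.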

`ConnectTimeoutError` stack trace for reading from `s3://amazon-reviews-pds/tsv/amazon_reviews_us_Wireless_v1_00.tsv.gz`
---------------------------------------------------------------------------
CancelledError                            Traceback (most recent call last)
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiohttp/connector.py:986, in TCPConnector._wrap_create_connection(self, req, timeout, client_error, *args, **kwargs)
    985     async with ceil_timeout(timeout.sock_connect):
--> 986         return await self._loop.create_connection(*args, **kwargs)  # type: ignore[return-value]  # noqa
    987 except cert_errors as exc:

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/asyncio/base_events.py:1050, in BaseEventLoop.create_connection(self, protocol_factory, host, port, ssl, family, proto, flags, sock, local_addr, server_hostname, ssl_handshake_timeout, happy_eyeballs_delay, interleave)
   1049 try:
-> 1050     sock = await self._connect_sock(
   1051         exceptions, addrinfo, laddr_infos)
   1052     break

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/asyncio/base_events.py:961, in BaseEventLoop._connect_sock(self, exceptions, addr_info, local_addr_infos)
    960         raise my_exceptions.pop()
--> 961 await self.sock_connect(sock, address)
    962 return sock

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/asyncio/selector_events.py:499, in BaseSelectorEventLoop.sock_connect(self, sock, address)
    498 self._sock_connect(fut, sock, address)
--> 499 return await fut

CancelledError:

During handling of the above exception, another exception occurred:

TimeoutError                              Traceback (most recent call last)
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiohttp/client.py:535, in ClientSession._request(self, method, str_or_url, params, data, json, cookies, headers, skip_auto_headers, auth, allow_redirects, max_redirects, compress, chunked, expect100, raise_for_status, read_until_eof, proxy, proxy_auth, timeout, verify_ssl, fingerprint, ssl_context, ssl, proxy_headers, trace_request_ctx, read_bufsize)
    534         assert self._connector is not None
--> 535         conn = await self._connector.connect(
    536             req, traces=traces, timeout=real_timeout
    537         )
    538 except asyncio.TimeoutError as exc:

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiohttp/connector.py:542, in BaseConnector.connect(self, req, traces, timeout)
    541 try:
--> 542     proto = await self._create_connection(req, traces, timeout)
    543     if self._closed:

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiohttp/connector.py:907, in TCPConnector._create_connection(self, req, traces, timeout)
    906 else:
--> 907     _, proto = await self._create_direct_connection(req, traces, timeout)
    909 return proto

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiohttp/connector.py:1175, in TCPConnector._create_direct_connection(self, req, traces, timeout, client_error)
   1174 try:
-> 1175     transp, proto = await self._wrap_create_connection(
   1176         self._factory,
   1177         host,
   1178         port,
   1179         timeout=timeout,
   1180         ssl=sslcontext,
   1181         family=hinfo["family"],
   1182         proto=hinfo["proto"],
   1183         flags=hinfo["flags"],
   1184         server_hostname=hinfo["hostname"] if sslcontext else None,
   1185         local_addr=self._local_addr,
   1186         req=req,
   1187         client_error=client_error,
   1188     )
   1189 except ClientConnectorError as exc:

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiohttp/connector.py:986, in TCPConnector._wrap_create_connection(self, req, timeout, client_error, *args, **kwargs)
    985     async with ceil_timeout(timeout.sock_connect):
--> 986         return await self._loop.create_connection(*args, **kwargs)  # type: ignore[return-value]  # noqa
    987 except cert_errors as exc:

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/async_timeout/__init__.py:129, in Timeout.__aexit__(self, exc_type, exc_val, exc_tb)
    123 async def __aexit__(
    124     self,
    125     exc_type: Optional[Type[BaseException]],
    126     exc_val: Optional[BaseException],
    127     exc_tb: Optional[TracebackType],
    128 ) -> Optional[bool]:
--> 129     self._do_exit(exc_type)
    130     return None

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/async_timeout/__init__.py:212, in Timeout._do_exit(self, exc_type)
    211     self._timeout_handler = None
--> 212     raise asyncio.TimeoutError
    213 # timeout has not expired

TimeoutError:

The above exception was the direct cause of the following exception:

ServerTimeoutError                        Traceback (most recent call last)
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiobotocore/httpsession.py:178, in AIOHTTPSession.send(self, request)
    177 url = URL(url, encoded=True)
--> 178 response = await self._session.request(
    179     request.method, url=url, headers=headers_, data=data, proxy=proxy_url,
    180     proxy_headers=proxy_headers
    181 )
    183 http_response = aiobotocore.awsrequest.AioAWSResponse(
    184     str(response.url),
    185     response.status,
    186     response.headers,
    187     response
    188 )

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiohttp/client.py:539, in ClientSession._request(self, method, str_or_url, params, data, json, cookies, headers, skip_auto_headers, auth, allow_redirects, max_redirects, compress, chunked, expect100, raise_for_status, read_until_eof, proxy, proxy_auth, timeout, verify_ssl, fingerprint, ssl_context, ssl, proxy_headers, trace_request_ctx, read_bufsize)
    538 except asyncio.TimeoutError as exc:
--> 539     raise ServerTimeoutError(
    540         "Connection timeout " "to host {}".format(url)
    541     ) from exc
    543 assert conn.transport is not None

ServerTimeoutError: Connection timeout to host http://169.254.169.254/latest/api/token

During handling of the above exception, another exception occurred:

ConnectTimeoutError                       Traceback (most recent call last)
Input In [1], in <cell line: 1>()
----> 1 import modin.pandas as pd; pd.read_csv("s3://amazon-reviews-pds/tsv/amazon_reviews_us_Wireless_v1_00.tsv.gz",compression='gzip',header=0,sep="\t")

File ~/software_sources/modin/modin/logging/logger_function.py:65, in logger_decorator.<locals>.decorator.<locals>.run_and_log(*args, **kwargs)
     50 """
     51 Compute function with logging if Modin logging is enabled.
     52
   (...)
     62 Any
     63 """
     64 if LogMode.get() == "disable":
---> 65     return f(*args, **kwargs)
     67 logger = get_logger()
     68 try:

File ~/software_sources/modin/modin/pandas/io.py:140, in read_csv(filepath_or_buffer, sep, delimiter, header, names, index_col, usecols, squeeze, prefix, mangle_dupe_cols, dtype, engine, converters, true_values, false_values, skipinitialspace, skiprows, nrows, na_values, keep_default_na, na_filter, verbose, skip_blank_lines, parse_dates, infer_datetime_format, keep_date_col, date_parser, dayfirst, cache_dates, iterator, chunksize, compression, thousands, decimal, lineterminator, quotechar, quoting, escapechar, comment, encoding, encoding_errors, dialect, error_bad_lines, warn_bad_lines, on_bad_lines, skipfooter, doublequote, delim_whitespace, low_memory, memory_map, float_precision, storage_options)
    138 _, _, _, f_locals = inspect.getargvalues(inspect.currentframe())
    139 kwargs = {k: v for k, v in f_locals.items() if k in _pd_read_csv_signature}
--> 140 return _read(**kwargs)

File ~/software_sources/modin/modin/pandas/io.py:61, in _read(**kwargs)
     58 from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher
     60 squeeze = kwargs.pop("squeeze", False)
---> 61 pd_obj = FactoryDispatcher.read_csv(**kwargs)
     62 # This happens when `read_csv` returns a TextFileReader object for iterating through
     63 if isinstance(pd_obj, pandas.io.parsers.TextFileReader):

File ~/software_sources/modin/modin/core/execution/dispatching/factories/dispatcher.py:185, in FactoryDispatcher.read_csv(cls, **kwargs)
    182 @classmethod
    183 @_inherit_docstrings(factories.BaseFactory._read_csv)
    184 def read_csv(cls, **kwargs):
--> 185     return cls.__factory._read_csv(**kwargs)

File ~/software_sources/modin/modin/core/execution/dispatching/factories/factories.py:217, in BaseFactory._read_csv(cls, **kwargs)
    209 @classmethod
    210 @doc(
    211     _doc_io_method_template,
   (...)
    215 )
    216 def _read_csv(cls, **kwargs):
--> 217     return cls.io_cls.read_csv(**kwargs)

File ~/software_sources/modin/modin/logging/logger_function.py:65, in logger_decorator.<locals>.decorator.<locals>.run_and_log(*args, **kwargs)
     50 """
     51 Compute function with logging if Modin logging is enabled.
     52
   (...)
     62 Any
     63 """
     64 if LogMode.get() == "disable":
---> 65     return f(*args, **kwargs)
     67 logger = get_logger()
     68 try:

File ~/software_sources/modin/modin/core/io/file_dispatcher.py:153, in FileDispatcher.read(cls, *args, **kwargs)
    129 @classmethod
    130 @logger_decorator("PANDAS-API", "FileDispatcher.read", "INFO")
    131 def read(cls, *args, **kwargs):
    132     """
    133     Read data according passed `args` and `kwargs`.
    134
   (...)
    151     postprocessing work on the resulting query_compiler object.
    152     """
--> 153     query_compiler = cls._read(*args, **kwargs)
    154     # TODO (devin-petersohn): Make this section more general for non-pandas kernel
    155     # implementations.
    156     if StorageFormat.get() == "Pandas":

File ~/software_sources/modin/modin/logging/logger_function.py:65, in logger_decorator.<locals>.decorator.<locals>.run_and_log(*args, **kwargs)
     50 """
     51 Compute function with logging if Modin logging is enabled.
     52
   (...)
     62 Any
     63 """
     64 if LogMode.get() == "disable":
---> 65     return f(*args, **kwargs)
     67 logger = get_logger()
     68 try:

File ~/software_sources/modin/modin/core/io/text/text_file_dispatcher.py:1009, in TextFileDispatcher._read(cls, filepath_or_buffer, **kwargs)
   1000 (
   1001     skiprows_md,
   1002     pre_reading,
   1003     skiprows_partitioning,
   1004 ) = cls._manage_skiprows_parameter(skiprows, header_size)
   1005 should_handle_skiprows = skiprows_md is not None and not isinstance(
   1006     skiprows_md, int
   1007 )
-> 1009 use_modin_impl = cls.check_parameters_support(
   1010     filepath_or_buffer,
   1011     kwargs,
   1012     skiprows_md,
   1013     header_size,
   1014 )
   1015 if not use_modin_impl:
   1016     return cls.single_worker_read(
   1017         filepath_or_buffer, callback=cls.read_callback, **kwargs
   1018     )

File ~/software_sources/modin/modin/logging/logger_function.py:65, in logger_decorator.<locals>.decorator.<locals>.run_and_log(*args, **kwargs)
     50 """
     51 Compute function with logging if Modin logging is enabled.
     52
   (...)
     62 Any
     63 """
     64 if LogMode.get() == "disable":
---> 65     return f(*args, **kwargs)
     67 logger = get_logger()
     68 try:

File ~/software_sources/modin/modin/core/io/text/text_file_dispatcher.py:656, in TextFileDispatcher.check_parameters_support(cls, filepath_or_buffer, read_kwargs, skiprows_md, header_size)
    654 skiprows = read_kwargs.get("skiprows")
    655 if isinstance(filepath_or_buffer, str):
--> 656     if not cls.file_exists(filepath_or_buffer):
    657         return False
    658 elif not cls.pathlib_or_pypath(filepath_or_buffer):

File ~/software_sources/modin/modin/logging/logger_function.py:65, in logger_decorator.<locals>.decorator.<locals>.run_and_log(*args, **kwargs)
     50 """
     51 Compute function with logging if Modin logging is enabled.
     52
   (...)
     62 Any
     63 """
     64 if LogMode.get() == "disable":
---> 65     return f(*args, **kwargs)
     67 logger = get_logger()
     68 try:

File ~/software_sources/modin/modin/core/io/file_dispatcher.py:271, in FileDispatcher.file_exists(cls, file_path)
    269 exists = False
    270 try:
--> 271     exists = s3fs.exists(file_path) or exists
    272 except NoCredentialsError:
    273     pass

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/fsspec/asyn.py:86, in sync_wrapper.<locals>.wrapper(*args, **kwargs)
     83 @functools.wraps(func)
     84 def wrapper(*args, **kwargs):
     85     self = obj or args[0]
---> 86     return sync(self.loop, func, *args, **kwargs)

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/fsspec/asyn.py:66, in sync(loop, func, timeout, *args, **kwargs)
     64     raise FSTimeoutError from return_result
     65 elif isinstance(return_result, BaseException):
---> 66     raise return_result
     67 else:
     68     return return_result

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/fsspec/asyn.py:26, in _runner(event, coro, result, timeout)
     24     coro = asyncio.wait_for(coro, timeout=timeout)
     25 try:
---> 26     result[0] = await coro
     27 except Exception as ex:
     28     result[0] = ex

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/s3fs/core.py:888, in S3FileSystem._exists(self, path)
    886     return False
    887 try:
--> 888     await self._info(path, bucket, key, version_id=version_id)
    889     return True
    890 except FileNotFoundError:

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/s3fs/core.py:1140, in S3FileSystem._info(self, path, bucket, key, refresh, version_id)
   1138 if key:
   1139     try:
-> 1140         out = await self._call_s3(
   1141             "head_object",
   1142             self.kwargs,
   1143             Bucket=bucket,
   1144             Key=key,
   1145             **version_id_kw(version_id),
   1146             **self.req_kw,
   1147         )
   1148         return {
   1149             "ETag": out.get("ETag", ""),
   1150             "LastModified": out["LastModified"],
   (...)
   1156             "ContentType": out.get("ContentType"),
   1157         }
   1158     except FileNotFoundError:

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/s3fs/core.py:325, in S3FileSystem._call_s3(self, method, *akwarglist, **kwargs)
    324 async def _call_s3(self, method, *akwarglist, **kwargs):
--> 325     await self.set_session()
    326     s3 = await self.get_s3(kwargs.get("Bucket"))
    327     method = getattr(s3, method)

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/s3fs/core.py:473, in S3FileSystem.set_session(self, refresh, kwargs)
    469 else:
    470     s3creator = self.session.create_client(
    471         "s3", config=conf, **init_kwargs, **client_kwargs
    472     )
--> 473     self._s3 = await s3creator.__aenter__()
    475 self._s3creator = s3creator
    476 # the following actually closes the aiohttp connection; use of privates
    477 # might break in the future, would cause exception at gc time

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiobotocore/session.py:22, in ClientCreatorContext.__aenter__(self)
     21 async def __aenter__(self) -> AioBaseClient:
---> 22     self._client = await self._coro
     23     return await self._client.__aenter__()

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiobotocore/session.py:102, in AioSession._create_client(self, service_name, region_name, api_version, use_ssl, verify, endpoint_url, aws_access_key_id, aws_secret_access_key, aws_session_token, config)
     97     raise PartialCredentialsError(
     98         provider='explicit',
     99         cred_var=self._missing_cred_vars(aws_access_key_id,
    100                                          aws_secret_access_key))
    101 else:
--> 102     credentials = await self.get_credentials()
    103 endpoint_resolver = self._get_internal_component('endpoint_resolver')
    104 exceptions_factory = self._get_internal_component('exceptions_factory')

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiobotocore/session.py:133, in AioSession.get_credentials(self)
    131 async def get_credentials(self):
    132     if self._credentials is None:
--> 133         self._credentials = await (self._components.get_component(
    134             'credential_provider').load_credentials())
    135     return self._credentials

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiobotocore/credentials.py:814, in AioCredentialResolver.load_credentials(self)
    812 for provider in self.providers:
    813     logger.debug("Looking for credentials via: %s", provider.METHOD)
--> 814     creds = await provider.load()
    815     if creds is not None:
    816         return creds

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiobotocore/credentials.py:486, in AioInstanceMetadataProvider.load(self)
    484 async def load(self):
    485     fetcher = self._role_fetcher
--> 486     metadata = await fetcher.retrieve_iam_role_credentials()
    487     if not metadata:
    488         return None

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiobotocore/utils.py:175, in AioInstanceMetadataFetcher.retrieve_iam_role_credentials(self)
    173 async def retrieve_iam_role_credentials(self):
    174     try:
--> 175         token = await self._fetch_metadata_token()
    176         role_name = await self._get_iam_role(token)
    177         credentials = await self._get_credentials(role_name, token)

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiobotocore/utils.py:88, in AioIMDSFetcher._fetch_metadata_token(self)
     86 for i in range(self._num_attempts):
     87     try:
---> 88         response = await session.send(request.prepare())
     89         if response.status_code == 200:
     90             return await response.text

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiobotocore/httpsession.py:210, in AIOHTTPSession.send(self, request)
    208 except ServerTimeoutError as e:
    209     if str(e).lower().startswith('connect'):
--> 210         raise ConnectTimeoutError(endpoint_url=request.url, error=e)
    211     else:
    212         raise ReadTimeoutError(endpoint_url=request.url, error=e)

ConnectTimeoutError: Connect timeout on endpoint URL: "http://169.254.169.254/latest/api/token"

Stack trace after catching `ConnectTimeoutError`
---------------------------------------------------------------------------
CancelledError                            Traceback (most recent call last)
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiohttp/connector.py:986, in TCPConnector._wrap_create_connection(self, req, timeout, client_error, *args, **kwargs)
    985     async with ceil_timeout(timeout.sock_connect):
--> 986         return await self._loop.create_connection(*args, **kwargs)  # type: ignore[return-value]  # noqa
    987 except cert_errors as exc:

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/asyncio/base_events.py:1050, in BaseEventLoop.create_connection(self, protocol_factory, host, port, ssl, family, proto, flags, sock, local_addr, server_hostname, ssl_handshake_timeout, happy_eyeballs_delay, interleave)
   1049 try:
-> 1050     sock = await self._connect_sock(
   1051         exceptions, addrinfo, laddr_infos)
   1052     break

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/asyncio/base_events.py:961, in BaseEventLoop._connect_sock(self, exceptions, addr_info, local_addr_infos)
    960         raise my_exceptions.pop()
--> 961 await self.sock_connect(sock, address)
    962 return sock

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/asyncio/selector_events.py:499, in BaseSelectorEventLoop.sock_connect(self, sock, address)
    498 self._sock_connect(fut, sock, address)
--> 499 return await fut

CancelledError:

During handling of the above exception, another exception occurred:

TimeoutError                              Traceback (most recent call last)
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiohttp/client.py:535, in ClientSession._request(self, method, str_or_url, params, data, json, cookies, headers, skip_auto_headers, auth, allow_redirects, max_redirects, compress, chunked, expect100, raise_for_status, read_until_eof, proxy, proxy_auth, timeout, verify_ssl, fingerprint, ssl_context, ssl, proxy_headers, trace_request_ctx, read_bufsize)
    534         assert self._connector is not None
--> 535         conn = await self._connector.connect(
    536             req, traces=traces, timeout=real_timeout
    537         )
    538 except asyncio.TimeoutError as exc:

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiohttp/connector.py:542, in BaseConnector.connect(self, req, traces, timeout)
    541 try:
--> 542     proto = await self._create_connection(req, traces, timeout)
    543     if self._closed:

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiohttp/connector.py:907, in TCPConnector._create_connection(self, req, traces, timeout)
    906 else:
--> 907     _, proto = await self._create_direct_connection(req, traces, timeout)
    909 return proto

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiohttp/connector.py:1175, in TCPConnector._create_direct_connection(self, req, traces, timeout, client_error)
   1174 try:
-> 1175     transp, proto = await self._wrap_create_connection(
   1176         self._factory,
   1177         host,
   1178         port,
   1179         timeout=timeout,
   1180         ssl=sslcontext,
   1181         family=hinfo["family"],
   1182         proto=hinfo["proto"],
   1183         flags=hinfo["flags"],
   1184         server_hostname=hinfo["hostname"] if sslcontext else None,
   1185         local_addr=self._local_addr,
   1186         req=req,
   1187         client_error=client_error,
   1188     )
   1189 except ClientConnectorError as exc:

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiohttp/connector.py:986, in TCPConnector._wrap_create_connection(self, req, timeout, client_error, *args, **kwargs)
    985     async with ceil_timeout(timeout.sock_connect):
--> 986         return await self._loop.create_connection(*args, **kwargs)  # type: ignore[return-value]  # noqa
    987 except cert_errors as exc:

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/async_timeout/__init__.py:129, in Timeout.__aexit__(self, exc_type, exc_val, exc_tb)
    123 async def __aexit__(
    124     self,
    125     exc_type: Optional[Type[BaseException]],
    126     exc_val: Optional[BaseException],
    127     exc_tb: Optional[TracebackType],
    128 ) -> Optional[bool]:
--> 129     self._do_exit(exc_type)
    130     return None

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/async_timeout/__init__.py:212, in Timeout._do_exit(self, exc_type)
    211     self._timeout_handler = None
--> 212     raise asyncio.TimeoutError
    213 # timeout has not expired

TimeoutError:

The above exception was the direct cause of the following exception:

ServerTimeoutError                        Traceback (most recent call last)
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiobotocore/httpsession.py:178, in AIOHTTPSession.send(self, request)
    177 url = URL(url, encoded=True)
--> 178 response = await self._session.request(
    179     request.method, url=url, headers=headers_, data=data, proxy=proxy_url,
    180     proxy_headers=proxy_headers
    181 )
    183 http_response = aiobotocore.awsrequest.AioAWSResponse(
    184     str(response.url),
    185     response.status,
    186     response.headers,
    187     response
    188 )

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiohttp/client.py:539, in ClientSession._request(self, method, str_or_url, params, data, json, cookies, headers, skip_auto_headers, auth, allow_redirects, max_redirects, compress, chunked, expect100, raise_for_status, read_until_eof, proxy, proxy_auth, timeout, verify_ssl, fingerprint, ssl_context, ssl, proxy_headers, trace_request_ctx, read_bufsize)
    538 except asyncio.TimeoutError as exc:
--> 539     raise ServerTimeoutError(
    540         "Connection timeout " "to host {}".format(url)
    541     ) from exc
    543 assert conn.transport is not None

ServerTimeoutError: Connection timeout to host http://169.254.169.254/latest/api/token

During handling of the above exception, another exception occurred:

ConnectTimeoutError                       Traceback (most recent call last)
Input In [1], in <cell line: 1>()
----> 1 import modin.pandas as pd; pd.read_csv("s3://amazon-reviews-pds/tsv/amazon_reviews_us_Wireless_v1_00.tsv.gz",compression='gzip',header=0,sep="\t")

File ~/software_sources/modin/modin/logging/logger_function.py:65, in logger_decorator.<locals>.decorator.<locals>.run_and_log(*args, **kwargs)
     50 """
     51 Compute function with logging if Modin logging is enabled.
     52
   (...)
     62 Any
     63 """
     64 if LogMode.get() == "disable":
---> 65     return f(*args, **kwargs)
     67 logger = get_logger()
     68 try:

File ~/software_sources/modin/modin/pandas/io.py:140, in read_csv(filepath_or_buffer, sep, delimiter, header, names, index_col, usecols, squeeze, prefix, mangle_dupe_cols, dtype, engine, converters, true_values, false_values, skipinitialspace, skiprows, nrows, na_values, keep_default_na, na_filter, verbose, skip_blank_lines, parse_dates, infer_datetime_format, keep_date_col, date_parser, dayfirst, cache_dates, iterator, chunksize, compression, thousands, decimal, lineterminator, quotechar, quoting, escapechar, comment, encoding, encoding_errors, dialect, error_bad_lines, warn_bad_lines, on_bad_lines, skipfooter, doublequote, delim_whitespace, low_memory, memory_map, float_precision, storage_options)
    138 _, _, _, f_locals = inspect.getargvalues(inspect.currentframe())
    139 kwargs = {k: v for k, v in f_locals.items() if k in _pd_read_csv_signature}
--> 140 return _read(**kwargs)

File ~/software_sources/modin/modin/pandas/io.py:61, in _read(**kwargs)
     58 from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher
     60 squeeze = kwargs.pop("squeeze", False)
---> 61 pd_obj = FactoryDispatcher.read_csv(**kwargs)
     62 # This happens when `read_csv` returns a TextFileReader object for iterating through
     63 if isinstance(pd_obj, pandas.io.parsers.TextFileReader):

File ~/software_sources/modin/modin/core/execution/dispatching/factories/dispatcher.py:185, in FactoryDispatcher.read_csv(cls, **kwargs)
    182 @classmethod
    183 @_inherit_docstrings(factories.BaseFactory._read_csv)
    184 def read_csv(cls, **kwargs):
--> 185     return cls.__factory._read_csv(**kwargs)

File ~/software_sources/modin/modin/core/execution/dispatching/factories/factories.py:217, in BaseFactory._read_csv(cls, **kwargs)
    209 @classmethod
    210 @doc(
    211     _doc_io_method_template,
   (...)
    215 )
    216 def _read_csv(cls, **kwargs):
--> 217     return cls.io_cls.read_csv(**kwargs)

File ~/software_sources/modin/modin/logging/logger_function.py:65, in logger_decorator.<locals>.decorator.<locals>.run_and_log(*args, **kwargs)
     50 """
     51 Compute function with logging if Modin logging is enabled.
     52
   (...)
     62 Any
     63 """
     64 if LogMode.get() == "disable":
---> 65     return f(*args, **kwargs)
     67 logger = get_logger()
     68 try:

File ~/software_sources/modin/modin/core/io/file_dispatcher.py:153, in FileDispatcher.read(cls, *args, **kwargs)
    129 @classmethod
    130 @logger_decorator("PANDAS-API", "FileDispatcher.read", "INFO")
    131 def read(cls, *args, **kwargs):
    132     """
    133     Read data according passed `args` and `kwargs`.
    134
   (...)
    151     postprocessing work on the resulting query_compiler object.
    152     """
--> 153     query_compiler = cls._read(*args, **kwargs)
    154     # TODO (devin-petersohn): Make this section more general for non-pandas kernel
    155     # implementations.
    156     if StorageFormat.get() == "Pandas":

File ~/software_sources/modin/modin/logging/logger_function.py:65, in logger_decorator.<locals>.decorator.<locals>.run_and_log(*args, **kwargs)
     50 """
     51 Compute function with logging if Modin logging is enabled.
     52
   (...)
     62 Any
     63 """
     64 if LogMode.get() == "disable":
---> 65     return f(*args, **kwargs)
     67 logger = get_logger()
     68 try:

File ~/software_sources/modin/modin/core/io/text/text_file_dispatcher.py:1027, in TextFileDispatcher._read(cls, filepath_or_buffer, **kwargs)
   1021 # In these cases we should pass additional metadata
   1022 # to the workers to match pandas output
   1023 pass_names = names in [None, lib.no_default] and (
   1024     skiprows is not None or kwargs["skipfooter"] != 0
   1025 )
-> 1027 pd_df_metadata = cls.read_callback(
   1028     filepath_or_buffer,
   1029     **dict(kwargs, nrows=1, skipfooter=0, index_col=index_col),
   1030 )
   1031 column_names = pd_df_metadata.columns
   1032 column_widths, num_splits = cls._define_metadata(pd_df_metadata, column_names)

File ~/software_sources/modin/modin/core/io/text/csv_dispatcher.py:40, in CSVDispatcher.read_callback(*args, **kwargs)
     24 def read_callback(*args, **kwargs):
     25     """
     26     Parse data on each partition.
     27
   (...)
     38         Function call result.
     39     """
---> 40     return pandas.read_csv(*args, **kwargs)

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/pandas/util/_decorators.py:311, in deprecate_nonkeyword_arguments.<locals>.decorate.<locals>.wrapper(*args, **kwargs)
    305 if len(args) > num_allow_args:
    306     warnings.warn(
    307         msg.format(arguments=arguments),
    308         FutureWarning,
    309         stacklevel=stacklevel,
    310     )
--> 311 return func(*args, **kwargs)

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/pandas/io/parsers/readers.py:680, in read_csv(filepath_or_buffer, sep, delimiter, header, names, index_col, usecols, squeeze, prefix, mangle_dupe_cols, dtype, engine, converters, true_values, false_values, skipinitialspace, skiprows, skipfooter, nrows, na_values, keep_default_na, na_filter, verbose, skip_blank_lines, parse_dates, infer_datetime_format, keep_date_col, date_parser, dayfirst, cache_dates, iterator, chunksize, compression, thousands, decimal, lineterminator, quotechar, quoting, doublequote, escapechar, comment, encoding, encoding_errors, dialect, error_bad_lines, warn_bad_lines, on_bad_lines, delim_whitespace, low_memory, memory_map, float_precision, storage_options)
    665 kwds_defaults = _refine_defaults_read(
    666     dialect,
    667     delimiter,
   (...)
    676     defaults={"delimiter": ","},
    677 )
    678 kwds.update(kwds_defaults)
--> 680 return _read(filepath_or_buffer, kwds)

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/pandas/io/parsers/readers.py:575, in _read(filepath_or_buffer, kwds)
    572 _validate_names(kwds.get("names", None))
    574 # Create the parser.
--> 575 parser = TextFileReader(filepath_or_buffer, **kwds)
    577 if chunksize or iterator:
    578     return parser

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/pandas/io/parsers/readers.py:933, in TextFileReader.__init__(self, f, engine, **kwds)
    930     self.options["has_index_names"] = kwds["has_index_names"]
    932 self.handles: IOHandles | None = None
--> 933 self._engine = self._make_engine(f, self.engine)

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/pandas/io/parsers/readers.py:1217, in TextFileReader._make_engine(self, f, engine)
   1213     mode = "rb"
   1214 # error: No overload variant of "get_handle" matches argument types
   1215 # "Union[str, PathLike[str], ReadCsvBuffer[bytes], ReadCsvBuffer[str]]"
   1216 # , "str", "bool", "Any", "Any", "Any", "Any", "Any"
-> 1217 self.handles = get_handle(  # type: ignore[call-overload]
   1218     f,
   1219     mode,
   1220     encoding=self.options.get("encoding", None),
   1221     compression=self.options.get("compression", None),
   1222     memory_map=self.options.get("memory_map", False),
   1223     is_text=is_text,
   1224     errors=self.options.get("encoding_errors", "strict"),
   1225     storage_options=self.options.get("storage_options", None),
   1226 )
   1227 assert self.handles is not None
   1228 f = self.handles.handle

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/pandas/io/common.py:670, in get_handle(path_or_buf, mode, encoding, compression, memory_map, is_text, errors, storage_options)
    667     codecs.lookup_error(errors)
    669 # open URLs
--> 670 ioargs = _get_filepath_or_buffer(
    671     path_or_buf,
    672     encoding=encoding,
    673     compression=compression,
    674     mode=mode,
    675     storage_options=storage_options,
    676 )
    678 handle = ioargs.filepath_or_buffer
    679 handles: list[BaseBuffer]

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/pandas/io/common.py:385, in _get_filepath_or_buffer(filepath_or_buffer, encoding, compression, mode, storage_options)
    382     pass
    384 try:
--> 385     file_obj = fsspec.open(
    386         filepath_or_buffer, mode=fsspec_mode, **(storage_options or {})
    387     ).open()
    388 # GH 34626 Reads from Public Buckets without Credentials needs anon=True
    389 except tuple(err_types_to_retry_with_anon):

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/fsspec/core.py:141, in OpenFile.open(self)
    133 def open(self):
    134     """Materialise this as a real open file without context
    135
    136     The file should be explicitly closed to avoid enclosed file
   (...)
    139     been deleted; but a with-context is better style.
    140     """
--> 141     out = self.__enter__()
    142     closer = out.close
    143     fobjects = self.fobjects.copy()[:-1]

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/fsspec/core.py:104, in OpenFile.__enter__(self)
    101 def __enter__(self):
    102     mode = self.mode.replace("t", "").replace("b", "") + "b"
--> 104     f = self.fs.open(self.path, mode=mode)
    106     self.fobjects = [f]
    108     if self.compression is not None:

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/fsspec/spec.py:1037, in AbstractFileSystem.open(self, path, mode, block_size, cache_options, compression, **kwargs)
   1035 else:
   1036     ac = kwargs.pop("autocommit", not self._intrans)
-> 1037     f = self._open(
   1038         path,
   1039         mode=mode,
   1040         block_size=block_size,
   1041         autocommit=ac,
   1042         cache_options=cache_options,
   1043         **kwargs,
   1044     )
   1045     if compression is not None:
   1046         from fsspec.compression import compr

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/s3fs/core.py:605, in S3FileSystem._open(self, path, mode, block_size, acl, version_id, fill_cache, cache_type, autocommit, requester_pays, cache_options, **kwargs)
    602 if cache_type is None:
    603     cache_type = self.default_cache_type
--> 605 return S3File(
    606     self,
    607     path,
    608     mode,
    609     block_size=block_size,
    610     acl=acl,
    611     version_id=version_id,
    612     fill_cache=fill_cache,
    613     s3_additional_kwargs=kw,
    614     cache_type=cache_type,
    615     autocommit=autocommit,
    616     requester_pays=requester_pays,
    617     cache_options=cache_options,
    618 )

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/s3fs/core.py:1911, in S3File.__init__(self, s3, path, mode, block_size, acl, version_id, fill_cache, s3_additional_kwargs, autocommit, cache_type, requester_pays, cache_options)
   1909         self.details = s3.info(path)
   1910         self.version_id = self.details.get("VersionId")
-> 1911 super().__init__(
   1912     s3,
   1913     path,
   1914     mode,
   1915     block_size,
   1916     autocommit=autocommit,
   1917     cache_type=cache_type,
   1918     cache_options=cache_options,
   1919 )
   1920 self.s3 = self.fs  # compatibility
   1922 # when not using autocommit we want to have transactional state to manage

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/fsspec/spec.py:1385, in AbstractBufferedFile.__init__(self, fs, path, mode, block_size, autocommit, cache_type, cache_options, size, **kwargs)
   1383         self.size = size
   1384     else:
-> 1385         self.size = self.details["size"]
   1386     self.cache = caches[cache_type](
   1387         self.blocksize, self._fetch_range, self.size, **cache_options
   1388     )
   1389 else:

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/fsspec/spec.py:1398, in AbstractBufferedFile.details(self)
   1395 @property
   1396 def details(self):
   1397     if self._details is None:
-> 1398         self._details = self.fs.info(self.path)
   1399     return self._details

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/fsspec/asyn.py:86, in sync_wrapper.<locals>.wrapper(*args, **kwargs)
     83 @functools.wraps(func)
     84 def wrapper(*args, **kwargs):
     85     self = obj or args[0]
---> 86     return sync(self.loop, func, *args, **kwargs)

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/fsspec/asyn.py:66, in sync(loop, func, timeout, *args, **kwargs)
     64     raise FSTimeoutError from return_result
     65 elif isinstance(return_result, BaseException):
---> 66     raise return_result
     67 else:
     68     return return_result

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/fsspec/asyn.py:26, in _runner(event, coro, result, timeout)
     24     coro = asyncio.wait_for(coro, timeout=timeout)
     25 try:
---> 26     result[0] = await coro
     27 except Exception as ex:
     28     result[0] = ex

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/s3fs/core.py:1140, in S3FileSystem._info(self, path, bucket, key, refresh, version_id)
   1138 if key:
   1139     try:
-> 1140         out = await self._call_s3(
   1141             "head_object",
   1142             self.kwargs,
   1143             Bucket=bucket,
   1144             Key=key,
   1145             **version_id_kw(version_id),
   1146             **self.req_kw,
   1147         )
   1148         return {
   1149             "ETag": out.get("ETag", ""),
   1150             "LastModified": out["LastModified"],
   (...)
   1156             "ContentType": out.get("ContentType"),
   1157         }
   1158     except FileNotFoundError:

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/s3fs/core.py:325, in S3FileSystem._call_s3(self, method, *akwarglist, **kwargs)
    324 async def _call_s3(self, method, *akwarglist, **kwargs):
--> 325     await self.set_session()
    326     s3 = await self.get_s3(kwargs.get("Bucket"))
    327     method = getattr(s3, method)

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/s3fs/core.py:473, in S3FileSystem.set_session(self, refresh, kwargs)
    469 else:
    470     s3creator = self.session.create_client(
    471         "s3", config=conf, **init_kwargs, **client_kwargs
    472     )
--> 473     self._s3 = await s3creator.__aenter__()
    475 self._s3creator = s3creator
    476 # the following actually closes the aiohttp connection; use of privates
    477 # might break in the future, would cause exception at gc time

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiobotocore/session.py:22, in ClientCreatorContext.__aenter__(self)
     21 async def __aenter__(self) -> AioBaseClient:
---> 22     self._client = await self._coro
     23     return await self._client.__aenter__()

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiobotocore/session.py:102, in AioSession._create_client(self, service_name, region_name, api_version, use_ssl, verify, endpoint_url, aws_access_key_id, aws_secret_access_key, aws_session_token, config)
     97     raise PartialCredentialsError(
     98         provider='explicit',
     99         cred_var=self._missing_cred_vars(aws_access_key_id,
    100                                          aws_secret_access_key))
    101 else:
--> 102     credentials = await self.get_credentials()
    103 endpoint_resolver = self._get_internal_component('endpoint_resolver')
    104 exceptions_factory = self._get_internal_component('exceptions_factory')

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiobotocore/session.py:133, in AioSession.get_credentials(self)
    131 async def get_credentials(self):
    132     if self._credentials is None:
--> 133         self._credentials = await (self._components.get_component(
    134             'credential_provider').load_credentials())
    135     return self._credentials

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiobotocore/credentials.py:814, in AioCredentialResolver.load_credentials(self)
    812 for provider in self.providers:
    813     logger.debug("Looking for credentials via: %s", provider.METHOD)
--> 814     creds = await provider.load()
    815     if creds is not None:
    816         return creds

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiobotocore/credentials.py:486, in AioInstanceMetadataProvider.load(self)
    484 async def load(self):
    485     fetcher = self._role_fetcher
--> 486     metadata = await fetcher.retrieve_iam_role_credentials()
    487     if not metadata:
    488         return None

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiobotocore/utils.py:175, in AioInstanceMetadataFetcher.retrieve_iam_role_credentials(self)
    173 async def retrieve_iam_role_credentials(self):
    174     try:
--> 175         token = await self._fetch_metadata_token()
    176         role_name = await self._get_iam_role(token)
    177         credentials = await self._get_credentials(role_name, token)

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiobotocore/utils.py:88, in AioIMDSFetcher._fetch_metadata_token(self)
     86 for i in range(self._num_attempts):
     87     try:
---> 88         response = await session.send(request.prepare())
     89         if response.status_code == 200:
     90             return await response.text

File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiobotocore/httpsession.py:210, in AIOHTTPSession.send(self, request)
    208 except ServerTimeoutError as e:
    209     if str(e).lower().startswith('connect'):
--> 210         raise ConnectTimeoutError(endpoint_url=request.url, error=e)
    211     else:
    212         raise ReadTimeoutError(endpoint_url=request.url, error=e)

ConnectTimeoutError: Connect timeout on endpoint URL: "http://169.254.169.254/latest/api/token"

mvashishtha avatar Jun 10 '22 12:06 mvashishtha

@alejandro-ponder never mind, that seems to be some kind of network error on my machine. Even a plain pandas read, `import pandas as pd; pd.read_csv("s3://nyc-tlc/trip data/yellow_tripdata_2009-01.csv", nrows=10)`, works on another machine but not on mine. Could you please try the `s3://` path instead of the HTTPS URL?
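For anyone following along, here is a minimal sketch of turning the HTTPS URL from the original report into the `s3://` form. The helper name `https_to_s3_uri` is hypothetical (it is not part of Modin or pandas), and it assumes the path-style layout `https://s3.amazonaws.com/<bucket>/<key>` only; virtual-hosted or regional URLs are not handled.

```python
from urllib.parse import unquote, urlparse


def https_to_s3_uri(url: str) -> str:
    """Convert a path-style S3 HTTPS URL to an s3:// URI (hypothetical helper)."""
    parsed = urlparse(url)
    # Assumption: only the path-style form https://s3.amazonaws.com/<bucket>/<key>
    # is supported; anything else is rejected rather than guessed at.
    if parsed.scheme not in ("http", "https") or parsed.netloc != "s3.amazonaws.com":
        raise ValueError(f"not a path-style S3 URL: {url!r}")
    # The first path segment is the bucket, the remainder is the object key.
    bucket, _, key = parsed.path.lstrip("/").partition("/")
    if not bucket or not key:
        raise ValueError(f"missing bucket or key in: {url!r}")
    # Decode percent-escapes such as %20 so the key matches the object name.
    return f"s3://{bucket}/{unquote(key)}"


if __name__ == "__main__":
    print(
        https_to_s3_uri(
            "https://s3.amazonaws.com/amazon-reviews-pds/tsv/"
            "amazon_reviews_us_Wireless_v1_00.tsv.gz"
        )
    )
```

The resulting string can then be passed to `read_csv` in place of the HTTPS URL, with the other arguments from the reproducer unchanged.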

mvashishtha avatar Jun 10 '22 13:06 mvashishtha

I changed the path to `s3://` and the Modin read takes about 7 min 13 s, while pandas takes about 3 min 35 s for the same workload.

Note: I'm also passing the `error_bad_lines=False` argument. Not sure whether that could be affecting anything.
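To make wall-clock comparisons like the one above easier to reproduce, a minimal timing sketch follows. The helpers `time_call` and `format_elapsed` are hypothetical names introduced here for illustration; they only wrap `time.perf_counter` and do not depend on Modin or pandas.

```python
import time


def time_call(fn, *args, **kwargs):
    """Call fn(*args, **kwargs) and return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = fn(*args, **kwargs)
    elapsed = time.perf_counter() - start
    return result, elapsed


def format_elapsed(seconds: float) -> str:
    """Render a duration as 'M min S s', rounded to the nearest second."""
    minutes, secs = divmod(int(round(seconds)), 60)
    return f"{minutes} min {secs} s"


if __name__ == "__main__":
    # Stand-in workload; replace with the read_csv call being measured.
    _, elapsed = time_call(sum, range(1_000_000))
    print(format_elapsed(elapsed))
```

One way to use it is to pass `pandas.read_csv` in one interpreter session and `modin.pandas.read_csv` in another, with identical arguments, so that the two runs do not compete for memory.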

alejandro-ponder avatar Jun 10 '22 14:06 alejandro-ponder

@alejandro-ponder is this in a fresh environment? Your machine may not have enough memory to run both Modin and pandas in the same notebook/interpreter environment.

devin-petersohn avatar Jun 10 '22 14:06 devin-petersohn

It should be. I restarted the kernel between the two runs.

alejandro-ponder avatar Jun 10 '22 14:06 alejandro-ponder

@prutskov this issue should be handled now, right?

pyrito avatar Aug 04 '22 16:08 pyrito

Yes, now Modin handles https-like addresses. But I would prefer to run the reproducer before closing the issue.

prutskov avatar Aug 04 '22 16:08 prutskov

@alejandro-ponder could you please re-check if this is still happening?

vnlitvinov avatar Sep 09 '22 15:09 vnlitvinov