modin
Read compressed data from s3 in parallel
When I read compressed data (gzip, in my case) from S3, Modin doesn't read it in parallel.
It can be reproduced with the following:
import modin.pandas as pd
pd.read_csv("https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Wireless_v1_00.tsv.gz", compression='gzip', header=0, sep="\t")
@alejandro-ponder thank you for reporting this issue! Modin is actually defaulting to pandas for a different reason: it thinks that the file doesn't exist. pandas can read the file from the HTTPS URL (or at least start reading it; I haven't been able to get it to finish after several minutes), but Modin doesn't support HTTPS URLs.
We are tracking support for reading data from an HTTPS URL in #3170.
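As a stopgap until #3170 is resolved, a path-style HTTPS URL like the one in the report can be rewritten to the equivalent `s3://` URI. The helper below is a minimal sketch, not part of Modin or pandas: `https_to_s3_uri` is a hypothetical name, and it assumes only the path-style form `https://s3.amazonaws.com/<bucket>/<key>` (virtual-hosted and regional endpoints are rejected rather than guessed at).

```python
from urllib.parse import urlparse


def https_to_s3_uri(url: str) -> str:
    """Rewrite a path-style S3 HTTPS URL as an s3:// URI (hypothetical helper)."""
    parsed = urlparse(url)
    host = parsed.netloc.lower()
    # Assumption: only https://s3.amazonaws.com/<bucket>/<key> is handled.
    if parsed.scheme not in ("http", "https") or host != "s3.amazonaws.com":
        raise ValueError(f"not a path-style S3 URL: {url!r}")
    # The first path segment is the bucket; everything after it is the key.
    # Query strings are dropped and percent-encoded characters are left as-is.
    bucket, sep, key = parsed.path.lstrip("/").partition("/")
    if not bucket or not sep or not key:
        raise ValueError(f"URL has no bucket/key: {url!r}")
    return f"s3://{bucket}/{key}"
```

For the URL in the report this yields `s3://amazon-reviews-pds/tsv/amazon_reviews_us_Wireless_v1_00.tsv.gz`, the same path used in the traces below. Rewriting the path only gets past the HTTPS limitation; as described next, the `s3://` path currently runs into a credential-lookup timeout of its own.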
But even when I change your path to the `s3://` format, I get a different error ending in `ConnectTimeoutError: Connect timeout on endpoint URL: "http://169.254.169.254/latest/api/token"` (first stack trace below). The error originates in `FileDispatcher.file_exists`, and we need to investigate it. When I catch the `ConnectTimeoutError` there, I get another error when we actually open the file (second stack trace below), also ending in `ConnectTimeoutError: Connect timeout on endpoint URL: "http://169.254.169.254/latest/api/token"`.
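The `169.254.169.254` endpoint is the EC2 instance metadata service: the `AioInstanceMetadataProvider` frames in the traces show botocore's default credential chain falling through to it when no other credentials are configured, and off EC2 that request just times out. Below is a minimal sketch of the idea being discussed, treating such failures in an existence check as "could not authenticate" and retrying anonymously. It is not Modin's actual `file_exists`; `s3_file_exists`, `make_fs`, and `credential_errors` are hypothetical names, and the filesystem factory is injected so the logic can be exercised without network access.

```python
from typing import Callable, Protocol, Tuple, Type


class FileSystemLike(Protocol):
    """Anything with an fsspec-style ``exists(path) -> bool`` method."""

    def exists(self, path: str) -> bool: ...


def s3_file_exists(
    path: str,
    make_fs: Callable[[bool], FileSystemLike],
    credential_errors: Tuple[Type[BaseException], ...],
) -> bool:
    """Check ``path`` with default credentials, then retry anonymously.

    ``make_fs(anon)`` builds a filesystem (hypothetical factory);
    ``credential_errors`` are treated as "could not authenticate",
    not as "file is missing". Other exceptions propagate unchanged.
    """
    # First attempt: the default credential chain, which may end up
    # querying the instance metadata endpoint seen in the traceback.
    try:
        return make_fs(False).exists(path)
    except credential_errors:
        pass
    # Second attempt: anonymous access, enough for public buckets.
    try:
        return make_fs(True).exists(path)
    except credential_errors:
        # Still unreachable: report "not found" so the caller can fall back.
        return False
```

With real libraries this could be called as `s3_file_exists(path, lambda anon: s3fs.S3FileSystem(anon=anon), (NoCredentialsError, ConnectTimeoutError))`, importing both exceptions from `botocore.exceptions`. Whether the anonymous client actually skips the metadata lookup may depend on the installed botocore/aiobotocore versions. And, as the second trace shows, the existence check alone is not enough: pandas opens the file separately, so the read itself would also need anonymous access, for example via `storage_options={"anon": True}`.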
`ConnectTimeoutError` stack trace for reading from `s3://amazon-reviews-pds/tsv/amazon_reviews_us_Wireless_v1_00.tsv.gz`
---------------------------------------------------------------------------
CancelledError Traceback (most recent call last)
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiohttp/connector.py:986, in TCPConnector._wrap_create_connection(self, req, timeout, client_error, *args, **kwargs)
985 async with ceil_timeout(timeout.sock_connect):
--> 986 return await self._loop.create_connection(*args, **kwargs) # type: ignore[return-value] # noqa
987 except cert_errors as exc:
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/asyncio/base_events.py:1050, in BaseEventLoop.create_connection(self, protocol_factory, host, port, ssl, family, proto, flags, sock, local_addr, server_hostname, ssl_handshake_timeout, happy_eyeballs_delay, interleave)
1049 try:
-> 1050 sock = await self._connect_sock(
1051 exceptions, addrinfo, laddr_infos)
1052 break
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/asyncio/base_events.py:961, in BaseEventLoop._connect_sock(self, exceptions, addr_info, local_addr_infos)
960 raise my_exceptions.pop()
--> 961 await self.sock_connect(sock, address)
962 return sock
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/asyncio/selector_events.py:499, in BaseSelectorEventLoop.sock_connect(self, sock, address)
498 self._sock_connect(fut, sock, address)
--> 499 return await fut
CancelledError:
During handling of the above exception, another exception occurred:
TimeoutError Traceback (most recent call last)
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiohttp/client.py:535, in ClientSession._request(self, method, str_or_url, params, data, json, cookies, headers, skip_auto_headers, auth, allow_redirects, max_redirects, compress, chunked, expect100, raise_for_status, read_until_eof, proxy, proxy_auth, timeout, verify_ssl, fingerprint, ssl_context, ssl, proxy_headers, trace_request_ctx, read_bufsize)
534 assert self._connector is not None
--> 535 conn = await self._connector.connect(
536 req, traces=traces, timeout=real_timeout
537 )
538 except asyncio.TimeoutError as exc:
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiohttp/connector.py:542, in BaseConnector.connect(self, req, traces, timeout)
541 try:
--> 542 proto = await self._create_connection(req, traces, timeout)
543 if self._closed:
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiohttp/connector.py:907, in TCPConnector._create_connection(self, req, traces, timeout)
906 else:
--> 907 _, proto = await self._create_direct_connection(req, traces, timeout)
909 return proto
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiohttp/connector.py:1175, in TCPConnector._create_direct_connection(self, req, traces, timeout, client_error)
1174 try:
-> 1175 transp, proto = await self._wrap_create_connection(
1176 self._factory,
1177 host,
1178 port,
1179 timeout=timeout,
1180 ssl=sslcontext,
1181 family=hinfo["family"],
1182 proto=hinfo["proto"],
1183 flags=hinfo["flags"],
1184 server_hostname=hinfo["hostname"] if sslcontext else None,
1185 local_addr=self._local_addr,
1186 req=req,
1187 client_error=client_error,
1188 )
1189 except ClientConnectorError as exc:
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiohttp/connector.py:986, in TCPConnector._wrap_create_connection(self, req, timeout, client_error, *args, **kwargs)
985 async with ceil_timeout(timeout.sock_connect):
--> 986 return await self._loop.create_connection(*args, **kwargs) # type: ignore[return-value] # noqa
987 except cert_errors as exc:
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/async_timeout/__init__.py:129, in Timeout.__aexit__(self, exc_type, exc_val, exc_tb)
123 async def __aexit__(
124 self,
125 exc_type: Optional[Type[BaseException]],
126 exc_val: Optional[BaseException],
127 exc_tb: Optional[TracebackType],
128 ) -> Optional[bool]:
--> 129 self._do_exit(exc_type)
130 return None
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/async_timeout/__init__.py:212, in Timeout._do_exit(self, exc_type)
211 self._timeout_handler = None
--> 212 raise asyncio.TimeoutError
213 # timeout has not expired
TimeoutError:
The above exception was the direct cause of the following exception:
ServerTimeoutError Traceback (most recent call last)
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiobotocore/httpsession.py:178, in AIOHTTPSession.send(self, request)
177 url = URL(url, encoded=True)
--> 178 response = await self._session.request(
179 request.method, url=url, headers=headers_, data=data, proxy=proxy_url,
180 proxy_headers=proxy_headers
181 )
183 http_response = aiobotocore.awsrequest.AioAWSResponse(
184 str(response.url),
185 response.status,
186 response.headers,
187 response
188 )
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiohttp/client.py:539, in ClientSession._request(self, method, str_or_url, params, data, json, cookies, headers, skip_auto_headers, auth, allow_redirects, max_redirects, compress, chunked, expect100, raise_for_status, read_until_eof, proxy, proxy_auth, timeout, verify_ssl, fingerprint, ssl_context, ssl, proxy_headers, trace_request_ctx, read_bufsize)
538 except asyncio.TimeoutError as exc:
--> 539 raise ServerTimeoutError(
540 "Connection timeout " "to host {}".format(url)
541 ) from exc
543 assert conn.transport is not None
ServerTimeoutError: Connection timeout to host http://169.254.169.254/latest/api/token
During handling of the above exception, another exception occurred:
ConnectTimeoutError Traceback (most recent call last)
Input In [1], in <cell line: 1>()
----> 1 import modin.pandas as pd; pd.read_csv("s3://amazon-reviews-pds/tsv/amazon_reviews_us_Wireless_v1_00.tsv.gz",compression='gzip',header=0,sep="\t")
File ~/software_sources/modin/modin/logging/logger_function.py:65, in logger_decorator.<locals>.decorator.<locals>.run_and_log(*args, **kwargs)
50 """
51 Compute function with logging if Modin logging is enabled.
52
(...)
62 Any
63 """
64 if LogMode.get() == "disable":
---> 65 return f(*args, **kwargs)
67 logger = get_logger()
68 try:
File ~/software_sources/modin/modin/pandas/io.py:140, in read_csv(filepath_or_buffer, sep, delimiter, header, names, index_col, usecols, squeeze, prefix, mangle_dupe_cols, dtype, engine, converters, true_values, false_values, skipinitialspace, skiprows, nrows, na_values, keep_default_na, na_filter, verbose, skip_blank_lines, parse_dates, infer_datetime_format, keep_date_col, date_parser, dayfirst, cache_dates, iterator, chunksize, compression, thousands, decimal, lineterminator, quotechar, quoting, escapechar, comment, encoding, encoding_errors, dialect, error_bad_lines, warn_bad_lines, on_bad_lines, skipfooter, doublequote, delim_whitespace, low_memory, memory_map, float_precision, storage_options)
138 _, _, _, f_locals = inspect.getargvalues(inspect.currentframe())
139 kwargs = {k: v for k, v in f_locals.items() if k in _pd_read_csv_signature}
--> 140 return _read(**kwargs)
File ~/software_sources/modin/modin/pandas/io.py:61, in _read(**kwargs)
58 from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher
60 squeeze = kwargs.pop("squeeze", False)
---> 61 pd_obj = FactoryDispatcher.read_csv(**kwargs)
62 # This happens when `read_csv` returns a TextFileReader object for iterating through
63 if isinstance(pd_obj, pandas.io.parsers.TextFileReader):
File ~/software_sources/modin/modin/core/execution/dispatching/factories/dispatcher.py:185, in FactoryDispatcher.read_csv(cls, **kwargs)
182 @classmethod
183 @_inherit_docstrings(factories.BaseFactory._read_csv)
184 def read_csv(cls, **kwargs):
--> 185 return cls.__factory._read_csv(**kwargs)
File ~/software_sources/modin/modin/core/execution/dispatching/factories/factories.py:217, in BaseFactory._read_csv(cls, **kwargs)
209 @classmethod
210 @doc(
211 _doc_io_method_template,
(...)
215 )
216 def _read_csv(cls, **kwargs):
--> 217 return cls.io_cls.read_csv(**kwargs)
File ~/software_sources/modin/modin/logging/logger_function.py:65, in logger_decorator.<locals>.decorator.<locals>.run_and_log(*args, **kwargs)
50 """
51 Compute function with logging if Modin logging is enabled.
52
(...)
62 Any
63 """
64 if LogMode.get() == "disable":
---> 65 return f(*args, **kwargs)
67 logger = get_logger()
68 try:
File ~/software_sources/modin/modin/core/io/file_dispatcher.py:153, in FileDispatcher.read(cls, *args, **kwargs)
129 @classmethod
130 @logger_decorator("PANDAS-API", "FileDispatcher.read", "INFO")
131 def read(cls, *args, **kwargs):
132 """
133 Read data according passed `args` and `kwargs`.
134
(...)
151 postprocessing work on the resulting query_compiler object.
152 """
--> 153 query_compiler = cls._read(*args, **kwargs)
154 # TODO (devin-petersohn): Make this section more general for non-pandas kernel
155 # implementations.
156 if StorageFormat.get() == "Pandas":
File ~/software_sources/modin/modin/logging/logger_function.py:65, in logger_decorator.<locals>.decorator.<locals>.run_and_log(*args, **kwargs)
50 """
51 Compute function with logging if Modin logging is enabled.
52
(...)
62 Any
63 """
64 if LogMode.get() == "disable":
---> 65 return f(*args, **kwargs)
67 logger = get_logger()
68 try:
File ~/software_sources/modin/modin/core/io/text/text_file_dispatcher.py:1009, in TextFileDispatcher._read(cls, filepath_or_buffer, **kwargs)
1000 (
1001 skiprows_md,
1002 pre_reading,
1003 skiprows_partitioning,
1004 ) = cls._manage_skiprows_parameter(skiprows, header_size)
1005 should_handle_skiprows = skiprows_md is not None and not isinstance(
1006 skiprows_md, int
1007 )
-> 1009 use_modin_impl = cls.check_parameters_support(
1010 filepath_or_buffer,
1011 kwargs,
1012 skiprows_md,
1013 header_size,
1014 )
1015 if not use_modin_impl:
1016 return cls.single_worker_read(
1017 filepath_or_buffer, callback=cls.read_callback, **kwargs
1018 )
File ~/software_sources/modin/modin/logging/logger_function.py:65, in logger_decorator.<locals>.decorator.<locals>.run_and_log(*args, **kwargs)
50 """
51 Compute function with logging if Modin logging is enabled.
52
(...)
62 Any
63 """
64 if LogMode.get() == "disable":
---> 65 return f(*args, **kwargs)
67 logger = get_logger()
68 try:
File ~/software_sources/modin/modin/core/io/text/text_file_dispatcher.py:656, in TextFileDispatcher.check_parameters_support(cls, filepath_or_buffer, read_kwargs, skiprows_md, header_size)
654 skiprows = read_kwargs.get("skiprows")
655 if isinstance(filepath_or_buffer, str):
--> 656 if not cls.file_exists(filepath_or_buffer):
657 return False
658 elif not cls.pathlib_or_pypath(filepath_or_buffer):
File ~/software_sources/modin/modin/logging/logger_function.py:65, in logger_decorator.<locals>.decorator.<locals>.run_and_log(*args, **kwargs)
50 """
51 Compute function with logging if Modin logging is enabled.
52
(...)
62 Any
63 """
64 if LogMode.get() == "disable":
---> 65 return f(*args, **kwargs)
67 logger = get_logger()
68 try:
File ~/software_sources/modin/modin/core/io/file_dispatcher.py:271, in FileDispatcher.file_exists(cls, file_path)
269 exists = False
270 try:
--> 271 exists = s3fs.exists(file_path) or exists
272 except NoCredentialsError:
273 pass
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/fsspec/asyn.py:86, in sync_wrapper.<locals>.wrapper(*args, **kwargs)
83 @functools.wraps(func)
84 def wrapper(*args, **kwargs):
85 self = obj or args[0]
---> 86 return sync(self.loop, func, *args, **kwargs)
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/fsspec/asyn.py:66, in sync(loop, func, timeout, *args, **kwargs)
64 raise FSTimeoutError from return_result
65 elif isinstance(return_result, BaseException):
---> 66 raise return_result
67 else:
68 return return_result
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/fsspec/asyn.py:26, in _runner(event, coro, result, timeout)
24 coro = asyncio.wait_for(coro, timeout=timeout)
25 try:
---> 26 result[0] = await coro
27 except Exception as ex:
28 result[0] = ex
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/s3fs/core.py:888, in S3FileSystem._exists(self, path)
886 return False
887 try:
--> 888 await self._info(path, bucket, key, version_id=version_id)
889 return True
890 except FileNotFoundError:
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/s3fs/core.py:1140, in S3FileSystem._info(self, path, bucket, key, refresh, version_id)
1138 if key:
1139 try:
-> 1140 out = await self._call_s3(
1141 "head_object",
1142 self.kwargs,
1143 Bucket=bucket,
1144 Key=key,
1145 **version_id_kw(version_id),
1146 **self.req_kw,
1147 )
1148 return {
1149 "ETag": out.get("ETag", ""),
1150 "LastModified": out["LastModified"],
(...)
1156 "ContentType": out.get("ContentType"),
1157 }
1158 except FileNotFoundError:
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/s3fs/core.py:325, in S3FileSystem._call_s3(self, method, *akwarglist, **kwargs)
324 async def _call_s3(self, method, *akwarglist, **kwargs):
--> 325 await self.set_session()
326 s3 = await self.get_s3(kwargs.get("Bucket"))
327 method = getattr(s3, method)
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/s3fs/core.py:473, in S3FileSystem.set_session(self, refresh, kwargs)
469 else:
470 s3creator = self.session.create_client(
471 "s3", config=conf, **init_kwargs, **client_kwargs
472 )
--> 473 self._s3 = await s3creator.__aenter__()
475 self._s3creator = s3creator
476 # the following actually closes the aiohttp connection; use of privates
477 # might break in the future, would cause exception at gc time
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiobotocore/session.py:22, in ClientCreatorContext.__aenter__(self)
21 async def __aenter__(self) -> AioBaseClient:
---> 22 self._client = await self._coro
23 return await self._client.__aenter__()
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiobotocore/session.py:102, in AioSession._create_client(self, service_name, region_name, api_version, use_ssl, verify, endpoint_url, aws_access_key_id, aws_secret_access_key, aws_session_token, config)
97 raise PartialCredentialsError(
98 provider='explicit',
99 cred_var=self._missing_cred_vars(aws_access_key_id,
100 aws_secret_access_key))
101 else:
--> 102 credentials = await self.get_credentials()
103 endpoint_resolver = self._get_internal_component('endpoint_resolver')
104 exceptions_factory = self._get_internal_component('exceptions_factory')
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiobotocore/session.py:133, in AioSession.get_credentials(self)
131 async def get_credentials(self):
132 if self._credentials is None:
--> 133 self._credentials = await (self._components.get_component(
134 'credential_provider').load_credentials())
135 return self._credentials
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiobotocore/credentials.py:814, in AioCredentialResolver.load_credentials(self)
812 for provider in self.providers:
813 logger.debug("Looking for credentials via: %s", provider.METHOD)
--> 814 creds = await provider.load()
815 if creds is not None:
816 return creds
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiobotocore/credentials.py:486, in AioInstanceMetadataProvider.load(self)
484 async def load(self):
485 fetcher = self._role_fetcher
--> 486 metadata = await fetcher.retrieve_iam_role_credentials()
487 if not metadata:
488 return None
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiobotocore/utils.py:175, in AioInstanceMetadataFetcher.retrieve_iam_role_credentials(self)
173 async def retrieve_iam_role_credentials(self):
174 try:
--> 175 token = await self._fetch_metadata_token()
176 role_name = await self._get_iam_role(token)
177 credentials = await self._get_credentials(role_name, token)
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiobotocore/utils.py:88, in AioIMDSFetcher._fetch_metadata_token(self)
86 for i in range(self._num_attempts):
87 try:
---> 88 response = await session.send(request.prepare())
89 if response.status_code == 200:
90 return await response.text
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiobotocore/httpsession.py:210, in AIOHTTPSession.send(self, request)
208 except ServerTimeoutError as e:
209 if str(e).lower().startswith('connect'):
--> 210 raise ConnectTimeoutError(endpoint_url=request.url, error=e)
211 else:
212 raise ReadTimeoutError(endpoint_url=request.url, error=e)
ConnectTimeoutError: Connect timeout on endpoint URL: "http://169.254.169.254/latest/api/token"
`ConnectTimeoutError` stack trace after catching the first `ConnectTimeoutError` in `FileDispatcher.file_exists`
---------------------------------------------------------------------------
CancelledError Traceback (most recent call last)
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiohttp/connector.py:986, in TCPConnector._wrap_create_connection(self, req, timeout, client_error, *args, **kwargs)
985 async with ceil_timeout(timeout.sock_connect):
--> 986 return await self._loop.create_connection(*args, **kwargs) # type: ignore[return-value] # noqa
987 except cert_errors as exc:
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/asyncio/base_events.py:1050, in BaseEventLoop.create_connection(self, protocol_factory, host, port, ssl, family, proto, flags, sock, local_addr, server_hostname, ssl_handshake_timeout, happy_eyeballs_delay, interleave)
1049 try:
-> 1050 sock = await self._connect_sock(
1051 exceptions, addrinfo, laddr_infos)
1052 break
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/asyncio/base_events.py:961, in BaseEventLoop._connect_sock(self, exceptions, addr_info, local_addr_infos)
960 raise my_exceptions.pop()
--> 961 await self.sock_connect(sock, address)
962 return sock
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/asyncio/selector_events.py:499, in BaseSelectorEventLoop.sock_connect(self, sock, address)
498 self._sock_connect(fut, sock, address)
--> 499 return await fut
CancelledError:
During handling of the above exception, another exception occurred:
TimeoutError Traceback (most recent call last)
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiohttp/client.py:535, in ClientSession._request(self, method, str_or_url, params, data, json, cookies, headers, skip_auto_headers, auth, allow_redirects, max_redirects, compress, chunked, expect100, raise_for_status, read_until_eof, proxy, proxy_auth, timeout, verify_ssl, fingerprint, ssl_context, ssl, proxy_headers, trace_request_ctx, read_bufsize)
534 assert self._connector is not None
--> 535 conn = await self._connector.connect(
536 req, traces=traces, timeout=real_timeout
537 )
538 except asyncio.TimeoutError as exc:
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiohttp/connector.py:542, in BaseConnector.connect(self, req, traces, timeout)
541 try:
--> 542 proto = await self._create_connection(req, traces, timeout)
543 if self._closed:
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiohttp/connector.py:907, in TCPConnector._create_connection(self, req, traces, timeout)
906 else:
--> 907 _, proto = await self._create_direct_connection(req, traces, timeout)
909 return proto
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiohttp/connector.py:1175, in TCPConnector._create_direct_connection(self, req, traces, timeout, client_error)
1174 try:
-> 1175 transp, proto = await self._wrap_create_connection(
1176 self._factory,
1177 host,
1178 port,
1179 timeout=timeout,
1180 ssl=sslcontext,
1181 family=hinfo["family"],
1182 proto=hinfo["proto"],
1183 flags=hinfo["flags"],
1184 server_hostname=hinfo["hostname"] if sslcontext else None,
1185 local_addr=self._local_addr,
1186 req=req,
1187 client_error=client_error,
1188 )
1189 except ClientConnectorError as exc:
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiohttp/connector.py:986, in TCPConnector._wrap_create_connection(self, req, timeout, client_error, *args, **kwargs)
985 async with ceil_timeout(timeout.sock_connect):
--> 986 return await self._loop.create_connection(*args, **kwargs) # type: ignore[return-value] # noqa
987 except cert_errors as exc:
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/async_timeout/__init__.py:129, in Timeout.__aexit__(self, exc_type, exc_val, exc_tb)
123 async def __aexit__(
124 self,
125 exc_type: Optional[Type[BaseException]],
126 exc_val: Optional[BaseException],
127 exc_tb: Optional[TracebackType],
128 ) -> Optional[bool]:
--> 129 self._do_exit(exc_type)
130 return None
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/async_timeout/__init__.py:212, in Timeout._do_exit(self, exc_type)
211 self._timeout_handler = None
--> 212 raise asyncio.TimeoutError
213 # timeout has not expired
TimeoutError:
The above exception was the direct cause of the following exception:
ServerTimeoutError Traceback (most recent call last)
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiobotocore/httpsession.py:178, in AIOHTTPSession.send(self, request)
177 url = URL(url, encoded=True)
--> 178 response = await self._session.request(
179 request.method, url=url, headers=headers_, data=data, proxy=proxy_url,
180 proxy_headers=proxy_headers
181 )
183 http_response = aiobotocore.awsrequest.AioAWSResponse(
184 str(response.url),
185 response.status,
186 response.headers,
187 response
188 )
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiohttp/client.py:539, in ClientSession._request(self, method, str_or_url, params, data, json, cookies, headers, skip_auto_headers, auth, allow_redirects, max_redirects, compress, chunked, expect100, raise_for_status, read_until_eof, proxy, proxy_auth, timeout, verify_ssl, fingerprint, ssl_context, ssl, proxy_headers, trace_request_ctx, read_bufsize)
538 except asyncio.TimeoutError as exc:
--> 539 raise ServerTimeoutError(
540 "Connection timeout " "to host {}".format(url)
541 ) from exc
543 assert conn.transport is not None
ServerTimeoutError: Connection timeout to host http://169.254.169.254/latest/api/token
During handling of the above exception, another exception occurred:
ConnectTimeoutError Traceback (most recent call last)
Input In [1], in <cell line: 1>()
----> 1 import modin.pandas as pd; pd.read_csv("s3://amazon-reviews-pds/tsv/amazon_reviews_us_Wireless_v1_00.tsv.gz",compression='gzip',header=0,sep="\t")
File ~/software_sources/modin/modin/logging/logger_function.py:65, in logger_decorator.<locals>.decorator.<locals>.run_and_log(*args, **kwargs)
50 """
51 Compute function with logging if Modin logging is enabled.
52
(...)
62 Any
63 """
64 if LogMode.get() == "disable":
---> 65 return f(*args, **kwargs)
67 logger = get_logger()
68 try:
File ~/software_sources/modin/modin/pandas/io.py:140, in read_csv(filepath_or_buffer, sep, delimiter, header, names, index_col, usecols, squeeze, prefix, mangle_dupe_cols, dtype, engine, converters, true_values, false_values, skipinitialspace, skiprows, nrows, na_values, keep_default_na, na_filter, verbose, skip_blank_lines, parse_dates, infer_datetime_format, keep_date_col, date_parser, dayfirst, cache_dates, iterator, chunksize, compression, thousands, decimal, lineterminator, quotechar, quoting, escapechar, comment, encoding, encoding_errors, dialect, error_bad_lines, warn_bad_lines, on_bad_lines, skipfooter, doublequote, delim_whitespace, low_memory, memory_map, float_precision, storage_options)
138 _, _, _, f_locals = inspect.getargvalues(inspect.currentframe())
139 kwargs = {k: v for k, v in f_locals.items() if k in _pd_read_csv_signature}
--> 140 return _read(**kwargs)
File ~/software_sources/modin/modin/pandas/io.py:61, in _read(**kwargs)
58 from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher
60 squeeze = kwargs.pop("squeeze", False)
---> 61 pd_obj = FactoryDispatcher.read_csv(**kwargs)
62 # This happens when `read_csv` returns a TextFileReader object for iterating through
63 if isinstance(pd_obj, pandas.io.parsers.TextFileReader):
File ~/software_sources/modin/modin/core/execution/dispatching/factories/dispatcher.py:185, in FactoryDispatcher.read_csv(cls, **kwargs)
182 @classmethod
183 @_inherit_docstrings(factories.BaseFactory._read_csv)
184 def read_csv(cls, **kwargs):
--> 185 return cls.__factory._read_csv(**kwargs)
File ~/software_sources/modin/modin/core/execution/dispatching/factories/factories.py:217, in BaseFactory._read_csv(cls, **kwargs)
209 @classmethod
210 @doc(
211 _doc_io_method_template,
(...)
215 )
216 def _read_csv(cls, **kwargs):
--> 217 return cls.io_cls.read_csv(**kwargs)
File ~/software_sources/modin/modin/logging/logger_function.py:65, in logger_decorator.<locals>.decorator.<locals>.run_and_log(*args, **kwargs)
50 """
51 Compute function with logging if Modin logging is enabled.
52
(...)
62 Any
63 """
64 if LogMode.get() == "disable":
---> 65 return f(*args, **kwargs)
67 logger = get_logger()
68 try:
File ~/software_sources/modin/modin/core/io/file_dispatcher.py:153, in FileDispatcher.read(cls, *args, **kwargs)
129 @classmethod
130 @logger_decorator("PANDAS-API", "FileDispatcher.read", "INFO")
131 def read(cls, *args, **kwargs):
132 """
133 Read data according passed `args` and `kwargs`.
134
(...)
151 postprocessing work on the resulting query_compiler object.
152 """
--> 153 query_compiler = cls._read(*args, **kwargs)
154 # TODO (devin-petersohn): Make this section more general for non-pandas kernel
155 # implementations.
156 if StorageFormat.get() == "Pandas":
File ~/software_sources/modin/modin/logging/logger_function.py:65, in logger_decorator.<locals>.decorator.<locals>.run_and_log(*args, **kwargs)
50 """
51 Compute function with logging if Modin logging is enabled.
52
(...)
62 Any
63 """
64 if LogMode.get() == "disable":
---> 65 return f(*args, **kwargs)
67 logger = get_logger()
68 try:
File ~/software_sources/modin/modin/core/io/text/text_file_dispatcher.py:1027, in TextFileDispatcher._read(cls, filepath_or_buffer, **kwargs)
1021 # In these cases we should pass additional metadata
1022 # to the workers to match pandas output
1023 pass_names = names in [None, lib.no_default] and (
1024 skiprows is not None or kwargs["skipfooter"] != 0
1025 )
-> 1027 pd_df_metadata = cls.read_callback(
1028 filepath_or_buffer,
1029 **dict(kwargs, nrows=1, skipfooter=0, index_col=index_col),
1030 )
1031 column_names = pd_df_metadata.columns
1032 column_widths, num_splits = cls._define_metadata(pd_df_metadata, column_names)
File ~/software_sources/modin/modin/core/io/text/csv_dispatcher.py:40, in CSVDispatcher.read_callback(*args, **kwargs)
24 def read_callback(*args, **kwargs):
25 """
26 Parse data on each partition.
27
(...)
38 Function call result.
39 """
---> 40 return pandas.read_csv(*args, **kwargs)
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/pandas/util/_decorators.py:311, in deprecate_nonkeyword_arguments.<locals>.decorate.<locals>.wrapper(*args, **kwargs)
305 if len(args) > num_allow_args:
306 warnings.warn(
307 msg.format(arguments=arguments),
308 FutureWarning,
309 stacklevel=stacklevel,
310 )
--> 311 return func(*args, **kwargs)
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/pandas/io/parsers/readers.py:680, in read_csv(filepath_or_buffer, sep, delimiter, header, names, index_col, usecols, squeeze, prefix, mangle_dupe_cols, dtype, engine, converters, true_values, false_values, skipinitialspace, skiprows, skipfooter, nrows, na_values, keep_default_na, na_filter, verbose, skip_blank_lines, parse_dates, infer_datetime_format, keep_date_col, date_parser, dayfirst, cache_dates, iterator, chunksize, compression, thousands, decimal, lineterminator, quotechar, quoting, doublequote, escapechar, comment, encoding, encoding_errors, dialect, error_bad_lines, warn_bad_lines, on_bad_lines, delim_whitespace, low_memory, memory_map, float_precision, storage_options)
665 kwds_defaults = _refine_defaults_read(
666 dialect,
667 delimiter,
(...)
676 defaults={"delimiter": ","},
677 )
678 kwds.update(kwds_defaults)
--> 680 return _read(filepath_or_buffer, kwds)
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/pandas/io/parsers/readers.py:575, in _read(filepath_or_buffer, kwds)
572 _validate_names(kwds.get("names", None))
574 # Create the parser.
--> 575 parser = TextFileReader(filepath_or_buffer, **kwds)
577 if chunksize or iterator:
578 return parser
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/pandas/io/parsers/readers.py:933, in TextFileReader.__init__(self, f, engine, **kwds)
930 self.options["has_index_names"] = kwds["has_index_names"]
932 self.handles: IOHandles | None = None
--> 933 self._engine = self._make_engine(f, self.engine)
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/pandas/io/parsers/readers.py:1217, in TextFileReader._make_engine(self, f, engine)
1213 mode = "rb"
1214 # error: No overload variant of "get_handle" matches argument types
1215 # "Union[str, PathLike[str], ReadCsvBuffer[bytes], ReadCsvBuffer[str]]"
1216 # , "str", "bool", "Any", "Any", "Any", "Any", "Any"
-> 1217 self.handles = get_handle( # type: ignore[call-overload]
1218 f,
1219 mode,
1220 encoding=self.options.get("encoding", None),
1221 compression=self.options.get("compression", None),
1222 memory_map=self.options.get("memory_map", False),
1223 is_text=is_text,
1224 errors=self.options.get("encoding_errors", "strict"),
1225 storage_options=self.options.get("storage_options", None),
1226 )
1227 assert self.handles is not None
1228 f = self.handles.handle
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/pandas/io/common.py:670, in get_handle(path_or_buf, mode, encoding, compression, memory_map, is_text, errors, storage_options)
667 codecs.lookup_error(errors)
669 # open URLs
--> 670 ioargs = _get_filepath_or_buffer(
671 path_or_buf,
672 encoding=encoding,
673 compression=compression,
674 mode=mode,
675 storage_options=storage_options,
676 )
678 handle = ioargs.filepath_or_buffer
679 handles: list[BaseBuffer]
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/pandas/io/common.py:385, in _get_filepath_or_buffer(filepath_or_buffer, encoding, compression, mode, storage_options)
382 pass
384 try:
--> 385 file_obj = fsspec.open(
386 filepath_or_buffer, mode=fsspec_mode, **(storage_options or {})
387 ).open()
388 # GH 34626 Reads from Public Buckets without Credentials needs anon=True
389 except tuple(err_types_to_retry_with_anon):
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/fsspec/core.py:141, in OpenFile.open(self)
133 def open(self):
134 """Materialise this as a real open file without context
135
136 The file should be explicitly closed to avoid enclosed file
(...)
139 been deleted; but a with-context is better style.
140 """
--> 141 out = self.__enter__()
142 closer = out.close
143 fobjects = self.fobjects.copy()[:-1]
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/fsspec/core.py:104, in OpenFile.__enter__(self)
101 def __enter__(self):
102 mode = self.mode.replace("t", "").replace("b", "") + "b"
--> 104 f = self.fs.open(self.path, mode=mode)
106 self.fobjects = [f]
108 if self.compression is not None:
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/fsspec/spec.py:1037, in AbstractFileSystem.open(self, path, mode, block_size, cache_options, compression, **kwargs)
1035 else:
1036 ac = kwargs.pop("autocommit", not self._intrans)
-> 1037 f = self._open(
1038 path,
1039 mode=mode,
1040 block_size=block_size,
1041 autocommit=ac,
1042 cache_options=cache_options,
1043 **kwargs,
1044 )
1045 if compression is not None:
1046 from fsspec.compression import compr
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/s3fs/core.py:605, in S3FileSystem._open(self, path, mode, block_size, acl, version_id, fill_cache, cache_type, autocommit, requester_pays, cache_options, **kwargs)
602 if cache_type is None:
603 cache_type = self.default_cache_type
--> 605 return S3File(
606 self,
607 path,
608 mode,
609 block_size=block_size,
610 acl=acl,
611 version_id=version_id,
612 fill_cache=fill_cache,
613 s3_additional_kwargs=kw,
614 cache_type=cache_type,
615 autocommit=autocommit,
616 requester_pays=requester_pays,
617 cache_options=cache_options,
618 )
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/s3fs/core.py:1911, in S3File.__init__(self, s3, path, mode, block_size, acl, version_id, fill_cache, s3_additional_kwargs, autocommit, cache_type, requester_pays, cache_options)
1909 self.details = s3.info(path)
1910 self.version_id = self.details.get("VersionId")
-> 1911 super().__init__(
1912 s3,
1913 path,
1914 mode,
1915 block_size,
1916 autocommit=autocommit,
1917 cache_type=cache_type,
1918 cache_options=cache_options,
1919 )
1920 self.s3 = self.fs # compatibility
1922 # when not using autocommit we want to have transactional state to manage
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/fsspec/spec.py:1385, in AbstractBufferedFile.__init__(self, fs, path, mode, block_size, autocommit, cache_type, cache_options, size, **kwargs)
1383 self.size = size
1384 else:
-> 1385 self.size = self.details["size"]
1386 self.cache = caches[cache_type](
1387 self.blocksize, self._fetch_range, self.size, **cache_options
1388 )
1389 else:
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/fsspec/spec.py:1398, in AbstractBufferedFile.details(self)
1395 @property
1396 def details(self):
1397 if self._details is None:
-> 1398 self._details = self.fs.info(self.path)
1399 return self._details
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/fsspec/asyn.py:86, in sync_wrapper.<locals>.wrapper(*args, **kwargs)
83 @functools.wraps(func)
84 def wrapper(*args, **kwargs):
85 self = obj or args[0]
---> 86 return sync(self.loop, func, *args, **kwargs)
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/fsspec/asyn.py:66, in sync(loop, func, timeout, *args, **kwargs)
64 raise FSTimeoutError from return_result
65 elif isinstance(return_result, BaseException):
---> 66 raise return_result
67 else:
68 return return_result
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/fsspec/asyn.py:26, in _runner(event, coro, result, timeout)
24 coro = asyncio.wait_for(coro, timeout=timeout)
25 try:
---> 26 result[0] = await coro
27 except Exception as ex:
28 result[0] = ex
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/s3fs/core.py:1140, in S3FileSystem._info(self, path, bucket, key, refresh, version_id)
1138 if key:
1139 try:
-> 1140 out = await self._call_s3(
1141 "head_object",
1142 self.kwargs,
1143 Bucket=bucket,
1144 Key=key,
1145 **version_id_kw(version_id),
1146 **self.req_kw,
1147 )
1148 return {
1149 "ETag": out.get("ETag", ""),
1150 "LastModified": out["LastModified"],
(...)
1156 "ContentType": out.get("ContentType"),
1157 }
1158 except FileNotFoundError:
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/s3fs/core.py:325, in S3FileSystem._call_s3(self, method, *akwarglist, **kwargs)
324 async def _call_s3(self, method, *akwarglist, **kwargs):
--> 325 await self.set_session()
326 s3 = await self.get_s3(kwargs.get("Bucket"))
327 method = getattr(s3, method)
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/s3fs/core.py:473, in S3FileSystem.set_session(self, refresh, kwargs)
469 else:
470 s3creator = self.session.create_client(
471 "s3", config=conf, **init_kwargs, **client_kwargs
472 )
--> 473 self._s3 = await s3creator.__aenter__()
475 self._s3creator = s3creator
476 # the following actually closes the aiohttp connection; use of privates
477 # might break in the future, would cause exception at gc time
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiobotocore/session.py:22, in ClientCreatorContext.__aenter__(self)
21 async def __aenter__(self) -> AioBaseClient:
---> 22 self._client = await self._coro
23 return await self._client.__aenter__()
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiobotocore/session.py:102, in AioSession._create_client(self, service_name, region_name, api_version, use_ssl, verify, endpoint_url, aws_access_key_id, aws_secret_access_key, aws_session_token, config)
97 raise PartialCredentialsError(
98 provider='explicit',
99 cred_var=self._missing_cred_vars(aws_access_key_id,
100 aws_secret_access_key))
101 else:
--> 102 credentials = await self.get_credentials()
103 endpoint_resolver = self._get_internal_component('endpoint_resolver')
104 exceptions_factory = self._get_internal_component('exceptions_factory')
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiobotocore/session.py:133, in AioSession.get_credentials(self)
131 async def get_credentials(self):
132 if self._credentials is None:
--> 133 self._credentials = await (self._components.get_component(
134 'credential_provider').load_credentials())
135 return self._credentials
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiobotocore/credentials.py:814, in AioCredentialResolver.load_credentials(self)
812 for provider in self.providers:
813 logger.debug("Looking for credentials via: %s", provider.METHOD)
--> 814 creds = await provider.load()
815 if creds is not None:
816 return creds
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiobotocore/credentials.py:486, in AioInstanceMetadataProvider.load(self)
484 async def load(self):
485 fetcher = self._role_fetcher
--> 486 metadata = await fetcher.retrieve_iam_role_credentials()
487 if not metadata:
488 return None
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiobotocore/utils.py:175, in AioInstanceMetadataFetcher.retrieve_iam_role_credentials(self)
173 async def retrieve_iam_role_credentials(self):
174 try:
--> 175 token = await self._fetch_metadata_token()
176 role_name = await self._get_iam_role(token)
177 credentials = await self._get_credentials(role_name, token)
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiobotocore/utils.py:88, in AioIMDSFetcher._fetch_metadata_token(self)
86 for i in range(self._num_attempts):
87 try:
---> 88 response = await session.send(request.prepare())
89 if response.status_code == 200:
90 return await response.text
File ~/opt/anaconda3/envs/modin-latest/lib/python3.9/site-packages/aiobotocore/httpsession.py:210, in AIOHTTPSession.send(self, request)
208 except ServerTimeoutError as e:
209 if str(e).lower().startswith('connect'):
--> 210 raise ConnectTimeoutError(endpoint_url=request.url, error=e)
211 else:
212 raise ReadTimeoutError(endpoint_url=request.url, error=e)
ConnectTimeoutError: Connect timeout on endpoint URL: "http://169.254.169.254/latest/api/token"
@alejandro-ponder never mind, that seems to be some kind of network error on my machine. Even a plain pandas read, `import pandas as pd; pd.read_csv("s3://nyc-tlc/trip data/yellow_tripdata_2009-01.csv", nrows=10)`, works on another machine but not on mine. Could you please try the s3:// path instead of the HTTPS one?
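The trace above fails inside botocore's credential lookup (the request to the instance-metadata endpoint 169.254.169.254), and a pandas comment inside the trace notes that reads from public buckets without credentials need anon=True. Below is a minimal sketch, assuming pandas 1.2 or later (where read_csv gained storage_options) with s3fs installed, that passes anon=True explicitly so s3fs makes unsigned requests instead of searching for credentials. Whether this avoids the timeout on an affected machine is untested, and the helper names are hypothetical.

```python
from typing import Any, Dict, Optional


def public_s3_read_kwargs(
    compression: Optional[str] = "gzip",
    sep: str = "\t",
) -> Dict[str, Any]:
    """Build read_csv keyword arguments for an anonymous read of a public bucket.

    Hypothetical helper. pandas forwards storage_options to fsspec/s3fs;
    anon=True asks s3fs to send unsigned requests rather than look up
    credentials.
    """
    return {
        "compression": compression,
        "header": 0,
        "sep": sep,
        "storage_options": {"anon": True},
    }


def read_public_s3_tsv(path: str, nrows: Optional[int] = None):
    """Hypothetical wrapper: read a public, gzip-compressed TSV from S3."""
    # Imported lazily so the kwargs helper works even without pandas installed.
    import pandas as pd

    return pd.read_csv(path, nrows=nrows, **public_s3_read_kwargs())
```

For example, `read_public_s3_tsv("s3://amazon-reviews-pds/tsv/amazon_reviews_us_Wireless_v1_00.tsv.gz", nrows=10)` would fetch only the first rows. Since Modin mirrors the pandas signature, the same keyword arguments should presumably also work with `modin.pandas.read_csv`, but that is an assumption.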
I tried changing the path to s3://. With Modin the read takes about 7 min 13 s, while pandas takes about 3 min 35 s for the same workload.
Note: I'm also passing error_bad_lines=False. Could that be affecting anything?
@alejandro-ponder is this in a fresh environment? Your machine may not have enough memory to run both Modin and pandas in the same notebook/interpreter environment.
It should be; I restarted the kernel between the two runs.
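To compare timings like the ones above in a repeatable way, a small harness can time each reader separately. This is a minimal sketch, assuming a synthetic gzip-compressed TSV rather than the Amazon reviews file, with hypothetical helper names and a stdlib row counter standing in for a real reader. Note that error_bad_lines was deprecated in pandas 1.3 in favour of on_bad_lines="skip" and removed in pandas 2.0, so the kwargs below use the newer spelling.

```python
import gzip
import os
import tempfile
import time
from typing import Any, Callable, Tuple

# Keyword arguments from the original reproducer; on_bad_lines="skip" is the
# pandas >= 1.3 replacement for error_bad_lines=False.
READ_KWARGS = {"compression": "gzip", "header": 0, "sep": "\t", "on_bad_lines": "skip"}


def write_sample_tsv_gz(path: str, n_rows: int) -> None:
    """Write a synthetic gzip-compressed TSV with one header row."""
    with gzip.open(path, "wt", encoding="utf-8", newline="") as f:
        f.write("review_id\tstar_rating\n")
        for i in range(n_rows):
            f.write(f"R{i}\t{i % 5 + 1}\n")


def time_call(fn: Callable[..., Any], *args: Any, **kwargs: Any) -> Tuple[Any, float]:
    """Run fn once and return (result, elapsed wall-clock seconds)."""
    start = time.perf_counter()
    result = fn(*args, **kwargs)
    return result, time.perf_counter() - start


def count_data_rows(path: str) -> int:
    """Stdlib stand-in reader: count the non-header lines of a gzip TSV."""
    with gzip.open(path, "rt", encoding="utf-8") as f:
        next(f)  # skip the header row
        return sum(1 for _ in f)


def benchmark_sample(n_rows: int) -> Tuple[int, float]:
    """Write a synthetic file to a temp dir, then time the stand-in reader on it."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "sample.tsv.gz")
        write_sample_tsv_gz(path, n_rows)
        return time_call(count_data_rows, path)
```

With pandas and Modin installed, `time_call(pd.read_csv, path, **READ_KWARGS)` and `time_call(modin.pandas.read_csv, path, **READ_KWARGS)` could replace the stand-in reader. Running each in its own fresh process keeps memory held by one library from skewing the other's timing.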
@prutskov this issue should be handled now, right?
Yes, Modin now handles HTTPS-like addresses, but I would prefer to run the reproducer before closing the issue.
@alejandro-ponder could you please re-check if this is still happening?