hail
hail copied to clipboard
[batch] file existence checks in batch test setup fail in a bizarre manner, apparently involving the event loop
What happened?
=================================== FAILURES ===================================
_______________ ServiceTests.test_python_job_incorrect_signature _______________
self = <test.hailtop.batch.test_batch.ServiceTests testMethod=test_python_job_incorrect_signature>
def setUp(self):
# https://stackoverflow.com/questions/42332030/pytest-monkeypatch-setattr-inside-of-test-class-method
self.monkeypatch = MonkeyPatch()
self.backend = ServiceBackend()
remote_tmpdir = get_remote_tmpdir('hailtop_test_batch_service_tests')
if not remote_tmpdir.endswith('/'):
remote_tmpdir += '/'
self.remote_tmpdir = remote_tmpdir + str(uuid.uuid4()) + '/'
if remote_tmpdir.startswith('gs://'):
match = re.fullmatch('gs://(?P<bucket_name>[^/]+).*', remote_tmpdir)
assert match
self.bucket = match.groupdict()['bucket_name']
else:
assert remote_tmpdir.startswith('hail-az://')
if remote_tmpdir.startswith('hail-az://'):
match = re.fullmatch('hail-az://(?P<storage_account>[^/]+)/(?P<container_name>[^/]+).*', remote_tmpdir)
assert match
storage_account, container_name = match.groups()
else:
assert remote_tmpdir.startswith('https://')
match = re.fullmatch('https://(?P<storage_account>[^/]+).blob.core.windows.net/(?P<container_name>[^/]+).*', remote_tmpdir)
assert match
storage_account, container_name = match.groups()
self.bucket = f'{storage_account}/{container_name}'
self.cloud_input_dir = f'{self.remote_tmpdir}batch-tests/resources'
token = uuid.uuid4()
self.cloud_output_path = f'/batch-tests/{token}'
self.cloud_output_dir = f'{self.remote_tmpdir}{self.cloud_output_path}'
self.router_fs = RouterAsyncFS()
> if not self.sync_exists(f'{self.remote_tmpdir}batch-tests/resources/hello.txt'):
../test/hailtop/batch/test_batch.py:533:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../test/hailtop/batch/test_batch.py:544: in sync_exists
return async_to_blocking(self.router_fs.exists(url))
utils/utils.py:160: in async_to_blocking
return loop.run_until_complete(task)
/usr/lib/python3.9/asyncio/base_events.py:634: in run_until_complete
self.run_forever()
/usr/lib/python3.9/asyncio/base_events.py:601: in run_forever
self._run_once()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <_UnixSelectorEventLoop running=False closed=False debug=False>
def _run_once(self):
"""Run one full iteration of the event loop.
This calls all currently ready callbacks, polls for I/O,
schedules the resulting callbacks, and finally schedules
'call_later' callbacks.
"""
sched_count = len(self._scheduled)
if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
self._timer_cancelled_count / sched_count >
_MIN_CANCELLED_TIMER_HANDLES_FRACTION):
# Remove delayed calls that were cancelled if their number
# is too high
new_scheduled = []
for handle in self._scheduled:
if handle._cancelled:
handle._scheduled = False
else:
new_scheduled.append(handle)
heapq.heapify(new_scheduled)
self._scheduled = new_scheduled
self._timer_cancelled_count = 0
else:
# Remove delayed calls that were cancelled from head of queue.
while self._scheduled and self._scheduled[0]._cancelled:
self._timer_cancelled_count -= 1
handle = heapq.heappop(self._scheduled)
handle._scheduled = False
timeout = None
if self._ready or self._stopping:
timeout = 0
elif self._scheduled:
# Compute the desired timeout.
when = self._scheduled[0]._when
timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)
event_list = self._selector.select(timeout)
self._process_events(event_list)
# Handle 'later' callbacks that are ready.
end_time = self.time() + self._clock_resolution
while self._scheduled:
handle = self._scheduled[0]
if handle._when >= end_time:
break
handle = heapq.heappop(self._scheduled)
handle._scheduled = False
self._ready.append(handle)
# This is the only place where callbacks are actually *called*.
# All other places just add them to ready.
# Note: We run all currently scheduled callbacks, but not any
# callbacks scheduled by callbacks run this time around --
# they will be run the next time (after another I/O poll).
# Use an idiom that is thread-safe without using locks.
ntodo = len(self._ready)
for i in range(ntodo):
> handle = self._ready.popleft()
E IndexError: pop from an empty deque
/usr/lib/python3.9/asyncio/base_events.py:1890: IndexError
Version
0.2.126
Relevant log output
No response