hail icon indicating copy to clipboard operation
hail copied to clipboard

[batch] file existence checks in batch test setup fail in a bizarre manner, apparently involving the event loop

Open danking opened this issue 7 months ago • 0 comments

What happened?

=================================== FAILURES ===================================
_______________ ServiceTests.test_python_job_incorrect_signature _______________

self = <test.hailtop.batch.test_batch.ServiceTests testMethod=test_python_job_incorrect_signature>

    def setUp(self):
        # https://stackoverflow.com/questions/42332030/pytest-monkeypatch-setattr-inside-of-test-class-method
        self.monkeypatch = MonkeyPatch()
    
        self.backend = ServiceBackend()
    
        remote_tmpdir = get_remote_tmpdir('hailtop_test_batch_service_tests')
        if not remote_tmpdir.endswith('/'):
            remote_tmpdir += '/'
        self.remote_tmpdir = remote_tmpdir + str(uuid.uuid4()) + '/'
    
        if remote_tmpdir.startswith('gs://'):
            match = re.fullmatch('gs://(?P<bucket_name>[^/]+).*', remote_tmpdir)
            assert match
            self.bucket = match.groupdict()['bucket_name']
        else:
            assert remote_tmpdir.startswith('hail-az://')
            if remote_tmpdir.startswith('hail-az://'):
                match = re.fullmatch('hail-az://(?P<storage_account>[^/]+)/(?P<container_name>[^/]+).*', remote_tmpdir)
                assert match
                storage_account, container_name = match.groups()
            else:
                assert remote_tmpdir.startswith('https://')
                match = re.fullmatch('https://(?P<storage_account>[^/]+).blob.core.windows.net/(?P<container_name>[^/]+).*', remote_tmpdir)
                assert match
                storage_account, container_name = match.groups()
            self.bucket = f'{storage_account}/{container_name}'
    
        self.cloud_input_dir = f'{self.remote_tmpdir}batch-tests/resources'
    
        token = uuid.uuid4()
        self.cloud_output_path = f'/batch-tests/{token}'
        self.cloud_output_dir = f'{self.remote_tmpdir}{self.cloud_output_path}'
    
        self.router_fs = RouterAsyncFS()
    
>       if not self.sync_exists(f'{self.remote_tmpdir}batch-tests/resources/hello.txt'):

../test/hailtop/batch/test_batch.py:533: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../test/hailtop/batch/test_batch.py:544: in sync_exists
    return async_to_blocking(self.router_fs.exists(url))
utils/utils.py:160: in async_to_blocking
    return loop.run_until_complete(task)
/usr/lib/python3.9/asyncio/base_events.py:634: in run_until_complete
    self.run_forever()
/usr/lib/python3.9/asyncio/base_events.py:601: in run_forever
    self._run_once()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_UnixSelectorEventLoop running=False closed=False debug=False>

    def _run_once(self):
        """Run one full iteration of the event loop.
    
        This calls all currently ready callbacks, polls for I/O,
        schedules the resulting callbacks, and finally schedules
        'call_later' callbacks.
        """
    
        sched_count = len(self._scheduled)
        if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
            self._timer_cancelled_count / sched_count >
                _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
            # Remove delayed calls that were cancelled if their number
            # is too high
            new_scheduled = []
            for handle in self._scheduled:
                if handle._cancelled:
                    handle._scheduled = False
                else:
                    new_scheduled.append(handle)
    
            heapq.heapify(new_scheduled)
            self._scheduled = new_scheduled
            self._timer_cancelled_count = 0
        else:
            # Remove delayed calls that were cancelled from head of queue.
            while self._scheduled and self._scheduled[0]._cancelled:
                self._timer_cancelled_count -= 1
                handle = heapq.heappop(self._scheduled)
                handle._scheduled = False
    
        timeout = None
        if self._ready or self._stopping:
            timeout = 0
        elif self._scheduled:
            # Compute the desired timeout.
            when = self._scheduled[0]._when
            timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)
    
        event_list = self._selector.select(timeout)
        self._process_events(event_list)
    
        # Handle 'later' callbacks that are ready.
        end_time = self.time() + self._clock_resolution
        while self._scheduled:
            handle = self._scheduled[0]
            if handle._when >= end_time:
                break
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False
            self._ready.append(handle)
    
        # This is the only place where callbacks are actually *called*.
        # All other places just add them to ready.
        # Note: We run all currently scheduled callbacks, but not any
        # callbacks scheduled by callbacks run this time around --
        # they will be run the next time (after another I/O poll).
        # Use an idiom that is thread-safe without using locks.
        ntodo = len(self._ready)
        for i in range(ntodo):
>           handle = self._ready.popleft()
E           IndexError: pop from an empty deque

/usr/lib/python3.9/asyncio/base_events.py:1890: IndexError

Version

0.2.126

Relevant log output

No response

danking avatar Nov 09 '23 17:11 danking