distributed icon indicating copy to clipboard operation
distributed copied to clipboard

async task classes are not fully implemented for Python 3.11

Open QuLogic opened this issue 1 year ago • 3 comments

What happened:

It appears that in Python 3.11, more parts of async task-like classes must be implemented, but they are not in distributed. This seems to affect at least Nanny, Worker, ProcessInterface, MultiWorker, and Future.

Nanny failure
_______________ test_client_constructor_with_temporary_security ________________

    @gen_test()
    async def test_client_constructor_with_temporary_security():
        xfail_ssl_issue5601()
        pytest.importorskip("cryptography")
>       async with Client(
            security=True, silence_logs=False, dashboard_address=":0", asynchronous=True
        ) as c:

../../BUILDROOT/python-distributed-2022.7.1-1.fc37.x86_64/usr/lib/python3.11/site-packages/distributed/deploy/tests/test_local.py:318: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../BUILDROOT/python-distributed-2022.7.1-1.fc37.x86_64/usr/lib/python3.11/site-packages/distributed/client.py:1347: in __aenter__
    await self
../../BUILDROOT/python-distributed-2022.7.1-1.fc37.x86_64/usr/lib/python3.11/site-packages/distributed/client.py:1164: in _start
    self.cluster = await LocalCluster(
../../BUILDROOT/python-distributed-2022.7.1-1.fc37.x86_64/usr/lib/python3.11/site-packages/distributed/deploy/spec.py:389: in _
    await self._correct_state()
../../BUILDROOT/python-distributed-2022.7.1-1.fc37.x86_64/usr/lib/python3.11/site-packages/distributed/deploy/spec.py:355: in _correct_state_internal
    await asyncio.wait(workers)
/usr/lib64/python3.11/asyncio/tasks.py:427: in wait
    return await _wait(fs, timeout, return_when, loop)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

fs = {<Nanny: None, threads: 1>, <Nanny: None, threads: 1>, <Nanny: None, threads: 1>, <Nanny: None, threads: 1>}
timeout = None, return_when = 'ALL_COMPLETED'
loop = <_UnixSelectorEventLoop running=False closed=True debug=False>

    async def _wait(fs, timeout, return_when, loop):
        """Internal helper for wait().
    
        The fs argument must be a collection of Futures.
        """
        assert fs, 'Set of Futures is empty.'
        waiter = loop.create_future()
        timeout_handle = None
        if timeout is not None:
            timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
        counter = len(fs)
    
        def _on_completion(f):
            nonlocal counter
            counter -= 1
            if (counter <= 0 or
                return_when == FIRST_COMPLETED or
                return_when == FIRST_EXCEPTION and (not f.cancelled() and
                                                    f.exception() is not None)):
                if timeout_handle is not None:
                    timeout_handle.cancel()
                if not waiter.done():
                    waiter.set_result(None)
    
        for f in fs:
>           f.add_done_callback(_on_completion)
E           AttributeError: 'Nanny' object has no attribute 'add_done_callback'

/usr/lib64/python3.11/asyncio/tasks.py:531: AttributeError
Worker failure
__________________________________ test_procs __________________________________

self = LocalCluster(a65e5443, 'inproc://192.168.2.10/3934787/288', workers=0, threads=0, memory=0 B)
workers = {0: {'cls': <class 'distributed.worker.Worker'>, 'options': {'dashboard': False, 'dashboard_address': None, 'host': No...ted.worker.Worker'>, 'options': {'dashboard': False, 'dashboard_address': None, 'host': None, 'interface': None, ...}}}
scheduler = {'cls': <class 'distributed.scheduler.Scheduler'>, 'options': {'blocked_handlers': None, 'dashboard': True, 'dashboard_address': ':0', 'host': None, ...}}
worker = {'cls': <class 'distributed.worker.Worker'>, 'options': {'dashboard': False, 'dashboard_address': None, 'host': None, 'interface': None, ...}}
asynchronous = False
loop = <tornado.platform.asyncio.AsyncIOMainLoop object at 0x7fb8ef5d2350>
security = Security(require_encryption=False, tls_min_version=771)
silence_logs = False, name = None, shutdown_on_close = True
scheduler_sync_interval = 1

    def __init__(
        self,
        workers=None,
        scheduler=None,
        worker=None,
        asynchronous=False,
        loop=None,
        security=None,
        silence_logs=False,
        name=None,
        shutdown_on_close=True,
        scheduler_sync_interval=1,
    ):
        self._created = weakref.WeakSet()
    
        self.scheduler_spec = copy.copy(scheduler)
        self.worker_spec = copy.copy(workers) or {}
        self.new_spec = copy.copy(worker)
        self.scheduler = None
        self.workers = {}
        self._i = 0
        self.security = security or Security()
        self._futures = set()
    
        if silence_logs:
            self._old_logging_level = silence_logging(level=silence_logs)
            self._old_bokeh_logging_level = silence_logging(
                level=silence_logs, root="bokeh"
            )
    
        self._instances.add(self)
        self._correct_state_waiting = None
        self._name = name or type(self).__name__
        self.shutdown_on_close = shutdown_on_close
    
        super().__init__(
            asynchronous=asynchronous,
            loop=loop,
            name=name,
            scheduler_sync_interval=scheduler_sync_interval,
        )
    
        if not self.asynchronous:
            self._loop_runner.start()
            self.sync(self._start)
            try:
>               self.sync(self._correct_state)

../../BUILDROOT/python-distributed-2022.7.1-1.fc37.x86_64/usr/lib/python3.11/site-packages/distributed/deploy/spec.py:266: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = LocalCluster(a65e5443, 'inproc://192.168.2.10/3934787/288', workers=0, threads=0, memory=0 B)
func = <bound method SpecCluster._correct_state of LocalCluster(a65e5443, 'inproc://192.168.2.10/3934787/288', workers=0, threads=0, memory=0 B)>
asynchronous = False, callback_timeout = None, args = (), kwargs = {}

    def sync(self, func, *args, asynchronous=None, callback_timeout=None, **kwargs):
        """Call `func` with `args` synchronously or asynchronously depending on
        the calling context"""
        callback_timeout = _parse_timedelta(callback_timeout)
        if asynchronous is None:
            asynchronous = self.asynchronous
        if asynchronous:
            future = func(*args, **kwargs)
            if callback_timeout is not None:
                future = asyncio.wait_for(future, callback_timeout)
            return future
        else:
>           return sync(
                self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
            )

../../BUILDROOT/python-distributed-2022.7.1-1.fc37.x86_64/usr/lib/python3.11/site-packages/distributed/utils.py:338: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

loop = <tornado.platform.asyncio.AsyncIOMainLoop object at 0x7fb8ef5d2350>
func = <bound method SpecCluster._correct_state of LocalCluster(a65e5443, 'inproc://192.168.2.10/3934787/288', workers=0, threads=0, memory=0 B)>
callback_timeout = None, args = (), kwargs = {}
f = <function sync.<locals>.f at 0x7fb8f0953ec0>
wait = <function sync.<locals>.wait at 0x7fb8f09520c0>
typ = <class 'AttributeError'>
exc = AttributeError("'Worker' object has no attribute 'add_done_callback'")
tb = <traceback object at 0x7fb8fa548700>

    def sync(loop, func, *args, callback_timeout=None, **kwargs):
        """
        Run coroutine in loop running in separate thread.
        """
        callback_timeout = _parse_timedelta(callback_timeout, "s")
        if loop.asyncio_loop.is_closed():
            raise RuntimeError("IOLoop is closed")
    
        e = threading.Event()
        main_tid = threading.get_ident()
        result = error = future = None  # set up non-locals
    
        @gen.coroutine
        def f():
            nonlocal result, error, future
            try:
                if main_tid == threading.get_ident():
                    raise RuntimeError("sync() called from thread of running loop")
                yield gen.moment
                future = func(*args, **kwargs)
                if callback_timeout is not None:
                    future = asyncio.wait_for(future, callback_timeout)
                future = asyncio.ensure_future(future)
                result = yield future
            except Exception:
                error = sys.exc_info()
            finally:
                e.set()
    
        def cancel():
            if future is not None:
                future.cancel()
    
        def wait(timeout):
            try:
                return e.wait(timeout)
            except KeyboardInterrupt:
                loop.add_callback(cancel)
                raise
    
        loop.add_callback(f)
        if callback_timeout is not None:
            if not wait(callback_timeout):
                raise TimeoutError(f"timed out after {callback_timeout} s.")
        else:
            while not e.is_set():
                wait(10)
    
        if error:
            typ, exc, tb = error
>           raise exc.with_traceback(tb)

../../BUILDROOT/python-distributed-2022.7.1-1.fc37.x86_64/usr/lib/python3.11/site-packages/distributed/utils.py:405: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

    @gen.coroutine
    def f():
        nonlocal result, error, future
        try:
            if main_tid == threading.get_ident():
                raise RuntimeError("sync() called from thread of running loop")
            yield gen.moment
            future = func(*args, **kwargs)
            if callback_timeout is not None:
                future = asyncio.wait_for(future, callback_timeout)
            future = asyncio.ensure_future(future)
>           result = yield future

../../BUILDROOT/python-distributed-2022.7.1-1.fc37.x86_64/usr/lib/python3.11/site-packages/distributed/utils.py:378: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tornado.gen.Runner object at 0x7fb8fa549b10>

    def run(self) -> None:
        """Starts or resumes the generator, running until it reaches a
        yield point that is not ready.
        """
        if self.running or self.finished:
            return
        try:
            self.running = True
            while True:
                future = self.future
                if future is None:
                    raise Exception("No pending future")
                if not future.done():
                    return
                self.future = None
                try:
                    exc_info = None
    
                    try:
>                       value = future.result()

/usr/lib64/python3.11/site-packages/tornado/gen.py:769: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = LocalCluster(a65e5443, 'inproc://192.168.2.10/3934787/288', workers=0, threads=0, memory=0 B)

    async def _correct_state_internal(self):
        async with self._lock:
            self._correct_state_waiting = None
    
            to_close = set(self.workers) - set(self.worker_spec)
            if to_close:
                if self.scheduler.status == Status.running:
                    await self.scheduler_comm.retire_workers(workers=list(to_close))
                tasks = [
                    asyncio.create_task(self.workers[w].close())
                    for w in to_close
                    if w in self.workers
                ]
                await asyncio.gather(*tasks)
            for name in to_close:
                if name in self.workers:
                    del self.workers[name]
    
            to_open = set(self.worker_spec) - set(self.workers)
            workers = []
            for name in to_open:
                d = self.worker_spec[name]
                cls, opts = d["cls"], d.get("options", {})
                if "name" not in opts:
                    opts = opts.copy()
                    opts["name"] = name
                if isinstance(cls, str):
                    cls = import_term(cls)
                worker = cls(
                    getattr(self.scheduler, "contact_address", None)
                    or self.scheduler.address,
                    **opts,
                )
                self._created.add(worker)
                workers.append(worker)
            if workers:
>               await asyncio.wait(workers)

../../BUILDROOT/python-distributed-2022.7.1-1.fc37.x86_64/usr/lib/python3.11/site-packages/distributed/deploy/spec.py:355: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

fs = {<Worker 'not-running', name: 1, status: init, stored: 0, running: 0/3, ready: 0, comm: 0, waiting: 0>, <Worker 'not-running', name: 0, status: init, stored: 0, running: 0/3, ready: 0, comm: 0, waiting: 0>}

    async def wait(fs, *, timeout=None, return_when=ALL_COMPLETED):
        """Wait for the Futures or Tasks given by fs to complete.
    
        The fs iterable must not be empty.
    
        Coroutines will be wrapped in Tasks.
    
        Returns two sets of Future: (done, pending).
    
        Usage:
    
            done, pending = await asyncio.wait(fs)
    
        Note: This does not raise TimeoutError! Futures that aren't done
        when the timeout occurs are returned in the second set.
        """
        if futures.isfuture(fs) or coroutines.iscoroutine(fs):
            raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
        if not fs:
            raise ValueError('Set of Tasks/Futures is empty.')
        if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
            raise ValueError(f'Invalid return_when value: {return_when}')
    
        fs = set(fs)
    
        if any(coroutines.iscoroutine(f) for f in fs):
            raise TypeError("Passing coroutines is forbidden, use tasks explicitly.")
    
        loop = events.get_running_loop()
>       return await _wait(fs, timeout, return_when, loop)

/usr/lib64/python3.11/asyncio/tasks.py:427: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

fs = {<Worker 'not-running', name: 1, status: init, stored: 0, running: 0/3, ready: 0, comm: 0, waiting: 0>, <Worker 'not-running', name: 0, status: init, stored: 0, running: 0/3, ready: 0, comm: 0, waiting: 0>}
timeout = None, return_when = 'ALL_COMPLETED'
loop = <_UnixSelectorEventLoop running=True closed=False debug=False>

    async def _wait(fs, timeout, return_when, loop):
        """Internal helper for wait().
    
        The fs argument must be a collection of Futures.
        """
        assert fs, 'Set of Futures is empty.'
        waiter = loop.create_future()
        timeout_handle = None
        if timeout is not None:
            timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
        counter = len(fs)
    
        def _on_completion(f):
            nonlocal counter
            counter -= 1
            if (counter <= 0 or
                return_when == FIRST_COMPLETED or
                return_when == FIRST_EXCEPTION and (not f.cancelled() and
                                                    f.exception() is not None)):
                if timeout_handle is not None:
                    timeout_handle.cancel()
                if not waiter.done():
                    waiter.set_result(None)
    
        for f in fs:
>           f.add_done_callback(_on_completion)
E           AttributeError: 'Worker' object has no attribute 'add_done_callback'

/usr/lib64/python3.11/asyncio/tasks.py:531: AttributeError
ProcessInterface failure
__________________________ test_ProcessInterfaceValid __________________________

self = <tornado.platform.asyncio.AsyncIOMainLoop object at 0x7fb8f036b650>
callback = functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOMainLoop object at 0...ibuted/deploy/spec.py:319> exception=AttributeError("'ProcessInterface' object has no attribute 'add_done_callback'")>)

    def _run_callback(self, callback: Callable[[], Any]) -> None:
        """Runs a callback with error handling.
    
        .. versionchanged:: 6.0
    
           CancelledErrors are no longer logged.
        """
        try:
>           ret = callback()

/usr/lib64/python3.11/site-packages/tornado/ioloop.py:740: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tornado.platform.asyncio.AsyncIOMainLoop object at 0x7fb8f036b650>
future = <Task finished name='Task-143678' coro=<SpecCluster._correct_state_internal() done, defined at /builddir/build/BUILDRO...ributed/deploy/spec.py:319> exception=AttributeError("'ProcessInterface' object has no attribute 'add_done_callback'")>

    def _discard_future_result(self, future: Future) -> None:
        """Avoid unhandled-exception warnings from spawned coroutines."""
>       future.result()
E       AttributeError: 'ProcessInterface' object has no attribute 'add_done_callback'

/usr/lib64/python3.11/site-packages/tornado/ioloop.py:764: AttributeError
MultiWorker failure
_______________________________ test_MultiWorker _______________________________

self = <tornado.platform.asyncio.AsyncIOMainLoop object at 0x7fb8ef7ead50>
callback = functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOMainLoop object at 0...distributed/deploy/spec.py:319> exception=AttributeError("'MultiWorker' object has no attribute 'add_done_callback'")>)

    def _run_callback(self, callback: Callable[[], Any]) -> None:
        """Runs a callback with error handling.
    
        .. versionchanged:: 6.0
    
           CancelledErrors are no longer logged.
        """
        try:
>           ret = callback()

/usr/lib64/python3.11/site-packages/tornado/ioloop.py:740: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <tornado.platform.asyncio.AsyncIOMainLoop object at 0x7fb8ef7ead50>
future = <Task finished name='Task-143730' coro=<SpecCluster._correct_state_internal() done, defined at /builddir/build/BUILDRO.../distributed/deploy/spec.py:319> exception=AttributeError("'MultiWorker' object has no attribute 'add_done_callback'")>

    def _discard_future_result(self, future: Future) -> None:
        """Avoid unhandled-exception warnings from spawned coroutines."""
>       future.result()
E       AttributeError: 'MultiWorker' object has no attribute 'add_done_callback'
Future failure
___________________________ test_task_unique_groups ____________________________

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:45313', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:34269', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:37035', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>

    @gen_cluster(client=True)
    async def test_task_unique_groups(c, s, a, b):
        """This test ensure that task groups remain unique when using submit"""
        x = c.submit(sum, [1, 2])
        y = c.submit(len, [1, 2])
        z = c.submit(sum, [3, 4])
>       await asyncio.wait([x, y, z])

../../BUILDROOT/python-distributed-2022.7.1-1.fc37.x86_64/usr/lib/python3.11/site-packages/distributed/tests/test_scheduler.py:2206: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
/usr/lib64/python3.11/asyncio/tasks.py:427: in wait
    return await _wait(fs, timeout, return_when, loop)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

fs = {<Future: cancelled, type: int, key: len-a6d0feba889a132085fa70f1b616feeb>, <Future: cancelled, type: int, key: sum-1d10c8fc1de9b59dc44708aaf25351e0>, <Future: cancelled, type: int, key: sum-fcf9017adfd73674bb128a7ddc1ad246>}
timeout = None, return_when = 'ALL_COMPLETED'
loop = <_UnixSelectorEventLoop running=False closed=True debug=False>

    async def _wait(fs, timeout, return_when, loop):
        """Internal helper for wait().
    
        The fs argument must be a collection of Futures.
        """
        assert fs, 'Set of Futures is empty.'
        waiter = loop.create_future()
        timeout_handle = None
        if timeout is not None:
            timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
        counter = len(fs)
    
        def _on_completion(f):
            nonlocal counter
            counter -= 1
            if (counter <= 0 or
                return_when == FIRST_COMPLETED or
                return_when == FIRST_EXCEPTION and (not f.cancelled() and
                                                    f.exception() is not None)):
                if timeout_handle is not None:
                    timeout_handle.cancel()
                if not waiter.done():
                    waiter.set_result(None)
    
        for f in fs:
            f.add_done_callback(_on_completion)
    
        try:
            await waiter
        finally:
            if timeout_handle is not None:
                timeout_handle.cancel()
            for f in fs:
>               f.remove_done_callback(_on_completion)
E               AttributeError: 'Future' object has no attribute 'remove_done_callback'

/usr/lib64/python3.11/asyncio/tasks.py:539: AttributeError

What you expected to happen: Tests all pass.

Environment:

  • Dask version: 2022.7.1
  • Python version: 3.11.0.b4
  • Operating System: Fedora Rawhide
  • Install method (conda, pip, source): source

QuLogic avatar Jul 26 '22 05:07 QuLogic

https://github.com/python/cpython/commit/903f0a02c16240dc769a08c30e8d072a4fb09154

Looks like Coroutines were deprecated but not Awaitables

graingert avatar Aug 03 '22 16:08 graingert

on python3.10 -W error this passes:

class ExampleAwaitable:
    def __await__(self):
        async def _():
            return await asyncio.sleep(0)

        return _().__await__()


import asyncio

print(f"{asyncio.iscoroutine(ExampleAwaitable())=}")

async def amain():
    await asyncio.wait([ExampleAwaitable()])
    print("waited!")

asyncio.run(amain())

and in 3.11 this fails:

asyncio.iscoroutine(ExampleAwaitable())=False
Traceback (most recent call last):
  File "/home/graingert/projects/foo.py", line 17, in <module>
    asyncio.run(amain())
  File "/usr/lib/python3.11/asyncio/runners.py", line 187, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/asyncio/runners.py", line 120, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/asyncio/base_events.py", line 650, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/home/graingert/projects/foo.py", line 14, in amain
    await asyncio.wait([ExampleAwaitable()])
  File "/usr/lib/python3.11/asyncio/tasks.py", line 427, in wait
    return await _wait(fs, timeout, return_when, loop)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/asyncio/tasks.py", line 531, in _wait
    f.add_done_callback(_on_completion)
    ^^^^^^^^^^^^^^^^^^^
AttributeError: 'ExampleAwaitable' object has no attribute 'add_done_callback'

graingert avatar Aug 03 '22 16:08 graingert

I think we want to make most of these objects not Awaitables anyway eg. Nanny, Worker, ProcessInterface, MultiWorker where they can't really work correctly without async with

Future is tricky as we kind of want to support asyncio.wait([f: Future]) but we really don't want it to end up in an asyncio task._fut_waiter without being wrapped in an asyncio.Task

Maybe we could support await f.wait_for(delay) and await distrubuted.wait([f]) only instead?

graingert avatar Aug 04 '22 13:08 graingert

Closing as we've added Python 3.11 support in https://github.com/dask/distributed/pull/7249. @graingert @QuLogic let me know if this is still an issue and we can re-open as needed

jrbourbeau avatar Dec 21 '22 16:12 jrbourbeau