rethinkdb-python

On Python 3.8, a 'between' query on a table fails with a cancelled task

Open dpineiden opened this issue 5 years ago • 6 comments

Describe the bug

I have a table with extra secondary indexes, DT_GEN and COUNT (a date and an int), both tested.

The between query is written as shown in the documentation.

See https://gitlab.com/pineiden/datadbs-rethinkdb/-/blob/master/data_rdb/rdb.py, line 316 onward:

                    table_manager = self.r.db(dbname).table(table_name)
                    task_coro = await table_manager.between(
                        lower, upper, **filter_opt).order_by(order_by).run(
                            self.session, **options)
                    result = await task_coro

The output is:

Task was destroyed but it is pending!
task: <Task pending name='Task-4' coro=<ConnectionInstance._reader() running at /home/david/.virtualenvs/datawork/lib/python3.8/site-packages/rethinkdb-2.4.7-py3.8.egg/rethinkdb/asyncio_net/net_asyncio.py:313> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f7d5a713df0>()]>>
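
As general background (not a diagnosis of this issue): asyncio logs "Task was destroyed but it is pending!" when a task that has not finished is garbage-collected, for example because nothing cancelled and awaited it before shutdown. The following is a minimal sketch of cancelling a background task cleanly. The `reader_loop` coroutine is a hypothetical stand-in for a connection's reader task, not the driver's actual `_reader()`.

```python
import asyncio


async def reader_loop():
    """Hypothetical stand-in for a connection's background reader task."""
    while True:
        await asyncio.sleep(0.01)


async def main():
    task = asyncio.create_task(reader_loop())
    await asyncio.sleep(0.03)  # the application does its work here

    # Cancel the background task and wait for it to finish
    # before the event loop is closed.
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return task.cancelled()


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Awaiting the task after `cancel()` gives it a chance to unwind, so no pending task is left behind when the loop closes.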

To Reproduce

1. Set up RethinkDB 2.4.1
2. Install Python 3.8
3. Create a virtualenv

To install the dependencies and then install from the repo:

pip install data-rdb
pip uninstall data-rdb

Now clone from: https://gitlab.com/pineiden/datadbs-rethinkdb

and run:

python setup.py develop

In the tests folder, run:

python load_data.py

and then:

python read_data.py

These scripts load dummy data into a table and read it back in real time.

Expected behavior

Get the data from the table

Screenshots https://gitlab.com/-/snippets/2005380

Additional context

Here is the core of the query (using only the rethinkdb module) on Python 3.6.9:

from pytz import timezone
from datetime import datetime, timedelta
import asyncio
from rethinkdb import r as rdb

host = 'localhost'
port = 28015
loop = asyncio.get_event_loop()
rdb.set_loop_type('asyncio')
conn = loop.run_until_complete(rdb.connect(db='test', host=host, port=port))
first = datetime.now() - timedelta(seconds=200)
tz = timezone("America/Santiago")
firsttz = tz.localize(first)
firstrdb = rdb.iso8601(firsttz.isoformat())
post = datetime.now()
posttz = tz.localize(post)
postrdb = rdb.iso8601(posttz.isoformat())
table = "STATION"
filter_opt = {'left_bound': 'open', 'index': "DT_GEN"}
print(f"Desde {firstrdb}, hasta {postrdb}")
query = rdb.db("test").table(table).between(firstrdb, postrdb,
                                            **filter_opt).run(conn)
result = loop.run_until_complete(query)
print(result)

dpineiden, Aug 18 '20 00:08

Now, working on debugging.

This works (using secondary index COUNTER), with py3.6 and py3.8:


from pytz import timezone
from datetime import datetime, timedelta
import asyncio
from rethinkdb import r as rdb
from tasktools.taskloop import coromask, renew, simple_fargs_out
from functools import partial

host = 'localhost'
port = 28015
loop = asyncio.get_event_loop()
rdb.set_loop_type('asyncio')
filter_opt = {'left_bound': 'open', 'index': "COUNTER"}
table = "STATION"


def date_rdb(td=0):
    first = datetime.utcnow() + timedelta(seconds=td)
    tz = timezone("UTC")
    firsttz = tz.localize(first)
    firstrdb = rdb.iso8601(firsttz.isoformat())
    return firstrdb


async def run(*args, **kwargs):
    connected = args[0]
    firstrdb = args[1]
    val = args[1]
    conn = args[2]
    if not connected:
        conn = await rdb.connect(db='test', host=host, port=port)
        connected = True
    if connected:
        await asyncio.sleep(5)
        val2 = val + 20
        postrdb = date_rdb()
        print(f"Desde {firstrdb}, hasta {postrdb}")
        query = rdb.db("test").table(table).between(val, val2,
                                                    **filter_opt).run(conn)
        result = await query
        print(result)
    return [connected, val2, conn], kwargs


connected = False
now = date_rdb()
conn = None
val = 0
#args = [connected, now, conn]
args = [connected, val, conn]
kwargs: dict = {}

task = loop.create_task(coromask(run, args, kwargs, simple_fargs_out))
task.add_done_callback(partial(renew, task, run, simple_fargs_out))

loop.run_forever()

dpineiden, Aug 18 '20 01:08

Without 'between', using 'filter' instead. The datetime query works fine on Python 3.8:

"""
Index: COUNTER
Python 3.6
Python 3.8
"""

from pytz import timezone
from datetime import datetime, timedelta
import asyncio
from rethinkdb import r as rdb
from tasktools.taskloop import coromask, renew, simple_fargs_out
from functools import partial

host = 'localhost'
port = 28015
loop = asyncio.get_event_loop()
rdb.set_loop_type('asyncio')
filter_opt = {'left_bound': 'open', 'index': "COUNTER"}
table = "STATION"


def date_rdb(td=0):
    first = datetime.utcnow() + timedelta(seconds=td)
    tz = timezone("UTC")
    firsttz = tz.localize(first)
    firstrdb = rdb.iso8601(firsttz.isoformat())
    return firstrdb


async def run(*args, **kwargs):
    connected = args[0]
    firstrdb = args[1]
    postrdb = firstrdb
    conn = args[2]
    if not connected:
        conn = await rdb.connect(db='test', host=host, port=port)
        connected = True
    if connected:
        await asyncio.sleep(5)
        val2 = val + 20
        print(f"Desde {firstrdb}, hasta {postrdb}")
        query = rdb.db("test").table(table).filter(
            lambda data: data["DT_GEN"] >= firstrdb).coerce_to(
                "array").order_by("DT_GEN").run(conn)
        result = await query
        if result:
            lista = []
            for q in result:
                print(q)
                lista.append(q)
            postrdb = lista[-1]["DT_GEN"]
            print("Las data", postrdb)
    return [connected, postrdb, conn], kwargs


connected = False
now = date_rdb()
conn = None
val = 0
args = [connected, now, conn]
kwargs: dict = {}

task = loop.create_task(coromask(run, args, kwargs, simple_fargs_out))
task.add_done_callback(partial(renew, task, run, simple_fargs_out))

loop.run_forever()

dpineiden, Aug 18 '20 02:08

Now, comparing the two, I think 'between' is better than 'filter' for real-time reads, because 'filter' reads the whole table every time.
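
A rough illustration of that difference, using plain Python rather than RethinkDB: a sorted list of keys stands in for the DT_GEN secondary index, and the `rows`, `filter_scan`, and `between_scan` names are hypothetical. The bound handling mirrors the `left_bound` / `right_bound` options of `between`.

```python
from bisect import bisect_left, bisect_right

# Hypothetical in-memory stand-in for a table with a secondary index on DT_GEN.
rows = [{"id": i, "DT_GEN": ts} for i, ts in enumerate([10, 20, 30, 40, 50])]
index_keys = [row["DT_GEN"] for row in rows]  # kept sorted, like an index


def filter_scan(lower):
    """Like filter(): every row is examined on every call."""
    return [row for row in rows if row["DT_GEN"] >= lower]


def between_scan(lower, upper, left_bound="closed", right_bound="open"):
    """Like between() on an index: binary search, then read only the slice."""
    if left_bound == "closed":
        lo = bisect_left(index_keys, lower)
    else:
        lo = bisect_right(index_keys, lower)
    if right_bound == "closed":
        hi = bisect_right(index_keys, upper)
    else:
        hi = bisect_left(index_keys, upper)
    return rows[lo:hi]


print(between_scan(20, 40))
print(filter_scan(20))
```

With a closed left bound and an open right bound, `between_scan(20, 40)` returns the rows keyed 20 and 30 without touching the rest of the list, while `filter_scan(20)` walks all five rows.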

I unpacked the dictionary of options into explicit keyword arguments, and it works fine.

"""
Index: COUNTER
Python 3.6
Python 3.8
"""

from pytz import timezone
from datetime import datetime, timedelta
import asyncio
from rethinkdb import r as rdb
from tasktools.taskloop import TaskLoop

host = 'localhost'
port = 28015
loop = asyncio.get_event_loop()
rdb.set_loop_type('asyncio')
filter_opt = {'left_bound': 'open', 'index': "COUNTER"}
table = "STATION"


def date_rdb(td=0):
    first = datetime.utcnow() + timedelta(seconds=td)
    tz = timezone("UTC")
    firsttz = tz.localize(first)
    firstrdb = rdb.iso8601(firsttz.isoformat())
    return firstrdb


async def run(*args, **kwargs):
    connected = args[0]
    firstrdb = args[1]
    conn = args[2]
    if not connected:
        conn = await rdb.connect(db='test', host=host, port=port)
        connected = True
    if connected:
        await asyncio.sleep(5)
        postrdb = date_rdb()
        print(f"Desde {firstrdb}, hasta {postrdb}")
        query = rdb.db("test").table(table).between(
            firstrdb,
            postrdb,
            index="DT_GEN",
            left_bound="closed",
            right_bound="open").coerce_to("array").order_by("DT_GEN").run(conn)
        result = await query
        if result:
            lista = []
            for q in result:
                print(q)
                lista.append(q)
            postrdb = lista[-1]["DT_GEN"]
            print("Las data", postrdb)
    return [connected, postrdb, conn], kwargs


connected = False
now = date_rdb()
conn = None
args = [connected, now, conn]
kwargs: dict = {}

task = TaskLoop(run, args, kwargs, **{"name": "test_query_dt_gen"})
task.create()
loop.run_forever()

dpineiden, Aug 18 '20 04:08

Next, going beyond a single connection: when I activate more than one, one or another of them somehow breaks with the 'Task cancelled' error. So I'm now using asyncio's wait_for and await sleep to control that. But I don't think I fully understand this bug yet.
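
A minimal sketch of the wait_for approach, assuming a timeout is an acceptable way to bound each query. `slow_query` is a hypothetical stand-in for awaiting a query's `.run(conn)`, and `run_with_timeout` is an illustrative helper, not part of the driver.

```python
import asyncio


async def slow_query(delay):
    """Hypothetical stand-in for awaiting a query's .run(conn)."""
    await asyncio.sleep(delay)
    return ["row"]


async def run_with_timeout(coro, timeout):
    """Await coro; return None if it does not finish within timeout seconds."""
    try:
        return await asyncio.wait_for(coro, timeout=timeout)
    except asyncio.TimeoutError:
        # wait_for has already cancelled the inner task at this point.
        return None


async def main():
    fast = await run_with_timeout(slow_query(0.01), timeout=1.0)
    slow = await run_with_timeout(slow_query(1.0), timeout=0.05)
    return fast, slow


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The first call finishes in time and returns its rows; the second is cancelled by `wait_for` and yields `None`, so a stuck query does not block the loop indefinitely.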

dpineiden, Aug 19 '20 14:08

OK. Now I'm reading the code. The file 'net_asyncio.py' needs some fixes. According to the documentation for StreamReader and StreamWriter, an extra step is needed when writing to and when closing the streams.

https://docs.python.org/3/library/asyncio-stream.html#asyncio.StreamWriter

For example, to write (send a query):

stream.write(data)
await stream.drain()

Or to close:

stream.close()
await stream.wait_closed()

Working with sockets is very very tricky :)
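
A self-contained sketch of both steps, run against a throwaway local echo server rather than RethinkDB. The `handle_echo` handler and the "ping" payload are made up for the demo; `drain()` and `wait_closed()` are the documented StreamWriter methods (`wait_closed()` needs Python 3.7 or later).

```python
import asyncio


async def handle_echo(reader, writer):
    """Echo one line back to the client, then close the server side cleanly."""
    data = await reader.readline()
    writer.write(data)
    await writer.drain()
    writer.close()
    await writer.wait_closed()


async def main():
    # Port 0 asks the OS for any free port; this server exists only for the demo.
    server = await asyncio.start_server(handle_echo, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]

    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    writer.write(b"ping\n")
    await writer.drain()  # wait until the write buffer has been flushed
    reply = await reader.readline()

    writer.close()
    await writer.wait_closed()  # wait until the transport is actually closed

    server.close()
    await server.wait_closed()
    return reply


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Pairing `write()` with `drain()` and `close()` with `wait_closed()` lets the coroutine yield until the transport has really done the work, instead of assuming it happened.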

dpineiden, Aug 19 '20 21:08

I am experiencing the same bug, it's pretty annoying. Hope it gets resolved soon!

bashalex, Nov 06 '20 15:11