On python 3.8 the query for 'between' in a table CANCEL TASK....
Describe the bug
In a table with extra INDEXES, I have a DT_GEN and COUNT (date and int), tested.
The between query as shows on documentation:
On (https://gitlab.com/pineiden/datadbs-rethinkdb/-/blob/master/data_rdb/rdb.py) line 316 and so on
table_manager = self.r.db(dbname).table(table_name)
task_coro = await table_manager.between(
lower, upper, **filter_opt).order_by(order_by).run(
self.session, **options)
result = await task_coro
Task was destroyed but it is pending!
task: <Task pending name='Task-4' coro=<ConnectionInstance._reader() running at /home/david/.virtualenvs/datawork/lib/python3.8/site-packages/rethinkdb-2.4.7-py3.8.egg/rethinkdb/asyncio_net/net_asyncio.py:313> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f7d5a713df0>()]>>
To Reproduce
Mount Rethinkdb 2.4.1 Install Python 3.8 Create a virtualenv
To install dependences and then install from repo:
pip install data-rdb
pip uninstall data-rdb
Now clone from: https://gitlab.com/pineiden/datadbs-rethinkdb
and
python setup.py develop
On folder tests run
python load_data.py
And
python read_data.py
Those scripts load with dummy data on a table and read that on real time.
Expected behavior
Get the data from the table
Screenshots https://gitlab.com/-/snippets/2005380
Additional context Add any other context about the problem here.
Well, the core of the query (only with the Rethinkdb module) and Python 3.6.9:
from pytz import timezone
from datetime import datetime, timedelta
import asyncio
from rethinkdb import r as rdb
host = 'localhost'
port = 28015
loop = asyncio.get_event_loop()
rdb.set_loop_type('asyncio')
conn = loop.run_until_complete(rdb.connect(db='test', host=host, port=port))
first = datetime.now() - timedelta(seconds=200)
tz = timezone("America/Santiago")
firsttz = tz.localize(first)
firstrdb = rdb.iso8601(firsttz.isoformat())
post = datetime.now()
posttz = tz.localize(post)
postrdb = rdb.iso8601(posttz.isoformat())
table = "STATION"
filter_opt = {'left_bound': 'open', 'index': "DT_GEN"}
print(f"Desde {firstrdb}, hasta {postrdb}")
query = rdb.db("test").table(table).between(firstrdb, postrdb,
**filter_opt).run(conn)
result = loop.run_until_complete(query)
print(result)
Now, working for debugging.
This works (using secondary index COUNTER), with py3.6 and py3.8:
from pytz import timezone
from datetime import datetime, timedelta
import asyncio
from rethinkdb import r as rdb
from tasktools.taskloop import coromask, renew, simple_fargs_out
from functools import partial
host = 'localhost'
port = 28015
loop = asyncio.get_event_loop()
rdb.set_loop_type('asyncio')
filter_opt = {'left_bound': 'open', 'index': "COUNTER"}
table = "STATION"
def date_rdb(td=0):
first = datetime.utcnow() + timedelta(seconds=td)
tz = timezone("UTC")
firsttz = tz.localize(first)
firstrdb = rdb.iso8601(firsttz.isoformat())
return firstrdb
async def run(*args, **kwargs):
connected = args[0]
firstrdb = args[1]
val = args[1]
conn = args[2]
if not connected:
conn = await rdb.connect(db='test', host=host, port=port)
connected = True
if connected:
await asyncio.sleep(5)
val2 = val + 20
postrdb = date_rdb()
print(f"Desde {firstrdb}, hasta {postrdb}")
query = rdb.db("test").table(table).between(val, val2,
**filter_opt).run(conn)
result = await query
print(result)
return [connected, val2, conn], kwargs
connected = False
now = date_rdb()
conn = None
val = 0
#args = [connected, now, conn]
args = [connected, val, conn]
kwargs: dict = {}
task = loop.create_task(coromask(run, args, kwargs, simple_fargs_out))
task.add_done_callback(partial(renew, task, run, simple_fargs_out))
loop.run_forever()
Not using 'between', using 'filter'. Works fine on Python 3.8, the query for datetime
"""
Index: COUNTER
Python 3.6
Python 3.8
"""
from pytz import timezone
from datetime import datetime, timedelta
import asyncio
from rethinkdb import r as rdb
from tasktools.taskloop import coromask, renew, simple_fargs_out
from functools import partial
host = 'localhost'
port = 28015
loop = asyncio.get_event_loop()
rdb.set_loop_type('asyncio')
filter_opt = {'left_bound': 'open', 'index': "COUNTER"}
table = "STATION"
def date_rdb(td=0):
first = datetime.utcnow() + timedelta(seconds=td)
tz = timezone("UTC")
firsttz = tz.localize(first)
firstrdb = rdb.iso8601(firsttz.isoformat())
return firstrdb
async def run(*args, **kwargs):
connected = args[0]
firstrdb = args[1]
postrdb = firstrdb
conn = args[2]
if not connected:
conn = await rdb.connect(db='test', host=host, port=port)
connected = True
if connected:
await asyncio.sleep(5)
val2 = val + 20
print(f"Desde {firstrdb}, hasta {postrdb}")
query = rdb.db("test").table(table).filter(
lambda data: data["DT_GEN"] >= firstrdb).coerce_to(
"array").order_by("DT_GEN").run(conn)
result = await query
if result:
lista = []
for q in result:
print(q)
lista.append(q)
postrdb = lista[-1]["DT_GEN"]
print("Las data", postrdb)
return [connected, postrdb, conn], kwargs
connected = False
now = date_rdb()
conn = None
val = 0
args = [connected, now, conn]
kwargs: dict = {}
task = loop.create_task(coromask(run, args, kwargs, simple_fargs_out))
task.add_done_callback(partial(renew, task, run, simple_fargs_out))
loop.run_forever()
Now, I think, comparing. Between is better than filter for real time. Filter read all the table every time.
I opened the diccionary of options, and it's working ok.
"""
Index: COUNTER
Python 3.6
Python 3.8
"""
from pytz import timezone
from datetime import datetime, timedelta
import asyncio
from rethinkdb import r as rdb
from tasktools.taskloop import TaskLoop
host = 'localhost'
port = 28015
loop = asyncio.get_event_loop()
rdb.set_loop_type('asyncio')
filter_opt = {'left_bound': 'open', 'index': "COUNTER"}
table = "STATION"
def date_rdb(td=0):
first = datetime.utcnow() + timedelta(seconds=td)
tz = timezone("UTC")
firsttz = tz.localize(first)
firstrdb = rdb.iso8601(firsttz.isoformat())
return firstrdb
async def run(*args, **kwargs):
connected = args[0]
firstrdb = args[1]
conn = args[2]
if not connected:
conn = await rdb.connect(db='test', host=host, port=port)
connected = True
if connected:
await asyncio.sleep(5)
postrdb = date_rdb()
print(f"Desde {firstrdb}, hasta {postrdb}")
query = rdb.db("test").table(table).between(
firstrdb,
postrdb,
index="DT_GEN",
left_bound="closed",
right_bound="open").coerce_to("array").order_by("DT_GEN").run(conn)
result = await query
if result:
lista = []
for q in result:
print(q)
lista.append(q)
postrdb = lista[-1]["DT_GEN"]
print("Las data", postrdb)
return [connected, postrdb, conn], kwargs
connected = False
now = date_rdb()
conn = None
args = [connected, now, conn]
kwargs: dict = {}
task = TaskLoop(run, args, kwargs, **{"name": "test_query_dt_gen"})
task.create()
loop.run_forever()
And then. Beyond using only 1 connection. When I active more, in somewhat one or other is broken. The 'Task cancelled' error. So, I'm using now the wait_for and await sleep from asyncio to control that. But I think I can't get the full knowledge from this bug-error or whathever
Ok. Now i'm reading the code. On the file 'net_asyncio.py' needs some fixes. Reading the documentaion for the streamreader, streamwriter. Need some extra step when write, close the streams.
https://docs.python.org/3/library/asyncio-stream.html#asyncio.StreamWriter
For example, to write (do a query)
stream.write(data)
await stream.drain()
Or to close:
stream.close()
await stream.wait_closed()
Working with sockets is very very tricky :)
I am experiencing the same bug, it's pretty annoying. Hope it gets resolved soon!