micropython-modbus
micropython-modbus copied to clipboard
TypeError: extra keyword arguments given
Description
Thank you shared this impressive project, I used the version:2.3.7 umodbus code, when run this code, the error shows: TypeError:extra keyword arguments given. Follow as code: `# modbus master driver for lidar
6 pieces lidar for system, address from 1 to 6
from right to left side, count closewize
system package
from machine import Pin import logging import gc from InitConfig import config import uasyncio as asyncio
log = logging.MiniLog("RtuMaster", level = logging.DEBUG) DEBUG = config.get('DEBUG', False)
MasterUartId = config.get("UART_LIDAR")
self defined package
from umodbus.serial import Serial as RTUMaster from umodbus import version
gc.collect()
version = '1.0' author = 'skylin'
register_definitions = { "HREGS":{ # read hold registers "slave_address":{ "1": 1, "2": 2, "3": 3, "4": 4, "5": 5, "6": 6 }, "register_address": 0x94, "length": 0x02, }, }
class Frame(object):
def init(self,data = None):
""" Data struct: """
if data is None:
self.data = bytearray()
else:
self.data = bytearray(data)
def __len__(self):
return len(self.data)
class FrameBuffer(object): def init(self, size): """ A FrameBuffer is used to buffer frames received from the Network over the active Bus.The FrameBuffer is designed as a FIFO and is useful to overcome the limited storing capability """ self._size = size self._data = [[memoryview(bytearray(6))] for _ in range(size)] self._index_write = 0 self._index_read = 0 self._null_sink = [0, 0, 0, 0] self._count = 0
@micropython.native
def put(self, v):
self._data[self._index_write] = v
next_index = (self._index_write + 1) % self._size
if next_index != self._index_read:
self._index_write = self.next_index
self._count += 1
else:
self.clear()
if DEBUG:
raise Exception("frame buffer over flow")
@micropython.native
def get(self):
if self._index_read == self._index_write:
return None
lin_id, _, data = self._data[self._index_read]
self._index_read = (self._index_read +1) % self._size
self._count -= 1
return Frame(data) # Omit the data length
def any(self):
if self._index_read == self._index_write:
return False
return True
def clear(self):
self._index_write = 0
self._index_read = 0
class Master(object): def init(self, uart_id, baudrate = 115200, ctrlPin = None, fbSize = 128): self.rtu_pins = (Pin('A2'), Pin('A3')) self.uart_id = uart_id self.ctrlPin = ctrlPin self.baudrate = baudrate
self.host = RTUMaster(
pins = self.rtu_pins,
uart_id = self.uart_id,
baudrate = self.baudrate,
ctrl_pin = self.ctrlPin
)
self.framebuf = FrameBuffer(fbSize)
def write(self, registerdefinitions, idx):
slave_addr = registerdefinitions['HREGS']['slave_address']['idx']
register_addr = registerdefinitions['HREGS']['register_address']
qty = registerdefinitions['HREGS']['length']
register_value = self.host.read_holding_registers(
slave_addr = slave_addr,
starting_addr = register_addr,
register_qty = qty,
signed = False
)
if not register_value:
self.framebuf.put(register_value)
def read(self):
if self.framebuf.any():
return self.framebuf.get()
if name == 'main':
async def test():
ctrl_Pin = '485_TR1'
m = Master(uart_id = 2, ctrlPin = '485_TR1')
while True:
for idx in range(7):
w = m.write(register_definitions, idx)
if w is not None:
log.info(f"Lidar{idx} return data is:{w}")
await asyncio.sleep_ms(30)
async def start():
asyncio.create_task(test())
while True:
await asyncio.sleep_ms(10)
app = None
async def main():
import time
t = time.localtime()
log.info(t)
gc.collect()
global app
await start()
try:
gc.collect()
asyncio.run(main())
except KeyboardInterrupt:
log.info('Interrupt')
finally:
asyncio.new_event_loop()
log.info("run again")
`
Reproduction steps
...
MicroPython version
V1.22
MicroPython board
pyboard
MicroPython Modbus version
# e.g. v2.3.3
# use the following command to get the used version
import os
from umodbus import version
print('MicroPython infos:', os.uname())
print('Used micropthon-modbus version:', version.__version__))
Relevant log output
>>> %Run -c $EDITOR_CONTENT
MPY: sync filesystems
MPY: soft reboot
Task exception wasn't retrieved
future: <Task> coro= <generator object 'test' at 2000c850>
Traceback (most recent call last):
File "asyncio/core.py", line 1, in run_until_complete
File "<stdin>", line 147, in test
File "<stdin>", line 117, in __init__
File "umodbus/serial.py", line 1, in __init__
TypeError: extra keyword arguments given
User code
# modbus master driver for lidar
# 6 pieces lidar for system, address from 1 to 6
# from right to left side, count closewize
# system package
from machine import Pin
import logging
import gc
from InitConfig import config
import uasyncio as asyncio
log = logging.MiniLog("RtuMaster", level = logging.DEBUG)
DEBUG = config.get('DEBUG', False)
MasterUartId = config.get("UART_LIDAR")
# self defined package
from umodbus.serial import Serial as RTUMaster
from umodbus import version
gc.collect()
__version__ = '1.0'
__author__ = 'skylin'
register_definitions = {
"HREGS":{ # read hold registers
"slave_address":{
"1": 1,
"2": 2,
"3": 3,
"4": 4,
"5": 5,
"6": 6
},
"register_address": 0x94,
"length": 0x02,
},
}
class Frame(object):
def __init__(self,data = None):
""" Data struct: """
if data is None:
self.data = bytearray()
else:
self.data = bytearray(data)
def __len__(self):
return len(self.data)
class FrameBuffer(object):
def __init__(self, size):
"""
A FrameBuffer is used to buffer frames received from the Network
over the active Bus.The FrameBuffer is designed as a FIFO and is
useful to overcome the limited storing capability
"""
self._size = size
self._data = [[memoryview(bytearray(6))] for _ in range(size)]
self._index_write = 0
self._index_read = 0
self._null_sink = [0, 0, 0, 0]
self._count = 0
@micropython.native
def put(self, v):
self._data[self._index_write] = v
next_index = (self._index_write + 1) % self._size
if next_index != self._index_read:
self._index_write = self.next_index
self._count += 1
else:
self.clear()
if DEBUG:
raise Exception("frame buffer over flow")
@micropython.native
def get(self):
if self._index_read == self._index_write:
return None
lin_id, _, data = self._data[self._index_read]
self._index_read = (self._index_read +1) % self._size
self._count -= 1
return Frame(data) # Omit the data length
def any(self):
if self._index_read == self._index_write:
return False
return True
def clear(self):
self._index_write = 0
self._index_read = 0
class Master(object):
def __init__(self, uart_id, baudrate = 115200, ctrlPin = None, fbSize = 128):
self.rtu_pins = (Pin('A2'), Pin('A3'))
self.uart_id = uart_id
self.ctrlPin = ctrlPin
self.baudrate = baudrate
self.host = RTUMaster(
pins = self.rtu_pins,
uart_id = self.uart_id,
baudrate = self.baudrate,
ctrl_pin = self.ctrlPin
)
self.framebuf = FrameBuffer(fbSize)
def write(self, registerdefinitions, idx):
slave_addr = registerdefinitions['HREGS']['slave_address']['idx']
register_addr = registerdefinitions['HREGS']['register_address']
qty = registerdefinitions['HREGS']['length']
register_value = self.host.read_holding_registers(
slave_addr = slave_addr,
starting_addr = register_addr,
register_qty = qty,
signed = False
)
if not register_value:
self.framebuf.put(register_value)
def read(self):
if self.framebuf.any():
return self.framebuf.get()
if __name__ == '__main__':
async def test():
ctrl_Pin = '485_TR1'
m = Master(uart_id = 2, ctrlPin = '485_TR1')
while True:
for idx in range(7):
w = m.write(register_definitions, idx)
if w is not None:
log.info(f"Lidar{idx} return data is:{w}")
await asyncio.sleep_ms(30)
async def start():
asyncio.create_task(test())
while True:
await asyncio.sleep_ms(10)
app = None
async def main():
import time
t = time.localtime()
log.info(t)
gc.collect()
global app
await start()
try:
gc.collect()
asyncio.run(main())
except KeyboardInterrupt:
log.info('Interrupt')
finally:
asyncio.new_event_loop()
log.info("run again")
Additional informations
No response
Same thing, can't find what cause the problem nor fix it so far.
My code for Pyboard v1.0:
from pyb import Pin
Y1 = pyb.Pin(pyb.Pin.board.Y1, pyb.Pin.OUT)
Y2 = pyb.Pin(pyb.Pin.board.Y2, pyb.Pin.IN)
# self defined package
from umodbus.serial import Serial as RTUMaster
rtu_pins = (Pin('Y1'), Pin('Y2'))
host = RTUMaster(
pins = rtu_pins,
uart_id = 6,
baudrate = 9600,
)
Error message:
MPY: sync filesystems
MPY: soft reboot
Traceback (most recent call last):
File "<stdin>", line 13, in <module>
File "umodbus/serial.py", line 108, in __init__
TypeError: extra keyword arguments given