radonwave
radonwave copied to clipboard
modfied code for FTLAB RadonEye RD200
Maybe you can use it
#!/usr/bin/env python3
import sys
import time
from struct import unpack
from bluepy import btle
import time
from argparse import ArgumentParser
import json
import struct
def a_to_int(bytearray):
return struct.unpack("l", bytearray)[0]
def a_to_float(bytearray):
return struct.unpack("f", bytearray)[0]
class CouldNotConnectError(Exception):
pass
class Measurement:
def __init__(self, humidity, temperature, radon_avg, radon_1day, accel, brightness, humidity2):
self.humidity = humidity
self.temperature = temperature
self.radon_avg = radon_avg
self.radon_1day = radon_1day
self.accel = accel
self.brightness = brightness
self.humidity2 = humidity2
def connect_and_read(device_address):
try:
dev = btle.Peripheral(device_address,"random")
except btle.BTLEException as e:
raise CouldNotConnectError()
if False:
print('Services')
for service in dev.services:
print(service)
print('Characteristics')
for ch in dev.getCharacteristics():
print(ch.getHandle(), ch.uuid, ch.propertiesToString())
service = dev.getServiceByUUID(btle.UUID('00001523-1212-efde-1523-785feabcd123'))
character = dev.getCharacteristics(startHnd=1, endHnd=0xFFFF, uuid=btle.UUID('00001525-1212-efde-1523-785feabcd123'))
dev.writeCharacteristic(11,b'\x50')
value = dev.readCharacteristic(13)
value2 = struct.pack('BBBB',value[2], value[3], value[4], value[5])
value2 = int(a_to_float(value2)*37)
dev.disconnect()
return value2
def main():
parser = ArgumentParser()
parser.add_argument('--wait', default=1200, type=int,
help='Seconds to wait between queries. Do not choose this too low as the '
'radon levels are only updated once every 60 minutes. Set to 0 to query '
'only once. Default: 1200 '
'(20 minutes)')
parser.add_argument('--mqtt', help='MQTT server')
parser.add_argument('--topic', help='MQTT topic')
parser.add_argument('device_address', metavar='BLUETOOTH-DEVICE-ADDRESS')
args = parser.parse_args()
device_address = args.device_address
if args.mqtt and not args.topic:
parser.error('Provide also a --topic when you use --mqtt')
if args.mqtt:
try:
import paho.mqtt.client as mqtt
client = mqtt.Client()
client.connect(args.mqtt)
assert client
except Exception as e: # unsure which exceptions connect can cause, so need to catch everything
print('Could not connect to MQTT broker:', e, file=sys.stderr)
client = None
else:
client = None
while True:
try:
measurement = connect_and_read(device_address)
except CouldNotConnectError:
print('Could not connect', file=sys.stderr)
print(device_address, file=sys.stderr)
except btle.BTLEException as e:
print('Bluetooth error:', e, file=sys.stderr)
else:
#print('{time}\t{radon:.2f}'.format(time=time.strftime('%Y-%m-%d %H:%M:%S'),measurement), sep='\t')
print(time.strftime('%Y-%m-%d %H:%M:%S'), measurement, sep='\t')
sys.stdout.flush()
if client:
data = {
'radon': measurement
}
client.publish(args.topic, json.dumps(data))
if args.wait == 0:
break
time.sleep(args.wait)
if client:
client.disconnect()
if __name__ == '__main__':
main()
Thanks a lot. I don’t know whether I can actually integrate it (because I cannot test it). But if nothing else, other people will now be able to find the code when they search.
I can test it if you want to.
Best Michael
Am 19.07.2019 um 18:48 schrieb Marcel Martin:
Thanks a lot. I don’t know whether I can actually integrate it (because I cannot test it). But if nothing else, other people will now be able to find the code when they search.
— You are receiving this because you authored the thread. Reply to this email directly, view it on GitHub https://github.com/marcelm/radonwave/issues/5?email_source=notifications&email_token=AF34UHZ45PVVUZ6XHKOEGJ3QAHV4FA5CNFSM4IBTBAD2YY3PNVWWK3TUL52HS4DFVREXG43VMVBW63LNMVXHJKTDN5WW2ZLOORPWSZGOD2MFBBA#issuecomment-513298564, or mute the thread https://github.com/notifications/unsubscribe-auth/AF34UHZXRH7UR67SCRLJATLQAHV4FANCNFSM4IBTBADQ.