commit 59adecf02485abe319f4b18c2ec2b613e6a420e3 Author: Georgi Gardev Date: Fri Nov 17 11:19:22 2023 +0200 Initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..112f331 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +__pycache__ +.env diff --git a/mqtt.py b/mqtt.py new file mode 100644 index 0000000..0efd753 --- /dev/null +++ b/mqtt.py @@ -0,0 +1,82 @@ + +# Based on +# https://github.com/martijnversluis/switchbot2mqtt/ + +import json +import logging +import random +import time +import json +from paho.mqtt import client as mqtt_client +from switchbot import trigger_device + +FIRST_RECONNECT_DELAY = 1 +RECONNECT_RATE = 2 +MAX_RECONNECT_COUNT = 12 +MAX_RECONNECT_DELAY = 60 + +class Config: + def __init__(self, **config): + self.broker = config.get("broker") + self.port = config.get("port", None) + self.topic_prefix = config.get("topic_prefix") + self.client_id = config.get("client_id", f'switchbot2mqtt-{random.randint(0, 1000)}') + self.username = config.get("username", None) + self.password = config.get("password", None) + +class MqttListener: + def __init__(self, **config): + self.running = True + self.config = Config(**config) + + def run(self): + logging.info("Connecting...") + self.connect() + logging.info("Connected! Starting...") + self.client.loop_forever() + + def connect(self): + self.client = mqtt_client.Client(self.config.client_id) + self.client.username_pw_set(self.config.username, self.config.password) + self.client.on_connect = self.on_connect + self.client.on_disconnect = self.on_disconnect + self.client.connect(self.config.broker, self.config.port) + + def on_connect(self, client, userdata, flags, rc): + if rc == 0 and client.is_connected(): + logging.info("Connected to MQTT Broker!") + client.subscribe(f'{self.config.topic_prefix}/#') + client.on_message = self.on_message + else: + logging.error(f'Failed to connect, return code {rc}') + + def on_disconnect(self, client, userdata, rc): + logging.info("Disconnected with result code: %s", rc) + reconnect_count = 0 + reconnect_delay = FIRST_RECONNECT_DELAY + + while reconnect_count < MAX_RECONNECT_COUNT: + logging.info("Reconnecting in %d seconds...", reconnect_delay) + time.sleep(reconnect_delay) + + try: + client.reconnect() + logging.info("Reconnected successfully!") + return + except Exception as err: + logging.error("%s. Reconnect failed. Retrying...", err) + + reconnect_delay *= RECONNECT_RATE + reconnect_delay = min(reconnect_delay, MAX_RECONNECT_DELAY) + reconnect_count += 1 + + logging.info("Reconnect failed after %s attempts. Exiting...", reconnect_count) + self.running = False + + def on_message(self, client, userdata, msg): + logging.debug(f"Received from `{msg.topic}` topic") + + mac_address = msg.topic[(len(self.config.topic_prefix)+1):] + + logging.info(f"Execute for {mac_address}") + trigger_device(mac_address, "Bot", "Press") diff --git a/start.py b/start.py new file mode 100644 index 0000000..cb789db --- /dev/null +++ b/start.py @@ -0,0 +1,22 @@ +#!/usr/bin/env python3 + +import logging +import os +from os.path import join, dirname +from dotenv import load_dotenv +from mqtt import MqttListener + +dotenv_path = join(dirname(__file__), '.env') +load_dotenv(dotenv_path) + +logging.basicConfig(level=logging.DEBUG) +logging.info("Starting listener") + +listener = MqttListener( + broker=os.environ.get('BROKER'), + port=1883, + username=os.environ.get('USERNAME'), + password=os.environ.get('PASSWORD'), + topic_prefix="switchbot2mqtt" +) +listener.run() diff --git a/switchbot.py b/switchbot.py new file mode 100644 index 0000000..c4f1857 --- /dev/null +++ b/switchbot.py @@ -0,0 +1,303 @@ +#!/usr/bin/env python3 + +# Taken from +# https://raw.githubusercontent.com/OpenWonderLabs/python-host/master/switchbot_py2topy3.py + + +# Copyright 2017-present WonderLabs, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pexpect +import sys +from bluepy.btle import Scanner, DefaultDelegate +import binascii +import copy +import datetime + + +class ScanDelegate(DefaultDelegate): + def __init__(self): + DefaultDelegate.__init__(self) + + +class DevScanner(DefaultDelegate): + def __init__(self): + DefaultDelegate.__init__(self) + # print('Scanner inited') + + def dongle_start(self): + self.con = pexpect.spawn('hciconfig hci0 up') + time.sleep(1) + + def dongle_restart(self): + print('restart bluetooth dongle') + self.con = pexpect.spawn('hciconfig hci0 down') + time.sleep(3) + self.con = pexpect.spawn('hciconfig hci0 up') + time.sleep(3) + + def scan_loop(self): + service_uuid = 'cba20d00-224d-11e6-9fb8-0002a5d5c51b' + company_id = '6909' # actually 0x0969 + dev_list = [] + bot_list = [] + meter_list = [] + curtain_list = [] + contact_list = [] + motion_list = [] + param_list = [] + + pir_tip = ['No movement detected', 'Movement detected'] + hall_tip = ['Door closed', 'Door opened', 'Timeout no closed'] + light_tip = ['Dark', 'Bright'] + + self.con = pexpect.spawn('hciconfig') + pnum = self.con.expect(['hci0', pexpect.EOF, pexpect.TIMEOUT]) + if pnum == 0: + self.con = pexpect.spawn('hcitool lescan') + # self.con.expect('LE Scan ...', timeout=5) + scanner = Scanner().withDelegate(DevScanner()) + devices = scanner.scan(10.0) + print('Scanning...') + else: + raise Error('no bluetooth error') + + for dev in devices: + mac = 0 + param_list[:] = [] + for (adtype, desc, value) in dev.getScanData(): + # print(adtype, desc, value) + if desc == '16b Service Data': + dev_type = binascii.a2b_hex(value[4:6]) + if dev_type == b'H': + param_list.append(binascii.a2b_hex(value[6:8])) + elif dev_type == b'T': + # celsius + tempFra = int(value[11:12].encode('utf-8'), 16) / 10.0 + tempInt = int(value[12:14].encode('utf-8'), 16) + if tempInt < 128: + tempInt *= -1 + tempFra *= -1 + else: + tempInt -= 128 + param_list.append(tempInt + tempFra) + param_list.append( + int(value[14:16].encode('utf-8'), 16) % 128) + # print('meter:', param1, param2) + elif dev_type == b'd': + # print(adtype, desc, value) + pirSta = ( + int(value[6:7].encode('utf-8'), 16) >> 2) & 0x01 + # TODO: + # diffSec = ( + # int(value[10:11].encode('utf-8'), 16) >> 2) & 0x02 + diffSec = 0 + hallSta = ( + int(value[11:12].encode('utf-8'), 16) >> 1) & 0x03 + lightSta = int(value[11:12].encode('utf-8'), 16) & 0x01 + param_list.extend([hallSta, pirSta, lightSta, diffSec]) + # print(pirSta, diffSec, hallSta, lightSta) + elif dev_type == b's': + # print(adtype, desc, value) + pirSta = ( + int(value[6:7].encode('utf-8'), 16) >> 2) & 0x01 + lightSta = (int(value[15:16].encode('utf-8'), 16) & 0x03) - 1 + # TODO: + diffSec = 0 + param_list.extend([pirSta, lightSta, diffSec]) + else: + param_list[:] = [] + elif desc == 'Local name': + if value == 'WoHand': + mac = dev.addr + dev_type = b'H' + elif value == 'WoMeter': + mac = dev.addr + dev_type = b'T' + elif value == 'WoCurtain': + mac = dev.addr + dev_type = b'c' + elif value == 'WoContact': + mac = dev.addr + dev_type = b'd' + elif value == 'WoMotion': + mac = dev.addr + dev_type = b's' + elif desc == 'Complete 128b Services' and value == service_uuid: + mac = dev.addr + elif desc == 'Manufacturer' and value[0:4] == company_id: + mac = dev.addr + + if mac != 0: + dev_list.append([mac, dev_type, copy.deepcopy(param_list)]) + + # print(dev_list) + for (mac, dev_type, params) in dev_list: + if dev_type == b'H': + if int(binascii.b2a_hex(params[0]), 16) > 127: + bot_list.append([mac, 'Bot', 'Turn On']) + bot_list.append([mac, 'Bot', 'Turn Off']) + bot_list.append([mac, 'Bot', 'Up']) + bot_list.append([mac, 'Bot', 'Down']) + else: + bot_list.append([mac, 'Bot', 'Press']) + elif dev_type == b'T': + meter_list.append([mac, 'Meter', "%.1f'C %d%%" % + (params[0], params[1])]) + elif dev_type == b'c': + curtain_list.append([mac, 'Curtain', 'Open']) + curtain_list.append([mac, 'Curtain', 'Close']) + curtain_list.append([mac, 'Curtain', 'Pause']) + elif dev_type == b'd': + # TODO: + # timeTirgger = datetime.datetime.now() + datetime.timedelta(0, params[3]) + # contact_list.append([mac, 'Contact', "%s, %s, %s, Last trigger: %s" % + # (hall_tip[params[0]], pir_tip[params[1]], light_tip[params[2]], timeTirgger.strftime("%Y-%m-%d %H:%M"))]) + contact_list.append([mac, 'Contact', "%s, %s, %s" % + (hall_tip[params[0]], pir_tip[params[1]], light_tip[params[2]])]) + elif dev_type == b's': + motion_list.append([mac, 'Motion', "%s, %s" % + (pir_tip[params[0]], light_tip[params[1]])]) + print('Scan timeout.') + return bot_list + meter_list + curtain_list + contact_list + motion_list + pass + + def register_cb(self, fn): + self.cb = fn + return + + def close(self): + # self.con.sendcontrol('c') + self.con.close(force=True) + + +def trigger_device(device): + [mac, dev_type, act] = device + # print 'Start to control' + con = pexpect.spawn('gatttool -b ' + mac + ' -t random -I') + con.expect('\[LE\]>') + print('Preparing to connect.') + retry = 3 + index = 0 + while retry > 0 and 0 == index: + con.sendline('connect') + # To compatible with different Bluez versions + index = con.expect( + ['Error', '\[CON\]', 'Connection successful.*\[LE\]>']) + retry -= 1 + if 0 == index: + print('Connection error.') + return + print('Connection successful.') + con.sendline('char-desc') + con.expect(['\[CON\]', 'cba20002-224d-11e6-9fb8-0002a5d5c51b']) + cmd_handle = con.before.decode('utf-8').split('\n')[-1].split()[2].strip(',') + if dev_type == 'Bot': + if act == 'Turn On': + con.sendline('char-write-cmd ' + cmd_handle + ' 570101') + elif act == 'Turn Off': + con.sendline('char-write-cmd ' + cmd_handle + ' 570102') + elif act == 'Press': + con.sendline('char-write-cmd ' + cmd_handle + ' 570100') + elif act == 'Down': + con.sendline('char-write-cmd ' + cmd_handle + ' 570103') + elif act == 'Up': + con.sendline('char-write-cmd ' + cmd_handle + ' 570104') + elif dev_type == 'Meter': + con.sendline('char-write-cmd ' + cmd_handle + ' 570F31') + con.expect('\[LE\]>') + con.sendline('char-read-uuid cba20003-224d-11e6-9fb8-0002a5d5c51b') + index = con.expect(['value:[0-9a-fA-F ]+', 'Error']) + if index == 0: + data = con.after.decode('utf-8').split(':')[1].replace(' ', '') + tempFra = int(data[3], 16) / 10.0 + tempInt = int(data[4:6], 16) + if tempInt < 128: + tempInt *= -1 + tempFra *= -1 + else: + tempInt -= 128 + meterTemp = tempInt + tempFra + meterHumi = int(data[6:8], 16) % 128 + print("Meter[%s] %.1f'C %d%%" % (mac, meterTemp, meterHumi)) + else: + print('Error!') + elif dev_type == 'Curtain': + if act == 'Open': + con.sendline('char-write-cmd ' + cmd_handle + ' 570F450105FF00') + elif act == 'Close': + con.sendline('char-write-cmd ' + cmd_handle + ' 570F450105FF64') + elif act == 'Pause': + con.sendline('char-write-cmd ' + cmd_handle + ' 570F450100FF') + else: + print('Unsupported operations') + con.expect('\[LE\]>') + con.sendline('quit') + print('Complete') + + +def main(): + # Check bluetooth dongle + print( + 'Usage: "sudo python3 switchbot_py2topy3.py [mac dev_type cmd]" or "sudo python3 switchbot_py2topy3.py"') + connect = pexpect.spawn('hciconfig') + pnum = connect.expect(["hci0", pexpect.EOF, pexpect.TIMEOUT]) + if pnum != 0: + print('No bluetooth hardware, exit now') + sys.exit() + connect = pexpect.spawn('hciconfig hci0 up') + + # print(sys.argv, len(sys.argv)) + + if len(sys.argv) == 4 or len(sys.argv) == 5: + dev = sys.argv[1] + dev_type = sys.argv[2] + act = sys.argv[3] if len(sys.argv) < 5 else ('Turn ' + sys.argv[4]) + trigger_device([dev, dev_type, act]) + + elif len(sys.argv) == 1: + # Start scanning... + scan = DevScanner() + dev_list = scan.scan_loop() + # dev_number = None + + if not dev_list: + print("No SwitchBot nearby, exit") + sys.exit() + for idx, val in enumerate(dev_list): + print('%2d' % idx, val) + + dev_number = int(input("Input the device number to control:")) + if dev_number >= len(dev_list): + print("Input error, exit") + else: + ble_dev = dev_list[dev_number] + print(ble_dev) + + # Trigger the device to work + # If the SwitchBot address is known you can run this command directly without scanning + + trigger_device(ble_dev) + else: + print('Wrong cmd!') + print( + 'Usage: "sudo python3 switchbot_py2topy3.py [mac dev_type cmd]" or "sudo python3 switchbot_py2topy3.py"') + + connect = pexpect.spawn('hciconfig') + + sys.exit() + + +if __name__ == "__main__": + main()