home-assistant-components
home-assistant-components copied to clipboard
Remote IR Vacuum
Here is the IR Remote Vacuum component. It supports a power_template docking mode and a timeout-based cleaning/docking mode (timeouts are given in minutes). The power_template mode is not fully tested. It also has a special brand mode; irobot is implemented.
"""
Support for a vacuum cleaner controlled through an IR remote entity.
This module appears to be adapted from the Xiaomi vacuum platform; for more
details about that platform, please refer to the documentation
https://home-assistant.io/components/vacuum.xiaomi_miio/
"""
import asyncio
from functools import partial
import logging
import time
import datetime
import voluptuous as vol
from homeassistant.components.vacuum import (
ATTR_CLEANED_AREA, DOMAIN, PLATFORM_SCHEMA, SUPPORT_BATTERY,
SUPPORT_CLEAN_SPOT, SUPPORT_FAN_SPEED, SUPPORT_LOCATE, SUPPORT_PAUSE,
SUPPORT_RETURN_HOME, SUPPORT_SEND_COMMAND, SUPPORT_STOP,
SUPPORT_STATE, SUPPORT_START, VACUUM_SERVICE_SCHEMA, StateVacuumDevice,
STATE_CLEANING, STATE_DOCKED, STATE_PAUSED, STATE_IDLE, STATE_RETURNING,
STATE_ERROR)
from homeassistant.components.remote import (
ATTR_COMMAND, DOMAIN, SERVICE_SEND_COMMAND)
from homeassistant.const import (
ATTR_ENTITY_ID, CONF_HOST, CONF_NAME, CONF_TOKEN, STATE_OFF, STATE_ON)
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.restore_state import RestoreEntity
from homeassistant.util import Throttle
from homeassistant.helpers.event import async_track_state_change
from homeassistant.core import callback
from homeassistant.exceptions import TemplateError
# The platform sends its IR codes through the `remote` component.
DEPENDENCIES = ['remote']

_LOGGER = logging.getLogger(__name__)

# --- Defaults -------------------------------------------------------------
DEFAULT_NAME = 'IR Vacuum cleaner'
# Empty string rather than None: the default value is run through the same
# validator as user input, and cv.string rejects None, so a None default made
# every configuration without an explicit `brand` fail validation. An empty
# string validates, lower-cases cleanly and never matches a known brand.
DEFAULT_BRAND = ''
# Both durations are expressed in minutes (the entity multiplies them by 60).
DEFAULT_CLEANING_TIME = 60
DEFAULT_TIME_DOCKING = 1

# --- Configuration keys ---------------------------------------------------
CONF_REMOTE = 'remote'
CONF_BRAND = 'brand'
CONF_TIME_CLEANING = 'time_cleaning'
CONF_TIME_DOCKING = 'time_docking'
CONF_POWER_TEMPLATE = 'power_template'
CONF_COMMANDS = 'commands'

COMMAND_START = 'start'

# Only the `start` code is mandatory; any additional command names
# (the entity looks for 'stop', 'pause' and 'return_to_base') are accepted
# unvalidated through ALLOW_EXTRA.
COMMANDS_SCHEMA = vol.Schema({
    vol.Required(COMMAND_START): cv.string
}, extra=vol.ALLOW_EXTRA)

PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
    vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
    vol.Optional(CONF_BRAND, default=DEFAULT_BRAND): cv.string,
    vol.Optional(CONF_TIME_CLEANING, default=DEFAULT_CLEANING_TIME): cv.positive_int,
    vol.Optional(CONF_TIME_DOCKING, default=DEFAULT_TIME_DOCKING): cv.positive_int,
    vol.Required(CONF_REMOTE): cv.entity_id,
    vol.Optional(CONF_POWER_TEMPLATE): cv.template,
    vol.Required(CONF_COMMANDS): COMMANDS_SCHEMA
})

# --- State attribute names ------------------------------------------------
# Only ATTR_CLEAN_START and ATTR_CLEAN_STOP are referenced by the entity in
# this module; the remaining names are kept for compatibility with anything
# that may import them.
ATTR_CLEAN_START = 'clean_start'
ATTR_CLEAN_STOP = 'clean_stop'
ATTR_CLEANING_TIME = 'time_cleaning'
ATTR_DO_NOT_DISTURB = 'do_not_disturb'
ATTR_DO_NOT_DISTURB_START = 'do_not_disturb_start'
ATTR_DO_NOT_DISTURB_END = 'do_not_disturb_end'
ATTR_MAIN_BRUSH_LEFT = 'main_brush_left'
ATTR_SIDE_BRUSH_LEFT = 'side_brush_left'
ATTR_FILTER_LEFT = 'filter_left'
ATTR_SENSOR_DIRTY_LEFT = 'sensor_dirty_left'
ATTR_CLEANING_COUNT = 'cleaning_count'
ATTR_CLEANED_TOTAL_AREA = 'total_cleaned_area'
ATTR_CLEANING_TOTAL_TIME = 'total_time_cleaning'
ATTR_ERROR = 'error'
ATTR_RC_DURATION = 'duration'
ATTR_RC_ROTATION = 'rotation'
ATTR_RC_VELOCITY = 'velocity'
ATTR_STATUS = 'status'

# Minimum interval enforced between two state refreshes of the entity.
MIN_TIME_BETWEEN_UPDATES = datetime.timedelta(seconds=10)
async def async_setup_platform(hass, config, async_add_entities, discovery_info=None):
    """Set up the IR remote vacuum platform.

    Reads the platform options from ``config`` and registers a single
    ``RemoteVacuum`` entity built from them. ``discovery_info`` is accepted
    for signature compatibility and is not used.
    """
    vacuum = RemoteVacuum(
        hass,
        config.get(CONF_NAME),
        config.get(CONF_BRAND),
        config.get(CONF_TIME_CLEANING),
        config.get(CONF_TIME_DOCKING),
        config.get(CONF_REMOTE),
        config.get(CONF_COMMANDS),
        config.get(CONF_POWER_TEMPLATE),
    )
    async_add_entities([vacuum])
class RemoteVacuum(StateVacuumDevice, RestoreEntity):
    """Vacuum entity that is driven by IR codes sent through a remote entity.

    The vacuum gives no feedback, so its state is tracked locally. Without a
    power template the entity assumes the robot is docked again once the
    configured cleaning/docking time has elapsed; with a power template the
    entity is marked docked whenever the template renders to a true-like
    value.
    """

    def __init__(self, hass, name, brand, time_cleaning, time_docking,
                 remote_entity_id, commands, power_template):
        """Initialise the entity.

        hass: Home Assistant instance used for service calls.
        name: friendly name of the entity.
        brand: optional brand string; only 'irobot' changes behaviour.
        time_cleaning: assumed cleaning duration in minutes.
        time_docking: assumed docking duration in minutes.
        remote_entity_id: entity id of the remote that emits the IR codes.
        commands: mapping of command name to IR code.
        power_template: optional template; when set it replaces the timers.
        """
        self.hass = hass
        self._name = name
        # The brand is optional; tolerate None/empty instead of crashing on
        # ``None.lower()``.
        self._brand = (brand or '').lower()
        # Configuration is in minutes, timers below work in seconds.
        self._time_cleaning = time_cleaning * 60
        self._time_docking = time_docking * 60
        self._remote_entity_id = remote_entity_id
        self._commands = commands

        # supported_features must be an integer bitmask, not a set of flags.
        features = SUPPORT_STATE
        optional_features = (
            ('start', SUPPORT_START),
            ('stop', SUPPORT_STOP),
            ('pause', SUPPORT_PAUSE),
            ('return_to_base', SUPPORT_RETURN_HOME),
        )
        for command_name, flag in optional_features:
            if command_name in commands:
                features |= flag
        self._support = features

        self._state = STATE_DOCKED
        self.last_clean = {}
        self._skip_update = False
        self._power_template = power_template
        if power_template is not None:
            # A template can only be rendered once it knows its hass instance.
            power_template.hass = hass
        self.clean_start = time.time()
        self.dock_start = time.time()

    @property
    def name(self):
        """Return the name of the vacuum."""
        return self._name

    @property
    def supported_features(self):
        """Flag vacuum cleaner robot features that are supported."""
        return self._support

    @property
    def state(self):
        """Return the locally tracked state of the vacuum."""
        return self._state

    @property
    def available(self) -> bool:
        """Return True if entity is available."""
        return True

    @property
    def device_state_attributes(self):
        """Return the specific state attributes of this vacuum cleaner."""
        attrs = {}
        if self._state is None or not self.last_clean:
            return attrs
        if 'start' in self.last_clean:
            attrs[ATTR_CLEAN_START] = self.last_clean['start']
        if 'end' in self.last_clean:
            attrs[ATTR_CLEAN_STOP] = self.last_clean['end']
        return attrs

    def _send_command(self, command_name):
        """Send the IR code configured for ``command_name``, if there is one.

        For the 'irobot' brand the code is transmitted twice.
        """
        if command_name not in self._commands:
            return
        command = self._commands[command_name]
        if command is None:
            return
        repeats = 2 if self._brand == 'irobot' else 1
        for _ in range(repeats):
            # DOMAIN resolves to the remote component's domain here, because
            # the import from homeassistant.components.remote comes last.
            self.hass.services.call(DOMAIN, SERVICE_SEND_COMMAND, {
                ATTR_COMMAND: 'raw:' + command,
                ATTR_ENTITY_ID: self._remote_entity_id
            })

    def start(self):
        """Start cleaning and record the start time."""
        self._send_command('start')
        self._state = STATE_CLEANING
        if not self._power_template:
            self.clean_start = time.time()
        else:
            # Skip the next refresh so the template cannot immediately flip
            # the state back to docked.
            self._skip_update = True
        self.last_clean['start'] = self.dt()
        self.schedule_update_ha_state()

    def stop(self):
        """Stop cleaning and record the end time."""
        self._send_command('stop')
        self._state = STATE_IDLE
        self.last_clean['end'] = self.dt()
        self.schedule_update_ha_state()

    def pause(self):
        """Pause cleaning and record the end time."""
        self._send_command('pause')
        self._state = STATE_PAUSED
        self.last_clean['end'] = self.dt()
        self.schedule_update_ha_state()

    def return_to_base(self):
        """Send the vacuum back to its dock."""
        if self._state == STATE_CLEANING:
            self.last_clean['end'] = self.dt()
        # NOTE(review): the original indentation of this file was lost;
        # confirm the irobot pre-step is meant to run regardless of state.
        if self._brand == 'irobot':
            self._send_command('start')
            time.sleep(2)
        self._send_command('return_to_base')
        self._state = STATE_RETURNING
        if not self._power_template:
            self.dock_start = time.time()
        self.schedule_update_ha_state()

    @callback
    def _async_update_power(self):
        """Mark the vacuum as docked when the power template is true-like."""
        try:
            rendered = self._power_template.async_render()
        except TemplateError as ex:
            _LOGGER.warning('Unable to update power from template: %s', ex)
            return
        # str() guards against render results that are not plain strings.
        if str(rendered).lower() in ('true', 'on', '1'):
            self._state = STATE_DOCKED

    def dt(self):
        """Return the current local time formatted as 'HH:MM:SS DD.MM.YYYY'."""
        return datetime.datetime.now().strftime("%H:%M:%S %d.%m.%Y")

    @Throttle(MIN_TIME_BETWEEN_UPDATES)
    async def async_update(self):
        """Refresh the tracked state from the power template or the timers."""
        if self._skip_update:
            self._skip_update = False
            return

        if self._power_template:
            self._async_update_power()
            return

        now = time.time()
        if self._state == STATE_CLEANING:
            # Assume the robot has gone home once the cleaning time is over.
            if now - self.clean_start > self._time_cleaning:
                self.last_clean['end'] = self.dt()
                self._state = STATE_DOCKED
                self.schedule_update_ha_state()
        elif self._state == STATE_RETURNING:
            if now - self.dock_start > self._time_docking:
                self._state = STATE_DOCKED
                self.schedule_update_ha_state()
        else:
            # No timer is running in any other state.
            self.clean_start = None
            self.dock_start = None
IR codes example: https://www.dropbox.com/s/texllwcvx3sxnqe/ir_remote_codes.rar?dl=0&file_subpath=%2Fir_remote_codes%2Fvacuum