"""Classes and methods for working with the physical Mebo robot"""
import logging
import socket
import sys
import time
from abc import ABC
from collections import namedtuple
from functools import partial
from ipaddress import (
AddressValueError,
IPv4Network,
IPv4Address
)
from xml.etree.ElementTree import fromstring as xmlfromstring
from requests import Session
from requests.exceptions import (
ConnectionError,
HTTPError
)
from zeroconf import ServiceBrowser, Zeroconf
from .exceptions import (
MeboCommandError,
MeboDiscoveryError,
MeboRequestError,
MeboConnectionError,
MeboConfigurationError
)
from mebo.stream.session import (
RTSPSession,
)
logging.getLogger(__name__)
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
Broadcast = namedtuple('Broadcast', ['ip', 'port', 'data'])
WirelessNetwork = namedtuple('WirelessNetwork', ['ssid', 'mac', 'a', 'q', 'si', 'nl', 'ch'])
NORTH = 'n'
NORTH_EAST = 'ne'
EAST = 'e'
SOUTH_EAST = 'se'
SOUTH = 's'
SOUTH_WEST = 'sw'
WEST = 'w'
NORTH_WEST = 'nw'
DIRECTIONS = {NORTH, NORTH_EAST, EAST, SOUTH_EAST, SOUTH, SOUTH_WEST, WEST, NORTH_WEST}
class _MeboMDNSListener:
def remove_service(self, zeroconf, type, name):
logging.debug("Service %s removed", name)
def add_service(self, zeroconf, type, name):
info = zeroconf.get_service_info(type, name)
logging.debug("Service %s added, service info: %s", name, info)
[docs]class ComponentFactory:
"""Factory class for generating classes of components"""
@classmethod
def _from_parent(cls, name, **actions):
"""Generates a class of the given type as a subclass of component
:param name: Name of the generated class
:type name: str
:param actions: action names-> `callables`, which are closures using the parents shared request infrastructure
:type actions: dict
"""
cls = type(name, (Component,), actions)
cls.__doc__ = f"""{name.upper()} Component"""
return cls(actions=actions.keys())
[docs]class Component(ABC):
"""Abstract base class for all robot components"""
def __init__(self, actions):
self.actions = actions
def __repr__(self):
return '<{} actions={}>'.format(self.__class__, self.actions)
[docs]class Mebo:
"""Mebo represents a single physical robot"""
# port used by mebo to broadcast its presence
BROADCAST_PORT = 51110
# port used to establish media (RTSP) sessions
RTSP_PORT = 6667
def __init__(self, ip=None, auto_connect=False):
"""Initializes a Mebo robot object and establishes an http connection
If no ip or network is supplied, then we will autodiscover the mebo using mDNS.
:param ip: IPv4 address of the robot as a string.
:type ip: str
:param auto_connect: if True, will autodiscover and connect at object creation time
:type auto_connect: bool
>>> m = Mebo()
>>> m.connect()
>>> m2 = Mebo(auto_connect=True)
"""
self._session = Session()
self._ip = None
if ip:
self.ip = ip
elif auto_connect:
self.connect()
self._arm = None
self._wrist = None
self._claw = None
self._speaker = None
self._rtsp_session = None
@property
def endpoint(self):
return 'http://{}'.format(self.ip)
@property
def ip(self):
"""The IP of the robot on the LAN
This value is either provided explicitly at creation time or autodiscovered via mDNS
"""
if self._ip is None:
raise MeboConfigurationError('No configured or discovered value for ip address')
return self._ip
@ip.setter
def ip(self, value):
try:
addr = IPv4Address(value)
self._ip = addr
except AddressValueError:
raise MeboConfigurationError(f'Value {addr} set for IP is invalid IPv4 Address')
@property
def media(self):
"""an rtsp session representing the media streams (audio and video) for the robot"""
if self._rtsp_session is None:
url = f'rtsp://{self.ip}/streamhd/'
self._rtsp_session = RTSPSession(
url,
port=self.RTSP_PORT,
username='stream',
realm='realm',
user_agent='python-mebo'
)
return self._rtsp_session
# TODO: rip this out, or change it to a hearbeat
# listener which gets established, this has nothing to do with discovery
# instead, use mDNS
def _get_broadcast(self, address, timeout=10):
""" Attempts to receive the UDP broadcast signal from Mebo
on the supplied address. Raises an exception if no data is received
before 'timeout' seconds.
:param address: the broadcast address to bind
:param timeout: how long the socket should wait without receiving data before raising socket.timeout
:returns: A Broadcast object that containing the source IP, port, and data received.
:raises: `socket.timeout` if no data is received before `timeout` seconds
"""
logging.debug(f"reading from: {address}:{Mebo.BROADCAST_PORT}")
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
s.bind((address, Mebo.BROADCAST_PORT))
s.settimeout(timeout)
data, source = s.recvfrom(4096)
logging.debug(f"Data received: {data}:{source}")
return Broadcast(source[0], source[1], data)
def _setup_video_stream(self):
self._request(req='feedback_channel_init')
self._request(req='set_video_gop', value=40)
self._request(req='set_date', value=time.time())
self._request(req='set_video_gop', value=40, speed=1)
self._request(req='set_video_gop', value=40, speed=2)
self._request(req='set_video_bitrate', value=600)
self._request(req='set_video_bitrate', value=600, speed=1)
self._request(req='set_video_bitrate', value=600, speed=2)
self._request(req='set_resolution', value='720p')
self._request(req='set_resolution', value='720p', speed=1)
self._request(req='set_resolution', value='720p', speed=2)
self._request(req='set_video_qp', value=42)
self._request(req='set_video_qp', value=42, speed=1)
self._request(req='set_video_qp', value=42, speed=2)
self._request(req='set_video_framerate', value=20)
self._request(req='set_video_framerate', value=20, speed=1)
self._request(req='set_video_framerate', value=20, speed=2)
def _get_stream(self, address, timeout=10):
pass
def _get_mdns(self, key="_camera._tcp.local."):
try:
zeroconf = Zeroconf()
listener = _MeboMDNSListener()
browser = ServiceBrowser(zeroconf, key, listener)
time.sleep(1)
for name, record in browser.services.items():
info = zeroconf.get_service_info(record.key, record.alias)
# note: the library we're using keeps these keys and values as bytes
return info.properties[b'ip'].decode('ascii')
finally:
zeroconf.close()
def _discover(self):
"""
Runs the discovery scan to find Mebo on your LAN
:returns: The IP address of the Mebo found on the LAN
:raises: `MeboDiscoveryError` if discovery times out or the API probe produces a 40X or 50x status code
"""
try:
logging.debug('Looking for Mebo...')
ip = self._get_mdns()
return ip
except socket.timeout:
raise MeboDiscoveryError(('Unable to locate Mebo on the network.\n'
'\tMake sure it is powered on and connected to LAN.\n'
'\tIt may be necessary to power cycle the Mebo.'))
[docs] def connect(self):
"""Connect to the mebo control server over HTTP
If no IP exists for the robot already, the IP will be autodiscovered via mDNS. When there is already an IP, that will be used to make a canary request to get the command server software version. If the robot has been previously connected, no request is made at all.
:raises: :class:`mebo.exceptions.MeboDiscoveryError` when a mDNS discovery fails
:raises: :class:`mebo.exceptions.MeboConnectionError` when a TCP ConnectionError or HTTPError occurs
"""
if self._ip is None:
self.ip = self._discover()
logging.debug(f'Mebo found at {self.ip}')
try:
version = self.version
logging.debug(f'Mebo {version} connected')
except (ConnectionError, HTTPError) as e:
raise MeboConnectionError(f'Unable to connect to mebo: {e}')
def _request(self, **params):
""" private function to submit HTTP requests to Mebo's API
:param params: arguments to pass as query params to the Mebo API. Might
also include `need_response`, a kwarg dictating whether the caller
requires the response object from the API.
:returns: The `requests.HTTPResponse` object if `need_response` is True, `None` otherwise.
"""
try:
need_response = params.pop('need_response')
except KeyError:
# by default, don't return a response
need_response = False
try:
response = self._session.get(self.endpoint, params=params)
response.raise_for_status()
if need_response:
return response
except (ConnectionError, HTTPError) as e:
raise MeboRequestError(f'Request to Mebo failed: {e}')
[docs] def visible_networks(self):
"""
Retrieves list of wireless networks visible to Mebo.
:returns: A dictionary of name to `WirelessNetwork`
>>> m = Mebo(auto_connect=True)
>>> print(m.visible_networks())
"""
resp = self._request(req='get_rt_list', need_response=True)
et = xmlfromstring(f'{resp.text}')
visible = {}
for nw in et.findall('w'):
visible[nw.find('s').text.strip('"')] = WirelessNetwork(*(i.text.strip('"') for i in nw.getchildren()))
return visible
[docs] def add_router(self, auth_type, ssid, password, index=1):
"""
Save a wireless network to the Mebo's list of routers
"""
self._request(req='setup_wireless_save', auth=auth_type, ssid=ssid, key=password, index=index)
def set_scan_timer(self, value=30):
self._request(req='set_scan_timer', value=value)
def restart(self):
self._request(req='restart_system')
def set_timer_state(self, value=0):
self._request(req='set_timer_state', value=value)
def get_wifi_cert(self):
resp = self._request(req='get_wifi_cert', need_response=True)
_, cert_type = resp.text.split(':')
return cert_type.strip()
[docs] def get_boundary_position(self):
""" Gets boundary positions for 4 axes:
Arm: s_up, s_down
Claw: c_open, c_close
Wrist Rotation: w_left & w_right
Wrist Elevation: h_up, h_down
:returns: dictionary of functions to boundary positions
"""
resp = self._request(req='get_boundary_position', need_response=True)
_, key_value_string = [s.strip() for s in resp.text.split(':')]
return dict((k, int(v)) for k, v in [ks.strip().split('=') for ks in key_value_string.split('&')])
@property
def version(self):
"""returns the software version of the robot
>>> m = Mebo(auto_connect=True)
>>> m.version == '03.02.37'
"""
if not hasattr(self, '_version') or self._version is None:
resp = self._request(req='get_version', need_response=True)
_, version = resp.text.split(':')
self._version = version.strip()
return self._version
[docs] def move(self, direction, speed=255, dur=1000):
"""Move the robot in a given direction at a speed for a given duration
:param direction: map direction to move. 'n', 'ne', 'nw', etc.
:param speed: a value in the range [0, 255]. default: 255
:param dur: number of milliseconds the wheels should spin. default: 1000
"""
directions = {
NORTH: 'move_forward',
NORTH_EAST: 'move_forward_right',
EAST: 'move_right',
SOUTH_EAST: 'move_backward_right',
SOUTH: 'move_backward',
SOUTH_WEST: 'move_backward_left',
WEST: 'move_left',
NORTH_WEST: 'move_forward_left'
}
direction = directions.get(direction.lower())
if direction is None:
raise MeboCommandError(
'Direction must be one of the map directions: {}'.format(directions.keys()
))
speed = min(speed, 255)
# there is also a ts keyword that could be passed here.
self._request(req=direction, dur=dur, value=speed)
[docs] def turn(self, direction):
"""Turns a very small amount in the given direction
:param direction: one of R or L
"""
direction = direction.lower()[0]
if direction not in {"r", "l"}:
raise MeboCommandError('Direction for turn must be either "right", "left", "l", or "r"')
call = 'inch_right' if direction == 'r' else 'inch_left'
self._request(req=call)
def stop(self):
self._request(req='fb_stop')
@property
def claw(self):
""" The claw component at the end of Mebo's arm
>>> m = Mebo(auto_connect=True)
>>> m.claw.open(dur=1000, **params)
>>> m.claw.close(dur=400, **params)
>>> m.claw.stop(**params)
"""
if self._claw is None:
claw = ComponentFactory._from_parent(
'Claw',
open=partial(self._request, req='c_open'),
close=partial(self._request, req='c_close'),
stop=partial(self._request, req='c_stop')
)
self._claw = claw
return self._claw
@property
def wrist(self):
"""The wrist component of the robot
The wrist component has the following actions:
* rotate clockwise (to the right from the robot's perspective) OR counter-clockwise (to the left from the robot's perspective)
* raise or lower
>>> m = Mebo()
>>> m.wrist.rotate_right(**params)
>>> m.wrist.rotate_left(**params)
>>> m.wrist.inch_right(**params)
>>> m.wrist.inch_left(**params)
>>> m.wrist.rotate_stop()
>>> m.wrist.up()
>>> m.wrist.down()
>>> m.wrist.lift_stop()
"""
if self._wrist is None:
wrist = ComponentFactory._from_parent(
'Wrist',
rotate_right=partial(self._request, req='w_right'),
inch_right=partial(self._request, req='inch_w_right'),
rotate_left=partial(self._request, req='w_left'),
inch_left=partial(self._request, req='inch_w_left'),
rotate_stop=partial(self._request, req='w_stop'),
up=partial(self._request, req='h_up'),
down=partial(self._request, req='h_down'),
lift_stop=partial(self._request, req='h_stop'),
)
self._wrist = wrist
return self._wrist
@property
def arm(self):
"""The arm component of mebo
>>> m = Mebo(auto_connect=True)
>>> m.arm.up(dur=1000, **params)
>>> m.arm.down(dur=1000, **params)
>>> m.arm.stop(**params)
"""
up = partial(self._request, req='s_up')
up.__doc__ = """Move the arm up
:param dur: The duration of the arm movement
:type dur: int
"""
down = partial(self._request, req='s_down')
down.__doc__ = """Move the arm down
:param dur: The duration of the arm movement
:type dur: int
"""
stop = partial(self._request, req='s_stop')
stop.__doc__ = """Stop the arm"""
if self._arm is None:
arm = ComponentFactory._from_parent(
'Arm',
up=up,
down=down,
stop=stop
)
self._arm = arm
return self._arm
@property
def speaker(self):
"""
>>> m = Mebo(auto_connect=True)
>>> m.speaker.set_volume(value=6)
>>> m.speaker.get_volume()
>>> m.speaker.play_sound(**params)
"""
if self._speaker is None:
speaker = ComponentFactory._from_parent(
'Speaker',
set_volume=partial(self._request, req='set_spk_volume'),
play_sound=partial(self._request, req='audio_out0'))
self._speaker = speaker
return self._speaker