Source code for greyupnp.ssdp

from __future__ import unicode_literals

import contextlib
from requests.structures import CaseInsensitiveDict
import crookbook
import six
import socket
import struct
import time

#: The IP used to broadcast multicast packets to.
MCAST_IP = "239.255.255.250"

#: The port used to broadcast multicast packets to.
MCAST_PORT = 1900

#: The IP+port used to broadcast multicast packets to.
MCAST_IP_PORT = MCAST_IP + ':' + str(MCAST_PORT)


[docs]def make_socket(): '''Creates a socket suitable for SSDP searches. The socket will have a default timeout of 0.2 seconds (this works well for the :py:func:search function which interleaves sending requests and reading responses. ''' mreq = struct.pack("4sl", socket.inet_aton(MCAST_IP), socket.INADDR_ANY) sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind(('', MCAST_PORT)) sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) sock.settimeout(0.2) return sock
[docs]def encode_request(request_line, **headers): '''Creates the data for a SSDP request. Args: request_line (string): The request line for the request (e.g. ``"M-SEARCH * HTTP/1.1"``). headers (dict of string -> string): Dictionary of header name - header value pairs to present in the request. Returns: bytes: The encoded request. ''' lines = [request_line] lines.extend(['%s: %s' % kv for kv in headers.items()]) return ('\r\n'.join(lines) + '\r\n\r\n').encode('utf-8')
[docs]def decode_response(data): '''Decodes the data from a SSDP response. Args: data (bytes): The encoded response. Returns: dict of string -> string: Case-insensitive dictionary of header name to header value pairs extracted from the response. ''' res = CaseInsensitiveDict() for dataline in data.decode('utf-8').splitlines()[1:]: dataline = dataline.strip() if not dataline: continue line_parts = dataline.split(':', 1) # This is to deal with headers with no value. if len(line_parts) < 2: line_parts = (line_parts[0], '') res[line_parts[0].strip()] = line_parts[1].strip() return res
[docs]def request_via_socket(sock, search_target): '''Send an SSDP search request via the provided socket. Args: sock: A socket suitable for use to send a broadcast message - preferably one created by :py:func:`make_socket`. search_target (string): A :term:`resource type` target to search for. ''' msgparts = dict(HOST=MCAST_IP_PORT, MAN='"ssdp:discover"', MX='3', ST=search_target) msg = encode_request('M-SEARCH * HTTP/1.1', **msgparts) sock.sendto(msg, (MCAST_IP, MCAST_PORT))
[docs]def responses_from_socket(sock, timeout=10): '''Yield SSDP search responses and advertisements from the provided socket. Args: sock: A socket suitable for use to send a broadcast message - preferably one created by :py:func:`make_socket`. timeout (int / float): Overall time in seconds for how long to wait for before no longer listening for responses. Yields: dict of string -> string: Case-insensitive dictionary of header name to header value pairs extracted from the response. ''' now = time.time() give_up_by = now + timeout while now < give_up_by: try: data = sock.recv(1024) except socket.timeout: now = time.time() continue # We handle either search responses or announcements. for data_prefix in [ b'HTTP/1.1 200 OK', b'NOTIFY * HTTP/1.1', ]: if data[:len(data_prefix)] == data_prefix: break else: now = time.time() continue yield decode_response(data) now = time.time() continue
[docs]@crookbook.essence('location type', mutable=False) @crookbook.described(inner="{0.type!r} at {0.location}") class Discovery(object): '''This class describes a discovered resource, from either a SSDP search response or SSDP advertisement. The headers from the response which describes the discovered resource are available as the 'headers' attribute. You can also access any header (in a case insensitive manner) as an attribute on the object. You can access the location of the resource via the "location" attribute. Attributes: headers: Case-insensitive dictionary of header name to header value pairs extracted from the response. ''' #: Case-insensitive dictionary of header name to header value pairs #: extracted from the response. #headers = None
[docs] def __init__(self, headers): ''' Create instance from SSDP response headers. Args: headers: Dictionary containing header name to header value mappings. ''' self.headers = CaseInsensitiveDict(headers) for attr in ['location', 'type']: try: getattr(self, attr) except AttributeError: msg = 'no header suitable for "{}" in {}' raise ValueError(msg.format(attr, self.headers))
@property def type(self): '''The :term:`resource type`, describing either a service or device.''' return self.headers.get('ST') or self.NT def __getattr__(self, name): try: return self.headers[name] except KeyError: raise AttributeError(name) @property def parsed(self): '''An urlparsed object (returned by :py:func:`~urllib.parse.urlparse`) of the location of the discovery.''' return six.moves.urllib.parse.urlparse(self.location)
[docs] def has_host(self, host): ''' Indicates if the discovered resource is on the described host - this is determined by looking at the location field. Args: host: Either a hostname (e.g. `"localhost"`) or a host-port pair (e.g. `"localhost:80"`). Returns: `true` if this resource is on the provided host. ''' parsed = self.parsed return host in (parsed.hostname, parsed.netloc)