diff --git a/config.toml b/config.toml deleted file mode 100644 index 3ab5fc5..0000000 --- a/config.toml +++ /dev/null @@ -1,47 +0,0 @@ -[[relays]] -name = "Unknown 1" - -[relays.chip] -name = "gpiochip0" -number = 0 - -[relays.pin] -name = "CON2-P22" -line_number = 2 - -[[relays]] -name = "Waterfall" - -[relays.chip] -name = "gpiochip0" -number = 0 - -[relays.pin] -name = "CON2-P18" -line_number = 68 - -[[relays]] -name = "Filter" - -[relays.chip] -name = "gpiochip0" -number = 0 - -[relays.pin] -name = "CON2-P16" -line_number = 15 - -[[relays]] -name = "Unknown 2" - -[relays.chip] -name = "gpiochip0" -number = 0 - -[relays.pin] -name = "CON2-P12" -line_number = 16 - -[rgpio] -host = "pool.ribble.net" -port = 8889 diff --git a/daemon-test.py b/daemon-test.py deleted file mode 100755 index 44505a8..0000000 --- a/daemon-test.py +++ /dev/null @@ -1,161 +0,0 @@ -#!/usr/bin/env python3 -import argparse -import dataclasses -import sys -from typing import Dict -import time - -import rgpio -# From test C program using wiringPi -# RELAY_PINS = [4, 17, 27, 18, 22, 23, 24, 25] -# Physical pins -# PINS = [7, 11, 13, 15, 12, 16, 18, 22] -# From Banana Pi Zero M2 pinout -RELAY_PINS = ["CON2-P07", "CON2-P11", "CON2-P13", "CON2-P15", "CON2-P12", "CON2-P16", "CON2-P18", "CON2-P22"] -# con2-p22 ? -# p18: waterfall -# p16: filter pump, salt cell - -#lgpio.h:#define LG_GPIO_IS_KERNEL 1 -#lgpio.h:#define LG_GPIO_IS_OUTPUT 2 -#lgpio.h:#define LG_GPIO_IS_ACTIVE_LOW 4 -#lgpio.h:#define LG_GPIO_IS_OPEN_DRAIN 8 -#lgpio.h:#define LG_GPIO_IS_OPEN_SOURCE 16 -#lgpio.h:#define LG_GPIO_IS_PULL_UP 32 -#lgpio.h:#define LG_GPIO_IS_PULL_DOWN 64 -#lgpio.h:#define LG_GPIO_IS_PULL_NONE 128 -#lgpio.h:#define LG_GPIO_IS_INPUT 65536 -#lgpio.h:#define LG_GPIO_IS_RISING_EDGE 131072 -#lgpio.h:#define LG_GPIO_IS_FALLING_EDGE 262144 -#lgpio.h:#define LG_GPIO_IS_REALTIME_CLOCK 524288 - -FLAG_TO_NAME = { - 1: "kernel", - 2: "output", - 4: "active_low", - 8: "open_drain", - 16: "open_source", - 32: "pull_up", - 64: "pull_down", - 128: "pull_none", - 65536: "input", - 131072: "rising_edge", - 262144: "falling_edge", - 524288: "realtime_clock", -} - - -@dataclasses.dataclass -class Relay: - name: str - pin: str - -RELAY_ON = 0 -RELAY_OFF = 1 - -RELAYS = [ - Relay("Light", "CON2-P22"), - Relay("Waterfall", "CON2-P18"), - Relay("Filter", "CON2-P16"), - Relay("Unknown 2", "CON2-P12"), -] -DEVICE = "/dev/ttyUSB0" - -def alloff(args, sbc, handle, pin_to_line): - "Turn off all relays" - for relay in RELAYS: - line = pin_to_line[relay.pin] - sbc.gpio_claim_output(handle, line, level=RELAY_OFF) - print_status(sbc, handle, line) - -def flags_to_name(flags: int): - "Get the set of flag names based on the provided flag value" - result = set() - for value, name in FLAG_TO_NAME.items(): - if flags & value: - result.add(name) - return result - -def on(args, sbc, handle, pin_to_line): - "Turn on a single relay" - name_to_pin = {relay.name.lower(): relay.pin for relay in RELAYS} - pin = name_to_pin[args.name.lower()] - line = pin_to_line[pin] - sbc.gpio_claim_output(handle, line, level=RELAY_ON) - sbc.gpio_write(handle, line, RELAY_ON) - print_status(sbc, handle, line) - print(f"{args.name} now on") - if args.duration: - print(f"Waiting {args.duration} minutes ({args.duration / 60.0} hours), then turning off") - time.sleep(args.duration * 60) - sbc.gpio_write(handle, line, RELAY_OFF) - print(f"{args.name} now off") - print_status(sbc, handle, line) - - -def status(args, sbc, handle, pin_to_line): - pin_to_name = {relay.pin: relay.name for relay in RELAYS} - for pin, name in pin_to_name.items(): - line = pin_to_line[pin] - print_status(sbc, handle, line) - -def print_status(sbc, handle, line): - okay_status, io_number, flags, pin, user = sbc.gpio_get_line_info(handle, line) - names = flags_to_name(flags) - if okay_status == 0: - print(f"{io_number}): {flags} {pin} {user}: {' '.join(sorted(names))}") - else: - print(f"Failed to get status for {line}") - -def map_pin_to_line(sbc, handle): - start_pin, end_pin, name, driver = sbc.gpio_get_chip_info(handle) - pin_to_line = {} - for x in range(start_pin, end_pin): - okay_status, io_number, flags, pin, user = sbc.gpio_get_line_info(handle, x) - flag_names = flags_to_name(flags) - if okay_status != 0: - print(f"Error getting pin {x}: status {okay_status}") - continue - pin_to_line[pin] = x - return pin_to_line - -def main(): - parser = argparse.ArgumentParser() - subparsers = parser.add_subparsers(required=True, help="the sub-command to execute") - - parser_status = subparsers.add_parser("status", help="Show status of the relays") - parser_status.add_argument("-H", "--host", default="127.0.0.1", help="The host to connect to") - parser_status.set_defaults(func=status) - - parser_alloff = subparsers.add_parser("alloff", help="Turn off all relays") - parser_alloff.add_argument("-H", "--host", default="127.0.0.1", help="The host to connect to") - parser_alloff.set_defaults(func=alloff) - - parser_on = subparsers.add_parser("on", help="Turn on a relay") - parser_on.add_argument("name", choices=[relay.name for relay in RELAYS]) - parser_on.add_argument("-d", "--duration", type=int, default=0, help="Time, in minutes, to keep the relay on") - parser_on.add_argument("-H", "--host", default="127.0.0.1", help="The host to connect to") - parser_on.set_defaults(func=on) - args = parser.parse_args() - - sbc = rgpio.sbc(host=args.host) - if not sbc.connected: - print("Not connected") - return -1 - - handle = sbc.gpiochip_open(0) # open /dev/gpiochip0 - if handle < 0: - print("Error on open") - return -1 - - pin_to_line = map_pin_to_line(sbc, handle) - - try: - args.func(args, sbc, handle, pin_to_line) - finally: - sbc.gpiochip_close(handle) - - return 0 - -if __name__ == "__main__": - sys.exit(main()) diff --git a/mdns-registration-test.py b/mdns-registration-test.py deleted file mode 100755 index 6c2ff0e..0000000 --- a/mdns-registration-test.py +++ /dev/null @@ -1,74 +0,0 @@ -#!/usr/bin/env python3 -"""Example of announcing 250 services (in this case, a fake HTTP server).""" - -import argparse -import asyncio -import logging -import socket -from typing import List, Optional - -from zeroconf import IPVersion -from zeroconf.asyncio import AsyncServiceInfo, AsyncZeroconf - - -class AsyncRunner: - def __init__(self, ip_version: IPVersion) -> None: - self.ip_version = ip_version - self.aiozc: Optional[AsyncZeroconf] = None - - async def register_services(self, infos: List[AsyncServiceInfo]) -> None: - self.aiozc = AsyncZeroconf(ip_version=self.ip_version) - tasks = [self.aiozc.async_register_service(info) for info in infos] - background_tasks = await asyncio.gather(*tasks) - await asyncio.gather(*background_tasks) - print("Finished registration, press Ctrl-C to exit...") - while True: - await asyncio.sleep(1) - - async def unregister_services(self, infos: List[AsyncServiceInfo]) -> None: - assert self.aiozc is not None - tasks = [self.aiozc.async_unregister_service(info) for info in infos] - background_tasks = await asyncio.gather(*tasks) - await asyncio.gather(*background_tasks) - await self.aiozc.async_close() - - -if __name__ == '__main__': - logging.basicConfig(level=logging.DEBUG) - - parser = argparse.ArgumentParser() - parser.add_argument('--debug', action='store_true') - version_group = parser.add_mutually_exclusive_group() - version_group.add_argument('--v6', action='store_true') - version_group.add_argument('--v6-only', action='store_true') - args = parser.parse_args() - - if args.debug: - logging.getLogger('zeroconf').setLevel(logging.DEBUG) - if args.v6: - ip_version = IPVersion.All - elif args.v6_only: - ip_version = IPVersion.V6Only - else: - ip_version = IPVersion.V4Only - - infos = [] - for i in range(250): - infos.append( - AsyncServiceInfo( - "_http._tcp.local.", - f"Paul's Test Web Site {i}._http._tcp.local.", - addresses=[socket.inet_aton("127.0.0.1")], - port=80, - properties={'path': '/~paulsm/'}, - server=f"zcdemohost-{i}.local.", - ) - ) - - print("Registration of 250 services...") - loop = asyncio.get_event_loop() - runner = AsyncRunner(ip_version) - try: - loop.run_until_complete(runner.register_services(infos)) - except KeyboardInterrupt: - loop.run_until_complete(runner.unregister_services(infos)) diff --git a/mdns-scan-test.py b/mdns-scan-test.py deleted file mode 100755 index 8898d8a..0000000 --- a/mdns-scan-test.py +++ /dev/null @@ -1,24 +0,0 @@ -#!/usr/bin/env python3 -from zeroconf import ServiceBrowser, ServiceListener, Zeroconf - - -class MyListener(ServiceListener): - - def update_service(self, zc: Zeroconf, type_: str, name: str) -> None: - print(f"Service {name} updated") - - def remove_service(self, zc: Zeroconf, type_: str, name: str) -> None: - print(f"Service {name} removed") - - def add_service(self, zc: Zeroconf, type_: str, name: str) -> None: - info = zc.get_service_info(type_, name) - print(f"Service {name} added, service info: {info}") - - -zeroconf = Zeroconf() -listener = MyListener() -browser = ServiceBrowser(zeroconf, "_http._tcp.local.", listener) -try: - input("Press enter to exit...\n\n") -finally: - zeroconf.close() diff --git a/pnpdevice/config.py b/pnpdevice/config.py new file mode 100644 index 0000000..4d74568 --- /dev/null +++ b/pnpdevice/config.py @@ -0,0 +1,8 @@ +from starlette.config import Config + +config = Config(".env") + +DEBUG = config("DEBUG", cast=bool, default=False) +DATABASE = config("DATABASE_URL", cast=databases.DatabaseURL) +SECRET_KEY = config("SECRET_KEY", cast=Secret) +ALLOWED_HOSTS = config("ALLOWED_HOSTS", cast=CommaSeparatedStrings) diff --git a/pnpdevice/relays.py b/pnpdevice/relays.py index d3f08c2..5b19ac1 100644 --- a/pnpdevice/relays.py +++ b/pnpdevice/relays.py @@ -3,75 +3,24 @@ import logging import os import pathlib import tomllib -from typing import Dict, Iterable, List, Union +from typing import Dict, List, Union -import rgpio import tomli_w LOGGER = logging.getLogger(__name__) -# Max number of chips to scan for -MAX_CHIPS = 16 - -# From http://abyz.me.uk/lg/py_rgpio.html#gpio_get_mode -GPIO_KERNEL_IN_USE = 1<<0 -GPIO_KERNEL_OUTPUT = 1<<1 -GPIO_KERNEL_ACTIVE_LOW = 1<<2 -GPIO_KERNEL_OPEN_DRAIN = 1<<3 -GPIO_KERNEL_OPEN_SOURCE = 1<<4 -GPIO_KERNEL_PULL_UP = 1<<5 -GPIO_KERNEL_PULL_DOWN = 1<<6 -GPIO_KERNEL_PULL_OFF = 1<<7 -GPIO_LG_INPUT = 1<<8 -GPIO_LG_OUTPUT = 1<<9 -GPIO_LG_ALERT = 1<<10 -GPIO_LG_GROUP = 1<<11 -GPIO_LG_NOT_USED1 = 1<<12 -GPIO_LG_NOT_USED2 = 1<<13 -GPIO_LG_NOT_USED3 = 1<<14 -GPIO_LG_NOT_USED4 = 1<<15 -GPIO_KERNEL_INPUT = 1<<16 -GPIO_KERNEL_RISING_EDGE_ALERT = 1<<17 -GPIO_KERNEL_FALLING_EDGE_ALERT = 1<<18 -GPIO_KERNEL_REALTIME_CLOCK_ALERT = 1<<19 - -RELAY_OFF = 1 -RELAY_ON = 0 - -@dataclasses.dataclass(frozen=True) -class Chip: - handle: int - label: str - name: str - number: int - number_of_lines: int - - @property - def id(self) -> str: - return f"{self.number}-{self.name}" - -@dataclasses.dataclass(frozen=True) +@dataclasses.dataclass class Pin: - chip: Chip - flags: int - line_number: int + chip: str + number: int name: str - user: str - @property - def id(self) -> str: - return f"{self.chip.id}-{self.line_number}" - -@dataclasses.dataclass(frozen=True) +@dataclasses.dataclass class Relay: pin: Pin name: str state: bool - @property - def id(self) -> str: - return f"{self.pin.id}" - def _deserialize(data: List[Dict[str, str]]) -> List[Relay]: "Deserialize a list of relays from the config." return [Relay( @@ -80,12 +29,10 @@ def _deserialize(data: List[Dict[str, str]]) -> List[Relay]: chip=relay["chip"], number=int(relay["pinnumber"]), name=relay["pinname"], - ), - state=False) for relay in data] + )) for relay in data] def _serialize(data: List[Relay]) -> List[Dict[str, str]]: "Serialize a list of relays to the config." - LOGGER.info("Serializing %s", data) return [{ "chip": relay.pin.chip, "name": relay.name, @@ -93,207 +40,71 @@ def _serialize(data: List[Relay]) -> List[Dict[str, str]]: "pinname": relay.pin.name, } for relay in data] -class Manager: +class Relays: "Class for interacting with relays." - def __init__(self, configpath: pathlib.Path, has_fakes: bool): - try: - self.config = _read_config(configpath) - except FileNotFoundError: - self.config = { - "rgpio": {"host": "localhost", "port": 8889}, - "relays": [{ - "chip": -1, - "pin": "FAKE-P1", - "name": "Fake", - }], - } - # Immediately write out the default config - _write_config(configpath, self.config) + def __init__(self, configpath: pathlib.Path): self.configpath = configpath - self.has_fakes = has_fakes - - # Handles to various chips, mapped by chip number - self._chips = {} - # Info on various pins, mapped by (chip number, line_number) - self._pins = {} - # Cached info on the relays, mapped by relay name - self._relays = {} - # Connection to single-board computer GPIO - self.sbc = None - - def __iter__(self): - "Provide an iterator for the relays." - return iter(self.relays) - - @property - def chips(self) -> List[Chip]: - if not self._chips: - self._load_chips() - return self._chips.values() - - def connect(self) -> None: - self.sbc = rgpio.sbc( - host=self.config["rgpio"]["host"], - port=self.config["rgpio"]["port"], - ) - if not self.sbc.connected: - LOGGER.warning("Failed to connect sbc") - self.sbc = None - - def get_chip_by_number(self, number: int) -> Chip: try: - return self._chips[number] - except KeyError: - self._load_chip(number) - return self._chips[number] + with open(configpath, "rb") as f: + content = tomllib.load(f) + self.config = content + except Exception as e: + LOGGER.info("Unable to load config file: %s", e) + self.config = { + "relays": [], + } + self.relays = _deserialize(self.config["relays"]) - def get_pin_by_number(self, chip: Chip, line_number: int) -> Pin: - try: - return self._pins[(chip.number, line_number)] - except KeyError: - self._load_pin(chip, line_number) - return self._pins[(chip.number, line_number)] - def get_relay_by_pin_id(self, pin_id: str) -> Relay: - for relay in self.relays: - if relay.pin.id == pin_id: - return relay - return None + def chips_list(self) -> List[str]: + raise NotImplementedError() - def load_all_pins(self) -> None: - self._load_chips() - self._load_pins() + def pins_list(self) -> List[Pin]: + raise NotImplementedError() - @property - def pins(self) -> List[Pin]: - if not self._pins: - self._load_pins() - return self._pins.values() - - @property - def relays(self) -> Iterable[Relay]: - if not self._relays: - for relay in self.config["relays"]: - LOGGER.debug("Relay: %s", relay) - chip = self.get_chip_by_number(relay["chip"]["number"]) - pin = self.get_pin_by_number(chip, relay["pin"]["line_number"]) - self._relays[relay["name"]] = Relay( - name=relay["name"], - pin=pin, - state=False, - ) - return self._relays.values() - - def relay_add(self, chip_number: int, chip_name: str, line_number: int, name: str) -> None: - LOGGER.info("Creating relay %s with chip %d %s on line %d", name, chip_number, chip_name, line_number) - chip = self.get_chip_by_number(chip_number) - pin = self.get_pin_by_number(chip, line_number) + def relay_add(self, chip: str, pinnumber: int, name: str) -> None: self.config["relays"].append({ - "chip": { - "name": chip.name, - "number": chip.number, - }, - "pin": { - "name": pin.name, - "line_number": pin.line_number, - }, + "chip": chip, "name": name, + "pinnumber": pinnumber, }) - _write_config(self.configpath, self.config) - self._relays = {} + with open(self.configpath, "wb") as f: + tomli_w.dump({ + "relays": _serialize(self.config["relays"]), + }, f) + LOGGER.info("Wrote config file %s", self.configpath) - def relay_set(self, relay: Relay, state: bool) -> None: - level = RELAY_ON if state else RELAY_OFF - ok = self.sbc.gpio_write(relay.pin.chip.handle, relay.pin.line_number, level) - if ok != 0: - LOGGER.warn("Failed to set relay level for %s (%s)", relay.name, relay.id) - else: - LOGGER.info("Set relay %s (%s) to %s", relay.name, relay.id, "enabled" if state else "disabled") - okay, offset, flags, name, user = self.sbc.gpio_get_line_info(relay.pin.chip.handle, relay.pin.line_number) - LOGGER.info("Flags are now %d", flags) - self._relays[relay.name] = Relay( - name=relay.name, - pin=relay.pin, - state=state, - ) + def relays_list(self) -> List[Relay]: + return self.relays + + def relay_on(self, name: str) -> bool: + raise NotImplementedError() + def relay_set(self, name: str, state: bool) -> None: + raise NotImplementedError() - def shutdown(self) -> None: - _write_config(self.configpath, self.config) +class RelaysFake(Relays): + "Class for fake relays, useful for testing." + def chips_list(self) -> List[str]: + return ["chipA", "chipB"] - def _load_chip(self, number: int) -> None: - try: - handle = self.sbc.gpiochip_open(number) - except rgpio.error: - return - if handle < 0: - return - okay, number_of_lines, name, label = self.sbc.gpio_get_chip_info(handle) - LOGGER.info("Chip info: %s %s %s %s", okay, number_of_lines, name, label) - if okay != 0: - LOGGER.warn("Chip %s not okay.", name) - return - self._chips[number] = Chip( - handle=handle, - label=label, - name=name, - number=number, - number_of_lines=number_of_lines, - ) + def pins_list(self) -> List[Pin]: + return [ + Pin(chip="chipA", number=1, name="CON2-P1"), + Pin(chip="chipA", number=2, name="CON2-P2"), + Pin(chip="chipA", number=3, name="CON2-P10"), + Pin(chip="chipB", number=1, name="CON2-P3"), + Pin(chip="chipB", number=2, name="CON2-P7"), + ] - # Talk to the remote pins daemon and get information about the chips - def _load_chips(self) -> None: - for i in range(MAX_CHIPS): - self._load_chip(i) + def relays_list(self) -> List[Relay]: + raise NotImplementedError() + + def relay_on(self, name: str) -> bool: + raise NotImplementedError() - def _load_pin(self, chip: Chip, line_number: int) -> None: - okay, offset, flags, name, user = self.sbc.gpio_get_line_info(chip.handle, line_number) - LOGGER.info("Got line info: %s %s %s %s", offset, flags, name, user) - assert offset == line_number - if okay != 0: - LOGGER.warn("Line %s is not okay", name) - return - if not name: - LOGGER.warn("Ignoring line %d because it has no name.", line_number) - return - # Claim the pin for output, set initially to relay off. - okay = self.sbc.gpio_claim_output( - handle=chip.handle, - gpio=line_number, - level=RELAY_OFF, - ) - if okay != 0: - LOGGER.warn("Failed to claim pin %s", line_number) - return - okay = self.sbc.gpio_write(chip.handle, line_number, RELAY_OFF) - if okay != 0: - LOGGER.warn("Failed to write pin %s", line_number) - return - okay, offset, flags, name, user = self.sbc.gpio_get_line_info(chip.handle, line_number) - LOGGER.info("Flags are %d", flags) - self._pins[(chip.number, line_number)] = Pin( - chip=chip, - flags=flags, - line_number=line_number, - name=name, - user=user, - ) - - # Talk to the remote pins daemon and get information about the pins - def _load_pins(self) -> None: - LOGGER.info("Loading pins") - for chip in self.chips: - for line in range(chip.number_of_lines): - self._load_pin(chip, line) - -def _read_config(configpath: pathlib.Path) -> None: - with open(configpath, "rb") as f: - content = tomllib.load(f) - LOGGER.info("Read config from %s", configpath) - return content - -def _write_config(configpath: pathlib.Path, config) -> None: - with open(configpath, "wb") as f: - tomli_w.dump(config, f) - LOGGER.info("Wrote config file %s", configpath) + def relay_set(self, name: str, state: bool) -> None: + raise NotImplementedError() +class RelaysReal(Relays): + "Class for controlling real relays." diff --git a/pnpdevice/server.py b/pnpdevice/server.py index 42ecc11..03b8515 100644 --- a/pnpdevice/server.py +++ b/pnpdevice/server.py @@ -1,83 +1,48 @@ -from contextlib import asynccontextmanager -import logging - -from fastapi import FastAPI, Form, Request -from fastapi.responses import HTMLResponse +from fastapi import FastAPI, Request from fastapi.templating import Jinja2Templates from starlette.responses import RedirectResponse import jinja2 -from pnpdevice import settings -from pnpdevice.relays import Manager +from pnpdevice.config import config import pnpdevice.discovery import pnpdevice.relays -LOGGER = logging.getLogger(__name__) -logging.basicConfig(level=logging.DEBUG) - -@asynccontextmanager -async def lifespan(app: FastAPI): - relays.connect() - yield - # relays.shutdown() - -app = FastAPI(debug=settings.DEBUG, lifespan=lifespan) -relays = Manager(settings.CONFIGFILE, settings.RELAYS_FAKE) +app = FastAPI(debug=config.DEBUG) templates = Jinja2Templates(directory="templates") -@app.get("/", response_class=HTMLResponse) -def index(request: Request): +@app.get("/") +def index(): + relays = request.app["relays"] return templates.TemplateResponse("index.template.html", { - "request": request, - "relays": relays, + "relays": relays.relays_list(), }) @app.get("/relay/create") def relay_create_get(request: Request): "Get the form to create a new relay." - relays.load_all_pins() - used_pin_names = {relay.pin.name for relay in relays} - pins = [p for p in relays.pins if p.name not in used_pin_names] - sorted_pins = sorted(pins, key=lambda p: p.name) - return templates.TemplateResponse("relay-create.template.html", { - "relays": relays, - "request": request, - "pins": sorted_pins, - }) + relays = request.app["relays"] + pins = relays.pins_list() + return templates.TemplateResponse("relay-create.template.html", {"pins": pins}) -@app.post("/relay/create") -def relay_create_post( - request: Request, - pin_id: str = Form(...), - name: str = Form(...), - ): - "Create a new relay from form POST." - chip_number, chip_name, line_number = pin_id.split("-") - chip_number = int(chip_number) - line_number = int(line_number) - relays.relay_add(chip_number, chip_name, line_number, name) - return RedirectResponse(status_code=303, url="/") - -@app.get("/relay/{pin_id}") -def relay_detail_get(request: Request, pin_id: str): - "Get details on a single relay." - relay = relays.get_relay_by_pin_id(pin_id) - return templates.TemplateResponse("relay-detail.template.html", { - "request": request, - "relay": relay - }) - -@app.post("/relay/{pin_id}/state") -def relay_state_post( - request: Request, - pin_id: str, - state: bool = Form(...), - ): - "Change the state of a relay." - relay = relays.get_relay_by_pin_id(pin_id) - relays.relay_set(relay, state) +async def relay_create_post(request: Request, chip_and_number: str, name: str): + "Create a new relay." + chip, number = chip_and_number.partition("-") + LOGGER.info("Creating relay %s %s with name %s", chip, number, name) return RedirectResponse(status_code=303, url="/") async def status(request: Request): return html("Status") + +def run(relays: pnpdevice.relays.Relays): + "Run the embedded web server" + app = web.Application() + app["relays"] = relays + aiohttp_jinja2.setup(app, + loader=jinja2.FileSystemLoader("templates")) + app.on_startup.append(pnpdevice.discovery.handle) + app.add_routes([web.get("/", index)]) + app.add_routes([web.get("/relay/create", relay_create_get)]) + app.add_routes([web.post("/relay/create", relay_create_post)]) + web.run_app(app) + diff --git a/pnpdevice/settings.py b/pnpdevice/settings.py deleted file mode 100644 index e1e7d2a..0000000 --- a/pnpdevice/settings.py +++ /dev/null @@ -1,11 +0,0 @@ -from pathlib import Path -from starlette.config import Config -from starlette.datastructures import CommaSeparatedStrings - -config = Config(".env") - -CONFIGFILE = config("CONFIGFILE", cast=Path, default="/etc/pnpdevice.toml") -DEBUG = config("DEBUG", cast=bool, default=False) -RELAYS_FAKE = config("RELAYS_FAKE", cast=bool, default=False) -# SECRET_KEY = config("SECRET_KEY", cast=Secret) -# ALLOWED_HOSTS = config("ALLOWED_HOSTS", cast=CommaSeparatedStrings) diff --git a/pyproject.toml b/pyproject.toml index 0f8d2e6..c6ef362 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,6 @@ authors = [ ] dependencies = [ "fastapi", - "python-multipart", "tomli-w", "uvicorn[standard]", "zeroconf", diff --git a/rgpio.py b/rgpio.py deleted file mode 100644 index 07c5bb5..0000000 --- a/rgpio.py +++ /dev/null @@ -1,3806 +0,0 @@ -""" -[http://abyz.me.uk/lg/py_rgpio.html] - -rgpio is a Python module which allows remote control of the GPIO -and other functions of Linux SBCs running the rgpiod daemon. - -The rgpiod daemon must be running on the SBCs you wish to control. - -*Features* - -o the rgpio Python module can run on Windows, Macs, or Linux -o controls one or more SBCs -o reading and writing GPIO singly and in groups -o software timed PWM and waves -o GPIO callbacks -o pipe notification of GPIO alerts -o I2C wrapper -o SPI wrapper -o serial link wrapper -o simple file handling -o creating and running scripts on the rgpiod daemon - -*Exceptions* - -By default a fatal exception is raised if you pass an invalid -argument to a rgpio function. - -If you wish to handle the returned status yourself you should set -rgpio.exceptions to False. - -You may prefer to check the returned status in only a few parts -of your code. In that case do the following: - -... -rgpio.exceptions = False - -# Code where you want to test the error status. - -rgpio.exceptions = True -... - -*Usage* - -The rgpiod daemon must be running on the SBCs whose GPIO -are to be manipulated. - -The normal way to start rgpiod is during system start. - -rgpiod & - -Your Python program must import rgpio and create one or more -instances of the rgpio.sbc class. This class gives access to -the specified SBC's GPIO. - -... -sbc1 = rgpio.sbc() # sbc1 accesses the local SBC's GPIO -sbc2 = rgpio.sbc('tom') # sbc2 accesses tom's GPIO -sbc3 = rgpio.sbc('dick') # sbc3 accesses dick's GPIO -... - -The later example code snippets assume that sbc is an instance of -the rgpio.sbc class. - -*Licence* - -This is free and unencumbered software released into the public domain. - -Anyone is free to copy, modify, publish, use, compile, sell, or -distribute this software, either in source code form or as a compiled -binary, for any purpose, commercial or non-commercial, and by any -means. - -In jurisdictions that recognize copyright laws, the author or authors -of this software dedicate any and all copyright interest in the -software to the public domain. We make this dedication for the benefit -of the public at large and to the detriment of our heirs and -successors. We intend this dedication to be an overt act of -relinquishment in perpetuity of all present and future rights to this -software under copyright law. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, -EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF -MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. -IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR -OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, -ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR -OTHER DEALINGS IN THE SOFTWARE. - -For more information, please refer to - -OVERVIEW - -ESSENTIAL - -rgpio.sbc Initialise sbc connection -stop Stop a sbc connection - -FILES - -file_open Opens a file -file_close Closes a file - -file_read Reads bytes from a file -file_write Writes bytes to a file - -file_seek Seeks to a position within a file - -file_list List files which match a pattern - -GPIO - -gpiochip_open Opens a gpiochip device -gpiochip_close Closes a gpiochip device - -gpio_get_chip_info Get information about a gpiochip -gpio_get_line_info Get information about a gpiochip line - -gpio_get_mode Gets the mode of a GPIO - -gpio_claim_input Claims a GPIO for input -gpio_claim_output Claims a GPIO for output -gpio_claim_alert Claims a GPIO for alerts -gpio_free Frees a GPIO - -group_claim_input Claims a group of GPIO for inputs -group_claim_output Claims a group of GPIO for outputs -group_free Frees a group of GPIO - -gpio_read Reads a GPIO -gpio_write Writes a GPIO - -group_read Reads a group of GPIO -group_write Writes a group of GPIO - -tx_pulse Starts pulses on a GPIO -tx_pwm Starts PWM on a GPIO -tx_servo Starts servo pulses on a GPIO -tx_wave Starts a wave on a group of GPIO -tx_busy See if tx is active on a GPIO or group -tx_room See if more room for tx on a GPIO or group - -gpio_set_debounce_micros Sets the debounce time for a GPIO -gpio_set_watchdog_micros Sets the watchdog time for a GPIO - -callback Starts a GPIO callback - -I2C - -i2c_open Opens an I2C device -i2c_close Closes an I2C device - -i2c_write_quick SMBus write quick - -i2c_read_byte SMBus read byte -i2c_write_byte SMBus write byte - -i2c_read_byte_data SMBus read byte data -i2c_write_byte_data SMBus write byte data - -i2c_read_word_data SMBus read word data -i2c_write_word_data SMBus write word data - -i2c_read_block_data SMBus read block data -i2c_write_block_data SMBus write block data - -i2c_read_i2c_block_data SMBus read I2C block data -i2c_write_i2c_block_data SMBus write I2C block data - -i2c_read_device Reads the raw I2C device -i2c_write_device Writes the raw I2C device - -i2c_process_call SMBus process call -i2c_block_process_call SMBus block process call - -i2c_zip Performs multiple I2C transactions - -NOTIFICATIONS - -notify_open Request a notification handle -notify_close Close a notification -notify_pause Pause notifications -notify_resume Resume notifications - -SCRIPTS - -script_store Store a script -script_run Run a stored script -script_update Set a scripts parameters -script_status Get script status and parameters -script_stop Stop a running script -script_delete Delete a stored script - -SERIAL - -serial_open Opens a serial device -serial_close Closes a serial device - -serial_read_byte Reads a byte from a serial device -serial_write_byte Writes a byte to a serial device - -serial_read Reads bytes from a serial device -serial_write Writes bytes to a serial device - -serial_data_available Returns number of bytes ready to be read - -SHELL - -shell Executes a shell command - -SPI - -spi_open Opens a SPI device -spi_close Closes a SPI device - -spi_read Reads bytes from a SPI device -spi_write Writes bytes to a SPI device -spi_xfer Transfers bytes with a SPI device - -UTILITIES - -get_sbc_name Get the SBC name - -get_internal Get an SBC configuration value -set_internal Set an SBC configuration value - -set_user Set the user (and associated permissions) -set_share_id Set the share id for a resource -use_share_id Use this share id when asking for a resource - -rgpio.get_module_version Get the rgpio Python module version -rgpio.error_text Get the error text for an error code -""" - -import sys -import socket -import struct -import time -import threading -import os -import atexit -import hashlib - -RGPIO_PY_VERSION = 0x00020200 - -exceptions = True - -MAGIC=1818715245 - -# GPIO levels - -OFF = 0 -LOW = 0 -CLEAR = 0 - -ON = 1 -HIGH = 1 -SET = 1 - -TIMEOUT = 2 - -GROUP_ALL = 0xffffffffffffffff - -# GPIO line flags - -SET_ACTIVE_LOW = 4 -SET_OPEN_DRAIN = 8 -SET_OPEN_SOURCE = 16 -SET_PULL_UP = 32 -SET_PULL_DOWN = 64 -SET_PULL_NONE = 128 - -# GPIO event flags - -RISING_EDGE = 1 -FALLING_EDGE = 2 -BOTH_EDGES = 3 - -# tx constants - -TX_PWM = 0 -TX_WAVE = 1 - -# script run status - -SCRIPT_INITING = 0 -SCRIPT_READY = 1 -SCRIPT_RUNNING = 2 -SCRIPT_WAITING = 3 -SCRIPT_ENDED = 4 -SCRIPT_HALTED = 5 -SCRIPT_FAILED = 6 - -# notification flags - -NTFY_FLAGS_ALIVE = (1 << 0) - -FILE_READ = 1 -FILE_WRITE = 2 -FILE_RW = 3 - -FILE_APPEND = 4 -FILE_CREATE = 8 -FILE_TRUNC = 16 - -FROM_START = 0 -FROM_CURRENT = 1 -FROM_END = 2 - -SPI_MODE_0 = 0 -SPI_MODE_1 = 1 -SPI_MODE_2 = 2 -SPI_MODE_3 = 3 - -_SOCK_CMD_LEN = 16 - -# rgpiod command numbers - -_CMD_FO = 1 -_CMD_FC = 2 -_CMD_FR = 3 -_CMD_FW = 4 -_CMD_FS = 5 -_CMD_FL = 6 -_CMD_GO = 10 -_CMD_GC = 11 -_CMD_GSIX = 12 -_CMD_GSOX = 13 -_CMD_GSAX = 14 -_CMD_GSF = 15 -_CMD_GSGIX = 16 -_CMD_GSGOX = 17 -_CMD_GSGF = 18 -_CMD_GR = 19 -_CMD_GW = 20 -_CMD_GGR = 21 -_CMD_GGWX = 22 -_CMD_GPX = 23 -_CMD_PX = 24 -_CMD_SX = 25 -_CMD_GWAVE = 26 -_CMD_GBUSY = 27 -_CMD_GROOM = 28 -_CMD_GDEB = 29 -_CMD_GWDOG = 30 -_CMD_GIC = 31 -_CMD_GIL = 32 -_CMD_GMODE = 33 -_CMD_I2CO = 40 -_CMD_I2CC = 41 -_CMD_I2CRD = 42 -_CMD_I2CWD = 43 -_CMD_I2CWQ = 44 -_CMD_I2CRS = 45 -_CMD_I2CWS = 46 -_CMD_I2CRB = 47 -_CMD_I2CWB = 48 -_CMD_I2CRW = 49 -_CMD_I2CWW = 50 -_CMD_I2CRK = 51 -_CMD_I2CWK = 52 -_CMD_I2CRI = 53 -_CMD_I2CWI = 54 -_CMD_I2CPC = 55 -_CMD_I2CPK = 56 -_CMD_I2CZ = 57 -_CMD_NO = 70 -_CMD_NC = 71 -_CMD_NR = 72 -_CMD_NP = 73 -_CMD_PARSE = 80 -_CMD_PROC = 81 -_CMD_PROCD = 82 -_CMD_PROCP = 83 -_CMD_PROCR = 84 -_CMD_PROCS = 85 -_CMD_PROCU = 86 -_CMD_SERO = 90 -_CMD_SERC = 91 -_CMD_SERRB = 92 -_CMD_SERWB = 93 -_CMD_SERR = 94 -_CMD_SERW = 95 -_CMD_SERDA = 96 -_CMD_SPIO = 100 -_CMD_SPIC = 101 -_CMD_SPIR = 102 -_CMD_SPIW = 103 -_CMD_SPIX = 104 -_CMD_MICS = 113 -_CMD_MILS = 114 -_CMD_CGI = 115 -_CMD_CSI = 116 -_CMD_NOIB = 117 -_CMD_SHELL = 118 -_CMD_SBC = 120 -_CMD_FREE = 121 -_CMD_SHARE = 130 -_CMD_USER = 131 -_CMD_PASSW = 132 -_CMD_LCFG = 133 -_CMD_SHRU = 134 -_CMD_SHRS = 135 -_CMD_PWD = 136 -_CMD_PCD = 137 -_CMD_LGV = 140 -_CMD_TICK = 141 - -# rgpiod error numbers - -OKAY = 0 -INIT_FAILED = -1 -BAD_MICROS = -2 -BAD_PATHNAME = -3 -NO_HANDLE = -4 -BAD_HANDLE = -5 -BAD_SOCKET_PORT = -6 -NOT_PERMITTED = -7 -SOME_PERMITTED = -8 -BAD_SCRIPT = -9 -BAD_TX_TYPE = -10 -GPIO_IN_USE = -11 -BAD_PARAM_NUM = -12 -DUP_TAG = -13 -TOO_MANY_TAGS = -14 -BAD_SCRIPT_CMD = -15 -BAD_VAR_NUM = -16 -NO_SCRIPT_ROOM = -17 -NO_MEMORY = -18 -SOCK_READ_FAILED = -19 -SOCK_WRIT_FAILED = -20 -TOO_MANY_PARAM = -21 -SCRIPT_NOT_READY = -22 -BAD_TAG = -23 -BAD_MICS_DELAY = -24 -BAD_MILS_DELAY = -25 -I2C_OPEN_FAILED = -26 -SERIAL_OPEN_FAILED = -27 -SPI_OPEN_FAILED = -28 -BAD_I2C_BUS = -29 -BAD_I2C_ADDR = -30 -BAD_SPI_CHANNEL = -31 -BAD_I2C_FLAGS = -32 -BAD_SPI_FLAGS = -33 -BAD_SERIAL_FLAGS = -34 -BAD_SPI_SPEED = -35 -BAD_SERIAL_DEVICE = -36 -BAD_SERIAL_SPEED = -37 -BAD_FILE_PARAM = -38 -BAD_I2C_PARAM = -39 -BAD_SERIAL_PARAM = -40 -I2C_WRITE_FAILED = -41 -I2C_READ_FAILED = -42 -BAD_SPI_COUNT = -43 -SERIAL_WRITE_FAILED = -44 -SERIAL_READ_FAILED = -45 -SERIAL_READ_NO_DATA = -46 -UNKNOWN_COMMAND = -47 -SPI_XFER_FAILED = -48 -BAD_POINTER = -49 -MSG_TOOBIG = -50 -BAD_MALLOC_MODE = -51 -TOO_MANY_SEGS = -52 -BAD_I2C_SEG = -53 -BAD_SMBUS_CMD = -54 -BAD_I2C_WLEN = -55 -BAD_I2C_RLEN = -56 -BAD_I2C_CMD = -57 -FILE_OPEN_FAILED = -58 -BAD_FILE_MODE = -59 -BAD_FILE_FLAG = -60 -BAD_FILE_READ = -61 -BAD_FILE_WRITE = -62 -FILE_NOT_ROPEN = -63 -FILE_NOT_WOPEN = -64 -BAD_FILE_SEEK = -65 -NO_FILE_MATCH = -66 -NO_FILE_ACCESS = -67 -FILE_IS_A_DIR = -68 -BAD_SHELL_STATUS = -69 -BAD_SCRIPT_NAME = -70 -CMD_INTERRUPTED = -71 -BAD_EVENT_REQUEST = -72 -BAD_GPIO_NUMBER = -73 -BAD_GROUP_SIZE = -74 -BAD_LINEINFO_IOCTL = -75 -BAD_READ = -76 -BAD_WRITE = -77 -CANNOT_OPEN_CHIP = -78 -GPIO_BUSY = -79 -GPIO_NOT_ALLOCATED = -80 -NOT_A_GPIOCHIP = -81 -NOT_ENOUGH_MEMORY = -82 -POLL_FAILED = -83 -TOO_MANY_GPIOS = -84 -UNEGPECTED_ERROR = -85 -BAD_PWM_MICROS = -86 -NOT_GROUP_LEADER = -87 -SPI_IOCTL_FAILED = -88 -BAD_GPIOCHIP = -89 -BAD_CHIPINFO_IOCTL = -90 -BAD_CONFIG_FILE = -91 -BAD_CONFIG_VALUE = -92 -NO_PERMISSIONS = -93 -BAD_USERNAME = -94 -BAD_SECRET = -95 -TX_QUEUE_FULL = -96 -BAD_CONFIG_ID = -97 -BAD_DEBOUNCE_MICS = -98 -BAD_WATCHDOG_MICS = -99 -BAD_SERVO_FREQ = -100 -BAD_SERVO_WIDTH = -101 -BAD_PWM_FREQ = -102 -BAD_PWM_DUTY = -103 -GPIO_NOT_AN_OUTPUT = -104 -INVALID_GROUP_ALERT = -105 - -# rgpiod error text - -_errors=[ - [OKAY, "No error"], - [INIT_FAILED, "initialisation failed"], - [BAD_MICROS, "micros not 0-999999"], - [BAD_PATHNAME, "can not open pathname"], - [NO_HANDLE, "no handle available"], - [BAD_HANDLE, "unknown handle"], - [BAD_SOCKET_PORT, "socket port not 1024-32000"], - [NOT_PERMITTED, "GPIO operation not permitted"], - [SOME_PERMITTED, "one or more GPIO not permitted"], - [BAD_SCRIPT, "invalid script"], - [BAD_TX_TYPE, "bad tx type for GPIO and group"], - [GPIO_IN_USE, "GPIO already in use"], - [BAD_PARAM_NUM, "script parameter id not 0-9"], - [DUP_TAG, "script has duplicate tag"], - [TOO_MANY_TAGS, "script has too many tags"], - [BAD_SCRIPT_CMD, "illegal script command"], - [BAD_VAR_NUM, "script variable id not 0-149"], - [NO_SCRIPT_ROOM, "no more room for scripts"], - [NO_MEMORY, "can not allocate temporary memory"], - [SOCK_READ_FAILED, "socket read failed"], - [SOCK_WRIT_FAILED, "socket write failed"], - [TOO_MANY_PARAM, "too many script parameters (> 10)"], - [SCRIPT_NOT_READY, "script initialising"], - [BAD_TAG, "script has unresolved tag"], - [BAD_MICS_DELAY, "bad MICS delay (too large)"], - [BAD_MILS_DELAY, "bad MILS delay (too large)"], - [I2C_OPEN_FAILED, "can not open I2C device"], - [SERIAL_OPEN_FAILED, "can not open serial device"], - [SPI_OPEN_FAILED, "can not open SPI device"], - [BAD_I2C_BUS, "bad I2C bus"], - [BAD_I2C_ADDR, "bad I2C address"], - [BAD_SPI_CHANNEL, "bad SPI channel"], - [BAD_I2C_FLAGS, "bad I2C open flags"], - [BAD_SPI_FLAGS, "bad SPI open flags"], - [BAD_SERIAL_FLAGS, "bad serial open flags"], - [BAD_SPI_SPEED, "bad SPI speed"], - [BAD_SERIAL_DEVICE, "bad serial device name"], - [BAD_SERIAL_SPEED, "bad serial baud rate"], - [BAD_FILE_PARAM, "bad file parameter"], - [BAD_I2C_PARAM, "bad I2C parameter"], - [BAD_SERIAL_PARAM, "bad serial parameter"], - [I2C_WRITE_FAILED, "i2c write failed"], - [I2C_READ_FAILED, "i2c read failed"], - [BAD_SPI_COUNT, "bad SPI count"], - [SERIAL_WRITE_FAILED, "ser write failed"], - [SERIAL_READ_FAILED, "ser read failed"], - [SERIAL_READ_NO_DATA, "ser read no data available"], - [UNKNOWN_COMMAND, "unknown command"], - [SPI_XFER_FAILED, "spi xfer/read/write failed"], - [BAD_POINTER, "bad (NULL) pointer"], - [MSG_TOOBIG, "socket/pipe message too big"], - [BAD_MALLOC_MODE, "bad memory allocation mode"], - [TOO_MANY_SEGS, "too many I2C transaction segments"], - [BAD_I2C_SEG, "an I2C transaction segment failed"], - [BAD_SMBUS_CMD, "SMBus command not supported by driver"], - [BAD_I2C_WLEN, "bad I2C write length"], - [BAD_I2C_RLEN, "bad I2C read length"], - [BAD_I2C_CMD, "bad I2C command"], - [FILE_OPEN_FAILED, "file open failed"], - [BAD_FILE_MODE, "bad file mode"], - [BAD_FILE_FLAG, "bad file flag"], - [BAD_FILE_READ, "bad file read"], - [BAD_FILE_WRITE, "bad file write"], - [FILE_NOT_ROPEN, "file not open for read"], - [FILE_NOT_WOPEN, "file not open for write"], - [BAD_FILE_SEEK, "bad file seek"], - [NO_FILE_MATCH, "no files match pattern"], - [NO_FILE_ACCESS, "no permission to access file"], - [FILE_IS_A_DIR, "file is a directory"], - [BAD_SHELL_STATUS, "bad shell return status"], - [BAD_SCRIPT_NAME, "bad script name"], - [CMD_INTERRUPTED, "Python socket command interrupted"], - [BAD_EVENT_REQUEST, "bad event request"], - [BAD_GPIO_NUMBER, "bad GPIO number"], - [BAD_GROUP_SIZE, "bad group size"], - [BAD_LINEINFO_IOCTL, "bad lineinfo IOCTL"], - [BAD_READ, "bad GPIO read"], - [BAD_WRITE, "bad GPIO write"], - [CANNOT_OPEN_CHIP, "can not open gpiochip"], - [GPIO_BUSY, "GPIO busy"], - [GPIO_NOT_ALLOCATED, "GPIO not allocated"], - [NOT_A_GPIOCHIP, "not a gpiochip"], - [NOT_ENOUGH_MEMORY, "not enough memory"], - [POLL_FAILED, "GPIO poll failed"], - [TOO_MANY_GPIOS, "too many GPIO"], - [UNEGPECTED_ERROR, "unexpected error"], - [BAD_PWM_MICROS, "bad PWM micros"], - [NOT_GROUP_LEADER, "GPIO not the group leader"], - [SPI_IOCTL_FAILED, "SPI iOCTL failed"], - [BAD_GPIOCHIP, "bad gpiochip"], - [BAD_CHIPINFO_IOCTL, "bad chipinfo IOCTL"], - [BAD_CONFIG_FILE, "bad configuration file"], - [BAD_CONFIG_VALUE, "bad configuration value"], - [NO_PERMISSIONS, "no permission to perform action"], - [BAD_USERNAME, "bad user name"], - [BAD_SECRET, "bad secret for user"], - [TX_QUEUE_FULL, "TX queue full"], - [BAD_CONFIG_ID, "bad configuration id"], - [BAD_DEBOUNCE_MICS, "bad debounce microseconds"], - [BAD_WATCHDOG_MICS, "bad watchdog microseconds"], - [BAD_SERVO_FREQ, "bad servo frequency"], - [BAD_SERVO_WIDTH, "bad servo pulsewidth"], - [BAD_PWM_FREQ, "bad PWM frequency"], - [BAD_PWM_DUTY, "bad PWM dutycycle"], - [GPIO_NOT_AN_OUTPUT, "GPIO not set as an output"], - [INVALID_GROUP_ALERT, "can not set a group to alert"], -] - -_except_a = "############################################################\n{}" - -_except_z = "############################################################" - -_except_1 = """ -Did you start the rgpiod daemon? E.g. rgpiod & - -Did you specify the correct host/port in the environment -variables LG_ADDR and/or LG_PORT? -E.g. export LG_ADDR=soft, export LG_PORT=8889 - -Did you specify the correct host/port in the -rgpio.sbc() function? E.g. rgpio.sbc('soft', 8889)""" - -_except_2 = """ -Do you have permission to access the rgpiod daemon? -Perhaps it was started with rgpiod -nlocalhost""" - -_except_3 = """ -Can't create callback thread. -Perhaps too many simultaneous rgpiod connections.""" - -class _socklock: - """ - A class to store socket and lock. - """ - def __init__(self): - self.s = None - self.l = threading.Lock() - -class error(Exception): - """ - rgpio module exception - """ - def __init__(self, value): - self.value = value - def __str__(self): - return repr(self.value) - -class pulse: - """ - A class to store pulse information. - """ - - def __init__(self, group_bits, group_mask, pulse_delay): - """ - Initialises a pulse. - - group_bits:= the levels to set if the corresponding bit in - group_mask is set. - group_mask:= a mask indicating the group GPIO to be updated. - delay:= the delay in microseconds before the next pulse. - """ - self.group_bits = group_bits - self.group_mask = group_mask - self.pulse_delay = pulse_delay - - -# A couple of hacks to cope with different string handling -# between various Python versions -# 3 != 2.7.8 != 2.7.3 - -if sys.hexversion < 0x03000000: - def _b(x): - return x -else: - def _b(x): - return x.encode('latin-1') - -if sys.hexversion < 0x02070800: - def _str(x): - return buffer(x) -else: - def _str(x): - return x - -def u2i(uint32): - """ - Converts a 32 bit unsigned number to signed. - - uint32:= an unsigned 32 bit number - - Returns a signed number. - - ... - print(u2i(4294967272)) - -24 - print(u2i(37)) - 37 - ... - """ - mask = (2 ** 32) - 1 - if uint32 & (1 << 31): - v = uint32 | ~mask - else: - v = uint32 & mask - return v - -def _u2i(status): - """ - If the status is negative it indicates an error. On error - a rgpio exception will be raised if exceptions is True. - """ - v = u2i(status) - if v < 0: - if exceptions: - raise error(error_text(v)) - return v - -def _u2i_list(lst): - """ - Checks a returned list. The first member is the status. - If the status is negative it indicates an error. On error - a rgpio exception will be raised if exceptions is True. - """ - lst[0] = u2i(lst[0]) - if lst[0] < 0: - if exceptions: - raise error(error_text(lst[0])) - return lst - -def _lg_command(sl, cmd, Q=0, L=0, H=0): - """ - """ - status = CMD_INTERRUPTED - with sl.l: - sl.s.send(struct.pack('IIHHHH', MAGIC, 0, cmd, Q, L, H)) - status, dummy = struct.unpack('I12s', sl.s.recv(_SOCK_CMD_LEN)) - return status - -def _lg_command_nolock(sl, cmd, Q=0, L=0, H=0): - """ - """ - status = CMD_INTERRUPTED - sl.s.send(struct.pack('IIHHHH', MAGIC, 0, cmd, Q, L, H)) - status, dummy = struct.unpack('I12s', sl.s.recv(_SOCK_CMD_LEN)) - return status - -def _lg_command_ext(sl, cmd, p3, extents, Q=0, L=0, H=0): - """ - """ - ext = bytearray(struct.pack('IIHHHH', MAGIC, p3, cmd, Q, L, H)) - for x in extents: - if type(x) == type(""): - ext.extend(_b(x)) - else: - ext.extend(x) - status = CMD_INTERRUPTED - with sl.l: - sl.s.sendall(ext) - status, dummy = struct.unpack('I12s', sl.s.recv(_SOCK_CMD_LEN)) - return status - -def _lg_command_ext_nolock(sl, cmd, p3, extents, Q=0, L=0, H=0): - """ - """ - status = CMD_INTERRUPTED - ext = bytearray(struct.pack('IIHHHH', MAGIC, p3, cmd, Q, L, H)) - for x in extents: - if type(x) == type(""): - ext.extend(_b(x)) - else: - ext.extend(x) - sl.s.sendall(ext) - status, dummy = struct.unpack('I12s', sl.s.recv(_SOCK_CMD_LEN)) - return status - -class _callback_ADT: - """ - An ADT class to hold callback information. - """ - - def __init__(self, gpiochip, gpio, edge, func): - """ - Initialises a callback ADT. - - gpiochip:= gpiochip device number. - gpio:= gpio number in device. - edge:= BOTH_EDGES, RISING_EDGE, or FALLING_EDGE. - func:= a user function taking four arguments - (gpiochip, gpio, level, tick). - """ - self.chip = gpiochip - self.gpio = gpio - self.edge = edge - self.func = func - -class _callback_thread(threading.Thread): - """ - A class to encapsulate rgpio notification callbacks. - """ - - def __init__(self, control, host, port): - """ - Initialises notifications. - """ - threading.Thread.__init__(self) - self.control = control - self.sl = _socklock() - self.go = False - self.daemon = True - self.monitor = 0 - self.callbacks = [] - self.sl.s = socket.create_connection((host, port), None) - self.lastLevel = 0 - self.handle = _u2i(_lg_command(self.sl, _CMD_NOIB)) - self.go = True - self.start() - - def stop(self): - """ - Stops notifications. - """ - if self.go: - self.go = False - ext = [struct.pack("I", self.handle)] - _lg_command_ext(self.control, _CMD_NC, 4, ext, L=1) - - def append(self, callb): - """ - Adds a callback to the notification thread. - """ - self.callbacks.append(callb) - - def remove(self, callb): - """ - Removes a callback from the notification thread. - """ - if callb in self.callbacks: - self.callbacks.remove(callb) - - def run(self): - """ - Runs the notification thread. - """ - - lastLevel = self.lastLevel - RECV_SIZ = 4096 - MSG_SIZ = 16 # 4 bytes of padding in each message - - buf = bytes() - while self.go: - - buf += self.sl.s.recv(RECV_SIZ) - offset = 0 - - while self.go and (len(buf) - offset) >= MSG_SIZ: - msgbuf = buf[offset:offset + MSG_SIZ] - offset += MSG_SIZ - tick, chip, gpio, level, flags, pad = ( - struct.unpack('QBBBBI', msgbuf)) - - if flags == 0: - for cb in self.callbacks: - if cb.gpio == gpio: - cb.func(chip, gpio, level, tick) - else: # no flags currently defined, ignore. - pass - - buf = buf[offset:] - - self.sl.s.close() - -class _callback: - """ - A class to provide GPIO level change callbacks. - """ - - def __init__(self, notify, chip, gpio, edge=RISING_EDGE, func=None): - """ - Initialise a callback and adds it to the notification thread. - """ - self._notify = notify - self.count=0 - self._reset = False - if func is None: - func=self._tally - self.callb = _callback_ADT(chip, gpio, edge, func) - self._notify.append(self.callb) - - def cancel(self): - """ - Cancels a callback by removing it from the notification thread. - """ - self._notify.remove(self.callb) - - def _tally(self, chip, gpio, level, tick): - """ - Increment the callback called count. - """ - if self._reset: - self._reset = False - self.count = 0 - self.count += 1 - - def tally(self): - """ - Provides a count of how many times the default tally - callback has triggered. - - The count will be zero if the user has supplied their own - callback function. - """ - return self.count - - def reset_tally(self): - """ - Resets the tally count to zero. - """ - self._reset = True - self.count = 0 - -def error_text(errnum): - """ - Returns a description of an error number. - - errnum:= <0, the error number - - ... - print(rgpio.error_text(-5)) - level not 0-1 - ... - """ - for e in _errors: - if e[0] == errnum: - return e[1] - return "unknown error" - -def get_module_version(): - """ - Returns the version number of the rgpio Python module as a dotted - quad. - - A.B.C.D - - A. API major version, changed if breaks previous API - B. API minor version, changed when new function added - C. bug fix - D. documentation changegi - """ - return "rgpio.py_{}.{}.{}.{}".format( - (RGPIO_PY_VERSION>>24)&0xff, (RGPIO_PY_VERSION>>16)&0xff, - (RGPIO_PY_VERSION>>8)&0xff, RGPIO_PY_VERSION&0xff) - -class sbc(): - - def _rxbuf(self, count): - """ - Returns count bytes from the command socket. - """ - ext = bytearray(self.sl.s.recv(count)) - while len(ext) < count: - ext.extend(self.sl.s.recv(count - len(ext))) - return ext - - def __repr__(self): - """ - Returns details of the sbc connection. - """ - return "".format(self._host, self._port) - - # ESSENTIAL - - def __init__(self, - host = os.getenv("LG_ADDR", 'localhost'), - port = os.getenv("LG_PORT", 8889), - show_errors = True): - """ - Establishes a connection to the rgpiod daemon running on a SBC. - - host:= the host name of the SBC on which the rgpiod daemon is - running. The default is localhost unless overridden by - the LG_ADDR environment variable. - port:= the port number on which the rgpiod daemon is listening. - The default is 8889 unless overridden by the LG_PORT - environment variable. The rgpiod daemon must have been - started with the same port number. - - This connects to the rgpiod daemon and reserves resources - to be used for sending commands and receiving notifications. - - An instance attribute [*connected*] may be used to check the - success of the connection. If the connection is established - successfully [*connected*] will be True, otherwise False. - - If the LG_USER environment variable exists that user will be - "logged in" using [*set_user*]. This only has an effect if - the rgpiod daemon is running with access control enabled. - - ... - sbc = rgpio.sbc() # use defaults - sbc = rgpio.sbc('mypi') # specify host, default port - sbc = rgpio.sbc('mypi', 7777) # specify host and port - - sbc = rgpio.sbc() # exit script if no connection - if not sbc.connected: - exit() - ... - """ - self.connected = True - - self.sl = _socklock() - self._notify = None - - port = int(port) - - if host == '': - host = "localhost" - - self._host = host - self._port = port - - try: - self.sl.s = socket.create_connection((host, port), None) - - # Disable the Nagle algorithm. - self.sl.s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) - - self._notify = _callback_thread(self.sl, host, port) - - except socket.error: - exception = 1 - - except struct.error: - exception = 2 - - except error: - # assumed to be no handle available - exception = 3 - - else: - exception = 0 - atexit.register(self.stop) - - if exception != 0: - - self.connected = False - - if self.sl.s is not None: - self.sl.s = None - - if show_errors: - - s = "Can't connect to rgpiod at {}({})".format(host, str(port)) - - - print(_except_a.format(s)) - if exception == 1: - print(_except_1) - elif exception == 2: - print(_except_2) - else: - print(_except_3) - print(_except_z) - - else: # auto login if LG_USER exists - - user = os.getenv("LG_USER", '') - - if len(user) > 0: - self.set_user(user) - - def stop(self): - """ - Disconnects from the rgpiod daemon and releases any used resources. - - ... - sbc.stop() - ... - """ - - self.connected = False - - if self._notify is not None: - self._notify.stop() - self._notify = None - - if self.sl.s is not None: - # Free all resources allocated to this connection - _lg_command(self.sl, _CMD_FREE) - self.sl.s.close() - self.sl.s = None - - # FILES - - def file_open(self, file_name, file_mode): - """ - This function returns a handle to a file opened in a specified mode. - - This is a privileged command. See [+Permits+]. - - file_name:= the file to open. - file_mode:= the file open mode. - - If OK returns a handle (>= 0). - - On failure returns a negative error code. - - Mode - - The mode may have the following values: - - Constant @ Value @ Meaning - FILE_READ @ 1 @ open file for reading - FILE_WRITE @ 2 @ open file for writing - FILE_RW @ 3 @ open file for reading and writing - - The following values may be or'd into the mode: - - Name @ Value @ Meaning - FILE_APPEND @ 4 @ All writes append data to the end of the file - FILE_CREATE @ 8 @ The file is created if it doesn't exist - FILE_TRUNC @ 16 @ The file is truncated - - Newly created files are owned by the user who launched the daemon. - They will have permissions owner read and write. - - ... - #!/usr/bin/env python - - import rgpio - - sbc = rgpio.sbc() - - if not sbc.connected: - exit() - - handle = sbc.file_open("/ram/lg.c", rgpio.FILE_READ) - - done = False - - while not done: - c, d = sbc.file_read(handle, 60000) - if c > 0: - print(d) - else: - done = True - - sbc.file_close(handle) - - sbc.stop() - ... - """ - ext = [struct.pack("I", file_mode)] + [file_name] - return _u2i(_lg_command_ext( - self.sl, _CMD_FO, 4+len(file_name), ext, L=1)) - - def file_close(self, handle): - """ - Closes a file. - - handle:= >= 0 (as returned by [*file_open*]). - - If OK returns 0. - - On failure returns a negative error code. - - ... - sbc.file_close(handle) - ... - """ - ext = [struct.pack("I", handle)] - return _u2i(_lg_command_ext(self.sl, _CMD_FC, 4, ext, L=1)) - - def file_read(self, handle, count): - """ - Reads up to count bytes from the file. - - handle:= >= 0 (as returned by [*file_open*]). - count:= >0, the number of bytes to read. - - If OK returns a list of the number of bytes read and a - bytearray containing the bytes. - - On failure returns a list of a negative error code and an - empty string. - - ... - (b, d) = sbc.file_read(h2, 100) - if b > 0: - # process read data - ... - """ - bytes = CMD_INTERRUPTED - ext = [struct.pack("II", handle, count)] - rdata = "" - with self.sl.l: - bytes = u2i( - _lg_command_ext_nolock(self.sl, _CMD_FR, 8, ext, L=2)) - if bytes > 0: - rdata = self._rxbuf(bytes) - return _u2i_list([bytes, rdata]) - - def file_write(self, handle, data): - """ - Writes the data bytes to the file. - - handle:= >= 0 (as returned by [*file_open*]). - data:= the bytes to write. - - If OK returns 0. - - On failure returns a negative error code. - - ... - sbc.file_write(h1, b'\\x02\\x03\\x04') - - sbc.file_write(h2, b'help') - - sbc.file_write(h2, "hello") - - sbc.file_write(h1, [2, 3, 4]) - ... - """ - ext = [struct.pack("I", handle)] + [data] - return _u2i(_lg_command_ext(self.sl, _CMD_FW, 4+len(data), ext, L=1)) - - def file_seek(self, handle, seek_offset, seek_from): - """ - Seeks to a position relative to the start, current position, - or end of the file. Returns the new position. - - handle:= >= 0 (as returned by [*file_open*]). - seek_offset:= byte offset. - seek_from:= FROM_START, FROM_CURRENT, or FROM_END. - - If OK returns the new file position. - - On failure returns a negative error code. - - ... - new_pos = sbc.file_seek(h, 100, rgpio.FROM_START) - - cur_pos = sbc.file_seek(h, 0, rgpio.FROM_CURRENT) - - file_size = sbc.file_seek(h, 0, rgpio.FROM_END) - ... - """ - ext = [struct.pack("IiI", handle, seek_offset, seek_from)] - return _u2i(_lg_command_ext(self.sl, _CMD_FS, 12, ext, L=3)) - - def file_list(self, fpattern): - """ - Returns a list of files which match a pattern. - - This is a privileged command. See [+Permits+]. - - fpattern:= file pattern to match. - - If OK returns a list of the number of bytes read and a - bytearray containing the matching filenames (the filenames - are separated by newline characters). - - On failure returns a list of a negative error code and an - empty string. - - ... - #!/usr/bin/env python - - import rgpio - - sbc = rgpio.sbc() - - if not sbc.connected: - exit() - - c, d = sbc.file_list("/ram/p*.c") - if c > 0: - print(d) - - sbc.stop() - ... - """ - bytes = CMD_INTERRUPTED - ext = [struct.pack("I", 60000)] + [fpattern] - rdata = "" - with self.sl.l: - bytes = u2i(_lg_command_ext_nolock( - self.sl, _CMD_FL, 4+len(fpattern), ext, L=1)) - if bytes > 0: - rdata = self._rxbuf(bytes) - return _u2i_list([bytes, rdata]) - - # GPIO - - def gpiochip_open(self, gpiochip): - """ - This returns a handle to a gpiochip device. - - This is a privileged command. See [+permits+]. - - gpiochip:= >= 0 - - If OK returns a handle (>= 0). - - On failure returns a negative error code. - - ... - h = gpiochip_open(0) # open /dev/gpiochip0 - if h >= 0: - # open okay - else: - # open error - ... - """ - ext = [struct.pack("I", gpiochip)] - handle = u2i(_lg_command_ext(self.sl, _CMD_GO, 4, ext, L=1)) - if handle >= 0: - handle = handle | (gpiochip << 16) - - return _u2i(handle) - - def gpiochip_close(self, handle): - """ - This closes a gpiochip device. - - handle:= >= 0 (as returned by [*gpiochip_open*]). - - If OK returns 0. - - On failure returns a negative error code. - - ... - sbc.gpiochip_close(h) - ... - """ - ext = [struct.pack("I", handle&0xffff)] - return _u2i(_lg_command_ext(self.sl, _CMD_GC, 4, ext, L=1)) - - def gpio_get_chip_info(self, handle): - """ - This returns summary information of an opened gpiochip. - - handle:= >= 0 (as returned by [*gpiochip_open*]). - - If OK returns a list of okay status, number of - lines, name, and label. - - On failure returns a negative error code. - """ - bytes = CMD_INTERRUPTED - ext = [struct.pack("I", handle&0xffff)] - rdata = "" - with self.sl.l: - bytes = u2i( - _lg_command_ext_nolock(self.sl, _CMD_GIC, 4, ext, L=1)) - if bytes > 0: - rdata = self._rxbuf(bytes) - lines, name, label = struct.unpack("I32s32s", rdata) - bytes = OKAY - else: - lines, name, label = 0, "", "" - return _u2i_list([bytes, lines, - name.decode().rstrip('\0'), label.decode().rstrip('\0')]) - - - def gpio_get_line_info(self, handle, gpio): - """ - This returns detailed information of a GPIO of - an opened gpiochip. - - handle:= >= 0 (as returned by [*gpiochip_open*]). - gpio:= the GPIO. - - If OK returns a list of okay status, offset, - line flags, name, and user. - - The meaning of the line flags bits are as given for the mode - by [*gpio_get_mode*]. - - On failure returns a negative error code. - """ - bytes = CMD_INTERRUPTED - ext = [struct.pack("II", handle&0xffff, gpio)] - rdata = "" - with self.sl.l: - bytes = u2i( - _lg_command_ext_nolock(self.sl, _CMD_GIL, 8, ext, L=2)) - if bytes > 0: - rdata = self._rxbuf(bytes) - offset, flags, name, user = struct.unpack("II32s32s", rdata) - bytes = OKAY - else: - offset, flags, name, user = 0, 0, "", "" - return _u2i_list( - [bytes, offset, flags, - name.decode().rstrip('\0'), user.decode().rstrip('\0')]) - - def gpio_get_mode(self, handle, gpio): - """ - This returns the mode of a GPIO. - - handle:= >= 0 (as returned by [*gpiochip_open*]). - gpio:= the GPIO. - - If OK returns the mode of the GPIO. - - On failure returns a negative error code. - - Bit @ Value @ Meaning - 0 @ 1 @ Kernel: In use by the kernel - 1 @ 2 @ Kernel: Output - 2 @ 4 @ Kernel: Active low - 3 @ 8 @ Kernel: Open drain - 4 @ 16 @ Kernel: Open source - 5 @ 32 @ Kernel: Pull up set - 6 @ 64 @ Kernel: Pull down set - 7 @ 128 @ Kernel: Pulls off set - 8 @ 256 @ LG: Input - 9 @ 512 @ LG: Output - 10 @ 1024 @ LG: Alert - 11 @ 2048 @ LG: Group - 12 @ 4096 @ LG: --- - 13 @ 8192 @ LG: --- - 14 @ 16384 @ LG: --- - 15 @ 32768 @ LG: --- - 16 @ 65536 @ Kernel: Input - 17 @ 1<<17 @ Kernel: Rising edge alert - 18 @ 1<<18 @ Kernel: Falling edge alert - 19 @ 1<<19 @ Kernel: Realtime clock alert - - The LG bits are only set if the query was made by the process - that owns the GPIO. - """ - ext = [struct.pack("II", handle&0xffff, gpio)] - return _u2i(_lg_command_ext(self.sl, _CMD_GMODE, 8, ext, L=2)) - - def gpio_claim_input(self, handle, gpio, lFlags=0): - """ - This claims a GPIO for input. - - handle:= >= 0 (as returned by [*gpiochip_open*]). - gpio:= the GPIO to be claimed. - lFlags:= line flags for the GPIO. - - If OK returns 0. - - On failure returns a negative error code. - - The line flags may be used to set the GPIO - as active low, open drain, open source, - pull up, pull down, pull off. - - ... - sbc.gpio_claim_input(h, 23) # open GPIO 23 for input. - ... - """ - ext = [struct.pack("III", handle&0xffff, lFlags, gpio)] - return _u2i(_lg_command_ext(self.sl, _CMD_GSIX, 12, ext, L=3)) - - def gpio_claim_output(self, handle, gpio, level=0, lFlags=0): - """ - This claims a GPIO for output. - - handle:= >= 0 (as returned by [*gpiochip_open*]). - gpio:= the GPIO to be claimed. - level:= the initial value for the GPIO. - lFlags:= line flags for the GPIO. - - If OK returns 0. - - On failure returns a negative error code. - - The line flags may be used to set the GPIO - as active low, open drain, open source, - pull up, pull down, pull off. - - If level is zero the GPIO will be initialised low (0). If any other - value is used the GPIO will be initialised high (1). - - ... - sbc.gpio_claim_output(h, 3) # open GPIO 3 for low output. - ... - """ - ext = [struct.pack("IIII", handle&0xffff, lFlags, gpio, level)] - return _u2i(_lg_command_ext(self.sl, _CMD_GSOX, 16, ext, L=4)) - - def gpio_free(self, handle, gpio): - """ - This frees a GPIO. - - handle:= >= 0 (as returned by [*gpiochip_open*]). - gpio:= the GPIO to be freed. - - If OK returns 0. - - On failure returns a negative error code. - - The GPIO may now be claimed by another user or for - a different purpose. - """ - ext = [struct.pack("II", handle&0xffff, gpio)] - return _u2i(_lg_command_ext(self.sl, _CMD_GSF, 8, ext, L=2)) - - def group_claim_input(self, handle, gpio, lFlags=0): - """ - This claims a group of GPIO for inputs. - - handle:= >= 0 (as returned by [*gpiochip_open*]). - gpios:= a list of GPIO to be claimed. - lFlags:= line flags for the group of GPIO. - - If OK returns 0. - - On failure returns a negative error code. - - The line flags may be used to set the group - as active low, open drain, open source, - pull up, pull down, pull off. - - gpio is a list of one or more GPIO. The first GPIO in the - list is called the group leader and is used to reference the - group as a whole. - - """ - if len(gpio): - ext = bytearray() - ext.extend(struct.pack("II", handle&0xffff, lFlags)) - for g in gpio: - ext.extend(struct.pack("I", g)) - return _u2i(_lg_command_ext( - self.sl, _CMD_GSGIX, (len(gpio)+2)*4, [ext], L=len(gpio)+2)) - else: - return 0 - - def group_claim_output(self, handle, gpio, levels=[0], lFlags=0): - """ - This claims a group of GPIO for outputs. - - handle:= >= 0 (as returned by [*gpiochip_open*]). - gpio:= a list of GPIO to be claimed. - levels:= a list of the initial value for each GPIO. - lFlags:= line flags for the group of GPIO. - - If OK returns 0. - - On failure returns a negative error code. - - The line flags may be used to set the group - as active low, open drain, open source, - pull up, pull down, pull off. - - gpio is a list of one or more GPIO. The first GPIO in the list is - called the group leader and is used to reference the group as a whole. - - levels is a list of initialisation values for the GPIO. If a value is - zero the corresponding GPIO will be initialised low (0). If any other - value is used the corresponding GPIO will be initialised high (1). - - """ - if len(gpio): - diff = len(gpio)-len(levels) - if diff > 0: - levels = levels + [0]*diff - ext = bytearray() - ext.extend(struct.pack("II", handle&0xffff, lFlags)) - for g in gpio: - ext.extend(struct.pack("I", g)) - for v in range(len(gpio)): - ext.extend(struct.pack("I", levels[v])) - return _u2i(_lg_command_ext( - self.sl, _CMD_GSGOX, - (2+(len(gpio)*2))*4, [ext], L=2+(len(gpio)*2))) - else: - return 0 - - def group_free(self, handle, gpio): - """ - This frees all the GPIO associated with a group. - - handle:= >= 0 (as returned by [*gpiochip_open*]). - gpio:= the group leader. - - If OK returns 0. - - On failure returns a negative error code. - - The GPIO may now be claimed by another user or for a different purpose. - - """ - ext = [struct.pack("II", handle&0xffff, gpio)] - return _u2i(_lg_command_ext(self.sl, _CMD_GSGF, 8, ext, L=2)) - - def gpio_read(self, handle, gpio): - """ - This returns the level of a GPIO. - - handle:= >= 0 (as returned by [*gpiochip_open*]). - gpio:= the GPIO to be read. - - If OK returns 0 (low) or 1 (high). - - On failure returns a negative error code. - - This command will work for any claimed GPIO (even if a member - of a group). For an output GPIO the value returned - will be that last written to the GPIO. - - """ - ext = [struct.pack("II", handle&0xffff, gpio)] - return _u2i(_lg_command_ext(self.sl, _CMD_GR, 8, ext, L=2)) - - def gpio_write(self, handle, gpio, level): - """ - This sets the level of an output GPIO. - - handle:= >= 0 (as returned by [*gpiochip_open*]). - gpio:= the GPIO to be written. - level:= the value to write. - - If OK returns 0. - - On failure returns a negative error code. - - This command will work for any GPIO claimed as an output - (even if a member of a group). - - If level is zero the GPIO will be set low (0). - If any other value is used the GPIO will be set high (1). - - """ - ext = [struct.pack("III", handle&0xffff, gpio, level)] - return _u2i(_lg_command_ext(self.sl, _CMD_GW, 12, ext, L=3)) - - - def group_read(self, handle, gpio): - """ - This returns the levels read from a group. - - handle:= >= 0 (as returned by [*gpiochip_open*]). - gpio:= the group to be read. - - If OK returns a list of group size and levels. - - On failure returns a list of negative error code and a dummy. - - This command will work for an output group as well as an input - group. For an output group the value returned - will be that last written to the group GPIO. - - Note that this command will also work on an individual GPIO claimed - as an input or output as that is treated as a group with one member. - - After a successful read levels is set as follows. - - Bit 0 is the level of the group leader. - Bit 1 is the level of the second GPIO in the group. - Bit x is the level of GPIO x+1 of the group. - - """ - ext = [struct.pack("II", handle&0xffff, gpio)] - status = CMD_INTERRUPTED - levels = 0 - with self.sl.l: - bytes = u2i( - _lg_command_ext_nolock(self.sl, _CMD_GGR, 8, ext, L=2)) - if bytes > 0: - data = self._rxbuf(bytes) - levels, status = struct.unpack('QI', _str(data)) - else: - status = bytes - return _u2i_list([status, levels]) - - def group_write(self, handle, gpio, group_bits, group_mask=GROUP_ALL): - """ - This sets the levels of an output group. - - handle:= >= 0 (as returned by [*gpiochip_open*]). - gpio:= the group to be written. - group_bits:= the level to set if the corresponding bit in - group_mask is set. - group_mask:= a mask indicating the group GPIO to be updated. - - If OK returns 0. - - On failure returns a negative error code. - - The values of each GPIO of the group are set according to the bits - of group_bits. - - Bit 0 sets the level of the group leader. - Bit 1 sets the level of the second GPIO in the group. - Bit x sets the level of GPIO x+1 in the group. - - However this may be overridden by the group_mask. A GPIO is only - updated if the corresponding bit in the mask is 1. - - """ - ext = [struct.pack( - "QQII", group_bits, group_mask, handle&0xffff, gpio)] - return _u2i(_lg_command_ext(self.sl, _CMD_GGWX, 24, ext, Q=2, L=2)) - - - def tx_pulse(self, handle, gpio, - pulse_on, pulse_off, pulse_offset=0, pulse_cycles=0): - """ - This starts software timed pulses on an output GPIO. - - handle:= >= 0 (as returned by [*gpiochip_open*]). - gpio:= the GPIO to be pulsed. - pulse_on:= pulse high time in microseconds. - pulse_off:= pulse low time in microsseconds. - pulse_offset:= offset from nominal pulse start position. - pulse_cycles:= the number of cycles to be sent, 0 for infinite. - - If OK returns the number of entries left in the PWM queue for the GPIO. - - On failure returns a negative error code. - - If both pulse_on and pulse_off are zero pulses will be - switched off for that GPIO. The active pulse, if any, - will be stopped and any queued pulses will be deleted. - - Each successful call to this function consumes one PWM queue entry. - - pulse_cycles cycles are transmitted (0 means infinite). Each - cycle consists of pulse_on microseconds of GPIO high followed by - pulse_off microseconds of GPIO low. - - PWM is characterised by two values, its frequency (number of cycles - per second) and its dutycycle (percentage of high time per cycle). - - The set frequency will be 1000000 / (pulse_on + pulse_off) Hz. - - The set dutycycle will be pulse_on / (pulse_on + pulse_off) * 100 %. - - E.g. if pulse_on is 50 and pulse_off is 100 the frequency will be - 6666.67 Hz and the dutycycle will be 33.33 %. - - pulse_offset is a microsecond offset from the natural start of - the pulse cycle. - - For instance if the PWM frequency is 10 Hz the natural start of each - cycle is at seconds 0, then 0.1, 0.2, 0.3 etc. In this case if - the offset is 20000 microseconds the cycle will start at seconds - 0.02, 0.12, 0.22, 0.32 etc. - - Another pulse command may be issued to the GPIO before the last - has finished. - - If the last pulse had infinite cycles (pulse_cycles of 0) then it - will be replaced by the new settings at the end of the current - cycle. Otherwise it will be replaced by the new settings at - the end of pulse_cycles cycles. - - Multiple pulse settings may be queued in this way. - - """ - ext = [struct.pack("IIIIII", handle&0xffff, gpio, - pulse_on, pulse_off, pulse_offset, pulse_cycles)] - return _u2i(_lg_command_ext(self.sl, _CMD_GPX, 24, ext, L=6)) - - - def tx_pwm(self, handle, gpio, - pwm_frequency, pwm_duty_cycle, pulse_offset=0, pulse_cycles=0): - """ - This starts software timed PWM on an output GPIO. - - handle:= >= 0 (as returned by [*gpiochip_open*]). - gpio:= the GPIO to be pulsed. - pwm_frequency:= PWM frequency in Hz (0=off, 0.1-10000). - pwm_duty_cycle:= PWM duty cycle in % (0-100). - pulse_offset:= offset from nominal pulse start position. - pulse_cycles:= the number of cycles to be sent, 0 for infinite. - - If OK returns the number of entries left in the PWM queue for the GPIO. - - On failure returns a negative error code. - - Each successful call to this function consumes one PWM queue entry. - - pulse_cycles cycles are transmitted (0 means infinite). - - PWM is characterised by two values, its frequency (number of cycles - per second) and its dutycycle (percentage of high time per cycle). - - pulse_offset is a microsecond offset from the natural start of - the pulse cycle. - - For instance if the PWM frequency is 10 Hz the natural start of each - cycle is at seconds 0, then 0.1, 0.2, 0.3 etc. In this case if - the offset is 20000 microseconds the cycle will start at seconds - 0.02, 0.12, 0.22, 0.32 etc. - - Another PWM command may be issued to the GPIO before the last - has finished. - - If the last pulse had infinite cycles then it will be replaced by - the new settings at the end of the current cycle. Otherwise it will - be replaced by the new settings when all its cycles are complete. - - Multiple pulse settings may be queued in this way. - - """ - ext = [struct.pack("IIIIII", handle&0xffff, gpio, - int(pwm_frequency*1000), int(pwm_duty_cycle*1000), - pulse_offset, pulse_cycles)] - return _u2i(_lg_command_ext(self.sl, _CMD_PX, 24, ext, L=6)) - - - def tx_servo(self, handle, gpio, pulse_width, - servo_frequency=50, pulse_offset=0, pulse_cycles=0): - """ - This starts software timed servo pulses on an output GPIO. - - I would only use software timed servo pulses for testing - purposes. The timing jitter will cause the servo to fidget. - This may cause it to overheat and wear out prematurely. - - handle:= >= 0 (as returned by [*gpiochip_open*]). - gpio:= the GPIO to be pulsed. - pulse_width:= pulse high time in microseconds (0=off, 500-2500). - servo_frequency:= the number of pulses per second (40-500). - pulse_offset:= offset from nominal pulse start position. - pulse_cycles:= the number of cycles to be sent, 0 for infinite. - - If OK returns the number of entries left in the PWM queue for the GPIO. - - On failure returns a negative error code. - - Each successful call to this function consumes one PWM queue entry. - - pulse_cycles cycles are transmitted (0 means infinite). - - pulse_offset is a microsecond offset from the natural start of - the pulse cycle. - - Another servo command may be issued to the GPIO before the last - has finished. - - If the last pulse had infinite cycles then it will be replaced by - the new settings at the end of the current cycle. Otherwise it will - be replaced by the new settings when all its cycles are compete. - - Multiple servo settings may be queued in this way. - - """ - ext = [struct.pack("IIIIII", handle&0xffff, gpio, - pulse_width, servo_frequency, pulse_offset, pulse_cycles)] - return _u2i(_lg_command_ext(self.sl, _CMD_SX, 24, ext, L=6)) - - - def tx_wave(self, handle, gpio, pulses): - """ - This starts a software timed wave on an output group. - - handle:= >= 0 (as returned by [*gpiochip_open*]). - gpio:= the group to be pulsed. - pulses:= the pulses to transmit. - - If OK returns the number of entries left in the wave queue - for the group. - - On failure returns a negative error code. - - Each successful call to this function consumes one wave queue entry. - - This command starts a wave of pulses. - - pulses is an array of pulses to be transmitted on the group. - - Each pulse is defined by the following triplet: - - bits: the levels to set for the selected GPIO - mask: the GPIO to select - delay: the delay in microseconds before the next pulse - - Another wave command may be issued to the group before the - last has finished transmission. The new wave will start when - the previous wave has competed. - - Multiple waves may be queued in this way. - - """ - if len(pulses): - q = 3 * len(pulses) - l = 2 - size = (q*8) + (l*4) - ext1 = bytearray() - for p in pulses: - ext1.extend(struct.pack( - "QQQ", p.group_bits, p.group_mask, p.pulse_delay)) - ext2 = struct.pack("II", handle&0xffff, gpio) - ext = [ext1, ext2] - return _u2i(_lg_command_ext(self.sl, _CMD_GWAVE, size, ext, Q=q, L=l)) - else: - return 0 - - - def tx_busy(self, handle, gpio, kind): - """ - This returns true if transmissions of the specified kind - are active on the GPIO or group. - - handle:= >= 0 (as returned by [*gpiochip_open*]). - gpio:= the GPIO or group to be checked. - kind:= TX_PWM or TX_WAVE. - - If OK returns 1 for busy and 0 for not busy. - - On failure returns a negative error code. - - """ - ext = [struct.pack("III", handle&0xffff, gpio, kind)] - return _u2i(_lg_command_ext(self.sl, _CMD_GBUSY, 12, ext, L=3)) - - def tx_room(self, handle, gpio, kind): - """ - This returns the number of slots there are to queue further - transmissions of the specified kind on a GPIO or group. - - handle:= >= 0 (as returned by [*gpiochip_open*]). - gpio:= the GPIO or group to be checked. - kind:= TX_PWM or TX_WAVE. - - If OK returns the number of free entries (0 for none). - - On failure returns a negative error code. - - """ - ext = [struct.pack("III", handle&0xffff, gpio, kind)] - return _u2i(_lg_command_ext(self.sl, _CMD_GROOM, 12, ext, L=3)) - - def gpio_set_debounce_micros(self, handle, gpio, debounce_micros): - """ - This sets the debounce time for a GPIO. - - handle:= >= 0 (as returned by [*gpiochip_open*]). - gpio:= the GPIO to be configured. - debounce_micros:= the value to set. - - If OK returns 0. - - On failure returns a negative error code. - - This only affects alerts. - - An alert will only be issued if the edge has been stable for - at least debounce microseconds. - - Generally this is used to debounce mechanical switches - (e.g. contact bounce). - - Suppose that a square wave at 5 Hz is being generated on a GPIO. - Each edge will last 100000 microseconds. If a debounce time - of 100001 is set no alerts will be generated, If a debounce - time of 99999 is set 10 alerts will be generated per second. - - Note that level changes will be timestamped debounce microseconds - after the actual level change. - - """ - ext = [struct.pack("III", handle&0xffff, gpio, debounce_micros)] - return _u2i(_lg_command_ext(self.sl, _CMD_GDEB, 12, ext, L=3)) - - - def gpio_set_watchdog_micros(self, handle, gpio, watchdog_micros): - """ - This sets the watchdog time for a GPIO. - - handle:= >= 0 (as returned by [*gpiochip_open*]). - gpio:= the GPIO to be configured. - watchdog_micros:= the value to set. - - If OK returns 0. - - On failure returns a negative error code. - - This only affects alerts. - - A watchdog alert will be sent if no edge alert has been issued - for that GPIO in the previous watchdog microseconds. - - Note that only one watchdog alert will be sent per stream of - edge alerts. The watchdog is reset by the sending of a new - edge alert. - - The level is set to TIMEOUT (2) for a watchdog alert. - """ - ext = [struct.pack("III", handle&0xffff, gpio, watchdog_micros)] - return _u2i(_lg_command_ext(self.sl, _CMD_GWDOG, 12, ext, L=3)) - - - def gpio_claim_alert( - self, handle, gpio, eFlags, lFlags=0, notify_handle=None): - """ - This claims a GPIO to be used as a source of alerts on level changes. - - handle:= >= 0 (as returned by [*gpiochip_open*]). - gpio:= >= 0, as legal for the gpiochip. - eFlags:= event flags for the GPIO. - lFlags:= line flags for the GPIO. - notifiy_handle: >=0 (as returned by [*notify_open*]). - - If OK returns 0. - - On failure returns a negative error code. - - The line flags may be used to set the GPIO - as active low, open drain, open source, - pull up, pull down, pull off. - - The event flags are used to generate alerts for a rising edge, - falling edge, or both edges. - - Use the default notification handle of None unless you plan - to read the alerts from a notification pipe you have opened. - - """ - if notify_handle is None: - notify_handle = self._notify.handle - ext = [struct.pack( - "IIIII", handle&0xffff, lFlags, eFlags, gpio, notify_handle)] - return _u2i(_lg_command_ext(self.sl, _CMD_GSAX, 20, ext, L=5)) - - - - def callback(self, handle, gpio, edge=RISING_EDGE, func=None): - """ - Calls a user supplied function (a callback) whenever the - specified GPIO edge is detected. - - handle:= >= 0 (as returned by [*gpiochip_open*]). - gpio:= >= 0, as legal for the gpiochip. - edge:= BOTH_EDGES, RISING_EDGE (default), or FALLING_EDGE. - func:= user supplied callback function. - - Returns a callback instance. - - The user supplied callback receives four parameters, the chip, - the GPIO, the level, and the timestamp. - - The reported level will be one of - - 0: change to low (a falling edge) - 1: change to high (a rising edge) - 2: no level change (a watchdog timeout) - - Early kernels used to provide a timestamp as the number of nanoseconds - since the Epoch (start of 1970). Later kernels use the number of - nanoseconds since boot. It's probably best not to make any assumption - as to the timestamp origin. - - If a user callback is not specified a default tally callback is - provided which simply counts edges. The count may be retrieved - by calling the callback instance's tally() method. The count may - be reset to zero by calling the callback instance's reset_tally() - method. - - The callback may be cancelled by calling the callback - instance's cancel() method. - - A GPIO may have multiple callbacks (although I can't think of - a reason to do so). - - If you want to track the level of more than one GPIO do so by - maintaining the state in the callback. Do not use [*gpio_read*]. - Remember the alert that triggered the callback may have - happened several milliseconds before and the GPIO may have - changed level many times since then. - - ... - def cbf(chip, gpio, level, timestamp): - print(chip, gpio, level, timestamp) - - cb1 = sbc.callback(0, 22, rgpio.BOTH_EDGES, cbf) - - cb2 = sbc.callback(0, 4, rgpio.BOTH_EDGES) - - cb3 = sbc.callback(0, 17) - - print(cb3.tally()) - - cb3.reset_tally() - - cb1.cancel() # To cancel callback cb1. - ... - """ - return _callback(self._notify, handle>>16, gpio, edge, func) - - - # I2C - - def i2c_open(self, i2c_bus, i2c_address, i2c_flags=0): - """ - Returns a handle (>= 0) for the device at the I2C bus address. - - This is a privileged command. See [+Permits+]. - - i2c_bus:= >= 0. - i2c_address:= 0-0x7F. - i2c_flags:= 0, no flags are currently defined. - - If OK returns a handle (>= 0). - - On failure returns a negative error code. - - For the SMBus commands the low level transactions are shown - at the end of the function description. The following - abbreviations are used: - - . . - S (1 bit) : Start bit - P (1 bit) : Stop bit - Rd/Wr (1 bit) : Read/Write bit. Rd equals 1, Wr equals 0. - A, NA (1 bit) : Accept and not accept bit. - Addr (7 bits): I2C 7 bit address. - reg (8 bits): Command byte, which often selects a register. - Data (8 bits): A data byte. - Count (8 bits): A byte defining the length of a block operation. - - [..]: Data sent by the device. - . . - - ... - h = sbc.i2c_open(1, 0x53) # open device at address 0x53 on bus 1 - ... - """ - ext = [struct.pack("III", i2c_bus, i2c_address, i2c_flags)] - return _u2i(_lg_command_ext(self.sl, _CMD_I2CO, 12, ext, L=3)) - - def i2c_close(self, handle): - """ - Closes the I2C device. - - handle:= >= 0 (as returned by [*i2c_open*]). - - If OK returns 0. - - On failure returns a negative error code. - - ... - sbc.i2c_close(h) - ... - """ - ext = [struct.pack("I", handle)] - return _u2i(_lg_command_ext(self.sl, _CMD_I2CC, 4, ext, L=1)) - - def i2c_write_quick(self, handle, bit): - """ - Sends a single bit to the device. - - handle:= >= 0 (as returned by [*i2c_open*]). - bit:= 0 or 1, the value to write. - - If OK returns 0. - - On failure returns a negative error code. - - SMBus 2.0 5.5.1 - Quick command. - . . - S Addr bit [A] P - . . - - ... - sbc.i2c_write_quick(0, 1) # send 1 to handle 0 - sbc.i2c_write_quick(3, 0) # send 0 to handle 3 - ... - """ - ext = [struct.pack("II", handle, bit)] - return _u2i(_lg_command_ext(self.sl, _CMD_I2CWQ, 8, ext, L=2)) - - def i2c_write_byte(self, handle, byte_val): - """ - Sends a single byte to the device. - - handle:= >= 0 (as returned by [*i2c_open*]). - byte_val:= 0-255, the value to write. - - If OK returns 0. - - On failure returns a negative error code. - - SMBus 2.0 5.5.2 - Send byte. - . . - S Addr Wr [A] byte_val [A] P - . . - - ... - sbc.i2c_write_byte(1, 17) # send byte 17 to handle 1 - sbc.i2c_write_byte(2, 0x23) # send byte 0x23 to handle 2 - ... - """ - ext = [struct.pack("II", handle, byte_val)] - return _u2i(_lg_command_ext(self.sl, _CMD_I2CWS, 8, ext, L=2)) - - def i2c_read_byte(self, handle): - """ - Reads a single byte from the device. - - handle:= >= 0 (as returned by [*i2c_open*]). - - If OK returns the read byte (0-255). - - On failure returns a negative error code. - - SMBus 2.0 5.5.3 - Receive byte. - . . - S Addr Rd [A] [Data] NA P - . . - - ... - b = sbc.i2c_read_byte(2) # read a byte from handle 2 - ... - """ - ext = [struct.pack("I", handle)] - return _u2i(_lg_command_ext(self.sl, _CMD_I2CRS, 4, ext, L=1)) - - def i2c_write_byte_data(self, handle, reg, byte_val): - """ - Writes a single byte to the specified register of the device. - - handle:= >= 0 (as returned by [*i2c_open*]). - reg:= >= 0, the device register. - byte_val:= 0-255, the value to write. - - If OK returns 0. - - On failure returns a negative error code. - - SMBus 2.0 5.5.4 - Write byte. - . . - S Addr Wr [A] reg [A] byte_val [A] P - . . - - ... - # send byte 0xC5 to reg 2 of handle 1 - sbc.i2c_write_byte_data(1, 2, 0xC5) - - # send byte 9 to reg 4 of handle 2 - sbc.i2c_write_byte_data(2, 4, 9) - ... - """ - ext = [struct.pack("III", handle, reg, byte_val)] - return _u2i(_lg_command_ext(self.sl, _CMD_I2CWB, 12, ext, L=3)) - - def i2c_write_word_data(self, handle, reg, word_val): - """ - Writes a single 16 bit word to the specified register of the - device. - - handle:= >= 0 (as returned by [*i2c_open*]). - reg:= >= 0, the device register. - word_val:= 0-65535, the value to write. - - If OK returns 0. - - On failure returns a negative error code. - - SMBus 2.0 5.5.4 - Write word. - . . - S Addr Wr [A] reg [A] word_val_Low [A] word_val_High [A] P - . . - - ... - # send word 0xA0C5 to reg 5 of handle 4 - sbc.i2c_write_word_data(4, 5, 0xA0C5) - - # send word 2 to reg 2 of handle 5 - sbc.i2c_write_word_data(5, 2, 23) - ... - """ - ext = [struct.pack("III", handle, reg, word_val)] - return _u2i(_lg_command_ext(self.sl, _CMD_I2CWW, 12, ext, L=3)) - - def i2c_read_byte_data(self, handle, reg): - """ - Reads a single byte from the specified register of the device. - - handle:= >= 0 (as returned by [*i2c_open*]). - reg:= >= 0, the device register. - - If OK returns the read byte (0-255). - - On failure returns a negative error code. - - SMBus 2.0 5.5.5 - Read byte. - . . - S Addr Wr [A] reg [A] S Addr Rd [A] [Data] NA P - . . - - ... - # read byte from reg 17 of handle 2 - b = sbc.i2c_read_byte_data(2, 17) - - # read byte from reg 1 of handle 0 - b = sbc.i2c_read_byte_data(0, 1) - ... - """ - ext = [struct.pack("II", handle, reg)] - return _u2i(_lg_command_ext(self.sl, _CMD_I2CRB, 8, ext, L=2)) - - def i2c_read_word_data(self, handle, reg): - """ - Reads a single 16 bit word from the specified register of the - device. - - handle:= >= 0 (as returned by [*i2c_open*]). - reg:= >= 0, the device register. - - If OK returns the read word (0-65535). - - On failure returns a negative error code. - - SMBus 2.0 5.5.5 - Read word. - . . - S Addr Wr [A] reg [A] S Addr Rd [A] [DataLow] A [DataHigh] NA P - . . - - ... - # read word from reg 2 of handle 3 - w = sbc.i2c_read_word_data(3, 2) - - # read word from reg 7 of handle 2 - w = sbc.i2c_read_word_data(2, 7) - ... - """ - ext = [struct.pack("II", handle, reg)] - return _u2i(_lg_command_ext(self.sl, _CMD_I2CRW, 8, ext, L=2)) - - def i2c_process_call(self, handle, reg, word_val): - """ - Writes 16 bits of data to the specified register of the device - and reads 16 bits of data in return. - - handle:= >= 0 (as returned by [*i2c_open*]). - reg:= >= 0, the device register. - word_val:= 0-65535, the value to write. - - If OK returns the read word (0-65535). - - On failure returns a negative error code. - - SMBus 2.0 5.5.6 - Process call. - . . - S Addr Wr [A] reg [A] word_val_Low [A] word_val_High [A] - S Addr Rd [A] [DataLow] A [DataHigh] NA P - . . - - ... - r = sbc.i2c_process_call(h, 4, 0x1231) - r = sbc.i2c_process_call(h, 6, 0) - ... - """ - ext = [struct.pack("III", handle, reg, word_val)] - return _u2i(_lg_command_ext(self.sl, _CMD_I2CPC, 12, ext, L=3)) - - def i2c_write_block_data(self, handle, reg, data): - """ - Writes up to 32 bytes to the specified register of the device. - - handle:= >= 0 (as returned by [*i2c_open*]). - reg:= >= 0, the device register. - data:= the bytes to write. - - If OK returns 0. - - On failure returns a negative error code. - - SMBus 2.0 5.5.7 - Block write. - . . - S Addr Wr [A] reg [A] len(data) [A] data0 [A] data1 [A] ... [A] - datan [A] P - . . - - ... - sbc.i2c_write_block_data(4, 5, b'hello') - - sbc.i2c_write_block_data(4, 5, "data bytes") - - sbc.i2c_write_block_data(5, 0, b'\\x00\\x01\\x22') - - sbc.i2c_write_block_data(6, 2, [0, 1, 0x22]) - ... - """ - ext = [struct.pack("II", handle, reg)] + [data] - return _u2i(_lg_command_ext(self.sl, _CMD_I2CWK, 8+len(data), ext, L=2)) - - def i2c_read_block_data(self, handle, reg): - """ - Reads a block of up to 32 bytes from the specified register of - the device. - - handle:= >= 0 (as returned by [*i2c_open*]). - reg:= >= 0, the device register. - - If OK returns a list of the number of bytes read and a - bytearray containing the bytes. - - On failure returns a list of a negative error code and - a null string. - - SMBus 2.0 5.5.7 - Block read. - . . - S Addr Wr [A] reg [A] - S Addr Rd [A] [Count] A [Data] A [Data] A ... A [Data] NA P - . . - - The amount of returned data is set by the device. - - ... - (b, d) = sbc.i2c_read_block_data(h, 10) - if b >= 0: - # process data - else: - # process read failure - ... - """ - bytes = CMD_INTERRUPTED - rdata = "" - ext = [struct.pack("II", handle, reg)] - with self.sl.l: - bytes = u2i(_lg_command_ext_nolock(self.sl, _CMD_I2CRK, 8, ext, L=2)) - if bytes > 0: - rdata = self._rxbuf(bytes) - return _u2i_list([bytes, rdata]) - - def i2c_block_process_call(self, handle, reg, data): - """ - Writes data bytes to the specified register of the device - and reads a device specified number of bytes of data in return. - - handle:= >= 0 (as returned by [*i2c_open*]). - reg:= >= 0, the device register. - data:= the bytes to write. - - If OK returns a list of the number of bytes read and a - bytearray containing the bytes. - - On failure returns a list of a negative error code and - a null string. - - The SMBus 2.0 documentation states that a minimum of 1 byte may - be sent and a minimum of 1 byte may be received. The total - number of bytes sent/received must be 32 or less. - - SMBus 2.0 5.5.8 - Block write-block read. - . . - S Addr Wr [A] reg [A] len(data) [A] data0 [A] ... datan [A] - S Addr Rd [A] [Count] A [Data] ... A P - . . - - ... - (b, d) = sbc.i2c_block_process_call(h, 10, b'\\x02\\x05\\x00') - - (b, d) = sbc.i2c_block_process_call(h, 10, b'abcdr') - - (b, d) = sbc.i2c_block_process_call(h, 10, "abracad") - - (b, d) = sbc.i2c_block_process_call(h, 10, [2, 5, 16]) - ... - """ - bytes = CMD_INTERRUPTED - rdata = "" - ext = [struct.pack("II", handle, reg)] + [data] - with self.sl.l: - bytes = u2i(_lg_command_ext_nolock(self.sl, _CMD_I2CPK, 8+len(data), ext, L=2)) - if bytes > 0: - rdata = self._rxbuf(bytes) - return _u2i_list([bytes, rdata]) - - def i2c_write_i2c_block_data(self, handle, reg, data): - """ - Writes data bytes to the specified register of the device. - 1-32 bytes may be written. - - handle:= >= 0 (as returned by [*i2c_open*]). - reg:= >= 0, the device register. - data:= the bytes to write. - - If OK returns 0. - - On failure returns a negative error code. - - . . - S Addr Wr [A] reg [A] data0 [A] data1 [A] ... [A] datan [NA] P - . . - - ... - sbc.i2c_write_i2c_block_data(4, 5, 'hello') - - sbc.i2c_write_i2c_block_data(4, 5, b'hello') - - sbc.i2c_write_i2c_block_data(5, 0, b'\\x00\\x01\\x22') - - sbc.i2c_write_i2c_block_data(6, 2, [0, 1, 0x22]) - ... - """ - ext = [struct.pack("II", handle, reg)] + [data] - return _u2i(_lg_command_ext(self.sl, _CMD_I2CWI, 8+len(data), ext, L=2)) - - def i2c_read_i2c_block_data(self, handle, reg, count): - """ - Reads count bytes from the specified register of the device. - The count may be 1-32. - - handle:= >= 0 (as returned by [*i2c_open*]). - reg:= >= 0, the device register. - count:= >0, the number of bytes to read. - - If OK returns a list of the number of bytes read and a - bytearray containing the bytes. - - On failure returns a list of a negative error code and - a null string. - - . . - S Addr Wr [A] reg [A] - S Addr Rd [A] [Data] A [Data] A ... A [Data] NA P - . . - - ... - (b, d) = sbc.i2c_read_i2c_block_data(h, 4, 32) - if b >= 0: - # process data - else: - # process read failure - ... - """ - bytes = CMD_INTERRUPTED - rdata = "" - ext = [struct.pack("III", handle, reg, count)] - with self.sl.l: - bytes = u2i(_lg_command_ext_nolock(self.sl, _CMD_I2CRI, 12, ext, L=3)) - if bytes > 0: - rdata = self._rxbuf(bytes) - return _u2i_list([bytes, rdata]) - - def i2c_read_device(self, handle, count): - """ - Returns count bytes read from the raw device associated - with handle. - - handle:= >= 0 (as returned by [*i2c_open*]). - count:= >0, the number of bytes to read. - - If OK returns a list of the number of bytes read and a - bytearray containing the bytes. - - On failure returns a list of a negative error code and - a null string. - - . . - S Addr Rd [A] [Data] A [Data] A ... A [Data] NA P - . . - - ... - (count, data) = sbc.i2c_read_device(h, 12) - ... - """ - bytes = CMD_INTERRUPTED - rdata = "" - ext = [struct.pack("II", handle, count)] - with self.sl.l: - bytes = u2i( - _lg_command_ext_nolock(self.sl, _CMD_I2CRD, 8, ext, L=2)) - if bytes > 0: - rdata = self._rxbuf(bytes) - return _u2i_list([bytes, rdata]) - - def i2c_write_device(self, handle, data): - """ - Writes the data bytes to the raw device. - - handle:= >= 0 (as returned by [*i2c_open*]). - data:= the bytes to write. - - If OK returns 0. - - On failure returns a negative error code. - - . . - S Addr Wr [A] data0 [A] data1 [A] ... [A] datan [A] P - . . - - ... - sbc.i2c_write_device(h, b"\\x12\\x34\\xA8") - - sbc.i2c_write_device(h, b"help") - - sbc.i2c_write_device(h, 'help') - - sbc.i2c_write_device(h, [23, 56, 231]) - ... - """ - ext = [struct.pack("I", handle)] + [data] - return _u2i(_lg_command_ext(self.sl, _CMD_I2CWD, 4+len(data), ext, L=1)) - - - def i2c_zip(self, handle, data): - """ - This function executes a sequence of I2C operations. The - operations to be performed are specified by the contents of data - which contains the concatenated command codes and associated data. - - handle:= >= 0 (as returned by [*i2c_open*]). - data:= the concatenated I2C commands, see below - - If OK returns a list of the number of bytes read and a - bytearray containing the bytes. - - On failure returns a list of a negative error code and - a null string. - - ... - (count, data) = sbc.i2c_zip(h, [4, 0x53, 7, 1, 0x32, 6, 6, 0]) - ... - - The following command codes are supported: - - Name @ Cmd & Data @ Meaning - End @ 0 @ No more commands - Escape @ 1 @ Next P is two bytes - Address @ 2 P @ Set I2C address to P - Flags @ 3 lsb msb @ Set I2C flags to lsb + (msb << 8) - Read @ 4 P @ Read P bytes of data - Write @ 5 P ... @ Write P bytes of data - - The address, read, and write commands take a parameter P. - Normally P is one byte (0-255). If the command is preceded by - the Escape command then P is two bytes (0-65535, least significant - byte first). - - The address defaults to that associated with the handle. - The flags default to 0. The address and flags maintain their - previous value until updated. - - Any read I2C data is concatenated in the returned bytearray. - - ... - Set address 0x53, write 0x32, read 6 bytes - Set address 0x1E, write 0x03, read 6 bytes - Set address 0x68, write 0x1B, read 8 bytes - End - - 2 0x53 5 1 0x32 4 6 - 2 0x1E 5 1 0x03 4 6 - 2 0x68 5 1 0x1B 4 8 - 0 - ... - """ - bytes = CMD_INTERRUPTED - rdata = "" - ext = [struct.pack("I", handle)] + [data] - with self.sl.l: - bytes = u2i(_lg_command_ext_nolock( - self.sl, _CMD_I2CZ, 4+len(data), ext, L=1)) - if bytes > 0: - rdata = self._rxbuf(bytes) - return _u2i_list([bytes, rdata]) - - # NOTIFICATIONS - - def notify_open(self): - """ - Opens a notification pipe. - - This is a privileged command. See [+Permits+]. - - If OK returns a handle (>= 0). - - On failure returns a negative error code. - - A notification is a method for being notified of GPIO - alerts via a pipe. - - Pipes are only accessible from the local machine so this - function serves no purpose if you are using Python from a - remote machine. The in-built (socket) notifications - provided by [*callback*] should be used instead. - - The pipes are created in the library's working directory. - - Notifications for handle x will be available at the pipe - named .lgd-nfyx (where x is the handle number). - - E.g. if the function returns 15 then the notifications must be - read from .lgd-nfy15. - - Notifications have the following structure: - - . . - Q timestamp - B chip - B gpio - B level - B flags - . . - - timestamp: the number of nanoseconds since a kernel dependent origin. - - Early kernels used to provide a timestamp as the number of nanoseconds - since the Epoch (start of 1970). Later kernels use the number of - nanoseconds since boot. It's probably best not to make any assumption - as to the timestamp origin. - - chip: the gpiochip device number (NOT the handle). - - gpio: the GPIO. - - level: indicates the level of the GPIO (0=low, 1=high, 2=timeout). - - flags: no flags are currently defined. - - ... - h = sbc.notify_open() - if h >= 0: - sbc.notify_resume(h) - ... - """ - return _u2i(_lg_command(self.sl, _CMD_NO)) - - def notify_pause(self, handle): - """ - Pauses notifications on a handle. - - handle:= >= 0 (as returned by [*notify_open*]) - - If OK returns 0. - - On failure returns a negative error code. - - Notifications for the handle are suspended until - [*notify_resume*] is called. - - ... - h = sbc.notify_open() - if h >= 0: - sbc.notify_resume(h) - ... - sbc.notify_pause(h) - ... - sbc.notify_resume(h) - ... - ... - """ - ext = [struct.pack("I", handle)] - return _u2i(_lg_command_ext(self.sl, _CMD_NP, 4, ext, L=1)) - - def notify_resume(self, handle): - """ - Resumes notifications on a handle. - - handle:= >= 0 (as returned by [*notify_open*]) - - If OK returns 0. - - On failure returns a negative error code. - - ... - h = sbc.notify_open() - if h >= 0: - sbc.notify_resume(h) - ... - """ - ext = [struct.pack("I", handle)] - return _u2i(_lg_command_ext(self.sl, _CMD_NR, 4, ext, L=1)) - - def notify_close(self, handle): - """ - Stops notifications on a handle and frees the handle for reuse. - - handle:= >= 0 (as returned by [*notify_open*]) - - If OK returns 0. - - On failure returns a negative error code. - - ... - h = sbc.notify_open() - if h >= 0: - sbc.notify_resume(h) - ... - sbc.notify_close(h) - ... - ... - """ - ext = [struct.pack("I", handle)] - return _u2i(_lg_command_ext(self.sl, _CMD_NC, 4, ext, L=1)) - - # SCRIPTS - - def script_store(self, script): - """ - Store a script for later execution. - - This is a privileged command. See [+Permits+]. - - script:= the script text as a series of bytes. - - If OK returns a handle (>= 0). - - On failure returns a negative error code. - - ... - h = sbc.script_store( - b'tag 0 w 22 1 mils 100 w 22 0 mils 100 dcr p0 jp 0') - ... - """ - if len(script): - return _u2i(_lg_command_ext( - self.sl, _CMD_PROC, len(script)+1, [script+'\0'])) - else: - return 0 - - def script_run(self, handle, params=None): - """ - Runs a stored script. - - handle:= >=0 (as returned by [*script_store*]). - params:= up to 10 parameters required by the script. - - If OK returns 0. - - On failure returns a negative error code. - - ... - s = sbc.script_run(h, [par1, par2]) - - s = sbc.script_run(h) - - s = sbc.script_run(h, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) - ... - """ - ext = struct.pack("I", handle) - nump = 1 - if params is not None: - for p in params: - ext += struct.pack("I", p) - nump = 1 + len(params) - return _u2i(_lg_command_ext(self.sl, _CMD_PROCR, nump*4, [ext], L=nump)) - - def script_update(self, handle, params=None): - """ - Sets the parameters of a script. The script may or - may not be running. The parameters of the script are - overwritten with the new values. - - handle:= >=0 (as returned by [*script_store*]). - params:= up to 10 parameters required by the script. - - If OK returns 0. - - On failure returns a negative error code. - - ... - s = sbc.script_update(h, [par1, par2]) - - s = sbc.script_update(h, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) - ... - """ - ext = struct.pack("I", handle) - nump = 1 - if params is not None: - for p in params: - ext += struct.pack("I", p) - nump = 1 + len(params) - return _u2i(_lg_command_ext(self.sl, _CMD_PROCU, nump*4, [ext], L=nump)) - - def script_status(self, handle): - """ - Returns the run status of a stored script as well as the - current values of parameters 0 to 9. - - handle:= >=0 (as returned by [*script_store*]). - - If OK returns a list of the run status and a list of - the 10 parameters. - - On failure returns a negative error code and a null list. - - The run status may be - - . . - SCRIPT_INITING - SCRIPT_READY - SCRIPT_RUNNING - SCRIPT_WAITING - SCRIPT_ENDED - SCRIPT_HALTED - SCRIPT_FAILED - . . - - ... - (s, pars) = sbc.script_status(h) - ... - """ - status = CMD_INTERRUPTED - params = () - ext = [struct.pack("I", handle)] - with self.sl.l: - bytes = u2i( - _lg_command_ext_nolock(self.sl, _CMD_PROCP, 4, ext, L=1)) - if bytes > 0: - data = self._rxbuf(bytes) - pars = struct.unpack('11i', _str(data)) - status = pars[0] - params = pars[1:] - else: - status = bytes - return _u2i_list([status, params]) - - def script_stop(self, handle): - """ - Stops a running script. - - handle:= >=0 (as returned by [*script_store*]). - - If OK returns 0. - - On failure returns a negative error code. - - ... - status = sbc.script_stop(h) - ... - """ - ext = [struct.pack("I", handle)] - return _u2i(_lg_command_ext(self.sl, _CMD_PROCS, 4, ext, L=1)) - - def script_delete(self, handle): - """ - Deletes a stored script. - - handle:= >=0 (as returned by [*script_store*]). - - If OK returns 0. - - On failure returns a negative error code. - - ... - status = sbc.script_delete(h) - ... - """ - ext = [struct.pack("I", handle)] - return _u2i(_lg_command_ext(self.sl, _CMD_PROCD, 4, ext, L=1)) - - # SERIAL - - def serial_open(self, tty, baud, ser_flags=0): - """ - Returns a handle for the serial tty device opened - at baud bits per second. - - This is a privileged command. See [+Permits+]. - - tty:= the serial device to open. - baud:= baud rate in bits per second, see below. - ser_flags:= 0, no flags are currently defined. - - If OK returns a handle (>= 0). - - On failure returns a negative error code. - - The baud rate must be one of 50, 75, 110, 134, 150, - 200, 300, 600, 1200, 1800, 2400, 4800, 9600, 19200, - 38400, 57600, 115200, or 230400. - - ... - h1 = sbc.serial_open("/dev/ttyAMA0", 300) - - h2 = sbc.serial_open("/dev/ttyUSB1", 19200, 0) - - h3 = sbc.serial_open("/dev/serial0", 9600) - ... - """ - ext = [struct.pack("II", baud, ser_flags)] + [tty] - return _u2i(_lg_command_ext(self.sl, _CMD_SERO, 8+len(tty), ext, L=2)) - - def serial_close(self, handle): - """ - Closes the serial device. - - handle:= >= 0 (as returned by [*serial_open*]). - - If OK returns 0. - - On failure returns a negative error code. - - ... - sbc.serial_close(h1) - ... - """ - ext = [struct.pack("I", handle)] - return _u2i(_lg_command_ext(self.sl, _CMD_SERC, 4, ext, L=1)) - - def serial_read_byte(self, handle): - """ - Returns a single byte from the device. - - handle:= >= 0 (as returned by [*serial_open*]). - - If OK returns the read byte (0-255). - - On failure returns a negative error code. - - ... - b = sbc.serial_read_byte(h1) - ... - """ - ext = [struct.pack("I", handle)] - return _u2i(_lg_command_ext(self.sl, _CMD_SERRB, 4, ext, L=1)) - - def serial_write_byte(self, handle, byte_val): - """ - Writes a single byte to the device. - - handle:= >= 0 (as returned by [*serial_open*]). - byte_val:= 0-255, the value to write. - - If OK returns 0. - - On failure returns a negative error code. - - ... - sbc.serial_write_byte(h1, 23) - - sbc.serial_write_byte(h1, ord('Z')) - ... - """ - ext = [struct.pack("II", handle, byte_val)] - return _u2i( - _lg_command_ext(self.sl, _CMD_SERWB, 8, ext, L=2)) - - def serial_read(self, handle, count=1000): - """ - Reads up to count bytes from the device. - - handle:= >= 0 (as returned by [*serial_open*]). - count:= >0, the number of bytes to read (defaults to 1000). - - If OK returns a list of the number of bytes read and - a bytearray containing the bytes. - - On failure returns a list of negative error code and - a null string. - - If no data is ready a bytes read of zero is returned. - - ... - (b, d) = sbc.serial_read(h2, 100) - if b > 0: - # process read data - ... - """ - bytes = CMD_INTERRUPTED - rdata = "" - ext = [struct.pack("II", handle, count)] - with self.sl.l: - bytes = u2i( - _lg_command_ext_nolock(self.sl, _CMD_SERR, 8, ext, L=2)) - if bytes > 0: - rdata = self._rxbuf(bytes) - return _u2i_list([bytes, rdata]) - - def serial_write(self, handle, data): - """ - Writes the data bytes to the device. - - handle:= >= 0 (as returned by [*serial_open*]). - data:= the bytes to write. - - If OK returns 0. - - On failure returns a negative error code. - - ... - sbc.serial_write(h1, b'\\x02\\x03\\x04') - - sbc.serial_write(h2, b'help') - - sbc.serial_write(h2, "hello") - - sbc.serial_write(h1, [2, 3, 4]) - ... - """ - ext = [struct.pack("I", handle)] + [data] - return _u2i(_lg_command_ext(self.sl, _CMD_SERW, 4+len(data), ext, L=1)) - - def serial_data_available(self, handle): - """ - Returns the number of bytes available to be read from the - device. - - handle:= >= 0 (as returned by [*serial_open*]). - - If OK returns the count of bytes available (>= 0). - - On failure returns a negative error code. - - ... - rdy = sbc.serial_data_available(h1) - - if rdy > 0: - (b, d) = sbc.serial_read(h1, rdy) - ... - """ - ext = [struct.pack("I", handle)] - return _u2i(_lg_command_ext(self.sl, _CMD_SERDA, 4, ext, L=1)) - - - # SHELL - - def shell(self, shellscr, pstring=""): - """ - This function uses the system call to execute a shell script - with the given string as its parameter. - - This is a privileged command. See [+Permits+]. - - shellscr:= the name of the script, only alphanumerics, - '-' and '_' are allowed in the name - pstring := the parameter string to pass to the script - - If OK returns the exit status of the system call. - - On failure returns a negative error code. - - shellscr must exist in a directory named cgi in the daemon's - configuration directory and must be executable. - - The returned exit status is normally 256 times that set by - the shell script exit function. If the script can't be - found 32512 will be returned. - - The following table gives some example returned statuses: - - Script exit status @ Returned system call status - 1 @ 256 - 5 @ 1280 - 10 @ 2560 - 200 @ 51200 - script not found @ 32512 - - ... - // pass two parameters, hello and world - status = sbc.shell("scr1", "hello world"); - - // pass three parameters, hello, string with spaces, and world - status = sbc.shell("scr1", "hello 'string with spaces' world"); - - // pass one parameter, hello string with spaces world - status = sbc.shell("scr1", "\\"hello string with spaces world\\""); - ... - """ - ls = len(shellscr)+1 - lp = len(pstring)+1 - ext = [struct.pack("I", ls)] + [shellscr+'\x00'+pstring+'\x00'] - return _u2i(_lg_command_ext(self.sl, _CMD_SHELL, 4+ls+lp, ext, L=1)) - - # SPI - - def spi_open(self, spi_device, spi_channel, baud, spi_flags=0): - """ - Returns a handle for the SPI device on the channel. Data - will be transferred at baud bits per second. The flags - may be used to modify the default behaviour. - - This is a privileged command. See [+Permits+]. - - spi_device:= >= 0. - spi_channel:= >= 0. - baud:= speed to use. - spi_flags:= see below. - - If OK returns a handle (>= 0). - - On failure returns a negative error code. - - spi_flags consists of the least significant 2 bits. - - . . - 1 0 - m m - . . - - mm defines the SPI mode. - - . . - Mode POL PHA - 0 0 0 - 1 0 1 - 2 1 0 - 3 1 1 - . . - - - The other bits in flags should be set to zero. - - ... - # open SPI device on channel 1 in mode 3 at 50000 bits per second - - h = sbc.spi_open(1, 50000, 3) - ... - """ - ext = [struct.pack("IIII", spi_device, spi_channel, baud, spi_flags)] - return _u2i(_lg_command_ext(self.sl, _CMD_SPIO, 16, ext, L=4)) - - def spi_close(self, handle): - """ - Closes the SPI device. - - handle:= >= 0 (as returned by [*spi_open*]). - - If OK returns 0. - - On failure returns a negative error code. - - ... - sbc.spi_close(h) - ... - """ - ext = [struct.pack("I", handle)] - return _u2i(_lg_command_ext(self.sl, _CMD_SPIC, 4, ext, L=1)) - - def spi_read(self, handle, count): - """ - Reads count bytes from the SPI device. - - handle:= >= 0 (as returned by [*spi_open*]). - count:= >0, the number of bytes to read. - - If OK returns a list of the number of bytes read and a - bytearray containing the bytes. - - On failure returns a list of negative error code and - a null string. - - ... - (b, d) = sbc.spi_read(h, 60) # read 60 bytes from handle h - if b == 60: - # process read data - else: - # error path - ... - """ - bytes = CMD_INTERRUPTED - rdata = "" - ext = [struct.pack("II", handle, count)] - with self.sl.l: - bytes = u2i(_lg_command_ext_nolock(self.sl, _CMD_SPIR, 8, ext, L=2)) - if bytes > 0: - rdata = self._rxbuf(bytes) - return _u2i_list([bytes, rdata]) - - def spi_write(self, handle, data): - """ - Writes the data bytes to the SPI device. - - handle:= >= 0 (as returned by [*spi_open*]). - data:= the bytes to write. - - If OK returns 0. - - On failure returns a negative error code. - - ... - sbc.spi_write(0, b'\\x02\\xc0\\x80') # write 3 bytes to handle 0 - - sbc.spi_write(0, b'defgh') # write 5 bytes to handle 0 - - sbc.spi_write(0, "def") # write 3 bytes to handle 0 - - sbc.spi_write(1, [2, 192, 128]) # write 3 bytes to handle 1 - ... - """ - # I p1 handle - # I p2 0 - # I p3 len - ## extension ## - # s len data bytes - ext = [struct.pack("I", handle)] + [data] - return _u2i(_lg_command_ext(self.sl, _CMD_SPIW, 4+len(data), ext, L=1)) - - def spi_xfer(self, handle, data): - """ - Writes the data bytes to the SPI device, - returning the data bytes read from the device. - - handle:= >= 0 (as returned by [*spi_open*]). - data:= the bytes to write. - - If OK returns a list of the number of bytes read and a - bytearray containing the bytes. - - On failure returns a list of negative error code and - a null string. - - ... - (count, rx_data) = sbc.spi_xfer(h, b'\\x01\\x80\\x00') - - (count, rx_data) = sbc.spi_xfer(h, [1, 128, 0]) - - (count, rx_data) = sbc.spi_xfer(h, b"hello") - - (count, rx_data) = sbc.spi_xfer(h, "hello") - ... - """ - # I p1 handle - # I p2 0 - # I p3 len - ## extension ## - # s len data bytes - - bytes = CMD_INTERRUPTED - rdata = "" - ext = [struct.pack("I", handle)] + [data] - with self.sl.l: - bytes = u2i(_lg_command_ext_nolock( - self.sl, _CMD_SPIX, 4+len(data), ext, L=1)) - if bytes > 0: - rdata = self._rxbuf(bytes) - return _u2i_list([bytes, rdata]) - - - # UTILITIES - - def get_sbc_name(self): - """ - Returns the name of the sbc running the rgpiod daemon. - - If OK returns the SBC host name. - - On failure returns a null string. - - ... - server = sbc.get_sbc_name() - print(server) - ... - """ - bytes = CMD_INTERRUPTED - rdata = "" - with self.sl.l: - bytes = u2i( - _lg_command_nolock(self.sl, _CMD_SBC)) - if bytes > 0: - rdata = self._rxbuf(bytes) - else: - rdata = "" - return rdata - - def set_user(self, user="default", - secretsFile=os.path.expanduser("~/.lg_secret")): - """ - Sets the rgpiod daemon user. The user then has the - associated permissions. - - user:= the user to set (defaults to the default user). - secretsFile:= the path to the shared secret file (defaults to - .lg_secret in the users home directory). - - If OK returns True if the user was set, False if not. - - On failure returns a negative error code. - - The returned value is True if permission is granted. - - ... - if sbc.set_user("gpio"): - print("using user gpio permissions") - else: - print("using default permissions") - ... - """ - - user = user.strip() - - if user == "": - user = "default" - - secret = bytearray() - - with open(secretsFile) as f: - for line in f: - l = line.split("=") - if len(l) == 2: - if l[0].strip() == user: - secret = bytearray(l[1].strip().encode('utf-8')) - break - - bytes = CMD_INTERRUPTED - - with self.sl.l: - - salt1 = "{:015x}".format((int(time.time()*1e7))&0xfffffffffffffff) - ext = salt1 + '.' + user - bytes = u2i(_lg_command_ext_nolock( - self.sl, _CMD_USER, len(ext), [ext])) - - if bytes < 0: - return bytes - - salt1 = bytearray(salt1.encode('utf-8')) - salt2 = self._rxbuf(bytes)[:15] - - pwd="" - - h = hashlib.md5() - h.update(salt1 + secret + salt2) - pwd = h.hexdigest() - - res = u2i(_lg_command_ext_nolock( - self.sl, _CMD_PASSW, len(pwd), [pwd])) - - return res - - def set_share_id(self, handle, share_id): - """ - Starts or stops sharing of an object. - - handle:= >=0 - share_id:= >= 0, 0 stops sharing. - - If OK returns 0. - - On failure returns a negative error code. - - Normally objects associated with a handle are only accessible - to the Python script which created them (and are automatically - deleted when the script ends). - - If a non-zero share is set the object is accessible to any - software which knows the share (and are not automatically - deleted when the script ends). - - ... - sbc.share_set(h, 23) - ... - """ - ext = [struct.pack("II", handle, share_id)] - return _u2i(_lg_command_ext(self.sl, _CMD_SHRS, 8, ext, L=2)) - - def use_share_id(self, share_id): - """ - Starts or stops sharing of an object. - - share_id:= >= 0, 0 stops sharing. - - If OK returns 0. - - On failure returns a negative error code. - - Normally objects associated with a handle are only accessible - to the Python script which created them (and are automatically - deleted when the script ends). - - If a non-zero share is set the object is accessible to any - software which knows the share and the object handle. - - ... - sbc.share_use(23) - ... - """ - ext = [struct.pack("I", share_id)] - return _u2i(_lg_command_ext(self.sl, _CMD_SHRU, 4, ext, L=1)) - - def get_internal(self, config_id): - """ - Returns the value of a configuration item. - - This is a privileged command. See [+Permits+]. - - If OK returns a list of 0 (OK) and the item's value. - - On failure returns a list of negative error code and None. - - config_id:= the configuration item. - - ... - cfg = sbc.get_internal(0) - print(cfg) - ... - """ - ext = [struct.pack("I", config_id)] - status = CMD_INTERRUPTED - config_value = None - with self.sl.l: - bytes = u2i( - _lg_command_ext_nolock(self.sl, _CMD_CGI, 4, ext, L=1)) - if bytes > 0: - data = self._rxbuf(bytes) - config_value = struct.unpack('Q', _str(data))[0] - status = OKAY - else: - status = bytes - return _u2i_list([status, config_value]) - - def set_internal(self, config_id, config_value): - """ - Sets the value of a sbc internal. - - This is a privileged command. See [+Permits+]. - - config_id:= the configuration item. - config_value:= the value to set. - - If OK returns 0. - - On failure returns a negative error code. - - ... - sbc.set_internal(0, 255) - cfg = sbc.get_internal() - print(cfg) - ... - """ - ext = [struct.pack("QI", config_value, config_id)] - return _u2i(_lg_command_ext(self.sl, _CMD_CSI, 12, ext, Q=1, L=1)) - - -def xref(): - """ - baud: - The speed of serial communication (I2C, SPI, serial link) - in bits per second. - - bit: 0-1 - A value of 0 or 1. - - byte_val: 0-255 - A whole number. - - config_id: - A number identifying a configuration item. - - . . - CFG_ID_DEBUG_LEVEL 0 - CFG_ID_MIN_DELAY 1 - . . - - config_value: - The value of a configurtion item. - - - connected: - True if a connection was established, False otherwise. - - count: - The number of bytes of data to be transferred. - - data: - Data to be transmitted, a series of bytes. - - debounce_micros: - The debounce time in microseconds. - - edge: - . . - RISING_EDGE - FALLING_EDGE - BOTH_EDGES - . . - - eFlags: - Alert request flags for the GPIO. - - The following values may be or'd to form the value. - - . . - RISING_EDGE - FALLING_EDGE - BOTH_EDGES - . . - - errnum: <0 - Indicates an error. Use [*rgpio.error_text*] for the error text. - - file_mode: - The mode may have the following values - - . . - FILE_READ 1 - FILE_WRITE 2 - FILE_RW 3 - . . - - The following values can be or'd into the file open mode - - . . - FILE_APPEND 4 - FILE_CREATE 8 - FILE_TRUNC 16 - . . - - file_name: - A full file path. To be accessible the path must match - an entry in the [files] section of the permits file. - - fpattern: - A file path which may contain wildcards. To be accessible the path - must match an entry in the [files] section of the permits file. - - func: - A user supplied callback function. - - gpio: - The 0 based offset of a GPIO from the start of a gpiochip. - - gpiochip: >= 0 - The number of a gpiochip device. - - group_bits: - A 64-bit value used to set the levels of a group. - - Set bit x to set GPIO x of the group high. - - Clear bit x to set GPIO x of the group low. - - group_mask: - A 64-bit value used to determine which members of a group - should be updated. - - Set bit x to update GPIO x of the group. - - Clear bit x to leave GPIO x of the group unaltered. - - handle: >= 0 - A number referencing an object opened by one of the following - - [*file_open*] - [*gpiochip_open*] - [*i2c_open*] - [*notify_open*] - [*serial_open*] - [*script_store*] - [*spi_open*] - - host: - The name or IP address of the sbc running the rgpiod daemon. - - i2c_address: 0-0x7F - The address of a device on the I2C bus. - - i2c_bus: >= 0 - An I2C bus number. - - i2c_flags: 0 - No I2C flags are currently defined. - - kind: TX_PWM or TX_WAVE - A type of transmission. - - level: 0 or 1 - A GPIO level. - - levels: - A list of GPIO levels. - - lFlags: - Line modifiers for the GPIO. - - The following values may be or'd to form the value. - - . . - SET_ACTIVE_LOW - SET_OPEN_DRAIN - SET_OPEN_SOURCE - SET_PULL_UP - SET_PULL_DOWN - SET_PULL_NONE - . . - - notify_handle: - This associates a notification with a GPIO alert. - - params: 32 bit number - When scripts are started they can receive up to 10 parameters - to define their operation. - - port: - The port used by the rgpiod daemon, defaults to 8889. - - pstring: - The string to be passed to a [*shell*] script to be executed. - - pulse_cycles: >= 0 - The number of pulses to generate. A value of 0 means infinite. - - pulse_delay: - The delay in microseconds before the next wave pulse. - - pulse_off: >= 0 - The off period for a pulse in microseconds. - - pulse_offset: >= 0 - The offset in microseconds from the nominal pulse start. - - pulse_on: >= 0 - The on period for a pulse in microseconds. - - pulse_width: 0, 500-2500 microseconds - Servo pulse width - - pulses: - pulses is a list of pulse objects. A pulse object is a container - class with the following members. - - group_bits - the levels to set if the corresponding bit in - group_mask is set. - group_mask - a mask indicating the group GPIO to be updated. - pulse_delay - the delay in microseconds before the next pulse. - - pwm_duty_cycle: 0-100 % - PWM duty cycle % - - pwm_frequency: 0.1-10000 Hz - PWM frequency - - reg: 0-255 - An I2C device register. The usable registers depend on the - actual device. - - script: - The text of a script to store on the rgpiod daemon. - - secretsFile: - The file containing the shared secret for a user. If the shared - secret for a user matches that known by the rgpiod daemon the user can - "log in" to the daemon. - - seek_from: 0-2 - Direction to seek for [*file_seek*]. - - . . - FROM_START=0 - FROM_CURRENT=1 - FROM_END=2 - . . - - seek_offset: - The number of bytes to move forward (positive) or backwards - (negative) from the seek position (start, current, or end of file). - - ser_flags: 32 bit - No serial flags are currently defined. - - servo_frequency:: 40-500 Hz - Servo pulse frequency - - share_id: - Objects created with a non-zero share_id are persistent and may be - used by other software which knows the share_id. - - shellscr: - The name of a shell script. The script must exist - in the cgi directory of the rgpiod daemon's configuration - directory and must be executable. - - show_errors: - Controls the display of rgpiod daemon connection failures. - The default of True prints the probable failure reasons to - standard output. - - spi_channel: >= 0 - A SPI channel. - - spi_device: >= 0 - A SPI device. - - spi_flags: 32 bit - See [*spi_open*]. - - tty: - A serial device, e.g. /dev/ttyAMA0, /dev/ttyUSB0 - - uint32: - An unsigned 32 bit number. - - user: - A name known by the rgpiod daemon and associated with a set of user - permissions. - - watchdog_micros: - The watchdog time in microseconds. - - word_val: 0-65535 - A whole number. - """ - pass - diff --git a/templates/index.template.html b/templates/index.template.html index 88d6b02..cbb0d9d 100644 --- a/templates/index.template.html +++ b/templates/index.template.html @@ -4,18 +4,11 @@

No relays found.

{% else %}

Relays

- - - - {% for relay in relays %} - - - - - {% endfor %} - -
RelayState
{{ relay.name }}{{ "ON" if relay.state else "OFF"}}
+ {% endif %} -
Create Relay diff --git a/templates/relay-create.template.html b/templates/relay-create.template.html index 181f453..5fa4ddf 100644 --- a/templates/relay-create.template.html +++ b/templates/relay-create.template.html @@ -1,19 +1,10 @@

Relay Creation

-

Existing relays

- - - - {% for relay in relays %} - - {% endfor %} -
RelayPin
{{ relay.name }}{{ relay.pin.name }}
-
- {% for pin in pins %} - + {% endfor %} diff --git a/templates/relay-detail.template.html b/templates/relay-detail.template.html deleted file mode 100644 index 116e6ec..0000000 --- a/templates/relay-detail.template.html +++ /dev/null @@ -1,21 +0,0 @@ - -

Pools'n'Pumps Device

-

Relay "{{relay.name}}"

- - - - - - - - - - -
PropertyValue
Chip Number{{ relay.pin.chip.number }}
Chip Label{{ relay.pin.chip.label }}
Chip Name{{ relay.pin.chip.name }}
Pin Name{{ relay.pin.name }}
Pin Number{{ relay.pin.line_number }}
Relay State{{ relay.state }}
- - - - -
-Home -