diff --git a/pnpdevice/relays.py b/pnpdevice/relays.py index 0d7ccf8..83fb4d8 100644 --- a/pnpdevice/relays.py +++ b/pnpdevice/relays.py @@ -3,22 +3,39 @@ import logging import os import pathlib import tomllib -from typing import Dict, List, Union +from typing import Dict, Iterable, List, Union +import rgpio import tomli_w LOGGER = logging.getLogger(__name__) -import rgpio +# Max number of chips to scan for +MAX_CHIPS = 16 + +@dataclasses.dataclass(frozen=True) +class Chip: + handle: int + label: str + name: str + number: int + number_of_lines: int + + @property + def id(self) -> str: + return f"{self.number}-{self.name}" @dataclasses.dataclass(frozen=True) class Pin: - chip: str - number: int + chip: Chip + flags: int + line_number: int name: str + user: str + @property def id(self) -> str: - return f"{self.chip}-{self.number}" + return f"{self.chip.id}-{self.line_number}" @dataclasses.dataclass(frozen=True) class Relay: @@ -70,6 +87,12 @@ class Manager: self.configpath = configpath self.has_fakes = has_fakes + # Handles to various chips + self._chips = [] + # Info on various pins + self._pins = [] + # Cached info on the relays + self._relays = [] # Connection to single-board computer GPIO self.sbc = None @@ -77,15 +100,32 @@ class Manager: "Provide an iterator for the relays." return iter(self.relays) + @property + def chips(self) -> List[Chip]: + if not self._chips: + self._load_chips() + return self._chips + def connect(self) -> None: self.sbc = rgpio.sbc( host=self.config["rgpio"]["host"], port=self.config["rgpio"]["port"], ) + if not self.sbc.connected: + LOGGER.warning("Failed to connect sbc") + self.sbc = None - def chips_list(self) -> List[str]: - if self.has_fakes: - return ["fakeA", "fakeB"] + def get_chip_by_number(self, number: int) -> Chip: + for chip in self.chips: + if chip.number == number: + return chip + raise Exception(f"Can't find chip {number}") + + def get_pin_by_number(self, chip: Chip, line_number: int) -> Pin: + for pin in self.pins: + if pin.chip == chip and pin.line_number == line_number: + return pin + raise Exception(f"Can't find pin {chip.name}-{line_number}") def get_relay_by_pin_id(self, pin_id: str) -> Relay: for relay in self.relays: @@ -93,28 +133,42 @@ class Manager: return relay return None - def pins_list(self) -> List[Pin]: - if self.has_fakes: - return [ - Pin(chip="fakeA", number=1, name="CON2-P1"), - Pin(chip="fakeA", number=2, name="CON2-P2"), - Pin(chip="fakeA", number=3, name="CON2-P10"), - Pin(chip="fakeB", number=1, name="CON2-P3"), - Pin(chip="fakeB", number=2, name="CON2-P7"), - ] + @property + def pins(self) -> List[Pin]: + if not self._pins: + self._load_pins() + return self._pins + @property + def relays(self) -> Iterable[Relay]: + if not self._relays: + for relay in self.config["relays"]: + LOGGER.debug("Relay: %s", relay) + chip = self.get_chip_by_number(relay["chip"]["number"]) + pin = self.get_pin_by_number(chip, relay["pin"]["line_number"]) + self._relays.append(Relay( + name=relay["name"], + pin=pin, + state=False, + )) + return self._relays - def relay_add(self, chip: str, pinnumber: int, name: str) -> None: - LOGGER.info("Creating relay %s %s with name %s", chip, pinnumber, name) - self.relays.append(Relay( - name=name, - pin=Pin( - chip=chip, - number=pinnumber, - name=name, - ), - state=False)) - self._write_config() + def relay_add(self, chip_number: int, chip_name: str, line_number: int, name: str) -> None: + LOGGER.info("Creating relay %s with chip %d %s on line %d", name, chip_number, chip_name, line_number) + chip = self.get_chip_by_number(chip_number) + pin = self.get_pin_by_number(chip, line_number) + self.config["relays"].append({ + "chip": { + "name": chip.name, + "number": chip.number, + }, + "pin": { + "name": pin.name, + "line_number": pin.line_number, + }, + "name": name, + }) + _write_config(self.configpath, self.config) def relay_set(self, relay: Relay, state: bool) -> None: if not relay.pin.chip.startswith("fake"): @@ -123,6 +177,50 @@ class Manager: def shutdown(self) -> None: _write_config(self.configpath, self.config) + # Talk to the remote pins daemon and get information about the chips + def _load_chips(self) -> None: + for i in range(MAX_CHIPS): + try: + handle = self.sbc.gpiochip_open(i) + except rgpio.error: + continue + if handle < 0: + continue + okay, number_of_lines, name, label = self.sbc.gpio_get_chip_info(handle) + LOGGER.info("Chip info: %s %s %s %s", okay, number_of_lines, name, label) + if okay != 0: + LOGGER.warn("Chip %s not okay.", name) + continue + self._chips.append(Chip( + handle=handle, + label=label, + name=name, + number=i, + number_of_lines=number_of_lines, + )) + + # Talk to the remote pins daemon and get information about the pins + def _load_pins(self) -> None: + LOGGER.info("Loading pins") + for chip in self.chips: + for line in range(chip.number_of_lines): + okay, offset, flags, name, user = self.sbc.gpio_get_line_info(chip.handle, line) + LOGGER.info("Got line info: %s %s %s %s %s", okay, offset, flags, name, user) + assert offset == line + if okay != 0: + LOGGER.warn("Line %s is not okay", name) + continue + if not name: + LOGGER.warn("Ignoring line %d because it has no name.", line) + continue + self._pins.append(Pin( + chip=chip, + flags=flags, + line_number=line, + name=name, + user=user, + )) + def _read_config(configpath: pathlib.Path) -> None: with open(configpath, "rb") as f: diff --git a/pnpdevice/server.py b/pnpdevice/server.py index abe41ee..22303e6 100644 --- a/pnpdevice/server.py +++ b/pnpdevice/server.py @@ -20,7 +20,7 @@ logging.basicConfig(level=logging.DEBUG) async def lifespan(app: FastAPI): relays.connect() yield - relays.shutdown() + # relays.shutdown() app = FastAPI(debug=settings.DEBUG, lifespan=lifespan) relays = Manager(settings.CONFIGFILE, settings.RELAYS_FAKE) @@ -36,22 +36,23 @@ def index(request: Request): @app.get("/relay/create") def relay_create_get(request: Request): "Get the form to create a new relay." - pins = relays.pins_list() + pins = relays.pins return templates.TemplateResponse("relay-create.template.html", { "request": request, - "pins": pins + "pins": pins, }) @app.post("/relay/create") def relay_create_post( request: Request, - chip_and_number: str = Form(...), + pin_id: str = Form(...), name: str = Form(...), ): "Create a new relay from form POST." - chip, _, number = chip_and_number.partition("-") - number = int(number) - relays.relay_add(chip, number, name) + chip_number, chip_name, line_number = pin_id.split("-") + chip_number = int(chip_number) + line_number = int(line_number) + relays.relay_add(chip_number, chip_name, line_number, name) return RedirectResponse(status_code=303, url="/") @app.get("/relay/{pin_id}") diff --git a/templates/index.template.html b/templates/index.template.html index 43bd495..7d7c22e 100644 --- a/templates/index.template.html +++ b/templates/index.template.html @@ -6,7 +6,7 @@