Get working scanning over rgpiod

This includes getting a list of chips, their line numbers, enumerating
the properties of the lines/pins, and providing to the user the list of
valid pins to connect to relays.

It also includes reworking the fake relay system and the configuration
file.

I believe this ends the rampant refactoring and I'll now stabilize out
some features.
This commit is contained in:
Eli Ribble 2023-05-26 09:28:39 -07:00
parent cbffa327b9
commit dd637c2eaa
5 changed files with 158 additions and 38 deletions

View File

@ -3,22 +3,39 @@ import logging
import os
import pathlib
import tomllib
from typing import Dict, List, Union
from typing import Dict, Iterable, List, Union
import rgpio
import tomli_w
LOGGER = logging.getLogger(__name__)
import rgpio
# Max number of chips to scan for
MAX_CHIPS = 16
@dataclasses.dataclass(frozen=True)
class Chip:
handle: int
label: str
name: str
number: int
number_of_lines: int
@property
def id(self) -> str:
return f"{self.number}-{self.name}"
@dataclasses.dataclass(frozen=True)
class Pin:
chip: str
number: int
chip: Chip
flags: int
line_number: int
name: str
user: str
@property
def id(self) -> str:
return f"{self.chip}-{self.number}"
return f"{self.chip.id}-{self.line_number}"
@dataclasses.dataclass(frozen=True)
class Relay:
@ -70,6 +87,12 @@ class Manager:
self.configpath = configpath
self.has_fakes = has_fakes
# Handles to various chips
self._chips = []
# Info on various pins
self._pins = []
# Cached info on the relays
self._relays = []
# Connection to single-board computer GPIO
self.sbc = None
@ -77,15 +100,32 @@ class Manager:
"Provide an iterator for the relays."
return iter(self.relays)
@property
def chips(self) -> List[Chip]:
if not self._chips:
self._load_chips()
return self._chips
def connect(self) -> None:
self.sbc = rgpio.sbc(
host=self.config["rgpio"]["host"],
port=self.config["rgpio"]["port"],
)
if not self.sbc.connected:
LOGGER.warning("Failed to connect sbc")
self.sbc = None
def chips_list(self) -> List[str]:
if self.has_fakes:
return ["fakeA", "fakeB"]
def get_chip_by_number(self, number: int) -> Chip:
for chip in self.chips:
if chip.number == number:
return chip
raise Exception(f"Can't find chip {number}")
def get_pin_by_number(self, chip: Chip, line_number: int) -> Pin:
for pin in self.pins:
if pin.chip == chip and pin.line_number == line_number:
return pin
raise Exception(f"Can't find pin {chip.name}-{line_number}")
def get_relay_by_pin_id(self, pin_id: str) -> Relay:
for relay in self.relays:
@ -93,28 +133,42 @@ class Manager:
return relay
return None
def pins_list(self) -> List[Pin]:
if self.has_fakes:
return [
Pin(chip="fakeA", number=1, name="CON2-P1"),
Pin(chip="fakeA", number=2, name="CON2-P2"),
Pin(chip="fakeA", number=3, name="CON2-P10"),
Pin(chip="fakeB", number=1, name="CON2-P3"),
Pin(chip="fakeB", number=2, name="CON2-P7"),
]
@property
def pins(self) -> List[Pin]:
if not self._pins:
self._load_pins()
return self._pins
@property
def relays(self) -> Iterable[Relay]:
if not self._relays:
for relay in self.config["relays"]:
LOGGER.debug("Relay: %s", relay)
chip = self.get_chip_by_number(relay["chip"]["number"])
pin = self.get_pin_by_number(chip, relay["pin"]["line_number"])
self._relays.append(Relay(
name=relay["name"],
pin=pin,
state=False,
))
return self._relays
def relay_add(self, chip: str, pinnumber: int, name: str) -> None:
LOGGER.info("Creating relay %s %s with name %s", chip, pinnumber, name)
self.relays.append(Relay(
name=name,
pin=Pin(
chip=chip,
number=pinnumber,
name=name,
),
state=False))
self._write_config()
def relay_add(self, chip_number: int, chip_name: str, line_number: int, name: str) -> None:
LOGGER.info("Creating relay %s with chip %d %s on line %d", name, chip_number, chip_name, line_number)
chip = self.get_chip_by_number(chip_number)
pin = self.get_pin_by_number(chip, line_number)
self.config["relays"].append({
"chip": {
"name": chip.name,
"number": chip.number,
},
"pin": {
"name": pin.name,
"line_number": pin.line_number,
},
"name": name,
})
_write_config(self.configpath, self.config)
def relay_set(self, relay: Relay, state: bool) -> None:
if not relay.pin.chip.startswith("fake"):
@ -123,6 +177,50 @@ class Manager:
def shutdown(self) -> None:
_write_config(self.configpath, self.config)
# Talk to the remote pins daemon and get information about the chips
def _load_chips(self) -> None:
for i in range(MAX_CHIPS):
try:
handle = self.sbc.gpiochip_open(i)
except rgpio.error:
continue
if handle < 0:
continue
okay, number_of_lines, name, label = self.sbc.gpio_get_chip_info(handle)
LOGGER.info("Chip info: %s %s %s %s", okay, number_of_lines, name, label)
if okay != 0:
LOGGER.warn("Chip %s not okay.", name)
continue
self._chips.append(Chip(
handle=handle,
label=label,
name=name,
number=i,
number_of_lines=number_of_lines,
))
# Talk to the remote pins daemon and get information about the pins
def _load_pins(self) -> None:
LOGGER.info("Loading pins")
for chip in self.chips:
for line in range(chip.number_of_lines):
okay, offset, flags, name, user = self.sbc.gpio_get_line_info(chip.handle, line)
LOGGER.info("Got line info: %s %s %s %s %s", okay, offset, flags, name, user)
assert offset == line
if okay != 0:
LOGGER.warn("Line %s is not okay", name)
continue
if not name:
LOGGER.warn("Ignoring line %d because it has no name.", line)
continue
self._pins.append(Pin(
chip=chip,
flags=flags,
line_number=line,
name=name,
user=user,
))
def _read_config(configpath: pathlib.Path) -> None:
with open(configpath, "rb") as f:

View File

@ -20,7 +20,7 @@ logging.basicConfig(level=logging.DEBUG)
async def lifespan(app: FastAPI):
relays.connect()
yield
relays.shutdown()
# relays.shutdown()
app = FastAPI(debug=settings.DEBUG, lifespan=lifespan)
relays = Manager(settings.CONFIGFILE, settings.RELAYS_FAKE)
@ -36,22 +36,23 @@ def index(request: Request):
@app.get("/relay/create")
def relay_create_get(request: Request):
"Get the form to create a new relay."
pins = relays.pins_list()
pins = relays.pins
return templates.TemplateResponse("relay-create.template.html", {
"request": request,
"pins": pins
"pins": pins,
})
@app.post("/relay/create")
def relay_create_post(
request: Request,
chip_and_number: str = Form(...),
pin_id: str = Form(...),
name: str = Form(...),
):
"Create a new relay from form POST."
chip, _, number = chip_and_number.partition("-")
number = int(number)
relays.relay_add(chip, number, name)
chip_number, chip_name, line_number = pin_id.split("-")
chip_number = int(chip_number)
line_number = int(line_number)
relays.relay_add(chip_number, chip_name, line_number, name)
return RedirectResponse(status_code=303, url="/")
@app.get("/relay/{pin_id}")

View File

@ -2,9 +2,9 @@
<h1>Relay Creation</h1>
<form method="POST" action="/relay/create">
<input type="text" name="name" placeholder="Pool Pump 1"></input>
<select name="chip-and-number">:
<select name="pin_id">:
{% for pin in pins %}
<option value="{{ pin.chip }}-{{ pin.number }}">{{ pin.name }}</option>
<option value="{{ pin.chip.number }}-{{ pin.chip.name }}-{{ pin.line_number }}">{{ pin.name }}</option>
{% endfor %}
</select>
<button type="submit">Create</button>

View File

@ -0,0 +1,21 @@
<html>
<h1>Pools'n'Pumps Device</h1>
<h2>Relay "{{relay.name}}"</h2>
<table>
<thead><tr><th>Property</th><th>Value</th></tr></thead>
<tbody>
<tr><td>Chip Number</td><td>{{ relay.pin.chip.number }}</td></tr>
<tr><td>Chip Label</td><td>{{ relay.pin.chip.label }}</td></tr>
<tr><td>Chip Name</td><td>{{ relay.pin.chip.name }}</td></tr>
<tr><td>Pin Name</td><td>{{ relay.pin.name }}</td></tr>
<tr><td>Pin Number</td><td>{{ relay.pin.line_number }}</td></tr>
<tr><td>Relay State</td><td>{{ relay.state }}</td></tr>
</tbody>
</table>
<form method="POST" action="/relay/{{relay.id}}/state">
<input type="hidden" name="pin_id" value="{{ relay.pin.id }}">
<input type="hidden" name="state" value="{{not relay.state}}">
<button type="submit">{{ "Turn OFF" if relay.state else "Turn ON" }}</button>
</form>
<a href="/">Home</a>
</html>