Add, I dunno, some changes I made.

This commit is contained in:
Eli Ribble 2024-09-15 14:29:13 -07:00
parent 68e59c0ae4
commit 7e43fca7d4
3 changed files with 77 additions and 18 deletions

View File

@ -13,6 +13,31 @@ LOGGER = logging.getLogger(__name__)
# Max number of chips to scan for
MAX_CHIPS = 16
# From http://abyz.me.uk/lg/py_rgpio.html#gpio_get_mode
GPIO_KERNEL_IN_USE = 1<<0
GPIO_KERNEL_OUTPUT = 1<<1
GPIO_KERNEL_ACTIVE_LOW = 1<<2
GPIO_KERNEL_OPEN_DRAIN = 1<<3
GPIO_KERNEL_OPEN_SOURCE = 1<<4
GPIO_KERNEL_PULL_UP = 1<<5
GPIO_KERNEL_PULL_DOWN = 1<<6
GPIO_KERNEL_PULL_OFF = 1<<7
GPIO_LG_INPUT = 1<<8
GPIO_LG_OUTPUT = 1<<9
GPIO_LG_ALERT = 1<<10
GPIO_LG_GROUP = 1<<11
GPIO_LG_NOT_USED1 = 1<<12
GPIO_LG_NOT_USED2 = 1<<13
GPIO_LG_NOT_USED3 = 1<<14
GPIO_LG_NOT_USED4 = 1<<15
GPIO_KERNEL_INPUT = 1<<16
GPIO_KERNEL_RISING_EDGE_ALERT = 1<<17
GPIO_KERNEL_FALLING_EDGE_ALERT = 1<<18
GPIO_KERNEL_REALTIME_CLOCK_ALERT = 1<<19
RELAY_OFF = 1
RELAY_ON = 0
@dataclasses.dataclass(frozen=True)
class Chip:
handle: int
@ -91,8 +116,8 @@ class Manager:
self._chips = {}
# Info on various pins, mapped by (chip number, line_number)
self._pins = {}
# Cached info on the relays
self._relays = []
# Cached info on the relays, mapped by relay name
self._relays = {}
# Connection to single-board computer GPIO
self.sbc = None
@ -152,12 +177,12 @@ class Manager:
LOGGER.debug("Relay: %s", relay)
chip = self.get_chip_by_number(relay["chip"]["number"])
pin = self.get_pin_by_number(chip, relay["pin"]["line_number"])
self._relays.append(Relay(
self._relays[relay["name"]] = Relay(
name=relay["name"],
pin=pin,
state=False,
))
return self._relays
)
return self._relays.values()
def relay_add(self, chip_number: int, chip_name: str, line_number: int, name: str) -> None:
LOGGER.info("Creating relay %s with chip %d %s on line %d", name, chip_number, chip_name, line_number)
@ -175,11 +200,23 @@ class Manager:
"name": name,
})
_write_config(self.configpath, self.config)
self._relays = []
self._relays = {}
def relay_set(self, relay: Relay, state: bool) -> None:
if not relay.pin.chip.startswith("fake"):
pass
level = RELAY_ON if state else RELAY_OFF
ok = self.sbc.gpio_write(relay.pin.chip.handle, relay.pin.line_number, level)
if ok != 0:
LOGGER.warn("Failed to set relay level for %s (%s)", relay.name, relay.id)
else:
LOGGER.info("Set relay %s (%s) to %s", relay.name, relay.id, "enabled" if state else "disabled")
okay, offset, flags, name, user = self.sbc.gpio_get_line_info(relay.pin.chip.handle, relay.pin.line_number)
LOGGER.info("Flags are now %d", flags)
self._relays[relay.name] = Relay(
name=relay.name,
pin=relay.pin,
state=state,
)
def shutdown(self) -> None:
_write_config(self.configpath, self.config)
@ -211,7 +248,7 @@ class Manager:
def _load_pin(self, chip: Chip, line_number: int) -> None:
okay, offset, flags, name, user = self.sbc.gpio_get_line_info(chip.handle, line_number)
LOGGER.info("Got line info: %s %s %s %s %s", okay, offset, flags, name, user)
LOGGER.info("Got line info: %s %s %s %s", offset, flags, name, user)
assert offset == line_number
if okay != 0:
LOGGER.warn("Line %s is not okay", name)
@ -219,6 +256,21 @@ class Manager:
if not name:
LOGGER.warn("Ignoring line %d because it has no name.", line_number)
return
# Claim the pin for output, set initially to relay off.
okay = self.sbc.gpio_claim_output(
handle=chip.handle,
gpio=line_number,
level=RELAY_OFF,
)
if okay != 0:
LOGGER.warn("Failed to claim pin %s", line_number)
return
okay = self.sbc.gpio_write(chip.handle, line_number, RELAY_OFF)
if okay != 0:
LOGGER.warn("Failed to write pin %s", line_number)
return
okay, offset, flags, name, user = self.sbc.gpio_get_line_info(chip.handle, line_number)
LOGGER.info("Flags are %d", flags)
self._pins[(chip.number, line_number)] = Pin(
chip=chip,
flags=flags,

View File

@ -68,15 +68,15 @@ def relay_detail_get(request: Request, pin_id: str):
"relay": relay
})
@app.post("/relay/create")
@app.post("/relay/{pin_id}/state")
def relay_state_post(
request: Request,
pin_id: str = Form(...),
state: str = Form(...),
pin_id: str,
state: bool = Form(...),
):
"Change the state of a relay."
relay = relays.get_relay_by_pin_id(pin_id)
relays.relay_set()
relays.relay_set(relay, state)
return RedirectResponse(status_code=303, url="/")
async def status(request: Request):

View File

@ -4,11 +4,18 @@
<h2>No relays found.</h2>
{% else %}
<h2>Relays</h2>
<ul>
{% for relay in relays %}
<li><a href="/relay/{{ relay.id }}">{{ relay.name }}</a> - {{ relay.state }}</li>
{% endfor %}
</ul>
<table>
<thead><th>Relay</th><th>State</th></thead>
<tbody>
{% for relay in relays %}
<tr>
<td><a href="/relay/{{ relay.id }}">{{ relay.name }}</td></a></td>
<td>{{ "ON" if relay.state else "OFF"}}</td>
</tr>
{% endfor %}
</tbody>
</table>
{% endif %}
<br>
<a href="/relay/create">Create Relay</a>
</html>