Compare commits

..

8 Commits

Author SHA1 Message Date
Eli Ribble 7e43fca7d4 Add, I dunno, some changes I made. 2024-09-15 14:29:13 -07:00
Eli Ribble 68e59c0ae4 Add missing files 2024-09-15 14:28:58 -07:00
Eli Ribble 73e41d90b7 Make daemon-test work similar to previous 'test', but over the network.
This really is just to make my life a little easier since the daemon is
running and stable on my remote machine and directly editing the
software on that device is obnoxious due to network errors.
2023-05-31 09:37:29 -07:00
Eli Ribble 82c741033d Show existing relays on relay creation page.
This both shows the relays that we already have assigned and removes
existing pins from the list to choose from. It also alphabetizes for
easy reading by the human.
2023-05-26 09:52:01 -07:00
Eli Ribble 0fee05acf7 Invalidate relay cache after changing config.
Without this we don't see new relays after we add them.
2023-05-26 09:47:20 -07:00
Eli Ribble b4027baf68 Don't require a full pin scan for the root page.
The full pin scan is slow, and it's slowing me down. Instead, just load
what we know we need for the configured relays unless we are adding a
new relay.
2023-05-26 09:44:17 -07:00
Eli Ribble dd637c2eaa Get working scanning over rgpiod
This includes getting a list of chips, their line numbers, enumerating
the properties of the lines/pins, and providing to the user the list of
valid pins to connect to relays.

It also includes reworking the fake relay system and the configuration
file.

I believe this ends the rampant refactoring and I'll now stabilize out
some features.
2023-05-26 09:28:39 -07:00
Eli Ribble cbffa327b9 Move to using remote rgpiod connection.
This adds a bunch of stuff, some barely working. The core idea that I'm
adding now is that I'm redoing the way that we interact with the gpio
pins. Previously I had planned to use lgpio from abyz.me.uk/lg/ directly
and run this program *only* on the single-board computer with the GPIO
pins. Turns out I can instead run the lgpio project's rgpiod program on
the single-board computer and talk to it over the network.

This is way more stable and way faster for development, so that's what
I'm doing.

This is just a checkpoint.

Incidentally, here is the license for the rgpio.py code:

"This is free and unencumbered software released into the public domain.

Anyone is free to copy, modify, publish, use, compile, sell, or distribute this software, either in source code form or as a compiled binary, for any purpose, commercial or non-commercial, and by any means.

In jurisdictions that recognize copyright laws, the author or authors of this software dedicate any and all copyright interest in the software to the public domain. We make this dedication for the benefit of the public at large and to the detriment of our heirs and successors. We intend this dedication to be an overt act of relinquishment in perpetuity of all present and future rights to this software under copyright law.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

For more information, please refer to <http://unlicense.org/>"
2023-05-26 08:13:54 -07:00
13 changed files with 4477 additions and 100 deletions

47
config.toml Normal file
View File

@ -0,0 +1,47 @@
[[relays]]
name = "Unknown 1"
[relays.chip]
name = "gpiochip0"
number = 0
[relays.pin]
name = "CON2-P22"
line_number = 2
[[relays]]
name = "Waterfall"
[relays.chip]
name = "gpiochip0"
number = 0
[relays.pin]
name = "CON2-P18"
line_number = 68
[[relays]]
name = "Filter"
[relays.chip]
name = "gpiochip0"
number = 0
[relays.pin]
name = "CON2-P16"
line_number = 15
[[relays]]
name = "Unknown 2"
[relays.chip]
name = "gpiochip0"
number = 0
[relays.pin]
name = "CON2-P12"
line_number = 16
[rgpio]
host = "pool.ribble.net"
port = 8889

161
daemon-test.py Executable file
View File

@ -0,0 +1,161 @@
#!/usr/bin/env python3
import argparse
import dataclasses
import sys
from typing import Dict
import time
import rgpio
# From test C program using wiringPi
# RELAY_PINS = [4, 17, 27, 18, 22, 23, 24, 25]
# Physical pins
# PINS = [7, 11, 13, 15, 12, 16, 18, 22]
# From Banana Pi Zero M2 pinout
RELAY_PINS = ["CON2-P07", "CON2-P11", "CON2-P13", "CON2-P15", "CON2-P12", "CON2-P16", "CON2-P18", "CON2-P22"]
# con2-p22 ?
# p18: waterfall
# p16: filter pump, salt cell
#lgpio.h:#define LG_GPIO_IS_KERNEL 1
#lgpio.h:#define LG_GPIO_IS_OUTPUT 2
#lgpio.h:#define LG_GPIO_IS_ACTIVE_LOW 4
#lgpio.h:#define LG_GPIO_IS_OPEN_DRAIN 8
#lgpio.h:#define LG_GPIO_IS_OPEN_SOURCE 16
#lgpio.h:#define LG_GPIO_IS_PULL_UP 32
#lgpio.h:#define LG_GPIO_IS_PULL_DOWN 64
#lgpio.h:#define LG_GPIO_IS_PULL_NONE 128
#lgpio.h:#define LG_GPIO_IS_INPUT 65536
#lgpio.h:#define LG_GPIO_IS_RISING_EDGE 131072
#lgpio.h:#define LG_GPIO_IS_FALLING_EDGE 262144
#lgpio.h:#define LG_GPIO_IS_REALTIME_CLOCK 524288
FLAG_TO_NAME = {
1: "kernel",
2: "output",
4: "active_low",
8: "open_drain",
16: "open_source",
32: "pull_up",
64: "pull_down",
128: "pull_none",
65536: "input",
131072: "rising_edge",
262144: "falling_edge",
524288: "realtime_clock",
}
@dataclasses.dataclass
class Relay:
name: str
pin: str
RELAY_ON = 0
RELAY_OFF = 1
RELAYS = [
Relay("Light", "CON2-P22"),
Relay("Waterfall", "CON2-P18"),
Relay("Filter", "CON2-P16"),
Relay("Unknown 2", "CON2-P12"),
]
DEVICE = "/dev/ttyUSB0"
def alloff(args, sbc, handle, pin_to_line):
"Turn off all relays"
for relay in RELAYS:
line = pin_to_line[relay.pin]
sbc.gpio_claim_output(handle, line, level=RELAY_OFF)
print_status(sbc, handle, line)
def flags_to_name(flags: int):
"Get the set of flag names based on the provided flag value"
result = set()
for value, name in FLAG_TO_NAME.items():
if flags & value:
result.add(name)
return result
def on(args, sbc, handle, pin_to_line):
"Turn on a single relay"
name_to_pin = {relay.name.lower(): relay.pin for relay in RELAYS}
pin = name_to_pin[args.name.lower()]
line = pin_to_line[pin]
sbc.gpio_claim_output(handle, line, level=RELAY_ON)
sbc.gpio_write(handle, line, RELAY_ON)
print_status(sbc, handle, line)
print(f"{args.name} now on")
if args.duration:
print(f"Waiting {args.duration} minutes ({args.duration / 60.0} hours), then turning off")
time.sleep(args.duration * 60)
sbc.gpio_write(handle, line, RELAY_OFF)
print(f"{args.name} now off")
print_status(sbc, handle, line)
def status(args, sbc, handle, pin_to_line):
pin_to_name = {relay.pin: relay.name for relay in RELAYS}
for pin, name in pin_to_name.items():
line = pin_to_line[pin]
print_status(sbc, handle, line)
def print_status(sbc, handle, line):
okay_status, io_number, flags, pin, user = sbc.gpio_get_line_info(handle, line)
names = flags_to_name(flags)
if okay_status == 0:
print(f"{io_number}): {flags} {pin} {user}: {' '.join(sorted(names))}")
else:
print(f"Failed to get status for {line}")
def map_pin_to_line(sbc, handle):
start_pin, end_pin, name, driver = sbc.gpio_get_chip_info(handle)
pin_to_line = {}
for x in range(start_pin, end_pin):
okay_status, io_number, flags, pin, user = sbc.gpio_get_line_info(handle, x)
flag_names = flags_to_name(flags)
if okay_status != 0:
print(f"Error getting pin {x}: status {okay_status}")
continue
pin_to_line[pin] = x
return pin_to_line
def main():
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(required=True, help="the sub-command to execute")
parser_status = subparsers.add_parser("status", help="Show status of the relays")
parser_status.add_argument("-H", "--host", default="127.0.0.1", help="The host to connect to")
parser_status.set_defaults(func=status)
parser_alloff = subparsers.add_parser("alloff", help="Turn off all relays")
parser_alloff.add_argument("-H", "--host", default="127.0.0.1", help="The host to connect to")
parser_alloff.set_defaults(func=alloff)
parser_on = subparsers.add_parser("on", help="Turn on a relay")
parser_on.add_argument("name", choices=[relay.name for relay in RELAYS])
parser_on.add_argument("-d", "--duration", type=int, default=0, help="Time, in minutes, to keep the relay on")
parser_on.add_argument("-H", "--host", default="127.0.0.1", help="The host to connect to")
parser_on.set_defaults(func=on)
args = parser.parse_args()
sbc = rgpio.sbc(host=args.host)
if not sbc.connected:
print("Not connected")
return -1
handle = sbc.gpiochip_open(0) # open /dev/gpiochip0
if handle < 0:
print("Error on open")
return -1
pin_to_line = map_pin_to_line(sbc, handle)
try:
args.func(args, sbc, handle, pin_to_line)
finally:
sbc.gpiochip_close(handle)
return 0
if __name__ == "__main__":
sys.exit(main())

74
mdns-registration-test.py Executable file
View File

@ -0,0 +1,74 @@
#!/usr/bin/env python3
"""Example of announcing 250 services (in this case, a fake HTTP server)."""
import argparse
import asyncio
import logging
import socket
from typing import List, Optional
from zeroconf import IPVersion
from zeroconf.asyncio import AsyncServiceInfo, AsyncZeroconf
class AsyncRunner:
def __init__(self, ip_version: IPVersion) -> None:
self.ip_version = ip_version
self.aiozc: Optional[AsyncZeroconf] = None
async def register_services(self, infos: List[AsyncServiceInfo]) -> None:
self.aiozc = AsyncZeroconf(ip_version=self.ip_version)
tasks = [self.aiozc.async_register_service(info) for info in infos]
background_tasks = await asyncio.gather(*tasks)
await asyncio.gather(*background_tasks)
print("Finished registration, press Ctrl-C to exit...")
while True:
await asyncio.sleep(1)
async def unregister_services(self, infos: List[AsyncServiceInfo]) -> None:
assert self.aiozc is not None
tasks = [self.aiozc.async_unregister_service(info) for info in infos]
background_tasks = await asyncio.gather(*tasks)
await asyncio.gather(*background_tasks)
await self.aiozc.async_close()
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
parser = argparse.ArgumentParser()
parser.add_argument('--debug', action='store_true')
version_group = parser.add_mutually_exclusive_group()
version_group.add_argument('--v6', action='store_true')
version_group.add_argument('--v6-only', action='store_true')
args = parser.parse_args()
if args.debug:
logging.getLogger('zeroconf').setLevel(logging.DEBUG)
if args.v6:
ip_version = IPVersion.All
elif args.v6_only:
ip_version = IPVersion.V6Only
else:
ip_version = IPVersion.V4Only
infos = []
for i in range(250):
infos.append(
AsyncServiceInfo(
"_http._tcp.local.",
f"Paul's Test Web Site {i}._http._tcp.local.",
addresses=[socket.inet_aton("127.0.0.1")],
port=80,
properties={'path': '/~paulsm/'},
server=f"zcdemohost-{i}.local.",
)
)
print("Registration of 250 services...")
loop = asyncio.get_event_loop()
runner = AsyncRunner(ip_version)
try:
loop.run_until_complete(runner.register_services(infos))
except KeyboardInterrupt:
loop.run_until_complete(runner.unregister_services(infos))

24
mdns-scan-test.py Executable file
View File

@ -0,0 +1,24 @@
#!/usr/bin/env python3
from zeroconf import ServiceBrowser, ServiceListener, Zeroconf
class MyListener(ServiceListener):
def update_service(self, zc: Zeroconf, type_: str, name: str) -> None:
print(f"Service {name} updated")
def remove_service(self, zc: Zeroconf, type_: str, name: str) -> None:
print(f"Service {name} removed")
def add_service(self, zc: Zeroconf, type_: str, name: str) -> None:
info = zc.get_service_info(type_, name)
print(f"Service {name} added, service info: {info}")
zeroconf = Zeroconf()
listener = MyListener()
browser = ServiceBrowser(zeroconf, "_http._tcp.local.", listener)
try:
input("Press enter to exit...\n\n")
finally:
zeroconf.close()

View File

@ -1,8 +0,0 @@
from starlette.config import Config
config = Config(".env")
DEBUG = config("DEBUG", cast=bool, default=False)
DATABASE = config("DATABASE_URL", cast=databases.DatabaseURL)
SECRET_KEY = config("SECRET_KEY", cast=Secret)
ALLOWED_HOSTS = config("ALLOWED_HOSTS", cast=CommaSeparatedStrings)

View File

@ -3,24 +3,75 @@ import logging
import os
import pathlib
import tomllib
from typing import Dict, List, Union
from typing import Dict, Iterable, List, Union
import rgpio
import tomli_w
LOGGER = logging.getLogger(__name__)
@dataclasses.dataclass
class Pin:
chip: str
number: int
name: str
# Max number of chips to scan for
MAX_CHIPS = 16
@dataclasses.dataclass
# From http://abyz.me.uk/lg/py_rgpio.html#gpio_get_mode
GPIO_KERNEL_IN_USE = 1<<0
GPIO_KERNEL_OUTPUT = 1<<1
GPIO_KERNEL_ACTIVE_LOW = 1<<2
GPIO_KERNEL_OPEN_DRAIN = 1<<3
GPIO_KERNEL_OPEN_SOURCE = 1<<4
GPIO_KERNEL_PULL_UP = 1<<5
GPIO_KERNEL_PULL_DOWN = 1<<6
GPIO_KERNEL_PULL_OFF = 1<<7
GPIO_LG_INPUT = 1<<8
GPIO_LG_OUTPUT = 1<<9
GPIO_LG_ALERT = 1<<10
GPIO_LG_GROUP = 1<<11
GPIO_LG_NOT_USED1 = 1<<12
GPIO_LG_NOT_USED2 = 1<<13
GPIO_LG_NOT_USED3 = 1<<14
GPIO_LG_NOT_USED4 = 1<<15
GPIO_KERNEL_INPUT = 1<<16
GPIO_KERNEL_RISING_EDGE_ALERT = 1<<17
GPIO_KERNEL_FALLING_EDGE_ALERT = 1<<18
GPIO_KERNEL_REALTIME_CLOCK_ALERT = 1<<19
RELAY_OFF = 1
RELAY_ON = 0
@dataclasses.dataclass(frozen=True)
class Chip:
handle: int
label: str
name: str
number: int
number_of_lines: int
@property
def id(self) -> str:
return f"{self.number}-{self.name}"
@dataclasses.dataclass(frozen=True)
class Pin:
chip: Chip
flags: int
line_number: int
name: str
user: str
@property
def id(self) -> str:
return f"{self.chip.id}-{self.line_number}"
@dataclasses.dataclass(frozen=True)
class Relay:
pin: Pin
name: str
state: bool
@property
def id(self) -> str:
return f"{self.pin.id}"
def _deserialize(data: List[Dict[str, str]]) -> List[Relay]:
"Deserialize a list of relays from the config."
return [Relay(
@ -29,10 +80,12 @@ def _deserialize(data: List[Dict[str, str]]) -> List[Relay]:
chip=relay["chip"],
number=int(relay["pinnumber"]),
name=relay["pinname"],
)) for relay in data]
),
state=False) for relay in data]
def _serialize(data: List[Relay]) -> List[Dict[str, str]]:
"Serialize a list of relays to the config."
LOGGER.info("Serializing %s", data)
return [{
"chip": relay.pin.chip,
"name": relay.name,
@ -40,71 +93,207 @@ def _serialize(data: List[Relay]) -> List[Dict[str, str]]:
"pinname": relay.pin.name,
} for relay in data]
class Relays:
class Manager:
"Class for interacting with relays."
def __init__(self, configpath: pathlib.Path):
self.configpath = configpath
def __init__(self, configpath: pathlib.Path, has_fakes: bool):
try:
with open(configpath, "rb") as f:
content = tomllib.load(f)
self.config = content
except Exception as e:
LOGGER.info("Unable to load config file: %s", e)
self.config = _read_config(configpath)
except FileNotFoundError:
self.config = {
"relays": [],
"rgpio": {"host": "localhost", "port": 8889},
"relays": [{
"chip": -1,
"pin": "FAKE-P1",
"name": "Fake",
}],
}
self.relays = _deserialize(self.config["relays"])
# Immediately write out the default config
_write_config(configpath, self.config)
self.configpath = configpath
self.has_fakes = has_fakes
# Handles to various chips, mapped by chip number
self._chips = {}
# Info on various pins, mapped by (chip number, line_number)
self._pins = {}
# Cached info on the relays, mapped by relay name
self._relays = {}
# Connection to single-board computer GPIO
self.sbc = None
def chips_list(self) -> List[str]:
raise NotImplementedError()
def __iter__(self):
"Provide an iterator for the relays."
return iter(self.relays)
def pins_list(self) -> List[Pin]:
raise NotImplementedError()
@property
def chips(self) -> List[Chip]:
if not self._chips:
self._load_chips()
return self._chips.values()
def relay_add(self, chip: str, pinnumber: int, name: str) -> None:
def connect(self) -> None:
self.sbc = rgpio.sbc(
host=self.config["rgpio"]["host"],
port=self.config["rgpio"]["port"],
)
if not self.sbc.connected:
LOGGER.warning("Failed to connect sbc")
self.sbc = None
def get_chip_by_number(self, number: int) -> Chip:
try:
return self._chips[number]
except KeyError:
self._load_chip(number)
return self._chips[number]
def get_pin_by_number(self, chip: Chip, line_number: int) -> Pin:
try:
return self._pins[(chip.number, line_number)]
except KeyError:
self._load_pin(chip, line_number)
return self._pins[(chip.number, line_number)]
def get_relay_by_pin_id(self, pin_id: str) -> Relay:
for relay in self.relays:
if relay.pin.id == pin_id:
return relay
return None
def load_all_pins(self) -> None:
self._load_chips()
self._load_pins()
@property
def pins(self) -> List[Pin]:
if not self._pins:
self._load_pins()
return self._pins.values()
@property
def relays(self) -> Iterable[Relay]:
if not self._relays:
for relay in self.config["relays"]:
LOGGER.debug("Relay: %s", relay)
chip = self.get_chip_by_number(relay["chip"]["number"])
pin = self.get_pin_by_number(chip, relay["pin"]["line_number"])
self._relays[relay["name"]] = Relay(
name=relay["name"],
pin=pin,
state=False,
)
return self._relays.values()
def relay_add(self, chip_number: int, chip_name: str, line_number: int, name: str) -> None:
LOGGER.info("Creating relay %s with chip %d %s on line %d", name, chip_number, chip_name, line_number)
chip = self.get_chip_by_number(chip_number)
pin = self.get_pin_by_number(chip, line_number)
self.config["relays"].append({
"chip": chip,
"chip": {
"name": chip.name,
"number": chip.number,
},
"pin": {
"name": pin.name,
"line_number": pin.line_number,
},
"name": name,
"pinnumber": pinnumber,
})
with open(self.configpath, "wb") as f:
tomli_w.dump({
"relays": _serialize(self.config["relays"]),
}, f)
LOGGER.info("Wrote config file %s", self.configpath)
_write_config(self.configpath, self.config)
self._relays = {}
def relays_list(self) -> List[Relay]:
return self.relays
def relay_on(self, name: str) -> bool:
raise NotImplementedError()
def relay_set(self, relay: Relay, state: bool) -> None:
level = RELAY_ON if state else RELAY_OFF
ok = self.sbc.gpio_write(relay.pin.chip.handle, relay.pin.line_number, level)
if ok != 0:
LOGGER.warn("Failed to set relay level for %s (%s)", relay.name, relay.id)
else:
LOGGER.info("Set relay %s (%s) to %s", relay.name, relay.id, "enabled" if state else "disabled")
okay, offset, flags, name, user = self.sbc.gpio_get_line_info(relay.pin.chip.handle, relay.pin.line_number)
LOGGER.info("Flags are now %d", flags)
self._relays[relay.name] = Relay(
name=relay.name,
pin=relay.pin,
state=state,
)
def relay_set(self, name: str, state: bool) -> None:
raise NotImplementedError()
class RelaysFake(Relays):
"Class for fake relays, useful for testing."
def chips_list(self) -> List[str]:
return ["chipA", "chipB"]
def shutdown(self) -> None:
_write_config(self.configpath, self.config)
def pins_list(self) -> List[Pin]:
return [
Pin(chip="chipA", number=1, name="CON2-P1"),
Pin(chip="chipA", number=2, name="CON2-P2"),
Pin(chip="chipA", number=3, name="CON2-P10"),
Pin(chip="chipB", number=1, name="CON2-P3"),
Pin(chip="chipB", number=2, name="CON2-P7"),
]
def _load_chip(self, number: int) -> None:
try:
handle = self.sbc.gpiochip_open(number)
except rgpio.error:
return
if handle < 0:
return
okay, number_of_lines, name, label = self.sbc.gpio_get_chip_info(handle)
LOGGER.info("Chip info: %s %s %s %s", okay, number_of_lines, name, label)
if okay != 0:
LOGGER.warn("Chip %s not okay.", name)
return
self._chips[number] = Chip(
handle=handle,
label=label,
name=name,
number=number,
number_of_lines=number_of_lines,
)
def relays_list(self) -> List[Relay]:
raise NotImplementedError()
def relay_on(self, name: str) -> bool:
raise NotImplementedError()
# Talk to the remote pins daemon and get information about the chips
def _load_chips(self) -> None:
for i in range(MAX_CHIPS):
self._load_chip(i)
def relay_set(self, name: str, state: bool) -> None:
raise NotImplementedError()
def _load_pin(self, chip: Chip, line_number: int) -> None:
okay, offset, flags, name, user = self.sbc.gpio_get_line_info(chip.handle, line_number)
LOGGER.info("Got line info: %s %s %s %s", offset, flags, name, user)
assert offset == line_number
if okay != 0:
LOGGER.warn("Line %s is not okay", name)
return
if not name:
LOGGER.warn("Ignoring line %d because it has no name.", line_number)
return
# Claim the pin for output, set initially to relay off.
okay = self.sbc.gpio_claim_output(
handle=chip.handle,
gpio=line_number,
level=RELAY_OFF,
)
if okay != 0:
LOGGER.warn("Failed to claim pin %s", line_number)
return
okay = self.sbc.gpio_write(chip.handle, line_number, RELAY_OFF)
if okay != 0:
LOGGER.warn("Failed to write pin %s", line_number)
return
okay, offset, flags, name, user = self.sbc.gpio_get_line_info(chip.handle, line_number)
LOGGER.info("Flags are %d", flags)
self._pins[(chip.number, line_number)] = Pin(
chip=chip,
flags=flags,
line_number=line_number,
name=name,
user=user,
)
# Talk to the remote pins daemon and get information about the pins
def _load_pins(self) -> None:
LOGGER.info("Loading pins")
for chip in self.chips:
for line in range(chip.number_of_lines):
self._load_pin(chip, line)
def _read_config(configpath: pathlib.Path) -> None:
with open(configpath, "rb") as f:
content = tomllib.load(f)
LOGGER.info("Read config from %s", configpath)
return content
def _write_config(configpath: pathlib.Path, config) -> None:
with open(configpath, "wb") as f:
tomli_w.dump(config, f)
LOGGER.info("Wrote config file %s", configpath)
class RelaysReal(Relays):
"Class for controlling real relays."

View File

@ -1,48 +1,83 @@
from fastapi import FastAPI, Request
from contextlib import asynccontextmanager
import logging
from fastapi import FastAPI, Form, Request
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from starlette.responses import RedirectResponse
import jinja2
from pnpdevice.config import config
from pnpdevice import settings
from pnpdevice.relays import Manager
import pnpdevice.discovery
import pnpdevice.relays
app = FastAPI(debug=config.DEBUG)
LOGGER = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)
@asynccontextmanager
async def lifespan(app: FastAPI):
relays.connect()
yield
# relays.shutdown()
app = FastAPI(debug=settings.DEBUG, lifespan=lifespan)
relays = Manager(settings.CONFIGFILE, settings.RELAYS_FAKE)
templates = Jinja2Templates(directory="templates")
@app.get("/")
def index():
relays = request.app["relays"]
@app.get("/", response_class=HTMLResponse)
def index(request: Request):
return templates.TemplateResponse("index.template.html", {
"relays": relays.relays_list(),
"request": request,
"relays": relays,
})
@app.get("/relay/create")
def relay_create_get(request: Request):
"Get the form to create a new relay."
relays = request.app["relays"]
pins = relays.pins_list()
return templates.TemplateResponse("relay-create.template.html", {"pins": pins})
relays.load_all_pins()
used_pin_names = {relay.pin.name for relay in relays}
pins = [p for p in relays.pins if p.name not in used_pin_names]
sorted_pins = sorted(pins, key=lambda p: p.name)
return templates.TemplateResponse("relay-create.template.html", {
"relays": relays,
"request": request,
"pins": sorted_pins,
})
async def relay_create_post(request: Request, chip_and_number: str, name: str):
"Create a new relay."
chip, number = chip_and_number.partition("-")
LOGGER.info("Creating relay %s %s with name %s", chip, number, name)
@app.post("/relay/create")
def relay_create_post(
request: Request,
pin_id: str = Form(...),
name: str = Form(...),
):
"Create a new relay from form POST."
chip_number, chip_name, line_number = pin_id.split("-")
chip_number = int(chip_number)
line_number = int(line_number)
relays.relay_add(chip_number, chip_name, line_number, name)
return RedirectResponse(status_code=303, url="/")
@app.get("/relay/{pin_id}")
def relay_detail_get(request: Request, pin_id: str):
"Get details on a single relay."
relay = relays.get_relay_by_pin_id(pin_id)
return templates.TemplateResponse("relay-detail.template.html", {
"request": request,
"relay": relay
})
@app.post("/relay/{pin_id}/state")
def relay_state_post(
request: Request,
pin_id: str,
state: bool = Form(...),
):
"Change the state of a relay."
relay = relays.get_relay_by_pin_id(pin_id)
relays.relay_set(relay, state)
return RedirectResponse(status_code=303, url="/")
async def status(request: Request):
return html("Status")
def run(relays: pnpdevice.relays.Relays):
"Run the embedded web server"
app = web.Application()
app["relays"] = relays
aiohttp_jinja2.setup(app,
loader=jinja2.FileSystemLoader("templates"))
app.on_startup.append(pnpdevice.discovery.handle)
app.add_routes([web.get("/", index)])
app.add_routes([web.get("/relay/create", relay_create_get)])
app.add_routes([web.post("/relay/create", relay_create_post)])
web.run_app(app)

11
pnpdevice/settings.py Normal file
View File

@ -0,0 +1,11 @@
from pathlib import Path
from starlette.config import Config
from starlette.datastructures import CommaSeparatedStrings
config = Config(".env")
CONFIGFILE = config("CONFIGFILE", cast=Path, default="/etc/pnpdevice.toml")
DEBUG = config("DEBUG", cast=bool, default=False)
RELAYS_FAKE = config("RELAYS_FAKE", cast=bool, default=False)
# SECRET_KEY = config("SECRET_KEY", cast=Secret)
# ALLOWED_HOSTS = config("ALLOWED_HOSTS", cast=CommaSeparatedStrings)

View File

@ -5,6 +5,7 @@ authors = [
]
dependencies = [
"fastapi",
"python-multipart",
"tomli-w",
"uvicorn[standard]",
"zeroconf",

3806
rgpio.py Normal file

File diff suppressed because it is too large Load Diff

View File

@ -4,11 +4,18 @@
<h2>No relays found.</h2>
{% else %}
<h2>Relays</h2>
<ul>
{% for relay in relays %}
<li>{{ relay.name }}</li>
{% endfor %}
</ul>
<table>
<thead><th>Relay</th><th>State</th></thead>
<tbody>
{% for relay in relays %}
<tr>
<td><a href="/relay/{{ relay.id }}">{{ relay.name }}</td></a></td>
<td>{{ "ON" if relay.state else "OFF"}}</td>
</tr>
{% endfor %}
</tbody>
</table>
{% endif %}
<br>
<a href="/relay/create">Create Relay</a>
</html>

View File

@ -1,10 +1,19 @@
<html>
<h1>Relay Creation</h1>
<h2>Existing relays</h2>
<table>
<thead><th>Relay</th><th>Pin</th></thead>
<tbody>
{% for relay in relays %}
<tr><td>{{ relay.name }}</td><td>{{ relay.pin.name }}</td></tr>
{% endfor %}
</table>
<form method="POST" action="/relay/create">
<input type="text" name="name" placeholder="Pool Pump 1"></input>
<select name="chip-and-number">
<select name="pin_id">:
{% for pin in pins %}
<option value="{{ pin.chip }}-{{ pin.number }}">{{ pin.name }}</option>
<option value="{{ pin.chip.number }}-{{ pin.chip.name }}-{{ pin.line_number }}">{{ pin.name }}</option>
{% endfor %}
</select>
<button type="submit">Create</button>

View File

@ -0,0 +1,21 @@
<html>
<h1>Pools'n'Pumps Device</h1>
<h2>Relay "{{relay.name}}"</h2>
<table>
<thead><tr><th>Property</th><th>Value</th></tr></thead>
<tbody>
<tr><td>Chip Number</td><td>{{ relay.pin.chip.number }}</td></tr>
<tr><td>Chip Label</td><td>{{ relay.pin.chip.label }}</td></tr>
<tr><td>Chip Name</td><td>{{ relay.pin.chip.name }}</td></tr>
<tr><td>Pin Name</td><td>{{ relay.pin.name }}</td></tr>
<tr><td>Pin Number</td><td>{{ relay.pin.line_number }}</td></tr>
<tr><td>Relay State</td><td>{{ relay.state }}</td></tr>
</tbody>
</table>
<form method="POST" action="/relay/{{relay.id}}/state">
<input type="hidden" name="pin_id" value="{{ relay.pin.id }}">
<input type="hidden" name="state" value="{{not relay.state}}">
<button type="submit">{{ "Turn OFF" if relay.state else "Turn ON" }}</button>
</form>
<a href="/">Home</a>
</html>