Compare commits

..

No commits in common. "7e43fca7d4e108d6a77db8ae52fce291d653b2a4" and "57d8ff0b39eabae75e1b8e549360897c18421238" have entirely different histories.

13 changed files with 99 additions and 4476 deletions

View File

@ -1,47 +0,0 @@
[[relays]]
name = "Unknown 1"
[relays.chip]
name = "gpiochip0"
number = 0
[relays.pin]
name = "CON2-P22"
line_number = 2
[[relays]]
name = "Waterfall"
[relays.chip]
name = "gpiochip0"
number = 0
[relays.pin]
name = "CON2-P18"
line_number = 68
[[relays]]
name = "Filter"
[relays.chip]
name = "gpiochip0"
number = 0
[relays.pin]
name = "CON2-P16"
line_number = 15
[[relays]]
name = "Unknown 2"
[relays.chip]
name = "gpiochip0"
number = 0
[relays.pin]
name = "CON2-P12"
line_number = 16
[rgpio]
host = "pool.ribble.net"
port = 8889

View File

@ -1,161 +0,0 @@
#!/usr/bin/env python3
import argparse
import dataclasses
import sys
from typing import Dict
import time
import rgpio
# From test C program using wiringPi
# RELAY_PINS = [4, 17, 27, 18, 22, 23, 24, 25]
# Physical pins
# PINS = [7, 11, 13, 15, 12, 16, 18, 22]
# From Banana Pi Zero M2 pinout
RELAY_PINS = ["CON2-P07", "CON2-P11", "CON2-P13", "CON2-P15", "CON2-P12", "CON2-P16", "CON2-P18", "CON2-P22"]
# con2-p22 ?
# p18: waterfall
# p16: filter pump, salt cell
#lgpio.h:#define LG_GPIO_IS_KERNEL 1
#lgpio.h:#define LG_GPIO_IS_OUTPUT 2
#lgpio.h:#define LG_GPIO_IS_ACTIVE_LOW 4
#lgpio.h:#define LG_GPIO_IS_OPEN_DRAIN 8
#lgpio.h:#define LG_GPIO_IS_OPEN_SOURCE 16
#lgpio.h:#define LG_GPIO_IS_PULL_UP 32
#lgpio.h:#define LG_GPIO_IS_PULL_DOWN 64
#lgpio.h:#define LG_GPIO_IS_PULL_NONE 128
#lgpio.h:#define LG_GPIO_IS_INPUT 65536
#lgpio.h:#define LG_GPIO_IS_RISING_EDGE 131072
#lgpio.h:#define LG_GPIO_IS_FALLING_EDGE 262144
#lgpio.h:#define LG_GPIO_IS_REALTIME_CLOCK 524288
FLAG_TO_NAME = {
1: "kernel",
2: "output",
4: "active_low",
8: "open_drain",
16: "open_source",
32: "pull_up",
64: "pull_down",
128: "pull_none",
65536: "input",
131072: "rising_edge",
262144: "falling_edge",
524288: "realtime_clock",
}
@dataclasses.dataclass
class Relay:
name: str
pin: str
RELAY_ON = 0
RELAY_OFF = 1
RELAYS = [
Relay("Light", "CON2-P22"),
Relay("Waterfall", "CON2-P18"),
Relay("Filter", "CON2-P16"),
Relay("Unknown 2", "CON2-P12"),
]
DEVICE = "/dev/ttyUSB0"
def alloff(args, sbc, handle, pin_to_line):
"Turn off all relays"
for relay in RELAYS:
line = pin_to_line[relay.pin]
sbc.gpio_claim_output(handle, line, level=RELAY_OFF)
print_status(sbc, handle, line)
def flags_to_name(flags: int):
"Get the set of flag names based on the provided flag value"
result = set()
for value, name in FLAG_TO_NAME.items():
if flags & value:
result.add(name)
return result
def on(args, sbc, handle, pin_to_line):
"Turn on a single relay"
name_to_pin = {relay.name.lower(): relay.pin for relay in RELAYS}
pin = name_to_pin[args.name.lower()]
line = pin_to_line[pin]
sbc.gpio_claim_output(handle, line, level=RELAY_ON)
sbc.gpio_write(handle, line, RELAY_ON)
print_status(sbc, handle, line)
print(f"{args.name} now on")
if args.duration:
print(f"Waiting {args.duration} minutes ({args.duration / 60.0} hours), then turning off")
time.sleep(args.duration * 60)
sbc.gpio_write(handle, line, RELAY_OFF)
print(f"{args.name} now off")
print_status(sbc, handle, line)
def status(args, sbc, handle, pin_to_line):
pin_to_name = {relay.pin: relay.name for relay in RELAYS}
for pin, name in pin_to_name.items():
line = pin_to_line[pin]
print_status(sbc, handle, line)
def print_status(sbc, handle, line):
okay_status, io_number, flags, pin, user = sbc.gpio_get_line_info(handle, line)
names = flags_to_name(flags)
if okay_status == 0:
print(f"{io_number}): {flags} {pin} {user}: {' '.join(sorted(names))}")
else:
print(f"Failed to get status for {line}")
def map_pin_to_line(sbc, handle):
start_pin, end_pin, name, driver = sbc.gpio_get_chip_info(handle)
pin_to_line = {}
for x in range(start_pin, end_pin):
okay_status, io_number, flags, pin, user = sbc.gpio_get_line_info(handle, x)
flag_names = flags_to_name(flags)
if okay_status != 0:
print(f"Error getting pin {x}: status {okay_status}")
continue
pin_to_line[pin] = x
return pin_to_line
def main():
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(required=True, help="the sub-command to execute")
parser_status = subparsers.add_parser("status", help="Show status of the relays")
parser_status.add_argument("-H", "--host", default="127.0.0.1", help="The host to connect to")
parser_status.set_defaults(func=status)
parser_alloff = subparsers.add_parser("alloff", help="Turn off all relays")
parser_alloff.add_argument("-H", "--host", default="127.0.0.1", help="The host to connect to")
parser_alloff.set_defaults(func=alloff)
parser_on = subparsers.add_parser("on", help="Turn on a relay")
parser_on.add_argument("name", choices=[relay.name for relay in RELAYS])
parser_on.add_argument("-d", "--duration", type=int, default=0, help="Time, in minutes, to keep the relay on")
parser_on.add_argument("-H", "--host", default="127.0.0.1", help="The host to connect to")
parser_on.set_defaults(func=on)
args = parser.parse_args()
sbc = rgpio.sbc(host=args.host)
if not sbc.connected:
print("Not connected")
return -1
handle = sbc.gpiochip_open(0) # open /dev/gpiochip0
if handle < 0:
print("Error on open")
return -1
pin_to_line = map_pin_to_line(sbc, handle)
try:
args.func(args, sbc, handle, pin_to_line)
finally:
sbc.gpiochip_close(handle)
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@ -1,74 +0,0 @@
#!/usr/bin/env python3
"""Example of announcing 250 services (in this case, a fake HTTP server)."""
import argparse
import asyncio
import logging
import socket
from typing import List, Optional
from zeroconf import IPVersion
from zeroconf.asyncio import AsyncServiceInfo, AsyncZeroconf
class AsyncRunner:
def __init__(self, ip_version: IPVersion) -> None:
self.ip_version = ip_version
self.aiozc: Optional[AsyncZeroconf] = None
async def register_services(self, infos: List[AsyncServiceInfo]) -> None:
self.aiozc = AsyncZeroconf(ip_version=self.ip_version)
tasks = [self.aiozc.async_register_service(info) for info in infos]
background_tasks = await asyncio.gather(*tasks)
await asyncio.gather(*background_tasks)
print("Finished registration, press Ctrl-C to exit...")
while True:
await asyncio.sleep(1)
async def unregister_services(self, infos: List[AsyncServiceInfo]) -> None:
assert self.aiozc is not None
tasks = [self.aiozc.async_unregister_service(info) for info in infos]
background_tasks = await asyncio.gather(*tasks)
await asyncio.gather(*background_tasks)
await self.aiozc.async_close()
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
parser = argparse.ArgumentParser()
parser.add_argument('--debug', action='store_true')
version_group = parser.add_mutually_exclusive_group()
version_group.add_argument('--v6', action='store_true')
version_group.add_argument('--v6-only', action='store_true')
args = parser.parse_args()
if args.debug:
logging.getLogger('zeroconf').setLevel(logging.DEBUG)
if args.v6:
ip_version = IPVersion.All
elif args.v6_only:
ip_version = IPVersion.V6Only
else:
ip_version = IPVersion.V4Only
infos = []
for i in range(250):
infos.append(
AsyncServiceInfo(
"_http._tcp.local.",
f"Paul's Test Web Site {i}._http._tcp.local.",
addresses=[socket.inet_aton("127.0.0.1")],
port=80,
properties={'path': '/~paulsm/'},
server=f"zcdemohost-{i}.local.",
)
)
print("Registration of 250 services...")
loop = asyncio.get_event_loop()
runner = AsyncRunner(ip_version)
try:
loop.run_until_complete(runner.register_services(infos))
except KeyboardInterrupt:
loop.run_until_complete(runner.unregister_services(infos))

View File

@ -1,24 +0,0 @@
#!/usr/bin/env python3
from zeroconf import ServiceBrowser, ServiceListener, Zeroconf
class MyListener(ServiceListener):
def update_service(self, zc: Zeroconf, type_: str, name: str) -> None:
print(f"Service {name} updated")
def remove_service(self, zc: Zeroconf, type_: str, name: str) -> None:
print(f"Service {name} removed")
def add_service(self, zc: Zeroconf, type_: str, name: str) -> None:
info = zc.get_service_info(type_, name)
print(f"Service {name} added, service info: {info}")
zeroconf = Zeroconf()
listener = MyListener()
browser = ServiceBrowser(zeroconf, "_http._tcp.local.", listener)
try:
input("Press enter to exit...\n\n")
finally:
zeroconf.close()

8
pnpdevice/config.py Normal file
View File

@ -0,0 +1,8 @@
from starlette.config import Config
config = Config(".env")
DEBUG = config("DEBUG", cast=bool, default=False)
DATABASE = config("DATABASE_URL", cast=databases.DatabaseURL)
SECRET_KEY = config("SECRET_KEY", cast=Secret)
ALLOWED_HOSTS = config("ALLOWED_HOSTS", cast=CommaSeparatedStrings)

View File

@ -3,75 +3,24 @@ import logging
import os import os
import pathlib import pathlib
import tomllib import tomllib
from typing import Dict, Iterable, List, Union from typing import Dict, List, Union
import rgpio
import tomli_w import tomli_w
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
# Max number of chips to scan for @dataclasses.dataclass
MAX_CHIPS = 16
# From http://abyz.me.uk/lg/py_rgpio.html#gpio_get_mode
GPIO_KERNEL_IN_USE = 1<<0
GPIO_KERNEL_OUTPUT = 1<<1
GPIO_KERNEL_ACTIVE_LOW = 1<<2
GPIO_KERNEL_OPEN_DRAIN = 1<<3
GPIO_KERNEL_OPEN_SOURCE = 1<<4
GPIO_KERNEL_PULL_UP = 1<<5
GPIO_KERNEL_PULL_DOWN = 1<<6
GPIO_KERNEL_PULL_OFF = 1<<7
GPIO_LG_INPUT = 1<<8
GPIO_LG_OUTPUT = 1<<9
GPIO_LG_ALERT = 1<<10
GPIO_LG_GROUP = 1<<11
GPIO_LG_NOT_USED1 = 1<<12
GPIO_LG_NOT_USED2 = 1<<13
GPIO_LG_NOT_USED3 = 1<<14
GPIO_LG_NOT_USED4 = 1<<15
GPIO_KERNEL_INPUT = 1<<16
GPIO_KERNEL_RISING_EDGE_ALERT = 1<<17
GPIO_KERNEL_FALLING_EDGE_ALERT = 1<<18
GPIO_KERNEL_REALTIME_CLOCK_ALERT = 1<<19
RELAY_OFF = 1
RELAY_ON = 0
@dataclasses.dataclass(frozen=True)
class Chip:
handle: int
label: str
name: str
number: int
number_of_lines: int
@property
def id(self) -> str:
return f"{self.number}-{self.name}"
@dataclasses.dataclass(frozen=True)
class Pin: class Pin:
chip: Chip chip: str
flags: int number: int
line_number: int
name: str name: str
user: str
@property @dataclasses.dataclass
def id(self) -> str:
return f"{self.chip.id}-{self.line_number}"
@dataclasses.dataclass(frozen=True)
class Relay: class Relay:
pin: Pin pin: Pin
name: str name: str
state: bool state: bool
@property
def id(self) -> str:
return f"{self.pin.id}"
def _deserialize(data: List[Dict[str, str]]) -> List[Relay]: def _deserialize(data: List[Dict[str, str]]) -> List[Relay]:
"Deserialize a list of relays from the config." "Deserialize a list of relays from the config."
return [Relay( return [Relay(
@ -80,12 +29,10 @@ def _deserialize(data: List[Dict[str, str]]) -> List[Relay]:
chip=relay["chip"], chip=relay["chip"],
number=int(relay["pinnumber"]), number=int(relay["pinnumber"]),
name=relay["pinname"], name=relay["pinname"],
), )) for relay in data]
state=False) for relay in data]
def _serialize(data: List[Relay]) -> List[Dict[str, str]]: def _serialize(data: List[Relay]) -> List[Dict[str, str]]:
"Serialize a list of relays to the config." "Serialize a list of relays to the config."
LOGGER.info("Serializing %s", data)
return [{ return [{
"chip": relay.pin.chip, "chip": relay.pin.chip,
"name": relay.name, "name": relay.name,
@ -93,207 +40,71 @@ def _serialize(data: List[Relay]) -> List[Dict[str, str]]:
"pinname": relay.pin.name, "pinname": relay.pin.name,
} for relay in data] } for relay in data]
class Manager: class Relays:
"Class for interacting with relays." "Class for interacting with relays."
def __init__(self, configpath: pathlib.Path, has_fakes: bool): def __init__(self, configpath: pathlib.Path):
try:
self.config = _read_config(configpath)
except FileNotFoundError:
self.config = {
"rgpio": {"host": "localhost", "port": 8889},
"relays": [{
"chip": -1,
"pin": "FAKE-P1",
"name": "Fake",
}],
}
# Immediately write out the default config
_write_config(configpath, self.config)
self.configpath = configpath self.configpath = configpath
self.has_fakes = has_fakes
# Handles to various chips, mapped by chip number
self._chips = {}
# Info on various pins, mapped by (chip number, line_number)
self._pins = {}
# Cached info on the relays, mapped by relay name
self._relays = {}
# Connection to single-board computer GPIO
self.sbc = None
def __iter__(self):
"Provide an iterator for the relays."
return iter(self.relays)
@property
def chips(self) -> List[Chip]:
if not self._chips:
self._load_chips()
return self._chips.values()
def connect(self) -> None:
self.sbc = rgpio.sbc(
host=self.config["rgpio"]["host"],
port=self.config["rgpio"]["port"],
)
if not self.sbc.connected:
LOGGER.warning("Failed to connect sbc")
self.sbc = None
def get_chip_by_number(self, number: int) -> Chip:
try: try:
return self._chips[number] with open(configpath, "rb") as f:
except KeyError: content = tomllib.load(f)
self._load_chip(number) self.config = content
return self._chips[number] except Exception as e:
LOGGER.info("Unable to load config file: %s", e)
self.config = {
"relays": [],
}
self.relays = _deserialize(self.config["relays"])
def get_pin_by_number(self, chip: Chip, line_number: int) -> Pin:
try:
return self._pins[(chip.number, line_number)]
except KeyError:
self._load_pin(chip, line_number)
return self._pins[(chip.number, line_number)]
def get_relay_by_pin_id(self, pin_id: str) -> Relay: def chips_list(self) -> List[str]:
for relay in self.relays: raise NotImplementedError()
if relay.pin.id == pin_id:
return relay
return None
def load_all_pins(self) -> None: def pins_list(self) -> List[Pin]:
self._load_chips() raise NotImplementedError()
self._load_pins()
@property def relay_add(self, chip: str, pinnumber: int, name: str) -> None:
def pins(self) -> List[Pin]:
if not self._pins:
self._load_pins()
return self._pins.values()
@property
def relays(self) -> Iterable[Relay]:
if not self._relays:
for relay in self.config["relays"]:
LOGGER.debug("Relay: %s", relay)
chip = self.get_chip_by_number(relay["chip"]["number"])
pin = self.get_pin_by_number(chip, relay["pin"]["line_number"])
self._relays[relay["name"]] = Relay(
name=relay["name"],
pin=pin,
state=False,
)
return self._relays.values()
def relay_add(self, chip_number: int, chip_name: str, line_number: int, name: str) -> None:
LOGGER.info("Creating relay %s with chip %d %s on line %d", name, chip_number, chip_name, line_number)
chip = self.get_chip_by_number(chip_number)
pin = self.get_pin_by_number(chip, line_number)
self.config["relays"].append({ self.config["relays"].append({
"chip": { "chip": chip,
"name": chip.name,
"number": chip.number,
},
"pin": {
"name": pin.name,
"line_number": pin.line_number,
},
"name": name, "name": name,
"pinnumber": pinnumber,
}) })
_write_config(self.configpath, self.config) with open(self.configpath, "wb") as f:
self._relays = {} tomli_w.dump({
"relays": _serialize(self.config["relays"]),
}, f)
LOGGER.info("Wrote config file %s", self.configpath)
def relay_set(self, relay: Relay, state: bool) -> None: def relays_list(self) -> List[Relay]:
level = RELAY_ON if state else RELAY_OFF return self.relays
ok = self.sbc.gpio_write(relay.pin.chip.handle, relay.pin.line_number, level)
if ok != 0: def relay_on(self, name: str) -> bool:
LOGGER.warn("Failed to set relay level for %s (%s)", relay.name, relay.id) raise NotImplementedError()
else:
LOGGER.info("Set relay %s (%s) to %s", relay.name, relay.id, "enabled" if state else "disabled")
okay, offset, flags, name, user = self.sbc.gpio_get_line_info(relay.pin.chip.handle, relay.pin.line_number)
LOGGER.info("Flags are now %d", flags)
self._relays[relay.name] = Relay(
name=relay.name,
pin=relay.pin,
state=state,
)
def relay_set(self, name: str, state: bool) -> None:
raise NotImplementedError()
def shutdown(self) -> None: class RelaysFake(Relays):
_write_config(self.configpath, self.config) "Class for fake relays, useful for testing."
def chips_list(self) -> List[str]:
return ["chipA", "chipB"]
def _load_chip(self, number: int) -> None: def pins_list(self) -> List[Pin]:
try: return [
handle = self.sbc.gpiochip_open(number) Pin(chip="chipA", number=1, name="CON2-P1"),
except rgpio.error: Pin(chip="chipA", number=2, name="CON2-P2"),
return Pin(chip="chipA", number=3, name="CON2-P10"),
if handle < 0: Pin(chip="chipB", number=1, name="CON2-P3"),
return Pin(chip="chipB", number=2, name="CON2-P7"),
okay, number_of_lines, name, label = self.sbc.gpio_get_chip_info(handle) ]
LOGGER.info("Chip info: %s %s %s %s", okay, number_of_lines, name, label)
if okay != 0:
LOGGER.warn("Chip %s not okay.", name)
return
self._chips[number] = Chip(
handle=handle,
label=label,
name=name,
number=number,
number_of_lines=number_of_lines,
)
# Talk to the remote pins daemon and get information about the chips def relays_list(self) -> List[Relay]:
def _load_chips(self) -> None: raise NotImplementedError()
for i in range(MAX_CHIPS):
self._load_chip(i) def relay_on(self, name: str) -> bool:
raise NotImplementedError()
def _load_pin(self, chip: Chip, line_number: int) -> None: def relay_set(self, name: str, state: bool) -> None:
okay, offset, flags, name, user = self.sbc.gpio_get_line_info(chip.handle, line_number) raise NotImplementedError()
LOGGER.info("Got line info: %s %s %s %s", offset, flags, name, user)
assert offset == line_number
if okay != 0:
LOGGER.warn("Line %s is not okay", name)
return
if not name:
LOGGER.warn("Ignoring line %d because it has no name.", line_number)
return
# Claim the pin for output, set initially to relay off.
okay = self.sbc.gpio_claim_output(
handle=chip.handle,
gpio=line_number,
level=RELAY_OFF,
)
if okay != 0:
LOGGER.warn("Failed to claim pin %s", line_number)
return
okay = self.sbc.gpio_write(chip.handle, line_number, RELAY_OFF)
if okay != 0:
LOGGER.warn("Failed to write pin %s", line_number)
return
okay, offset, flags, name, user = self.sbc.gpio_get_line_info(chip.handle, line_number)
LOGGER.info("Flags are %d", flags)
self._pins[(chip.number, line_number)] = Pin(
chip=chip,
flags=flags,
line_number=line_number,
name=name,
user=user,
)
# Talk to the remote pins daemon and get information about the pins
def _load_pins(self) -> None:
LOGGER.info("Loading pins")
for chip in self.chips:
for line in range(chip.number_of_lines):
self._load_pin(chip, line)
def _read_config(configpath: pathlib.Path) -> None:
with open(configpath, "rb") as f:
content = tomllib.load(f)
LOGGER.info("Read config from %s", configpath)
return content
def _write_config(configpath: pathlib.Path, config) -> None:
with open(configpath, "wb") as f:
tomli_w.dump(config, f)
LOGGER.info("Wrote config file %s", configpath)
class RelaysReal(Relays):
"Class for controlling real relays."

View File

@ -1,83 +1,48 @@
from contextlib import asynccontextmanager from fastapi import FastAPI, Request
import logging
from fastapi import FastAPI, Form, Request
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates from fastapi.templating import Jinja2Templates
from starlette.responses import RedirectResponse from starlette.responses import RedirectResponse
import jinja2 import jinja2
from pnpdevice import settings from pnpdevice.config import config
from pnpdevice.relays import Manager
import pnpdevice.discovery import pnpdevice.discovery
import pnpdevice.relays import pnpdevice.relays
LOGGER = logging.getLogger(__name__) app = FastAPI(debug=config.DEBUG)
logging.basicConfig(level=logging.DEBUG)
@asynccontextmanager
async def lifespan(app: FastAPI):
relays.connect()
yield
# relays.shutdown()
app = FastAPI(debug=settings.DEBUG, lifespan=lifespan)
relays = Manager(settings.CONFIGFILE, settings.RELAYS_FAKE)
templates = Jinja2Templates(directory="templates") templates = Jinja2Templates(directory="templates")
@app.get("/", response_class=HTMLResponse) @app.get("/")
def index(request: Request): def index():
relays = request.app["relays"]
return templates.TemplateResponse("index.template.html", { return templates.TemplateResponse("index.template.html", {
"request": request, "relays": relays.relays_list(),
"relays": relays,
}) })
@app.get("/relay/create") @app.get("/relay/create")
def relay_create_get(request: Request): def relay_create_get(request: Request):
"Get the form to create a new relay." "Get the form to create a new relay."
relays.load_all_pins() relays = request.app["relays"]
used_pin_names = {relay.pin.name for relay in relays} pins = relays.pins_list()
pins = [p for p in relays.pins if p.name not in used_pin_names] return templates.TemplateResponse("relay-create.template.html", {"pins": pins})
sorted_pins = sorted(pins, key=lambda p: p.name)
return templates.TemplateResponse("relay-create.template.html", {
"relays": relays,
"request": request,
"pins": sorted_pins,
})
@app.post("/relay/create") async def relay_create_post(request: Request, chip_and_number: str, name: str):
def relay_create_post( "Create a new relay."
request: Request, chip, number = chip_and_number.partition("-")
pin_id: str = Form(...), LOGGER.info("Creating relay %s %s with name %s", chip, number, name)
name: str = Form(...),
):
"Create a new relay from form POST."
chip_number, chip_name, line_number = pin_id.split("-")
chip_number = int(chip_number)
line_number = int(line_number)
relays.relay_add(chip_number, chip_name, line_number, name)
return RedirectResponse(status_code=303, url="/")
@app.get("/relay/{pin_id}")
def relay_detail_get(request: Request, pin_id: str):
"Get details on a single relay."
relay = relays.get_relay_by_pin_id(pin_id)
return templates.TemplateResponse("relay-detail.template.html", {
"request": request,
"relay": relay
})
@app.post("/relay/{pin_id}/state")
def relay_state_post(
request: Request,
pin_id: str,
state: bool = Form(...),
):
"Change the state of a relay."
relay = relays.get_relay_by_pin_id(pin_id)
relays.relay_set(relay, state)
return RedirectResponse(status_code=303, url="/") return RedirectResponse(status_code=303, url="/")
async def status(request: Request): async def status(request: Request):
return html("Status") return html("Status")
def run(relays: pnpdevice.relays.Relays):
"Run the embedded web server"
app = web.Application()
app["relays"] = relays
aiohttp_jinja2.setup(app,
loader=jinja2.FileSystemLoader("templates"))
app.on_startup.append(pnpdevice.discovery.handle)
app.add_routes([web.get("/", index)])
app.add_routes([web.get("/relay/create", relay_create_get)])
app.add_routes([web.post("/relay/create", relay_create_post)])
web.run_app(app)

View File

@ -1,11 +0,0 @@
from pathlib import Path
from starlette.config import Config
from starlette.datastructures import CommaSeparatedStrings
config = Config(".env")
CONFIGFILE = config("CONFIGFILE", cast=Path, default="/etc/pnpdevice.toml")
DEBUG = config("DEBUG", cast=bool, default=False)
RELAYS_FAKE = config("RELAYS_FAKE", cast=bool, default=False)
# SECRET_KEY = config("SECRET_KEY", cast=Secret)
# ALLOWED_HOSTS = config("ALLOWED_HOSTS", cast=CommaSeparatedStrings)

View File

@ -5,7 +5,6 @@ authors = [
] ]
dependencies = [ dependencies = [
"fastapi", "fastapi",
"python-multipart",
"tomli-w", "tomli-w",
"uvicorn[standard]", "uvicorn[standard]",
"zeroconf", "zeroconf",

3806
rgpio.py

File diff suppressed because it is too large Load Diff

View File

@ -4,18 +4,11 @@
<h2>No relays found.</h2> <h2>No relays found.</h2>
{% else %} {% else %}
<h2>Relays</h2> <h2>Relays</h2>
<table> <ul>
<thead><th>Relay</th><th>State</th></thead> {% for relay in relays %}
<tbody> <li>{{ relay.name }}</li>
{% for relay in relays %} {% endfor %}
<tr> </ul>
<td><a href="/relay/{{ relay.id }}">{{ relay.name }}</td></a></td>
<td>{{ "ON" if relay.state else "OFF"}}</td>
</tr>
{% endfor %}
</tbody>
</table>
{% endif %} {% endif %}
<br>
<a href="/relay/create">Create Relay</a> <a href="/relay/create">Create Relay</a>
</html> </html>

View File

@ -1,19 +1,10 @@
<html> <html>
<h1>Relay Creation</h1> <h1>Relay Creation</h1>
<h2>Existing relays</h2>
<table>
<thead><th>Relay</th><th>Pin</th></thead>
<tbody>
{% for relay in relays %}
<tr><td>{{ relay.name }}</td><td>{{ relay.pin.name }}</td></tr>
{% endfor %}
</table>
<form method="POST" action="/relay/create"> <form method="POST" action="/relay/create">
<input type="text" name="name" placeholder="Pool Pump 1"></input> <input type="text" name="name" placeholder="Pool Pump 1"></input>
<select name="pin_id">: <select name="chip-and-number">
{% for pin in pins %} {% for pin in pins %}
<option value="{{ pin.chip.number }}-{{ pin.chip.name }}-{{ pin.line_number }}">{{ pin.name }}</option> <option value="{{ pin.chip }}-{{ pin.number }}">{{ pin.name }}</option>
{% endfor %} {% endfor %}
</select> </select>
<button type="submit">Create</button> <button type="submit">Create</button>

View File

@ -1,21 +0,0 @@
<html>
<h1>Pools'n'Pumps Device</h1>
<h2>Relay "{{relay.name}}"</h2>
<table>
<thead><tr><th>Property</th><th>Value</th></tr></thead>
<tbody>
<tr><td>Chip Number</td><td>{{ relay.pin.chip.number }}</td></tr>
<tr><td>Chip Label</td><td>{{ relay.pin.chip.label }}</td></tr>
<tr><td>Chip Name</td><td>{{ relay.pin.chip.name }}</td></tr>
<tr><td>Pin Name</td><td>{{ relay.pin.name }}</td></tr>
<tr><td>Pin Number</td><td>{{ relay.pin.line_number }}</td></tr>
<tr><td>Relay State</td><td>{{ relay.state }}</td></tr>
</tbody>
</table>
<form method="POST" action="/relay/{{relay.id}}/state">
<input type="hidden" name="pin_id" value="{{ relay.pin.id }}">
<input type="hidden" name="state" value="{{not relay.state}}">
<button type="submit">{{ "Turn OFF" if relay.state else "Turn ON" }}</button>
</form>
<a href="/">Home</a>
</html>