Compare commits
No commits in common. "7e43fca7d4e108d6a77db8ae52fce291d653b2a4" and "57d8ff0b39eabae75e1b8e549360897c18421238" have entirely different histories.
7e43fca7d4
...
57d8ff0b39
47
config.toml
47
config.toml
|
@ -1,47 +0,0 @@
|
|||
[[relays]]
|
||||
name = "Unknown 1"
|
||||
|
||||
[relays.chip]
|
||||
name = "gpiochip0"
|
||||
number = 0
|
||||
|
||||
[relays.pin]
|
||||
name = "CON2-P22"
|
||||
line_number = 2
|
||||
|
||||
[[relays]]
|
||||
name = "Waterfall"
|
||||
|
||||
[relays.chip]
|
||||
name = "gpiochip0"
|
||||
number = 0
|
||||
|
||||
[relays.pin]
|
||||
name = "CON2-P18"
|
||||
line_number = 68
|
||||
|
||||
[[relays]]
|
||||
name = "Filter"
|
||||
|
||||
[relays.chip]
|
||||
name = "gpiochip0"
|
||||
number = 0
|
||||
|
||||
[relays.pin]
|
||||
name = "CON2-P16"
|
||||
line_number = 15
|
||||
|
||||
[[relays]]
|
||||
name = "Unknown 2"
|
||||
|
||||
[relays.chip]
|
||||
name = "gpiochip0"
|
||||
number = 0
|
||||
|
||||
[relays.pin]
|
||||
name = "CON2-P12"
|
||||
line_number = 16
|
||||
|
||||
[rgpio]
|
||||
host = "pool.ribble.net"
|
||||
port = 8889
|
161
daemon-test.py
161
daemon-test.py
|
@ -1,161 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
import argparse
|
||||
import dataclasses
|
||||
import sys
|
||||
from typing import Dict
|
||||
import time
|
||||
|
||||
import rgpio
|
||||
# From test C program using wiringPi
|
||||
# RELAY_PINS = [4, 17, 27, 18, 22, 23, 24, 25]
|
||||
# Physical pins
|
||||
# PINS = [7, 11, 13, 15, 12, 16, 18, 22]
|
||||
# From Banana Pi Zero M2 pinout
|
||||
RELAY_PINS = ["CON2-P07", "CON2-P11", "CON2-P13", "CON2-P15", "CON2-P12", "CON2-P16", "CON2-P18", "CON2-P22"]
|
||||
# con2-p22 ?
|
||||
# p18: waterfall
|
||||
# p16: filter pump, salt cell
|
||||
|
||||
#lgpio.h:#define LG_GPIO_IS_KERNEL 1
|
||||
#lgpio.h:#define LG_GPIO_IS_OUTPUT 2
|
||||
#lgpio.h:#define LG_GPIO_IS_ACTIVE_LOW 4
|
||||
#lgpio.h:#define LG_GPIO_IS_OPEN_DRAIN 8
|
||||
#lgpio.h:#define LG_GPIO_IS_OPEN_SOURCE 16
|
||||
#lgpio.h:#define LG_GPIO_IS_PULL_UP 32
|
||||
#lgpio.h:#define LG_GPIO_IS_PULL_DOWN 64
|
||||
#lgpio.h:#define LG_GPIO_IS_PULL_NONE 128
|
||||
#lgpio.h:#define LG_GPIO_IS_INPUT 65536
|
||||
#lgpio.h:#define LG_GPIO_IS_RISING_EDGE 131072
|
||||
#lgpio.h:#define LG_GPIO_IS_FALLING_EDGE 262144
|
||||
#lgpio.h:#define LG_GPIO_IS_REALTIME_CLOCK 524288
|
||||
|
||||
FLAG_TO_NAME = {
|
||||
1: "kernel",
|
||||
2: "output",
|
||||
4: "active_low",
|
||||
8: "open_drain",
|
||||
16: "open_source",
|
||||
32: "pull_up",
|
||||
64: "pull_down",
|
||||
128: "pull_none",
|
||||
65536: "input",
|
||||
131072: "rising_edge",
|
||||
262144: "falling_edge",
|
||||
524288: "realtime_clock",
|
||||
}
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class Relay:
|
||||
name: str
|
||||
pin: str
|
||||
|
||||
RELAY_ON = 0
|
||||
RELAY_OFF = 1
|
||||
|
||||
RELAYS = [
|
||||
Relay("Light", "CON2-P22"),
|
||||
Relay("Waterfall", "CON2-P18"),
|
||||
Relay("Filter", "CON2-P16"),
|
||||
Relay("Unknown 2", "CON2-P12"),
|
||||
]
|
||||
DEVICE = "/dev/ttyUSB0"
|
||||
|
||||
def alloff(args, sbc, handle, pin_to_line):
|
||||
"Turn off all relays"
|
||||
for relay in RELAYS:
|
||||
line = pin_to_line[relay.pin]
|
||||
sbc.gpio_claim_output(handle, line, level=RELAY_OFF)
|
||||
print_status(sbc, handle, line)
|
||||
|
||||
def flags_to_name(flags: int):
|
||||
"Get the set of flag names based on the provided flag value"
|
||||
result = set()
|
||||
for value, name in FLAG_TO_NAME.items():
|
||||
if flags & value:
|
||||
result.add(name)
|
||||
return result
|
||||
|
||||
def on(args, sbc, handle, pin_to_line):
|
||||
"Turn on a single relay"
|
||||
name_to_pin = {relay.name.lower(): relay.pin for relay in RELAYS}
|
||||
pin = name_to_pin[args.name.lower()]
|
||||
line = pin_to_line[pin]
|
||||
sbc.gpio_claim_output(handle, line, level=RELAY_ON)
|
||||
sbc.gpio_write(handle, line, RELAY_ON)
|
||||
print_status(sbc, handle, line)
|
||||
print(f"{args.name} now on")
|
||||
if args.duration:
|
||||
print(f"Waiting {args.duration} minutes ({args.duration / 60.0} hours), then turning off")
|
||||
time.sleep(args.duration * 60)
|
||||
sbc.gpio_write(handle, line, RELAY_OFF)
|
||||
print(f"{args.name} now off")
|
||||
print_status(sbc, handle, line)
|
||||
|
||||
|
||||
def status(args, sbc, handle, pin_to_line):
|
||||
pin_to_name = {relay.pin: relay.name for relay in RELAYS}
|
||||
for pin, name in pin_to_name.items():
|
||||
line = pin_to_line[pin]
|
||||
print_status(sbc, handle, line)
|
||||
|
||||
def print_status(sbc, handle, line):
|
||||
okay_status, io_number, flags, pin, user = sbc.gpio_get_line_info(handle, line)
|
||||
names = flags_to_name(flags)
|
||||
if okay_status == 0:
|
||||
print(f"{io_number}): {flags} {pin} {user}: {' '.join(sorted(names))}")
|
||||
else:
|
||||
print(f"Failed to get status for {line}")
|
||||
|
||||
def map_pin_to_line(sbc, handle):
|
||||
start_pin, end_pin, name, driver = sbc.gpio_get_chip_info(handle)
|
||||
pin_to_line = {}
|
||||
for x in range(start_pin, end_pin):
|
||||
okay_status, io_number, flags, pin, user = sbc.gpio_get_line_info(handle, x)
|
||||
flag_names = flags_to_name(flags)
|
||||
if okay_status != 0:
|
||||
print(f"Error getting pin {x}: status {okay_status}")
|
||||
continue
|
||||
pin_to_line[pin] = x
|
||||
return pin_to_line
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser()
|
||||
subparsers = parser.add_subparsers(required=True, help="the sub-command to execute")
|
||||
|
||||
parser_status = subparsers.add_parser("status", help="Show status of the relays")
|
||||
parser_status.add_argument("-H", "--host", default="127.0.0.1", help="The host to connect to")
|
||||
parser_status.set_defaults(func=status)
|
||||
|
||||
parser_alloff = subparsers.add_parser("alloff", help="Turn off all relays")
|
||||
parser_alloff.add_argument("-H", "--host", default="127.0.0.1", help="The host to connect to")
|
||||
parser_alloff.set_defaults(func=alloff)
|
||||
|
||||
parser_on = subparsers.add_parser("on", help="Turn on a relay")
|
||||
parser_on.add_argument("name", choices=[relay.name for relay in RELAYS])
|
||||
parser_on.add_argument("-d", "--duration", type=int, default=0, help="Time, in minutes, to keep the relay on")
|
||||
parser_on.add_argument("-H", "--host", default="127.0.0.1", help="The host to connect to")
|
||||
parser_on.set_defaults(func=on)
|
||||
args = parser.parse_args()
|
||||
|
||||
sbc = rgpio.sbc(host=args.host)
|
||||
if not sbc.connected:
|
||||
print("Not connected")
|
||||
return -1
|
||||
|
||||
handle = sbc.gpiochip_open(0) # open /dev/gpiochip0
|
||||
if handle < 0:
|
||||
print("Error on open")
|
||||
return -1
|
||||
|
||||
pin_to_line = map_pin_to_line(sbc, handle)
|
||||
|
||||
try:
|
||||
args.func(args, sbc, handle, pin_to_line)
|
||||
finally:
|
||||
sbc.gpiochip_close(handle)
|
||||
|
||||
return 0
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
|
@ -1,74 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
"""Example of announcing 250 services (in this case, a fake HTTP server)."""
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import logging
|
||||
import socket
|
||||
from typing import List, Optional
|
||||
|
||||
from zeroconf import IPVersion
|
||||
from zeroconf.asyncio import AsyncServiceInfo, AsyncZeroconf
|
||||
|
||||
|
||||
class AsyncRunner:
|
||||
def __init__(self, ip_version: IPVersion) -> None:
|
||||
self.ip_version = ip_version
|
||||
self.aiozc: Optional[AsyncZeroconf] = None
|
||||
|
||||
async def register_services(self, infos: List[AsyncServiceInfo]) -> None:
|
||||
self.aiozc = AsyncZeroconf(ip_version=self.ip_version)
|
||||
tasks = [self.aiozc.async_register_service(info) for info in infos]
|
||||
background_tasks = await asyncio.gather(*tasks)
|
||||
await asyncio.gather(*background_tasks)
|
||||
print("Finished registration, press Ctrl-C to exit...")
|
||||
while True:
|
||||
await asyncio.sleep(1)
|
||||
|
||||
async def unregister_services(self, infos: List[AsyncServiceInfo]) -> None:
|
||||
assert self.aiozc is not None
|
||||
tasks = [self.aiozc.async_unregister_service(info) for info in infos]
|
||||
background_tasks = await asyncio.gather(*tasks)
|
||||
await asyncio.gather(*background_tasks)
|
||||
await self.aiozc.async_close()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument('--debug', action='store_true')
|
||||
version_group = parser.add_mutually_exclusive_group()
|
||||
version_group.add_argument('--v6', action='store_true')
|
||||
version_group.add_argument('--v6-only', action='store_true')
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.debug:
|
||||
logging.getLogger('zeroconf').setLevel(logging.DEBUG)
|
||||
if args.v6:
|
||||
ip_version = IPVersion.All
|
||||
elif args.v6_only:
|
||||
ip_version = IPVersion.V6Only
|
||||
else:
|
||||
ip_version = IPVersion.V4Only
|
||||
|
||||
infos = []
|
||||
for i in range(250):
|
||||
infos.append(
|
||||
AsyncServiceInfo(
|
||||
"_http._tcp.local.",
|
||||
f"Paul's Test Web Site {i}._http._tcp.local.",
|
||||
addresses=[socket.inet_aton("127.0.0.1")],
|
||||
port=80,
|
||||
properties={'path': '/~paulsm/'},
|
||||
server=f"zcdemohost-{i}.local.",
|
||||
)
|
||||
)
|
||||
|
||||
print("Registration of 250 services...")
|
||||
loop = asyncio.get_event_loop()
|
||||
runner = AsyncRunner(ip_version)
|
||||
try:
|
||||
loop.run_until_complete(runner.register_services(infos))
|
||||
except KeyboardInterrupt:
|
||||
loop.run_until_complete(runner.unregister_services(infos))
|
|
@ -1,24 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
from zeroconf import ServiceBrowser, ServiceListener, Zeroconf
|
||||
|
||||
|
||||
class MyListener(ServiceListener):
|
||||
|
||||
def update_service(self, zc: Zeroconf, type_: str, name: str) -> None:
|
||||
print(f"Service {name} updated")
|
||||
|
||||
def remove_service(self, zc: Zeroconf, type_: str, name: str) -> None:
|
||||
print(f"Service {name} removed")
|
||||
|
||||
def add_service(self, zc: Zeroconf, type_: str, name: str) -> None:
|
||||
info = zc.get_service_info(type_, name)
|
||||
print(f"Service {name} added, service info: {info}")
|
||||
|
||||
|
||||
zeroconf = Zeroconf()
|
||||
listener = MyListener()
|
||||
browser = ServiceBrowser(zeroconf, "_http._tcp.local.", listener)
|
||||
try:
|
||||
input("Press enter to exit...\n\n")
|
||||
finally:
|
||||
zeroconf.close()
|
|
@ -0,0 +1,8 @@
|
|||
from starlette.config import Config
|
||||
|
||||
config = Config(".env")
|
||||
|
||||
DEBUG = config("DEBUG", cast=bool, default=False)
|
||||
DATABASE = config("DATABASE_URL", cast=databases.DatabaseURL)
|
||||
SECRET_KEY = config("SECRET_KEY", cast=Secret)
|
||||
ALLOWED_HOSTS = config("ALLOWED_HOSTS", cast=CommaSeparatedStrings)
|
|
@ -3,75 +3,24 @@ import logging
|
|||
import os
|
||||
import pathlib
|
||||
import tomllib
|
||||
from typing import Dict, Iterable, List, Union
|
||||
from typing import Dict, List, Union
|
||||
|
||||
import rgpio
|
||||
import tomli_w
|
||||
|
||||
LOGGER = logging.getLogger(__name__)
|
||||
|
||||
# Max number of chips to scan for
|
||||
MAX_CHIPS = 16
|
||||
|
||||
# From http://abyz.me.uk/lg/py_rgpio.html#gpio_get_mode
|
||||
GPIO_KERNEL_IN_USE = 1<<0
|
||||
GPIO_KERNEL_OUTPUT = 1<<1
|
||||
GPIO_KERNEL_ACTIVE_LOW = 1<<2
|
||||
GPIO_KERNEL_OPEN_DRAIN = 1<<3
|
||||
GPIO_KERNEL_OPEN_SOURCE = 1<<4
|
||||
GPIO_KERNEL_PULL_UP = 1<<5
|
||||
GPIO_KERNEL_PULL_DOWN = 1<<6
|
||||
GPIO_KERNEL_PULL_OFF = 1<<7
|
||||
GPIO_LG_INPUT = 1<<8
|
||||
GPIO_LG_OUTPUT = 1<<9
|
||||
GPIO_LG_ALERT = 1<<10
|
||||
GPIO_LG_GROUP = 1<<11
|
||||
GPIO_LG_NOT_USED1 = 1<<12
|
||||
GPIO_LG_NOT_USED2 = 1<<13
|
||||
GPIO_LG_NOT_USED3 = 1<<14
|
||||
GPIO_LG_NOT_USED4 = 1<<15
|
||||
GPIO_KERNEL_INPUT = 1<<16
|
||||
GPIO_KERNEL_RISING_EDGE_ALERT = 1<<17
|
||||
GPIO_KERNEL_FALLING_EDGE_ALERT = 1<<18
|
||||
GPIO_KERNEL_REALTIME_CLOCK_ALERT = 1<<19
|
||||
|
||||
RELAY_OFF = 1
|
||||
RELAY_ON = 0
|
||||
|
||||
@dataclasses.dataclass(frozen=True)
|
||||
class Chip:
|
||||
handle: int
|
||||
label: str
|
||||
name: str
|
||||
number: int
|
||||
number_of_lines: int
|
||||
|
||||
@property
|
||||
def id(self) -> str:
|
||||
return f"{self.number}-{self.name}"
|
||||
|
||||
@dataclasses.dataclass(frozen=True)
|
||||
@dataclasses.dataclass
|
||||
class Pin:
|
||||
chip: Chip
|
||||
flags: int
|
||||
line_number: int
|
||||
chip: str
|
||||
number: int
|
||||
name: str
|
||||
user: str
|
||||
|
||||
@property
|
||||
def id(self) -> str:
|
||||
return f"{self.chip.id}-{self.line_number}"
|
||||
|
||||
@dataclasses.dataclass(frozen=True)
|
||||
@dataclasses.dataclass
|
||||
class Relay:
|
||||
pin: Pin
|
||||
name: str
|
||||
state: bool
|
||||
|
||||
@property
|
||||
def id(self) -> str:
|
||||
return f"{self.pin.id}"
|
||||
|
||||
def _deserialize(data: List[Dict[str, str]]) -> List[Relay]:
|
||||
"Deserialize a list of relays from the config."
|
||||
return [Relay(
|
||||
|
@ -80,12 +29,10 @@ def _deserialize(data: List[Dict[str, str]]) -> List[Relay]:
|
|||
chip=relay["chip"],
|
||||
number=int(relay["pinnumber"]),
|
||||
name=relay["pinname"],
|
||||
),
|
||||
state=False) for relay in data]
|
||||
)) for relay in data]
|
||||
|
||||
def _serialize(data: List[Relay]) -> List[Dict[str, str]]:
|
||||
"Serialize a list of relays to the config."
|
||||
LOGGER.info("Serializing %s", data)
|
||||
return [{
|
||||
"chip": relay.pin.chip,
|
||||
"name": relay.name,
|
||||
|
@ -93,207 +40,71 @@ def _serialize(data: List[Relay]) -> List[Dict[str, str]]:
|
|||
"pinname": relay.pin.name,
|
||||
} for relay in data]
|
||||
|
||||
class Manager:
|
||||
class Relays:
|
||||
"Class for interacting with relays."
|
||||
def __init__(self, configpath: pathlib.Path, has_fakes: bool):
|
||||
try:
|
||||
self.config = _read_config(configpath)
|
||||
except FileNotFoundError:
|
||||
self.config = {
|
||||
"rgpio": {"host": "localhost", "port": 8889},
|
||||
"relays": [{
|
||||
"chip": -1,
|
||||
"pin": "FAKE-P1",
|
||||
"name": "Fake",
|
||||
}],
|
||||
}
|
||||
# Immediately write out the default config
|
||||
_write_config(configpath, self.config)
|
||||
def __init__(self, configpath: pathlib.Path):
|
||||
self.configpath = configpath
|
||||
self.has_fakes = has_fakes
|
||||
|
||||
# Handles to various chips, mapped by chip number
|
||||
self._chips = {}
|
||||
# Info on various pins, mapped by (chip number, line_number)
|
||||
self._pins = {}
|
||||
# Cached info on the relays, mapped by relay name
|
||||
self._relays = {}
|
||||
# Connection to single-board computer GPIO
|
||||
self.sbc = None
|
||||
|
||||
def __iter__(self):
|
||||
"Provide an iterator for the relays."
|
||||
return iter(self.relays)
|
||||
|
||||
@property
|
||||
def chips(self) -> List[Chip]:
|
||||
if not self._chips:
|
||||
self._load_chips()
|
||||
return self._chips.values()
|
||||
|
||||
def connect(self) -> None:
|
||||
self.sbc = rgpio.sbc(
|
||||
host=self.config["rgpio"]["host"],
|
||||
port=self.config["rgpio"]["port"],
|
||||
)
|
||||
if not self.sbc.connected:
|
||||
LOGGER.warning("Failed to connect sbc")
|
||||
self.sbc = None
|
||||
|
||||
def get_chip_by_number(self, number: int) -> Chip:
|
||||
try:
|
||||
return self._chips[number]
|
||||
except KeyError:
|
||||
self._load_chip(number)
|
||||
return self._chips[number]
|
||||
|
||||
def get_pin_by_number(self, chip: Chip, line_number: int) -> Pin:
|
||||
try:
|
||||
return self._pins[(chip.number, line_number)]
|
||||
except KeyError:
|
||||
self._load_pin(chip, line_number)
|
||||
return self._pins[(chip.number, line_number)]
|
||||
|
||||
def get_relay_by_pin_id(self, pin_id: str) -> Relay:
|
||||
for relay in self.relays:
|
||||
if relay.pin.id == pin_id:
|
||||
return relay
|
||||
return None
|
||||
|
||||
def load_all_pins(self) -> None:
|
||||
self._load_chips()
|
||||
self._load_pins()
|
||||
|
||||
@property
|
||||
def pins(self) -> List[Pin]:
|
||||
if not self._pins:
|
||||
self._load_pins()
|
||||
return self._pins.values()
|
||||
|
||||
@property
|
||||
def relays(self) -> Iterable[Relay]:
|
||||
if not self._relays:
|
||||
for relay in self.config["relays"]:
|
||||
LOGGER.debug("Relay: %s", relay)
|
||||
chip = self.get_chip_by_number(relay["chip"]["number"])
|
||||
pin = self.get_pin_by_number(chip, relay["pin"]["line_number"])
|
||||
self._relays[relay["name"]] = Relay(
|
||||
name=relay["name"],
|
||||
pin=pin,
|
||||
state=False,
|
||||
)
|
||||
return self._relays.values()
|
||||
|
||||
def relay_add(self, chip_number: int, chip_name: str, line_number: int, name: str) -> None:
|
||||
LOGGER.info("Creating relay %s with chip %d %s on line %d", name, chip_number, chip_name, line_number)
|
||||
chip = self.get_chip_by_number(chip_number)
|
||||
pin = self.get_pin_by_number(chip, line_number)
|
||||
self.config["relays"].append({
|
||||
"chip": {
|
||||
"name": chip.name,
|
||||
"number": chip.number,
|
||||
},
|
||||
"pin": {
|
||||
"name": pin.name,
|
||||
"line_number": pin.line_number,
|
||||
},
|
||||
"name": name,
|
||||
})
|
||||
_write_config(self.configpath, self.config)
|
||||
self._relays = {}
|
||||
|
||||
def relay_set(self, relay: Relay, state: bool) -> None:
|
||||
level = RELAY_ON if state else RELAY_OFF
|
||||
ok = self.sbc.gpio_write(relay.pin.chip.handle, relay.pin.line_number, level)
|
||||
if ok != 0:
|
||||
LOGGER.warn("Failed to set relay level for %s (%s)", relay.name, relay.id)
|
||||
else:
|
||||
LOGGER.info("Set relay %s (%s) to %s", relay.name, relay.id, "enabled" if state else "disabled")
|
||||
okay, offset, flags, name, user = self.sbc.gpio_get_line_info(relay.pin.chip.handle, relay.pin.line_number)
|
||||
LOGGER.info("Flags are now %d", flags)
|
||||
self._relays[relay.name] = Relay(
|
||||
name=relay.name,
|
||||
pin=relay.pin,
|
||||
state=state,
|
||||
)
|
||||
|
||||
|
||||
def shutdown(self) -> None:
|
||||
_write_config(self.configpath, self.config)
|
||||
|
||||
def _load_chip(self, number: int) -> None:
|
||||
try:
|
||||
handle = self.sbc.gpiochip_open(number)
|
||||
except rgpio.error:
|
||||
return
|
||||
if handle < 0:
|
||||
return
|
||||
okay, number_of_lines, name, label = self.sbc.gpio_get_chip_info(handle)
|
||||
LOGGER.info("Chip info: %s %s %s %s", okay, number_of_lines, name, label)
|
||||
if okay != 0:
|
||||
LOGGER.warn("Chip %s not okay.", name)
|
||||
return
|
||||
self._chips[number] = Chip(
|
||||
handle=handle,
|
||||
label=label,
|
||||
name=name,
|
||||
number=number,
|
||||
number_of_lines=number_of_lines,
|
||||
)
|
||||
|
||||
# Talk to the remote pins daemon and get information about the chips
|
||||
def _load_chips(self) -> None:
|
||||
for i in range(MAX_CHIPS):
|
||||
self._load_chip(i)
|
||||
|
||||
def _load_pin(self, chip: Chip, line_number: int) -> None:
|
||||
okay, offset, flags, name, user = self.sbc.gpio_get_line_info(chip.handle, line_number)
|
||||
LOGGER.info("Got line info: %s %s %s %s", offset, flags, name, user)
|
||||
assert offset == line_number
|
||||
if okay != 0:
|
||||
LOGGER.warn("Line %s is not okay", name)
|
||||
return
|
||||
if not name:
|
||||
LOGGER.warn("Ignoring line %d because it has no name.", line_number)
|
||||
return
|
||||
# Claim the pin for output, set initially to relay off.
|
||||
okay = self.sbc.gpio_claim_output(
|
||||
handle=chip.handle,
|
||||
gpio=line_number,
|
||||
level=RELAY_OFF,
|
||||
)
|
||||
if okay != 0:
|
||||
LOGGER.warn("Failed to claim pin %s", line_number)
|
||||
return
|
||||
okay = self.sbc.gpio_write(chip.handle, line_number, RELAY_OFF)
|
||||
if okay != 0:
|
||||
LOGGER.warn("Failed to write pin %s", line_number)
|
||||
return
|
||||
okay, offset, flags, name, user = self.sbc.gpio_get_line_info(chip.handle, line_number)
|
||||
LOGGER.info("Flags are %d", flags)
|
||||
self._pins[(chip.number, line_number)] = Pin(
|
||||
chip=chip,
|
||||
flags=flags,
|
||||
line_number=line_number,
|
||||
name=name,
|
||||
user=user,
|
||||
)
|
||||
|
||||
# Talk to the remote pins daemon and get information about the pins
|
||||
def _load_pins(self) -> None:
|
||||
LOGGER.info("Loading pins")
|
||||
for chip in self.chips:
|
||||
for line in range(chip.number_of_lines):
|
||||
self._load_pin(chip, line)
|
||||
|
||||
def _read_config(configpath: pathlib.Path) -> None:
|
||||
with open(configpath, "rb") as f:
|
||||
content = tomllib.load(f)
|
||||
LOGGER.info("Read config from %s", configpath)
|
||||
return content
|
||||
self.config = content
|
||||
except Exception as e:
|
||||
LOGGER.info("Unable to load config file: %s", e)
|
||||
self.config = {
|
||||
"relays": [],
|
||||
}
|
||||
self.relays = _deserialize(self.config["relays"])
|
||||
|
||||
def _write_config(configpath: pathlib.Path, config) -> None:
|
||||
with open(configpath, "wb") as f:
|
||||
tomli_w.dump(config, f)
|
||||
LOGGER.info("Wrote config file %s", configpath)
|
||||
|
||||
def chips_list(self) -> List[str]:
|
||||
raise NotImplementedError()
|
||||
|
||||
def pins_list(self) -> List[Pin]:
|
||||
raise NotImplementedError()
|
||||
|
||||
def relay_add(self, chip: str, pinnumber: int, name: str) -> None:
|
||||
self.config["relays"].append({
|
||||
"chip": chip,
|
||||
"name": name,
|
||||
"pinnumber": pinnumber,
|
||||
})
|
||||
with open(self.configpath, "wb") as f:
|
||||
tomli_w.dump({
|
||||
"relays": _serialize(self.config["relays"]),
|
||||
}, f)
|
||||
LOGGER.info("Wrote config file %s", self.configpath)
|
||||
|
||||
def relays_list(self) -> List[Relay]:
|
||||
return self.relays
|
||||
|
||||
def relay_on(self, name: str) -> bool:
|
||||
raise NotImplementedError()
|
||||
|
||||
def relay_set(self, name: str, state: bool) -> None:
|
||||
raise NotImplementedError()
|
||||
|
||||
class RelaysFake(Relays):
|
||||
"Class for fake relays, useful for testing."
|
||||
def chips_list(self) -> List[str]:
|
||||
return ["chipA", "chipB"]
|
||||
|
||||
def pins_list(self) -> List[Pin]:
|
||||
return [
|
||||
Pin(chip="chipA", number=1, name="CON2-P1"),
|
||||
Pin(chip="chipA", number=2, name="CON2-P2"),
|
||||
Pin(chip="chipA", number=3, name="CON2-P10"),
|
||||
Pin(chip="chipB", number=1, name="CON2-P3"),
|
||||
Pin(chip="chipB", number=2, name="CON2-P7"),
|
||||
]
|
||||
|
||||
def relays_list(self) -> List[Relay]:
|
||||
raise NotImplementedError()
|
||||
|
||||
def relay_on(self, name: str) -> bool:
|
||||
raise NotImplementedError()
|
||||
|
||||
def relay_set(self, name: str, state: bool) -> None:
|
||||
raise NotImplementedError()
|
||||
|
||||
class RelaysReal(Relays):
|
||||
"Class for controlling real relays."
|
||||
|
|
|
@ -1,83 +1,48 @@
|
|||
from contextlib import asynccontextmanager
|
||||
import logging
|
||||
|
||||
from fastapi import FastAPI, Form, Request
|
||||
from fastapi.responses import HTMLResponse
|
||||
from fastapi import FastAPI, Request
|
||||
from fastapi.templating import Jinja2Templates
|
||||
from starlette.responses import RedirectResponse
|
||||
|
||||
import jinja2
|
||||
|
||||
from pnpdevice import settings
|
||||
from pnpdevice.relays import Manager
|
||||
from pnpdevice.config import config
|
||||
import pnpdevice.discovery
|
||||
import pnpdevice.relays
|
||||
|
||||
LOGGER = logging.getLogger(__name__)
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI):
|
||||
relays.connect()
|
||||
yield
|
||||
# relays.shutdown()
|
||||
|
||||
app = FastAPI(debug=settings.DEBUG, lifespan=lifespan)
|
||||
relays = Manager(settings.CONFIGFILE, settings.RELAYS_FAKE)
|
||||
app = FastAPI(debug=config.DEBUG)
|
||||
templates = Jinja2Templates(directory="templates")
|
||||
|
||||
@app.get("/", response_class=HTMLResponse)
|
||||
def index(request: Request):
|
||||
@app.get("/")
|
||||
def index():
|
||||
relays = request.app["relays"]
|
||||
return templates.TemplateResponse("index.template.html", {
|
||||
"request": request,
|
||||
"relays": relays,
|
||||
"relays": relays.relays_list(),
|
||||
})
|
||||
|
||||
@app.get("/relay/create")
|
||||
def relay_create_get(request: Request):
|
||||
"Get the form to create a new relay."
|
||||
relays.load_all_pins()
|
||||
used_pin_names = {relay.pin.name for relay in relays}
|
||||
pins = [p for p in relays.pins if p.name not in used_pin_names]
|
||||
sorted_pins = sorted(pins, key=lambda p: p.name)
|
||||
return templates.TemplateResponse("relay-create.template.html", {
|
||||
"relays": relays,
|
||||
"request": request,
|
||||
"pins": sorted_pins,
|
||||
})
|
||||
relays = request.app["relays"]
|
||||
pins = relays.pins_list()
|
||||
return templates.TemplateResponse("relay-create.template.html", {"pins": pins})
|
||||
|
||||
@app.post("/relay/create")
|
||||
def relay_create_post(
|
||||
request: Request,
|
||||
pin_id: str = Form(...),
|
||||
name: str = Form(...),
|
||||
):
|
||||
"Create a new relay from form POST."
|
||||
chip_number, chip_name, line_number = pin_id.split("-")
|
||||
chip_number = int(chip_number)
|
||||
line_number = int(line_number)
|
||||
relays.relay_add(chip_number, chip_name, line_number, name)
|
||||
return RedirectResponse(status_code=303, url="/")
|
||||
|
||||
@app.get("/relay/{pin_id}")
|
||||
def relay_detail_get(request: Request, pin_id: str):
|
||||
"Get details on a single relay."
|
||||
relay = relays.get_relay_by_pin_id(pin_id)
|
||||
return templates.TemplateResponse("relay-detail.template.html", {
|
||||
"request": request,
|
||||
"relay": relay
|
||||
})
|
||||
|
||||
@app.post("/relay/{pin_id}/state")
|
||||
def relay_state_post(
|
||||
request: Request,
|
||||
pin_id: str,
|
||||
state: bool = Form(...),
|
||||
):
|
||||
"Change the state of a relay."
|
||||
relay = relays.get_relay_by_pin_id(pin_id)
|
||||
relays.relay_set(relay, state)
|
||||
async def relay_create_post(request: Request, chip_and_number: str, name: str):
|
||||
"Create a new relay."
|
||||
chip, number = chip_and_number.partition("-")
|
||||
LOGGER.info("Creating relay %s %s with name %s", chip, number, name)
|
||||
return RedirectResponse(status_code=303, url="/")
|
||||
|
||||
async def status(request: Request):
|
||||
return html("Status")
|
||||
|
||||
def run(relays: pnpdevice.relays.Relays):
|
||||
"Run the embedded web server"
|
||||
app = web.Application()
|
||||
app["relays"] = relays
|
||||
aiohttp_jinja2.setup(app,
|
||||
loader=jinja2.FileSystemLoader("templates"))
|
||||
app.on_startup.append(pnpdevice.discovery.handle)
|
||||
app.add_routes([web.get("/", index)])
|
||||
app.add_routes([web.get("/relay/create", relay_create_get)])
|
||||
app.add_routes([web.post("/relay/create", relay_create_post)])
|
||||
web.run_app(app)
|
||||
|
||||
|
|
|
@ -1,11 +0,0 @@
|
|||
from pathlib import Path
|
||||
from starlette.config import Config
|
||||
from starlette.datastructures import CommaSeparatedStrings
|
||||
|
||||
config = Config(".env")
|
||||
|
||||
CONFIGFILE = config("CONFIGFILE", cast=Path, default="/etc/pnpdevice.toml")
|
||||
DEBUG = config("DEBUG", cast=bool, default=False)
|
||||
RELAYS_FAKE = config("RELAYS_FAKE", cast=bool, default=False)
|
||||
# SECRET_KEY = config("SECRET_KEY", cast=Secret)
|
||||
# ALLOWED_HOSTS = config("ALLOWED_HOSTS", cast=CommaSeparatedStrings)
|
|
@ -5,7 +5,6 @@ authors = [
|
|||
]
|
||||
dependencies = [
|
||||
"fastapi",
|
||||
"python-multipart",
|
||||
"tomli-w",
|
||||
"uvicorn[standard]",
|
||||
"zeroconf",
|
||||
|
|
|
@ -4,18 +4,11 @@
|
|||
<h2>No relays found.</h2>
|
||||
{% else %}
|
||||
<h2>Relays</h2>
|
||||
<table>
|
||||
<thead><th>Relay</th><th>State</th></thead>
|
||||
<tbody>
|
||||
<ul>
|
||||
{% for relay in relays %}
|
||||
<tr>
|
||||
<td><a href="/relay/{{ relay.id }}">{{ relay.name }}</td></a></td>
|
||||
<td>{{ "ON" if relay.state else "OFF"}}</td>
|
||||
</tr>
|
||||
<li>{{ relay.name }}</li>
|
||||
{% endfor %}
|
||||
</tbody>
|
||||
</table>
|
||||
</ul>
|
||||
{% endif %}
|
||||
<br>
|
||||
<a href="/relay/create">Create Relay</a>
|
||||
</html>
|
||||
|
|
|
@ -1,19 +1,10 @@
|
|||
<html>
|
||||
<h1>Relay Creation</h1>
|
||||
<h2>Existing relays</h2>
|
||||
<table>
|
||||
<thead><th>Relay</th><th>Pin</th></thead>
|
||||
<tbody>
|
||||
{% for relay in relays %}
|
||||
<tr><td>{{ relay.name }}</td><td>{{ relay.pin.name }}</td></tr>
|
||||
{% endfor %}
|
||||
</table>
|
||||
|
||||
<form method="POST" action="/relay/create">
|
||||
<input type="text" name="name" placeholder="Pool Pump 1"></input>
|
||||
<select name="pin_id">:
|
||||
<select name="chip-and-number">
|
||||
{% for pin in pins %}
|
||||
<option value="{{ pin.chip.number }}-{{ pin.chip.name }}-{{ pin.line_number }}">{{ pin.name }}</option>
|
||||
<option value="{{ pin.chip }}-{{ pin.number }}">{{ pin.name }}</option>
|
||||
{% endfor %}
|
||||
</select>
|
||||
<button type="submit">Create</button>
|
||||
|
|
|
@ -1,21 +0,0 @@
|
|||
<html>
|
||||
<h1>Pools'n'Pumps Device</h1>
|
||||
<h2>Relay "{{relay.name}}"</h2>
|
||||
<table>
|
||||
<thead><tr><th>Property</th><th>Value</th></tr></thead>
|
||||
<tbody>
|
||||
<tr><td>Chip Number</td><td>{{ relay.pin.chip.number }}</td></tr>
|
||||
<tr><td>Chip Label</td><td>{{ relay.pin.chip.label }}</td></tr>
|
||||
<tr><td>Chip Name</td><td>{{ relay.pin.chip.name }}</td></tr>
|
||||
<tr><td>Pin Name</td><td>{{ relay.pin.name }}</td></tr>
|
||||
<tr><td>Pin Number</td><td>{{ relay.pin.line_number }}</td></tr>
|
||||
<tr><td>Relay State</td><td>{{ relay.state }}</td></tr>
|
||||
</tbody>
|
||||
</table>
|
||||
<form method="POST" action="/relay/{{relay.id}}/state">
|
||||
<input type="hidden" name="pin_id" value="{{ relay.pin.id }}">
|
||||
<input type="hidden" name="state" value="{{not relay.state}}">
|
||||
<button type="submit">{{ "Turn OFF" if relay.state else "Turn ON" }}</button>
|
||||
</form>
|
||||
<a href="/">Home</a>
|
||||
</html>
|
Loading…
Reference in New Issue