Compare commits
8 Commits
57d8ff0b39
...
7e43fca7d4
Author | SHA1 | Date |
---|---|---|
|
7e43fca7d4 | |
|
68e59c0ae4 | |
|
73e41d90b7 | |
|
82c741033d | |
|
0fee05acf7 | |
|
b4027baf68 | |
|
dd637c2eaa | |
|
cbffa327b9 |
|
@ -0,0 +1,47 @@
|
||||||
|
[[relays]]
|
||||||
|
name = "Unknown 1"
|
||||||
|
|
||||||
|
[relays.chip]
|
||||||
|
name = "gpiochip0"
|
||||||
|
number = 0
|
||||||
|
|
||||||
|
[relays.pin]
|
||||||
|
name = "CON2-P22"
|
||||||
|
line_number = 2
|
||||||
|
|
||||||
|
[[relays]]
|
||||||
|
name = "Waterfall"
|
||||||
|
|
||||||
|
[relays.chip]
|
||||||
|
name = "gpiochip0"
|
||||||
|
number = 0
|
||||||
|
|
||||||
|
[relays.pin]
|
||||||
|
name = "CON2-P18"
|
||||||
|
line_number = 68
|
||||||
|
|
||||||
|
[[relays]]
|
||||||
|
name = "Filter"
|
||||||
|
|
||||||
|
[relays.chip]
|
||||||
|
name = "gpiochip0"
|
||||||
|
number = 0
|
||||||
|
|
||||||
|
[relays.pin]
|
||||||
|
name = "CON2-P16"
|
||||||
|
line_number = 15
|
||||||
|
|
||||||
|
[[relays]]
|
||||||
|
name = "Unknown 2"
|
||||||
|
|
||||||
|
[relays.chip]
|
||||||
|
name = "gpiochip0"
|
||||||
|
number = 0
|
||||||
|
|
||||||
|
[relays.pin]
|
||||||
|
name = "CON2-P12"
|
||||||
|
line_number = 16
|
||||||
|
|
||||||
|
[rgpio]
|
||||||
|
host = "pool.ribble.net"
|
||||||
|
port = 8889
|
|
@ -0,0 +1,161 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
import argparse
|
||||||
|
import dataclasses
|
||||||
|
import sys
|
||||||
|
from typing import Dict
|
||||||
|
import time
|
||||||
|
|
||||||
|
import rgpio
|
||||||
|
# From test C program using wiringPi
|
||||||
|
# RELAY_PINS = [4, 17, 27, 18, 22, 23, 24, 25]
|
||||||
|
# Physical pins
|
||||||
|
# PINS = [7, 11, 13, 15, 12, 16, 18, 22]
|
||||||
|
# From Banana Pi Zero M2 pinout
|
||||||
|
RELAY_PINS = ["CON2-P07", "CON2-P11", "CON2-P13", "CON2-P15", "CON2-P12", "CON2-P16", "CON2-P18", "CON2-P22"]
|
||||||
|
# con2-p22 ?
|
||||||
|
# p18: waterfall
|
||||||
|
# p16: filter pump, salt cell
|
||||||
|
|
||||||
|
#lgpio.h:#define LG_GPIO_IS_KERNEL 1
|
||||||
|
#lgpio.h:#define LG_GPIO_IS_OUTPUT 2
|
||||||
|
#lgpio.h:#define LG_GPIO_IS_ACTIVE_LOW 4
|
||||||
|
#lgpio.h:#define LG_GPIO_IS_OPEN_DRAIN 8
|
||||||
|
#lgpio.h:#define LG_GPIO_IS_OPEN_SOURCE 16
|
||||||
|
#lgpio.h:#define LG_GPIO_IS_PULL_UP 32
|
||||||
|
#lgpio.h:#define LG_GPIO_IS_PULL_DOWN 64
|
||||||
|
#lgpio.h:#define LG_GPIO_IS_PULL_NONE 128
|
||||||
|
#lgpio.h:#define LG_GPIO_IS_INPUT 65536
|
||||||
|
#lgpio.h:#define LG_GPIO_IS_RISING_EDGE 131072
|
||||||
|
#lgpio.h:#define LG_GPIO_IS_FALLING_EDGE 262144
|
||||||
|
#lgpio.h:#define LG_GPIO_IS_REALTIME_CLOCK 524288
|
||||||
|
|
||||||
|
FLAG_TO_NAME = {
|
||||||
|
1: "kernel",
|
||||||
|
2: "output",
|
||||||
|
4: "active_low",
|
||||||
|
8: "open_drain",
|
||||||
|
16: "open_source",
|
||||||
|
32: "pull_up",
|
||||||
|
64: "pull_down",
|
||||||
|
128: "pull_none",
|
||||||
|
65536: "input",
|
||||||
|
131072: "rising_edge",
|
||||||
|
262144: "falling_edge",
|
||||||
|
524288: "realtime_clock",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class Relay:
|
||||||
|
name: str
|
||||||
|
pin: str
|
||||||
|
|
||||||
|
RELAY_ON = 0
|
||||||
|
RELAY_OFF = 1
|
||||||
|
|
||||||
|
RELAYS = [
|
||||||
|
Relay("Light", "CON2-P22"),
|
||||||
|
Relay("Waterfall", "CON2-P18"),
|
||||||
|
Relay("Filter", "CON2-P16"),
|
||||||
|
Relay("Unknown 2", "CON2-P12"),
|
||||||
|
]
|
||||||
|
DEVICE = "/dev/ttyUSB0"
|
||||||
|
|
||||||
|
def alloff(args, sbc, handle, pin_to_line):
|
||||||
|
"Turn off all relays"
|
||||||
|
for relay in RELAYS:
|
||||||
|
line = pin_to_line[relay.pin]
|
||||||
|
sbc.gpio_claim_output(handle, line, level=RELAY_OFF)
|
||||||
|
print_status(sbc, handle, line)
|
||||||
|
|
||||||
|
def flags_to_name(flags: int):
|
||||||
|
"Get the set of flag names based on the provided flag value"
|
||||||
|
result = set()
|
||||||
|
for value, name in FLAG_TO_NAME.items():
|
||||||
|
if flags & value:
|
||||||
|
result.add(name)
|
||||||
|
return result
|
||||||
|
|
||||||
|
def on(args, sbc, handle, pin_to_line):
|
||||||
|
"Turn on a single relay"
|
||||||
|
name_to_pin = {relay.name.lower(): relay.pin for relay in RELAYS}
|
||||||
|
pin = name_to_pin[args.name.lower()]
|
||||||
|
line = pin_to_line[pin]
|
||||||
|
sbc.gpio_claim_output(handle, line, level=RELAY_ON)
|
||||||
|
sbc.gpio_write(handle, line, RELAY_ON)
|
||||||
|
print_status(sbc, handle, line)
|
||||||
|
print(f"{args.name} now on")
|
||||||
|
if args.duration:
|
||||||
|
print(f"Waiting {args.duration} minutes ({args.duration / 60.0} hours), then turning off")
|
||||||
|
time.sleep(args.duration * 60)
|
||||||
|
sbc.gpio_write(handle, line, RELAY_OFF)
|
||||||
|
print(f"{args.name} now off")
|
||||||
|
print_status(sbc, handle, line)
|
||||||
|
|
||||||
|
|
||||||
|
def status(args, sbc, handle, pin_to_line):
|
||||||
|
pin_to_name = {relay.pin: relay.name for relay in RELAYS}
|
||||||
|
for pin, name in pin_to_name.items():
|
||||||
|
line = pin_to_line[pin]
|
||||||
|
print_status(sbc, handle, line)
|
||||||
|
|
||||||
|
def print_status(sbc, handle, line):
|
||||||
|
okay_status, io_number, flags, pin, user = sbc.gpio_get_line_info(handle, line)
|
||||||
|
names = flags_to_name(flags)
|
||||||
|
if okay_status == 0:
|
||||||
|
print(f"{io_number}): {flags} {pin} {user}: {' '.join(sorted(names))}")
|
||||||
|
else:
|
||||||
|
print(f"Failed to get status for {line}")
|
||||||
|
|
||||||
|
def map_pin_to_line(sbc, handle):
|
||||||
|
start_pin, end_pin, name, driver = sbc.gpio_get_chip_info(handle)
|
||||||
|
pin_to_line = {}
|
||||||
|
for x in range(start_pin, end_pin):
|
||||||
|
okay_status, io_number, flags, pin, user = sbc.gpio_get_line_info(handle, x)
|
||||||
|
flag_names = flags_to_name(flags)
|
||||||
|
if okay_status != 0:
|
||||||
|
print(f"Error getting pin {x}: status {okay_status}")
|
||||||
|
continue
|
||||||
|
pin_to_line[pin] = x
|
||||||
|
return pin_to_line
|
||||||
|
|
||||||
|
def main():
|
||||||
|
parser = argparse.ArgumentParser()
|
||||||
|
subparsers = parser.add_subparsers(required=True, help="the sub-command to execute")
|
||||||
|
|
||||||
|
parser_status = subparsers.add_parser("status", help="Show status of the relays")
|
||||||
|
parser_status.add_argument("-H", "--host", default="127.0.0.1", help="The host to connect to")
|
||||||
|
parser_status.set_defaults(func=status)
|
||||||
|
|
||||||
|
parser_alloff = subparsers.add_parser("alloff", help="Turn off all relays")
|
||||||
|
parser_alloff.add_argument("-H", "--host", default="127.0.0.1", help="The host to connect to")
|
||||||
|
parser_alloff.set_defaults(func=alloff)
|
||||||
|
|
||||||
|
parser_on = subparsers.add_parser("on", help="Turn on a relay")
|
||||||
|
parser_on.add_argument("name", choices=[relay.name for relay in RELAYS])
|
||||||
|
parser_on.add_argument("-d", "--duration", type=int, default=0, help="Time, in minutes, to keep the relay on")
|
||||||
|
parser_on.add_argument("-H", "--host", default="127.0.0.1", help="The host to connect to")
|
||||||
|
parser_on.set_defaults(func=on)
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
sbc = rgpio.sbc(host=args.host)
|
||||||
|
if not sbc.connected:
|
||||||
|
print("Not connected")
|
||||||
|
return -1
|
||||||
|
|
||||||
|
handle = sbc.gpiochip_open(0) # open /dev/gpiochip0
|
||||||
|
if handle < 0:
|
||||||
|
print("Error on open")
|
||||||
|
return -1
|
||||||
|
|
||||||
|
pin_to_line = map_pin_to_line(sbc, handle)
|
||||||
|
|
||||||
|
try:
|
||||||
|
args.func(args, sbc, handle, pin_to_line)
|
||||||
|
finally:
|
||||||
|
sbc.gpiochip_close(handle)
|
||||||
|
|
||||||
|
return 0
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
sys.exit(main())
|
|
@ -0,0 +1,74 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
"""Example of announcing 250 services (in this case, a fake HTTP server)."""
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
import socket
|
||||||
|
from typing import List, Optional
|
||||||
|
|
||||||
|
from zeroconf import IPVersion
|
||||||
|
from zeroconf.asyncio import AsyncServiceInfo, AsyncZeroconf
|
||||||
|
|
||||||
|
|
||||||
|
class AsyncRunner:
|
||||||
|
def __init__(self, ip_version: IPVersion) -> None:
|
||||||
|
self.ip_version = ip_version
|
||||||
|
self.aiozc: Optional[AsyncZeroconf] = None
|
||||||
|
|
||||||
|
async def register_services(self, infos: List[AsyncServiceInfo]) -> None:
|
||||||
|
self.aiozc = AsyncZeroconf(ip_version=self.ip_version)
|
||||||
|
tasks = [self.aiozc.async_register_service(info) for info in infos]
|
||||||
|
background_tasks = await asyncio.gather(*tasks)
|
||||||
|
await asyncio.gather(*background_tasks)
|
||||||
|
print("Finished registration, press Ctrl-C to exit...")
|
||||||
|
while True:
|
||||||
|
await asyncio.sleep(1)
|
||||||
|
|
||||||
|
async def unregister_services(self, infos: List[AsyncServiceInfo]) -> None:
|
||||||
|
assert self.aiozc is not None
|
||||||
|
tasks = [self.aiozc.async_unregister_service(info) for info in infos]
|
||||||
|
background_tasks = await asyncio.gather(*tasks)
|
||||||
|
await asyncio.gather(*background_tasks)
|
||||||
|
await self.aiozc.async_close()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
logging.basicConfig(level=logging.DEBUG)
|
||||||
|
|
||||||
|
parser = argparse.ArgumentParser()
|
||||||
|
parser.add_argument('--debug', action='store_true')
|
||||||
|
version_group = parser.add_mutually_exclusive_group()
|
||||||
|
version_group.add_argument('--v6', action='store_true')
|
||||||
|
version_group.add_argument('--v6-only', action='store_true')
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
if args.debug:
|
||||||
|
logging.getLogger('zeroconf').setLevel(logging.DEBUG)
|
||||||
|
if args.v6:
|
||||||
|
ip_version = IPVersion.All
|
||||||
|
elif args.v6_only:
|
||||||
|
ip_version = IPVersion.V6Only
|
||||||
|
else:
|
||||||
|
ip_version = IPVersion.V4Only
|
||||||
|
|
||||||
|
infos = []
|
||||||
|
for i in range(250):
|
||||||
|
infos.append(
|
||||||
|
AsyncServiceInfo(
|
||||||
|
"_http._tcp.local.",
|
||||||
|
f"Paul's Test Web Site {i}._http._tcp.local.",
|
||||||
|
addresses=[socket.inet_aton("127.0.0.1")],
|
||||||
|
port=80,
|
||||||
|
properties={'path': '/~paulsm/'},
|
||||||
|
server=f"zcdemohost-{i}.local.",
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
print("Registration of 250 services...")
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
|
runner = AsyncRunner(ip_version)
|
||||||
|
try:
|
||||||
|
loop.run_until_complete(runner.register_services(infos))
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
loop.run_until_complete(runner.unregister_services(infos))
|
|
@ -0,0 +1,24 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
from zeroconf import ServiceBrowser, ServiceListener, Zeroconf
|
||||||
|
|
||||||
|
|
||||||
|
class MyListener(ServiceListener):
|
||||||
|
|
||||||
|
def update_service(self, zc: Zeroconf, type_: str, name: str) -> None:
|
||||||
|
print(f"Service {name} updated")
|
||||||
|
|
||||||
|
def remove_service(self, zc: Zeroconf, type_: str, name: str) -> None:
|
||||||
|
print(f"Service {name} removed")
|
||||||
|
|
||||||
|
def add_service(self, zc: Zeroconf, type_: str, name: str) -> None:
|
||||||
|
info = zc.get_service_info(type_, name)
|
||||||
|
print(f"Service {name} added, service info: {info}")
|
||||||
|
|
||||||
|
|
||||||
|
zeroconf = Zeroconf()
|
||||||
|
listener = MyListener()
|
||||||
|
browser = ServiceBrowser(zeroconf, "_http._tcp.local.", listener)
|
||||||
|
try:
|
||||||
|
input("Press enter to exit...\n\n")
|
||||||
|
finally:
|
||||||
|
zeroconf.close()
|
|
@ -1,8 +0,0 @@
|
||||||
from starlette.config import Config
|
|
||||||
|
|
||||||
config = Config(".env")
|
|
||||||
|
|
||||||
DEBUG = config("DEBUG", cast=bool, default=False)
|
|
||||||
DATABASE = config("DATABASE_URL", cast=databases.DatabaseURL)
|
|
||||||
SECRET_KEY = config("SECRET_KEY", cast=Secret)
|
|
||||||
ALLOWED_HOSTS = config("ALLOWED_HOSTS", cast=CommaSeparatedStrings)
|
|
|
@ -3,24 +3,75 @@ import logging
|
||||||
import os
|
import os
|
||||||
import pathlib
|
import pathlib
|
||||||
import tomllib
|
import tomllib
|
||||||
from typing import Dict, List, Union
|
from typing import Dict, Iterable, List, Union
|
||||||
|
|
||||||
|
import rgpio
|
||||||
import tomli_w
|
import tomli_w
|
||||||
|
|
||||||
LOGGER = logging.getLogger(__name__)
|
LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
@dataclasses.dataclass
|
# Max number of chips to scan for
|
||||||
class Pin:
|
MAX_CHIPS = 16
|
||||||
chip: str
|
|
||||||
number: int
|
|
||||||
name: str
|
|
||||||
|
|
||||||
@dataclasses.dataclass
|
# From http://abyz.me.uk/lg/py_rgpio.html#gpio_get_mode
|
||||||
|
GPIO_KERNEL_IN_USE = 1<<0
|
||||||
|
GPIO_KERNEL_OUTPUT = 1<<1
|
||||||
|
GPIO_KERNEL_ACTIVE_LOW = 1<<2
|
||||||
|
GPIO_KERNEL_OPEN_DRAIN = 1<<3
|
||||||
|
GPIO_KERNEL_OPEN_SOURCE = 1<<4
|
||||||
|
GPIO_KERNEL_PULL_UP = 1<<5
|
||||||
|
GPIO_KERNEL_PULL_DOWN = 1<<6
|
||||||
|
GPIO_KERNEL_PULL_OFF = 1<<7
|
||||||
|
GPIO_LG_INPUT = 1<<8
|
||||||
|
GPIO_LG_OUTPUT = 1<<9
|
||||||
|
GPIO_LG_ALERT = 1<<10
|
||||||
|
GPIO_LG_GROUP = 1<<11
|
||||||
|
GPIO_LG_NOT_USED1 = 1<<12
|
||||||
|
GPIO_LG_NOT_USED2 = 1<<13
|
||||||
|
GPIO_LG_NOT_USED3 = 1<<14
|
||||||
|
GPIO_LG_NOT_USED4 = 1<<15
|
||||||
|
GPIO_KERNEL_INPUT = 1<<16
|
||||||
|
GPIO_KERNEL_RISING_EDGE_ALERT = 1<<17
|
||||||
|
GPIO_KERNEL_FALLING_EDGE_ALERT = 1<<18
|
||||||
|
GPIO_KERNEL_REALTIME_CLOCK_ALERT = 1<<19
|
||||||
|
|
||||||
|
RELAY_OFF = 1
|
||||||
|
RELAY_ON = 0
|
||||||
|
|
||||||
|
@dataclasses.dataclass(frozen=True)
|
||||||
|
class Chip:
|
||||||
|
handle: int
|
||||||
|
label: str
|
||||||
|
name: str
|
||||||
|
number: int
|
||||||
|
number_of_lines: int
|
||||||
|
|
||||||
|
@property
|
||||||
|
def id(self) -> str:
|
||||||
|
return f"{self.number}-{self.name}"
|
||||||
|
|
||||||
|
@dataclasses.dataclass(frozen=True)
|
||||||
|
class Pin:
|
||||||
|
chip: Chip
|
||||||
|
flags: int
|
||||||
|
line_number: int
|
||||||
|
name: str
|
||||||
|
user: str
|
||||||
|
|
||||||
|
@property
|
||||||
|
def id(self) -> str:
|
||||||
|
return f"{self.chip.id}-{self.line_number}"
|
||||||
|
|
||||||
|
@dataclasses.dataclass(frozen=True)
|
||||||
class Relay:
|
class Relay:
|
||||||
pin: Pin
|
pin: Pin
|
||||||
name: str
|
name: str
|
||||||
state: bool
|
state: bool
|
||||||
|
|
||||||
|
@property
|
||||||
|
def id(self) -> str:
|
||||||
|
return f"{self.pin.id}"
|
||||||
|
|
||||||
def _deserialize(data: List[Dict[str, str]]) -> List[Relay]:
|
def _deserialize(data: List[Dict[str, str]]) -> List[Relay]:
|
||||||
"Deserialize a list of relays from the config."
|
"Deserialize a list of relays from the config."
|
||||||
return [Relay(
|
return [Relay(
|
||||||
|
@ -29,10 +80,12 @@ def _deserialize(data: List[Dict[str, str]]) -> List[Relay]:
|
||||||
chip=relay["chip"],
|
chip=relay["chip"],
|
||||||
number=int(relay["pinnumber"]),
|
number=int(relay["pinnumber"]),
|
||||||
name=relay["pinname"],
|
name=relay["pinname"],
|
||||||
)) for relay in data]
|
),
|
||||||
|
state=False) for relay in data]
|
||||||
|
|
||||||
def _serialize(data: List[Relay]) -> List[Dict[str, str]]:
|
def _serialize(data: List[Relay]) -> List[Dict[str, str]]:
|
||||||
"Serialize a list of relays to the config."
|
"Serialize a list of relays to the config."
|
||||||
|
LOGGER.info("Serializing %s", data)
|
||||||
return [{
|
return [{
|
||||||
"chip": relay.pin.chip,
|
"chip": relay.pin.chip,
|
||||||
"name": relay.name,
|
"name": relay.name,
|
||||||
|
@ -40,71 +93,207 @@ def _serialize(data: List[Relay]) -> List[Dict[str, str]]:
|
||||||
"pinname": relay.pin.name,
|
"pinname": relay.pin.name,
|
||||||
} for relay in data]
|
} for relay in data]
|
||||||
|
|
||||||
class Relays:
|
class Manager:
|
||||||
"Class for interacting with relays."
|
"Class for interacting with relays."
|
||||||
def __init__(self, configpath: pathlib.Path):
|
def __init__(self, configpath: pathlib.Path, has_fakes: bool):
|
||||||
self.configpath = configpath
|
|
||||||
try:
|
try:
|
||||||
with open(configpath, "rb") as f:
|
self.config = _read_config(configpath)
|
||||||
content = tomllib.load(f)
|
except FileNotFoundError:
|
||||||
self.config = content
|
|
||||||
except Exception as e:
|
|
||||||
LOGGER.info("Unable to load config file: %s", e)
|
|
||||||
self.config = {
|
self.config = {
|
||||||
"relays": [],
|
"rgpio": {"host": "localhost", "port": 8889},
|
||||||
|
"relays": [{
|
||||||
|
"chip": -1,
|
||||||
|
"pin": "FAKE-P1",
|
||||||
|
"name": "Fake",
|
||||||
|
}],
|
||||||
}
|
}
|
||||||
self.relays = _deserialize(self.config["relays"])
|
# Immediately write out the default config
|
||||||
|
_write_config(configpath, self.config)
|
||||||
|
self.configpath = configpath
|
||||||
|
self.has_fakes = has_fakes
|
||||||
|
|
||||||
|
# Handles to various chips, mapped by chip number
|
||||||
|
self._chips = {}
|
||||||
|
# Info on various pins, mapped by (chip number, line_number)
|
||||||
|
self._pins = {}
|
||||||
|
# Cached info on the relays, mapped by relay name
|
||||||
|
self._relays = {}
|
||||||
|
# Connection to single-board computer GPIO
|
||||||
|
self.sbc = None
|
||||||
|
|
||||||
def chips_list(self) -> List[str]:
|
def __iter__(self):
|
||||||
raise NotImplementedError()
|
"Provide an iterator for the relays."
|
||||||
|
return iter(self.relays)
|
||||||
|
|
||||||
def pins_list(self) -> List[Pin]:
|
@property
|
||||||
raise NotImplementedError()
|
def chips(self) -> List[Chip]:
|
||||||
|
if not self._chips:
|
||||||
|
self._load_chips()
|
||||||
|
return self._chips.values()
|
||||||
|
|
||||||
def relay_add(self, chip: str, pinnumber: int, name: str) -> None:
|
def connect(self) -> None:
|
||||||
|
self.sbc = rgpio.sbc(
|
||||||
|
host=self.config["rgpio"]["host"],
|
||||||
|
port=self.config["rgpio"]["port"],
|
||||||
|
)
|
||||||
|
if not self.sbc.connected:
|
||||||
|
LOGGER.warning("Failed to connect sbc")
|
||||||
|
self.sbc = None
|
||||||
|
|
||||||
|
def get_chip_by_number(self, number: int) -> Chip:
|
||||||
|
try:
|
||||||
|
return self._chips[number]
|
||||||
|
except KeyError:
|
||||||
|
self._load_chip(number)
|
||||||
|
return self._chips[number]
|
||||||
|
|
||||||
|
def get_pin_by_number(self, chip: Chip, line_number: int) -> Pin:
|
||||||
|
try:
|
||||||
|
return self._pins[(chip.number, line_number)]
|
||||||
|
except KeyError:
|
||||||
|
self._load_pin(chip, line_number)
|
||||||
|
return self._pins[(chip.number, line_number)]
|
||||||
|
|
||||||
|
def get_relay_by_pin_id(self, pin_id: str) -> Relay:
|
||||||
|
for relay in self.relays:
|
||||||
|
if relay.pin.id == pin_id:
|
||||||
|
return relay
|
||||||
|
return None
|
||||||
|
|
||||||
|
def load_all_pins(self) -> None:
|
||||||
|
self._load_chips()
|
||||||
|
self._load_pins()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def pins(self) -> List[Pin]:
|
||||||
|
if not self._pins:
|
||||||
|
self._load_pins()
|
||||||
|
return self._pins.values()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def relays(self) -> Iterable[Relay]:
|
||||||
|
if not self._relays:
|
||||||
|
for relay in self.config["relays"]:
|
||||||
|
LOGGER.debug("Relay: %s", relay)
|
||||||
|
chip = self.get_chip_by_number(relay["chip"]["number"])
|
||||||
|
pin = self.get_pin_by_number(chip, relay["pin"]["line_number"])
|
||||||
|
self._relays[relay["name"]] = Relay(
|
||||||
|
name=relay["name"],
|
||||||
|
pin=pin,
|
||||||
|
state=False,
|
||||||
|
)
|
||||||
|
return self._relays.values()
|
||||||
|
|
||||||
|
def relay_add(self, chip_number: int, chip_name: str, line_number: int, name: str) -> None:
|
||||||
|
LOGGER.info("Creating relay %s with chip %d %s on line %d", name, chip_number, chip_name, line_number)
|
||||||
|
chip = self.get_chip_by_number(chip_number)
|
||||||
|
pin = self.get_pin_by_number(chip, line_number)
|
||||||
self.config["relays"].append({
|
self.config["relays"].append({
|
||||||
"chip": chip,
|
"chip": {
|
||||||
|
"name": chip.name,
|
||||||
|
"number": chip.number,
|
||||||
|
},
|
||||||
|
"pin": {
|
||||||
|
"name": pin.name,
|
||||||
|
"line_number": pin.line_number,
|
||||||
|
},
|
||||||
"name": name,
|
"name": name,
|
||||||
"pinnumber": pinnumber,
|
|
||||||
})
|
})
|
||||||
with open(self.configpath, "wb") as f:
|
_write_config(self.configpath, self.config)
|
||||||
tomli_w.dump({
|
self._relays = {}
|
||||||
"relays": _serialize(self.config["relays"]),
|
|
||||||
}, f)
|
|
||||||
LOGGER.info("Wrote config file %s", self.configpath)
|
|
||||||
|
|
||||||
def relays_list(self) -> List[Relay]:
|
def relay_set(self, relay: Relay, state: bool) -> None:
|
||||||
return self.relays
|
level = RELAY_ON if state else RELAY_OFF
|
||||||
|
ok = self.sbc.gpio_write(relay.pin.chip.handle, relay.pin.line_number, level)
|
||||||
|
if ok != 0:
|
||||||
|
LOGGER.warn("Failed to set relay level for %s (%s)", relay.name, relay.id)
|
||||||
|
else:
|
||||||
|
LOGGER.info("Set relay %s (%s) to %s", relay.name, relay.id, "enabled" if state else "disabled")
|
||||||
|
okay, offset, flags, name, user = self.sbc.gpio_get_line_info(relay.pin.chip.handle, relay.pin.line_number)
|
||||||
|
LOGGER.info("Flags are now %d", flags)
|
||||||
|
self._relays[relay.name] = Relay(
|
||||||
|
name=relay.name,
|
||||||
|
pin=relay.pin,
|
||||||
|
state=state,
|
||||||
|
)
|
||||||
|
|
||||||
def relay_on(self, name: str) -> bool:
|
|
||||||
raise NotImplementedError()
|
|
||||||
|
|
||||||
def relay_set(self, name: str, state: bool) -> None:
|
def shutdown(self) -> None:
|
||||||
raise NotImplementedError()
|
_write_config(self.configpath, self.config)
|
||||||
|
|
||||||
class RelaysFake(Relays):
|
def _load_chip(self, number: int) -> None:
|
||||||
"Class for fake relays, useful for testing."
|
try:
|
||||||
def chips_list(self) -> List[str]:
|
handle = self.sbc.gpiochip_open(number)
|
||||||
return ["chipA", "chipB"]
|
except rgpio.error:
|
||||||
|
return
|
||||||
|
if handle < 0:
|
||||||
|
return
|
||||||
|
okay, number_of_lines, name, label = self.sbc.gpio_get_chip_info(handle)
|
||||||
|
LOGGER.info("Chip info: %s %s %s %s", okay, number_of_lines, name, label)
|
||||||
|
if okay != 0:
|
||||||
|
LOGGER.warn("Chip %s not okay.", name)
|
||||||
|
return
|
||||||
|
self._chips[number] = Chip(
|
||||||
|
handle=handle,
|
||||||
|
label=label,
|
||||||
|
name=name,
|
||||||
|
number=number,
|
||||||
|
number_of_lines=number_of_lines,
|
||||||
|
)
|
||||||
|
|
||||||
def pins_list(self) -> List[Pin]:
|
# Talk to the remote pins daemon and get information about the chips
|
||||||
return [
|
def _load_chips(self) -> None:
|
||||||
Pin(chip="chipA", number=1, name="CON2-P1"),
|
for i in range(MAX_CHIPS):
|
||||||
Pin(chip="chipA", number=2, name="CON2-P2"),
|
self._load_chip(i)
|
||||||
Pin(chip="chipA", number=3, name="CON2-P10"),
|
|
||||||
Pin(chip="chipB", number=1, name="CON2-P3"),
|
|
||||||
Pin(chip="chipB", number=2, name="CON2-P7"),
|
|
||||||
]
|
|
||||||
|
|
||||||
def relays_list(self) -> List[Relay]:
|
def _load_pin(self, chip: Chip, line_number: int) -> None:
|
||||||
raise NotImplementedError()
|
okay, offset, flags, name, user = self.sbc.gpio_get_line_info(chip.handle, line_number)
|
||||||
|
LOGGER.info("Got line info: %s %s %s %s", offset, flags, name, user)
|
||||||
|
assert offset == line_number
|
||||||
|
if okay != 0:
|
||||||
|
LOGGER.warn("Line %s is not okay", name)
|
||||||
|
return
|
||||||
|
if not name:
|
||||||
|
LOGGER.warn("Ignoring line %d because it has no name.", line_number)
|
||||||
|
return
|
||||||
|
# Claim the pin for output, set initially to relay off.
|
||||||
|
okay = self.sbc.gpio_claim_output(
|
||||||
|
handle=chip.handle,
|
||||||
|
gpio=line_number,
|
||||||
|
level=RELAY_OFF,
|
||||||
|
)
|
||||||
|
if okay != 0:
|
||||||
|
LOGGER.warn("Failed to claim pin %s", line_number)
|
||||||
|
return
|
||||||
|
okay = self.sbc.gpio_write(chip.handle, line_number, RELAY_OFF)
|
||||||
|
if okay != 0:
|
||||||
|
LOGGER.warn("Failed to write pin %s", line_number)
|
||||||
|
return
|
||||||
|
okay, offset, flags, name, user = self.sbc.gpio_get_line_info(chip.handle, line_number)
|
||||||
|
LOGGER.info("Flags are %d", flags)
|
||||||
|
self._pins[(chip.number, line_number)] = Pin(
|
||||||
|
chip=chip,
|
||||||
|
flags=flags,
|
||||||
|
line_number=line_number,
|
||||||
|
name=name,
|
||||||
|
user=user,
|
||||||
|
)
|
||||||
|
|
||||||
def relay_on(self, name: str) -> bool:
|
# Talk to the remote pins daemon and get information about the pins
|
||||||
raise NotImplementedError()
|
def _load_pins(self) -> None:
|
||||||
|
LOGGER.info("Loading pins")
|
||||||
|
for chip in self.chips:
|
||||||
|
for line in range(chip.number_of_lines):
|
||||||
|
self._load_pin(chip, line)
|
||||||
|
|
||||||
def relay_set(self, name: str, state: bool) -> None:
|
def _read_config(configpath: pathlib.Path) -> None:
|
||||||
raise NotImplementedError()
|
with open(configpath, "rb") as f:
|
||||||
|
content = tomllib.load(f)
|
||||||
|
LOGGER.info("Read config from %s", configpath)
|
||||||
|
return content
|
||||||
|
|
||||||
|
def _write_config(configpath: pathlib.Path, config) -> None:
|
||||||
|
with open(configpath, "wb") as f:
|
||||||
|
tomli_w.dump(config, f)
|
||||||
|
LOGGER.info("Wrote config file %s", configpath)
|
||||||
|
|
||||||
class RelaysReal(Relays):
|
|
||||||
"Class for controlling real relays."
|
|
||||||
|
|
|
@ -1,48 +1,83 @@
|
||||||
from fastapi import FastAPI, Request
|
from contextlib import asynccontextmanager
|
||||||
|
import logging
|
||||||
|
|
||||||
|
from fastapi import FastAPI, Form, Request
|
||||||
|
from fastapi.responses import HTMLResponse
|
||||||
from fastapi.templating import Jinja2Templates
|
from fastapi.templating import Jinja2Templates
|
||||||
from starlette.responses import RedirectResponse
|
from starlette.responses import RedirectResponse
|
||||||
|
|
||||||
import jinja2
|
import jinja2
|
||||||
|
|
||||||
from pnpdevice.config import config
|
from pnpdevice import settings
|
||||||
|
from pnpdevice.relays import Manager
|
||||||
import pnpdevice.discovery
|
import pnpdevice.discovery
|
||||||
import pnpdevice.relays
|
import pnpdevice.relays
|
||||||
|
|
||||||
app = FastAPI(debug=config.DEBUG)
|
LOGGER = logging.getLogger(__name__)
|
||||||
|
logging.basicConfig(level=logging.DEBUG)
|
||||||
|
|
||||||
|
@asynccontextmanager
|
||||||
|
async def lifespan(app: FastAPI):
|
||||||
|
relays.connect()
|
||||||
|
yield
|
||||||
|
# relays.shutdown()
|
||||||
|
|
||||||
|
app = FastAPI(debug=settings.DEBUG, lifespan=lifespan)
|
||||||
|
relays = Manager(settings.CONFIGFILE, settings.RELAYS_FAKE)
|
||||||
templates = Jinja2Templates(directory="templates")
|
templates = Jinja2Templates(directory="templates")
|
||||||
|
|
||||||
@app.get("/")
|
@app.get("/", response_class=HTMLResponse)
|
||||||
def index():
|
def index(request: Request):
|
||||||
relays = request.app["relays"]
|
|
||||||
return templates.TemplateResponse("index.template.html", {
|
return templates.TemplateResponse("index.template.html", {
|
||||||
"relays": relays.relays_list(),
|
"request": request,
|
||||||
|
"relays": relays,
|
||||||
})
|
})
|
||||||
|
|
||||||
@app.get("/relay/create")
|
@app.get("/relay/create")
|
||||||
def relay_create_get(request: Request):
|
def relay_create_get(request: Request):
|
||||||
"Get the form to create a new relay."
|
"Get the form to create a new relay."
|
||||||
relays = request.app["relays"]
|
relays.load_all_pins()
|
||||||
pins = relays.pins_list()
|
used_pin_names = {relay.pin.name for relay in relays}
|
||||||
return templates.TemplateResponse("relay-create.template.html", {"pins": pins})
|
pins = [p for p in relays.pins if p.name not in used_pin_names]
|
||||||
|
sorted_pins = sorted(pins, key=lambda p: p.name)
|
||||||
|
return templates.TemplateResponse("relay-create.template.html", {
|
||||||
|
"relays": relays,
|
||||||
|
"request": request,
|
||||||
|
"pins": sorted_pins,
|
||||||
|
})
|
||||||
|
|
||||||
async def relay_create_post(request: Request, chip_and_number: str, name: str):
|
@app.post("/relay/create")
|
||||||
"Create a new relay."
|
def relay_create_post(
|
||||||
chip, number = chip_and_number.partition("-")
|
request: Request,
|
||||||
LOGGER.info("Creating relay %s %s with name %s", chip, number, name)
|
pin_id: str = Form(...),
|
||||||
|
name: str = Form(...),
|
||||||
|
):
|
||||||
|
"Create a new relay from form POST."
|
||||||
|
chip_number, chip_name, line_number = pin_id.split("-")
|
||||||
|
chip_number = int(chip_number)
|
||||||
|
line_number = int(line_number)
|
||||||
|
relays.relay_add(chip_number, chip_name, line_number, name)
|
||||||
|
return RedirectResponse(status_code=303, url="/")
|
||||||
|
|
||||||
|
@app.get("/relay/{pin_id}")
|
||||||
|
def relay_detail_get(request: Request, pin_id: str):
|
||||||
|
"Get details on a single relay."
|
||||||
|
relay = relays.get_relay_by_pin_id(pin_id)
|
||||||
|
return templates.TemplateResponse("relay-detail.template.html", {
|
||||||
|
"request": request,
|
||||||
|
"relay": relay
|
||||||
|
})
|
||||||
|
|
||||||
|
@app.post("/relay/{pin_id}/state")
|
||||||
|
def relay_state_post(
|
||||||
|
request: Request,
|
||||||
|
pin_id: str,
|
||||||
|
state: bool = Form(...),
|
||||||
|
):
|
||||||
|
"Change the state of a relay."
|
||||||
|
relay = relays.get_relay_by_pin_id(pin_id)
|
||||||
|
relays.relay_set(relay, state)
|
||||||
return RedirectResponse(status_code=303, url="/")
|
return RedirectResponse(status_code=303, url="/")
|
||||||
|
|
||||||
async def status(request: Request):
|
async def status(request: Request):
|
||||||
return html("Status")
|
return html("Status")
|
||||||
|
|
||||||
def run(relays: pnpdevice.relays.Relays):
|
|
||||||
"Run the embedded web server"
|
|
||||||
app = web.Application()
|
|
||||||
app["relays"] = relays
|
|
||||||
aiohttp_jinja2.setup(app,
|
|
||||||
loader=jinja2.FileSystemLoader("templates"))
|
|
||||||
app.on_startup.append(pnpdevice.discovery.handle)
|
|
||||||
app.add_routes([web.get("/", index)])
|
|
||||||
app.add_routes([web.get("/relay/create", relay_create_get)])
|
|
||||||
app.add_routes([web.post("/relay/create", relay_create_post)])
|
|
||||||
web.run_app(app)
|
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,11 @@
|
||||||
|
from pathlib import Path
|
||||||
|
from starlette.config import Config
|
||||||
|
from starlette.datastructures import CommaSeparatedStrings
|
||||||
|
|
||||||
|
config = Config(".env")
|
||||||
|
|
||||||
|
CONFIGFILE = config("CONFIGFILE", cast=Path, default="/etc/pnpdevice.toml")
|
||||||
|
DEBUG = config("DEBUG", cast=bool, default=False)
|
||||||
|
RELAYS_FAKE = config("RELAYS_FAKE", cast=bool, default=False)
|
||||||
|
# SECRET_KEY = config("SECRET_KEY", cast=Secret)
|
||||||
|
# ALLOWED_HOSTS = config("ALLOWED_HOSTS", cast=CommaSeparatedStrings)
|
|
@ -5,6 +5,7 @@ authors = [
|
||||||
]
|
]
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"fastapi",
|
"fastapi",
|
||||||
|
"python-multipart",
|
||||||
"tomli-w",
|
"tomli-w",
|
||||||
"uvicorn[standard]",
|
"uvicorn[standard]",
|
||||||
"zeroconf",
|
"zeroconf",
|
||||||
|
|
|
@ -4,11 +4,18 @@
|
||||||
<h2>No relays found.</h2>
|
<h2>No relays found.</h2>
|
||||||
{% else %}
|
{% else %}
|
||||||
<h2>Relays</h2>
|
<h2>Relays</h2>
|
||||||
<ul>
|
<table>
|
||||||
{% for relay in relays %}
|
<thead><th>Relay</th><th>State</th></thead>
|
||||||
<li>{{ relay.name }}</li>
|
<tbody>
|
||||||
{% endfor %}
|
{% for relay in relays %}
|
||||||
</ul>
|
<tr>
|
||||||
|
<td><a href="/relay/{{ relay.id }}">{{ relay.name }}</td></a></td>
|
||||||
|
<td>{{ "ON" if relay.state else "OFF"}}</td>
|
||||||
|
</tr>
|
||||||
|
{% endfor %}
|
||||||
|
</tbody>
|
||||||
|
</table>
|
||||||
{% endif %}
|
{% endif %}
|
||||||
|
<br>
|
||||||
<a href="/relay/create">Create Relay</a>
|
<a href="/relay/create">Create Relay</a>
|
||||||
</html>
|
</html>
|
||||||
|
|
|
@ -1,10 +1,19 @@
|
||||||
<html>
|
<html>
|
||||||
<h1>Relay Creation</h1>
|
<h1>Relay Creation</h1>
|
||||||
|
<h2>Existing relays</h2>
|
||||||
|
<table>
|
||||||
|
<thead><th>Relay</th><th>Pin</th></thead>
|
||||||
|
<tbody>
|
||||||
|
{% for relay in relays %}
|
||||||
|
<tr><td>{{ relay.name }}</td><td>{{ relay.pin.name }}</td></tr>
|
||||||
|
{% endfor %}
|
||||||
|
</table>
|
||||||
|
|
||||||
<form method="POST" action="/relay/create">
|
<form method="POST" action="/relay/create">
|
||||||
<input type="text" name="name" placeholder="Pool Pump 1"></input>
|
<input type="text" name="name" placeholder="Pool Pump 1"></input>
|
||||||
<select name="chip-and-number">
|
<select name="pin_id">:
|
||||||
{% for pin in pins %}
|
{% for pin in pins %}
|
||||||
<option value="{{ pin.chip }}-{{ pin.number }}">{{ pin.name }}</option>
|
<option value="{{ pin.chip.number }}-{{ pin.chip.name }}-{{ pin.line_number }}">{{ pin.name }}</option>
|
||||||
{% endfor %}
|
{% endfor %}
|
||||||
</select>
|
</select>
|
||||||
<button type="submit">Create</button>
|
<button type="submit">Create</button>
|
||||||
|
|
|
@ -0,0 +1,21 @@
|
||||||
|
<html>
|
||||||
|
<h1>Pools'n'Pumps Device</h1>
|
||||||
|
<h2>Relay "{{relay.name}}"</h2>
|
||||||
|
<table>
|
||||||
|
<thead><tr><th>Property</th><th>Value</th></tr></thead>
|
||||||
|
<tbody>
|
||||||
|
<tr><td>Chip Number</td><td>{{ relay.pin.chip.number }}</td></tr>
|
||||||
|
<tr><td>Chip Label</td><td>{{ relay.pin.chip.label }}</td></tr>
|
||||||
|
<tr><td>Chip Name</td><td>{{ relay.pin.chip.name }}</td></tr>
|
||||||
|
<tr><td>Pin Name</td><td>{{ relay.pin.name }}</td></tr>
|
||||||
|
<tr><td>Pin Number</td><td>{{ relay.pin.line_number }}</td></tr>
|
||||||
|
<tr><td>Relay State</td><td>{{ relay.state }}</td></tr>
|
||||||
|
</tbody>
|
||||||
|
</table>
|
||||||
|
<form method="POST" action="/relay/{{relay.id}}/state">
|
||||||
|
<input type="hidden" name="pin_id" value="{{ relay.pin.id }}">
|
||||||
|
<input type="hidden" name="state" value="{{not relay.state}}">
|
||||||
|
<button type="submit">{{ "Turn OFF" if relay.state else "Turn ON" }}</button>
|
||||||
|
</form>
|
||||||
|
<a href="/">Home</a>
|
||||||
|
</html>
|
Loading…
Reference in New Issue