Move to using remote rgpiod connection.
This adds a bunch of stuff, some barely working. The core idea that I'm adding now is that I'm redoing the way that we interact with the gpio pins. Previously I had planned to use lgpio from abyz.me.uk/lg/ directly and run this program *only* on the single-board computer with the GPIO pins. Turns out I can instead run the lgpio project's rgpiod program on the single-board computer and talk to it over the network. This is way more stable and way faster for development, so that's what I'm doing. This is just a checkpoint. Incidentally, here is the license for the rgpio.py code: "This is free and unencumbered software released into the public domain. Anyone is free to copy, modify, publish, use, compile, sell, or distribute this software, either in source code form or as a compiled binary, for any purpose, commercial or non-commercial, and by any means. In jurisdictions that recognize copyright laws, the author or authors of this software dedicate any and all copyright interest in the software to the public domain. We make this dedication for the benefit of the public at large and to the detriment of our heirs and successors. We intend this dedication to be an overt act of relinquishment in perpetuity of all present and future rights to this software under copyright law. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. For more information, please refer to <http://unlicense.org/>"
This commit is contained in:
parent
57d8ff0b39
commit
cbffa327b9
|
@ -1,8 +0,0 @@
|
|||
from starlette.config import Config
|
||||
|
||||
config = Config(".env")
|
||||
|
||||
DEBUG = config("DEBUG", cast=bool, default=False)
|
||||
DATABASE = config("DATABASE_URL", cast=databases.DatabaseURL)
|
||||
SECRET_KEY = config("SECRET_KEY", cast=Secret)
|
||||
ALLOWED_HOSTS = config("ALLOWED_HOSTS", cast=CommaSeparatedStrings)
|
|
@ -9,18 +9,27 @@ import tomli_w
|
|||
|
||||
LOGGER = logging.getLogger(__name__)
|
||||
|
||||
@dataclasses.dataclass
|
||||
import rgpio
|
||||
|
||||
@dataclasses.dataclass(frozen=True)
|
||||
class Pin:
|
||||
chip: str
|
||||
number: int
|
||||
name: str
|
||||
@property
|
||||
def id(self) -> str:
|
||||
return f"{self.chip}-{self.number}"
|
||||
|
||||
@dataclasses.dataclass
|
||||
@dataclasses.dataclass(frozen=True)
|
||||
class Relay:
|
||||
pin: Pin
|
||||
name: str
|
||||
state: bool
|
||||
|
||||
@property
|
||||
def id(self) -> str:
|
||||
return f"{self.pin.id}"
|
||||
|
||||
def _deserialize(data: List[Dict[str, str]]) -> List[Relay]:
|
||||
"Deserialize a list of relays from the config."
|
||||
return [Relay(
|
||||
|
@ -29,10 +38,12 @@ def _deserialize(data: List[Dict[str, str]]) -> List[Relay]:
|
|||
chip=relay["chip"],
|
||||
number=int(relay["pinnumber"]),
|
||||
name=relay["pinname"],
|
||||
)) for relay in data]
|
||||
),
|
||||
state=False) for relay in data]
|
||||
|
||||
def _serialize(data: List[Relay]) -> List[Dict[str, str]]:
|
||||
"Serialize a list of relays to the config."
|
||||
LOGGER.info("Serializing %s", data)
|
||||
return [{
|
||||
"chip": relay.pin.chip,
|
||||
"name": relay.name,
|
||||
|
@ -40,71 +51,87 @@ def _serialize(data: List[Relay]) -> List[Dict[str, str]]:
|
|||
"pinname": relay.pin.name,
|
||||
} for relay in data]
|
||||
|
||||
class Relays:
|
||||
class Manager:
|
||||
"Class for interacting with relays."
|
||||
def __init__(self, configpath: pathlib.Path):
|
||||
self.configpath = configpath
|
||||
def __init__(self, configpath: pathlib.Path, has_fakes: bool):
|
||||
try:
|
||||
with open(configpath, "rb") as f:
|
||||
content = tomllib.load(f)
|
||||
self.config = content
|
||||
except Exception as e:
|
||||
LOGGER.info("Unable to load config file: %s", e)
|
||||
self.config = _read_config(configpath)
|
||||
except FileNotFoundError:
|
||||
self.config = {
|
||||
"relays": [],
|
||||
"rgpio": {"host": "localhost", "port": 8889},
|
||||
"relays": [{
|
||||
"chip": -1,
|
||||
"pin": "FAKE-P1",
|
||||
"name": "Fake",
|
||||
}],
|
||||
}
|
||||
self.relays = _deserialize(self.config["relays"])
|
||||
# Immediately write out the default config
|
||||
_write_config(configpath, self.config)
|
||||
self.configpath = configpath
|
||||
self.has_fakes = has_fakes
|
||||
|
||||
# Connection to single-board computer GPIO
|
||||
self.sbc = None
|
||||
|
||||
def __iter__(self):
|
||||
"Provide an iterator for the relays."
|
||||
return iter(self.relays)
|
||||
|
||||
def connect(self) -> None:
|
||||
self.sbc = rgpio.sbc(
|
||||
host=self.config["rgpio"]["host"],
|
||||
port=self.config["rgpio"]["port"],
|
||||
)
|
||||
|
||||
def chips_list(self) -> List[str]:
|
||||
raise NotImplementedError()
|
||||
if self.has_fakes:
|
||||
return ["fakeA", "fakeB"]
|
||||
|
||||
def get_relay_by_pin_id(self, pin_id: str) -> Relay:
|
||||
for relay in self.relays:
|
||||
if relay.pin.id == pin_id:
|
||||
return relay
|
||||
return None
|
||||
|
||||
def pins_list(self) -> List[Pin]:
|
||||
raise NotImplementedError()
|
||||
if self.has_fakes:
|
||||
return [
|
||||
Pin(chip="fakeA", number=1, name="CON2-P1"),
|
||||
Pin(chip="fakeA", number=2, name="CON2-P2"),
|
||||
Pin(chip="fakeA", number=3, name="CON2-P10"),
|
||||
Pin(chip="fakeB", number=1, name="CON2-P3"),
|
||||
Pin(chip="fakeB", number=2, name="CON2-P7"),
|
||||
]
|
||||
|
||||
|
||||
def relay_add(self, chip: str, pinnumber: int, name: str) -> None:
|
||||
self.config["relays"].append({
|
||||
"chip": chip,
|
||||
"name": name,
|
||||
"pinnumber": pinnumber,
|
||||
})
|
||||
with open(self.configpath, "wb") as f:
|
||||
tomli_w.dump({
|
||||
"relays": _serialize(self.config["relays"]),
|
||||
}, f)
|
||||
LOGGER.info("Wrote config file %s", self.configpath)
|
||||
LOGGER.info("Creating relay %s %s with name %s", chip, pinnumber, name)
|
||||
self.relays.append(Relay(
|
||||
name=name,
|
||||
pin=Pin(
|
||||
chip=chip,
|
||||
number=pinnumber,
|
||||
name=name,
|
||||
),
|
||||
state=False))
|
||||
self._write_config()
|
||||
|
||||
def relays_list(self) -> List[Relay]:
|
||||
return self.relays
|
||||
|
||||
def relay_on(self, name: str) -> bool:
|
||||
raise NotImplementedError()
|
||||
def relay_set(self, relay: Relay, state: bool) -> None:
|
||||
if not relay.pin.chip.startswith("fake"):
|
||||
pass
|
||||
|
||||
def relay_set(self, name: str, state: bool) -> None:
|
||||
raise NotImplementedError()
|
||||
def shutdown(self) -> None:
|
||||
_write_config(self.configpath, self.config)
|
||||
|
||||
class RelaysFake(Relays):
|
||||
"Class for fake relays, useful for testing."
|
||||
def chips_list(self) -> List[str]:
|
||||
return ["chipA", "chipB"]
|
||||
|
||||
def pins_list(self) -> List[Pin]:
|
||||
return [
|
||||
Pin(chip="chipA", number=1, name="CON2-P1"),
|
||||
Pin(chip="chipA", number=2, name="CON2-P2"),
|
||||
Pin(chip="chipA", number=3, name="CON2-P10"),
|
||||
Pin(chip="chipB", number=1, name="CON2-P3"),
|
||||
Pin(chip="chipB", number=2, name="CON2-P7"),
|
||||
]
|
||||
def _read_config(configpath: pathlib.Path) -> None:
|
||||
with open(configpath, "rb") as f:
|
||||
content = tomllib.load(f)
|
||||
LOGGER.info("Read config from %s", configpath)
|
||||
return content
|
||||
|
||||
def relays_list(self) -> List[Relay]:
|
||||
raise NotImplementedError()
|
||||
|
||||
def relay_on(self, name: str) -> bool:
|
||||
raise NotImplementedError()
|
||||
def _write_config(configpath: pathlib.Path, config) -> None:
|
||||
with open(configpath, "wb") as f:
|
||||
tomli_w.dump(config, f)
|
||||
LOGGER.info("Wrote config file %s", configpath)
|
||||
|
||||
def relay_set(self, name: str, state: bool) -> None:
|
||||
raise NotImplementedError()
|
||||
|
||||
class RelaysReal(Relays):
|
||||
"Class for controlling real relays."
|
||||
|
|
|
@ -1,48 +1,78 @@
|
|||
from fastapi import FastAPI, Request
|
||||
from contextlib import asynccontextmanager
|
||||
import logging
|
||||
|
||||
from fastapi import FastAPI, Form, Request
|
||||
from fastapi.responses import HTMLResponse
|
||||
from fastapi.templating import Jinja2Templates
|
||||
from starlette.responses import RedirectResponse
|
||||
|
||||
import jinja2
|
||||
|
||||
from pnpdevice.config import config
|
||||
from pnpdevice import settings
|
||||
from pnpdevice.relays import Manager
|
||||
import pnpdevice.discovery
|
||||
import pnpdevice.relays
|
||||
|
||||
app = FastAPI(debug=config.DEBUG)
|
||||
LOGGER = logging.getLogger(__name__)
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI):
|
||||
relays.connect()
|
||||
yield
|
||||
relays.shutdown()
|
||||
|
||||
app = FastAPI(debug=settings.DEBUG, lifespan=lifespan)
|
||||
relays = Manager(settings.CONFIGFILE, settings.RELAYS_FAKE)
|
||||
templates = Jinja2Templates(directory="templates")
|
||||
|
||||
@app.get("/")
|
||||
def index():
|
||||
relays = request.app["relays"]
|
||||
@app.get("/", response_class=HTMLResponse)
|
||||
def index(request: Request):
|
||||
return templates.TemplateResponse("index.template.html", {
|
||||
"relays": relays.relays_list(),
|
||||
"request": request,
|
||||
"relays": relays,
|
||||
})
|
||||
|
||||
@app.get("/relay/create")
|
||||
def relay_create_get(request: Request):
|
||||
"Get the form to create a new relay."
|
||||
relays = request.app["relays"]
|
||||
pins = relays.pins_list()
|
||||
return templates.TemplateResponse("relay-create.template.html", {"pins": pins})
|
||||
return templates.TemplateResponse("relay-create.template.html", {
|
||||
"request": request,
|
||||
"pins": pins
|
||||
})
|
||||
|
||||
async def relay_create_post(request: Request, chip_and_number: str, name: str):
|
||||
"Create a new relay."
|
||||
chip, number = chip_and_number.partition("-")
|
||||
LOGGER.info("Creating relay %s %s with name %s", chip, number, name)
|
||||
@app.post("/relay/create")
|
||||
def relay_create_post(
|
||||
request: Request,
|
||||
chip_and_number: str = Form(...),
|
||||
name: str = Form(...),
|
||||
):
|
||||
"Create a new relay from form POST."
|
||||
chip, _, number = chip_and_number.partition("-")
|
||||
number = int(number)
|
||||
relays.relay_add(chip, number, name)
|
||||
return RedirectResponse(status_code=303, url="/")
|
||||
|
||||
@app.get("/relay/{pin_id}")
|
||||
def relay_detail_get(request: Request, pin_id: str):
|
||||
"Get details on a single relay."
|
||||
relay = relays.get_relay_by_pin_id(pin_id)
|
||||
return templates.TemplateResponse("relay-detail.template.html", {
|
||||
"request": request,
|
||||
"relay": relay
|
||||
})
|
||||
|
||||
@app.post("/relay/create")
|
||||
def relay_state_post(
|
||||
request: Request,
|
||||
pin_id: str = Form(...),
|
||||
state: str = Form(...),
|
||||
):
|
||||
"Change the state of a relay."
|
||||
relay = relays.get_relay_by_pin_id(pin_id)
|
||||
relays.relay_set()
|
||||
return RedirectResponse(status_code=303, url="/")
|
||||
|
||||
async def status(request: Request):
|
||||
return html("Status")
|
||||
|
||||
def run(relays: pnpdevice.relays.Relays):
|
||||
"Run the embedded web server"
|
||||
app = web.Application()
|
||||
app["relays"] = relays
|
||||
aiohttp_jinja2.setup(app,
|
||||
loader=jinja2.FileSystemLoader("templates"))
|
||||
app.on_startup.append(pnpdevice.discovery.handle)
|
||||
app.add_routes([web.get("/", index)])
|
||||
app.add_routes([web.get("/relay/create", relay_create_get)])
|
||||
app.add_routes([web.post("/relay/create", relay_create_post)])
|
||||
web.run_app(app)
|
||||
|
||||
|
|
|
@ -0,0 +1,11 @@
|
|||
from pathlib import Path
|
||||
from starlette.config import Config
|
||||
from starlette.datastructures import CommaSeparatedStrings
|
||||
|
||||
config = Config(".env")
|
||||
|
||||
CONFIGFILE = config("CONFIGFILE", cast=Path, default="/etc/pnpdevice.toml")
|
||||
DEBUG = config("DEBUG", cast=bool, default=False)
|
||||
RELAYS_FAKE = config("RELAYS_FAKE", cast=bool, default=False)
|
||||
# SECRET_KEY = config("SECRET_KEY", cast=Secret)
|
||||
# ALLOWED_HOSTS = config("ALLOWED_HOSTS", cast=CommaSeparatedStrings)
|
|
@ -5,6 +5,7 @@ authors = [
|
|||
]
|
||||
dependencies = [
|
||||
"fastapi",
|
||||
"python-multipart",
|
||||
"tomli-w",
|
||||
"uvicorn[standard]",
|
||||
"zeroconf",
|
||||
|
|
|
@ -6,7 +6,7 @@
|
|||
<h2>Relays</h2>
|
||||
<ul>
|
||||
{% for relay in relays %}
|
||||
<li>{{ relay.name }}</li>
|
||||
<li><a href="/relay/{{ relay.id }}">{{ relay.name }}</a> - {{ relay.state }}</li>
|
||||
{% endfor %}
|
||||
</ul>
|
||||
{% endif %}
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
<h1>Relay Creation</h1>
|
||||
<form method="POST" action="/relay/create">
|
||||
<input type="text" name="name" placeholder="Pool Pump 1"></input>
|
||||
<select name="chip-and-number">
|
||||
<select name="chip-and-number">:
|
||||
{% for pin in pins %}
|
||||
<option value="{{ pin.chip }}-{{ pin.number }}">{{ pin.name }}</option>
|
||||
{% endfor %}
|
||||
|
|
Loading…
Reference in New Issue