Compare commits

..

7 Commits

Author SHA1 Message Date
Eli Ribble 57d8ff0b39 Checkpoint, switching computers. 2023-05-16 19:21:11 -07:00
Eli Ribble c36a32179a This is just work-in-progress. Switching underlying frameworks...
I wanna go FastAPI.
2023-05-15 14:46:58 -07:00
Eli Ribble 320591277f Add sketch of relays, jinja templating. 2023-05-12 15:09:44 -07:00
Eli Ribble 0d1020a3d5 Add aiohttp web server.
We switch to having the server kick off the mDNS discovery process
because aiohttp recommends it and it fits well enough that I don't mind.
2023-05-12 14:47:42 -07:00
Eli Ribble 9f3de33337 Add coroutine for handling mDNS query responses.
We can now discover our device. Maybe. I'm really not sure if I'm doing
the protocol correctly, but it looks pretty good. *shrug*.
2023-05-12 14:31:45 -07:00
Eli Ribble bc79e842e7 Create a basic async loop. 2023-05-12 14:13:27 -07:00
Eli Ribble 409e6152ae Add basic logic for creating a module using flit. 2023-05-12 14:07:58 -07:00
10 changed files with 284 additions and 0 deletions

2
.gitignore vendored
View File

@ -160,3 +160,5 @@ cython_debug/
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
# Virtual env
ve/

2
pnpdevice/__init__.py Normal file
View File

@ -0,0 +1,2 @@
"Software for controlling a pool and pump device."
__version__ = "0.1"

8
pnpdevice/config.py Normal file
View File

@ -0,0 +1,8 @@
from starlette.config import Config
config = Config(".env")
DEBUG = config("DEBUG", cast=bool, default=False)
DATABASE = config("DATABASE_URL", cast=databases.DatabaseURL)
SECRET_KEY = config("SECRET_KEY", cast=Secret)
ALLOWED_HOSTS = config("ALLOWED_HOSTS", cast=CommaSeparatedStrings)

37
pnpdevice/discovery.py Normal file
View File

@ -0,0 +1,37 @@
import logging
import socket
from zeroconf import IPVersion
from zeroconf.asyncio import AsyncServiceInfo, AsyncZeroconf
LOGGER = logging.getLogger(__name__)
# From https://stackoverflow.com/questions/166506/finding-local-ip-addresses-using-pythons-stdlib/
def get_ip() -> str:
"Get the primary IP"
LOGGER.info("Determining IP address")
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.settimeout(0)
try:
# doesn't even have to be reachable
s.connect(('10.254.254.254', 1))
ip = s.getsockname()[0]
except Exception:
ip = '127.0.0.1'
finally:
s.close()
LOGGER.info("IP address seems to be %s", ip)
return ip
async def handle(*args):
"Handle requests for discovery"
ip = get_ip()
info = AsyncServiceInfo(
"_http._tcp.local.",
"pnpdevice._http._tcp.local.",
addresses=[socket.inet_aton("127.0.0.1")],
port=80,
server=ip,
)
aiozc = AsyncZeroconf(ip_version=IPVersion.V4Only)
await aiozc.async_register_service(info)

24
pnpdevice/main.py Normal file
View File

@ -0,0 +1,24 @@
import argparse
import asyncio
import logging
import pathlib
import pnpdevice.server
LOGGER = logging.getLogger(__name__)
def main():
parser = argparse.ArgumentParser()
parser.add_argument("-c", "--config", default="/etc/pnpdevice/config.toml", type=pathlib.Path, help="The config file to use.")
parser.add_argument("-s", "--simulate", action="store_true", help="When present, simulate the state of the relays")
parser.add_argument("-v", "--verbose", action="store_true", help="Enable verbose logging")
args = parser.parse_args()
logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO)
if args.simulate:
relays = pnpdevice.relays.RelaysFake(args.config)
else:
relays = pnpdevice.relays.RelaysFake(args.config)
#relays = pnpdevice.relays.RelaysReal(args.config)
pnpdevice.server.run(relays)

110
pnpdevice/relays.py Normal file
View File

@ -0,0 +1,110 @@
import dataclasses
import logging
import os
import pathlib
import tomllib
from typing import Dict, List, Union
import tomli_w
LOGGER = logging.getLogger(__name__)
@dataclasses.dataclass
class Pin:
chip: str
number: int
name: str
@dataclasses.dataclass
class Relay:
pin: Pin
name: str
state: bool
def _deserialize(data: List[Dict[str, str]]) -> List[Relay]:
"Deserialize a list of relays from the config."
return [Relay(
name=relay["name"],
pin=Pin(
chip=relay["chip"],
number=int(relay["pinnumber"]),
name=relay["pinname"],
)) for relay in data]
def _serialize(data: List[Relay]) -> List[Dict[str, str]]:
"Serialize a list of relays to the config."
return [{
"chip": relay.pin.chip,
"name": relay.name,
"pinnumber": str(relay.pin.number),
"pinname": relay.pin.name,
} for relay in data]
class Relays:
"Class for interacting with relays."
def __init__(self, configpath: pathlib.Path):
self.configpath = configpath
try:
with open(configpath, "rb") as f:
content = tomllib.load(f)
self.config = content
except Exception as e:
LOGGER.info("Unable to load config file: %s", e)
self.config = {
"relays": [],
}
self.relays = _deserialize(self.config["relays"])
def chips_list(self) -> List[str]:
raise NotImplementedError()
def pins_list(self) -> List[Pin]:
raise NotImplementedError()
def relay_add(self, chip: str, pinnumber: int, name: str) -> None:
self.config["relays"].append({
"chip": chip,
"name": name,
"pinnumber": pinnumber,
})
with open(self.configpath, "wb") as f:
tomli_w.dump({
"relays": _serialize(self.config["relays"]),
}, f)
LOGGER.info("Wrote config file %s", self.configpath)
def relays_list(self) -> List[Relay]:
return self.relays
def relay_on(self, name: str) -> bool:
raise NotImplementedError()
def relay_set(self, name: str, state: bool) -> None:
raise NotImplementedError()
class RelaysFake(Relays):
"Class for fake relays, useful for testing."
def chips_list(self) -> List[str]:
return ["chipA", "chipB"]
def pins_list(self) -> List[Pin]:
return [
Pin(chip="chipA", number=1, name="CON2-P1"),
Pin(chip="chipA", number=2, name="CON2-P2"),
Pin(chip="chipA", number=3, name="CON2-P10"),
Pin(chip="chipB", number=1, name="CON2-P3"),
Pin(chip="chipB", number=2, name="CON2-P7"),
]
def relays_list(self) -> List[Relay]:
raise NotImplementedError()
def relay_on(self, name: str) -> bool:
raise NotImplementedError()
def relay_set(self, name: str, state: bool) -> None:
raise NotImplementedError()
class RelaysReal(Relays):
"Class for controlling real relays."

48
pnpdevice/server.py Normal file
View File

@ -0,0 +1,48 @@
from fastapi import FastAPI, Request
from fastapi.templating import Jinja2Templates
from starlette.responses import RedirectResponse
import jinja2
from pnpdevice.config import config
import pnpdevice.discovery
import pnpdevice.relays
app = FastAPI(debug=config.DEBUG)
templates = Jinja2Templates(directory="templates")
@app.get("/")
def index():
relays = request.app["relays"]
return templates.TemplateResponse("index.template.html", {
"relays": relays.relays_list(),
})
@app.get("/relay/create")
def relay_create_get(request: Request):
"Get the form to create a new relay."
relays = request.app["relays"]
pins = relays.pins_list()
return templates.TemplateResponse("relay-create.template.html", {"pins": pins})
async def relay_create_post(request: Request, chip_and_number: str, name: str):
"Create a new relay."
chip, number = chip_and_number.partition("-")
LOGGER.info("Creating relay %s %s with name %s", chip, number, name)
return RedirectResponse(status_code=303, url="/")
async def status(request: Request):
return html("Status")
def run(relays: pnpdevice.relays.Relays):
"Run the embedded web server"
app = web.Application()
app["relays"] = relays
aiohttp_jinja2.setup(app,
loader=jinja2.FileSystemLoader("templates"))
app.on_startup.append(pnpdevice.discovery.handle)
app.add_routes([web.get("/", index)])
app.add_routes([web.get("/relay/create", relay_create_get)])
app.add_routes([web.post("/relay/create", relay_create_post)])
web.run_app(app)

27
pyproject.toml Normal file
View File

@ -0,0 +1,27 @@
[project]
name = "pnpdevice"
authors = [
{ name = "Eli Ribble", email = "eli@theribbles.org"}
]
dependencies = [
"fastapi",
"tomli-w",
"uvicorn[standard]",
"zeroconf",
]
dynamic = ["version", "description"]
[project.scripts]
pnpdevice = "pnpdevice:main.main"
[project.urls]
Home = "https://source.theribbles.org/eliribble/pnpdevice"
[project.optional-dependencies]
dev = [
"pre-commit",
]
[build-system]
build-backend = "flit_core.buildapi"
requires = ["flit_core >=3.2,<4"]

View File

@ -0,0 +1,14 @@
<html>
<h1>Pools'n'Pumps Device</h1>
{% if not relays %}
<h2>No relays found.</h2>
{% else %}
<h2>Relays</h2>
<ul>
{% for relay in relays %}
<li>{{ relay.name }}</li>
{% endfor %}
</ul>
{% endif %}
<a href="/relay/create">Create Relay</a>
</html>

View File

@ -0,0 +1,12 @@
<html>
<h1>Relay Creation</h1>
<form method="POST" action="/relay/create">
<input type="text" name="name" placeholder="Pool Pump 1"></input>
<select name="chip-and-number">
{% for pin in pins %}
<option value="{{ pin.chip }}-{{ pin.number }}">{{ pin.name }}</option>
{% endfor %}
</select>
<button type="submit">Create</button>
</form>
</html>