Make daemon-test work similar to previous 'test', but over the network.

This really is just to make my life a little easier since the daemon is
running and stable on my remote machine and directly editing the
software on that device is obnoxious due to network errors.
This commit is contained in:
Eli Ribble 2023-05-31 09:37:29 -07:00
parent 82c741033d
commit 73e41d90b7
1 changed files with 161 additions and 0 deletions

161
daemon-test.py Executable file
View File

@ -0,0 +1,161 @@
#!/usr/bin/env python3
import argparse
import dataclasses
import sys
from typing import Dict
import time
import rgpio
# From test C program using wiringPi
# RELAY_PINS = [4, 17, 27, 18, 22, 23, 24, 25]
# Physical pins
# PINS = [7, 11, 13, 15, 12, 16, 18, 22]
# From Banana Pi Zero M2 pinout
RELAY_PINS = ["CON2-P07", "CON2-P11", "CON2-P13", "CON2-P15", "CON2-P12", "CON2-P16", "CON2-P18", "CON2-P22"]
# con2-p22 ?
# p18: waterfall
# p16: filter pump, salt cell
#lgpio.h:#define LG_GPIO_IS_KERNEL 1
#lgpio.h:#define LG_GPIO_IS_OUTPUT 2
#lgpio.h:#define LG_GPIO_IS_ACTIVE_LOW 4
#lgpio.h:#define LG_GPIO_IS_OPEN_DRAIN 8
#lgpio.h:#define LG_GPIO_IS_OPEN_SOURCE 16
#lgpio.h:#define LG_GPIO_IS_PULL_UP 32
#lgpio.h:#define LG_GPIO_IS_PULL_DOWN 64
#lgpio.h:#define LG_GPIO_IS_PULL_NONE 128
#lgpio.h:#define LG_GPIO_IS_INPUT 65536
#lgpio.h:#define LG_GPIO_IS_RISING_EDGE 131072
#lgpio.h:#define LG_GPIO_IS_FALLING_EDGE 262144
#lgpio.h:#define LG_GPIO_IS_REALTIME_CLOCK 524288
FLAG_TO_NAME = {
1: "kernel",
2: "output",
4: "active_low",
8: "open_drain",
16: "open_source",
32: "pull_up",
64: "pull_down",
128: "pull_none",
65536: "input",
131072: "rising_edge",
262144: "falling_edge",
524288: "realtime_clock",
}
@dataclasses.dataclass
class Relay:
name: str
pin: str
RELAY_ON = 0
RELAY_OFF = 1
RELAYS = [
Relay("Light", "CON2-P22"),
Relay("Waterfall", "CON2-P18"),
Relay("Filter", "CON2-P16"),
Relay("Unknown 2", "CON2-P12"),
]
DEVICE = "/dev/ttyUSB0"
def alloff(args, sbc, handle, pin_to_line):
"Turn off all relays"
for relay in RELAYS:
line = pin_to_line[relay.pin]
sbc.gpio_claim_output(handle, line, level=RELAY_OFF)
print_status(sbc, handle, line)
def flags_to_name(flags: int):
"Get the set of flag names based on the provided flag value"
result = set()
for value, name in FLAG_TO_NAME.items():
if flags & value:
result.add(name)
return result
def on(args, sbc, handle, pin_to_line):
"Turn on a single relay"
name_to_pin = {relay.name.lower(): relay.pin for relay in RELAYS}
pin = name_to_pin[args.name.lower()]
line = pin_to_line[pin]
sbc.gpio_claim_output(handle, line, level=RELAY_ON)
sbc.gpio_write(handle, line, RELAY_ON)
print_status(sbc, handle, line)
print(f"{args.name} now on")
if args.duration:
print(f"Waiting {args.duration} minutes ({args.duration / 60.0} hours), then turning off")
time.sleep(args.duration * 60)
sbc.gpio_write(handle, line, RELAY_OFF)
print(f"{args.name} now off")
print_status(sbc, handle, line)
def status(args, sbc, handle, pin_to_line):
pin_to_name = {relay.pin: relay.name for relay in RELAYS}
for pin, name in pin_to_name.items():
line = pin_to_line[pin]
print_status(sbc, handle, line)
def print_status(sbc, handle, line):
okay_status, io_number, flags, pin, user = sbc.gpio_get_line_info(handle, line)
names = flags_to_name(flags)
if okay_status == 0:
print(f"{io_number}): {flags} {pin} {user}: {' '.join(sorted(names))}")
else:
print(f"Failed to get status for {line}")
def map_pin_to_line(sbc, handle):
start_pin, end_pin, name, driver = sbc.gpio_get_chip_info(handle)
pin_to_line = {}
for x in range(start_pin, end_pin):
okay_status, io_number, flags, pin, user = sbc.gpio_get_line_info(handle, x)
flag_names = flags_to_name(flags)
if okay_status != 0:
print(f"Error getting pin {x}: status {okay_status}")
continue
pin_to_line[pin] = x
return pin_to_line
def main():
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(required=True, help="the sub-command to execute")
parser_status = subparsers.add_parser("status", help="Show status of the relays")
parser_status.add_argument("-H", "--host", default="127.0.0.1", help="The host to connect to")
parser_status.set_defaults(func=status)
parser_alloff = subparsers.add_parser("alloff", help="Turn off all relays")
parser_alloff.add_argument("-H", "--host", default="127.0.0.1", help="The host to connect to")
parser_alloff.set_defaults(func=alloff)
parser_on = subparsers.add_parser("on", help="Turn on a relay")
parser_on.add_argument("name", choices=[relay.name for relay in RELAYS])
parser_on.add_argument("-d", "--duration", type=int, default=0, help="Time, in minutes, to keep the relay on")
parser_on.add_argument("-H", "--host", default="127.0.0.1", help="The host to connect to")
parser_on.set_defaults(func=on)
args = parser.parse_args()
sbc = rgpio.sbc(host=args.host)
if not sbc.connected:
print("Not connected")
return -1
handle = sbc.gpiochip_open(0) # open /dev/gpiochip0
if handle < 0:
print("Error on open")
return -1
pin_to_line = map_pin_to_line(sbc, handle)
try:
args.func(args, sbc, handle, pin_to_line)
finally:
sbc.gpiochip_close(handle)
return 0
if __name__ == "__main__":
sys.exit(main())