From 73e41d90b70e7f33cf74f9aac1e06223f38718ce Mon Sep 17 00:00:00 2001 From: Eli Ribble Date: Wed, 31 May 2023 09:37:29 -0700 Subject: [PATCH] Make daemon-test work similar to previous 'test', but over the network. This really is just to make my life a little easier since the daemon is running and stable on my remote machine and directly editing the software on that device is obnoxious due to network errors. --- daemon-test.py | 161 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 161 insertions(+) create mode 100755 daemon-test.py diff --git a/daemon-test.py b/daemon-test.py new file mode 100755 index 0000000..44505a8 --- /dev/null +++ b/daemon-test.py @@ -0,0 +1,161 @@ +#!/usr/bin/env python3 +import argparse +import dataclasses +import sys +from typing import Dict +import time + +import rgpio +# From test C program using wiringPi +# RELAY_PINS = [4, 17, 27, 18, 22, 23, 24, 25] +# Physical pins +# PINS = [7, 11, 13, 15, 12, 16, 18, 22] +# From Banana Pi Zero M2 pinout +RELAY_PINS = ["CON2-P07", "CON2-P11", "CON2-P13", "CON2-P15", "CON2-P12", "CON2-P16", "CON2-P18", "CON2-P22"] +# con2-p22 ? +# p18: waterfall +# p16: filter pump, salt cell + +#lgpio.h:#define LG_GPIO_IS_KERNEL 1 +#lgpio.h:#define LG_GPIO_IS_OUTPUT 2 +#lgpio.h:#define LG_GPIO_IS_ACTIVE_LOW 4 +#lgpio.h:#define LG_GPIO_IS_OPEN_DRAIN 8 +#lgpio.h:#define LG_GPIO_IS_OPEN_SOURCE 16 +#lgpio.h:#define LG_GPIO_IS_PULL_UP 32 +#lgpio.h:#define LG_GPIO_IS_PULL_DOWN 64 +#lgpio.h:#define LG_GPIO_IS_PULL_NONE 128 +#lgpio.h:#define LG_GPIO_IS_INPUT 65536 +#lgpio.h:#define LG_GPIO_IS_RISING_EDGE 131072 +#lgpio.h:#define LG_GPIO_IS_FALLING_EDGE 262144 +#lgpio.h:#define LG_GPIO_IS_REALTIME_CLOCK 524288 + +FLAG_TO_NAME = { + 1: "kernel", + 2: "output", + 4: "active_low", + 8: "open_drain", + 16: "open_source", + 32: "pull_up", + 64: "pull_down", + 128: "pull_none", + 65536: "input", + 131072: "rising_edge", + 262144: "falling_edge", + 524288: "realtime_clock", +} + + +@dataclasses.dataclass +class Relay: + name: str + pin: str + +RELAY_ON = 0 +RELAY_OFF = 1 + +RELAYS = [ + Relay("Light", "CON2-P22"), + Relay("Waterfall", "CON2-P18"), + Relay("Filter", "CON2-P16"), + Relay("Unknown 2", "CON2-P12"), +] +DEVICE = "/dev/ttyUSB0" + +def alloff(args, sbc, handle, pin_to_line): + "Turn off all relays" + for relay in RELAYS: + line = pin_to_line[relay.pin] + sbc.gpio_claim_output(handle, line, level=RELAY_OFF) + print_status(sbc, handle, line) + +def flags_to_name(flags: int): + "Get the set of flag names based on the provided flag value" + result = set() + for value, name in FLAG_TO_NAME.items(): + if flags & value: + result.add(name) + return result + +def on(args, sbc, handle, pin_to_line): + "Turn on a single relay" + name_to_pin = {relay.name.lower(): relay.pin for relay in RELAYS} + pin = name_to_pin[args.name.lower()] + line = pin_to_line[pin] + sbc.gpio_claim_output(handle, line, level=RELAY_ON) + sbc.gpio_write(handle, line, RELAY_ON) + print_status(sbc, handle, line) + print(f"{args.name} now on") + if args.duration: + print(f"Waiting {args.duration} minutes ({args.duration / 60.0} hours), then turning off") + time.sleep(args.duration * 60) + sbc.gpio_write(handle, line, RELAY_OFF) + print(f"{args.name} now off") + print_status(sbc, handle, line) + + +def status(args, sbc, handle, pin_to_line): + pin_to_name = {relay.pin: relay.name for relay in RELAYS} + for pin, name in pin_to_name.items(): + line = pin_to_line[pin] + print_status(sbc, handle, line) + +def print_status(sbc, handle, line): + okay_status, io_number, flags, pin, user = sbc.gpio_get_line_info(handle, line) + names = flags_to_name(flags) + if okay_status == 0: + print(f"{io_number}): {flags} {pin} {user}: {' '.join(sorted(names))}") + else: + print(f"Failed to get status for {line}") + +def map_pin_to_line(sbc, handle): + start_pin, end_pin, name, driver = sbc.gpio_get_chip_info(handle) + pin_to_line = {} + for x in range(start_pin, end_pin): + okay_status, io_number, flags, pin, user = sbc.gpio_get_line_info(handle, x) + flag_names = flags_to_name(flags) + if okay_status != 0: + print(f"Error getting pin {x}: status {okay_status}") + continue + pin_to_line[pin] = x + return pin_to_line + +def main(): + parser = argparse.ArgumentParser() + subparsers = parser.add_subparsers(required=True, help="the sub-command to execute") + + parser_status = subparsers.add_parser("status", help="Show status of the relays") + parser_status.add_argument("-H", "--host", default="127.0.0.1", help="The host to connect to") + parser_status.set_defaults(func=status) + + parser_alloff = subparsers.add_parser("alloff", help="Turn off all relays") + parser_alloff.add_argument("-H", "--host", default="127.0.0.1", help="The host to connect to") + parser_alloff.set_defaults(func=alloff) + + parser_on = subparsers.add_parser("on", help="Turn on a relay") + parser_on.add_argument("name", choices=[relay.name for relay in RELAYS]) + parser_on.add_argument("-d", "--duration", type=int, default=0, help="Time, in minutes, to keep the relay on") + parser_on.add_argument("-H", "--host", default="127.0.0.1", help="The host to connect to") + parser_on.set_defaults(func=on) + args = parser.parse_args() + + sbc = rgpio.sbc(host=args.host) + if not sbc.connected: + print("Not connected") + return -1 + + handle = sbc.gpiochip_open(0) # open /dev/gpiochip0 + if handle < 0: + print("Error on open") + return -1 + + pin_to_line = map_pin_to_line(sbc, handle) + + try: + args.func(args, sbc, handle, pin_to_line) + finally: + sbc.gpiochip_close(handle) + + return 0 + +if __name__ == "__main__": + sys.exit(main())