162 lines
4.7 KiB
Python
Executable File
162 lines
4.7 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
import argparse
|
|
import dataclasses
|
|
import sys
|
|
from typing import Dict
|
|
import time
|
|
|
|
import rgpio
|
|
# From test C program using wiringPi
|
|
# RELAY_PINS = [4, 17, 27, 18, 22, 23, 24, 25]
|
|
# Physical pins
|
|
# PINS = [7, 11, 13, 15, 12, 16, 18, 22]
|
|
# From Banana Pi Zero M2 pinout
|
|
RELAY_PINS = ["CON2-P07", "CON2-P11", "CON2-P13", "CON2-P15", "CON2-P12", "CON2-P16", "CON2-P18", "CON2-P22"]
|
|
# con2-p22 ?
|
|
# p18: waterfall
|
|
# p16: filter pump, salt cell
|
|
|
|
#lgpio.h:#define LG_GPIO_IS_KERNEL 1
|
|
#lgpio.h:#define LG_GPIO_IS_OUTPUT 2
|
|
#lgpio.h:#define LG_GPIO_IS_ACTIVE_LOW 4
|
|
#lgpio.h:#define LG_GPIO_IS_OPEN_DRAIN 8
|
|
#lgpio.h:#define LG_GPIO_IS_OPEN_SOURCE 16
|
|
#lgpio.h:#define LG_GPIO_IS_PULL_UP 32
|
|
#lgpio.h:#define LG_GPIO_IS_PULL_DOWN 64
|
|
#lgpio.h:#define LG_GPIO_IS_PULL_NONE 128
|
|
#lgpio.h:#define LG_GPIO_IS_INPUT 65536
|
|
#lgpio.h:#define LG_GPIO_IS_RISING_EDGE 131072
|
|
#lgpio.h:#define LG_GPIO_IS_FALLING_EDGE 262144
|
|
#lgpio.h:#define LG_GPIO_IS_REALTIME_CLOCK 524288
|
|
|
|
FLAG_TO_NAME = {
|
|
1: "kernel",
|
|
2: "output",
|
|
4: "active_low",
|
|
8: "open_drain",
|
|
16: "open_source",
|
|
32: "pull_up",
|
|
64: "pull_down",
|
|
128: "pull_none",
|
|
65536: "input",
|
|
131072: "rising_edge",
|
|
262144: "falling_edge",
|
|
524288: "realtime_clock",
|
|
}
|
|
|
|
|
|
@dataclasses.dataclass
|
|
class Relay:
|
|
name: str
|
|
pin: str
|
|
|
|
RELAY_ON = 0
|
|
RELAY_OFF = 1
|
|
|
|
RELAYS = [
|
|
Relay("Light", "CON2-P22"),
|
|
Relay("Waterfall", "CON2-P18"),
|
|
Relay("Filter", "CON2-P16"),
|
|
Relay("Unknown 2", "CON2-P12"),
|
|
]
|
|
DEVICE = "/dev/ttyUSB0"
|
|
|
|
def alloff(args, sbc, handle, pin_to_line):
|
|
"Turn off all relays"
|
|
for relay in RELAYS:
|
|
line = pin_to_line[relay.pin]
|
|
sbc.gpio_claim_output(handle, line, level=RELAY_OFF)
|
|
print_status(sbc, handle, line)
|
|
|
|
def flags_to_name(flags: int):
|
|
"Get the set of flag names based on the provided flag value"
|
|
result = set()
|
|
for value, name in FLAG_TO_NAME.items():
|
|
if flags & value:
|
|
result.add(name)
|
|
return result
|
|
|
|
def on(args, sbc, handle, pin_to_line):
|
|
"Turn on a single relay"
|
|
name_to_pin = {relay.name.lower(): relay.pin for relay in RELAYS}
|
|
pin = name_to_pin[args.name.lower()]
|
|
line = pin_to_line[pin]
|
|
sbc.gpio_claim_output(handle, line, level=RELAY_ON)
|
|
sbc.gpio_write(handle, line, RELAY_ON)
|
|
print_status(sbc, handle, line)
|
|
print(f"{args.name} now on")
|
|
if args.duration:
|
|
print(f"Waiting {args.duration} minutes ({args.duration / 60.0} hours), then turning off")
|
|
time.sleep(args.duration * 60)
|
|
sbc.gpio_write(handle, line, RELAY_OFF)
|
|
print(f"{args.name} now off")
|
|
print_status(sbc, handle, line)
|
|
|
|
|
|
def status(args, sbc, handle, pin_to_line):
|
|
pin_to_name = {relay.pin: relay.name for relay in RELAYS}
|
|
for pin, name in pin_to_name.items():
|
|
line = pin_to_line[pin]
|
|
print_status(sbc, handle, line)
|
|
|
|
def print_status(sbc, handle, line):
|
|
okay_status, io_number, flags, pin, user = sbc.gpio_get_line_info(handle, line)
|
|
names = flags_to_name(flags)
|
|
if okay_status == 0:
|
|
print(f"{io_number}): {flags} {pin} {user}: {' '.join(sorted(names))}")
|
|
else:
|
|
print(f"Failed to get status for {line}")
|
|
|
|
def map_pin_to_line(sbc, handle):
|
|
start_pin, end_pin, name, driver = sbc.gpio_get_chip_info(handle)
|
|
pin_to_line = {}
|
|
for x in range(start_pin, end_pin):
|
|
okay_status, io_number, flags, pin, user = sbc.gpio_get_line_info(handle, x)
|
|
flag_names = flags_to_name(flags)
|
|
if okay_status != 0:
|
|
print(f"Error getting pin {x}: status {okay_status}")
|
|
continue
|
|
pin_to_line[pin] = x
|
|
return pin_to_line
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser()
|
|
subparsers = parser.add_subparsers(required=True, help="the sub-command to execute")
|
|
|
|
parser_status = subparsers.add_parser("status", help="Show status of the relays")
|
|
parser_status.add_argument("-H", "--host", default="127.0.0.1", help="The host to connect to")
|
|
parser_status.set_defaults(func=status)
|
|
|
|
parser_alloff = subparsers.add_parser("alloff", help="Turn off all relays")
|
|
parser_alloff.add_argument("-H", "--host", default="127.0.0.1", help="The host to connect to")
|
|
parser_alloff.set_defaults(func=alloff)
|
|
|
|
parser_on = subparsers.add_parser("on", help="Turn on a relay")
|
|
parser_on.add_argument("name", choices=[relay.name for relay in RELAYS])
|
|
parser_on.add_argument("-d", "--duration", type=int, default=0, help="Time, in minutes, to keep the relay on")
|
|
parser_on.add_argument("-H", "--host", default="127.0.0.1", help="The host to connect to")
|
|
parser_on.set_defaults(func=on)
|
|
args = parser.parse_args()
|
|
|
|
sbc = rgpio.sbc(host=args.host)
|
|
if not sbc.connected:
|
|
print("Not connected")
|
|
return -1
|
|
|
|
handle = sbc.gpiochip_open(0) # open /dev/gpiochip0
|
|
if handle < 0:
|
|
print("Error on open")
|
|
return -1
|
|
|
|
pin_to_line = map_pin_to_line(sbc, handle)
|
|
|
|
try:
|
|
args.func(args, sbc, handle, pin_to_line)
|
|
finally:
|
|
sbc.gpiochip_close(handle)
|
|
|
|
return 0
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|