87 lines
3.0 KiB
Python
Executable File
87 lines
3.0 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
import asyncio
|
|
import argparse
|
|
import dataclasses
|
|
import enum
|
|
import ipaddress
|
|
import logging
|
|
import urllib.parse
|
|
|
|
import aiohttp
|
|
|
|
# Pre-encoded query string appended to a device's "/mq" URL by run().
# The values are already percent-encoded: "%2506X" decodes to "%06X" and the
# "mf" value decodes to "%prefix%/%topic%/".
# NOTE(review): the broker user name and password are hard-coded here.
MQTT = (
    "mh=mqtt.ribble.net"  # presumably the broker host
    "&ml=1883"  # presumably the broker port
    "&mc=DVES_%2506X"
    "&mu=device_user"
    "&mp=let_device_user_in"
    "&mt=tasmota_%2506X"
    "&mf=%25prefix%25%2F%25topic%25%2F"
    "&save="
)

# Pre-encoded console command sent as the "c1" parameter of a device's "/cs"
# URL by run().  Decoded, it reads:
#   Rule1 ON Wifi#Disconnected DO BACKLOG DELAY 3000 ; RESTART 1; ENDON
#         ON Mqtt#Connected DO BACKLOG ENDON
#         ON Mqtt#Disconnected DO BACKLOG DELAY 3000 ; RESTART 1; ENDON
RULE1 = (
    "Rule1"
    "%20ON%20Wifi%23Disconnected%20DO%20BACKLOG%20DELAY%203000%20%3B%20RESTART%201%3B%20ENDON"
    "%20%20ON%20Mqtt%23Connected%20DO%20BACKLOG%20ENDON"
    "%20%20ON%20Mqtt%23Disconnected%20DO%20BACKLOG%20DELAY%203000%20%3B%20RESTART%201%3B%20ENDON"
)
|
|
|
|
class DeviceStatus(enum.Enum):
    """Outcome of probing a single address for a Tasmota web server."""

    # The remote server dropped the connection.
    DISCONNECTED = "Disconnected"
    # The connection to the host could not be established.
    ERROR = "Error"
    # The host answered, but did not identify itself as Tasmota.
    OTHER = "Other"
    # The host's Server header starts with "Tasmota".
    TASMOTA = "Tasmota"
    # The request did not finish within the session timeout.
    TIMEOUT = "Timeout"
    # Initial value, used before a probe has produced any result.
    UNKNOWN = "Unknown"
|
|
|
|
@dataclasses.dataclass
class DeviceReport:
    """Result of probing one address.

    Attributes:
        address: The IPv4 address that was probed.
        status: How the probe ended.
        version: For Tasmota devices, the part of the Server header after
            its first "/"; None for every other outcome.
    """

    address: ipaddress.IPv4Address
    status: DeviceStatus
    # has_tasmota() only fills this in for Tasmota devices and passes None
    # otherwise, so the annotation has to admit None.
    version: str | None
|
|
|
|
def main() -> None:
    """Parse the command-line options and run the scan to completion."""
    cli = argparse.ArgumentParser()
    cli.add_argument(
        "--subnet",
        type=ipaddress.IPv4Network,
        default="192.168.1.0/24",
        help="The subnet to scan",
    )
    cli.add_argument(
        "--timeout",
        type=float,
        default=10,
        help="Timeout in seconds to wait for a connection",
    )
    options = cli.parse_args()

    # argparse passes string defaults through `type`, so options.subnet is an
    # IPv4Network even when --subnet is not given.
    scan = run(options.subnet, options.timeout)
    asyncio.run(scan)
|
|
|
|
async def _configure_device(
    session: aiohttp.ClientSession, address: ipaddress.IPv4Address
) -> None:
    """Send RULE1 and then the MQTT settings to one device, printing each reply."""
    urls = (
        f"http://{address}/cs?c2=61&c1={RULE1}",
        f"http://{address}/mq?{MQTT}",
    )
    for url in urls:
        try:
            async with session.get(url) as response:
                print(address, response.status)
                print(await response.text())
        except (aiohttp.ClientError, asyncio.TimeoutError) as error:
            # One unreachable device must not abort the whole run: report the
            # failure, skip this device's remaining requests and move on.
            print(address, "request failed:", repr(error))
            return


async def run(subnet: ipaddress.IPv4Network, timeout: float) -> None:
    """Scan *subnet* for Tasmota devices and configure every one found.

    All host addresses of the subnet are probed concurrently with
    has_tasmota().  Each device reported as Tasmota is then, in address
    order, sent the RULE1 console command followed by the MQTT settings.

    Args:
        subnet: Network whose hosts are probed.  Networks with more than 256
            addresses are refused and nothing is scanned.
        timeout: Total time in seconds allowed for each HTTP request.
    """
    if subnet.num_addresses > 256:
        # There is no confirmation step: large scans are simply refused, so
        # the message says that instead of asking a question nobody can answer.
        print(f"Refusing to scan {subnet.num_addresses} addresses: the limit is 256.")
        return

    client_timeout = aiohttp.ClientTimeout(total=timeout)
    async with aiohttp.ClientSession(timeout=client_timeout) as session:
        async with asyncio.TaskGroup() as group:
            tasks = [
                group.create_task(has_tasmota(session, host))
                for host in subnet.hosts()
            ]
        # Leaving the TaskGroup block waits for every probe, so all results
        # are available here.
        reports = [task.result() for task in tasks]
        tasmotas = sorted(
            (report for report in reports if report.status == DeviceStatus.TASMOTA),
            key=lambda report: report.address,
        )
        for tasmota in tasmotas:
            print(tasmota.address, tasmota.version)
            await _configure_device(session, tasmota.address)
|
|
|
|
|
|
async def has_tasmota(
    session: aiohttp.ClientSession, ip_address: ipaddress.IPv4Address
) -> DeviceReport:
    """Probe one address over HTTP and report whether it runs Tasmota.

    A HEAD request is sent to ``http://<ip_address>/`` and the ``Server``
    response header is inspected.  Connection failures and timeouts are
    turned into a status rather than raised, so that one bad host cannot
    cancel the probes running alongside it.

    Args:
        session: Client session used for the request; its timeout applies.
        ip_address: The address to probe.

    Returns:
        A DeviceReport for ``ip_address``.  ``version`` holds the text after
        the first "/" of the Server header for Tasmota devices and is None
        otherwise.
    """
    url = f"http://{ip_address}/"
    status = DeviceStatus.UNKNOWN
    version = None
    try:
        async with session.head(url) as response:
            # Not every web server sends a Server header; a missing one just
            # means the host is not a Tasmota device.
            server = response.headers.get("Server", "")
            if server.startswith("Tasmota"):
                version = server.partition("/")[2]
                status = DeviceStatus.TASMOTA
            else:
                status = DeviceStatus.OTHER
    except aiohttp.ServerDisconnectedError:
        status = DeviceStatus.DISCONNECTED
    except asyncio.TimeoutError:
        # Checked before the generic ClientError below because aiohttp's
        # timeout errors derive from both.
        status = DeviceStatus.TIMEOUT
    except aiohttp.ClientError:
        # Covers ClientConnectorError and every other client-side failure.
        status = DeviceStatus.ERROR
    return DeviceReport(address=ip_address, status=status, version=version)
|
|
|
|
# Start the scan only when executed as a script, so that importing this
# module has no side effects.
if __name__ == "__main__":
    main()
|