Push commands to a central processing queue

This commit is contained in:
Eli Ribble 2023-06-19 19:40:41 -07:00
parent 3b4667d9c8
commit 61f4e2cdae
1 changed files with 14 additions and 3 deletions

View File

@ -1,5 +1,6 @@
import asyncio import asyncio
import asyncio.streams import asyncio.streams
import functools
import logging import logging
from typing import Iterable from typing import Iterable
@ -30,7 +31,7 @@ def main():
except KeyboardInterrupt: except KeyboardInterrupt:
LOGGER.info("Exiting") LOGGER.info("Exiting")
async def on_connect(reader, writer): async def on_connect(command_queue: asyncio.Queue, reader, writer):
LOGGER.info("connected") LOGGER.info("connected")
try: try:
connection = await handshake(reader, writer) connection = await handshake(reader, writer)
@ -41,7 +42,7 @@ async def on_connect(reader, writer):
continue continue
LOGGER.debug("Got data: '%s'", data) LOGGER.debug("Got data: '%s'", data)
command = control_pb2.Command.FromString(data) command = control_pb2.Command.FromString(data)
LOGGER.info("Command: %s", command) await command_queue.put(command)
except ClientError: except ClientError:
LOGGER.info("Client error") LOGGER.info("Client error")
@ -73,8 +74,18 @@ async def handshake(reader, writer) -> Connection:
return Connection( return Connection(
reader, writer, namespace, app_name, app_version, chosen_format, client_key) reader, writer, namespace, app_name, app_version, chosen_format, client_key)
async def processor(command_queue):
while True:
command = await command_queue.get()
if command:
LOGGER.info("Processing %s", command)
await asyncio.sleep(0)
async def run(): async def run():
server = await asyncio.start_server(on_connect, host="localhost", port=9988) command_queue = asyncio.Queue()
command_processor = asyncio.create_task(processor(command_queue))
client_handler = functools.partial(on_connect, command_queue)
server = await asyncio.start_server(client_handler, host="localhost", port=9988)
async with server: async with server:
try: try:
await server.serve_forever() await server.serve_forever()