From 61f4e2cdae77a4da8dc91a324f24e3db05cf4dc0 Mon Sep 17 00:00:00 2001 From: Eli Ribble Date: Mon, 19 Jun 2023 19:40:41 -0700 Subject: [PATCH] Push commands to a central processing queue --- server.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/server.py b/server.py index 2d171a9..436540c 100644 --- a/server.py +++ b/server.py @@ -1,5 +1,6 @@ import asyncio import asyncio.streams +import functools import logging from typing import Iterable @@ -30,7 +31,7 @@ def main(): except KeyboardInterrupt: LOGGER.info("Exiting") -async def on_connect(reader, writer): +async def on_connect(command_queue: asyncio.Queue, reader, writer): LOGGER.info("connected") try: connection = await handshake(reader, writer) @@ -41,7 +42,7 @@ async def on_connect(reader, writer): continue LOGGER.debug("Got data: '%s'", data) command = control_pb2.Command.FromString(data) - LOGGER.info("Command: %s", command) + await command_queue.put(command) except ClientError: LOGGER.info("Client error") @@ -73,8 +74,18 @@ async def handshake(reader, writer) -> Connection: return Connection( reader, writer, namespace, app_name, app_version, chosen_format, client_key) +async def processor(command_queue): + while True: + command = await command_queue.get() + if command: + LOGGER.info("Processing %s", command) + await asyncio.sleep(0) + async def run(): - server = await asyncio.start_server(on_connect, host="localhost", port=9988) + command_queue = asyncio.Queue() + command_processor = asyncio.create_task(processor(command_queue)) + client_handler = functools.partial(on_connect, command_queue) + server = await asyncio.start_server(client_handler, host="localhost", port=9988) async with server: try: await server.serve_forever()