From 090f55e2258c4553f5e04a136c826002f4d9ae1d Mon Sep 17 00:00:00 2001 From: Eli Ribble Date: Tue, 12 Dec 2023 13:40:25 -0700 Subject: [PATCH] Temporary WIP for adding commands --- control.proto | 19 ++++++++++++++++--- server.py | 47 +++++++++++++++++++++++++++++++++++++++-------- 2 files changed, 55 insertions(+), 11 deletions(-) diff --git a/control.proto b/control.proto index 047b38c..7956be9 100644 --- a/control.proto +++ b/control.proto @@ -22,9 +22,22 @@ message Command { WRITE = 5; } - Type type = 1; + // unique identifier for the command supplied by the client. + // Any response to the command will reference this ID. + string id = 1; + Type type = 2; - optional string table_name = 2; - repeated SchemaEntry schema_entry = 3; + optional string table_name = 3; + repeated SchemaEntry schema_entry = 4; } +message Error { + int32 code = 1; + string message = 2; +} + +message CommandResponse { + string id = 1; + bool success = 2; + optional Error error = 3; +} diff --git a/server.py b/server.py index 436540c..b08bb28 100644 --- a/server.py +++ b/server.py @@ -1,5 +1,6 @@ import asyncio import asyncio.streams +import dataclasses import functools import logging from typing import Iterable @@ -14,6 +15,11 @@ SUPPORTED_FORMATS = [ "json", ] +SCHEMA_TYPE_TO_PYTHON_TYPE = { + control_pb2.SchemaEntry.Type.STRING: str, + control_pb2.SchemaEntry.Type.INT: int, +} + class Connection: def __init__(self, reader, writer, namespace: str, app_name: str, app_version: str, chosen_format: str, client_key: str): self.reader = reader @@ -42,7 +48,7 @@ async def on_connect(command_queue: asyncio.Queue, reader, writer): continue LOGGER.debug("Got data: '%s'", data) command = control_pb2.Command.FromString(data) - await command_queue.put(command) + await command_queue.put((command, write)) except ClientError: LOGGER.info("Client error") @@ -74,16 +80,41 @@ async def handshake(reader, writer) -> Connection: return Connection( reader, writer, namespace, app_name, app_version, chosen_format, client_key) -async def processor(command_queue): - while True: - command = await command_queue.get() - if command: - LOGGER.info("Processing %s", command) - await asyncio.sleep(0) +class DataManager: + def __init__(self): + self.schema = {} + self.process_task = None + + def process(self, command_queue: asyncio.Queue) -> None: + self.process_task = asyncio.create_task(self._processor(command_queue)) + + async def _processor(self, command_queue: asyncio.Queue) -> None: + while True: + command, writer = await command_queue.get() + if command: + LOGGER.info("Processing %s", command) + if command.type == control_pb2.Command.Type.SCHEMA_WRITE: + _write_schema(command.table_name, command.schema_entry) + result = control_pb2.CommandResponse( + id=command.id, + + writer.write( + await asyncio.sleep(0) + + def _write_schema(self, table_name: str, schema_entries: Iterable[control_pb2.SchemaEntry]) -> None: + table_schema = self.schema.get(table_name, {}) + # Probably should check I'm not overwriting schema + for entry in schema_entries: + table_schema[entry.name] = SCHEMA_TYPE_TO_PYTHON_TYPE[entry.type] + +def write_schema(table_name: str, schema_entries) -> None: + LOGGER.info("Writing schumea: %s", schema_entries) + async def run(): command_queue = asyncio.Queue() - command_processor = asyncio.create_task(processor(command_queue)) + data_manager = DataManager() + data_manager.process(command_queue) client_handler = functools.partial(on_connect, command_queue) server = await asyncio.start_server(client_handler, host="localhost", port=9988) async with server: