From 0d33aacf3ff4e0883533c24f1706cb8e749b7d84 Mon Sep 17 00:00:00 2001 From: Eli Ribble Date: Mon, 19 Jun 2023 14:30:28 -0700 Subject: [PATCH 01/10] Initial hello world program. --- main.go | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 main.go diff --git a/main.go b/main.go new file mode 100644 index 0000000..5411621 --- /dev/null +++ b/main.go @@ -0,0 +1,7 @@ +package main + +import "fmt" + +func main() { + fmt.Println("Hello World.") +} From e155420a4b52fb278d8f2ee473cd92a25c4d123e Mon Sep 17 00:00:00 2001 From: Eli Ribble Date: Mon, 19 Jun 2023 16:09:19 -0700 Subject: [PATCH 02/10] Working simple client/server --- client.py | 14 ++++++++++++++ datajack/__init__.py | 29 +++++++++++++++++++++++++++++ main.go | 43 +++++++++++++++++++++++++++++++++++++++---- 3 files changed, 82 insertions(+), 4 deletions(-) create mode 100644 client.py create mode 100644 datajack/__init__.py diff --git a/client.py b/client.py new file mode 100644 index 0000000..75bc147 --- /dev/null +++ b/client.py @@ -0,0 +1,14 @@ +import argparse + +import datajack + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("connection_uri", help="The URI to use to connect to the remove datajack.") + args = parser.parse_args() + + with datajack.connection(args.connection_uri) as dj: + dj.send("hi".encode("UTF-8")) + +if __name__ == "__main__": + main() diff --git a/datajack/__init__.py b/datajack/__init__.py new file mode 100644 index 0000000..3cd619b --- /dev/null +++ b/datajack/__init__.py @@ -0,0 +1,29 @@ +import socket +import urllib.parse + +class Connection: + def __init__(self, uri): + parts = urllib.parse.urlparse(uri) + netloc = parts.netloc + self.host, _, self.port = netloc.partition(":") + + + def __enter__(self): + self.connect() + return self + + def __exit__(self, exc_typ, exc_val, exc_tb): + pass + + def connect(self): + self.socket = socket.socket() + self.socket.connect((self.host, int(self.port))) + + def disconnect(self): + pass + + def send(self, data): + self.socket.send(data) + +def connection(uri) -> Connection: + return Connection(uri) diff --git a/main.go b/main.go index 5411621..9b40c8e 100644 --- a/main.go +++ b/main.go @@ -1,7 +1,42 @@ +// socket-server project main.go package main - -import "fmt" - +import ( + "fmt" + "net" + "os" +) +const ( + SERVER_HOST = "localhost" + SERVER_PORT = "9988" + SERVER_TYPE = "tcp" +) func main() { - fmt.Println("Hello World.") + fmt.Println("Server Running...") + server, err := net.Listen(SERVER_TYPE, SERVER_HOST+":"+SERVER_PORT) + if err != nil { + fmt.Println("Error listening:", err.Error()) + os.Exit(1) + } + defer server.Close() + fmt.Println("Listening on " + SERVER_HOST + ":" + SERVER_PORT) + fmt.Println("Waiting for client...") + for { + connection, err := server.Accept() + if err != nil { + fmt.Println("Error accepting: ", err.Error()) + os.Exit(1) + } + fmt.Println("client connected") + go processClient(connection) + } +} +func processClient(connection net.Conn) { + buffer := make([]byte, 1024) + mLen, err := connection.Read(buffer) + if err != nil { + fmt.Println("Error reading:", err.Error()) + } + fmt.Println("Received: ", string(buffer[:mLen])) + _, err = connection.Write([]byte("Thanks! 
Got your message:" + string(buffer[:mLen]))) + connection.Close() } From 3f08a95db07717321ebbb91d6da0470c0a784b4f Mon Sep 17 00:00:00 2001 From: Eli Ribble Date: Mon, 19 Jun 2023 17:53:00 -0700 Subject: [PATCH 03/10] Add gitignore for Python directories --- .gitignore | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..bec0b52 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +ve/ +__pycache__/ From 39ff9c3a8ee64bd2b098c2a620a101d50b023c7e Mon Sep 17 00:00:00 2001 From: Eli Ribble Date: Mon, 19 Jun 2023 17:53:12 -0700 Subject: [PATCH 04/10] Add some ideas in a README --- README.md | 111 +++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 110 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index fe831a9..9a538b5 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,112 @@ # gibtar +The point is can we create something that is database-like without being a traditional database? Here's what we are looking for: -Stores user data across applications \ No newline at end of file + 1. Applications are given access to the fortress on a limited basis. Think "I'm using your webapp, but I keep the data" + 1. Single user, but multi-application/agent by nature. + * My fortress is _mine_. Just me. One human + * I can have an unlimited number of computational agents read/write the data. + * When I share data with another human, you get a copy, not a link. + 1. No usage of a query language like SQL. All access is programmatic. + 1. ACID compliant transactions + 1. Browser-friendly API + * non-password crypto-auth + * AJAX/HTML/fetch friendly requests + * pipelining + * JSON, probably >:P + 1. Data is never deleted. Time-travel is built-in + * Kinda like Datomic? + +## Protocol concerns + + * Built-in behavior for dealing with a cluster + * Network address negotiation (can we use a shorter/closer address? Is a local socket available?) + * Key exchange. Every client is uniquely identified. + * Schema validation since we will support code generation. + * Schema download + * Schema upgrade/manipulation + +## Permissions concerns + * 2 apps are sharing data. + * Badly-behaving app B can't destroy data because we never delete data, we just get a new version + * It could be _very_ badly behaving and could make tons of hostile changes for the user to sift through + * User-requested rollback based on the client app that made the change? Seems very doable. + * Locking semantics? + * Behaviors that need permissions + * Reading data + * By application-level concept + * No reason to separate out schema reading, you need the schema if you're getting the data. + * No reason to separate out querying from iterating, it's just a question of speed. + * No reason to separate subscribing to updates vs + * Writing data + * By application-level concept + * All writes are tagged by application for auditing. + * Altering schema + * Needs heavy warnings, apps need to *really* know what they are doing if they alter schema from another app + * Adding new concepts seems like it should be safe, apps share a namespace but don't share tables. + * Querying the set of available namespaces - can leak data of other applications used. Useful to select plugins on the app's side. + +## Design Questions + * When defining a schema we could force the schema definer to not just work at the layer of "tables, indices, relationships" but also "permissions/logical units". 
The idea here is that if the application is highly denormalized we don't want the end user to have to understand that it takes 15 tables to contain "user data"; instead we want there to be a concept of a User within the schema, and all the information the User contains, so a communicating app can request "Access to read users".
+   * How burdensome would this be on developers? What are the downsides if the developers are lazy or get it wrong?
+ * Can we defend against nefarious data exfiltration?
+   * Hard problem, sandboxing is probably the only valid technical solution here. Apps can exfiltrate via crafted DNS queries, so... the only way to prevent exfiltration is to sandbox.
+ * At-rest encryption by client key?
+   * Seems like it may really hurt performance... and for questionable gain.
+ * At what point do we handle permissions?
+   * We want it done all up-front so we can ask for the user's consent over some channel and do that once.
+   * We need to enumerate the app-level concepts for the connection.
+
+## Handshake
+ * We want to reduce the amount of back-and-forth for latency reasons. Performance matters.
+ * We want to achieve the following in the handshake:
+   * Establish the validity of the client
+   * Determine what data format (protobuf, json) to use when speaking with the client
+   * Set the permissions scope of the connection
+   * Set the namespace for the purposes of sharing data between applications
+   * Store information about the client itself for auditing purposes (name, version, cert)
+   * Determine if the data schema is one the client recognizes and can work with
+     * We should not assume schema version semantics - there is no v1.1 and v1.2 with an assumption that the client can work with anything below v2.0. That's silly. However, we _vastly prefer_ to be explicit about what schema the client can handle, because otherwise we are working on implicit assumptions codified in the imperative code of the client. That's dangerous too. Can we use protobuf-style "I ignore fields I don't understand"?
+ * Figure out the most efficient path between the client and server to avoid unnecessary network hops.
+   * Is this necessary, or does the operating system do this for us?
+   * This includes redirecting to another process or server in load-balanced applications.
+
+## Client features
+ * Query data in lots of neat ways
+ * Subscribe to updates of a particular query, get pushed data on those updates
+
+## Server features
+ * Triggers?
+
+## 2 Apps, 1 Schema
+
+You have App A and App B. They are sharing data with each other about todo list items. At some point T they both fully agree on the schema.
+
+ 1. Do both apps have to fully agree on the schema for all time T?
+
+    If the answer to this is "yes" then the two must be upgraded in lock-step with each other in order to continue using both. If A upgrades first it keeps working and B stops working until it upgrades. Syncing between the app developers is super hard.
+    Let's _not_ make the answer "yes".
+    Since the answer is "no" we need to spell out the conditions and expected behaviors for different types of disagreement. What are all the kinds of disagreement?
+
+    _A knows of a table B does not._
+
+    This is fine, does not hurt B.
+
+    _A knows of a field B does not._
+
+    This is fine too.
+
+    _A and B disagree where a piece of data should be kept_
+
+    This is an interesting case because it _may_ be a situation where both apps have fields the other app does not know about where the data should be kept.
That is most likely the case, since if B wants to put the data in field Z and A knows about Z then A should know to put the data there. + + There is likely not an automated way to deal with this - the storage engine can't understand the semantics of a given data field without something like AGI. Concievably a user could fix this by giving them access to tools to remap data labels, but that seems crazy for most users. + + Ultimately what you need is an authority on what the correct representation of the data is. This leads to the idea that every schema should have one-and-only-one owning application, and zero-or-more integrating applications. If there's a conflict, the owner is right and the integrator is wrong. + + No, that's bad, because ultimately we want the user to own the data and concievably if they stopped using application A they could just use B and the data would keep working. + + well, now I guess we need a standards body or something. + + _A believes that a field has a different type than what B believes_ + + For this case we can thankfully detect the conflict and inform the app. But then what? It should abort, I guess... From e227d349b230b191945beaeb26218d713cd962fb Mon Sep 17 00:00:00 2001 From: Eli Ribble Date: Mon, 19 Jun 2023 18:03:21 -0700 Subject: [PATCH 05/10] Add the start of a Python server implementation. I'm on a plane, I can't be looking up golang stuff. --- server.py | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) create mode 100644 server.py diff --git a/server.py b/server.py new file mode 100644 index 0000000..4207bd3 --- /dev/null +++ b/server.py @@ -0,0 +1,22 @@ +import asyncio +import asyncio.streams +import logging + +LOGGER = logging.getLogger("server") + +def main(): + logging.basicConfig(level=logging.DEBUG) + asyncio.run(run()) + +async def on_connect(reader, writer): + LOGGER.info("connected") + data = await reader.read() + print(data.decode("UTF-8")) + +async def run(): + server = await asyncio.start_server(on_connect, host="localhost", port=9988) + async with server: + await server.serve_forever() + +if __name__ == "__main__": + main() From 461fc7ce0b034fd51fbeb2fe798a4292b219a671 Mon Sep 17 00:00:00 2001 From: Eli Ribble Date: Mon, 19 Jun 2023 18:10:47 -0700 Subject: [PATCH 06/10] Add magic check --- server.py | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/server.py b/server.py index 4207bd3..7f28be3 100644 --- a/server.py +++ b/server.py @@ -4,19 +4,32 @@ import logging LOGGER = logging.getLogger("server") +MAGIC = "e8437140-4347-48cc-a31d-dcdc944ffc16" + def main(): logging.basicConfig(level=logging.DEBUG) asyncio.run(run()) async def on_connect(reader, writer): LOGGER.info("connected") - data = await reader.read() - print(data.decode("UTF-8")) + data = await reader.read(36) + magic = data.decode("UTF-8") + LOGGER.debug("Received magic '%s'", magic) + if magic != MAGIC: + writer.write("ERR1: Magic not recognized".encode("UTF-8")) + writer.close() + LOGGER.info("Bad magic, closing connection.") + return + LOGGER.debug("Magic looks good.") + async def run(): server = await asyncio.start_server(on_connect, host="localhost", port=9988) async with server: - await server.serve_forever() + try: + await server.serve_forever() + except KeyboardInterrupt: + LOGGER.info("Exiting at user request.") if __name__ == "__main__": main() From aadc8520bb2c26fd35f6a45b8b34b56a9939dfc2 Mon Sep 17 00:00:00 2001 From: Eli Ribble Date: Mon, 19 Jun 2023 19:15:08 -0700 Subject: [PATCH 07/10] Working protobuf passing. 
This is barely working, really. In fact, I have an "unused" field in my enum because if I don't prowide it the protobuf serializes down to an empty buffer which breaks all kinds of things. --- README.md | 12 ++++++- client.py | 8 +++++ control.proto | 19 +++++++++++ datajack/__init__.py | 59 ++++++++++++++++++++++++++++++-- main.go | 15 ++++++-- server.py | 81 ++++++++++++++++++++++++++++++++++++++------ 6 files changed, 179 insertions(+), 15 deletions(-) create mode 100644 control.proto diff --git a/README.md b/README.md index 9a538b5..adeec2a 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,13 @@ The point is can we create something that is database-like without being a tradi * JSON, probably >:P 1. Data is never deleted. Time-travel is built-in * Kinda like Datomic? - + +## Incantations + +``` +protoc --python_out proto control.proto +``` + ## Protocol concerns * Built-in behavior for dealing with a cluster @@ -69,10 +75,14 @@ The point is can we create something that is database-like without being a tradi * Figure out the most efficient path between the client and server to avoid unnecessary network hops. * Is this necessary, or does the operating system do this for us? * This includes redirecting to another process or server in load-balanced applications. + * I think we don't need to specify that we are waiting for user authorization if we don't want to, we can just let the client queue up requests and wait to confirm them. ## Client features * Query data lots of neat ways * Subscribe to updates of a particular query, get pushed data on those updates + * Transactions + * Explicit ordering and unordering of reads/writes + * Multiplexed comms to allow parallel reads/writes over a single connection. ## Server features * Triggers? diff --git a/client.py b/client.py index 75bc147..858fbdb 100644 --- a/client.py +++ b/client.py @@ -1,12 +1,20 @@ import argparse +import logging import datajack +#@datajack.Table +class Todo: + name: str + + def main(): parser = argparse.ArgumentParser() parser.add_argument("connection_uri", help="The URI to use to connect to the remove datajack.") args = parser.parse_args() + logging.basicConfig(level=logging.DEBUG) + with datajack.connection(args.connection_uri) as dj: dj.send("hi".encode("UTF-8")) diff --git a/control.proto b/control.proto new file mode 100644 index 0000000..f7cd620 --- /dev/null +++ b/control.proto @@ -0,0 +1,19 @@ +syntax = "proto3"; + +package datafortress; + +// Indicates a command +message Command { + enum Type { + UNUSED = 0; + STATUS = 1; + SCHEMA_READ = 2; + SCHEMA_WRITE = 3; + QUERY = 4; + WRITE = 5; + } + + Type type = 1; +} + + diff --git a/datajack/__init__.py b/datajack/__init__.py index 3cd619b..eeb8505 100644 --- a/datajack/__init__.py +++ b/datajack/__init__.py @@ -1,12 +1,48 @@ +import enum +import logging import socket +from typing import Tuple import urllib.parse +from proto import control_pb2 + +LOGGER = logging.getLogger(__name__) + +# Uniquely identifies a datajack connection. +# This indicates the handshake style expected and is used +# to quickly and easily determine if the client is even speaking the +# right protocol for the server. +MAGIC = "e8437140-4347-48cc-a31d-dcdc944ffc15" + +# The list of formats supported by this client in descending order of preference +# This is used for negotiation of the data format that will be used with the server. 
+DATA_FORMATS = ",".join([ + "protobuf", + "json", +]) + +# The types of access that are allowed with this connection +class Permissions(enum.Enum): + pass + +def _parse_netloc(netloc) -> Tuple[str, str, str, int]: + app, _, connection = netloc.partition("@") + appname, _, version = app.partition(":") + host, _, part = connection.partition(":") + return appname, version, host, part + class Connection: def __init__(self, uri): parts = urllib.parse.urlparse(uri) + print(parts) netloc = parts.netloc - self.host, _, self.port = netloc.partition(":") - + assert parts.scheme == "socket" + self.app_name, self.app_version, self.host, self.port = _parse_netloc(netloc) + self.namespace = parts.path.lstrip("/") + self.public_key = "pretend_key" + self.data_format = None + self.address = None + self.server_key = None def __enter__(self): self.connect() @@ -18,6 +54,7 @@ class Connection: def connect(self): self.socket = socket.socket() self.socket.connect((self.host, int(self.port))) + self._handshake() def disconnect(self): pass @@ -25,5 +62,23 @@ class Connection: def send(self, data): self.socket.send(data) + def _handshake(self): + "Handshake with the server, ensure we have all the data we need." + fields = [MAGIC, DATA_FORMATS, self.namespace, self.app_name, self.app_version, self.public_key,] + cliend_hand = " ".join(fields) + self.socket.send(cliend_hand.encode("UTF-8")) + server_hand = self.socket.recv(1024) + if not server_hand: + print("Failed to get server hand") + self.data_format, self.address, self.server_key = server_hand.decode("UTF-8").split(" ") + LOGGER.info("Data format: %s", self.data_format) + command = control_pb2.Command( + type=control_pb2.Command.Type.STATUS + ) + to_send = command.SerializeToString() + LOGGER.info("Sending '%s'", to_send) + self.socket.send(to_send) + + def connection(uri) -> Connection: return Connection(uri) diff --git a/main.go b/main.go index 9b40c8e..2a62580 100644 --- a/main.go +++ b/main.go @@ -4,12 +4,15 @@ import ( "fmt" "net" "os" + "strings" ) const ( SERVER_HOST = "localhost" SERVER_PORT = "9988" SERVER_TYPE = "tcp" + MAGIC_HEADER = "e8437140-4347-48cc-a31d-dcdc944ffc15" ) + func main() { fmt.Println("Server Running...") server, err := net.Listen(SERVER_TYPE, SERVER_HOST+":"+SERVER_PORT) @@ -36,7 +39,15 @@ func processClient(connection net.Conn) { if err != nil { fmt.Println("Error reading:", err.Error()) } - fmt.Println("Received: ", string(buffer[:mLen])) - _, err = connection.Write([]byte("Thanks! Got your message:" + string(buffer[:mLen]))) + // handshake + // split out the elements of the clients handshake + // read the first 40 bytes, check the magic header, reject invalids + // handle data format negotiation and respond with selected protocol + // store information about + // _, err = connection.Write([]byte("Thanks! 
Got your message:" + string(buffer[:mLen]))) + data := string(buffer[:mLen]) + parts := strings.Split(data, " ") + fmt.Println("Received: ", parts) + _, err = connection.Write([]byte("thanks")) connection.Close() } diff --git a/server.py b/server.py index 7f28be3..f15a55d 100644 --- a/server.py +++ b/server.py @@ -1,28 +1,76 @@ import asyncio import asyncio.streams import logging +from typing import Iterable + +from proto import control_pb2 LOGGER = logging.getLogger("server") -MAGIC = "e8437140-4347-48cc-a31d-dcdc944ffc16" +MAGIC = "e8437140-4347-48cc-a31d-dcdc944ffc15" +SUPPORTED_FORMATS = [ + "protobuf", + "json", +] + +class Connection: + def __init__(self, reader, writer, namespace: str, app_name: str, app_version: str, chosen_format: str, client_key: str): + self.reader = reader + self.writer = writer + self.namepsace = namespace + self.app_name = app_name + self.app_version = app_version + self.chosen_format = chosen_format + self.client_key = client_key def main(): logging.basicConfig(level=logging.DEBUG) - asyncio.run(run()) + try: + asyncio.run(run()) + except KeyboardInterrupt: + LOGGER.info("Exiting") async def on_connect(reader, writer): LOGGER.info("connected") - data = await reader.read(36) + try: + connection = await handshake(reader, writer) + while True: + data = await reader.read(4096) + LOGGER.debug("Got data: '%s'", data) + command = control_pb2.Command.FromString(data) + LOGGER.info("Command: %s", command) + await asyncio.sleep(1) + except ClientError: + LOGGER.info("Client error") + +class ClientError(Exception): + pass + +async def handshake(reader, writer) -> Connection: + data = await reader.read(36 + 1) magic = data.decode("UTF-8") LOGGER.debug("Received magic '%s'", magic) - if magic != MAGIC: - writer.write("ERR1: Magic not recognized".encode("UTF-8")) - writer.close() - LOGGER.info("Bad magic, closing connection.") - return + if magic != MAGIC + " ": + await _write_error(writer, 1, "Magic not recognized") LOGGER.debug("Magic looks good.") - - + data = await reader.read(1024) + client_hand = data.decode("UTF-8") + client_formats, namespace, app_name, app_version, client_key = client_hand.split(" ") + chosen_format = _select_format(client_formats.split(","), SUPPORTED_FORMATS) + if not chosen_format: + await _write_error(writer, 2, "server does not support any of " + client_formats) + target_url = "127.0.0.1:9988" + server_pub_key = "fakeserverkey" + server_hand = " ".join([ + chosen_format, + target_url, + server_pub_key, + ]) + writer.write(server_hand.encode("UTF-8")) + LOGGER.info("Sending %s", server_hand) + return Connection( + reader, writer, namespace, app_name, app_version, chosen_format, client_key) + async def run(): server = await asyncio.start_server(on_connect, host="localhost", port=9988) async with server: @@ -31,5 +79,18 @@ async def run(): except KeyboardInterrupt: LOGGER.info("Exiting at user request.") +def _select_format(client_formats: Iterable[str], supported_formats: Iterable[str]) -> str: + "Pick a format to use with the client." 
+    for f in supported_formats:
+        if f in client_formats:
+            return f
+
+async def _write_error(writer, code: int, message: str) -> None:
+    writer.write(f"ERR{code}: {message}".encode("UTF-8"))
+    await writer.drain()
+    writer.close()
+    LOGGER.info("Client error: %d %s", code, message)
+    raise ClientError("Failed")
+
+
 if __name__ == "__main__":
     main()

From 3b4667d9c811b7f86df16e2f0f90a7aa9a43e4f3 Mon Sep 17 00:00:00 2001
From: Eli Ribble
Date: Mon, 19 Jun 2023 19:29:53 -0700
Subject: [PATCH 08/10] Add the ability to send a schema write request

---
 client.py            | 10 +++++++++-
 control.proto        | 13 ++++++++++++-
 datajack/__init__.py | 19 ++++++++++++++++++-
 server.py            |  4 +++-
 4 files changed, 42 insertions(+), 4 deletions(-)

diff --git a/client.py b/client.py
index 858fbdb..60636de 100644
--- a/client.py
+++ b/client.py
@@ -6,6 +6,7 @@ import datajack
 #@datajack.Table
 class Todo:
     name: str
+    index: int
 
 
 def main():
@@ -16,7 +17,14 @@ def main():
     logging.basicConfig(level=logging.DEBUG)
 
     with datajack.connection(args.connection_uri) as dj:
-        dj.send("hi".encode("UTF-8"))
+        dj.write_schema(
+            "todo",
+            {
+                "name": str,
+                "index": int,
+            },
+        )
+
 
 if __name__ == "__main__":
     main()
diff --git a/control.proto b/control.proto
index f7cd620..047b38c 100644
--- a/control.proto
+++ b/control.proto
@@ -2,6 +2,15 @@ syntax = "proto3";
 
 package datafortress;
 
+message SchemaEntry {
+  enum Type {
+    STRING = 0;
+    INT = 1;
+  }
+  Type type = 1;
+  string name = 2;
+}
+
 // Indicates a command
 message Command {
   enum Type {
@@ -14,6 +23,8 @@ message Command {
   }
 
   Type type = 1;
+
+  optional string table_name = 2;
+  repeated SchemaEntry schema_entry = 3;
 }
-
diff --git a/datajack/__init__.py b/datajack/__init__.py
index eeb8505..6ffe2f3 100644
--- a/datajack/__init__.py
+++ b/datajack/__init__.py
@@ -1,7 +1,7 @@
 import enum
 import logging
 import socket
-from typing import Tuple
+from typing import Mapping, Tuple, Type
 import urllib.parse
 
 from proto import control_pb2
@@ -21,6 +21,11 @@ DATA_FORMATS = ",".join([
     "json",
 ])
 
+PYTHON_TYPE_TO_SCHEMA_TYPE = {
+    str: control_pb2.SchemaEntry.STRING,
+    int: control_pb2.SchemaEntry.INT,
+}
+
 # The types of access that are allowed with this connection
 class Permissions(enum.Enum):
     pass
@@ -62,6 +67,18 @@ class Connection:
     def send(self, data):
         self.socket.send(data)
 
+    def write_schema(self, table_name: str, schema: Mapping[str, Type]) -> None:
+        "Send a command to write the given schema."
+        command = control_pb2.Command(
+            type=control_pb2.Command.Type.SCHEMA_WRITE,
+            table_name=table_name,
+        )
+        for k, v in schema.items():
+            entry = command.schema_entry.add()
+            entry.name = k
+            entry.type = PYTHON_TYPE_TO_SCHEMA_TYPE[v]
+        self.socket.send(command.SerializeToString())
+
     def _handshake(self):
         "Handshake with the server, ensure we have all the data we need."
fields = [MAGIC, DATA_FORMATS, self.namespace, self.app_name, self.app_version, self.public_key,] diff --git a/server.py b/server.py index f15a55d..2d171a9 100644 --- a/server.py +++ b/server.py @@ -36,10 +36,12 @@ async def on_connect(reader, writer): connection = await handshake(reader, writer) while True: data = await reader.read(4096) + if not data: + await asyncio.sleep(0) + continue LOGGER.debug("Got data: '%s'", data) command = control_pb2.Command.FromString(data) LOGGER.info("Command: %s", command) - await asyncio.sleep(1) except ClientError: LOGGER.info("Client error") From 61f4e2cdae77a4da8dc91a324f24e3db05cf4dc0 Mon Sep 17 00:00:00 2001 From: Eli Ribble Date: Mon, 19 Jun 2023 19:40:41 -0700 Subject: [PATCH 09/10] Push commands to a central processing queue --- server.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/server.py b/server.py index 2d171a9..436540c 100644 --- a/server.py +++ b/server.py @@ -1,5 +1,6 @@ import asyncio import asyncio.streams +import functools import logging from typing import Iterable @@ -30,7 +31,7 @@ def main(): except KeyboardInterrupt: LOGGER.info("Exiting") -async def on_connect(reader, writer): +async def on_connect(command_queue: asyncio.Queue, reader, writer): LOGGER.info("connected") try: connection = await handshake(reader, writer) @@ -41,7 +42,7 @@ async def on_connect(reader, writer): continue LOGGER.debug("Got data: '%s'", data) command = control_pb2.Command.FromString(data) - LOGGER.info("Command: %s", command) + await command_queue.put(command) except ClientError: LOGGER.info("Client error") @@ -73,8 +74,18 @@ async def handshake(reader, writer) -> Connection: return Connection( reader, writer, namespace, app_name, app_version, chosen_format, client_key) +async def processor(command_queue): + while True: + command = await command_queue.get() + if command: + LOGGER.info("Processing %s", command) + await asyncio.sleep(0) + async def run(): - server = await asyncio.start_server(on_connect, host="localhost", port=9988) + command_queue = asyncio.Queue() + command_processor = asyncio.create_task(processor(command_queue)) + client_handler = functools.partial(on_connect, command_queue) + server = await asyncio.start_server(client_handler, host="localhost", port=9988) async with server: try: await server.serve_forever() From 090f55e2258c4553f5e04a136c826002f4d9ae1d Mon Sep 17 00:00:00 2001 From: Eli Ribble Date: Tue, 12 Dec 2023 13:40:25 -0700 Subject: [PATCH 10/10] Temporary WIP for adding commands --- control.proto | 19 ++++++++++++++++--- server.py | 47 +++++++++++++++++++++++++++++++++++++++-------- 2 files changed, 55 insertions(+), 11 deletions(-) diff --git a/control.proto b/control.proto index 047b38c..7956be9 100644 --- a/control.proto +++ b/control.proto @@ -22,9 +22,22 @@ message Command { WRITE = 5; } - Type type = 1; + // unique identifier for the command supplied by the client. + // Any response to the command will reference this ID. 
+  string id = 1;
+  Type type = 2;
 
-  optional string table_name = 2;
-  repeated SchemaEntry schema_entry = 3;
+  optional string table_name = 3;
+  repeated SchemaEntry schema_entry = 4;
 }
 
+message Error {
+  int32 code = 1;
+  string message = 2;
+}
+
+message CommandResponse {
+  string id = 1;
+  bool success = 2;
+  optional Error error = 3;
+}
diff --git a/server.py b/server.py
index 436540c..b08bb28 100644
--- a/server.py
+++ b/server.py
@@ -1,5 +1,6 @@
 import asyncio
 import asyncio.streams
+import dataclasses
 import functools
 import logging
 from typing import Iterable
@@ -14,6 +15,11 @@ SUPPORTED_FORMATS = [
     "json",
 ]
 
+SCHEMA_TYPE_TO_PYTHON_TYPE = {
+    control_pb2.SchemaEntry.Type.STRING: str,
+    control_pb2.SchemaEntry.Type.INT: int,
+}
+
 class Connection:
     def __init__(self, reader, writer, namespace: str, app_name: str, app_version: str, chosen_format: str, client_key: str):
         self.reader = reader
@@ -42,7 +48,7 @@ async def on_connect(command_queue: asyncio.Queue, reader, writer):
             continue
         LOGGER.debug("Got data: '%s'", data)
         command = control_pb2.Command.FromString(data)
-        await command_queue.put(command)
+        await command_queue.put((command, writer))
     except ClientError:
         LOGGER.info("Client error")
 
@@ -74,16 +80,41 @@ async def handshake(reader, writer) -> Connection:
     return Connection(
         reader, writer, namespace, app_name, app_version, chosen_format, client_key)
 
-async def processor(command_queue):
-    while True:
-        command = await command_queue.get()
-        if command:
-            LOGGER.info("Processing %s", command)
-        await asyncio.sleep(0)
+class DataManager:
+    def __init__(self):
+        self.schema = {}
+        self.process_task = None
+
+    def process(self, command_queue: asyncio.Queue) -> None:
+        self.process_task = asyncio.create_task(self._processor(command_queue))
+
+    async def _processor(self, command_queue: asyncio.Queue) -> None:
+        while True:
+            command, writer = await command_queue.get()
+            if command:
+                LOGGER.info("Processing %s", command)
+                if command.type == control_pb2.Command.Type.SCHEMA_WRITE:
+                    self._write_schema(command.table_name, command.schema_entry)
+                    result = control_pb2.CommandResponse(
+                        id=command.id,
+                        success=True,
+                    )
+                    writer.write(result.SerializeToString())
+            await asyncio.sleep(0)
+
+    def _write_schema(self, table_name: str, schema_entries: Iterable[control_pb2.SchemaEntry]) -> None:
+        table_schema = self.schema.get(table_name, {})
+        # Probably should check I'm not overwriting schema
+        for entry in schema_entries:
+            table_schema[entry.name] = SCHEMA_TYPE_TO_PYTHON_TYPE[entry.type]
+        self.schema[table_name] = table_schema
+
+def write_schema(table_name: str, schema_entries) -> None:
+    LOGGER.info("Writing schema: %s", schema_entries)
+
 
 async def run():
     command_queue = asyncio.Queue()
-    command_processor = asyncio.create_task(processor(command_queue))
+    data_manager = DataManager()
+    data_manager.process(command_queue)
     client_handler = functools.partial(on_connect, command_queue)
     server = await asyncio.start_server(client_handler, host="localhost", port=9988)
     async with server:
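
Patch 10 sketches the command/response protocol but leaves the server's reply half-written. For orientation, here is a minimal in-memory sketch of the round trip the `control.proto` messages point toward: the client sends a `SCHEMA_WRITE` `Command` carrying an `id`, and the server answers with a `CommandResponse` echoing that `id`. It assumes only the generated `proto/control_pb2.py` module produced by the `protoc` incantation in the README; the uuid-based id and the bare `success=True` reply are illustrative choices rather than anything these patches pin down, and there is still no message framing (one protobuf per socket read), matching the current code.

```
# Round-trip sketch for the patch-10 control messages; no sockets involved.
# Assumes `protoc --python_out proto control.proto` has generated proto/control_pb2.py.
import uuid

from proto import control_pb2

# Client side: describe the "todo" table as a SCHEMA_WRITE command.
command = control_pb2.Command(
    id=str(uuid.uuid4()),  # illustrative; any client-unique string works
    type=control_pb2.Command.Type.SCHEMA_WRITE,
    table_name="todo",
)
for name, schema_type in (("name", control_pb2.SchemaEntry.STRING),
                          ("index", control_pb2.SchemaEntry.INT)):
    entry = command.schema_entry.add()
    entry.name = name
    entry.type = schema_type
wire_bytes = command.SerializeToString()

# Server side: parse the command, apply it, and reply with the same id.
received = control_pb2.Command.FromString(wire_bytes)
response = control_pb2.CommandResponse(id=received.id, success=True)

# Client side again: correlate the reply to the pending command by id.
reply = control_pb2.CommandResponse.FromString(response.SerializeToString())
assert reply.id == command.id and reply.success
```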