Compare commits


10 Commits

Author SHA1 Message Date
Eli Ribble 090f55e225 Temporary WIP for adding commands 2023-12-12 13:41:05 -07:00
Eli Ribble 61f4e2cdae Push commands to a central processing queue 2023-12-12 13:41:05 -07:00
Eli Ribble 3b4667d9c8 Add the ability to send a schema write request 2023-12-12 13:41:05 -07:00
Eli Ribble aadc8520bb Working protobuf passing.
This is barely working, really. In fact, I have an "unused" field in my enum because if I don't provide it the protobuf serializes down to an empty buffer which breaks all kinds of things.
2023-12-12 13:41:05 -07:00
Eli Ribble 461fc7ce0b Add magic check 2023-12-12 13:41:05 -07:00
Eli Ribble e227d349b2 Add the start of a Python server implementation.
I'm on a plane, I can't be looking up golang stuff.
2023-12-12 13:41:05 -07:00
Eli Ribble 39ff9c3a8e Add some ideas in a README 2023-12-12 13:41:01 -07:00
Eli Ribble 3f08a95db0 Add gitignore for Python directories 2023-12-12 13:40:40 -07:00
Eli Ribble e155420a4b Working simple client/server 2023-12-12 13:40:40 -07:00
Eli Ribble 0d33aacf3f Initial hello world program. 2023-12-12 13:40:40 -07:00
7 changed files with 489 additions and 1 deletions

.gitignore vendored Normal file

@ -0,0 +1,2 @@
ve/
__pycache__/

README.md

@ -1,3 +1,122 @@
# gibtar
The point is: can we create something database-like without being a traditional database? Here's what we are looking for:
1. Stores user data across applications
1. Applications are given access to the fortress on a limited basis. Think "I'm using your webapp, but I keep the data"
1. Single user, but multi-application/agent by nature.
* My fortress is _mine_. Just me. One human
* I can have an unlimited number of computational agents read/write the data.
* When I share data with another human, you get a copy, not a link.
1. No usage of a query language like SQL. All access is programmatic.
1. ACID compliant transactions
1. Browser-friendly API
* non-password crypto-auth
* AJAX/HTML/fetch friendly requests
* pipelining
* JSON, probably >:P
1. Data is never deleted. Time-travel is built-in
* Kinda like Datomic?
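The "data is never deleted, time-travel is built-in" point can be sketched as an append-only version log. This is a sketch only; `VersionedStore` and its methods are hypothetical names, not part of this change.

```python
import time

# Sketch only: an append-only store where every write creates a new
# version and reads can time-travel to any point in the past.
class VersionedStore:
    def __init__(self):
        self._log = {}  # key -> list of (timestamp, value), oldest first

    def write(self, key, value, ts=None):
        # Never overwrite: always append a new version.
        self._log.setdefault(key, []).append(
            (ts if ts is not None else time.time(), value))

    def read(self, key, as_of=None):
        versions = self._log.get(key, [])
        if as_of is None:
            return versions[-1][1] if versions else None
        # Latest value written at or before `as_of`.
        for ts, value in reversed(versions):
            if ts <= as_of:
                return value
        return None
```

Rollback then falls out for free: "undo" is just reading as-of an earlier time and appending that value again.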
## Incantations
```
protoc --python_out proto control.proto
```
## Protocol concerns
* Built-in behavior for dealing with a cluster
* Network address negotiation (can we use a shorter/closer address? Is a local socket available?)
* Key exchange. Every client is uniquely identified.
* Schema validation since we will support code generation.
* Schema download
* Schema upgrade/manipulation
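The address-negotiation concern could look something like this sketch: the server advertises candidate addresses in its preferred order and the client takes a local socket when it is actually on the same host. All names here are assumptions.

```python
# Sketch only: hypothetical address negotiation. `candidates` is the
# server's advertised list of ("unix", path) or ("tcp", "host:port")
# pairs in its preferred order.
def pick_address(client_is_local, candidates):
    # Prefer a local socket when the client is on the server's host.
    for kind, addr in candidates:
        if kind == "unix" and client_is_local:
            return kind, addr
    # Otherwise fall back to the first TCP address offered.
    for kind, addr in candidates:
        if kind == "tcp":
            return kind, addr
    raise ValueError("no usable address offered")
```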
## Permissions concerns
* 2 apps are sharing data.
* Badly-behaving app B can't destroy data because we never delete data, we just get a new version
* It could be _very_ badly behaving and could make tons of hostile changes for the user to sift through
* User-requested rollback based on the client app that made the change? Seems very doable.
* Locking semantics?
* Behaviors that need permissions
* Reading data
* By application-level concept
* No reason to separate out schema reading, you need the schema if you're getting the data.
* No reason to separate out querying from iterating, it's just a question of speed.
* No reason to separate subscribing to updates vs
* Writing data
* By application-level concept
* All writes are tagged by application for auditing.
* Altering schema
* Needs heavy warnings, apps need to *really* know what they are doing if they alter schema from another app
* Adding new concepts seems like it should be safe, apps share a namespace but don't share tables.
* Querying the set of available namespaces - can leak data of other applications used. Useful to select plugins on the app's side.
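The behaviors above could be modeled as permission flags granted per connection. The names are assumptions; the `Permissions` enum in `datajack/__init__.py` is still empty in this change.

```python
import enum

# Sketch only: the permission behaviors listed above as composable flags.
class Permission(enum.Flag):
    READ = enum.auto()             # read data by application-level concept
    WRITE = enum.auto()            # all writes tagged by application
    ALTER_SCHEMA = enum.auto()     # needs heavy warnings across apps
    LIST_NAMESPACES = enum.auto()  # can leak which other apps are in use

def is_allowed(granted: Permission, requested: Permission) -> bool:
    # Allowed only if every requested flag was granted.
    return (granted & requested) == requested
```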
## Design Questions
* When defining a schema we could force the schema definer to not just work at the layer of "tables, indices, relationships" but also "permissions/logical units". The idea here is that if the application is highly denormalized we don't want the end user to have to understand that it takes 15 tables to contain "user data" but instead we want there to be a concept of a User within the schema and all the information the User contains so a communicating app can request "Access to read users".
* How burdensome would this be on developers? What are the downsides if the developers are lazy or get it wrong?
* Can we defend against nefarious data exfiltration?
* Hard problem, sandboxing is probably the only valid technical solution here. Apps can exfiltrate via crafted DNS queries, so....the only way to prevent exfiltration is to sandbox.
* At-rest encryption by client key?
* Seems like it may really hurt performance... and for questionable gain.
* At what point do we handle permissions?
* We want it done all up-front so we can ask for the user's consent over some channel and do that once.
* We need to enumerate the app-level concepts for the connection.
## Handshake
* We want to reduce the amount of back-and-forth for latency reasons. Performance matters.
* We want to achieve the following in the handshake:
* Establish the validity of the client
* Determine what data format (protobuf, json) to use when speaking with the client
* Set the permissions scope of the connection
* Set the namespace for the purposes of sharing data between applications
* Store information about the client itself for auditing purposes (name, version, cert)
* Determine if the data schema is one the client recognizes and can work with
* We should not assume schema version semantics - there is no v1.1 v1.2 and the assumption that the client can work with anything below v2.0. That's silly. However, we _vastly prefer_ to be explicit about what schema the client can handle because otherwise we are working on implicit assumptions codified in the imperative code of the client. That's dangerous too. Can we use protobuf-style "I ignore fields I don't understand"?
* Figure out the most efficient path between the client and server to avoid unnecessary network hops.
* Is this necessary, or does the operating system do this for us?
* This includes redirecting to another process or server in load-balanced applications.
* I think we don't need to specify that we are waiting for user authorization if we don't want to, we can just let the client queue up requests and wait to confirm them.
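The current client and server carry these handshake goals in one space-separated frame: magic first (so the server can cheaply reject the wrong protocol), then format preferences, namespace, app identity, and key. A minimal sketch of building and parsing that frame (helper names are hypothetical):

```python
# Sketch only: the handshake frame as exchanged in this change.
MAGIC = "e8437140-4347-48cc-a31d-dcdc944ffc15"

def build_client_hand(formats, namespace, app_name, app_version, public_key):
    # Formats are comma-joined in descending order of client preference.
    return " ".join(
        [MAGIC, ",".join(formats), namespace, app_name, app_version, public_key])

def parse_client_hand(hand):
    magic, formats, namespace, app_name, app_version, public_key = hand.split(" ")
    if magic != MAGIC:
        raise ValueError("magic not recognized")
    return formats.split(","), namespace, app_name, app_version, public_key
```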
## Client features
* Query data lots of neat ways
* Subscribe to updates of a particular query, get pushed data on those updates
* Transactions
* Explicit ordering and unordering of reads/writes
* Multiplexed comms to allow parallel reads/writes over a single connection.
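Multiplexing is what the client-supplied `id` on `Command` in control.proto enables: responses can arrive in any order over the shared connection and still be matched to their request. A hypothetical client-side sketch:

```python
import itertools

# Sketch only: matching out-of-order responses to pending commands by id.
class Multiplexer:
    def __init__(self):
        self._ids = itertools.count(1)
        self._pending = {}

    def register(self, payload):
        # Tag an outgoing command; the id travels with it on the wire.
        command_id = str(next(self._ids))
        self._pending[command_id] = payload
        return command_id

    def resolve(self, command_id):
        # Match an incoming response back to its pending command.
        return self._pending.pop(command_id, None)
```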
## Server features
* Triggers?
## 2 Apps, 1 Schema
You have App A and App B. They are sharing data with each other about todo list items. At some point T they both fully agree on the schema.
1. Do both apps have to fully agree on the schema for all time T?
If the answer to this is "yes" then the two must be upgraded in lock-step with each other in order to continue using both. If A upgrades first it keeps working and B stops working until it upgrades. Syncing between the app developers is super hard.
Let's _not_ make the answer "yes".
Since the answer is "no" we need to enumerate the kinds of disagreement and the behavior during each. What are all the kinds of disagreement?
_A knows of a table B does not._
This is fine, does not hurt B.
_A knows of a field B does not._
This is fine too.
_A and B disagree where a piece of data should be kept_
This is an interesting case because it _may_ be a situation where both apps have fields the other app does not know about where the data should be kept. That is most likely the case, since if B wants to put the data in field Z and A knows about Z then A should know to put the data there.
There is likely not an automated way to deal with this - the storage engine can't understand the semantics of a given data field without something like AGI. Conceivably a user could fix this if given access to tools to remap data labels, but that seems crazy for most users.
Ultimately what you need is an authority on what the correct representation of the data is. This leads to the idea that every schema should have one-and-only-one owning application, and zero-or-more integrating applications. If there's a conflict, the owner is right and the integrator is wrong.
No, that's bad, because ultimately we want the user to own the data and conceivably if they stopped using application A they could just use B and the data would keep working.
Well, now I guess we need a standards body or something.
_A believes that a field has a different type than what B believes_
For this case we can thankfully detect the conflict and inform the app. But then what? It should abort, I guess...
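That last case is the one we can detect mechanically. A sketch of the check, using plain dicts of field name to Python type as in `client.py` (the function name is hypothetical):

```python
# Sketch only: find fields where two apps assign different types to the
# same field of a shared table.
def find_type_conflicts(schema_a, schema_b):
    conflicts = [
        field
        for field in schema_a.keys() & schema_b.keys()
        if schema_a[field] is not schema_b[field]
    ]
    # Fields known to only one app are the harmless cases above; ignore them.
    return sorted(conflicts)
```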

client.py Normal file

@ -0,0 +1,30 @@
import argparse
import logging
import datajack
#@datajack.Table
class Todo:
name: str
index: int
def main():
parser = argparse.ArgumentParser()
parser.add_argument("connection_uri", help="The URI to use to connect to the remote datajack.")
args = parser.parse_args()
logging.basicConfig(level=logging.DEBUG)
with datajack.connection(args.connection_uri) as dj:
dj.write_schema(
"todo",
{
"name": str,
"index": int,
},
)
if __name__ == "__main__":
main()

control.proto Normal file

@ -0,0 +1,43 @@
syntax = "proto3";
package datafortress;
message SchemaEntry {
enum Type {
STRING = 0;
INT = 1;
}
Type type = 1;
string name = 2;
}
// Indicates a command
message Command {
enum Type {
UNUSED = 0;
STATUS = 1;
SCHEMA_READ = 2;
SCHEMA_WRITE = 3;
QUERY = 4;
WRITE = 5;
}
// unique identifier for the command supplied by the client.
// Any response to the command will reference this ID.
string id = 1;
Type type = 2;
optional string table_name = 3;
repeated SchemaEntry schema_entry = 4;
}
message Error {
int32 code = 1;
string message = 2;
}
message CommandResponse {
string id = 1;
bool success = 2;
optional Error error = 3;
}

datajack/__init__.py Normal file

@ -0,0 +1,101 @@
import enum
import logging
import socket
from typing import Mapping, Tuple, Type
import urllib.parse
from proto import control_pb2
LOGGER = logging.getLogger(__name__)
# Uniquely identifies a datajack connection.
# This indicates the handshake style expected and is used
# to quickly and easily determine if the client is even speaking the
# right protocol for the server.
MAGIC = "e8437140-4347-48cc-a31d-dcdc944ffc15"
# The list of formats supported by this client in descending order of preference
# This is used for negotiation of the data format that will be used with the server.
DATA_FORMATS = ",".join([
"protobuf",
"json",
])
PYTHON_TYPE_TO_SCHEMA_TYPE = {
str: control_pb2.SchemaEntry.STRING,
int: control_pb2.SchemaEntry.INT,
}
# The types of access that are allowed with this connection
class Permissions(enum.Enum):
pass
def _parse_netloc(netloc) -> Tuple[str, str, str, str]:
app, _, connection = netloc.partition("@")
appname, _, version = app.partition(":")
host, _, port = connection.partition(":")
return appname, version, host, port
class Connection:
def __init__(self, uri):
parts = urllib.parse.urlparse(uri)
LOGGER.debug("Parsed URI: %s", parts)
netloc = parts.netloc
assert parts.scheme == "socket"
self.app_name, self.app_version, self.host, self.port = _parse_netloc(netloc)
self.namespace = parts.path.lstrip("/")
self.public_key = "pretend_key"
self.data_format = None
self.address = None
self.server_key = None
def __enter__(self):
self.connect()
return self
def __exit__(self, exc_typ, exc_val, exc_tb):
pass
def connect(self):
self.socket = socket.socket()
self.socket.connect((self.host, int(self.port)))
self._handshake()
def disconnect(self):
pass
def send(self, data):
self.socket.send(data)
def write_schema(self, table_name: str, schema: Mapping[str, Type]) -> None:
"Send a command to write the given schema."
command = control_pb2.Command(
type=control_pb2.Command.Type.SCHEMA_WRITE,
table_name=table_name,
)
for k, v in schema.items():
entry = command.schema_entry.add()
entry.name = k
entry.type = PYTHON_TYPE_TO_SCHEMA_TYPE[v]
self.socket.send(command.SerializeToString())
def _handshake(self):
"Handshake with the server, ensure we have all the data we need."
fields = [MAGIC, DATA_FORMATS, self.namespace, self.app_name, self.app_version, self.public_key,]
client_hand = " ".join(fields)
self.socket.send(client_hand.encode("UTF-8"))
server_hand = self.socket.recv(1024)
if not server_hand:
raise ConnectionError("Failed to get server handshake")
self.data_format, self.address, self.server_key = server_hand.decode("UTF-8").split(" ")
LOGGER.info("Data format: %s", self.data_format)
command = control_pb2.Command(
type=control_pb2.Command.Type.STATUS
)
to_send = command.SerializeToString()
LOGGER.info("Sending '%s'", to_send)
self.socket.send(to_send)
def connection(uri) -> Connection:
return Connection(uri)

main.go Normal file

@ -0,0 +1,53 @@
// socket-server project main.go
package main
import (
"fmt"
"net"
"os"
"strings"
)
const (
SERVER_HOST = "localhost"
SERVER_PORT = "9988"
SERVER_TYPE = "tcp"
MAGIC_HEADER = "e8437140-4347-48cc-a31d-dcdc944ffc15"
)
func main() {
fmt.Println("Server Running...")
server, err := net.Listen(SERVER_TYPE, SERVER_HOST+":"+SERVER_PORT)
if err != nil {
fmt.Println("Error listening:", err.Error())
os.Exit(1)
}
defer server.Close()
fmt.Println("Listening on " + SERVER_HOST + ":" + SERVER_PORT)
fmt.Println("Waiting for client...")
for {
connection, err := server.Accept()
if err != nil {
fmt.Println("Error accepting: ", err.Error())
os.Exit(1)
}
fmt.Println("client connected")
go processClient(connection)
}
}
func processClient(connection net.Conn) {
buffer := make([]byte, 1024)
mLen, err := connection.Read(buffer)
if err != nil {
fmt.Println("Error reading:", err.Error())
}
// handshake
// split out the elements of the clients handshake
// read the first 40 bytes, check the magic header, reject invalids
// handle data format negotiation and respond with selected protocol
// store information about
// _, err = connection.Write([]byte("Thanks! Got your message:" + string(buffer[:mLen])))
data := string(buffer[:mLen])
parts := strings.Split(data, " ")
fmt.Println("Received: ", parts)
_, err = connection.Write([]byte("thanks"))
connection.Close()
}

server.py Normal file

@ -0,0 +1,140 @@
import asyncio
import asyncio.streams
import dataclasses
import functools
import logging
from typing import Iterable
from proto import control_pb2
LOGGER = logging.getLogger("server")
MAGIC = "e8437140-4347-48cc-a31d-dcdc944ffc15"
SUPPORTED_FORMATS = [
"protobuf",
"json",
]
SCHEMA_TYPE_TO_PYTHON_TYPE = {
control_pb2.SchemaEntry.Type.STRING: str,
control_pb2.SchemaEntry.Type.INT: int,
}
class Connection:
def __init__(self, reader, writer, namespace: str, app_name: str, app_version: str, chosen_format: str, client_key: str):
self.reader = reader
self.writer = writer
self.namespace = namespace
self.app_name = app_name
self.app_version = app_version
self.chosen_format = chosen_format
self.client_key = client_key
def main():
logging.basicConfig(level=logging.DEBUG)
try:
asyncio.run(run())
except KeyboardInterrupt:
LOGGER.info("Exiting")
async def on_connect(command_queue: asyncio.Queue, reader, writer):
LOGGER.info("connected")
try:
connection = await handshake(reader, writer)
while True:
data = await reader.read(4096)
if not data:
# An empty read means the client closed the connection.
break
LOGGER.debug("Got data: '%s'", data)
command = control_pb2.Command.FromString(data)
await command_queue.put((command, writer))
except ClientError:
LOGGER.info("Client error")
class ClientError(Exception):
pass
async def handshake(reader, writer) -> Connection:
data = await reader.read(36 + 1)
magic = data.decode("UTF-8")
LOGGER.debug("Received magic '%s'", magic)
if magic != MAGIC + " ":
await _write_error(writer, 1, "Magic not recognized")
LOGGER.debug("Magic looks good.")
data = await reader.read(1024)
client_hand = data.decode("UTF-8")
client_formats, namespace, app_name, app_version, client_key = client_hand.split(" ")
chosen_format = _select_format(client_formats.split(","), SUPPORTED_FORMATS)
if not chosen_format:
await _write_error(writer, 2, "server does not support any of " + client_formats)
target_url = "127.0.0.1:9988"
server_pub_key = "fakeserverkey"
server_hand = " ".join([
chosen_format,
target_url,
server_pub_key,
])
writer.write(server_hand.encode("UTF-8"))
LOGGER.info("Sending %s", server_hand)
return Connection(
reader, writer, namespace, app_name, app_version, chosen_format, client_key)
class DataManager:
def __init__(self):
self.schema = {}
self.process_task = None
def process(self, command_queue: asyncio.Queue) -> None:
self.process_task = asyncio.create_task(self._processor(command_queue))
async def _processor(self, command_queue: asyncio.Queue) -> None:
while True:
command, writer = await command_queue.get()
if command:
LOGGER.info("Processing %s", command)
if command.type == control_pb2.Command.Type.SCHEMA_WRITE:
self._write_schema(command.table_name, command.schema_entry)
result = control_pb2.CommandResponse(
id=command.id,
success=True,
)
writer.write(result.SerializeToString())
await asyncio.sleep(0)
def _write_schema(self, table_name: str, schema_entries: Iterable[control_pb2.SchemaEntry]) -> None:
table_schema = self.schema.get(table_name, {})
# Probably should check I'm not overwriting schema
for entry in schema_entries:
table_schema[entry.name] = SCHEMA_TYPE_TO_PYTHON_TYPE[entry.type]
self.schema[table_name] = table_schema
def write_schema(table_name: str, schema_entries) -> None:
LOGGER.info("Writing schema: %s", schema_entries)
async def run():
command_queue = asyncio.Queue()
data_manager = DataManager()
data_manager.process(command_queue)
client_handler = functools.partial(on_connect, command_queue)
server = await asyncio.start_server(client_handler, host="localhost", port=9988)
async with server:
try:
await server.serve_forever()
except KeyboardInterrupt:
LOGGER.info("Exiting at user request.")
def _select_format(client_formats: Iterable[str], supported_formats: Iterable[str]) -> str:
"Pick a format to use with the client."
for f in supported_formats:
if f in client_formats:
return f
async def _write_error(writer, code: int, message: str) -> None:
writer.write(f"ERR{code}: {message}".encode("UTF-8"))
await writer.drain()
writer.close()
LOGGER.info("Client error: %d %s", code, message)
raise ClientError("Failed")
if __name__ == "__main__":
main()