"""Client-side connection for the datajack protocol."""

import enum
import logging
import socket
from typing import Mapping, Tuple, Type
import urllib.parse

from proto import control_pb2

LOGGER = logging.getLogger(__name__)

# Uniquely identifies a datajack connection.
# This indicates the handshake style expected and lets the server quickly
# determine whether the client is even speaking the right protocol.
MAGIC = "e8437140-4347-48cc-a31d-dcdc944ffc15"

# The data formats supported by this client, in descending order of
# preference. Sent during the handshake so that the data format used with
# the server can be negotiated.
DATA_FORMATS = ",".join([
    "protobuf",
    "json",
])

# Maps the Python types accepted in a schema to their protobuf schema type.
PYTHON_TYPE_TO_SCHEMA_TYPE = {
    str: control_pb2.SchemaEntry.STRING,
    int: control_pb2.SchemaEntry.INT,
}


# The types of access that are allowed with this connection.
class Permissions(enum.Enum):
    """Access types allowed on a connection (no members are defined yet)."""


def _parse_netloc(netloc: str) -> Tuple[str, str, str, str]:
    """Split a ``appname:version@host:port`` network location into its parts.

    Every component is returned as a string; a component that is missing
    from ``netloc`` comes back as an empty string. Note that when there is
    no ``@`` the whole value is treated as the application part.

    >>> _parse_netloc("myapp:1.0@localhost:8080")
    ('myapp', '1.0', 'localhost', '8080')

    Args:
        netloc: The network-location portion of a connection URI.

    Returns:
        A ``(appname, version, host, port)`` tuple of strings.
    """
    app, _, endpoint = netloc.partition("@")
    appname, _, version = app.partition(":")
    host, _, port = endpoint.partition(":")
    return appname, version, host, port


class Connection:
    """A socket connection to a server, described by a URI.

    The URI has the shape ``socket://appname:version@host:port/namespace``.
    The instance can be used as a context manager: entering connects and
    performs the handshake, leaving closes the socket.
    """

    def __init__(self, uri: str):
        """Parse ``uri`` and record the connection parameters.

        No network activity happens here; call :meth:`connect` (or use the
        instance in a ``with`` statement) to open the socket.

        Args:
            uri: Connection URI; its scheme must be ``socket``.

        Raises:
            ValueError: If the URI scheme is not ``socket``.
        """
        parts = urllib.parse.urlparse(uri)
        LOGGER.debug("Parsed connection URI: %s", parts)
        # Raise explicitly rather than assert: asserts vanish under ``-O``.
        if parts.scheme != "socket":
            raise ValueError(
                "Unsupported URI scheme {!r}; expected 'socket'".format(parts.scheme)
            )
        (
            self.app_name,
            self.app_version,
            self.host,
            self.port,
        ) = _parse_netloc(parts.netloc)
        self.namespace = parts.path.lstrip("/")
        self.public_key = "pretend_key"
        # The following three are filled in from the server's handshake reply.
        self.data_format = None
        self.address = None
        self.server_key = None
        # The open socket, or None while disconnected.
        self.socket = None

    def __enter__(self):
        """Connect and return this connection."""
        self.connect()
        return self

    def __exit__(self, exc_typ, exc_val, exc_tb):
        """Close the socket; exceptions from the ``with`` body propagate."""
        self.disconnect()

    def connect(self):
        """Open the socket to ``host:port`` and perform the handshake.

        If connecting or the handshake fails, the socket is closed before
        the exception is re-raised so that it is not leaked.

        Raises:
            ValueError: If the port is not a valid integer, or the server's
                handshake reply is malformed.
            ConnectionError: If the server sends no handshake reply.
            OSError: If the socket cannot be connected.
        """
        port = int(self.port)
        self.socket = socket.socket()
        try:
            self.socket.connect((self.host, port))
            self._handshake()
        except BaseException:
            self.disconnect()
            raise

    def disconnect(self):
        """Close the socket if one is open; safe to call repeatedly."""
        sock, self.socket = self.socket, None
        if sock is not None:
            sock.close()

    def send(self, data):
        """Send all of ``data`` (bytes) over the socket."""
        # sendall() keeps writing until every byte is sent; send() may stop
        # after a partial write.
        self.socket.sendall(data)

    def write_schema(self, table_name: str, schema: Mapping[str, Type]) -> None:
        """Send a command to write the given schema.

        Args:
            table_name: Name of the table the schema applies to.
            schema: Maps each column name to its Python type. Only the types
                listed in ``PYTHON_TYPE_TO_SCHEMA_TYPE`` are supported.

        Raises:
            KeyError: If a column uses an unsupported Python type.
        """
        command = control_pb2.Command(
            type=control_pb2.Command.Type.SCHEMA_WRITE,
            table_name=table_name,
        )
        for column_name, python_type in schema.items():
            entry = command.schema_entry.add()
            entry.name = column_name
            entry.type = PYTHON_TYPE_TO_SCHEMA_TYPE[python_type]
        self.socket.sendall(command.SerializeToString())

    def _handshake(self):
        """Handshake with the server, ensure we have all the data we need.

        Sends the space-separated client fields, reads the server's reply
        (data format, address and server key, also space-separated), then
        sends a STATUS command.

        Raises:
            ConnectionError: If the server sends no reply.
            ValueError: If the reply does not contain exactly three fields.
        """
        fields = [
            MAGIC,
            DATA_FORMATS,
            self.namespace,
            self.app_name,
            self.app_version,
            self.public_key,
        ]
        client_hand = " ".join(fields)
        self.socket.sendall(client_hand.encode("UTF-8"))

        # NOTE(review): a single recv() assumes the whole reply arrives in
        # one read of at most 1024 bytes - confirm against the server.
        server_hand = self.socket.recv(1024)
        if not server_hand:
            # An empty read means there is nothing to parse, so fail here
            # instead of continuing with an unusable reply.
            raise ConnectionError("Failed to get server hand")

        reply_fields = server_hand.decode("UTF-8").split(" ")
        if len(reply_fields) != 3:
            raise ValueError(
                "Malformed server handshake: expected 3 fields, got {}".format(
                    len(reply_fields)
                )
            )
        self.data_format, self.address, self.server_key = reply_fields
        LOGGER.info("Data format: %s", self.data_format)

        command = control_pb2.Command(
            type=control_pb2.Command.Type.STATUS
        )
        to_send = command.SerializeToString()
        LOGGER.info("Sending '%s'", to_send)
        self.socket.sendall(to_send)


def connection(uri: str) -> Connection:
    """Create a :class:`Connection` for ``uri`` (it is not connected yet)."""
    return Connection(uri)