Remove my SGML and OFX parsers

I'm going to use ofxparse. Promise.

This marks a really serious break with the automatic downloader code,
because now we don't even have the code that it depended on for parsing.
This commit is contained in:
Eli Ribble 2016-08-11 10:58:25 -06:00
parent 393ef748cc
commit dd1706c70f
6 changed files with 2 additions and 315 deletions

View File

@ -1,83 +0,0 @@
import datetime
import vanth.ofx
def MST():
    """Return a fixed-offset timezone seven hours behind UTC, named 'MST'."""
    seven_hours_behind = datetime.timedelta(hours=-7)
    return datetime.timezone(seven_hours_behind, 'MST')
def MDT():
    """Return a fixed-offset timezone six hours behind UTC, named 'MDT'."""
    six_hours_behind = datetime.timedelta(hours=-6)
    return datetime.timezone(six_hours_behind, 'MDT')
def test_query_transactions(mocker):
    """Check that query_transactions renders the expected OFX request text.

    ``vanth.ofx.now`` is patched to return a fixed timestamp so the generated
    request is deterministic and can be compared byte-for-byte (after UTF-8
    decoding) with the fixture ``tests/files/query_transactions.ofx``.
    """
    institution = {
        'bankid': "1234567",
        'fid': "12345",
        'name': "AFCU",
    }
    account = {
        "account_id": "123456-0.9:CHK",
        "user_id": "123456789",
        "password": "1234",
        "type": "checking",
    }
    # The ``mocker`` fixture starts the patch immediately and undoes it at
    # test teardown; its return value is the mock itself and is not meant to
    # be used as a context manager, so no ``with`` block is used here.
    mocker.patch('vanth.ofx.now', return_value='20160102030405.000[-7:MST]')
    results = vanth.ofx.query_transactions(institution, account, start=datetime.date(2016, 1, 2))
    # Read as bytes and decode explicitly so line endings are not translated.
    with open('tests/files/query_transactions.ofx', 'rb') as f:
        expected = f.read().decode('utf-8')
    assert results == expected
def test_parse():
    """Parse the sample OFX response fixture and verify every extracted field."""
    with open('tests/files/transactions.ofx', 'rb') as handle:
        raw = handle.read()
    document = vanth.ofx.parse(raw.decode('utf-8'))

    # Header key/value pairs preceding the SGML body.
    expected_header = dict(
        CHARSET='1252',
        COMPRESSION='NONE',
        DATA='OFXSGML',
        ENCODING='USASCII',
        NEWFILEUID='NONE',
        OFXHEADER='100',
        OLDFILEUID='NONE',
        SECURITY='NONE',
        VERSION='102',
    )
    assert document.header == expected_header

    # Sign-on status.
    signon = document.body.status
    assert signon.code == '0'
    assert signon.severity == 'INFO'
    assert signon.message == 'The operation succeeded.'

    # Statement status; this one carries no message.
    statement = document.body.statement
    assert statement.status.code == '0'
    assert statement.status.severity == 'INFO'
    assert statement.status.message is None

    # Transaction list metadata.
    listing = statement.transactions
    assert listing.currency == 'USD'
    assert listing.account.accountid == '123456-0.9:CHK'
    assert listing.account.bankid == '324377516'
    assert listing.account.type == 'CHECKING'
    assert listing.start == datetime.datetime(2015, 12, 31, 17, 0, tzinfo=MST())
    assert listing.end == datetime.datetime(2016, 6, 22, 11, 12, 42, tzinfo=MDT())

    # Every fixture transaction shares the same posted/available timestamp.
    noon = datetime.datetime(2015, 12, 31, 12)
    expected_items = [
        {
            'amount': -50.19,
            'available': noon,
            'id': '0006547',
            'memo': 'POINT OF SALE PURCHASE #0006547',
            'name': 'UT LEHI COSTCO WHSE #0733',
            'posted': noon,
            'type': 'POS',
        },
        {
            'amount': -79.64,
            'available': noon,
            'id': '0006548',
            'memo': '#0006548',
            'name': 'Payment to PACIFICORP ONLIN',
            'posted': noon,
            'type': 'PAYMENT',
        },
        {
            'amount': 0.84,
            'available': noon,
            'id': '0006549',
            'memo': 'ANNUAL PERCENTAGE YIELD EARNED IS .05% #0006549',
            'name': 'DIVIDEND FOR 12/01/15 - 12/31/1',
            'posted': noon,
            'type': 'INT',
        },
    ]
    # Each transaction is converted to a dict before comparison.
    items = [dict(item) for item in listing.items]
    assert items == expected_items

View File

@ -1,17 +0,0 @@
import vanth.sgml
def child_values(node):
    """Return a list of (name, value) tuples for node's children, in order."""
    pairs = []
    for child in node.children:
        pairs.append((child.name, child.value))
    return pairs
def test_siblings():
    """Unclosed tags C, D and E inside B should all be parsed as children of B."""
    root = vanth.sgml.parse("<A><B><C>1<D>2<E>3</B></A>")
    assert root.name == 'A'
    expected = [('C', '1'), ('D', '2'), ('E', '3')]
    assert child_values(root['B']) == expected
def test_closing():
    """A closing tag should end its element so the next tag becomes a sibling."""
    root = vanth.sgml.parse("<A><B><C>1</B><D><E>2</D></A>")
    assert root.name == 'A'
    # B and D are both direct children of A, each with an empty value.
    assert child_values(root) == [('B', ''), ('D', '')]
    # Each container holds exactly its own leaf.
    expected_leaves = {'B': [('C', '1')], 'D': [('E', '2')]}
    for container in ('B', 'D'):
        assert child_values(root[container]) == expected_leaves[container]

View File

@ -6,7 +6,6 @@ import requests.exceptions
import vanth.download import vanth.download
import vanth.main import vanth.main
import vanth.ofx
import vanth.platform.ofxaccount import vanth.platform.ofxaccount
import vanth.platform.ofxrecord import vanth.platform.ofxrecord
import vanth.platform.ofxsource import vanth.platform.ofxsource

View File

@ -1,6 +1,7 @@
import io
import ofxparse
import requests import requests
import vanth.ofx
import vanth.platform.ofxaccount import vanth.platform.ofxaccount

View File

@ -1,148 +0,0 @@
import collections
import datetime
import re
import vanth.sgml
# Parsed OFX document as a two-field tuple: ``header`` first, then ``body``.
Document = collections.namedtuple('Document', ['header', 'body'])
class Body(): # pylint:disable=too-few-public-methods
    """Body of an OFX response: the sign-on status and the bank statement.

    ``sgml`` must support nested ``[...]`` lookup by tag name.
    """

    def __init__(self, sgml):
        # SIGNONMSGSRSV1 > SONRS > STATUS holds the sign-on result.
        signon_response = sgml['SIGNONMSGSRSV1']['SONRS']
        self.status = Status(signon_response['STATUS'])
        # BANKMSGSRSV1 > STMTTRNRS holds the statement response.
        bank_messages = sgml['BANKMSGSRSV1']
        self.statement = TransactionStatement(bank_messages['STMTTRNRS'])
class Status(): # pylint:disable=too-few-public-methods
    """Status read from a node with CODE, SEVERITY and optional MESSAGE children.

    ``message`` is None when the MESSAGE lookup yields a falsy result.
    """

    def __init__(self, sgml):
        self.code = sgml['CODE'].value
        self.severity = sgml['SEVERITY'].value
        message_node = sgml['MESSAGE']
        if message_node:
            self.message = message_node.value
        else:
            self.message = None
class TransactionStatement(): # pylint:disable=too-few-public-methods
    """Statement response: transaction UID, its status and the transaction list."""

    def __init__(self, sgml):
        trnuid_node = sgml['TRNUID']
        self.trnuid = trnuid_node.value
        status_node = sgml['STATUS']
        self.status = Status(status_node)
        statement_node = sgml['STMTRS']
        self.transactions = TransactionList(statement_node)
class TransactionList(): # pylint:disable=too-few-public-methods
    """Currency, account, date range and transactions read from a STMTRS node."""

    def __init__(self, sgml):
        self.currency = sgml['CURDEF'].value
        self.account = Account(sgml['BANKACCTFROM'])
        listing = sgml['BANKTRANLIST']
        self.start = _parse_date_with_tz(listing['DTSTART'].value)
        self.end = _parse_date_with_tz(listing['DTEND'].value)
        # Only STMTTRN children are transactions; other children are skipped.
        collected = []
        for child in listing.children:
            if child.name == 'STMTTRN':
                collected.append(Transaction(child))
        self.items = collected
class Transaction(): # pylint:disable=too-few-public-methods
    """A single transaction read from a STMTTRN node.

    Iterating an instance yields ``(attribute_name, value)`` pairs, so
    ``dict(transaction)`` produces a plain mapping of its fields.
    """

    def __init__(self, sgml):
        amount_text = sgml['TRNAMT'].value
        self.amount = float(amount_text)
        available_text = sgml['DTAVAIL'].value
        self.available = _parse_date(available_text)
        self.id = sgml['FITID'].value
        self.memo = sgml['MEMO'].value
        self.name = sgml['NAME'].value
        posted_text = sgml['DTPOSTED'].value
        self.posted = _parse_date(posted_text)
        self.type = sgml['TRNTYPE'].value

    def __iter__(self):
        # Fixed field order for the yielded pairs.
        field_names = ('amount', 'available', 'id', 'memo', 'name', 'posted', 'type')
        for prop in field_names:
            yield (prop, getattr(self, prop))
class Account(): # pylint:disable=too-few-public-methods
    """Bank id, account id and account type read from a BANKACCTFROM-style node."""

    def __init__(self, sgml):
        # (attribute name, child tag) pairs, assigned in this order.
        mapping = (
            ('bankid', 'BANKID'),
            ('accountid', 'ACCTID'),
            ('type', 'ACCTTYPE'),
        )
        for attribute, tag in mapping:
            setattr(self, attribute, sgml[tag].value)
def _fix_offset(offset):
result = int(offset) * 100
return "{:04d}".format(result) if result > 0 else "{:05d}".format(result)
def _parse_date(date):
return datetime.datetime.strptime(date, "%Y%m%d%H%M%S.000")
def _parse_date_with_tz(date):
match = re.match(r'(?P<datetime>\d+)\.\d+\[(?P<offset>[\d\-]+):(?P<tzname>\w+)\]', date)
if not match:
raise ValueError("Unable to extract datetime from {}".format(date))
formatted = "{datetime} {offset} {tzname}".format(
datetime = match.group('datetime'),
offset = _fix_offset(match.group('offset')),
tzname = match.group('tzname'),
)
return datetime.datetime.strptime(formatted, "%Y%m%d%H%M%S %z %Z")
def header():
    """Return the fixed OFX header block as CRLF-separated ``KEY:VALUE`` lines.

    There is no trailing line break after the final line.
    """
    fields = (
        ("OFXHEADER", "100"),
        ("DATA", "OFXSGML"),
        ("VERSION", "102"),
        ("SECURITY", "NONE"),
        ("ENCODING", "USASCII"),
        ("CHARSET", "1252"),
        ("COMPRESSION", "NONE"),
        ("OLDFILEUID", "NONE"),
        ("NEWFILEUID", "NONE"),
    )
    return "\r\n".join("{}:{}".format(key, value) for key, value in fields)
def now():
    """Return the current time formatted as ``YYYYMMDDHHMMSS.000[-7:MST]``.

    The suffix hard-codes a UTC-7 offset, so the clock is read in that same
    fixed offset. Reading the host's local time instead would make the digits
    disagree with the ``[-7:MST]`` label on any machine not at UTC-7.
    """
    mst = datetime.timezone(datetime.timedelta(hours=-7), 'MST')
    return datetime.datetime.now(mst).strftime("%Y%m%d%H%M%S.000[-7:MST]")
def signonmsg(institution, account):
    """Build the CRLF-separated ``<SIGNONMSGSRQV1>`` sign-on request block.

    Args:
        institution: Mapping providing ``'name'`` and ``'fid'``.
        account: Mapping providing ``'user_id'`` and ``'password'``.

    Returns:
        The sign-on block as a string, without a trailing line break.
    """
    template = "\r\n".join((
        "<SIGNONMSGSRQV1>",
        "<SONRQ>",
        "<DTCLIENT>{dtclient}",
        "<USERID>{userid}",
        "<USERPASS>{userpass}",
        "<LANGUAGE>ENG",
        "<FI>",
        "<ORG>{org}",
        "<FID>{fid}",
        "</FI>",
        "<APPID>QWIN",
        "<APPVER>1200",
        "</SONRQ>",
        "</SIGNONMSGSRQV1>",
    ))
    return template.format(
        dtclient=now(),
        userid=account['user_id'],
        userpass=account['password'],
        org=institution['name'],
        fid=institution['fid'],
    )
def bankmsg(institution, account, start):
    """Build the CRLF-separated ``<BANKMSGSRQV1>`` statement request block.

    Args:
        institution: Mapping providing ``'bankid'``.
        account: Mapping providing ``'account_id'`` and ``'type'``; the type
            is upper-cased in the output.
        start: Object with ``strftime``; rendered as ``YYYYMMDD`` in DTSTART.

    Returns:
        The request block as a string, without a trailing line break.
    """
    template = "\r\n".join((
        "<BANKMSGSRQV1>",
        "<STMTTRNRQ>",
        "<TRNUID>00000000",
        "<STMTRQ>",
        "<BANKACCTFROM>",
        "<BANKID>{bankid}",
        "<ACCTID>{acctid}",
        "<ACCTTYPE>{accttype}",
        "</BANKACCTFROM>",
        "<INCTRAN>",
        "<DTSTART>{dtstart}",
        "<INCLUDE>Y",
        "</INCTRAN>",
        "</STMTRQ>",
        "</STMTTRNRQ>",
        "</BANKMSGSRQV1>",
    ))
    return template.format(
        bankid=institution['bankid'],
        acctid=account['account_id'],
        accttype=account['type'].upper(),
        dtstart=start.strftime("%Y%m%d"),
    )
def body(institution, account, start):
    """Wrap the sign-on and bank request blocks in ``<OFX>`` ... ``</OFX>``.

    The four pieces are joined with CRLF; there is no trailing line break.
    """
    sections = [
        "<OFX>",
        signonmsg(institution, account),
        bankmsg(institution, account, start),
        "</OFX>",
    ]
    return "\r\n".join(sections)
def query_transactions(institution, account, start=None):
    """Render a complete OFX transaction query: header, blank line, body.

    Args:
        institution: Mapping passed through to the body builders.
        account: Mapping passed through to the body builders.
        start: Earliest date to request; when falsy, defaults to 14 days
            before the current local time.

    Returns:
        The full request text, terminated with CRLF.
    """
    if not start:
        start = datetime.datetime.now() - datetime.timedelta(days=14)
    # Two CRLFs after the header leave one empty line before the body.
    pieces = [header(), "\r\n\r\n", body(institution, account, start), "\r\n"]
    return "".join(pieces)
def _first_empty_line(lines):
for i, line in enumerate(lines):
if not line:
return i
def _parse_header(header_lines):
splits = [line.partition(':') for line in header_lines]
return {k: v for k, _, v in splits}
def parse(content):
    """Split OFX text into header and SGML body and return a Document.

    The text is split on CRLF. Lines before the first empty line are parsed
    as ``KEY:VALUE`` header fields; lines after it are re-joined with ``\\n``
    and handed to ``vanth.sgml.parse``.

    NOTE(review): when the text has no empty line the separator index is
    None and the ``+ 1`` below raises TypeError — confirm whether callers
    rely on that.
    """
    lines = content.split('\r\n')
    separator_index = _first_empty_line(lines)
    header_fields = _parse_header(lines[:separator_index])
    sgml_text = '\n'.join(lines[separator_index + 1:])
    tree = vanth.sgml.parse(sgml_text)
    return Document(header=header_fields, body=Body(tree))

View File

@ -1,65 +0,0 @@
import logging
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)
class Node(): # pylint: disable=too-few-public-methods
    """A named tree node with an optional value, a parent and child nodes.

    Creating a node with a truthy ``parent`` appends it to that parent's
    ``children``. ``node[name]`` returns the first child with that name, or
    None when there is no such child.
    """

    def __init__(self, parent, name, children=None, value=None):
        self.parent = parent
        self.name = name
        self.value = value
        # A falsy ``children`` argument (None or empty) gets a fresh list.
        self.children = children if children else []
        if parent:
            parent.children.append(self)

    def __getitem__(self, key):
        matching = (child for child in self.children if child.name == key)
        return next(matching, None)

    def __repr__(self):
        parent_name = self.parent.name if self.parent else None
        return "SGMLNode {} ({})".format(self.name, parent_name)
def parse(content):
    """Build a Node tree from SGML-like text, one character at a time.

    The parser keeps a small state machine (``state``), a text buffer
    (``buf``), the most recently created node (``current_node``) and the node
    that the next opened tag will be attached to (``parent_node``).

    Returns the topmost ancestor of the last node created.

    NOTE(review): if ``content`` contains no opening tag, ``root`` below is
    None and ``root.parent`` raises AttributeError — confirm callers never
    pass such input.
    """
    state = 'node-content'
    buf = ''
    parent_node = None
    current_node = None
    for c in content:
        if c == '<':
            if state == 'node-content':
                if buf == '':
                    # No text since the last opening tag: that tag is a
                    # container, so following tags nest under it.
                    parent_node = current_node
                    LOGGER.debug("Node content was empty, setting parent node to %s", parent_node)
                if current_node:
                    # Whatever text was collected becomes the node's value
                    # (the empty string for containers).
                    current_node.value = buf
                    LOGGER.debug("Set %s to %s", current_node.name, current_node.value)
            buf = ''
            state = 'node-name'
        elif c == '>':
            if state == 'node-name':
                LOGGER.debug("Saw opening tag %s. With parent %s", buf, parent_node)
                state = 'node-content'
                # Node() appends itself to parent_node's children.
                current_node = Node(parent_node, buf)
                buf = ''
            elif state == 'closing-tag':
                LOGGER.debug("Saw closing tag %s", buf)
                state = 'closed-tag'
                # Walk up from the current node to the ancestor whose name
                # matches the closing tag (or to the root if none matches)...
                parent_node = current_node
                while parent_node.parent and parent_node.name != buf:
                    parent_node = parent_node.parent
                # ...then step one level above it: new tags become siblings
                # of the element that was just closed.
                parent_node = parent_node.parent
                buf = ''
                LOGGER.debug("Set new parent to %s", parent_node.name if parent_node else None)
        elif c == '/' and buf == '':
            # A '/' with an empty buffer (e.g. right after '<') starts a
            # closing tag.
            state = 'closing-tag'
            parent_node = current_node.parent if current_node else None
        else:
            buf += c
    # Climb from the last node seen to the top of the tree.
    root = current_node or parent_node
    while root.parent:
        root = root.parent
    return root
def pformat(node, indent=0):
    """Render node and its descendants as tab-indented ``name: value`` lines.

    Args:
        node: Object with ``name``, ``value`` and ``children`` attributes.
        indent: Number of leading tabs for this node; children get one more.

    Returns:
        A multi-line string with one line per node and no trailing newline.
    """
    own_line = "{}{}: {}".format('\t' * indent, node.name, node.value)
    if not node.children:
        return own_line
    rendered_children = [pformat(child, indent + 1) for child in node.children]
    return own_line + "\n" + '\n'.join(rendered_children)