Add script to import data from ofxhome into our DB
Makes my life a little easier to have all of these ofxsources so that I can test out different sources and start building out support for communicating with these different institutions I'm throwing away quite a bit of the data I have but that's okay for now until I know that I need them
This commit is contained in:
parent
140f8fe6c7
commit
b9dcf0e9a9
|
@ -0,0 +1,23 @@
|
|||
#!/usr/bin/env python3
|
||||
import argparse
|
||||
|
||||
import vanth.main
|
||||
import vanth.ofxhome
|
||||
import vanth.platform.ofxsource
|
||||
|
||||
|
||||
def main():
|
||||
vanth.main.setup_logging()
|
||||
config = vanth.main.get_config()
|
||||
vanth.main.create_db_connection(config)
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument('dbfile', help='The database file of XML dumped from the open OFX Home DB')
|
||||
args = parser.parse_args()
|
||||
|
||||
with open(args.dbfile, 'r') as f:
|
||||
data = vanth.ofxhome.parse(f.read())
|
||||
vanth.platform.ofxsource.ensure_exist(data)
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
|
@ -0,0 +1,12 @@
|
|||
import xml.etree.ElementTree
|
||||
|
||||
|
||||
def parse_child(element):
|
||||
values = {child.tag: child.text for child in element}
|
||||
values['id'] = element.attrib['id']
|
||||
return values
|
||||
|
||||
def parse(data):
|
||||
root = xml.etree.ElementTree.fromstring(data)
|
||||
assert root.tag == 'institutions'
|
||||
return [parse_child(element) for element in root]
|
|
@ -1,6 +1,8 @@
|
|||
import logging
|
||||
import uuid
|
||||
|
||||
import chryso.connection
|
||||
import sqlalchemy
|
||||
|
||||
import vanth.tables
|
||||
|
||||
|
@ -11,10 +13,32 @@ def _query_and_convert(query):
|
|||
results = engine.execute(query).fetchall()
|
||||
return [dict(result) for result in results]
|
||||
|
||||
def by_uuid(uuid):
|
||||
query = vanth.tables.OFXSource.select().where(vanth.tables.OFXSource.c.uuid == uuid)
|
||||
def by_uuid(uuid_):
|
||||
query = vanth.tables.OFXSource.select().where(vanth.tables.OFXSource.c.uuid == uuid_)
|
||||
return _query_and_convert(query)
|
||||
|
||||
def get():
|
||||
query = vanth.tables.OFXSource.select()
|
||||
return _query_and_convert(query)
|
||||
|
||||
def ensure_exist(institutions):
|
||||
engine = chryso.connection.get()
|
||||
query = sqlalchemy.select([
|
||||
vanth.tables.OFXSource.c.fid,
|
||||
])
|
||||
results = engine.execute(query).fetchall()
|
||||
LOGGER.debug("Found %d OFX sources", len(results))
|
||||
known_records = {result[vanth.tables.OFXSource.c.fid] for result in results}
|
||||
new_records = [institution for institution in institutions if institution['fid'] not in known_records]
|
||||
LOGGER.debug("Have %d new transactions to save", len(new_records))
|
||||
to_insert = [{
|
||||
'name' : institution['name'],
|
||||
'fid' : institution['fid'],
|
||||
'url' : institution['url'],
|
||||
'uuid' : str(uuid.uuid4()),
|
||||
} for institution in new_records]
|
||||
if to_insert:
|
||||
engine.execute(vanth.tables.OFXSource.insert(), to_insert) # pylint: disable=no-value-for-parameter
|
||||
LOGGER.debug("Done inserting %d records", len(new_records))
|
||||
else:
|
||||
LOGGER.debug("Not performing insert, nothing to do")
|
||||
|
|
Loading…
Reference in New Issue