fix: excessive RAM usage on eddblink listings import
fixes #125

Instead of building a list during processing of the listings file (which
uses a *lot* of RAM on large source files) and updating the DB after
processing, update the DB as each line is processed.

This keeps RAM usage very low by comparison: memory stays roughly constant instead of growing with the size of the listings file.
eyeonus committed Apr 23, 2024
1 parent 3434b53 commit 91a71c0
Showing 1 changed file with 22 additions and 44 deletions.
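The change is the standard streaming-write pattern: instead of appending every parsed row to a Python list and flushing it with executemany() once the whole file has been read, each row is written as soon as it is parsed. A minimal sketch of the idea, using plain sqlite3 and hypothetical table/column names rather than the plugin's actual schema and its retry-on-lock execute() wrapper:

    import csv
    import sqlite3

    def import_listings(db: sqlite3.Connection, path: str, commit_every: int = 10_000) -> None:
        # Hypothetical, reduced schema for illustration only; the real plugin
        # inserts ten columns and retries statements when the DB is locked.
        stmt = """INSERT OR REPLACE INTO StationItem
                      (station_id, item_id, modified)
                  VALUES (?, ?, ?)"""
        with open(path, newline="", encoding="utf-8") as fh:
            for n, row in enumerate(csv.DictReader(fh), start=1):
                # One statement per line: O(1) memory instead of O(rows).
                db.execute(stmt, (int(row["station_id"]),
                                  int(row["item_id"]),
                                  row["collected_at"]))
                if n % commit_every == 0:
                    db.commit()  # bound the pending transaction / WAL size
        db.commit()  # flush the tail of the file

Committing periodically is what the new "commit and close the DB every 2%" block in the diff below is for: it both persists the rows already processed and keeps SQLite's write-ahead log from growing unboundedly during a long import.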
66 changes: 22 additions & 44 deletions tradedangerous/plugins/eddblink_plug.py
@@ -91,6 +91,7 @@ def now(self):
def execute(self, sql_cmd, args = None):
cur = self.tdb.getDB().cursor()

self.tdenv.DEBUG2(f"SQL-Statement:\n'{sql_cmd},{args}'")
success = False
result = None
while not success:
@@ -109,24 +110,6 @@ def execute(self, sql_cmd, args = None):
time.sleep(1)
return result

def executemany(self, sql_cmd, args):
cur = self.tdb.getDB().cursor()

success = False
result = None
while not success:
try:
result = cur.executemany(sql_cmd, args)
success = True
except sqlite3.OperationalError as e:
if "locked" not in str(e):
success = True
raise sqlite3.OperationalError(e)
else:
print("(execute) Database is locked, waiting for access.", end = "\r")
time.sleep(1)
return result

@staticmethod
def fetchIter(cursor, arraysize = 1000):
"""
@@ -235,13 +218,7 @@ def importListings(self, listings_file):

from_live = 0 if listings_file == self.listingsPath else 1

# Used to check if the listings file is using the fdev_id as a temporary
# item_id, but the item is in the DB with a permanent item_id.
fdev2item = dict()
result = self.execute("SELECT fdev_id,item_id FROM Item ORDER BY fdev_id").fetchall()
for item in result:
fdev2item[item[0]] = item[1]

self.tdenv.DEBUG0(f"Getting total number of entries in {listings_file}...")
with open(str(self.dataPath / listings_file), "r", encoding = "utf-8", errors = 'ignore') as f:
total += (sum(bl.count("\n") for bl in self.blocks(f)))

@@ -260,24 +237,31 @@ def importListings(self, listings_file):
supply_price, supply_units, supply_level, from_live)
VALUES ( ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )"""

self.tdenv.DEBUG0("Getting list of commodities...")
items = []
it_result = self.execute("SELECT item_id FROM Item ORDER BY item_id").fetchall()
for item in it_result:
items.append(item[0])

self.tdenv.DEBUG0("Getting list of stations...")
stationList = {
stationID
for (stationID,) in self.execute("SELECT station_id FROM Station")
}

self.tdenv.DEBUG0("Processing entries...")
with open(str(self.dataPath / listings_file), "r") as fh:
prog = pbar.Progress(total, 50)
listings = csv.DictReader(fh)

cur_station = -1

for listing in listings:
prog.increment(1, postfix = lambda value, goal: " " + str(round(value / total * 100)) + "%")
if prog.increment(1, postfix = lambda value, goal: f" {(value / total * 100):.0f}% {value} / {total}"):
# Do a commit and close the DB every 2%.
# This ensures the listings are put in the DB and the WAL is cleared.
self.commit()
self.tdb.close()

station_id = int(listing['station_id'])
if station_id not in stationList:
@@ -294,15 +278,18 @@ def importListings(self, listings_file):
updated = timegm(datetime.datetime.strptime(result[0].split('.')[0], '%Y-%m-%d %H:%M:%S').timetuple())
# When the listings.csv data matches the database, update to make from_live == 0.
if int(listing['collected_at']) == updated and not from_live:
liveList.append((cur_station,))
self.tdenv.DEBUG1(f"Marking {cur_station} as no longer 'live'.")
self.execute(liveStmt, (cur_station,))
# Unless the import file data is newer, nothing else needs to be done for this station,
# so the rest of the listings for this station can be skipped.
if int(listing['collected_at']) <= updated:
skipStation = True
continue

# The data from the import file is newer, so we need to delete the old data for this station.
delList.append((cur_station,))
self.tdenv.DEBUG1(f"Deleting old listing data for {cur_station}.")
self.execute(delStmt, (cur_station,))


if skipStation:
continue
@@ -320,24 +307,18 @@ def importListings(self, listings_file):
supply_units = int(listing['supply'])
supply_level = int(listing['supply_bracket']) if listing['supply_bracket'] != '' else -1

listingList.append((station_id, item_id, modified,
self.tdenv.DEBUG1(f"Inserting new listing data for {station_id}.")
self.execute(listingStmt, (station_id, item_id, modified,
demand_price, demand_units, demand_level,
supply_price, supply_units, supply_level, from_live))

# Do a final commit to be sure
self.commit()
self.tdb.close()

while prog.value < prog.maxValue:
prog.increment(1, postfix = lambda value, goal: " " + str(round(value / total * 100)) + "%")
prog.clear()

self.tdenv.NOTE("Import file processing complete, updating database. {}", self.now())
if liveList:
self.tdenv.NOTE("Marking data now in the EDDB listings.csv as no longer 'live'. {}", self.now())
self.executemany(liveStmt, liveList)
if delList:
self.tdenv.NOTE("Deleting old listing data. {}", self.now())
self.executemany(delStmt, delList)
if listingList:
self.tdenv.NOTE("Inserting new listing data. {}", self.now())
self.executemany(listingStmt, listingList)

self.tdenv.NOTE("Finished processing market data. End time = {}", self.now())

@@ -497,7 +478,7 @@ def run(self):
# Remake the .db files with the updated info.
if buildCache:
self.tdb.close()
cache.buildCache(self.tdb, self.tdenv)
self.tdb.reloadCache()

self.tdenv.ignoreUnknown = True

@@ -511,9 +492,6 @@ def run(self):
if self.downloadFile(self.liveListingsPath) or self.getOption("force"):
self.importListings(self.liveListingsPath)

# self.commit()
self.tdb.close()

if self.getOption("listings"):
self.tdenv.NOTE("Regenerating .prices file.")
cache.regeneratePricesFile(self.tdb, self.tdenv)
