Skip to content

Commit

Permalink
Refactor the setup and REPL processing better
Browse files Browse the repository at this point in the history
Instead of one giant setup+REPL method, now we have an independent
setup-once method and then the REPL run loop.

We now add a better external exception handler to the REPL and a more
logical run = setup + REPL instead of run = run logic.

Also refactored the command parsing and processing out into class-level
methods instead of nested closures (even thouogh those functions are
only used directly in the REPL it's just cleaner to have them broken out
so we don't have 100 lines of utility logic in our command flow to read
through all the time).
  • Loading branch information
mattsta committed Jun 29, 2024
1 parent 1e03b5e commit a26edd5
Show file tree
Hide file tree
Showing 2 changed files with 180 additions and 161 deletions.
8 changes: 5 additions & 3 deletions icli/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,12 @@ async def initcli():
# patch entire application with prompt-toolkit-compatible stdout
with patch_stdout(raw=True):
try:
await app.dorepl()
except SystemExit:
await app.runall()
except (SystemExit, EOFError):
# known-good exit condition
...
pass
except:
logger.exception("Major uncaught exception?")
else:
logger.error("Attached input isn't a console, so we can't do anything!")

Expand Down
333 changes: 175 additions & 158 deletions icli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2755,7 +2755,161 @@ async def addQuotes(self, symbols):
# can then use to index into the quoteState[] dict directly later)
return list(qs)

async def dorepl(self):
async def runCollective(self, concurrentCmds):
"""Given a list of commands and arguments, run them all concurrently."""

# Run all our concurrent tasks NOW
cmds = "; ".join([x[2] for x in concurrentCmds])
with Timer(cmds):
try:
await asyncio.gather(
*[
self.dispatch.runop(
collectiveCmd,
collectiveRest[0] if collectiveRest else None,
self.opstate,
)
for collectiveCmd, collectiveRest, _originalFullCommand in concurrentCmds
]
)
except:
logger.exception("[{}] Collective command running failed?", cmds)

async def runSingleCommand(self, cmd, rest):
with Timer(cmd):
try:
await self.dispatch.runop(cmd, rest[0] if rest else None, self.opstate)
except Exception as e:
if "token" in str(e):
# don't show a 100 line stack trace for mistyped inputs.
# Just tell the user it needs to be corrected.
logger.error("Error parsing your input: {}", e)
else:
logger.exception("sorry, what now?")

def buildRunnablesFromCommandRequest(self, text1):
# Attempt to run the command(s) submitted into the prompt.
#
# Commands can be:
# Regular single-line commands:
# > COMMAND
#
# Multiple commands on a single line with semicolons splitting them:
# > COMMAND1; COMMAND2
#
# Multiple commands across multiple lines (easy for pasting from other scripts generating commands)
# > COMMAND1
# COMMAND2
#
# Commands can have end of line comments which *do* get saved to history, but *DO NOT* get sent to the command
# > COMMAND # Comment about command
#
# Commands can also be run in groups all at once concurrently.
# Concurrent commands requested back-to-back all run at the same time and non-concurrent commands between concurrent groups block as expected.
#
# This will run (1, 2) concurrently, then 3, then 4, then (5, 6) concurrently again.
# > COMMAND1&; COMMAND2&; COMMAND3; COMMAND4; COMMAND5&; COMMAND6&
#
# Command processing process is:
# - Detect end-of-line comment and remove it (comments are PER FULL INPUT so "CMD1; CMD2; # CMD3; CMD4; CMD5" only runs "CMD1; CMD2")
# - Split input text on newlines and semicolons
# - Remove leading/trailing whitespace from each split command
# - Check if command is a concurrent command request (add to concurrent group if necessary)
# - Check if command is regular (add to regular runner if necessary)
# - Run collected concurrent and sequential command(s) in submitted group order.
#
# Originally we didn't have concurrency groups, so we processed commands in a simple O(N) loop,
# but now we pre-process (concurrent, sequential) commands first, then we run commands after we
# accumulate them, so we have ~O(2N) processing, but our N is almost always less than 10.
#
# (This command processing logic went from "just parse 1 command per run" to our
# current implementation of handling multi-commands and comments and concurrent commands,
# so command parsing has increased in complexity, but hopefully the increased running logic is
# useful to enable more efficient order entry/exit management.)
#
# These nice helpers require some extra input processing work, but our
# basic benchmark shows cleaning up these commands only requires an
# extra 30 us at the worst case, so it still allows over 30,000 command
# parsing events per second (and we always end up blocked by the IBKR
# gateway latency anyway which takes 100 ms to 300 ms for replies to the API)

runnables = []

# 'collective' holds the current accumulating concurrency group
collective = []

commentRemoved = re.sub(r"#.*", "", text1).strip()
ccmds = re.split(r"[\n;]", commentRemoved)
for ccmd in ccmds:
# if the split generated empty entries (like running ;;;;), just skip the command
ccmd = ccmd.strip()

if not ccmd:
continue

# custom usability hack: we can detect math ops and not need to prefix 'math' to them manually
if ccmd[0] == "(":
ccmd = f"math {ccmd}"

# Check if this command is a background command then clean it up
isBackgroundCmd = ccmd[-1] == "&"
if isBackgroundCmd:
# remove ampersand from background request and re-strip command...
ccmd = ccmd[:-1].rstrip()

# split into command dispatch lookup and arguments to command
cmd, *rest = ccmd.split(" ", 1)

# If background command, add to our background concurrency group for this block
if isBackgroundCmd:
# now fixup background command...
collective.append((cmd, rest, ccmd))

# this 'run group' count is BEFORE the runnable is added
logger.info(
"[{} :: concurrent] Added command to run group {}",
ccmd,
len(runnables),
)
continue

# if we have previously saved concurrent tasks and this task is NOT concurrent, add all concurrent tasks,
# THEN add this task.
if collective and not isBackgroundCmd:
runnables.append(self.runCollective(collective.copy()))

# now since we added everything, remove the pending tasks so we don't schedule them again.
collective.clear()

# now schedule SINGLE command since we know the collective is properly handled already
runnables.append(self.runSingleCommand(cmd, rest))

if len(runnables) and len(ccmds) > 1:
# this 'run group' count is AFTER the runnable is added (so we subtract one to get the actual order number)
logger.info(
"[{} :: sequential] Added command to run group {}",
ccmd,
len(runnables) - 1,
)

# extra catch: if our commands END with a collective command, we need to now add them here too
# (because the prior condition only checks if we went collective->single; but if we are ALL collective,
# we never trigger the "is single, cut previously collective into a full group" condition)
if collective:
runnables.append(self.runCollective(collective.copy()))

return runnables

async def runall(self):
await self.prepare()
while not self.exiting:
try:
await self.dorepl()
except:
logger.exception("Uncaught exception in repl? Restarting...")
continue

async def prepare(self):
# Setup...

# wait until we start getting data from the gateway...
Expand Down Expand Up @@ -2926,9 +3080,10 @@ async def reconnect():
# do not pass go, do not continue, throw the exit upward
sys.exit(0)

set_title(f"{self.levelName().title()} Trader")
set_title(f"{self.levelName().title()} Trader ({self.clientId})")
self.ib.disconnectedEvent += lambda: asyncio.create_task(reconnect())

async def dorepl(self):
session = PromptSession(
history=ThreadedHistory(
FileHistory(
Expand All @@ -2939,6 +3094,7 @@ async def reconnect():
)

app = session.app
loop = asyncio.get_event_loop()

async def updateToolbar():
"""Update account balances"""
Expand All @@ -2954,40 +3110,6 @@ async def updateToolbar():

loop.create_task(updateToolbar())

async def runCollective(concurrentCmds):
"""Given a list of commands and arguments, run them all concurrently."""

# Run all our concurrent tasks NOW
cmds = "; ".join([x[2] for x in concurrentCmds])
with Timer(cmds):
try:
await asyncio.gather(
*[
self.dispatch.runop(
collectiveCmd,
collectiveRest[0] if collectiveRest else None,
self.opstate,
)
for collectiveCmd, collectiveRest, _originalFullCommand in concurrentCmds
]
)
except:
logger.exception("[{}] Collective command running failed?", cmds)

async def runSingleCommand(cmd, rest):
with Timer(cmd):
try:
await self.dispatch.runop(
cmd, rest[0] if rest else None, self.opstate
)
except Exception as e:
if "token" in str(e):
# don't show a 100 line stack trace for mistyped inputs.
# Just tell the user it needs to be corrected.
logger.error("Error parsing your input: {}", e)
else:
logger.exception("sorry, what now?")

# The Command Processing REPL
while True:
try:
Expand All @@ -3007,131 +3129,26 @@ async def runSingleCommand(cmd, rest):
# log user input to our active logfile(s)
logger.trace("{}> {}", self.levelName(), text1)

# Attempt to run the command(s) submitted into the prompt.
#
# Commands can be:
# Regular single-line commands:
# > COMMAND
#
# Multiple commands on a single line with semicolons splitting them:
# > COMMAND1; COMMAND2
#
# Multiple commands across multiple lines (easy for pasting from other scripts generating commands)
# > COMMAND1
# COMMAND2
#
# Commands can have end of line comments which *do* get saved to history, but *DO NOT* get sent to the command
# > COMMAND # Comment about command
#
# Commands can also be run in groups all at once concurrently.
# Concurrent commands requested back-to-back all run at the same time and non-concurrent commands between concurrent groups block as expected.
#
# This will run (1, 2) concurrently, then 3, then 4, then (5, 6) concurrently again.
# > COMMAND1&; COMMAND2&; COMMAND3; COMMAND4; COMMAND5&; COMMAND6&
#
# Command processing process is:
# - Detect end-of-line comment and remove it (comments are PER FULL INPUT so "CMD1; CMD2; # CMD3; CMD4; CMD5" only runs "CMD1; CMD2")
# - Split input text on newlines and semicolons
# - Remove leading/trailing whitespace from each split command
# - Check if command is a concurrent command request (add to concurrent group if necessary)
# - Check if command is regular (add to regular runner if necessary)
# - Run collected concurrent and sequential command(s) in submitted group order.
#
# Originally we didn't have concurrency groups, so we processed commands in a simple O(N) loop,
# but now we pre-process (concurrent, sequential) commands first, then we run commands after we
# accumulate them, so we have ~O(2N) processing, but our N is almost always less than 10.
#
# (This command processing logic went from "just parse 1 command per run" to our
# current implementation of handling multi-commands and comments and concurrent commands,
# so command parsing has increased in complexity, but hopefully the increased running logic is
# useful to enable more efficient order entry/exit management.)
#
# These nice helpers require some extra input processing work, but our
# basic benchmark shows cleaning up these commands only requires an
# extra 30 us at the worst case, so it still allows over 30,000 command
# parsing events per second (and we always end up blocked by the IBKR
# gateway latency anyway which takes 100 ms to 300 ms for replies to the API)

# 'collective' holds the current accumulating concurrency group
collective = []

# 'runnables' is the list of all commands to run after we collect them
runnables = []
commentRemoved = re.sub(r"#.*", "", text1).strip()
ccmds = re.split(r"[\n;]", commentRemoved)
for ccmd in ccmds:
# if the split generated empty entries (like running ;;;;), just skip the command
ccmd = ccmd.strip()

if not ccmd:
continue
runnables = self.buildRunnablesFromCommandRequest(text1)

# custom usability hack: we can detect math ops and not need to prefix 'math' to them manually
if ccmd[0] == "(":
ccmd = f"math {ccmd}"

# Check if this command is a background command then clean it up
isBackgroundCmd = ccmd[-1] == "&"
if isBackgroundCmd:
# remove ampersand from background request and re-strip command...
ccmd = ccmd[:-1].rstrip()

# split into command dispatch lookup and arguments to command
cmd, *rest = ccmd.split(" ", 1)

# If background command, add to our background concurrency group for this block
if isBackgroundCmd:
# now fixup background command...
collective.append((cmd, rest, ccmd))

# this 'run group' count is BEFORE the runnable is added
logger.info(
"[{} :: concurrent] Added command to run group {}",
ccmd,
len(runnables),
)
continue

# if we have previously saved concurrent tasks and this task is NOT concurrent, add all concurrent tasks,
# THEN add this task.
if collective and not isBackgroundCmd:
runnables.append(runCollective(collective.copy()))

# now since we added everything, remove the pending tasks so we don't schedule them again.
collective.clear()

# now schedule SINGLE command since we know the collective is properly handled already
runnables.append(runSingleCommand(cmd, rest))

if len(runnables) and len(ccmds) > 1:
# this 'run group' count is AFTER the runnable is added (so we subtract one to get the actual order number)
logger.info(
"[{} :: sequential] Added command to run group {}",
ccmd,
len(runnables) - 1,
)

# extra catch: if our commands END with a collective command, we need to now add them here too
# (because the prior condition only checks if we went collective->single; but if we are ALL collective,
# we never trigger the "is single, cut previously collective into a full group" condition)
if collective:
runnables.append(runCollective(collective.copy()))

if runnables:
if len(runnables) == 1:
# if only one command, don't run with an extra Timer() report like we do below
# with multiple commands (individual commands always report their individual timing)
await runnables[0]
else:
# only show the "All commands" timer if we have multiple commands to run
with Timer("All commands"):
for run in runnables:
try:
# run either a SINGLE command or a COLLECTIVE GROUP as we previously created
await run
except:
logger.exception("[{}] Runnable failed?", run)
# if no commands, just draw the prompt again
if not runnables:
continue

if len(runnables) == 1:
# if only one command, don't run with an extra Timer() report like we do below
# with multiple commands (individual commands always report their individual timing)
await runnables[0]
else:
# only show the "All commands" timer if we have multiple commands to run
with Timer("All commands"):
for run in runnables:
try:
# run a COLLECTIVE COMMAND GROUP we previously created
await run
except:
logger.exception("[{}] Runnable failed?", run)
except KeyboardInterrupt:
# Control-C pressed. Try again.
continue
Expand Down

0 comments on commit a26edd5

Please sign in to comment.