Skip to content

Commit

Permalink
fixed: /manager/queue/status - race condition issue
Browse files Browse the repository at this point in the history
  • Loading branch information
ltdrdata committed Feb 2, 2025
1 parent 829784f commit 3c29333
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 57 deletions.
2 changes: 1 addition & 1 deletion glob/manager_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
from node_package import InstalledNodePackage


version_code = [3, 13]
version_code = [3, 13, 1]
version_str = f"V{version_code[0]}.{version_code[1]}" + (f'.{version_code[2]}' if len(version_code) > 2 else '')


Expand Down
117 changes: 64 additions & 53 deletions glob/manager_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,43 +372,43 @@ def nickname_filter(json_obj):
task_queue = queue.Queue()
nodepack_result = {}
model_result = {}
tasks_in_progress = set()
task_worker_lock = threading.Lock()

async def task_worker():
global task_queue
global nodepack_result
global model_result
global tasks_in_progress

async def do_install(item):
async def do_install(item) -> str:
ui_id, node_spec_str, channel, mode, skip_post_install = item

try:
node_spec = core.unified_manager.resolve_node_spec(node_spec_str)

if node_spec is None:
logging.error(f"Cannot resolve install target: '{node_spec_str}'")
nodepack_result[ui_id] = f"Cannot resolve install target: '{node_spec_str}'"
return
return f"Cannot resolve install target: '{node_spec_str}'"

node_name, version_spec, is_specified = node_spec
res = await core.unified_manager.install_by_id(node_name, version_spec, channel, mode, return_postinstall=skip_post_install)
# discard post install if skip_post_install mode

if res.action not in ['skip', 'enable', 'install-git', 'install-cnr', 'switch-cnr']:
logging.error(f"[ComfyUI-Manager] Installation failed:\n{res.msg}")
nodepack_result[ui_id] = res.msg
return
return res.msg

elif not res.result:
logging.error(f"[ComfyUI-Manager] Installation failed:\n{res.msg}")
nodepack_result[ui_id] = res.msg
return
return res.msg

nodepack_result[ui_id] = 'success'
return 'success'
except Exception:
traceback.print_exc()
nodepack_result[ui_id] = f"Installation failed:\n{node_spec_str}"
return f"Installation failed:\n{node_spec_str}"

async def do_update(item):
async def do_update(item) -> str:
ui_id, node_name, node_ver = item

try:
Expand All @@ -417,70 +417,68 @@ async def do_update(item):
manager_util.clear_pip_cache()

if res.result:
nodepack_result[ui_id] = 'success'
return
return 'success'

logging.error(f"\nERROR: An error occurred while updating '{node_name}'.")
nodepack_result[ui_id] = f"An error occurred while updating '{node_name}'."
except Exception:
traceback.print_exc()
nodepack_result[ui_id] = f"An error occurred while updating '{node_name}'."

async def do_fix(item):
return f"An error occurred while updating '{node_name}'."

async def do_fix(item) -> str:
ui_id, node_name, node_ver = item

try:
res = core.unified_manager.unified_fix(node_name, node_ver)

if res.result:
nodepack_result[ui_id] = 'success'
return
return 'success'
else:
logging.error(res.msg)

logging.error(f"\nERROR: An error occurred while fixing '{node_name}@{node_ver}'.")
nodepack_result[ui_id] = f"An error occurred while fixing '{node_name}@{node_ver}'."
except Exception:
traceback.print_exc()
nodepack_result[ui_id] = f"An error occurred while fixing '{node_name}@{node_ver}'."

async def do_uninstall(item):
return f"An error occurred while fixing '{node_name}@{node_ver}'."

async def do_uninstall(item) -> str:
ui_id, node_name, is_unknown = item

try:
res = core.unified_manager.unified_uninstall(node_name, is_unknown)

if res.result:
nodepack_result[ui_id] = 'success'
return
return 'success'

logging.error(f"\nERROR: An error occurred while uninstalling '{node_name}'.")
nodepack_result[ui_id] = f"An error occurred while uninstalling '{node_name}'."
except Exception:
traceback.print_exc()
nodepack_result[ui_id] = f"An error occurred while uninstalling '{node_name}'."

async def do_disable(item):
return f"An error occurred while uninstalling '{node_name}'."

async def do_disable(item) -> str:
ui_id, node_name, is_unknown = item

try:
res = core.unified_manager.unified_disable(node_name, is_unknown)

if res:
nodepack_result[ui_id] = 'success'
return
return 'success'

nodepack_result[ui_id] = f"Failed to disable: '{node_name}'"
except Exception:
traceback.print_exc()
nodepack_result[ui_id] = f"Failed to disable: '{node_name}'"

async def do_install_model(item):
return f"Failed to disable: '{node_name}'"

async def do_install_model(item) -> str:
ui_id, json_data = item

model_path = get_model_path(json_data)
model_url = json_data['url']

res = False

try:
if model_path is not None:
logging.info(f"Install model '{json_data['name']}' from '{model_url}' into '{model_path}'")
Expand All @@ -494,24 +492,21 @@ async def do_install_model(item):
res = True

if res:
model_result[ui_id] = 'success'
return
return 'success'
else:
res = download_url_with_agent(model_url, model_path)
if res and model_path.endswith('.zip'):
res = core.unzip(model_path)
else:
logging.error(f"Model installation error: invalid model type - {json_data['type']}")
return

if res:
model_result[ui_id] = 'success'
return
return 'success'

except Exception as e:
logging.error(f"[ERROR] {e}", file=sys.stderr)

model_result[ui_id] = f"Model installation error: {model_url}"
return f"Model installation error: {model_url}"

stats = {}

Expand All @@ -529,31 +524,43 @@ async def do_install_model(item):
'total_count': total_count, 'done_count': done_count})
nodepack_result = {}
task_queue = queue.Queue()
return
return # terminate worker thread

kind, item = task_queue.get()
with task_worker_lock:
kind, item = task_queue.get()
tasks_in_progress.add((kind, item[0]))

try:
if kind == 'install':
await do_install(item)
if kind == 'install-model':
await do_install_model(item)
msg = await do_install(item)
elif kind == 'install-model':
msg = await do_install_model(item)
elif kind == 'update':
await do_update(item)
msg = await do_update(item)
elif kind == 'fix':
await do_fix(item)
msg = await do_fix(item)
elif kind == 'uninstall':
await do_uninstall(item)
msg = await do_uninstall(item)
elif kind == 'disable':
await do_disable(item)
msg = await do_disable(item)
else:
msg = "Unexpected kind: " + kind
except Exception:
traceback.print_exc()
msg = f"Exception: {(kind, item)}"

stats[kind] = stats.get(kind, 0) + 1
with task_worker_lock:
tasks_in_progress.remove((kind, item[0]))

ui_target = "model_manager" if kind == 'install-model' else 'nodepack_manager'
ui_id = item[0]
if kind == 'install-model':
model_result[ui_id] = msg
ui_target = "model_manager"
else:
nodepack_result[ui_id] = msg
ui_target = "nodepack_manager"

print(f"kind: {kind} / ui_target: {ui_target}")
stats[kind] = stats.get(kind, 0) + 1

PromptServer.instance.send_sync("cm-queue-status",
{'status': 'in_progress', 'target': item[0], 'ui_target': ui_target,
Expand Down Expand Up @@ -1073,11 +1080,15 @@ async def reset_queue(request):
async def queue_count(request):
global task_queue

done_count = len(nodepack_result) + len(model_result)
total_count = done_count + task_queue.qsize()
in_progress = task_worker_thread is not None and task_worker_thread.is_alive()
with task_worker_lock:
done_count = len(nodepack_result) + len(model_result)
in_progress_count = len(tasks_in_progress)
total_count = done_count + in_progress_count + task_queue.qsize()
is_processing = task_worker_thread is not None and task_worker_thread.is_alive()

return web.json_response({'total_count': total_count, 'done_count': done_count, 'in_progress': in_progress})
return web.json_response({
'total_count': total_count, 'done_count': done_count, 'in_progress_count': in_progress_count,
'is_processing': is_processing})


@routes.post("/manager/queue/install")
Expand Down Expand Up @@ -1129,7 +1140,7 @@ async def install_custom_node(request):
return web.Response(status=200)


task_worker_thread = None
task_worker_thread:threading.Thread = None

@routes.get("/manager/queue/start")
async def queue_start(request):
Expand Down
2 changes: 1 addition & 1 deletion js/custom-nodes-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -1287,7 +1287,7 @@ export class CustomNodesManager {
async installNodes(list, btn, title, selected_version) {
let stats = await api.fetchApi('/manager/queue/status');
stats = await stats.json();
if(stats.in_progress) {
if(stats.is_processing) {
customAlert(`[ComfyUI-Manager] There are already tasks in progress. Please try again after it is completed. (${stats.done_count}/${stats.total_count})`);
return;
}
Expand Down
2 changes: 1 addition & 1 deletion js/model-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -634,7 +634,7 @@ export class ModelManager {
let stats = await api.fetchApi('/manager/queue/status');

stats = await stats.json();
if(stats.in_progress) {
if(stats.is_processing) {
customAlert(`[ComfyUI-Manager] There are already tasks in progress. Please try again after it is completed. (${stats.done_count}/${stats.total_count})`);
return;
}
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[project]
name = "comfyui-manager"
description = "ComfyUI-Manager provides features to install and manage custom nodes for ComfyUI, as well as various functionalities to assist with ComfyUI."
version = "3.13"
version = "3.13.1"
license = { file = "LICENSE.txt" }
dependencies = ["GitPython", "PyGithub", "matrix-client==0.4.0", "transformers", "huggingface-hub>0.20", "typer", "rich", "typing-extensions"]

Expand Down

0 comments on commit 3c29333

Please sign in to comment.