diff --git a/langhuan/progress.py b/langhuan/progress.py
index 1b692bd..4997866 100644
--- a/langhuan/progress.py
+++ b/langhuan/progress.py
@@ -1,89 +1,47 @@
 from typing import List, Callable, Union
 import logging
+
 
 class Dispatcher:
     def __init__(self, n, v):
         self.n = n
         self.v = v
-        self.sent = -1
-        self.started = dict()
-        self.by_user = dict()
-        self.busy_by_user = dict()
-
-    def new_user_todo(self, user_id):
-        return {user_id: list(range(self.n))[max(0, self.sent):]}
-
-    def user_progress(self, user_id):
-        try:
-            user_progress = self.by_user[user_id]
-        except KeyError:
-            self.by_user.update(self.new_user_todo(user_id))
-            user_progress = self.by_user[user_id]
-        return user_progress
+        self.cache_data = dict()
+        self.new = list(range(n))
+        self.processing = dict()
 
     def __repr__(self):
-        return str(self.by_user)+"\n"+str(self.sent)
+        return f"Job Dispatcher: n:{self.n},v:{self.v}"
 
     def __getitem__(self, user_id):
-        """
-        get index by user_id
-        """
-        if user_id in self.busy_by_user:
-            # read cache
-            return self.busy_by_user[user_id]
-
-        self.user_clear_progress(user_id)
-        user = self.user_progress(user_id)
-        try:
-            index = user[0]
-            self.after_get_update(user_id, index)
-
-        except IndexError:
-            return -1
-        return index
-
-    def after_get_update(self, user_id, index):
-        # save cache
-        self.busy_by_user[user_id] = index
-        user = self.user_progress(user_id)
-        if index in user:
-            user.remove(index)
-        if self.started.get(index) is None:
-            self.started[index] = [user_id, ]
+        if user_id in self.cache_data:
+            return self.cache_data[user_id]
         else:
-            self.started[index].append(user_id)
-        if len(self.started[index]) >= self.v:
-            self.tick_sent(index)
-
-    def finish_update(
-        self,
-        user_id: str,
-        index: int,
-        callbacks: List[Callable] = []
-    ):
-        """
-        callbacks allow for furthur manuvers
-        each callback function can process:
-            callback(user_id, index)
-        """
-        # delete cache
-        if self.busy_by_user.get(user_id) == index:
-            del self.busy_by_user[user_id]
-        for callback in callbacks:
-            callback(user_id, index)
-
-    def user_clear_progress(self, user_id):
-        user_progress = self.user_progress(user_id)
-
-        new_progress = []
-        for i in user_progress:
-            if i > self.sent:
-                new_progress.append(i)
-        self.by_user[user_id] = new_progress
+            for k, v in self.processing.items():
+                if len(self.processing[k]) >= self.v:
+                    continue
+                if user_id in v:
+                    continue
+                else:
+                    v.append(user_id)
+                    self.cache_data[user_id] = k
+                    return k
+
+            # read_new
+            if len(self.new) > 0:
+                item = self.new[0]
+                self.processing[item] = []
+                self.new.remove(item)
+                return self[user_id]
+            else:
+                return -1
 
-    def tick_sent(self, index):
-        self.sent = index
-        del self.started[index]
+    def finish_update(self, user_id, index):
+        if user_id in self.cache_data:
+            del self.cache_data[user_id]
+        if index in self.processing:
+            if len(self.processing[index]) >= self.v:
+                del self.processing[index]
 
 
 class Progress:
@@ -129,11 +87,12 @@ def recover_history(self, data):
         """
         Recover single entry of history data to the dispatcher
         """
-        self.tagging(data)
         user_id = data["user_id"]
         pandas = data["pandas"]
+
+        # using pandas to map to new index
         index = self.idx_to_index[pandas]
-        self.dispatcher.after_get_update(user_id=user_id, index=index)
+        self.tagging(data)
         self.dispatcher.finish_update(user_id=user_id, index=index)
 
     def tagging(self, data):
diff --git a/langhuan/tasks.py b/langhuan/tasks.py
index 9507f9b..4193f22 100644
--- a/langhuan/tasks.py
+++ b/langhuan/tasks.py
@@ -97,10 +97,12 @@ def from_df(
             cross_verify_num=cross_verify_num)
 
         if load_history:
+            logging.info(f"start loading history")
             # loading the history to progress
             if len(app.task_history.history) > 0:
                 for data in app.task_history.history:
                     app.progress.recover_history(data)
+            logging.info(f"history loaded")
         return app
 
     def create_progress(
diff --git a/langhuan/version.py b/langhuan/version.py
index 9b36b86..b2f0155 100644
--- a/langhuan/version.py
+++ b/langhuan/version.py
@@ -1 +1 @@
-__version__ = "0.0.10"
+__version__ = "0.0.11"
diff --git a/ner_test.ipynb b/ner_test.ipynb
index 7b3f938..3dcf034 100644
--- a/ner_test.ipynb
+++ b/ner_test.ipynb
@@ -65,23 +65,23 @@
        " \n",
        " \n",
        " \n",
-       " 0\n",
+       " 0\n",
        " From: lerxst@wam.umd.edu (where's my thing)\\nS...\n",
        " \n",
        " \n",
-       " 1\n",
+       " 1\n",
        " From: guykuo@carson.u.washington.edu (Guy Kuo)...\n",
        " \n",
        " \n",
-       " 2\n",
+       " 2\n",
        " From: twillis@ec.ecn.purdue.edu (Thomas E Will...\n",
        " \n",
        " \n",
-       " 3\n",
+       " 3\n",
        " From: jgreen@amber (Joe Green)\\nSubject: Re: W...\n",
        " \n",
        " \n",
-       " 4\n",
+       " 4\n",
        " From: jcm@head-cfa.harvard.edu (Jonathan McDow...\n",
        " \n",
        " \n",
@@ -130,95 +130,6 @@
     "app.run(\"0.0.0.0\", port=5000)"
    ]
   },
-  {
-   "cell_type": "code",
-   "execution_count": 12,
-   "metadata": {},
-   "outputs": [
-    {
-     "data": {
-      "text/plain": [
-       "'Corporation\\njgreen@csd.harris.com\\t\\t\\tComputer Systems Division\\n\"The only thing that really scares me is a person with no sense of humor.\"\\n\\t\\t\\t\\t\\t\\t-- Jonathan Winters\\n'"
-      ]
-     },
-     "execution_count": 12,
-     "metadata": {},
-     "output_type": "execute_result"
-    }
-   ],
-   "source": [
-    "df.loc[3][\"text\"][652:]"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": 13,
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "from langhuan.utility import cleanup_tags"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": 18,
-   "metadata": {},
-   "outputs": [
-    {
-     "data": {
-      "text/plain": [
-       "3"
-      ]
-     },
-     "execution_count": 18,
-     "metadata": {},
-     "output_type": "execute_result"
-    }
-   ],
-   "source": [
-    "len(\"˃˃˃\")"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": 20,
-   "metadata": {},
-   "outputs": [
-    {
-     "data": {
-      "text/plain": [
-       "3"
-      ]
-     },
-     "execution_count": 20,
-     "metadata": {},
-     "output_type": "execute_result"
-    }
-   ],
-   "source": [
-    "len(\"˂˂˂\")"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": 14,
-   "metadata": {},
-   "outputs": [
-    {
-     "data": {
-      "text/plain": [
-       "'Harris Corporation\\njgreen@csd.harris.com\\t\\t\\tComputer Systems Division\\n\"The only thing that really scares me is a person with no sense of humor.\"\\n\\t\\t\\t\\t\\t\\t-- Jonathan Winters\\n'"
-      ]
-     },
-     "execution_count": 14,
-     "metadata": {},
-     "output_type": "execute_result"
-    }
-   ],
-   "source": [
-    "cleanup_tags(df.loc[3][\"text\"])[652:]"
-   ]
-  },
   {
    "cell_type": "markdown",
    "metadata": {},
diff --git a/settings.ini b/settings.ini
index b99db52..c6edc03 100644
--- a/settings.ini
+++ b/settings.ini
@@ -6,7 +6,7 @@ keywords = python pandas label data science
 author = xiaochen(ray) zhang
 author_email = b2ray2c@gmail.com
 branch = main
-version = 0.0.10
+version = 0.0.11
 min_python = 3.6
 audience = Developers
 language = English
diff --git a/tests/dispatcher_test.ipynb b/tests/dispatcher_test.ipynb
index b8a6233..24475d8 100644
--- a/tests/dispatcher_test.ipynb
+++ b/tests/dispatcher_test.ipynb
@@ -15,23 +15,80 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "logger = logging.getLogger()\n",
-    "logger.setLevel(logging.DEBUG)"
+    "import unittest"
    ]
   },
+  {
+   "cell_type": "code",
+   "execution_count": 49,
+   "metadata": {},
+   "outputs": [],
"source": [ + "class Dispatcher:\n", + " def __init__(self, n, v):\n", + " self.n = n\n", + " self.v = v\n", + " self.cache_data = dict()\n", + " self.new = list(range(n))\n", + " self.processing = dict()\n", + " \n", + " def __repr__(self):\n", + " return f\"Job Dispatcher: n:{self.n},v{self.v}\"\n", + " \n", + " def __getitem__(self, user_id):\n", + " if user_id in self.cache_data:\n", + " return self.cache_data[user_id]\n", + " else:\n", + " for k, v in self.processing.items():\n", + " if len(self.processing[k]) >= self.v:\n", + " continue\n", + " if user_id in v:\n", + " continue\n", + " else:\n", + " v.append(user_id)\n", + " self.cache_data[user_id] = k\n", + " return k\n", + "\n", + " # read_new\n", + " if len(self.new)>0:\n", + " item = self.new[0]\n", + " self.processing[item] = []\n", + " self.new.remove(item)\n", + " return self[user_id]\n", + " else:\n", + " return -1\n", + " \n", + " def finish_update(self, user_id, index):\n", + " if user_id in self.cache_data:\n", + " del self.cache_data[user_id]\n", + " if index in self.processing:\n", + " if len(self.processing[index]) >= self.v:\n", + " del self.processing[index]\n", + " " + ] + }, + { + "cell_type": "code", + "execution_count": 50, + "metadata": {}, + "outputs": [], + "source": [ + "# from langhuan.progress import Dispatcher" ] }, { "cell_type": "code", - "execution_count": 3, + "execution_count": 51, "metadata": {}, "outputs": [], "source": [ - "import unittest\n", - "from langhuan.progress import Dispatcher" + "logger = logging.getLogger()\n", + "logger.setLevel(logging.DEBUG)" ] }, { "cell_type": "code", - "execution_count": 8, + "execution_count": 52, "metadata": {}, "outputs": [], "source": [ @@ -87,7 +144,7 @@ " dispatcher.finish_update(2, index=i2)\n", " if i % 3 != 2:\n", " dispatcher.finish_update(3, index=i3)\n", - "\n", + " print(result)\n", " expected = {\n", " 'step_0': [0, 0, 1],\n", " 'step_1': [0, 1, 2],\n", @@ -107,16 +164,30 @@ }, { "cell_type": "code", - "execution_count": 9, + "execution_count": 53, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ - "...\n", + "..." + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{'step_0': [0, 0, 1], 'step_1': [0, 1, 2], 'step_2': [2, 1, 3], 'step_3': [3, 4, 3], 'step_4': [3, 5, 4], 'step_5': [5, 5, 6], 'step_6': [6, 7, 6], 'step_7': [6, 8, 7], 'step_8': [8, 8, 9], 'step_9': [9, -1, 9], 'step_10': [9, -1, -1], 'step_11': [-1, -1, -1]}\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "\n", "----------------------------------------------------------------------\n", - "Ran 3 tests in 0.004s\n", + "Ran 3 tests in 0.008s\n", "\n", "OK\n" ]