Skip to content

Commit

Permalink
💌 easier and faster dispatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
raynardj committed Feb 11, 2021
1 parent b1eed80 commit 9088b1a
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 181 deletions.
109 changes: 34 additions & 75 deletions langhuan/progress.py
Original file line number Diff line number Diff line change
@@ -1,89 +1,47 @@
from typing import List, Callable, Union
import logging


class Dispatcher:
def __init__(self, n, v):
self.n = n
self.v = v
self.sent = -1
self.started = dict()
self.by_user = dict()
self.busy_by_user = dict()

def new_user_todo(self, user_id):
return {user_id: list(range(self.n))[max(0, self.sent):]}

def user_progress(self, user_id):
try:
user_progress = self.by_user[user_id]
except KeyError:
self.by_user.update(self.new_user_todo(user_id))
user_progress = self.by_user[user_id]
return user_progress
self.cache_data = dict()
self.new = list(range(n))
self.processing = dict()

def __repr__(self):
return str(self.by_user)+"\n"+str(self.sent)
return f"Job Dispatcher: n:{self.n},v:{self.v}"

def __getitem__(self, user_id):
"""
get index by user_id
"""
if user_id in self.busy_by_user:
# read cache
return self.busy_by_user[user_id]

self.user_clear_progress(user_id)
user = self.user_progress(user_id)
try:
index = user[0]
self.after_get_update(user_id, index)

except IndexError:
return -1
return index

def after_get_update(self, user_id, index):
# save cache
self.busy_by_user[user_id] = index
user = self.user_progress(user_id)
if index in user:
user.remove(index)
if self.started.get(index) is None:
self.started[index] = [user_id, ]
if user_id in self.cache_data:
return self.cache_data[user_id]
else:
self.started[index].append(user_id)
if len(self.started[index]) >= self.v:
self.tick_sent(index)

def finish_update(
self,
user_id: str,
index: int,
callbacks: List[Callable] = []
):
"""
callbacks allow for further maneuvers
each callback function can process:
callback(user_id, index)
"""
# delete cache
if self.busy_by_user.get(user_id) == index:
del self.busy_by_user[user_id]
for callback in callbacks:
callback(user_id, index)

def user_clear_progress(self, user_id):
user_progress = self.user_progress(user_id)

new_progress = []
for i in user_progress:
if i > self.sent:
new_progress.append(i)
self.by_user[user_id] = new_progress
for k, v in self.processing.items():
if len(self.processing[k]) >= self.v:
continue
if user_id in v:
continue
else:
v.append(user_id)
self.cache_data[user_id] = k
return k

# read_new
if len(self.new) > 0:
item = self.new[0]
self.processing[item] = []
self.new.remove(item)
return self[user_id]
else:
return -1

def tick_sent(self, index):
self.sent = index
del self.started[index]
def finish_update(self, user_id, index):
if user_id in self.cache_data:
del self.cache_data[user_id]
if index in self.processing:
if len(self.processing[index]) >= self.v:
del self.processing[index]


class Progress:
Expand Down Expand Up @@ -129,11 +87,12 @@ def recover_history(self, data):
Recover single entry of history data
to the dispatcher
"""
self.tagging(data)
user_id = data["user_id"]
pandas = data["pandas"]

# using pandas to map to new index
index = self.idx_to_index[pandas]
self.dispatcher.after_get_update(user_id=user_id, index=index)
self.tagging(data)
self.dispatcher.finish_update(user_id=user_id, index=index)

def tagging(self, data):
Expand Down
2 changes: 2 additions & 0 deletions langhuan/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,12 @@ def from_df(
cross_verify_num=cross_verify_num)

if load_history:
logging.info(f"start loading history")
# loading the history to progress
if len(app.task_history.history) > 0:
for data in app.task_history.history:
app.progress.recover_history(data)
logging.info(f"history loaded")
return app

def create_progress(
Expand Down
2 changes: 1 addition & 1 deletion langhuan/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.0.10"
__version__ = "0.0.11"
99 changes: 5 additions & 94 deletions ner_test.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -65,23 +65,23 @@
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <td>0</td>\n",
" <th>0</th>\n",
" <td>From: [email protected] (where's my thing)\\nS...</td>\n",
" </tr>\n",
" <tr>\n",
" <td>1</td>\n",
" <th>1</th>\n",
" <td>From: [email protected] (Guy Kuo)...</td>\n",
" </tr>\n",
" <tr>\n",
" <td>2</td>\n",
" <th>2</th>\n",
" <td>From: [email protected] (Thomas E Will...</td>\n",
" </tr>\n",
" <tr>\n",
" <td>3</td>\n",
" <th>3</th>\n",
" <td>From: jgreen@amber (Joe Green)\\nSubject: Re: W...</td>\n",
" </tr>\n",
" <tr>\n",
" <td>4</td>\n",
" <th>4</th>\n",
" <td>From: [email protected] (Jonathan McDow...</td>\n",
" </tr>\n",
" </tbody>\n",
Expand Down Expand Up @@ -130,95 +130,6 @@
"app.run(\"0.0.0.0\", port=5000)"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'Corporation\\[email protected]\\t\\t\\tComputer Systems Division\\n\"The only thing that really scares me is a person with no sense of humor.\"\\n\\t\\t\\t\\t\\t\\t-- Jonathan Winters\\n'"
]
},
"execution_count": 12,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df.loc[3][\"text\"][652:]"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [],
"source": [
"from langhuan.utility import cleanup_tags"
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"3"
]
},
"execution_count": 18,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"len(\"˃˃˃\")"
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"3"
]
},
"execution_count": 20,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"len(\"˂˂˂\")"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'Harris Corporation\\[email protected]\\t\\t\\tComputer Systems Division\\n\"The only thing that really scares me is a person with no sense of humor.\"\\n\\t\\t\\t\\t\\t\\t-- Jonathan Winters\\n'"
]
},
"execution_count": 14,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"cleanup_tags(df.loc[3][\"text\"])[652:]"
]
},
{
"cell_type": "markdown",
"metadata": {},
Expand Down
2 changes: 1 addition & 1 deletion settings.ini
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ keywords = python pandas label data science
author = xiaochen(ray) zhang
author_email = [email protected]
branch = main
version = 0.0.10
version = 0.0.11
min_python = 3.6
audience = Developers
language = English
Expand Down
Loading

0 comments on commit 9088b1a

Please sign in to comment.