This repository has been archived by the owner on Jan 3, 2025. It is now read-only.

✨ feat: switch the bwiki adaptation approach #12

Merged 1 commit on Oct 8, 2022
61 changes: 61 additions & 0 deletions nonebot_plugin_wiki/mediawiki/util.py
@@ -1,7 +1,9 @@
from __future__ import print_function, unicode_literals

from typing import Union, Tuple, List
import sys
import functools
from html.parser import HTMLParser


def debug(fn):
@@ -14,6 +16,65 @@ def wrapper(*args, **kwargs):
    return wrapper


def get_from_attrs(attr_list, target):
    """Look up `target` in an HTMLParser-style attrs list of (name, value) tuples.

    str target: return the value of the attribute with that name.
    tuple target: return True if the attribute target[0] exists and its value contains target[1].
    list target ([find, fetch]): if `find` matches as a tuple would, return the value
    of the attribute named fetch[0].
    """
    if not target:
        return False
    if isinstance(target, str):
        for attr in attr_list:
            if target == attr[0]:
                return attr[1]
    if isinstance(target, tuple):
        for attr in attr_list:
            if target[0] == attr[0] and target[1] in attr[1]:
                return True
    if isinstance(target, list) and len(target) == 2:
        find = target[0]
        fetch = target[1]
        got = False
        temp = None
        for attr in attr_list:
            if find[0] == attr[0] and find[1] in attr[1]:
                got = True
            if fetch[0] == attr[0]:
                temp = attr[1]
        if temp and got:
            return temp
    return False
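For reference, the three target shapes behave roughly as follows (illustrative values, not part of the PR; attrs is shaped like the (name, value) tuples HTMLParser hands to handle_starttag):

attrs = [("class", "wiki-bot plainlinks"), ("href", "/wiki/Foo")]

get_from_attrs(attrs, "href")                 # -> "/wiki/Foo": fetch a value by name
get_from_attrs(attrs, ("class", "wiki-bot"))  # -> True: substring test on one attribute
get_from_attrs(attrs, [("class", "wiki-bot"), ("href",)])
# -> "/wiki/Foo": only if the ("class", "wiki-bot") pair matches, return the "href" value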


class SimpleInnerParser(HTMLParser):
    """Collect the inner text (or an attribute value) of tags matching
    target_tag / target_attr; an empty target_tag matches every tag."""

    def __init__(
        self,
        target_tag='p',
        target_attr: Union[str, Tuple, List[Tuple]] = None,  # type: ignore
        text_context=True
    ):
        super().__init__()
        self.output = []
        self.open_write = False
        self.target_tag = target_tag
        self.target_attr = target_attr
        self.text_context = text_context

    def handle_starttag(self, tag, attrs):
        if tag == self.target_tag or not self.target_tag:
            checker = get_from_attrs(
                attrs, self.target_attr) if self.target_attr else True
            self.open_write = True and checker
            if value := get_from_attrs(attrs, self.target_attr):
                if not self.text_context:
                    # attribute mode: record the matched value, not the inner text
                    self.output.append(str(value).strip())
                    self.open_write = False

    def handle_endtag(self, tag):
        if tag == self.target_tag:
            self.open_write = False

    def handle_data(self, data: str):
        if self.open_write and self.text_context and data.strip():
            self.output.append(data.strip())
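A minimal usage sketch (not from the PR): with an empty target_tag the parser watches every tag and keeps the text of those whose class contains "wiki-bot", which is exactly how bwiki_summary in wikipedia.py drives it below:

parser = SimpleInnerParser("", ("class", "wiki-bot"))
parser.feed('<div><p class="wiki-bot">First line.</p>'
            '<p>skipped</p><span class="wiki-bot">Second.</span></div>')
parser.close()
print(parser.output)  # ['First line.', 'Second.']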


class cache(object):

    def __init__(self, fn):
81 changes: 47 additions & 34 deletions nonebot_plugin_wiki/mediawiki/wikipedia.py
@@ -10,6 +10,7 @@

from .exceptions import (
    ApiReturnError,
    NoExtractError,
    PageError,
    DisambiguationError,
    RedirectError,
@@ -262,13 +263,41 @@ async def random(pages=1):
    return titles


async def bwiki_summary(title: str):
    """Take a BWiki page title and return its plain-text introduction.

    Args:
        title: the page title

    Returns:
        str: the introduction text
    """
    params = {
        "action": "parse",
        "format": "json",
        "disablelimitreport": False,
        "redirects": True,
        "disableeditsection": True,
        "page": title
    }

    request = await _wiki_request(params)

    to_feed = request["parse"]["text"]["*"]
    wikibot_parser = util.SimpleInnerParser("", ("class", "wiki-bot"))
    wikibot_parser.feed(to_feed)
    wikibot_parser.close()

    return '\n'.join(wikibot_parser.output)
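Assuming API_URL has already been pointed at a BWiki (wiki.biligame.com) api.php endpoint, a call looks roughly like this from within async code (the title is illustrative):

intro = await bwiki_summary("Some Page")
print(intro)  # newline-joined text of the page's "wiki-bot" marked blocks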


async def summary(
    title,
    sentences=0,
    chars=0,
    auto_suggest=True,
    redirect=True
) -> Tuple[int, str]:  # type: ignore
    '''
    Page id and plain-text summary
    Keyword arguments:
@@ -292,35 +321,21 @@ async def summary(
         'titles': title,

     }
-    if sentences:
-        query_params['exsentences'] = sentences
-    elif chars:
-        query_params['exchars'] = chars
-    else:
-        query_params['exintro'] = ''
-    request = await _wiki_request(query_params)
-    try:
-        summary = (request['query']['pages'][pageid]['extract']).strip()
-    except KeyError:
-
-        # bili wiki parsing
-        global USER_AGENT
-
-        headers = {
-            'User-Agent': USER_AGENT
-        }
-
-        async with httpx.AsyncClient(proxies=PROXIES, timeout=None) as client:
-            r = await client.get(page_info.url, headers=headers)
-
-        r = r.text
-
-        summary = re.sub(r"</?(.+?)>", "", ''.join(re.compile(
-            r'<p><b>(.*?)\n<p>\n<div').findall(r)))
-        if summary == '':
-            summary = re.sub(r"</?(.+?)>", "", ''.join(re.compile(
-                r'<p><b>(.*?)\n</p>').findall(r)))
+    if "wiki.biligame" not in API_URL:
+        if sentences:
+            query_params['exsentences'] = sentences
+        elif chars:
+            query_params['exchars'] = chars
+        else:
+            query_params['exintro'] = ''
+        request = await _wiki_request(query_params)
+        try:
+            summary = (request['query']['pages'][pageid]['extract']).strip()
+        except KeyError as e:
+            raise NoExtractError from e
+    else:
+        summary = await bwiki_summary(title)

     return [pageid, summary]  # type: ignore
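A usage sketch (title and values illustrative): despite the Tuple annotation the function returns a two-element list, which unpacks the same way inside async code:

pageid, text = await summary("Some Page", sentences=2)
print(pageid, text)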

@@ -805,7 +820,7 @@ async def _wiki_request(params):
     global PROXIES

     params['format'] = 'json'
-    if not ('action' in params):
+    if 'action' not in params:
         params['action'] = 'query'

     headers = {
@@ -823,16 +838,14 @@
         time.sleep(int(wait_time.total_seconds()))

     global RETRY_TIMES
-    for times in range(RETRY_TIMES):
+    for _ in range(RETRY_TIMES):
         async with httpx.AsyncClient(proxies=PROXIES, timeout=None) as client:
             r = await client.get(API_URL, params=params, headers=headers)

         ret = r.json()
         # print(ret)

-        if 'error' in ret:
-            if ' a temporary problem' in ret['error']['info']:
-                pass
-            else:
+        if 'error' not in ret:
             if RATE_LIMIT:
                 RATE_LIMIT_LAST_CALL = datetime.now()
             return ret
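A self-contained sketch of the retry shape after this change, under the assumption (the hunk is truncated here) that an error response simply falls through to the next attempt; fetch and retries are stand-in names, not from the PR:

async def get_with_retries(fetch, retries):
    ret = None
    for _ in range(retries):
        ret = await fetch()
        if 'error' not in ret:
            return ret  # success: stop retrying
    return ret          # caller sees the last error payload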