
Putting things together to form a minimum viable scraper application

Code Review user
Asked 2021-06-26 01:09:51
Answers: 1, Views: 119, Followers: 0, Votes: 1

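This is a follow-up to a series of scraper questions that I started here.

So far, with the help of @Reinderien, I have written four separate "modules", each exposing a search function that scrapes bibliographic information from a different online database. Half of them use Selenium; the other half use Requests.

I would like to know the best way to put them together, possibly by organizing them into a single module that can be imported as a whole, and/or by creating a base class so that common code can be shared between them.

Given a list of search keywords, I want the final application to run each database's search function, with the choice of databases to search passed as an argument.

Update:

Since this question has not been answered yet, I have drafted working code that takes a list of keywords together with the database to search. If the database is not specified, the same set of keywords is looped through all databases.

I would like improvements to the code below, particularly in the following respects:

  1. When all databases are involved, merge the search results into a single .json or .bib file.
  2. Reuse common code, so that the code base as a whole is less bulky and more extensible.
  3. More flexible search options, such as choosing 2 or 3 of the 4 databases to search. (Possibly by using *args or **kwargs in the search function.)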
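For point 1, merging might look roughly like the sketch below. It assumes each module's search is changed to yield plain dicts (for example via the existing as_dict methods), which the code in this question does not yet do. The names merge_results, save_merged and the source_db key are hypothetical, chosen for illustration only.

```python
import json
from pathlib import Path
from typing import Dict, Iterable, List


def merge_results(results_by_db: Dict[str, Iterable[dict]]) -> List[dict]:
    """Flatten per-database results into one list.

    Each record is tagged with the (hypothetical) key 'source_db' so that
    the database it came from is still known after merging.
    """
    merged = []
    for db_name, results in results_by_db.items():
        for record in results:
            merged.append({**record, 'source_db': db_name})
    return merged


def save_merged(records: List[dict], file_prefix: str) -> Path:
    """Write all records to a single JSON file and return its path."""
    file_path = Path(file_prefix).with_suffix('.json')
    with file_path.open('w', encoding='utf-8') as file:
        # ensure_ascii=False keeps Chinese titles readable in the file.
        json.dump(records, file, ensure_ascii=False, indent=4)
    return file_path
```

Writing the whole list with one json.dump call yields a valid JSON array, at the cost of holding every record in memory; that seems acceptable for result sets of a few hundred entries.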

main.py

import cnki, fudan, wuhan, qinghua

def db_search(keyword, db=None):

    db_dict = {
        "cnki": cnki.search,
        "fudan": fudan.search,
        "wuhan": wuhan.search,
        "qinghua": qinghua.search,
        }

    if db == None:
        for key in db_dict.keys():
            yield db_dict[key](keyword)
    elif db == "cnki":
        yield db_dict["cnki"](keyword)
    elif db == "fudan":
        yield db_dict["fudan"](keyword)
    elif db == "wuhan":
        yield db_dict["wuhan"](keyword)
    elif db == "qinghua":
        yield db_dict["qinghua"](keyword)


def search(keywords, db=None):
    for kw in keywords:
        yield from db_search(kw, db)



if __name__ == '__main__':
    rslt = search(['尹誥','尹至'])
    for item in rslt:
        print(item)

Code:

cnki.py

from contextlib import contextmanager
from dataclasses import dataclass
from datetime import date
from pathlib import Path
from typing import Generator, Iterable, Optional, List, ContextManager, Dict
from urllib.parse import unquote
from itertools import chain, count
import re
import json
from math import ceil

# pip install proxy.py
import proxy
from proxy.http.exception import HttpRequestRejected
from proxy.http.parser import HttpParser
from proxy.http.proxy import HttpProxyBasePlugin
from selenium.common.exceptions import (
    NoSuchElementException,
    StaleElementReferenceException,
    TimeoutException,
    WebDriverException,
)
from selenium.webdriver import Firefox, FirefoxProfile
from selenium.webdriver.common.by import By
from selenium.webdriver.common.proxy import ProxyType
from selenium.webdriver.remote.webdriver import WebDriver
from selenium.webdriver.remote.webelement import WebElement
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
# from urllib3.packages.six import X


@dataclass
class Result:
    title: str        # Mozi's Theory of Human Nature and Politics
    title_link: str   # http://big5.oversea.cnki.net/kns55/detail/detail.aspx?recid=&FileName=ZDXB202006009&DbName=CJFDLAST2021&DbCode=CJFD
    html_link: Optional[str]  # http%3a%2f%2fkns.cnki.net%2fKXReader%2fDetail%3fdbcode%3dCJFD%26filename%3dZDXB202006009
    author: str       # Xie Qiyang
    source: str       # Vocational University News
    source_link: str  # http://big5.oversea.cnki.net/kns55/Navi/ScdbBridge.aspx?DBCode=CJFD&BaseID=ZDXB&UnitCode=&NaviLink=%e8%81%8c%e5%a4%a7%e5%ad%a6%e6%8a%a5
    date: date   # 2020-12-28
    download: str        #
    database: str     # Periodical

    @classmethod
    def from_row(cls, row: WebElement) -> 'Result':
        number, title, author, source, published, database = row.find_elements_by_xpath('td')

        title_links = title.find_elements_by_tag_name('a')

        if len(title_links) > 1:
            # 'http://big5.oversea.cnki.net/kns55/ReadRedirectPage.aspx?flag=html&domain=http%3a%2f%2fkns.cnki.net%2fKXReader%2fDetail%3fdbcode%3dCJFD%26filename%3dZDXB202006009'
            html_link = unquote(
                title_links[1]
                .get_attribute('href')
                .split('domain=', 1)[1])
        else:
            html_link = None

        dl_links, sno = number.find_elements_by_tag_name('a')

        published_date = date.fromisoformat(
            published.text.split(maxsplit=1)[0]
        )

        return cls(
            title=title_links[0].text,
            title_link=title_links[0].get_attribute('href'),
            html_link=html_link,
            author=author.text,
            source=source.text,
            source_link=source.get_attribute('href'),
            date=published_date,
            download=dl_links.get_attribute('href'),
            database=database.text,
        )

    def __str__(self):
        return (
            f'題名      {self.title}'
            f'\n作者     {self.author}'
            f'\n來源     {self.source}'
            f'\n發表時間  {self.date}'
            f'\n下載連結 {self.download}'
            f'\n來源數據庫 {self.database}'
        )

    def as_dict(self) -> Dict[str, str]:
        return {
        'author': self.author,
        'title': self.title,
        'date': self.date.isoformat(),
        'download': self.download,
        'url': self.html_link,
        'database': self.database,
    }


class MainPage:
    def __init__(self, driver: WebDriver):
        self.driver = driver

    def submit_search(self, keyword: str) -> None:
        wait = WebDriverWait(self.driver, 50)
        search = wait.until(
            EC.presence_of_element_located((By.NAME, 'txt_1_value1'))
        )
        search.send_keys(keyword)
        search.submit()

    def switch_to_frame(self) -> None:
        wait = WebDriverWait(self.driver, 100)
        wait.until(
            EC.presence_of_element_located((By.XPATH, '//iframe[@name="iframeResult"]'))
        )
        self.driver.switch_to.default_content()
        self.driver.switch_to.frame('iframeResult')

        wait.until(
            EC.presence_of_element_located((By.XPATH, '//table[@class="GridTableContent"]'))
        )

    def max_content(self) -> None:
        """Maximize the number of items on display in the search results."""
        max_content = self.driver.find_element(
            By.CSS_SELECTOR, '#id_grid_display_num > a:nth-child(3)',
        )
        max_content.click()

    # def get_element_and_stop_page(self, *locator) -> WebElement:
    #     ignored_exceptions = (NoSuchElementException, StaleElementReferenceException)
    #     wait = WebDriverWait(self.driver, 30, ignored_exceptions=ignored_exceptions)
    #     elm = wait.until(EC.presence_of_element_located(locator))
    #     self.driver.execute_script("window.stop();")
    #     return elm



class SearchResults:
    def __init__(self, driver: WebDriver):
        self.driver = driver


    def number_of_articles_and_pages(self) -> int:
        elem = self.driver.find_element_by_xpath(
            '//table//tr[3]//table//table//td[1]/table//td[1]'
        )
        n_articles = re.search("共有記錄(.+)條", elem.text).group(1)
        n_pages = ceil(int(n_articles)/50)

        return n_articles, n_pages


    def get_structured_elements(self) -> Iterable[Result]:
        rows = self.driver.find_elements_by_xpath(
            '//table[@class="GridTableContent"]//tr[position() > 1]'
        )

        for row in rows:
            yield Result.from_row(row)


    def get_element_and_stop_page(self, *locator) -> WebElement:
        ignored_exceptions = (NoSuchElementException, StaleElementReferenceException)
        wait = WebDriverWait(self.driver, 30, ignored_exceptions=ignored_exceptions)
        elm = wait.until(EC.presence_of_element_located(locator))
        self.driver.execute_script("window.stop();")
        return elm

    def next_page(self) -> None:
        link = self.get_element_and_stop_page(By.LINK_TEXT, "下頁")

        try:
            link.click()
            print("Navigating to Next Page")
        except (TimeoutException, WebDriverException):
            print("Last page reached")



class ContentFilterPlugin(HttpProxyBasePlugin):
    HOST_WHITELIST = {
        b'ocsp.digicert.com',
        b'ocsp.sca1b.amazontrust.com',
        b'big5.oversea.cnki.net',
    }

    def handle_client_request(self, request: HttpParser) -> Optional[HttpParser]:
        host = request.host or request.header(b'Host')
        if host not in self.HOST_WHITELIST:
            raise HttpRequestRejected(403)

        if any(
            suffix in request.path
            for suffix in (
                b'png', b'ico', b'jpg', b'gif', b'css',
            )
        ):
            raise HttpRequestRejected(403)

        return request

    def before_upstream_connection(self, request):
        return super().before_upstream_connection(request)
    def handle_upstream_chunk(self, chunk):
        return super().handle_upstream_chunk(chunk)
    def on_upstream_connection_close(self):
        pass


@contextmanager
def run_driver() -> ContextManager[WebDriver]:
    prox_type = ProxyType.MANUAL['ff_value']
    prox_host = '127.0.0.1'
    prox_port = 8889

    profile = FirefoxProfile()
    profile.set_preference('network.proxy.type', prox_type)
    profile.set_preference('network.proxy.http', prox_host)
    profile.set_preference('network.proxy.ssl', prox_host)
    profile.set_preference('network.proxy.http_port', prox_port)
    profile.set_preference('network.proxy.ssl_port', prox_port)
    profile.update_preferences()

    plugin = f'{Path(__file__).stem}.{ContentFilterPlugin.__name__}'

    with proxy.start((
        '--hostname', prox_host,
        '--port', str(prox_port),
        '--plugins', plugin,
    )), Firefox(profile) as driver:
        yield driver


def loop_through_results(driver):
    result_page = SearchResults(driver)
    n_articles, n_pages = result_page.number_of_articles_and_pages()
    
    print(f"{n_articles} found. A maximum of 500 will be retrieved.")

    for page in count(1):

        print(f"Scraping page {page}/{n_pages}")
        print()

        result = result_page.get_structured_elements()
        yield from result

        if page >= n_pages or page >= 10:
            break

        result_page.next_page()
        result_page = SearchResults(driver)


def save_articles(articles: Iterable, file_prefix: str) -> None:
    file_path = Path(file_prefix).with_suffix('.json')

    with file_path.open('w') as file:
        file.write('[\n')
        first = True

        for article in articles:
            if first:
                first = False
            else:
                file.write(',\n')
            json.dump(article.as_dict(), file, ensure_ascii=False, indent=4)

        file.write('\n]\n')


def query(keyword, driver) -> None:

    page = MainPage(driver)
    page.submit_search(keyword)
    page.switch_to_frame()
    page.max_content()


def search(keyword):
    with Firefox() as driver:
        driver.get('http://big5.oversea.cnki.net/kns55/')
        query(keyword, driver)
        result = loop_through_results(driver)
        save_articles(result, 'cnki_search_result.json')


if __name__ == '__main__':
    search('尹至')

qinghua.py

The site's search function is currently down. I plan to try it with Requests once it is up and running again.

from contextlib import contextmanager
from dataclasses import dataclass, asdict, replace
from datetime import datetime, date
from pathlib import Path
from typing import Iterable, Optional, ContextManager
import re
import os
import time
import json

# pip install proxy.py
import proxy
from proxy.http.exception import HttpRequestRejected
from proxy.http.parser import HttpParser
from proxy.http.proxy import HttpProxyBasePlugin
from selenium.common.exceptions import (
    NoSuchElementException,
    StaleElementReferenceException,
    TimeoutException,
    WebDriverException,
)
from selenium.webdriver import Firefox, FirefoxProfile
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.by import By
from selenium.webdriver.common.proxy import ProxyType
from selenium.webdriver.remote.webdriver import WebDriver
from selenium.webdriver.remote.webelement import WebElement
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait


@dataclass
class PrimaryResult:
    captions: str
    date: date
    link: str

    @classmethod
    def from_row(cls, row: WebElement) -> 'PrimaryResult': 

        caption_elems = row.find_element_by_tag_name('a')
        date_elems = row.find_element_by_class_name('time')

        published_date = date.isoformat(datetime.strptime(date_elems.text, '%Y-%m-%d'))

        return cls(
            captions = caption_elems.text,
            date = published_date,
            link = caption_elems.get_attribute('href')
        )

    def __str__(self):
        return (
            f'\n標題     {self.captions}'
            f'\n發表時間  {self.date}'
            f'\n文章連結 {self.link}'
        )


class MainPage:
    def __init__(self, driver: WebDriver):
        self.driver = driver
 
    def submit_search(self, keyword: str) -> None:
        driver = self.driver
        wait = WebDriverWait(self.driver, 100)

        xpath = "//form/button/input"
        element_to_hover_over = driver.find_element_by_xpath(xpath)
        hover = ActionChains(driver).move_to_element(element_to_hover_over)
        hover.perform()

        search = wait.until(
            EC.presence_of_element_located((By.ID, 'showkeycode1015273'))
        )
        search.send_keys(keyword)
        search.submit()


    def get_element_and_stop_page(self, *locator) -> WebElement:
        ignored_exceptions = (NoSuchElementException, StaleElementReferenceException)
        wait = WebDriverWait(self.driver, 30, ignored_exceptions=ignored_exceptions)
        elm = wait.until(EC.presence_of_element_located(locator))
        self.driver.execute_script("window.stop();")
        return elm

    def next_page(self) -> None:
        try: 
            link = self.get_element_and_stop_page(By.LINK_TEXT, "下一页")
            link.click()
            print("Navigating to Next Page")

        except (TimeoutException, WebDriverException):
            print("No button with 「下一页」 found.")
            return 0


    # @contextmanager
    # def wait_for_new_window(self):
    #     driver = self.driver
    #     handles_before = driver.window_handles
    #     yield
    #     WebDriverWait(driver, 10).until(
    #         lambda driver: len(handles_before) != len(driver.window_handles))

    def switch_tabs(self):
        driver = self.driver
        print("Current Window:")
        print(driver.title)
        print()

        p = driver.current_window_handle
        
        chwd = driver.window_handles
        time.sleep(3)
        driver.switch_to.window(chwd[1])

        print("New Window:")
        print(driver.title)
        print()


class SearchResults:
    def __init__(self, driver: WebDriver):
        self.driver = driver

    def get_primary_search_result(self):
        
        filePath = os.path.join(os.getcwd(), "qinghua_primary_search_result.json")

        if os.path.exists(filePath):
            os.remove(filePath)    

        rows = self.driver.find_elements_by_xpath('//ul[@class="search_list"]/li')

        for row in rows:
            rslt = PrimaryResult.from_row(row)
            with open('qinghua_primary_search_result.json', 'a') as file:
                json.dump(asdict(rslt), file, ensure_ascii=False, indent=4)
            yield rslt


# class ContentFilterPlugin(HttpProxyBasePlugin):
#     HOST_WHITELIST = {
#         b'ocsp.digicert.com',
#         b'ocsp.sca1b.amazontrust.com',
#         b'big5.oversea.cnki.net',
#         b'gwz.fudan.edu.cn',
#         b'bsm.org.cn/index.php'
#         b'ctwx.tsinghua.edu.cn',
#     }

#     def handle_client_request(self, request: HttpParser) -> Optional[HttpParser]:
#         host = request.host or request.header(b'Host')
#         if host not in self.HOST_WHITELIST:
#             raise HttpRequestRejected(403)

#         if any(
#             suffix in request.path
#             for suffix in (
#                 b'png', b'ico', b'jpg', b'gif', b'css',
#             )
#         ):
#             raise HttpRequestRejected(403)

#         return request

#     def before_upstream_connection(self, request):
#         return super().before_upstream_connection(request)
#     def handle_upstream_chunk(self, chunk):
#         return super().handle_upstream_chunk(chunk)
#     def on_upstream_connection_close(self):
#         pass


# @contextmanager
# def run_driver() -> ContextManager[WebDriver]:
#     prox_type = ProxyType.MANUAL['ff_value']
#     prox_host = '127.0.0.1'
#     prox_port = 8889

#     profile = FirefoxProfile()
#     profile.set_preference('network.proxy.type', prox_type)
#     profile.set_preference('network.proxy.http', prox_host)
#     profile.set_preference('network.proxy.ssl', prox_host)
#     profile.set_preference('network.proxy.http_port', prox_port)
#     profile.set_preference('network.proxy.ssl_port', prox_port)
#     profile.update_preferences()

#     plugin = f'{Path(__file__).stem}.{ContentFilterPlugin.__name__}'

#     with proxy.start((
#         '--hostname', prox_host,
#         '--port', str(prox_port),
#         '--plugins', plugin,
#     )), Firefox(profile) as driver:
#         yield driver


def search(keyword) -> None:
    with Firefox() as driver:
        driver.get('http://www.ctwx.tsinghua.edu.cn/index.htm')

        page = MainPage(driver)
        # page.select_dropdown_item()
        page.submit_search(keyword)

        time.sleep(5)
        # page.switch_tabs()

        while True:
            primary_result_page = SearchResults(driver)
            primary_results = primary_result_page.get_primary_search_result()
            for result in primary_results:
                print(result)
                print()
            if page.next_page() == 0:
                break
            else:
                pass


if __name__ == '__main__':
    search('尹至')

fudan.py

# fudan.py

from dataclasses import dataclass
from itertools import count
from pathlib import Path
from typing import Dict, Iterable, Tuple, List, Optional
from urllib.parse import urljoin

from bs4 import BeautifulSoup
from requests import Session
from datetime import date, datetime

import json
import re

BASE_URL = 'http://www.gwz.fudan.edu.cn'


@dataclass
class Link:
    caption: str
    url: str
    clicks: int
    replies: int
    added: date

    @classmethod
    def from_row(cls, props: Dict[str, str], path: str) -> 'Link':
        clicks, replies = props['点击/回复'].split('/')
        # Skip number=int(props['编号']) - this only has meaning within one page

        return cls(
            caption=props['资源标题'],
            url=urljoin(BASE_URL, path),
            clicks=int(clicks),
            replies=int(replies),
            added=datetime.strptime(props['添加时间'], '%Y/%m/%d').date(),
        )
        
    def __str__(self):
        return f'{self.added} {self.url} {self.caption}'

    def author_title(self) -> Tuple[Optional[str], str]:
        sep = '：'  # full-width colon, U+FF1A

        if sep not in self.caption:
            return None, self.caption

        author, title = self.caption.split(sep, 1)
        author, title = author.strip(), title.strip()

        net_digest = '網摘'
        if author == net_digest:
            return None, title

        return author, title


@dataclass
class Article:
    author: Optional[str]
    title: str
    date: date
    download: Optional[str]
    url: str

    @classmethod
    def from_link(cls, link: Link, download: str) -> 'Article':

        author, title = link.author_title()

        download = download.replace("\r", "").replace("\n", "").strip()
        if download == '#_edn1':
            download = None
        elif download[0] != '/':
            download = '/' + download

        return cls(
            author=author,
            title=title,
            date=link.added,
            download=download,
            url=link.url,
        )

    def __str__(self) -> str:
        return(
            f"\n作者   {self.author}"
            f"\n標題   {self.title}"
            f"\n發佈日期 {self.date}"
            f"\n下載連結 {self.download}"
            f"\n訪問網頁 {self.url}"
        )

    def as_dict(self) -> Dict[str, str]:
        return {
            'author': self.author,
            'title': self.title,
            'date': self.date.isoformat(),
            'download': self.download,
            'url': self.url,
        }


def compile_search_results(session: Session, links: Iterable[Link], category_filter: str) -> Iterable[Article]:

    for link in links:
        with session.get(link.url) as resp:
            resp.raise_for_status()
            doc = BeautifulSoup(resp.text, 'html.parser')

        category = doc.select_one('#_top td a[href="#"]').text
        if category != category_filter:
            continue

        content = doc.select_one('span.ny_font_content')
        dl_tag = content.find(
            'a', {
                'href': re.compile("/?(lunwen/|articles/up/).+")
            }
        )

        yield Article.from_link(link, download=dl_tag['href'])


def get_page(session: Session, query: str, page: int) -> Tuple[List[Link], int]:
    with session.get(
        urljoin(BASE_URL, '/Web/Search'),
        params={
            's': query,
            'page': page,
        },
    ) as resp:
        resp.raise_for_status()
        doc = BeautifulSoup(resp.text, 'html.parser')

    table = doc.select_one('#tab table')
    heads = [h.text for h in table.select('tr.cap td')]
    links = []

    for row in table.find_all('tr', class_=''):
        cells = [td.text for td in row.find_all('td')]
        links.append(Link.from_row(
            props=dict(zip(heads, cells)),
            path=row.find('a')['href'],
        ))

    page_td = doc.select_one('#tab table:nth-child(2) td') # 共 87 条记录, 页 1/3
    n_pages = int(page_td.text.rsplit('/', 1)[1])

    return links, n_pages


def get_all_links(session: Session, query: str) -> Iterable[Link]:
    for page in count(1):
        links, n_pages = get_page(session, query, page)
        print(f'{page}/{n_pages}')
        yield from links

        if page >= n_pages:
            break


def save_articles(articles: Iterable[Article], file_prefix: str) -> None:
    file_path = Path(file_prefix).with_suffix('.json')

    with file_path.open('w') as file:
        file.write('[\n')
        first = True

        for article in articles:
            if first:
                first = False
            else:
                file.write(',\n')
            json.dump(article.as_dict(), file, ensure_ascii=False, indent=4)

        file.write('\n]\n')


def search(keyword):
    with Session() as session:
        links = get_all_links(session, query=keyword)
        academic_library = '学者文库'
        articles = compile_search_results(session, links, category_filter=academic_library)
        save_articles(articles, 'fudan_search_result')


if __name__ == '__main__':
    search('尹至')

wuhan.py

from dataclasses import dataclass, asdict
from itertools import count
from typing import Dict, Iterable, Tuple, List

from bs4 import BeautifulSoup
from requests import post
from datetime import date, datetime

import json
import os
import re

@dataclass
class Result:
    author: str
    title: str
    date: date
    url: str
    publication: str = "武漢大學簡帛網"

    @classmethod
    def from_metadata(cls, metadata: Dict) -> 'Result': 
        author, title = metadata['caption'].split(':')
        published_date = date.isoformat(datetime.strptime(metadata['date'], '%y/%m/%d'))
        url = 'http://www.bsm.org.cn/' + metadata['url']

        return cls(
            author = author,
            title = title,
            date = published_date,
            url = url
        )


    def __str__(self):
        return (
            f'作者    {self.author}'
            f'\n標題     {self.title}'
            f'\n發表時間  {self.date}'
            f'\n文章連結 {self.url}'
            f'\n發表平台  {self.publication}'
        )


def submit_query(keyword: str):
    query = {"searchword": keyword}
    with post('http://www.bsm.org.cn/pages.php?pagename=search', query) as resp:
        resp.raise_for_status()
        doc = BeautifulSoup(resp.text, 'html.parser')
        content = doc.find('div', class_='record_list_main')
        rows = content.select('ul')


    for row in rows:
        if len(row.findAll('li')) != 2:
            print()
            print(row.text)
            print()
        else:
            captions_tag, date_tag = row.findAll('li')
            caption_anchors = captions_tag.findAll('a')
            category, caption = [item.text for item in caption_anchors]
            url = caption_anchors[1]['href']
            date = re.sub("[()]", "", date_tag.text)

            yield {
                "category": category, 
                "caption": caption, 
                "date": date,
                "url": url}


def remove_json_if_exists(filename):
    json_file = filename + ".json"
    filePath = os.path.join(os.getcwd(), json_file)

    if os.path.exists(filePath):
        os.remove(filePath)


def search(query: str):
    remove_json_if_exists('wuhan_search_result')
    rslt = submit_query(query)

    for metadata in rslt:
        article = Result.from_metadata(metadata)
        print(article)
        print()

        with open('wuhan_search_result.json', 'a') as file:
            json.dump(asdict(article), file, ensure_ascii=False, indent=4)



if __name__ == '__main__':
    search('尹至')

1 Answer

Code Review user

Accepted answer

Posted 2021-07-06 19:49:59

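I will focus on main.py, since the other modules have already been reviewed.

  • db_dict can be a module-level global constant, DB_DICT.
  • A convenient interface would be to replace the db parameter with *args accepting one or more database names; then this: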
    elif db == "cnki":
        yield db_dict["cnki"](keyword)
    elif db == "fudan":
        yield db_dict["fudan"](keyword)
    elif db == "wuhan":
        yield db_dict["wuhan"](keyword)
    elif db == "qinghua":
        yield db_dict["qinghua"](keyword)

can be

for db in args:
    yield from db_dict[db](keyword)
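  • Did this ever run? Each of your modules' search functions does the wrong thing, either printing or saving to a file, when it should be returning results. As a result, every one of your yield db_dict[key](keyword) statements will only ever yield None. I would expect it to be yield from db_dict[key](keyword), so that db_search is itself a flat generator.
  • main.py has no type hints; it could use them.
  • Your model of spinning up and tearing down a Requests session or (worse) a Selenium browser instance for every keyword is impractical. It would be better to put any session-level state in a class, and keep that state alive across multiple searches of a given database.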
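The points above might fit together roughly as in the following sketch. It is illustrative only: the Database base class, its Cnki and Fudan subclasses and the dict results are hypothetical stand-ins, not the asker's actual modules. It assumes each real scraper would open its Requests session or Selenium driver in __enter__ and yield results from search instead of printing or saving them.

```python
from typing import Dict, Iterable, Iterator, Type


class Database:
    """Hypothetical base class that owns session-level state for one database."""
    name = 'base'

    def __enter__(self) -> 'Database':
        # A real subclass would open its requests.Session or Selenium driver here.
        self.opened = True
        return self

    def __exit__(self, *exc_info) -> None:
        # ...and close it here, once, after all keywords have been searched.
        self.opened = False

    def search(self, keyword: str) -> Iterator[dict]:
        # Stand-in for real scraping: yield results rather than print or save.
        yield {'database': self.name, 'keyword': keyword}


class Cnki(Database):
    name = 'cnki'


class Fudan(Database):
    name = 'fudan'


# Module-level constant mapping database names to their classes.
DB_DICT: Dict[str, Type[Database]] = {'cnki': Cnki, 'fudan': Fudan}


def search(keywords: Iterable[str], *args: str) -> Iterator[dict]:
    """Search the named databases, or all of them if none are named."""
    keywords = list(keywords)  # iterated once per database
    for db in args or DB_DICT:
        # One session per database, kept alive across every keyword.
        with DB_DICT[db]() as database:
            for keyword in keywords:
                yield from database.search(keyword)


if __name__ == '__main__':
    for item in search(['尹誥', '尹至'], 'fudan'):
        print(item)
```

Note that this loops over databases on the outside and keywords on the inside, the reverse of the original search, so that each session is opened only once. An unknown database name raises KeyError, which may or may not be the desired behaviour.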
Votes: 2
Original page content provided by Code Review; translation support provided by Tencent Cloud's Xiaowei IT-domain engine.
Original link:

https://codereview.stackexchange.com/questions/263474