首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >超快亚马逊刮刀多线程

超快亚马逊刮刀多线程
EN

Code Review用户
提问于 2019-10-22 01:29:46
回答 1查看 559关注 0票数 5

这是对以下代码的后续:从亚马逊和eBay提取urls的网络刮板 --对以前版本的多线程修改,也就是专门针对亚马逊的修改,大多数必要的文档都在文档字符串中。

您将发现源代码的副本以及必要的文件这里 (包括(proxies.txtamazon_log.txtuser_agents.txt) )将与代码的文件夹封装在一起。

特性:

  • 内容的多线程抓取。
  • 将urls保存到.txt文件
  • 包括:畅销书,新版本,最希望.
  • 将名称保存到.txt文件中。
  • 把名字映射到urls。
  • 缓存内容以供进一步重用。
  • 提取产品特征包括(名称,标题,网址,特征,技术细节.

我将实现另一个类,它使用公共方法管理这个类,将文件组织到csv/json文件中,并对这个类执行一些数据分析和优化。我会在做完后贴上后续报道的。

评审员的

  • 修改:我在这个版本中做了很多修改,与之前的版本完全不同。它只专注于亚马逊,很多不必要的前方法参数print_progresscleanup_empty现在都是类属性。顺序提取现在是可选的,以及多线程提取,这是500倍的速度。Docstring是最新的,并且在样式和内容上完全改变了。这个版本的代码组织得更有条理,可读性也更强。
  • 更短的代码建议:我想缩短代码并消除重复(如果有的话),大部分代码是免费的,但是任务通常是不同形式的重复。
  • 代理和用户代理:关于使用_get_response()方法收集的响应,proxiesheaders参数是否完成了必要的工作?代理是这样运作的吗?有什么改进可以做吗?
  • 随机偶然失败:在不包括畅销书或最想要的部分中,在特征提取中偶尔出现或随机发生故障。为什么这些失败有时会发生,有时却不会呢?如何控制这一点,以获得尽可能少的失效率?
  • 私有方法:这里定义的方法是私有的_private(),因为这个类将被管理提取的另一个类使用,并且主要包含公共方法。
  • 建议:改进代码的一般建议是最受欢迎的,如果您需要澄清问题,可以随意提问。

注意:除非你在谷歌或美国宇航局工作,也许甚至艾伦图灵在其他维度工作,否则人们会拒绝这样做,至少让我知道为什么这可能没有达到你的超级敬虔标准。

代码语言:javascript
复制
#!/usr/bin/env python3
from requests.exceptions import HTTPError, ConnectionError, ConnectTimeout
from concurrent.futures import ThreadPoolExecutor, as_completed
from bs4 import BeautifulSoup
from time import perf_counter
from random import choice
import requests
import bs4
import os


class AmazonScraper:
    """
    A tool to scrape Amazon different sections.

    Sections:
    Best Sellers - New Releases - Gift Ideas - Movers and Shakers - Most Wished For.

    Features:
    Category/Subcategory Urls and names.
    Product Urls and details(title, features, technical details, price, review count)
    """

    def __init__(
            self, path=None, print_progress=False, cache_contents=True, cleanup_empty=True, threads=1, log=None):
        """
        Args:
            path: Folder path to save scraped and cached contents.
            print_progress: If True then the progress will be displayed.
            cache_contents: If True then the scraped contents will be cached for further re-use.
            cleanup_empty: If True, empty .txt files that might result will be deleted.
            threads: If number of threads(1 by default) is increased, multiple threads will be used.
            log: If print_progress is True, content will be saved to the log (a file name + .txt).
        """
        if not path:
            self.path = '/Users/user_name/Desktop/Amazon Scraper/'
        if path:
            self.path = path
        self.headers = [{'User-Agent': item.rstrip()} for item in open('user_agents.txt').readlines()]
        self.print_progress = print_progress
        self.cache_contents = cache_contents
        self.cleanup_empty = cleanup_empty
        self.session = requests.session()
        self.threads = threads
        if log:
            if log in os.listdir(self.path):
                os.remove(log)
            self.log = open(log, 'w')
        self.proxies = [{'https:': 'https://' + item.rstrip(), 'http':
                        'http://' + item.rstrip()} for item in open('proxies.txt').readlines()]
        self.modes = {'bs': 'Best Sellers', 'nr': 'New Releases', 'gi': 'Gift Ideas',
                      'ms': 'Movers and Shakers', 'mw': 'Most Wished For'}
        self.starting_target_urls = \
            {'bs': ('https://www.amazon.com/gp/bestsellers/', 'https://www.amazon.com/Best-Sellers'),
             'nr': ('https://www.amazon.com/gp/new-releases/', 'https://www.amazon.com/gp/new-releases/'),
             'ms': ('https://www.amazon.com/gp/movers-and-shakers/', 'https://www.amazon.com/gp/movers-and-shakers/'),
             'gi': ('https://www.amazon.com/gp/most-gifted/', 'https://www.amazon.com/gp/most-gifted'),
             'mw': ('https://www.amazon.com/gp/most-wished-for/', 'https://www.amazon.com/gp/most-wished-for/')}

    def _cache_main_category_urls(self, text_file_names: dict, section: str, category_class: str,
                                  content_path: str, categories: list):
        """
        Cache the main category/subcategory URLs to .txt files.
        Args:
            text_file_names: Section string indications mapped to their corresponding .txt filenames.
            section: Keyword indication of target section.
                'bs': Best Sellers
                'nr': New Releases
                'ms': Movers & Shakers
                'gi': Gift Ideas
                'mw': Most Wished For
            category_class: Category level indication 'categories' or 'subcategories'.
            content_path: Path to folder to save cached files.
            categories: The list of category/subcategory urls to be saved.
        Return:
             None
        """
        os.chdir(content_path + 'Amazon/')
        with open(text_file_names[section][category_class], 'w') as cats:
            for category in categories:
                cats.write(category + '\n')
                if self.print_progress:
                    if not open(text_file_names[section][category_class]).read().isspace():
                        print(f'Saving {category} ... done.')
                        if self.log:
                            print(f'Saving {category} ... done.', file=self.log, end='\n')
                    if open(text_file_names[section][category_class]).read().isspace():
                        print(f'Saving {category} ... failure.')
                        if self.log:
                            print(f'Saving {category} ... failure.', file=self.log, end='\n')
        if self.cleanup_empty:
            self._cleanup_empty_files(self.path)

    def _read_main_category_urls(self, text_file_names: dict, section: str, category_class: str, content_path: str):
        """
        Read the main category/subcategory cached urls from their respective .txt files.
        Args:
            text_file_names: Section string indications mapped to their corresponding .txt filenames.
            section: Keyword indication of target section.
                'bs': Best Sellers
                'nr': New Releases
                'ms': Movers & Shakers
                'gi': Gift Ideas
                'mw': Most Wished For
            category_class: Category level indication 'categories' or 'subcategories'.
            content_path: Path to folder to save cached files.
        Return:
             A list of the main category/subcategory urls specified.
        """
        os.chdir(content_path + 'Amazon')
        if text_file_names[section][category_class] in os.listdir(content_path + 'Amazon/'):
            with open(text_file_names[section][category_class]) as cats:
                if self.cleanup_empty:
                    self._cleanup_empty_files(self.path)
                return [link.rstrip() for link in cats.readlines()]

    def _get_response(self, url):
        """
        Send a get request to target url.
        Args:
            url: Target Url.
        Return:
             Response object.
        """
        return self.session.get(url, headers=choice(self.headers), proxies=choice(self.proxies))

    def _scrape_main_category_urls(self, section: str, category_class: str, prev_categories=None):
        """
        Scrape links of all main category/subcategory Urls of the specified section.
        Args:
            section: Keyword indication of target section.
                'bs': Best Sellers
                'nr': New Releases
                'ms': Movers & Shakers
                'gi': Gift Ideas
                'mw': Most Wished For
            category_class: Category level indication 'categories' or 'subcategories'.
            prev_categories: A list containing parent category Urls.
        Return:
             A sorted list of scraped category/subcategory Urls.
        """
        target_url = self.starting_target_urls[section][1]
        if category_class == 'categories':
            starting_url = self._get_response(self.starting_target_urls[section][0])
            html_content = BeautifulSoup(starting_url.text, features='lxml')
            target_url_part = self.starting_target_urls[section][1]
            if not self.print_progress:
                return sorted({str(link.get('href')) for link in html_content.findAll('a')
                               if target_url_part in str(link)})
            if self.print_progress:
                categories = set()
                for link in html_content.findAll('a'):
                    if target_url_part in str(link):
                        link_to_add = str(link.get('href'))
                        categories.add(link_to_add)
                        print(f'Fetched {self.modes[section]}-{category_class[:-3]}y: {link_to_add}')
                        if self.log:
                            print(f'Fetched {self.modes[section]}-{category_class[:-3]}y: '
                                  f'{link_to_add}', file=self.log, end='\n')
                return categories
        if category_class == 'subcategories':
            if not self.print_progress:
                if self.threads == 1:
                    responses = [self._get_response(category)
                                 for category in prev_categories]
                    category_soups = [BeautifulSoup(response.text, features='lxml') for response in responses]
                    pre_sub_category_links = [str(link.get('href')) for category in category_soups
                                              for link in category.findAll('a') if target_url in str(link)]
                    return sorted({link for link in pre_sub_category_links if link not in prev_categories})
                if self.threads > 1:
                    with ThreadPoolExecutor(max_workers=self.threads) as executor:
                        future_html = {
                            executor.submit(self._get_response, category): category for category in prev_categories}
                        responses = [future.result() for future in as_completed(future_html)]
                        category_soups = [BeautifulSoup(response.text) for response in responses]
                        pre_sub_category_links = [str(link.get('href')) for category in category_soups
                                                  for link in category.findAll('a') if target_url in str(link)]
                        return sorted({link for link in pre_sub_category_links if link not in prev_categories})
            if self.print_progress:
                if self.threads == 1:
                    responses, pre, subcategories = [], [], set()
                    for category in prev_categories:
                        response = self._get_response(category)
                        responses.append(response)
                        print(f'Got response {response} for {self.modes[section]}-{category}')
                        if self.log:
                            print(f'Got response {response} for {self.modes[section]}-{category}',
                                  file=self.log, end='\n')

                    category_soups = [BeautifulSoup(response.text, features='lxml') for response in responses]
                    for soup in category_soups:
                        for link in soup.findAll('a'):
                            if target_url in str(link):
                                fetched_link = str(link.get('href'))
                                pre.append(fetched_link)
                                print(f'Fetched {self.modes[section]}-{fetched_link}')
                                if self.log:
                                    print(f'Fetched {self.modes[section]}-{fetched_link}', file=self.log,
                                          end='\n')
                    return sorted({link for link in pre if link not in prev_categories})
                if self.threads > 1:
                    with ThreadPoolExecutor(max_workers=self.threads) as executor:
                        category_soups = []
                        future_responses = {
                            executor.submit(self._get_response, category): category for category in prev_categories}
                        for future in as_completed(future_responses):
                            url = future_responses[future]
                            try:
                                response = future.result()
                                print(f'Got response {response} for {self.modes[section]}-{url}')
                                if self.log:
                                    print(f'Got response {response} for {self.modes[section]}-{url}',
                                          file=self.log, end='\n')
                            except(HTTPError, ConnectTimeout, ConnectionError):
                                print(f'Failed to get response from {url}')
                                if self.log:
                                    print(f'Failed to get response from {url}', file=self.log, end='\n')
                            else:
                                category_soups.append(BeautifulSoup(response.text, features='lxml'))
                        pre_sub_category_links = [str(link.get('href')) for category in category_soups
                                                  for link in category.findAll('a') if target_url in str(link)]
                        return sorted({link for link in pre_sub_category_links if link not in prev_categories})

    def _get_main_category_urls(self, section: str, subs=True):
        """
        Manage the scrape/read from previous session cache operations and return section Urls.
        If the program found previously cached files, will read and return existing data, else
        new content will be scraped and returned.
        Args:
            section: Keyword indication of target section.
                'bs': Best Sellers
                'nr': New Releases
                'ms': Movers & Shakers
                'gi': Gift Ideas
                'mw': Most Wished For
            subs: If False, only categories will be returned.
        Return:
            2 sorted lists: categories and subcategories.
        """
        text_file_names = \
            {section_short: {'categories': self.modes[section_short] + ' Category Urls.txt',
                             'subcategories': self.modes[section_short] + ' Subcategory Urls.txt'}
             for section_short in self.modes}
        if 'Amazon' not in os.listdir(self.path):
            os.mkdir('Amazon')
            os.chdir(self.path + 'Amazon')
        if 'Amazon' in os.listdir(self.path):
            categories = self._read_main_category_urls(text_file_names, section, 'categories', self.path)
            if not subs:
                if self.cleanup_empty:
                    self._cleanup_empty_files(self.path)
                return sorted(categories)
            subcategories = self._read_main_category_urls(text_file_names, section, 'subcategories', self.path)
            try:
                if categories and subcategories:
                    if self.cleanup_empty:
                        self._cleanup_empty_files(self.path)
                    return sorted(categories), sorted(subcategories)
            except UnboundLocalError:
                pass
        if not subs:
            categories = self._scrape_main_category_urls(section, 'categories')
            if self.cache_contents:
                self._cache_main_category_urls(text_file_names, section, 'categories', self.path, categories)
            if self.cleanup_empty:
                self._cleanup_empty_files(self.path)
            return sorted(categories)
        if subs:
            categories = self._scrape_main_category_urls(section, 'categories')
            if self.cache_contents:
                self._cache_main_category_urls(text_file_names, section, 'categories', self.path, categories)
            subcategories = self._scrape_main_category_urls(section, 'subcategories', categories)
            if self.cache_contents:
                self._cache_main_category_urls(text_file_names, section, 'subcategories', self.path, subcategories)
            if self.cleanup_empty:
                self._cleanup_empty_files(self.path)
            return sorted(categories), sorted(subcategories)

    def _extract_page_product_urls(self, page_url: str):
        """
        Extract product Urls from an Amazon page and the page title.
        Args:
            page_url: Target page.
        Return:
             The page category title(string) and a sorted list of product Urls.
        """
        prefix = 'https://www.amazon.com'
        response = self._get_response(page_url)
        soup = BeautifulSoup(response.text, features='lxml')
        try:
            title = soup.h1.text.strip()
        except AttributeError:
            title = 'N/A'
        product_links = {prefix + link.get('href') for link in soup.findAll('a') if 'psc=' in str(link)}
        return title, sorted(product_links)

    @staticmethod
    def _cleanup_empty_files(dir_path: str):
        """
        Cleanup a given folder from empty .txt files.
        Args:
            dir_path: Path to the target folder to be cleaned up.
        Return:
             None
        """
        for file_name in [file for file in os.listdir(dir_path)]:
            if not os.path.isdir(file_name):
                try:
                    contents = open(file_name).read().strip()
                    if not contents:
                        os.remove(file_name)
                except(UnicodeDecodeError, FileNotFoundError):
                    pass

    def _category_page_title_to_url(self, section: str, category_class: str, delimiter='&&&'):
        """
        Map category/subcategory names to their respective Urls.
        Args:
        section:
            'bs': Best Sellers
            'nr': New Releases
            'ms': Movers & Shakers
            'gi': Gift Ideas
            'mw': Most Wished For
        category_class: Category level indication 'categories' or 'subcategories'.
        delimiter: Delimits category/subcategory names and their respective Urls in the .txt files.
        Return:
             A list of lists(pairs): [[category/subcategory name, Url], ...]
        """
        file_names = {'categories': self.modes[section] + ' Category Names.txt',
                      'subcategories': self.modes[section] + ' Subcategory Names.txt'}
        names_urls = []
        os.chdir(self.path)
        if 'Amazon' in os.listdir(self.path):
            os.chdir('Amazon')
            file_name = file_names[category_class]
            if file_name in os.listdir(self.path + 'Amazon'):
                with open(file_name) as names:
                    if self.cleanup_empty:
                        self._cleanup_empty_files(self.path)
                    return [line.rstrip().split(delimiter) for line in names.readlines()]
        if 'Amazon' not in os.listdir(self.path):
            os.mkdir('Amazon')
            os.chdir('Amazon')
        categories, subcategories = self._get_main_category_urls(section)
        if not self.print_progress:
            if self.threads == 1:
                responses_urls = [(self._get_response(url), url)
                                  for url in eval('eval(category_class)')]
                soups_urls = [(BeautifulSoup(item[0].text, features='lxml'), item[1]) for item in responses_urls]
                for soup, url in soups_urls:
                    try:
                        title = soup.h1.text.strip()
                        names_urls.append([title, url])
                    except AttributeError:
                        pass
            if self.threads > 1:
                with ThreadPoolExecutor(max_workers=self.threads) as executor:
                    future_responses = {
                        executor.submit(self._get_response, category): category
                        for category in eval('eval(category_class)')}
                    responses = [future.result() for future in as_completed(future_responses)]
                    responses_urls = [
                        (response, url) for response, url in zip(responses, eval('eval(category_class)'))]
                    soups_urls = [
                        (BeautifulSoup(item[0].text, features='lxml'), item[1]) for item in responses_urls]
                    for soup, url in soups_urls:
                        try:
                            title = soup.h1.text.strip()
                            names_urls.append([title, url])
                        except AttributeError:
                            pass
        if self.print_progress:
            if self.threads == 1:
                for url in eval('eval(category_class)'):
                    response = self._get_response(url)
                    print(f'Got response {response} for {url}')
                    print(f'Fetching name of {url} ...')
                    if self.log:
                        print(f'Got response {response} for {url}', file=self.log, end='\n')
                        print(f'Fetching name of {url} ...', file=self.log, end='\n')

                    soup = BeautifulSoup(response.text, features='lxml')
                    try:
                        title = soup.h1.text.strip()
                        names_urls.append([title, url])
                        print(f'Fetching name {title} ... done')
                        if self.log:
                            print(f'Fetching name {title} ... done', file=self.log, end='\n')
                    except AttributeError:
                        print(f'Fetching name failure for {url}')
                        if self.log:
                            print(f'Fetching name failure for {url}', file=self.log, end='\n')
            if self.threads > 1:
                with ThreadPoolExecutor(max_workers=self.threads) as executor:
                    future_responses = {
                        executor.submit(self._get_response, category): category
                        for category in eval('eval(category_class)')}
                    for future_response in as_completed(future_responses):
                        response = future_response.result()
                        url = future_responses[future_response]
                        print(f'Got response {response} for {url}')
                        if self.log:
                            print(f'Got response {response} for {url}', file=self.log, end='\n')
                        soup = BeautifulSoup(response.text, features='lxml')
                        try:
                            title = soup.h1.text.strip()
                            names_urls.append([title, url])
                            print(f'Fetching name {title} ... done')
                            if self.log:
                                print(f'Fetching name {title} ... done', file=self.log, end='\n')
                        except AttributeError:
                            print(f'Fetching name failure for {url}')
                            if self.log:
                                print(f'Fetching name failure for {url}', file=self.log, end='\n')

            if self.cache_contents:
                with open(file_names[category_class], 'w') as names:
                    for name, url in names_urls:
                        names.write(name + delimiter + url + '\n')
            if self.cleanup_empty:
                self._cleanup_empty_files(self.path + 'Amazon')
        return names_urls

    def _extract_section_products(self, section: str, category_class: str):
        """
        For every category/subcategory successfully scraped from the given section, product urls will be extracted.
        Args:
            section:
                'bs': Best Sellers
                'nr': New Releases
                'ms': Movers & Shakers
                'gi': Gift Ideas
                'mw': Most Wished For
            category_class: Category level indication 'categories' or 'subcategories'.
        Return:
             List of tuples(category name, product urls) containing product Urls for each scraped category/subcategory.
        """
        products = []
        names_urls = self._category_page_title_to_url(section, category_class)
        urls = [item[1] for item in names_urls]
        folder_name = ' '.join([self.modes[section], category_class[:-3].title() + 'y', 'Product Urls'])
        if not self.print_progress:
            if self.threads == 1:
                products = [
                    (category_name, [product_url for product_url in self._extract_page_product_urls(category_url)[1]])
                    for category_name, category_url in names_urls]
                products = [item for item in products if item[1]]
            if self.threads > 1:
                with ThreadPoolExecutor(max_workers=self.threads) as executor:
                    future_products = {executor.submit(self._extract_page_product_urls, category_url): category_url
                                       for category_url in urls}
                    products = [future.result() for future in as_completed(future_products)]
                    products = [item for item in products if item[1]]
        if self.print_progress:
            products = []
            if self.threads == 1:
                for category_name, category_url in names_urls:
                    product_urls = self._extract_page_product_urls(category_url)
                    if product_urls[1]:
                        print(f'Extraction of {category_name} products ... done')
                        if self.log:
                            print(f'Extraction of {category_name} products ... done', file=self.log, end='\n')
                        products.append(product_urls)
                    else:
                        print(f'Extraction of {category_name} products ... failure')
                        if self.log:
                            print(f'Extraction of {category_name} products ... failure', file=self.log, end='\n')
            if self.threads > 1:
                with ThreadPoolExecutor(max_workers=self.threads) as executor:
                    future_products = {executor.submit(self._extract_page_product_urls, category_url): category_url
                                       for category_url in urls}
                    for future in as_completed(future_products):
                        category_name, category_urls = future.result()
                        if category_urls:
                            print(f'Extraction of {category_name} products ... done')
                            if self.log:
                                print(f'Extraction of {category_name} products ... done', file=self.log, end='\n')
                            products.append((category_name, category_urls))
                        else:
                            print(f'Extraction of {category_name} products ... failure')
                            if self.log:
                                print(f'Extraction of {category_name} products ... failure', file=self.log, end='\n')
        if self.cache_contents:
            if folder_name not in os.listdir(self.path + 'Amazon'):
                os.mkdir(folder_name)
            os.chdir(folder_name)
            for category_name, category_product_urls in products:
                with open(category_name + '.txt', 'w') as links:
                    for url in category_product_urls:
                        links.write(url + '\n')
        if self.cleanup_empty:
            self._cleanup_empty_files(self.path + 'Amazon/' + folder_name)
        return products

    def _get_amazon_product_details(self, product_url: str):
        """
        Extract product details including:
            [Price, Title, URL, Rating, Number of reviews, Sold by, Features, Technical table]
        Args:
            product_url: Target product.
        Return:
            A dictionary with the scraped details.
        """
        product_html_details, text_details = {}, {}
        response = self._get_response(product_url).text
        html_content = BeautifulSoup(response, features='lxml')
        product_html_details['Price'] = html_content.find('span', {'id': 'price_inside_buybox'})
        product_html_details['Url'] = product_url
        product_html_details['Title'] = html_content.title
        product_html_details['Rating'] = html_content.find('span',
                                                           {'class': 'reviewCountTextLinkedHistogram noUnderline'})
        product_html_details['Number of reviews'] = html_content.find('span', {'id': 'acrCustomerReviewText'})
        product_html_details['Sold by'] = html_content.find('a', {'id': 'bylineInfo'})
        product_html_details['Features'] = html_content.find('div', {'id': 'feature-bullets'})
        if product_html_details['Features']:
            product_html_details['Features'] = product_html_details['Features'].findAll('li')
        technical_table = html_content.find('table', {'class': 'a-keyvalue prodDetTable'})
        if technical_table:
            product_html_details['Technical details'] = list(
                zip([item.text.strip() for item in technical_table.findAll('th')],
                    [item.text.strip() for item in technical_table.findAll('td')]))
        for item in product_html_details:
            if isinstance(product_html_details[item], bs4.element.Tag):
                text_details[item] = product_html_details[item].text.strip()
            if isinstance(product_html_details[item], bs4.element.ResultSet):
                text_details[item] = ' • '.join([tag.text.strip() for tag in product_html_details[item]])
            if isinstance(product_html_details[item], str):
                text_details[item] = product_html_details[item]
            if item == 'Technical details':
                text_details[item] = ' • '.join([' : '.join(pair) for pair in product_html_details[item]])
        return text_details


if __name__ == '__main__':
    start_time = perf_counter()
    path = input('Enter path to save files: ')
    session = AmazonScraper(print_progress=True, threads=20, log='amazon_log.txt', path=path)
    print(session._extract_section_products('bs', 'categories'))
    print(session._extract_section_products('bs', 'subcategories'))
    end_time = perf_counter()
    print(f'Time: {end_time - start_time} seconds.')
EN

回答 1

Code Review用户

回答已采纳

发布于 2019-10-23 00:50:22

默认参数

此默认设置:

代码语言:javascript
复制
path=None

不是有效的None,而是'/Users/user_name/Desktop/Amazon Scraper/'。这是一个不可变的值,所以直接放入默认值是安全的。

该路径的一个明显问题是,它是绝对的,而不是每个用户。考虑将os.path.expanduser~一起使用。

Dict格式

写这篇文章:

代码语言:javascript
复制
self.proxies = [{'https:': 'https://' + item.rstrip(), 'http':
                'http://' + item.rstrip()} for item in open('proxies.txt').readlines()]

每行应该有一个迪克项目,否则会让人感到困惑。换句话说,

代码语言:javascript
复制
self.proxies = [{'https:': 'https://' + item.rstrip(),
                 'http': 'http://' + item.rstrip()}
                for item in open('proxies.txt').readlines()]

避免反斜杠延拓

代码语言:javascript
复制
    self.starting_target_urls = \
        {'bs': ('https://www.amazon.com/gp/bestsellers/', 'https://www.amazon.com/Best-Sellers'),
         'nr': ('https://www.amazon.com/gp/new-releases/', 'https://www.amazon.com/gp/new-releases/'),
         'ms': ('https://www.amazon.com/gp/movers-and-shakers/', 'https://www.amazon.com/gp/movers-and-shakers/'),
         'gi': ('https://www.amazon.com/gp/most-gifted/', 'https://www.amazon.com/gp/most-gifted'),
         'mw': ('https://www.amazon.com/gp/most-wished-for/', 'https://www.amazon.com/gp/most-wished-for/')}

可以是

代码语言:javascript
复制
    self.starting_target_urls = {
        'bs': ('https://www.amazon.com/gp/bestsellers/', 'https://www.amazon.com/Best-Sellers'),
        'nr': ('https://www.amazon.com/gp/new-releases/', 'https://www.amazon.com/gp/new-releases/'),
        'ms': ('https://www.amazon.com/gp/movers-and-shakers/', 'https://www.amazon.com/gp/movers-and-shakers/'),
        'gi': ('https://www.amazon.com/gp/most-gifted/', 'https://www.amazon.com/gp/most-gifted'),
        'mw': ('https://www.amazon.com/gp/most-wished-for/', 'https://www.amazon.com/gp/most-wished-for/')
    }

避免手动路径连接

这是:

代码语言:javascript
复制
    os.chdir(content_path + 'Amazon/')

应该使用pathlib/运算符。

使用日志库

这是:

代码语言:javascript
复制
                    if self.log:
                        print(f'Saving {category} ... failure.', file=self.log, end='\n')

不应该直接写到文件。相反,您应该使用指向该文件的文件处理程序来设置股票Python日志记录。它更灵活,更易于维护。

隐式线迭代

像这样的台词:

代码语言:javascript
复制
            return [link.rstrip() for link in cats.readlines()]

你不需要打电话给readlines。迭代一个文件对象,遍历它的行。

HTTP错误检查

_get_response应该包括对raise_for_status的调用。这是对HTTP调用进行更好验证的一种快速而简单的方法。

票数 2
EN
页面原文内容由Code Review提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codereview.stackexchange.com/questions/231126

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档