This is a follow-up to Web scraper that extracts urls from Amazon and eBay: a multithreaded revision of the previous version, this time dedicated to Amazon. Most of the necessary documentation is in the docstrings.
You will find a copy of the source code as well as the necessary files here (including proxies.txt, amazon_log.txt, user_agents.txt), packaged in the same folder as the code.
I will implement another class that manages this one through its public methods, organizes the results into csv/json files, and performs some data analysis and optimization on it. I will post a follow-up once that is done.
Notes for reviewers:
print_progress and cleanup_empty are now class attributes. Sequential extraction is now optional, as is multithreaded extraction, which is 500x faster. The docstrings are up to date and have changed completely in style and content. This version of the code is better organized and more readable. The _get_response() method collects the responses; do the proxies and headers arguments do the necessary work there? Is this how proxies are supposed to work? Are there any improvements to be made? The methods are _private() because this class will be used by another class that manages the extraction, and that class will contain mostly public methods.
#!/usr/bin/env python3
from requests.exceptions import HTTPError, ConnectionError, ConnectTimeout
from concurrent.futures import ThreadPoolExecutor, as_completed
from bs4 import BeautifulSoup
from time import perf_counter
from random import choice
import requests
import bs4
import os
class AmazonScraper:
"""
A tool to scrape Amazon different sections.
Sections:
Best Sellers - New Releases - Gift Ideas - Movers and Shakers - Most Wished For.
Features:
Category/Subcategory Urls and names.
Product Urls and details(title, features, technical details, price, review count)
"""
def __init__(
self, path=None, print_progress=False, cache_contents=True, cleanup_empty=True, threads=1, log=None):
"""
Args:
path: Folder path to save scraped and cached contents.
print_progress: If True then the progress will be displayed.
cache_contents: If True then the scraped contents will be cached for further re-use.
cleanup_empty: If True, empty .txt files that might result will be deleted.
threads: If number of threads(1 by default) is increased, multiple threads will be used.
log: If print_progress is True, content will be saved to the log (a file name + .txt).
"""
if not path:
self.path = '/Users/user_name/Desktop/Amazon Scraper/'
if path:
self.path = path
self.headers = [{'User-Agent': item.rstrip()} for item in open('user_agents.txt').readlines()]
self.print_progress = print_progress
self.cache_contents = cache_contents
self.cleanup_empty = cleanup_empty
self.session = requests.session()
self.threads = threads
if log:
if log in os.listdir(self.path):
os.remove(log)
self.log = open(log, 'w')
self.proxies = [{'https:': 'https://' + item.rstrip(), 'http':
'http://' + item.rstrip()} for item in open('proxies.txt').readlines()]
self.modes = {'bs': 'Best Sellers', 'nr': 'New Releases', 'gi': 'Gift Ideas',
'ms': 'Movers and Shakers', 'mw': 'Most Wished For'}
self.starting_target_urls = \
{'bs': ('https://www.amazon.com/gp/bestsellers/', 'https://www.amazon.com/Best-Sellers'),
'nr': ('https://www.amazon.com/gp/new-releases/', 'https://www.amazon.com/gp/new-releases/'),
'ms': ('https://www.amazon.com/gp/movers-and-shakers/', 'https://www.amazon.com/gp/movers-and-shakers/'),
'gi': ('https://www.amazon.com/gp/most-gifted/', 'https://www.amazon.com/gp/most-gifted'),
'mw': ('https://www.amazon.com/gp/most-wished-for/', 'https://www.amazon.com/gp/most-wished-for/')}
def _cache_main_category_urls(self, text_file_names: dict, section: str, category_class: str,
content_path: str, categories: list):
"""
Cache the main category/subcategory URLs to .txt files.
Args:
text_file_names: Section string indications mapped to their corresponding .txt filenames.
section: Keyword indication of target section.
'bs': Best Sellers
'nr': New Releases
'ms': Movers & Shakers
'gi': Gift Ideas
'mw': Most Wished For
category_class: Category level indication 'categories' or 'subcategories'.
content_path: Path to folder to save cached files.
categories: The list of category/subcategory urls to be saved.
Return:
None
"""
os.chdir(content_path + 'Amazon/')
with open(text_file_names[section][category_class], 'w') as cats:
for category in categories:
cats.write(category + '\n')
if self.print_progress:
if not open(text_file_names[section][category_class]).read().isspace():
print(f'Saving {category} ... done.')
if self.log:
print(f'Saving {category} ... done.', file=self.log, end='\n')
if open(text_file_names[section][category_class]).read().isspace():
print(f'Saving {category} ... failure.')
if self.log:
print(f'Saving {category} ... failure.', file=self.log, end='\n')
if self.cleanup_empty:
self._cleanup_empty_files(self.path)
def _read_main_category_urls(self, text_file_names: dict, section: str, category_class: str, content_path: str):
"""
Read the main category/subcategory cached urls from their respective .txt files.
Args:
text_file_names: Section string indications mapped to their corresponding .txt filenames.
section: Keyword indication of target section.
'bs': Best Sellers
'nr': New Releases
'ms': Movers & Shakers
'gi': Gift Ideas
'mw': Most Wished For
category_class: Category level indication 'categories' or 'subcategories'.
content_path: Path to folder to save cached files.
Return:
A list of the main category/subcategory urls specified.
"""
os.chdir(content_path + 'Amazon')
if text_file_names[section][category_class] in os.listdir(content_path + 'Amazon/'):
with open(text_file_names[section][category_class]) as cats:
if self.cleanup_empty:
self._cleanup_empty_files(self.path)
return [link.rstrip() for link in cats.readlines()]
def _get_response(self, url):
"""
Send a get request to target url.
Args:
url: Target Url.
Return:
Response object.
"""
return self.session.get(url, headers=choice(self.headers), proxies=choice(self.proxies))
def _scrape_main_category_urls(self, section: str, category_class: str, prev_categories=None):
"""
Scrape links of all main category/subcategory Urls of the specified section.
Args:
section: Keyword indication of target section.
'bs': Best Sellers
'nr': New Releases
'ms': Movers & Shakers
'gi': Gift Ideas
'mw': Most Wished For
category_class: Category level indication 'categories' or 'subcategories'.
prev_categories: A list containing parent category Urls.
Return:
A sorted list of scraped category/subcategory Urls.
"""
target_url = self.starting_target_urls[section][1]
if category_class == 'categories':
starting_url = self._get_response(self.starting_target_urls[section][0])
html_content = BeautifulSoup(starting_url.text, features='lxml')
target_url_part = self.starting_target_urls[section][1]
if not self.print_progress:
return sorted({str(link.get('href')) for link in html_content.findAll('a')
if target_url_part in str(link)})
if self.print_progress:
categories = set()
for link in html_content.findAll('a'):
if target_url_part in str(link):
link_to_add = str(link.get('href'))
categories.add(link_to_add)
print(f'Fetched {self.modes[section]}-{category_class[:-3]}y: {link_to_add}')
if self.log:
print(f'Fetched {self.modes[section]}-{category_class[:-3]}y: '
f'{link_to_add}', file=self.log, end='\n')
return categories
if category_class == 'subcategories':
if not self.print_progress:
if self.threads == 1:
responses = [self._get_response(category)
for category in prev_categories]
category_soups = [BeautifulSoup(response.text, features='lxml') for response in responses]
pre_sub_category_links = [str(link.get('href')) for category in category_soups
for link in category.findAll('a') if target_url in str(link)]
return sorted({link for link in pre_sub_category_links if link not in prev_categories})
if self.threads > 1:
with ThreadPoolExecutor(max_workers=self.threads) as executor:
future_html = {
executor.submit(self._get_response, category): category for category in prev_categories}
responses = [future.result() for future in as_completed(future_html)]
category_soups = [BeautifulSoup(response.text) for response in responses]
pre_sub_category_links = [str(link.get('href')) for category in category_soups
for link in category.findAll('a') if target_url in str(link)]
return sorted({link for link in pre_sub_category_links if link not in prev_categories})
if self.print_progress:
if self.threads == 1:
responses, pre, subcategories = [], [], set()
for category in prev_categories:
response = self._get_response(category)
responses.append(response)
print(f'Got response {response} for {self.modes[section]}-{category}')
if self.log:
print(f'Got response {response} for {self.modes[section]}-{category}',
file=self.log, end='\n')
category_soups = [BeautifulSoup(response.text, features='lxml') for response in responses]
for soup in category_soups:
for link in soup.findAll('a'):
if target_url in str(link):
fetched_link = str(link.get('href'))
pre.append(fetched_link)
print(f'Fetched {self.modes[section]}-{fetched_link}')
if self.log:
print(f'Fetched {self.modes[section]}-{fetched_link}', file=self.log,
end='\n')
return sorted({link for link in pre if link not in prev_categories})
if self.threads > 1:
with ThreadPoolExecutor(max_workers=self.threads) as executor:
category_soups = []
future_responses = {
executor.submit(self._get_response, category): category for category in prev_categories}
for future in as_completed(future_responses):
url = future_responses[future]
try:
response = future.result()
print(f'Got response {response} for {self.modes[section]}-{url}')
if self.log:
print(f'Got response {response} for {self.modes[section]}-{url}',
file=self.log, end='\n')
except(HTTPError, ConnectTimeout, ConnectionError):
print(f'Failed to get response from {url}')
if self.log:
print(f'Failed to get response from {url}', file=self.log, end='\n')
else:
category_soups.append(BeautifulSoup(response.text, features='lxml'))
pre_sub_category_links = [str(link.get('href')) for category in category_soups
for link in category.findAll('a') if target_url in str(link)]
return sorted({link for link in pre_sub_category_links if link not in prev_categories})
def _get_main_category_urls(self, section: str, subs=True):
"""
Manage the scrape/read from previous session cache operations and return section Urls.
If the program found previously cached files, will read and return existing data, else
new content will be scraped and returned.
Args:
section: Keyword indication of target section.
'bs': Best Sellers
'nr': New Releases
'ms': Movers & Shakers
'gi': Gift Ideas
'mw': Most Wished For
subs: If False, only categories will be returned.
Return:
2 sorted lists: categories and subcategories.
"""
text_file_names = \
{section_short: {'categories': self.modes[section_short] + ' Category Urls.txt',
'subcategories': self.modes[section_short] + ' Subcategory Urls.txt'}
for section_short in self.modes}
if 'Amazon' not in os.listdir(self.path):
os.mkdir('Amazon')
os.chdir(self.path + 'Amazon')
if 'Amazon' in os.listdir(self.path):
categories = self._read_main_category_urls(text_file_names, section, 'categories', self.path)
if not subs:
if self.cleanup_empty:
self._cleanup_empty_files(self.path)
return sorted(categories)
subcategories = self._read_main_category_urls(text_file_names, section, 'subcategories', self.path)
try:
if categories and subcategories:
if self.cleanup_empty:
self._cleanup_empty_files(self.path)
return sorted(categories), sorted(subcategories)
except UnboundLocalError:
pass
if not subs:
categories = self._scrape_main_category_urls(section, 'categories')
if self.cache_contents:
self._cache_main_category_urls(text_file_names, section, 'categories', self.path, categories)
if self.cleanup_empty:
self._cleanup_empty_files(self.path)
return sorted(categories)
if subs:
categories = self._scrape_main_category_urls(section, 'categories')
if self.cache_contents:
self._cache_main_category_urls(text_file_names, section, 'categories', self.path, categories)
subcategories = self._scrape_main_category_urls(section, 'subcategories', categories)
if self.cache_contents:
self._cache_main_category_urls(text_file_names, section, 'subcategories', self.path, subcategories)
if self.cleanup_empty:
self._cleanup_empty_files(self.path)
return sorted(categories), sorted(subcategories)
def _extract_page_product_urls(self, page_url: str):
"""
Extract product Urls from an Amazon page and the page title.
Args:
page_url: Target page.
Return:
The page category title(string) and a sorted list of product Urls.
"""
prefix = 'https://www.amazon.com'
response = self._get_response(page_url)
soup = BeautifulSoup(response.text, features='lxml')
try:
title = soup.h1.text.strip()
except AttributeError:
title = 'N/A'
product_links = {prefix + link.get('href') for link in soup.findAll('a') if 'psc=' in str(link)}
return title, sorted(product_links)
@staticmethod
def _cleanup_empty_files(dir_path: str):
"""
Cleanup a given folder from empty .txt files.
Args:
dir_path: Path to the target folder to be cleaned up.
Return:
None
"""
for file_name in [file for file in os.listdir(dir_path)]:
if not os.path.isdir(file_name):
try:
contents = open(file_name).read().strip()
if not contents:
os.remove(file_name)
except(UnicodeDecodeError, FileNotFoundError):
pass
def _category_page_title_to_url(self, section: str, category_class: str, delimiter='&&&'):
"""
Map category/subcategory names to their respective Urls.
Args:
section:
'bs': Best Sellers
'nr': New Releases
'ms': Movers & Shakers
'gi': Gift Ideas
'mw': Most Wished For
category_class: Category level indication 'categories' or 'subcategories'.
delimiter: Delimits category/subcategory names and their respective Urls in the .txt files.
Return:
A list of lists(pairs): [[category/subcategory name, Url], ...]
"""
file_names = {'categories': self.modes[section] + ' Category Names.txt',
'subcategories': self.modes[section] + ' Subcategory Names.txt'}
names_urls = []
os.chdir(self.path)
if 'Amazon' in os.listdir(self.path):
os.chdir('Amazon')
file_name = file_names[category_class]
if file_name in os.listdir(self.path + 'Amazon'):
with open(file_name) as names:
if self.cleanup_empty:
self._cleanup_empty_files(self.path)
return [line.rstrip().split(delimiter) for line in names.readlines()]
if 'Amazon' not in os.listdir(self.path):
os.mkdir('Amazon')
os.chdir('Amazon')
categories, subcategories = self._get_main_category_urls(section)
if not self.print_progress:
if self.threads == 1:
responses_urls = [(self._get_response(url), url)
for url in eval('eval(category_class)')]
soups_urls = [(BeautifulSoup(item[0].text, features='lxml'), item[1]) for item in responses_urls]
for soup, url in soups_urls:
try:
title = soup.h1.text.strip()
names_urls.append([title, url])
except AttributeError:
pass
if self.threads > 1:
with ThreadPoolExecutor(max_workers=self.threads) as executor:
future_responses = {
executor.submit(self._get_response, category): category
for category in eval('eval(category_class)')}
responses = [future.result() for future in as_completed(future_responses)]
responses_urls = [
(response, url) for response, url in zip(responses, eval('eval(category_class)'))]
soups_urls = [
(BeautifulSoup(item[0].text, features='lxml'), item[1]) for item in responses_urls]
for soup, url in soups_urls:
try:
title = soup.h1.text.strip()
names_urls.append([title, url])
except AttributeError:
pass
if self.print_progress:
if self.threads == 1:
for url in eval('eval(category_class)'):
response = self._get_response(url)
print(f'Got response {response} for {url}')
print(f'Fetching name of {url} ...')
if self.log:
print(f'Got response {response} for {url}', file=self.log, end='\n')
print(f'Fetching name of {url} ...', file=self.log, end='\n')
soup = BeautifulSoup(response.text, features='lxml')
try:
title = soup.h1.text.strip()
names_urls.append([title, url])
print(f'Fetching name {title} ... done')
if self.log:
print(f'Fetching name {title} ... done', file=self.log, end='\n')
except AttributeError:
print(f'Fetching name failure for {url}')
if self.log:
print(f'Fetching name failure for {url}', file=self.log, end='\n')
if self.threads > 1:
with ThreadPoolExecutor(max_workers=self.threads) as executor:
future_responses = {
executor.submit(self._get_response, category): category
for category in eval('eval(category_class)')}
for future_response in as_completed(future_responses):
response = future_response.result()
url = future_responses[future_response]
print(f'Got response {response} for {url}')
if self.log:
print(f'Got response {response} for {url}', file=self.log, end='\n')
soup = BeautifulSoup(response.text, features='lxml')
try:
title = soup.h1.text.strip()
names_urls.append([title, url])
print(f'Fetching name {title} ... done')
if self.log:
print(f'Fetching name {title} ... done', file=self.log, end='\n')
except AttributeError:
print(f'Fetching name failure for {url}')
if self.log:
print(f'Fetching name failure for {url}', file=self.log, end='\n')
if self.cache_contents:
with open(file_names[category_class], 'w') as names:
for name, url in names_urls:
names.write(name + delimiter + url + '\n')
if self.cleanup_empty:
self._cleanup_empty_files(self.path + 'Amazon')
return names_urls
def _extract_section_products(self, section: str, category_class: str):
"""
For every category/subcategory successfully scraped from the given section, product urls will be extracted.
Args:
section:
'bs': Best Sellers
'nr': New Releases
'ms': Movers & Shakers
'gi': Gift Ideas
'mw': Most Wished For
category_class: Category level indication 'categories' or 'subcategories'.
Return:
List of tuples(category name, product urls) containing product Urls for each scraped category/subcategory.
"""
products = []
names_urls = self._category_page_title_to_url(section, category_class)
urls = [item[1] for item in names_urls]
folder_name = ' '.join([self.modes[section], category_class[:-3].title() + 'y', 'Product Urls'])
if not self.print_progress:
if self.threads == 1:
products = [
(category_name, [product_url for product_url in self._extract_page_product_urls(category_url)[1]])
for category_name, category_url in names_urls]
products = [item for item in products if item[1]]
if self.threads > 1:
with ThreadPoolExecutor(max_workers=self.threads) as executor:
future_products = {executor.submit(self._extract_page_product_urls, category_url): category_url
for category_url in urls}
products = [future.result() for future in as_completed(future_products)]
products = [item for item in products if item[1]]
if self.print_progress:
products = []
if self.threads == 1:
for category_name, category_url in names_urls:
product_urls = self._extract_page_product_urls(category_url)
if product_urls[1]:
print(f'Extraction of {category_name} products ... done')
if self.log:
print(f'Extraction of {category_name} products ... done', file=self.log, end='\n')
products.append(product_urls)
else:
print(f'Extraction of {category_name} products ... failure')
if self.log:
print(f'Extraction of {category_name} products ... failure', file=self.log, end='\n')
if self.threads > 1:
with ThreadPoolExecutor(max_workers=self.threads) as executor:
future_products = {executor.submit(self._extract_page_product_urls, category_url): category_url
for category_url in urls}
for future in as_completed(future_products):
category_name, category_urls = future.result()
if category_urls:
print(f'Extraction of {category_name} products ... done')
if self.log:
print(f'Extraction of {category_name} products ... done', file=self.log, end='\n')
products.append((category_name, category_urls))
else:
print(f'Extraction of {category_name} products ... failure')
if self.log:
print(f'Extraction of {category_name} products ... failure', file=self.log, end='\n')
if self.cache_contents:
if folder_name not in os.listdir(self.path + 'Amazon'):
os.mkdir(folder_name)
os.chdir(folder_name)
for category_name, category_product_urls in products:
with open(category_name + '.txt', 'w') as links:
for url in category_product_urls:
links.write(url + '\n')
if self.cleanup_empty:
self._cleanup_empty_files(self.path + 'Amazon/' + folder_name)
return products
def _get_amazon_product_details(self, product_url: str):
"""
Extract product details including:
[Price, Title, URL, Rating, Number of reviews, Sold by, Features, Technical table]
Args:
product_url: Target product.
Return:
A dictionary with the scraped details.
"""
product_html_details, text_details = {}, {}
response = self._get_response(product_url).text
html_content = BeautifulSoup(response, features='lxml')
product_html_details['Price'] = html_content.find('span', {'id': 'price_inside_buybox'})
product_html_details['Url'] = product_url
product_html_details['Title'] = html_content.title
product_html_details['Rating'] = html_content.find('span',
{'class': 'reviewCountTextLinkedHistogram noUnderline'})
product_html_details['Number of reviews'] = html_content.find('span', {'id': 'acrCustomerReviewText'})
product_html_details['Sold by'] = html_content.find('a', {'id': 'bylineInfo'})
product_html_details['Features'] = html_content.find('div', {'id': 'feature-bullets'})
if product_html_details['Features']:
product_html_details['Features'] = product_html_details['Features'].findAll('li')
technical_table = html_content.find('table', {'class': 'a-keyvalue prodDetTable'})
if technical_table:
product_html_details['Technical details'] = list(
zip([item.text.strip() for item in technical_table.findAll('th')],
[item.text.strip() for item in technical_table.findAll('td')]))
for item in product_html_details:
if isinstance(product_html_details[item], bs4.element.Tag):
text_details[item] = product_html_details[item].text.strip()
if isinstance(product_html_details[item], bs4.element.ResultSet):
text_details[item] = ' • '.join([tag.text.strip() for tag in product_html_details[item]])
if isinstance(product_html_details[item], str):
text_details[item] = product_html_details[item]
if item == 'Technical details':
text_details[item] = ' • '.join([' : '.join(pair) for pair in product_html_details[item]])
return text_details
if __name__ == '__main__':
start_time = perf_counter()
path = input('Enter path to save files: ')
session = AmazonScraper(print_progress=True, threads=20, log='amazon_log.txt', path=path)
print(session._extract_section_products('bs', 'categories'))
print(session._extract_section_products('bs', 'subcategories'))
end_time = perf_counter()
print(f'Time: {end_time - start_time} seconds.')

Posted on 2019-10-23 00:50:22
This default:

path=None

isn't effectively None; it's '/Users/user_name/Desktop/Amazon Scraper/'. Since that's an immutable value, it's safe to put it in the default directly.
A glaring problem with that path is that it's absolute and not per-user. Consider using os.path.expanduser with ~.
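A minimal sketch of what that could look like, assuming you still want a folder on the Desktop as the default (the folder name is only illustrative):

import os

# A per-user default, resolved when the module is imported; the folder name is illustrative.
DEFAULT_PATH = os.path.join(os.path.expanduser('~'), 'Desktop', 'Amazon Scraper')

class AmazonScraper:
    def __init__(self, path=DEFAULT_PATH):
        # The default is an immutable string, so putting it in the signature is safe.
        self.path = path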
This:

self.proxies = [{'https:': 'https://' + item.rstrip(), 'http':
'http://' + item.rstrip()} for item in open('proxies.txt').readlines()]

should have one dict item per line, otherwise it gets confusing. In other words:
self.proxies = [{'https:': 'https://' + item.rstrip(),
'http': 'http://' + item.rstrip()}
for item in open('proxies.txt').readlines()]

And this:

self.starting_target_urls = \
{'bs': ('https://www.amazon.com/gp/bestsellers/', 'https://www.amazon.com/Best-Sellers'),
'nr': ('https://www.amazon.com/gp/new-releases/', 'https://www.amazon.com/gp/new-releases/'),
'ms': ('https://www.amazon.com/gp/movers-and-shakers/', 'https://www.amazon.com/gp/movers-and-shakers/'),
'gi': ('https://www.amazon.com/gp/most-gifted/', 'https://www.amazon.com/gp/most-gifted'),
'mw': ('https://www.amazon.com/gp/most-wished-for/', 'https://www.amazon.com/gp/most-wished-for/')}

could be
self.starting_target_urls = {
'bs': ('https://www.amazon.com/gp/bestsellers/', 'https://www.amazon.com/Best-Sellers'),
'nr': ('https://www.amazon.com/gp/new-releases/', 'https://www.amazon.com/gp/new-releases/'),
'ms': ('https://www.amazon.com/gp/movers-and-shakers/', 'https://www.amazon.com/gp/movers-and-shakers/'),
'gi': ('https://www.amazon.com/gp/most-gifted/', 'https://www.amazon.com/gp/most-gifted'),
'mw': ('https://www.amazon.com/gp/most-wished-for/', 'https://www.amazon.com/gp/most-wished-for/')
}

This:
os.chdir(content_path + 'Amazon/')

should use pathlib and the / operator.
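A rough sketch of the same joins with pathlib, where the base folder and file name are only examples; this also avoids changing the working directory:

from pathlib import Path

content_path = Path.home() / 'Desktop' / 'Amazon Scraper'   # illustrative base folder
amazon_dir = content_path / 'Amazon'                        # '/' joins path components portably
amazon_dir.mkdir(parents=True, exist_ok=True)               # replaces the mkdir/chdir dance

file_path = amazon_dir / 'Best Sellers Category Urls.txt'
with open(file_path, 'w') as cats:                          # open() accepts Path objects
    cats.write('https://www.amazon.com/Best-Sellers\n')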
This:

if self.log:
    print(f'Saving {category} ... failure.', file=self.log, end='\n')

should not write directly to the file. Instead, you should set up stock Python logging with a file handler pointing at that file. It's more flexible and easier to maintain.
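A minimal sketch of that setup, assuming a logger named amazon_scraper and the existing amazon_log.txt file name:

import logging

# One-time setup, e.g. in __init__; the logger name and file name are illustrative.
logger = logging.getLogger('amazon_scraper')
logger.setLevel(logging.INFO)
logger.addHandler(logging.FileHandler('amazon_log.txt'))
logger.addHandler(logging.StreamHandler())  # keep echoing progress to the console

# Later, instead of print(..., file=self.log):
logger.info('Saving %s ... failure.', 'https://www.amazon.com/Best-Sellers')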
Lines like this:

return [link.rstrip() for link in cats.readlines()]

don't need the readlines call. Iterating over a file object iterates over its lines.
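Roughly, the same comprehension without readlines (the file name is only an example):

# The file object itself yields lines, so readlines() is unnecessary.
with open('Best Sellers Category Urls.txt') as cats:
    links = [link.rstrip() for link in cats]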
_get_response should include a call to raise_for_status. It's a quick and easy way to get better validation of your HTTP calls.
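For example, the method could look roughly like this (a sketch that keeps the existing session, headers and proxies attributes from __init__):

def _get_response(self, url):
    # Same request as before, picking a random header and proxy.
    response = self.session.get(url, headers=choice(self.headers), proxies=choice(self.proxies))
    response.raise_for_status()  # raises an HTTPError for 4xx/5xx responses
    return response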
https://codereview.stackexchange.com/questions/231126