首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >谷歌分析网络爬虫为每个国家的代码

谷歌分析网络爬虫为每个国家的代码
EN

Code Review用户
提问于 2019-01-17 03:25:52
回答 1查看 88关注 0票数 4

目前,我想请大家指出哪些地方可以更改、哪些做法可以用来改进代码的简洁性和模块化,以及我是否遵循了良好的命名约定、写出了干净可读的代码。任何批评都非常感谢。

代码语言:python
复制
import time
import re
import requests
from bs4 import BeautifulSoup, SoupStrainer
import pandas as pd
import time

SESSION = requests.Session()


""" This is the Google Analytics Selector Section """


class myGoogleSession:

    def fetch_google_xml(self, URL, country_code):
        format_url = f"{URL}{country_code}"
        response = SESSION.get(format_url)
        soup = BeautifulSoup(response.text, 'xml',
                             parse_only=SoupStrainer('channel'))
        return soup


google_session = myGoogleSession()


def google_trends_retriever(URL, country_code):
    """Return (title, traffic) pairs for one country's trends feed.

    '+' and ',' characters are stripped from the traffic figure.
    """
    xml_soup = google_session.fetch_google_xml(URL, country_code)
    print(country_code)
    # Skip the first <title> (presumably the channel's own title, not a
    # trend entry -- matches the original's [1:] slice).
    titles = xml_soup.find_all('title')[1:]
    traffics = xml_soup.find_all('ht:approx_traffic')
    pairs = []
    for title, traffic in zip(titles, traffics):
        pairs.append((title.text, re.sub("[+,]", "", traffic.text)))
    return pairs


def create_pdTrend(data):
    """Build a Title/Score DataFrame for country code *data*.

    Prints a notice and implicitly returns None when the feed yielded
    no rows.  Relies on the module-level GoogleURL set in __main__.
    """
    frame = pd.DataFrame(
        google_trends_retriever(GoogleURL, data),
        columns=['Title', 'Score'],
    )
    if frame.empty:
        print('No available data')
        return None
    return frame


""" This is the Country Code Selector Section """


country_code_list = []


class myCountryCodeSession:
    def fetch_countrycode_html(self, URL):
        response = SESSION.get(URL)
        soup = BeautifulSoup(response.text, 'html.parser',
                             parse_only=SoupStrainer('table'))
        return soup


countryCode_session = myCountryCodeSession()


def parse_row(url):
    """Scrape two-letter codes into the global country_code_list.

    Takes the third <td> cell of each table row (presumably the ISO
    code column -- confirm against the page) and appends its first two
    characters.  Fixes: ``find_all`` replaces the deprecated
    ``findChildren`` alias, and the redundant ``return None`` is gone.
    """
    rows = countryCode_session.fetch_countrycode_html(url)
    for row in rows.find_all(['td', 'tr']):
        # [2:3] instead of [2]: rows with fewer than three cells yield
        # an empty list and are skipped rather than raising IndexError.
        for cell in row.find_all('td')[2:3]:
            country_code_list.append(cell.string[:2])


def create_pdCountryCode(country_code):
    """Wrap a sequence of country codes in a one-column DataFrame."""
    frame = pd.DataFrame({'Country_Code': country_code})
    return frame


def iterate_List(data):
    """Print the trends DataFrame for every country code in *data*.

    Fix: the original ignored its *data* argument and walked hard-coded
    indices 1..239 of the global list via get_data_fromList; it also
    used while/else, whose else branch always ran.  Iterating the
    argument directly covers every entry with the same per-code
    printing behaviour.
    """
    count = 0
    for code in data:
        print(create_pdTrend(str(code)))
        count += 1
    print('Has reach the end of i ' + str(count))


def get_data_fromList(num):
    """Return the *num*-th (1-based) country code as a string.

    Fix: replaces the one-element slice + for loop with a direct index.
    The bounds check preserves the original's implicit None for
    positions past the end of the global country_code_list.
    """
    key = num - 1
    if 0 <= key < len(country_code_list):
        return str(country_code_list[key])
    return None


if __name__ == '__main__':
    # URL section
    GoogleURL = "https://trends.google.com/trends/trendingsearches/daily/rss?geo="
    CountryCodeURL = "https://countrycode.org/"

    start = time.time()
    print("hello")

    # Country-code section: fill the global country_code_list.
    parse_row(CountryCodeURL)

    # Google Analytics section: print one trends table per code.
    iterate_List(country_code_list)

    end = time.time()
    print(end - start)
EN

回答 1

Code Review用户

回答已采纳

发布于 2019-01-17 10:26:13

PEP8

这是正式的Python风格指南。如果您对良好的命名约定和其他良好实践感兴趣,可以从这里开始。

除其他外,您的代码将受益于:

  • 变量名使用lower_snake_case
  • 类名使用PascalCase
  • 使用#分隔的注释,而不是代码中的原始字符串;
  • 删除代码中多余/无用的部分。

第一次重写将产生:

代码语言:python
复制
import re
import time

import requests
import pandas as pd
from bs4 import BeautifulSoup, SoupStrainer


SESSION = requests.Session()


# This is the Google Analytics Selector Section
class GoogleSession:
    """Fetches and parses one country's Google Trends RSS feed."""

    def fetch_google_xml(self, URL, country_code):
        # NOTE(review): XML is better parsed from response.content so
        # the parser can honour the declared encoding -- this answer
        # makes that change in its final version below.
        response = SESSION.get(f"{URL}{country_code}")
        return BeautifulSoup(
                response.text, 'xml',
                parse_only=SoupStrainer('channel'))


# Module-level singleton used by google_trends_retriever().
google_session = GoogleSession()


def google_trends_retriever(URL, country_code):
    """Return (title, traffic) pairs scraped from one country's feed."""
    xml_soup = google_session.fetch_google_xml(URL, country_code)
    print(country_code)
    # Skip the first <title> -- presumably the channel's own title
    # rather than a trend entry.
    titles = xml_soup.find_all('title')[1:]
    traffics = xml_soup.find_all('ht:approx_traffic')
    return [
            (title.text, re.sub("[+,]", "", traffic.text))
            for title, traffic in zip(titles, traffics)
    ]


def create_pd_trend(data):
    """Build a Title/Score DataFrame for country code *data*.

    Prints a notice and implicitly returns None when the feed yielded
    no rows; relies on the module-level google_URL set in __main__.
    """
    check_panda = pd.DataFrame(
            google_trends_retriever(google_URL, data),
            columns=['Title', 'Score'],
    )
    if len(check_panda) == 0:
        print('No available data')
    else:
        return check_panda


# This is the Country Code Selector Section

# Global accumulator filled by parse_row() and read in __main__.
country_code_list = []


class CountryCodeSession:
    """Fetches the country-code page narrowed to its <table> elements."""

    def fetch_country_code_html(self, URL):
        response = SESSION.get(URL)
        return BeautifulSoup(
                response.text, 'html.parser',
                parse_only=SoupStrainer('table'))


country_code_session = CountryCodeSession()


def parse_row(url):
    """Append two-letter codes from the page's tables to country_code_list."""
    rows = country_code_session.fetch_country_code_html(url)
    for row in rows.find_all(['td', 'tr']):
        # [2:3]: rows with fewer than three <td> cells yield an empty
        # list and are skipped instead of raising IndexError.
        cells = row.find_all('td')[2:3]
        for cell in cells:
            value = cell.string
            country_code_list.append(value[:2])


def iterate_list(data):
    """Print a trends DataFrame for country codes 1..239.

    NOTE(review): the *data* argument is still unused here -- codes come
    from the global list via get_data_from_list.  The ``else`` of a
    ``while`` runs whenever the condition becomes false, which is
    always the case here, so it is just sequential code in disguise.
    (Both points are fixed in the next revision below.)
    """
    i = 1
    while i <= 239:
        selected_country_code = get_data_from_list(i)
        print(create_pd_trend(selected_country_code))
        i += 1
    else:
        print('Has reach the end of i', i)


def get_data_from_list(num):
    """Return the *num*-th (1-based) country code as a string, or None.

    Iterates a one-element slice so an out-of-range *num* falls through
    and returns None instead of raising IndexError.
    """
    key = num - 1
    for i in country_code_list[key:num]:
        return str(i)


if __name__ == '__main__':
    # URL Section
    google_URL = "https://trends.google.com/trends/trendingsearches/daily/rss?geo="
    country_code_URL = "https://countrycode.org/"
    # -------------
    start = time.time()
    print("hello")

    # Country Code Section: fill the global country_code_list.
    parse_row(country_code_URL)
    # ---------------------

    # Google Analytics Section: print one trends table per code.
    iterate_list(country_code_list)
    # -------------------------
    end = time.time()
    print(end - start)

像原生写法一样循环(Loop like a native)

当我看到

def get_data_fromList(num):
    key = num-1
    for i in country_code_list[key:num]:
        return str(i)

我想知道为什么你会写这么复杂的代码。提取一个元素的子列表来迭代它,并返回第一个…你可以把它简化为

代码语言:python
复制
def get_data_from_list(num):
    # Direct 1-based lookup.  Unlike the slice-and-loop original this
    # raises IndexError (rather than returning None) when num is past
    # the end of the global country_code_list.
    return str(country_code_list[num - 1])

但是我想知道为什么要使用这种方法,并且看到了如何迭代索引来调用这个函数。不要。使用for-循环,因为它是要使用的:直接迭代内容。

这将产生:

代码语言:python
复制
import re
import time

import requests
import pandas as pd
from bs4 import BeautifulSoup, SoupStrainer


SESSION = requests.Session()


# This is the Google Analytics Selector Section
class GoogleSession:
    """Fetches and parses one country's Google Trends RSS feed."""

    def fetch_google_xml(self, URL, country_code):
        # NOTE(review): XML is better parsed from response.content so
        # the parser can honour the declared encoding (final version
        # below does exactly that).
        response = SESSION.get(f"{URL}{country_code}")
        return BeautifulSoup(
                response.text, 'xml',
                parse_only=SoupStrainer('channel'))


google_session = GoogleSession()


def google_trends_retriever(URL, country_code):
    """Return (title, traffic) pairs scraped from one country's feed."""
    xml_soup = google_session.fetch_google_xml(URL, country_code)
    print(country_code)
    # Skip the first <title> -- presumably the channel's own title.
    titles = xml_soup.find_all('title')[1:]
    traffics = xml_soup.find_all('ht:approx_traffic')
    return [
            (title.text, re.sub("[+,]", "", traffic.text))
            for title, traffic in zip(titles, traffics)
    ]


def create_pd_trend(data):
    """Build a Title/Score DataFrame for country code *data*.

    Prints a notice and implicitly returns None when no rows came back;
    relies on the module-level google_URL set in __main__.
    """
    check_panda = pd.DataFrame(
            google_trends_retriever(google_URL, data),
            columns=['Title', 'Score'],
    )
    if len(check_panda) == 0:
        print('No available data')
    else:
        return check_panda


# This is the Country Code Selector Section
class CountryCodeSession:
    """Fetches the country-code page narrowed to its <table> elements."""

    def fetch_country_code_html(self, URL):
        response = SESSION.get(URL)
        return BeautifulSoup(
                response.text, 'html.parser',
                parse_only=SoupStrainer('table'))


country_code_session = CountryCodeSession()


def parse_row(url):
    """Return the list of two-letter codes found in the page's tables."""
    rows = country_code_session.fetch_country_code_html(url)
    return [
            # [2:3] silently skips rows with fewer than three <td> cells.
            cell.string[:2]
            for row in rows.find_all(['td', 'tr'])
            for cell in row.find_all('td')[2:3]
    ]


def iterate_list(country_codes):
    """Print the trends DataFrame for every country code, then a footer.

    Fix: the original used ``for/else``, but the ``else`` of a loop
    without a ``break`` always executes, so it was just a misleading
    way of writing sequential code.  The footer print now simply
    follows the loop; runtime behaviour is identical.
    """
    for country_code in country_codes:
        print(create_pd_trend(str(country_code)))
    print('Has reach the end of i', len(country_codes))


if __name__ == '__main__':
    # URL Section
    google_URL = "https://trends.google.com/trends/trendingsearches/daily/rss?geo="
    country_code_URL = "https://countrycode.org/"
    # -------------
    start = time.time()
    print("hello")

    # Country Code Section: parse_row now RETURNS the codes instead of
    # mutating a module-level global.
    country_code_list = parse_row(country_code_URL)
    # ---------------------

    # Google Analytics Section: print one trends table per code.
    iterate_list(country_code_list)
    # -------------------------
    end = time.time()
    print(end - start)

停止写类

您的类绝对不会对单个函数添加任何值。您不存储在每次调用之后重用的状态。您不能在多个函数之间共享状态。它们是名称空间中的普通函数,让它们成为简单的函数。

这段代码可以从使用类中获益,但不是这样的。

解析字节,而不是文本

lxml 是当你指示 BeautifulSoup 解析 'xml' 时所使用的底层解析器,它显式地处理原始字节,而不是已解码的文本。这是为了能够检测到显式的编码声明,并据此对文档的其余部分进行正确解码;这样就不会出现解码错误。

这意味着在解析XML时,需要将response.content而不是response.text提供给BeautifulSoup

正确地管理您的状态

您的代码在很大程度上依赖于全局变量和打印(print)数据。这是代码中最糟糕的部分,因为它使代码几乎无法重用,并且很难正确测试(比如用 unittest 或 doctest)。

不要使用全局变量,而是将它们作为参数传递,并从函数中返回它们。

不要打印结果,而是从函数返回值。这使得提取和加工(massage)数据变得更容易。

还有在整个代码中使用的全局SESSION。我会将它封装到一个类中,以便每个实例都有一个会话,这样,如果需要的话,您可以轻松地爬行几个地址。

我对这个问题的看法是:

代码语言:python
复制
import re
from functools import partial

import requests
import pandas as pd
from bs4 import BeautifulSoup, SoupStrainer


class GoogleAnalysis:
    """Scrapes Google Trends daily-trending searches for one base URL.

    Owns its requests.Session so several instances can crawl different
    endpoints independently.
    """

    def __init__(self, url):
        session = requests.Session()
        # Bind the base URL once; later calls only vary the params.
        self.get_url = partial(session.get, url)

    def _fetch_xml(self, country_code):
        """Download the feed for *country_code*, parsed to its <channel>."""
        response = self.get_url(params={'geo': country_code})
        # Raw bytes, not decoded text: lets the XML parser honour the
        # document's own encoding declaration.
        return BeautifulSoup(
                response.content, 'xml',
                parse_only=SoupStrainer('channel'))

    def _retrieve_trends(self, country_code):
        """Return (title, traffic) pairs; '+' and ',' stripped from traffic."""
        soup = self._fetch_xml(country_code)
        # Skip the first <title> -- presumably the channel's own title.
        titles = soup.find_all('title')[1:]
        traffics = soup.find_all('ht:approx_traffic')
        return [
                (title.text, re.sub("[+,]", "", traffic.text))
                for title, traffic in zip(titles, traffics)
        ]

    def trends(self, country_code):
        """Return a DataFrame with Title, Score and Country Code columns."""
        df = pd.DataFrame(
                self._retrieve_trends(country_code),
                columns=['Title', 'Score'],
        )
        df['Country Code'] = country_code
        return df


def country_codes(url='https://countrycode.org/'):
    """Return the two-letter codes scraped from the page's tables."""
    response = requests.get(url)
    soup = BeautifulSoup(
            response.text, 'lxml',
            parse_only=SoupStrainer('table'))
    return [
            cell.string[:2]
            for row in soup.find_all(['td', 'tr'])
            # Some rows don't define row.find_all('td')[2] so filter out
            for cell in row.find_all('td')[2:3]
    ]


def main(url):
    """Crawl every country's trends and concatenate them into one DataFrame."""
    google = GoogleAnalysis(url)
    codes = country_codes()
    return pd.concat([
        google.trends(country_code)
        # Country codes are repeated twice, we only need them once
        for country_code in codes[:len(codes) // 2]
    ])


if __name__ == '__main__':
    import time
    start = time.perf_counter()
    print('Hello!')
    # All computation happens in main(); only formatting happens here,
    # so the result can just as easily be written to CSV instead.
    trends = main('https://trends.google.com/trends/trendingsearches/daily/rss')
    print(trends.to_string(index=False))
    print(time.perf_counter() - start)

注意,最后的print(trends.to_string(index=False))可能是您喜欢的任何东西,要么打印到CSV,要么使用trends.groupby重做旧格式。这里的想法是,计算是在没有任何print的情况下完成的。您可以在最后格式化数据,不管您喜欢什么。

票数 3
EN
页面原文内容由Code Review提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codereview.stackexchange.com/questions/211654

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档