import requests
import pandas as pd
from bs4 import BeautifulSoup
from datetime import date
# Scrape the Amazon "Best Sellers Rank" details for a fixed list of products
# and collect one row per product into the DataFrame `df`.
today = date.today()
Date = today  # value written to the 'Date' column of every row

# Browser-like request headers sent with every page download.
headers = {
    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36',
    'Accept-Language': 'en-US, en;q=0.5',
}
URL = [
    'https://www.amazon.com/Dove-Intensive-Concentrate-Technology-Protects/dp/B0B1VVXTKL',
    'https://www.amazon.com/Dove-Intensive-Concentrate-Conditioner-Technology/dp/B0B1VXFLQ2',
]

# Selectors for the detail-bullet <span> elements that hold the rank text and the ASIN.
best_seller_css = '#detailBulletsWrapper_feature_div span:-soup-contains("Best Seller")'
asin_css = '#detailBulletsWrapper_feature_div span:-soup-contains("ASIN")'

data = []
for url in URL:
    # Without a timeout a single stalled connection would block the script forever.
    webpage = requests.get(url, headers=headers, timeout=30)
    # Surface HTTP errors directly instead of failing later while parsing an error page.
    webpage.raise_for_status()
    # NOTE(review): no parser is named, so bs4 picks whichever is installed; the
    # contents[...] indices below were presumably tuned against that parser.
    soup = BeautifulSoup(webpage.content)

    # Look the rank element up once and reuse it for all four rank/category fields.
    best_seller = soup.select_one(best_seller_css)
    main_words = best_seller.contents[2].get_text().split()
    sub_words = best_seller.contents[5].get_text().split()

    data.append({
        'Rank': main_words[0],
        'Category': " ".join(main_words[2:6]),
        'Sub-Category Rank': sub_words[0],
        'Sub-Category': " ".join(sub_words[2:6]),
        'ASIN': soup.select_one(asin_css).contents[3].get_text(),
        'Product Title': soup.find("span", attrs={"id": 'productTitle'}).text.strip(),
        'Date': Date,
    })

df = pd.DataFrame(data)
# Drop the leading '#' from the rank columns (the values remain strings).
df['Rank'] = df['Rank'].str.replace('#', '')
df['Sub-Category Rank'] = df['Sub-Category Rank'].str.replace('#', '')
# to local file
df.to_csv(local_path, mode='a', header=False, index=False)
我正在尝试在 Jupyter Notebook 中使用 schedule 库(import schedule),因为 Windows 上的 Cron 和任务计划程序(Task Scheduler)对我不起作用。我想让这段代码在每天早上 8 点执行。有人能帮我定义这个定时任务(job)吗?谢谢!
发布于 2022-08-19 21:10:30
这个应该能用
import requests
import pandas as pd
from bs4 import BeautifulSoup
from datetime import date
import datetime
import asyncio
def wait_for_clock(hour, minute, result=None, *, now=None):
    """Return an awaitable that sleeps until the next occurrence of hour:minute.

    If the target time has already been reached today (including the exact
    instant), the awaitable sleeps until that time tomorrow.

    Args:
        hour: Target hour of the day (0-23).
        minute: Target minute of the hour (0-59).
        result: Value the awaitable produces once the sleep finishes.
        now: Reference "current" datetime; defaults to ``datetime.datetime.now()``.
            Mainly useful to make the computed delay deterministic.

    Returns:
        The coroutine created by ``asyncio.sleep(delay, result)``.
    """
    if now is None:
        now = datetime.datetime.now()
    # Derive the date from the same reading as the time so both agree.
    target = datetime.datetime.combine(
        now.date(),
        datetime.time(hour, minute, tzinfo=now.tzinfo),
    )
    if now >= target:
        target += datetime.timedelta(days=1)
    # total_seconds() includes the `days` component; summing only `.seconds`
    # and `.microseconds` yields 0 when the gap is exactly one day.
    return asyncio.sleep((target - now).total_seconds(), result)
async def do_that():
    """Scrape Best Sellers Rank details for the tracked products and append them to a CSV.

    For each product URL the page is downloaded and the rank, category,
    sub-category rank, sub-category, ASIN and product title are read from the
    detail-bullets section. One row per product, stamped with today's date, is
    appended (no header, no index) to the CSV file at ``local_path``.

    NOTE: ``local_path`` is not defined in this function; it must exist at
    module level before this coroutine runs.
    NOTE(review): ``requests`` is a blocking client, so this coroutine does not
    yield to the event loop while downloading.

    Raises:
        requests.RequestException: On timeout, connection failure or an HTTP
            error status.
        AttributeError, IndexError: If the page does not contain the expected
            detail-bullet elements.
    """
    scraped_on = date.today()
    # Browser-like request headers sent with every page download.
    headers = {
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36',
        'Accept-Language': 'en-US, en;q=0.5',
    }
    urls = [
        'https://www.amazon.com/Dove-Intensive-Concentrate-Technology-Protects/dp/B0B1VVXTKL',
        'https://www.amazon.com/Dove-Intensive-Concentrate-Conditioner-Technology/dp/B0B1VXFLQ2',
    ]
    # Selectors for the detail-bullet <span> elements that hold the rank text and the ASIN.
    best_seller_css = '#detailBulletsWrapper_feature_div span:-soup-contains("Best Seller")'
    asin_css = '#detailBulletsWrapper_feature_div span:-soup-contains("ASIN")'

    rows = []
    for url in urls:
        # Without a timeout a single stalled connection would block every later daily run.
        webpage = requests.get(url, headers=headers, timeout=30)
        # Surface HTTP errors directly instead of failing later while parsing an error page.
        webpage.raise_for_status()
        # NOTE(review): no parser is named, so bs4 picks whichever is installed; the
        # contents[...] indices below were presumably tuned against that parser.
        soup = BeautifulSoup(webpage.content)

        # Look the rank element up once and reuse it for all four rank/category fields.
        best_seller = soup.select_one(best_seller_css)
        main_words = best_seller.contents[2].get_text().split()
        sub_words = best_seller.contents[5].get_text().split()

        rows.append({
            'Rank': main_words[0],
            'Category': " ".join(main_words[2:6]),
            'Sub-Category Rank': sub_words[0],
            'Sub-Category': " ".join(sub_words[2:6]),
            'ASIN': soup.select_one(asin_css).contents[3].get_text(),
            'Product Title': soup.find("span", attrs={"id": 'productTitle'}).text.strip(),
            'Date': scraped_on,
        })

    df = pd.DataFrame(rows)
    # Drop the leading '#' from the rank columns (the values remain strings).
    df['Rank'] = df['Rank'].str.replace('#', '')
    df['Sub-Category Rank'] = df['Sub-Category Rank'].str.replace('#', '')
    # Append to the local CSV; header=False keeps repeated daily runs from
    # inserting a header line between data rows.
    df.to_csv(local_path, mode='a', header=False, index=False)
if __name__ == '__main__':
    while True:
        asyncio.run(wait_for_clock(8, 0))
        asyncio.run(do_that())
这里我没有使用 schedule 库,而是自己写了一段等待到指定时刻的代码(我把它写成了“异步”的形式,但你也可以把函数 do_that 改成同步函数,然后把最后一行的 asyncio.run(do_that()) 改成直接调用 do_that())。
https://stackoverflow.com/questions/73422201
复制相似问题