我创建了一个概念web爬虫的小示例,以了解.NET中有关异步的更多信息。
当前,当运行时,它会以固定数量的当前请求(工作人员)爬行堆栈溢出。
我感兴趣的是,我是否犯了任何常见的错误,如果有更多的框架/ TPL,我可以使用它来减少代码的使用量。
要运行它,您需要安装HtmlAgilityPack。
我只对程序结构及其异步方面的反馈感兴趣。
class Program
{
static void Main(string[] args)
{
var crawler = new WebCrawler();
crawler.Start();
}
public class WebCrawler
{
private readonly HttpClient client;
private const string SiteUrl = "http://stackoverflow.com";
private CrawlList crawlList;
private const int MaxWorkers =5;
private int workers;
public WebCrawler()
{
client = new HttpClient();
crawlList = new CrawlList();
}
public void Start()
{
crawlList = new CrawlList();
crawlList.AddUrl(SiteUrl);
do
{
if (workers >= MaxWorkers) continue;
if (!crawlList.HasNext()) continue;
Interlocked.Increment(ref workers);
Debug.Write("Workers " + workers);
ProcessUrl(crawlList.GetNext());
} while (crawlList.HasNext() || workers > 0);
}
private async void ProcessUrl(string url)
{
Debug.Print("Processing " + url);
await client.GetAsync(url).ContinueWith(ProcessResponse);
}
private async void ProcessResponse(Task<HttpResponseMessage> response)
{
Debug.Print("Processing response ");
var html = await response.Result.Content.ReadAsStringAsync();
var doc = new HtmlDocument();
doc.LoadHtml(html);
var internalLinks = doc.DocumentNode.SelectNodes("//a[@href]").Where(x => x.Attributes["href"].Value.StartsWith("/")).Select(x => SiteUrl + x.Attributes["href"].Value).ToList();
internalLinks.ForEach(x => crawlList.AddUrl(x));
Interlocked.Decrement(ref workers);
}
}
public class CrawlList
{
private readonly ConcurrentBag<string> urlsToCrawl;
private readonly ConcurrentBag<string> urlsCompleted;
public CrawlList()
{
urlsToCrawl = new ConcurrentBag<string>();
urlsCompleted = new ConcurrentBag<string>();
}
public bool HasNext()
{
return urlsToCrawl.Any();
}
public string GetNext()
{
string url;
urlsToCrawl.TryTake(out url);
urlsCompleted.Add(url);
return url;
}
public void AddUrl(string url)
{
if (!UrlAlreadyAdded(url))
{
urlsToCrawl.Add(url);
Debug.Print("Adding Url " + url);
}
}
public bool UrlAlreadyAdded(string url)
{
return urlsToCrawl.Contains(url) || urlsCompleted.Contains(url);
}
}
}在浏览了所有建议之后,下面我有了一个新的实现。我认为这个简单的POC涵盖了人们以前在我的代码中发现的大多数问题。很高兴听到所有的贡献者现在对此有何看法?
非常喜欢Task.WhenAny方法。认为我的工作是线程安全,但这是我唯一关心的。
class Program
{
static void Main(string[] args)
{
var crawler = new Crawler(5);
try
{
crawler.Crawl("http://cask-marque.co.uk/").Wait();
}
catch (AggregateException e)
{
foreach (var ex in e.InnerExceptions)
{
Console.WriteLine(ex.InnerException);
}
Console.ReadLine();
}
}
public class Crawler
{
public readonly CrawlList CrawlList;
private readonly List<Task<string>> runningTasks = new List<Task<string>>();
private readonly HttpClient client;
private readonly int maxConcurrentDownload;
private const string BaseUrl = "http://cask-marque.co.uk";
public Crawler(int maxConcurrentDownload)
{
CrawlList = new CrawlList();
client = new HttpClient();
this.maxConcurrentDownload = maxConcurrentDownload;
ServicePointManager.DefaultConnectionLimit = maxConcurrentDownload;
}
public async Task<bool> Crawl(string startUrl)
{
runningTasks.Add(ProcessUrl(startUrl));
while (runningTasks.Any())
{
var completedTask = await Task.WhenAny(runningTasks);
runningTasks.Remove(completedTask);
var pageHtml = await completedTask;
while (CrawlList.HasNext() && runningTasks.Count < maxConcurrentDownload)
{
var url = CrawlList.GetNext();
runningTasks.Add(ProcessUrl(url));
}
}
return true;
}
private async Task<string> ProcessUrl(string url)
{
Console.WriteLine("url " + url);
var response = await client.GetAsync(url);
var content = await response.Content.ReadAsStringAsync();
var doc = new HtmlDocument();
doc.LoadHtml(content);
var urls = doc.DocumentNode.SelectNodes("//a[@href]").Where(x => x.Attributes["href"].Value.StartsWith("/")).Select(x => BaseUrl + x.Attributes["href"].Value).ToList();
CrawlList.AddUrls(urls);
return content;
}
}
public class CrawlList
{
public readonly Queue<string> UrlsToCrawl;
public readonly List<string> UrlsCompleted;
public CrawlList()
{
UrlsToCrawl = new Queue<string>();
UrlsCompleted = new List<string>();
}
public bool HasNext()
{
return UrlsToCrawl.Any();
}
public string GetNext()
{
return UrlsToCrawl.Dequeue();
}
public void AddUrls(List<string> urls)
{
foreach (var url in urls)
{
AddUrl(url);
}
}
public void AddUrl(string url)
{
if (UrlAlreadyAdded(url)) return;
UrlsToCrawl.Enqueue(url);
}
public bool UrlAlreadyAdded(string url)
{
return UrlsToCrawl.Contains(url) || UrlsCompleted.Contains(url);
}
}
}
}发布于 2014-10-03 23:53:26
crawler.Start();这是非常混乱的命名。如果一个方法被称为Start(),我希望它能够启动工作,然后返回,而不是完成所有的工作。更好的名称应该是类似于Run()的名称,或者更好的名称,更具体的名称。
private const string SiteUrl = "http://stackoverflow.com";为什么这个在这里?我认为,作为构造函数参数,或者作为Start()的参数,这更有意义。
private const int MaxWorkers =5;同样,这可能是可配置的。而且,如果您正在处理单个网站,那么比默认设置为2的ServicePointManager.DefaultConnectionLimit更多的工作人员没有多大意义。
public WebCrawler()
{
client = new HttpClient();
crawlList = new CrawlList();
}
public void Start()
{
crawlList = new CrawlList();如果每次调用crawlList时都要设置Start(),则不需要在构造函数中设置它。
do
{
…
} while (crawlList.HasNext() || workers > 0);所以,当一些下载正在进行中时,您将循环使用并浪费整个CPU核心来做任何有用的事情?那是个糟糕的主意。墨西哥人的回答有一种解决办法。
await client.GetAsync(url).ContinueWith(ProcessResponse);这里不需要ContinueWith(),这就是await的目的:
var response = await client.GetAsync(url);
await ProcessResponseAsync(response);这假设您将在async void ProcessResponse(Task<HttpResponseMessage> response)之后将async Task ProcessResponseAsync(HttpResponseMessage response)更改为基于任务的异步模式。
var internalLinks = doc.DocumentNode.SelectNodes("//a[@href]").Where(x => x.Attributes["href"].Value.StartsWith("/")).Select(x => SiteUrl + x.Attributes["href"].Value).ToList();为什么这都是一条单线?您应该将其分解为多行,以使其更具可读性:
var internalLinks = doc.DocumentNode.SelectNodes("//a[@href]")
.Where(x => x.Attributes["href"].Value.StartsWith("/"))
.Select(x => SiteUrl + x.Attributes["href"].Value)
.ToList();您还可以使用查询语法来避免重复:
var internalLinks =
from node in doc.DocumentNode.SelectNodes("//a[@href]")
let href = node.Attributes["href"].Value
where href.StartsWith("/")
select SiteUrl + href;或者也许:
var internalLinks =
from node in doc.DocumentNode.SelectNodes("//a[@href]")
select node.Attributes["href"].Value into href
where href.StartsWith("/")
select SiteUrl + href;不同的是,第二个版本比第一个版本的效率略高,但我也认为它的可读性较低。
另外,我删除了ToList(),见下文。
internalLinks.ForEach(x => crawlList.AddUrl(x));我认为不应该使用List.ForEach。相反,可以使用foreach,或者向集合中添加类似AddUrls()的内容。
public string GetNext()
{
string url;
urlsToCrawl.TryTake(out url);
urlsCompleted.Add(url);
return url;
}那么,当urlsToCrawl为空时,您要将null添加到urlsCompleted中,然后返回它?对我来说听起来不是个好主意。
如果该方法应该仅在urlsToCrawl不为空时才被调用,那么当它被错误调用时,它应该抛出一个异常。
public void AddUrl(string url)
{
if (!UrlAlreadyAdded(url))
{
urlsToCrawl.Add(url);
Debug.Print("Adding Url " + url);
}
}这不是线程安全的。由于可以同时从多个线程调用此方法,因此可以多次下载相同的URL。
发布于 2014-10-03 19:18:55
do { if (workers >= MaxWorkers) continue; if (!crawlList.HasNext()) continue; Interlocked.Increment(ref workers); Debug.Write("Workers " + workers); ProcessUrl(crawlList.GetNext()); } while (crawlList.HasNext() || workers > 0);
假设您第一次访问这个代码块是为了爬行,那么您很可能会删除前2个if语句,方法是让它循环一个while循环,而不是do while,如下所示
while (crawlList.HasNext() && workers > 0 && workers <= MaxWorkers)
{
Interlocked.Increment(ref workers);
Debug.Write("Workers " + workers);
ProcessUrl(crawlList.GetNext());
}我还假设,如果没有工作人员,您也不会尝试为crawlList上的下一项分配工作人员,所以我将or (||)改为an和(&&)。
我还假设您不想检查您的最大工人数,所以我也将其添加到while语句的条件中。
我在看这个,我想你可能想要等待工人们被释放,而不是真正退出循环,当工人的工作量达到程序的最大值时,你可能想要一些东西来保持循环运行,直到一个工人被释放。
所以就像这样。
while (crawlList.HasNext() && workers > 0)
{
if (workers > MaxWorkers)
{
continue;
}
Interlocked.Increment(ref workers);
Debug.Write("Workers " + workers);
ProcessUrl(crawlList.GetNext());
} 发布于 2014-10-03 23:08:45
我认为你应该澄清async的作用。确实,它允许您在主线程之外运行一些缓慢的操作,但它并不是作为实现并行性的一种方式。这只是允许您的应用程序继续处理而不阻塞昂贵操作的一种方法。看看这段视频,它解释得很清楚。
您使用fire获得多个线程,而忘记了从async void派生的执行,但是您应该避免使用它们。它们只应在事件处理程序中使用。。
你需要一种更有条理的方法。您需要创建多个Task实例并并行启动它们。显然,每个任务都可以执行异步操作。这篇MSDN文章是一个很好的起点,让您了解如何做到这一点。
这是我想出的代码,它应该能更干净地发挥作用。
using System;
using System.Net.Http;
using System.Collections.Generic;
using System.Threading.Tasks;
using HtmlAgilityPack;
using System.Linq;
namespace Async
{
public class Crawler
{
private readonly HttpClient client;
private readonly int maxConcurrentDownload;
string baseUrl;
public Crawler (HttpClient client, string baseUrl, int maxConcurrentDownload)
{
this.maxConcurrentDownload = maxConcurrentDownload;
this.baseUrl = baseUrl;
this.client = client;
}
public async Task<IEnumerable<string>> Crawl(string startUrl)
{
var result = new List<string> (); // I know list is not the ideal data structure here. Let's use it for simplicity
var pagesToProcess = new Queue<string> ();
var runningTasks = new List<Task<IEnumerable<string>>> ();
runningTasks.Add (processUrl(startUrl));
result.Add (startUrl);
while (runningTasks.Any())
{
var firstCompletedTask = await Task.WhenAny (runningTasks);
runningTasks.Remove (firstCompletedTask);
var urlsFound = await firstCompletedTask;
/*
* At this point we know the url we found in the page
* we finished to process.
* It's time to enque them
*/
foreach (var url in urlsFound)
{
if(!result.Contains(url))
pagesToProcess.Enqueue (url);
}
/*
* Now we should start some new Taks
* to process new pages
*/
while (pagesToProcess.Any () && runningTasks.Count < maxConcurrentDownload)
{
var url = pagesToProcess.Dequeue ();
runningTasks.Add (processUrl (url));
result.Add(url);
}
}
return result;
}
private async Task<IEnumerable<string>> processUrl(string url)
{
Console.WriteLine ("Processing {0}", url);
var response = await client.GetAsync (url);
var childUrls = await ProcessResponse (response);
Console.WriteLine ("{0} child urls for {1}", childUrls.Count, url);
return childUrls;
}
private async Task<List<string>> ProcessResponse(HttpResponseMessage response)
{
var html = await response.Content.ReadAsStringAsync();
var doc = new HtmlDocument();
doc.LoadHtml(html);
return doc.DocumentNode.SelectNodes("//a[@href]").Where(x => x.Attributes["href"].Value.StartsWith("/")).Select(x => baseUrl + x.Attributes["href"].Value).ToList();
}
}
}你就是这样开始的:
public static void Main (string[] args)
{
Console.WriteLine ("Starting up!");
var crawler = new Crawler (new System.Net.Http.HttpClient (), "http://stackoverflow.com", 3);
crawler.Crawl ("http://stackoverflow.com").Wait();
}https://codereview.stackexchange.com/questions/64650
复制相似问题