首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >POC异步Web爬虫的实现

POC异步Web爬虫的实现
EN

Code Review用户
提问于 2014-10-03 14:09:39
回答 3查看 3.7K关注 0票数 4

我创建了一个概念web爬虫的小示例,以了解.NET中有关异步的更多信息。

当前,当运行时,它会以固定数量的当前请求(工作人员)爬行堆栈溢出。

我感兴趣的是,我是否犯了任何常见的错误,如果有更多的框架/ TPL,我可以使用它来减少代码的使用量。

要运行它,您需要安装HtmlAgilityPack。

我只对程序结构及其异步方面的反馈感兴趣。

代码语言:javascript
复制
class Program
{
    static void Main(string[] args)
    {
        var crawler = new WebCrawler();
        crawler.Start();
    }

    public class WebCrawler
    {
        private readonly HttpClient client;
        private const string SiteUrl = "http://stackoverflow.com";
        private CrawlList crawlList;
        private const int MaxWorkers =5;
        private int workers;

        public WebCrawler()
        {
            client = new HttpClient();
            crawlList = new CrawlList();
        }

        public void Start()
        {
            crawlList = new CrawlList();
            crawlList.AddUrl(SiteUrl);

            do
            {
                if (workers >= MaxWorkers) continue;

                if (!crawlList.HasNext()) continue;

                Interlocked.Increment(ref workers);

                Debug.Write("Workers " + workers);
                ProcessUrl(crawlList.GetNext());

            } while (crawlList.HasNext() || workers > 0);

        }

        private async void ProcessUrl(string url)
        {
            Debug.Print("Processing " + url);
            await client.GetAsync(url).ContinueWith(ProcessResponse);
        }

        private async void ProcessResponse(Task<HttpResponseMessage> response)
        {
            Debug.Print("Processing response ");
            var html = await response.Result.Content.ReadAsStringAsync();
            var doc = new HtmlDocument();
            doc.LoadHtml(html);
            var internalLinks = doc.DocumentNode.SelectNodes("//a[@href]").Where(x => x.Attributes["href"].Value.StartsWith("/")).Select(x =>  SiteUrl + x.Attributes["href"].Value).ToList();
            internalLinks.ForEach(x => crawlList.AddUrl(x));

            Interlocked.Decrement(ref workers);
        }
    }

    public class CrawlList
    {
        private readonly ConcurrentBag<string> urlsToCrawl;
        private readonly ConcurrentBag<string> urlsCompleted;

        public CrawlList()
        {
            urlsToCrawl = new ConcurrentBag<string>();
            urlsCompleted = new ConcurrentBag<string>();
        }

        public bool HasNext()
        {
            return urlsToCrawl.Any();
        }

        public string GetNext()
        {
            string url;
            urlsToCrawl.TryTake(out url);
            urlsCompleted.Add(url);
            return url;
        }

        public void AddUrl(string url)
        {
            if (!UrlAlreadyAdded(url))
            {
                urlsToCrawl.Add(url);
                Debug.Print("Adding Url "  + url);
            }
        }

        public bool UrlAlreadyAdded(string url)
        {
            return urlsToCrawl.Contains(url) || urlsCompleted.Contains(url);
        }
    }
}

在浏览了所有建议之后,下面我有了一个新的实现。我认为这个简单的POC涵盖了人们以前在我的代码中发现的大多数问题。很高兴听到所有的贡献者现在对此有何看法?

非常喜欢Task.WhenAny方法。认为我的工作是线程安全,但这是我唯一关心的。

代码语言:javascript
复制
class Program
    {

        static void Main(string[] args)
        {
            var crawler = new Crawler(5);

            try
            {
                crawler.Crawl("http://cask-marque.co.uk/").Wait();
            }
            catch (AggregateException e)
            {
                foreach (var ex in e.InnerExceptions)
                {
                    Console.WriteLine(ex.InnerException);
                }
                Console.ReadLine();
            }
        }


        public class Crawler
        {
            public readonly CrawlList CrawlList;
            private readonly List<Task<string>> runningTasks = new List<Task<string>>();
            private readonly HttpClient client;
            private readonly int maxConcurrentDownload;
            private const string BaseUrl = "http://cask-marque.co.uk";

            public Crawler(int maxConcurrentDownload)
            {
                CrawlList = new CrawlList();
                client = new HttpClient();
                this.maxConcurrentDownload = maxConcurrentDownload;
                ServicePointManager.DefaultConnectionLimit = maxConcurrentDownload;
            }

            public async Task<bool> Crawl(string startUrl)
            {
                runningTasks.Add(ProcessUrl(startUrl));

                while (runningTasks.Any())
                {
                    var completedTask = await Task.WhenAny(runningTasks);
                    runningTasks.Remove(completedTask);
                    var pageHtml = await completedTask;

                    while (CrawlList.HasNext() && runningTasks.Count < maxConcurrentDownload)
                    {
                        var url = CrawlList.GetNext();
                        runningTasks.Add(ProcessUrl(url));
                    }
                }

                return true;
            }


            private async Task<string> ProcessUrl(string url)
            {
                Console.WriteLine("url " +  url);
                var response = await client.GetAsync(url);
                var content = await response.Content.ReadAsStringAsync();
                var doc = new HtmlDocument();
                doc.LoadHtml(content);
                var urls = doc.DocumentNode.SelectNodes("//a[@href]").Where(x => x.Attributes["href"].Value.StartsWith("/")).Select(x => BaseUrl + x.Attributes["href"].Value).ToList();
                CrawlList.AddUrls(urls);
                return content;
            }
        }

        public class CrawlList
        {
            public readonly Queue<string> UrlsToCrawl;
            public readonly List<string> UrlsCompleted;

            public CrawlList()
            {
                UrlsToCrawl = new Queue<string>();
                UrlsCompleted = new List<string>();
            }

            public bool HasNext()
            {
                return UrlsToCrawl.Any();
            }

            public string GetNext()
            {
                return UrlsToCrawl.Dequeue();
            }

            public void AddUrls(List<string> urls)
            {
                foreach (var url in urls)
                {
                    AddUrl(url);
                }
            }

            public void AddUrl(string url)
            {
                if (UrlAlreadyAdded(url)) return;

                UrlsToCrawl.Enqueue(url);
            }

            public bool UrlAlreadyAdded(string url)
            {
                return UrlsToCrawl.Contains(url) || UrlsCompleted.Contains(url);
            }
        }
    }
}
EN

回答 3

Code Review用户

回答已采纳

发布于 2014-10-03 23:53:26

代码语言:javascript
复制
crawler.Start();

这是非常混乱的命名。如果一个方法被称为Start(),我希望它能够启动工作,然后返回,而不是完成所有的工作。更好的名称应该是类似于Run()的名称,或者更好的名称,更具体的名称。

代码语言:javascript
复制
private const string SiteUrl = "http://stackoverflow.com";

为什么这个在这里?我认为,作为构造函数参数,或者作为Start()的参数,这更有意义。

代码语言:javascript
复制
private const int MaxWorkers =5;

同样,这可能是可配置的。而且,如果您正在处理单个网站,那么比默认设置为2的ServicePointManager.DefaultConnectionLimit更多的工作人员没有多大意义。

代码语言:javascript
复制
public WebCrawler()
{
    client = new HttpClient();
    crawlList = new CrawlList();
}

public void Start()
{
    crawlList = new CrawlList();

如果每次调用crawlList时都要设置Start(),则不需要在构造函数中设置它。

代码语言:javascript
复制
do
{
    …
} while (crawlList.HasNext() || workers > 0);

所以,当一些下载正在进行中时,您将循环使用并浪费整个CPU核心来做任何有用的事情?那是个糟糕的主意。墨西哥人的回答有一种解决办法。

代码语言:javascript
复制
await client.GetAsync(url).ContinueWith(ProcessResponse);

这里不需要ContinueWith(),这就是await的目的:

代码语言:javascript
复制
var response = await client.GetAsync(url);
await ProcessResponseAsync(response);

这假设您将在async void ProcessResponse(Task<HttpResponseMessage> response)之后将async Task ProcessResponseAsync(HttpResponseMessage response)更改为基于任务的异步模式

代码语言:javascript
复制
var internalLinks = doc.DocumentNode.SelectNodes("//a[@href]").Where(x => x.Attributes["href"].Value.StartsWith("/")).Select(x =>  SiteUrl + x.Attributes["href"].Value).ToList();

为什么这都是一条单线?您应该将其分解为多行,以使其更具可读性:

代码语言:javascript
复制
var internalLinks = doc.DocumentNode.SelectNodes("//a[@href]")
    .Where(x => x.Attributes["href"].Value.StartsWith("/"))
    .Select(x =>  SiteUrl + x.Attributes["href"].Value)
    .ToList();

您还可以使用查询语法来避免重复:

代码语言:javascript
复制
var internalLinks =
    from node in doc.DocumentNode.SelectNodes("//a[@href]")
    let href = node.Attributes["href"].Value
    where href.StartsWith("/")
    select SiteUrl + href;

或者也许:

代码语言:javascript
复制
var internalLinks =
    from node in doc.DocumentNode.SelectNodes("//a[@href]")
    select node.Attributes["href"].Value into href
    where href.StartsWith("/")
    select SiteUrl + href;

不同的是,第二个版本比第一个版本的效率略高,但我也认为它的可读性较低。

另外,我删除了ToList(),见下文。

代码语言:javascript
复制
internalLinks.ForEach(x => crawlList.AddUrl(x));

我认为不应该使用List.ForEach。相反,可以使用foreach,或者向集合中添加类似AddUrls()的内容。

代码语言:javascript
复制
public string GetNext()
{
    string url;
    urlsToCrawl.TryTake(out url);
    urlsCompleted.Add(url);
    return url;
}

那么,当urlsToCrawl为空时,您要将null添加到urlsCompleted中,然后返回它?对我来说听起来不是个好主意。

如果该方法应该仅在urlsToCrawl不为空时才被调用,那么当它被错误调用时,它应该抛出一个异常。

代码语言:javascript
复制
public void AddUrl(string url)
{
    if (!UrlAlreadyAdded(url))
    {
        urlsToCrawl.Add(url);
        Debug.Print("Adding Url "  + url);
    }
}

这不是线程安全的。由于可以同时从多个线程调用此方法,因此可以多次下载相同的URL。

票数 5
EN

Code Review用户

发布于 2014-10-03 19:18:55

do { if (workers >= MaxWorkers) continue; if (!crawlList.HasNext()) continue; Interlocked.Increment(ref workers); Debug.Write("Workers " + workers); ProcessUrl(crawlList.GetNext()); } while (crawlList.HasNext() || workers > 0);

假设您第一次访问这个代码块是为了爬行,那么您很可能会删除前2个if语句,方法是让它循环一个while循环,而不是do while,如下所示

代码语言:javascript
复制
while (crawlList.HasNext() && workers > 0 && workers <= MaxWorkers)
{
    Interlocked.Increment(ref workers);
    Debug.Write("Workers " + workers);
    ProcessUrl(crawlList.GetNext());
}

我还假设,如果没有工作人员,您也不会尝试为crawlList上的下一项分配工作人员,所以我将or (||)改为an和(&&)。

我还假设您不想检查您的最大工人数,所以我也将其添加到while语句的条件中。

我在看这个,我想你可能想要等待工人们被释放,而不是真正退出循环,当工人的工作量达到程序的最大值时,你可能想要一些东西来保持循环运行,直到一个工人被释放。

所以就像这样。

代码语言:javascript
复制
while (crawlList.HasNext() && workers > 0)
{
    if (workers > MaxWorkers)
    {
        continue;
    }
    Interlocked.Increment(ref workers);
    Debug.Write("Workers " + workers);
    ProcessUrl(crawlList.GetNext());
}   
票数 3
EN

Code Review用户

发布于 2014-10-03 23:08:45

我认为你应该澄清async的作用。确实,它允许您在主线程之外运行一些缓慢的操作,但它并不是作为实现并行性的一种方式。这只是允许您的应用程序继续处理而不阻塞昂贵操作的一种方法。看看这段视频,它解释得很清楚。

您使用fire获得多个线程,而忘记了从async void派生的执行,但是您应该避免使用它们。它们只应在事件处理程序中使用。

你需要一种更有条理的方法。您需要创建多个Task实例并并行启动它们。显然,每个任务都可以执行异步操作。这篇MSDN文章是一个很好的起点,让您了解如何做到这一点。

这是我想出的代码,它应该能更干净地发挥作用。

代码语言:javascript
复制
using System;
using System.Net.Http;
using System.Collections.Generic;
using System.Threading.Tasks;
using HtmlAgilityPack;
using System.Linq;

namespace Async
{
    public class Crawler
    {
        private readonly HttpClient client;
        private readonly int maxConcurrentDownload;
        string baseUrl;

        public Crawler (HttpClient client, string baseUrl, int maxConcurrentDownload)
        {
            this.maxConcurrentDownload = maxConcurrentDownload;
            this.baseUrl = baseUrl;
            this.client = client;
        }

        public async Task<IEnumerable<string>> Crawl(string startUrl)
        {
            var result = new List<string> (); // I know list is not the ideal data structure here. Let's use it for simplicity
            var pagesToProcess = new Queue<string> ();

            var runningTasks = new List<Task<IEnumerable<string>>> ();
            runningTasks.Add (processUrl(startUrl));
            result.Add (startUrl);

            while (runningTasks.Any())
            {
                var firstCompletedTask = await Task.WhenAny (runningTasks);
                runningTasks.Remove (firstCompletedTask);
                var urlsFound = await firstCompletedTask;

                /*
                 * At this point we know the url we found in the page
                 * we finished to process. 
                 * It's time to enque them
                 */
                foreach (var url in urlsFound)
                {
                    if(!result.Contains(url))
                        pagesToProcess.Enqueue (url);
                }
                /*
                 * Now we should start some new Taks
                 * to process new pages
                 */
                while (pagesToProcess.Any () && runningTasks.Count < maxConcurrentDownload)
                {
                    var url = pagesToProcess.Dequeue ();
                    runningTasks.Add (processUrl (url));
                    result.Add(url);
                }
            }
            return result;
        }

        private async Task<IEnumerable<string>> processUrl(string url)
        {
            Console.WriteLine ("Processing {0}", url);
            var response = await client.GetAsync (url);
            var childUrls = await ProcessResponse (response);
            Console.WriteLine ("{0} child urls for {1}", childUrls.Count, url);
            return childUrls;
        }

        private async Task<List<string>> ProcessResponse(HttpResponseMessage response)
        {
            var html = await response.Content.ReadAsStringAsync();
            var doc = new HtmlDocument();
            doc.LoadHtml(html);
            return doc.DocumentNode.SelectNodes("//a[@href]").Where(x => x.Attributes["href"].Value.StartsWith("/")).Select(x => baseUrl + x.Attributes["href"].Value).ToList();
        }
    }
}

你就是这样开始的:

代码语言:javascript
复制
    public static void Main (string[] args)
    {
        Console.WriteLine ("Starting up!");
        var crawler = new Crawler (new System.Net.Http.HttpClient (), "http://stackoverflow.com", 3);
        crawler.Crawl ("http://stackoverflow.com").Wait();
    }
票数 1
EN
页面原文内容由Code Review提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codereview.stackexchange.com/questions/64650

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档