
StormCrawler: best topology for a cluster

Stack Overflow user
Asked on 2018-05-29 12:04:34
1 answer · 355 views · 0 followers · 1 vote

I am using StormCrawler to crawl 40k sites with max_depth=2, and I want to do it as fast as possible. I have 5 Storm nodes (with different static IPs) and 3 Elasticsearch nodes. For now, my best topology is:

spouts:
  - id: "spout"
    className: "com.digitalpebble.stormcrawler.elasticsearch.persistence.CollapsingSpout"
    parallelism: 10

bolts:
  - id: "partitioner"
    className: "com.digitalpebble.stormcrawler.bolt.URLPartitionerBolt"
    parallelism: 1
  - id: "fetcher"
    className: "com.digitalpebble.stormcrawler.bolt.FetcherBolt"
    parallelism: 5
  - id: "sitemap"
    className: "com.digitalpebble.stormcrawler.bolt.SiteMapParserBolt"
    parallelism: 5
  - id: "parse"
    className: "com.digitalpebble.stormcrawler.bolt.JSoupParserBolt"
    parallelism: 100
  - id: "index"
    className: "com.digitalpebble.stormcrawler.elasticsearch.bolt.IndexerBolt"
    parallelism: 25
  - id: "status"
    className: "com.digitalpebble.stormcrawler.elasticsearch.persistence.StatusUpdaterBolt"
    parallelism: 25
  - id: "status_metrics"
    className: "com.digitalpebble.stormcrawler.elasticsearch.metrics.StatusMetricsBolt"
    parallelism: 5

And the crawler config:

config: 
  topology.workers: 5
  topology.message.timeout.secs: 300
  topology.max.spout.pending: 250
  topology.debug: false
  fetcher.threads.number: 500
  worker.heap.memory.mb: 4096

Questions:

1) Should I use AggregationSpout or CollapsingSpout, and what is the difference? I tried AggregationSpout, but the performance was the same as a single machine with the default configuration.

2) Is this parallelism setup correct?

3) When I went from 1 node to 5 nodes, "fetch errors" increased by about 20%, and many sites were not fetched correctly. What could be the reason?

UPDATE:

es-conf.yaml:

# configuration for Elasticsearch resources

config:
  # ES indexer bolt
  # addresses can be specified as a full URL
  # if not we assume that the protocol is http and the port 9200
  es.indexer.addresses: "1.1.1.1"
  es.indexer.index.name: "index"
  es.indexer.doc.type: "doc"
  es.indexer.create: false
  es.indexer.settings:
    cluster.name: "webcrawler-cluster"

  # ES metricsConsumer
  es.metrics.addresses: "http://1.1.1.1:9200"
  es.metrics.index.name: "metrics"
  es.metrics.doc.type: "datapoint"
  es.metrics.settings:
    cluster.name: "webcrawler-cluster"

  # ES spout and persistence bolt
  es.status.addresses: "http://1.1.1.1:9200"
  es.status.index.name: "status"
  es.status.doc.type: "status"
  #es.status.user: "USERNAME"
  #es.status.password: "PASSWORD"
  # the routing is done on the value of 'partition.url.mode'
  es.status.routing: true
  # stores the value used for the routing as a separate field
  # needed by the spout implementations
  es.status.routing.fieldname: "metadata.hostname"
  es.status.bulkActions: 500
  es.status.flushInterval: "5s"
  es.status.concurrentRequests: 1
  es.status.settings:
    cluster.name: "webcrawler-cluster"

  ################
  # spout config #
  ################

  # positive or negative filter parsable by the Lucene Query Parser
  # es.status.filterQuery: "-(metadata.hostname:stormcrawler.net)"

  # time in secs for which the URLs will be considered for fetching after an ack or fail
  es.status.ttl.purgatory: 30

  # Min time (in msecs) to allow between 2 successive queries to ES
  es.status.min.delay.queries: 2000

  es.status.max.buckets: 50
  es.status.max.urls.per.bucket: 2
  # field to group the URLs into buckets
  es.status.bucket.field: "metadata.hostname"
  # field to sort the URLs within a bucket
  es.status.bucket.sort.field: "nextFetchDate"
  # field to sort the buckets
  es.status.global.sort.field: "nextFetchDate"

  # Delay since previous query date (in secs) after which the nextFetchDate value will be reset
  es.status.reset.fetchdate.after: -1

  # CollapsingSpout : limits the deep paging by resetting the start offset for the ES query 
  es.status.max.start.offset: 500

  # AggregationSpout : sampling improves the performance on large crawls
  es.status.sample: false

  # AggregationSpout (expert): adds this value in mins to the latest date returned in the results and
  # use it as nextFetchDate
  es.status.recentDate.increase: -1
  es.status.recentDate.min.gap: -1

  topology.metrics.consumer.register:
       - class: "com.digitalpebble.stormcrawler.elasticsearch.metrics.MetricsConsumer"
         parallelism.hint: 1
         #whitelist:
         #  - "fetcher_counter"
         #  - "fetcher_average.bytes_fetched"
         #blacklist:
         #  - "__receive.*"

1 Answer

Stack Overflow user

Accepted answer

Posted on 2018-05-29 13:12:38

1) Should I use AggregationSpout or CollapsingSpout, and what is the difference? I tried AggregationSpout, but the performance was the same as a single machine with the default configuration.

As the names suggest, AggregationSpout uses aggregations as a mechanism for grouping URLs by host (or domain, or IP), whereas CollapsingSpout uses collapsing. The latter is likely to be slower if you configure it to have more than one URL per bucket (es.status.max.urls.per.bucket), as it issues subqueries for each bucket. AggregationSpout should perform well, especially with es.status.sample set to true. CollapsingSpout is experimental at this stage.
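Switching to the aggregation-based spout would amount to changing the spout definition and enabling sampling in the ES config along these lines (a sketch based on the class names used in this question; verify against your StormCrawler version):

```yaml
spouts:
  - id: "spout"
    className: "com.digitalpebble.stormcrawler.elasticsearch.persistence.AggregationSpout"
    parallelism: 10
```

And in es-conf.yaml:

```yaml
  # AggregationSpout : sampling improves the performance on large crawls
  es.status.sample: true
```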

2) Is this parallelism setup correct?

That is probably more JSoupParserBolts than needed. In practice, a ratio of 1:4 compared to the fetcher bolts is fine, even with 500 fetching threads. The Storm UI is useful for spotting bottlenecks and seeing which components need scaling. Everything else looks OK, but realistically you should look at the Storm UI and the metrics to tune the topology to the optimal settings for your crawl.
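Following the 1:4 ratio suggested above, the parser parallelism could be brought down from 100 to a multiple of the fetcher count (an illustrative figure; the right value should come from the Storm UI metrics):

```yaml
bolts:
  - id: "fetcher"
    className: "com.digitalpebble.stormcrawler.bolt.FetcherBolt"
    parallelism: 5
  - id: "parse"
    className: "com.digitalpebble.stormcrawler.bolt.JSoupParserBolt"
    parallelism: 20   # ~4x the fetcher parallelism, instead of 100
```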

3) When I went from 1 node to 5 nodes, "fetch errors" increased by about 20%, and many sites were not fetched correctly. What could be the reason?

This could mean that you are saturating your network connection, but that should not happen when using more nodes, quite the opposite. Maybe check in the Storm UI how the FetcherBolts are distributed across the nodes: is one worker running all the instances, or do they each get an equal number? Look at the logs to see what is happening, e.g. are there lots of timeout exceptions?
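One quick way to check whether timeouts dominate is to tally exception class names in the worker logs. A minimal sketch (the log location and message format are assumptions; adjust to your Storm installation, whose worker logs typically live under the Storm log directory):

```python
import re
from collections import Counter

def tally_exceptions(lines):
    """Count Java exception class names related to timeouts/connections in log lines."""
    pattern = re.compile(
        r"\b([A-Za-z0-9_.]*(?:Timeout|Connect|Socket)[A-Za-z0-9_]*Exception)\b"
    )
    counts = Counter()
    for line in lines:
        for match in pattern.findall(line):
            counts[match] += 1
    return counts

# Example usage with fabricated log lines (not real StormCrawler output):
sample = [
    "2018-05-29 12:10:01 FetcherBolt Error java.net.SocketTimeoutException: Read timed out",
    "2018-05-29 12:10:02 FetcherBolt Fetched http://example.com/ 200",
    "2018-05-29 12:10:03 FetcherBolt Error org.apache.http.conn.ConnectTimeoutException",
]
print(tally_exceptions(sample))
```

If one class of error (e.g. read timeouts) spikes after scaling out, that points at network saturation or uneven FetcherBolt placement rather than the sites themselves.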

Votes: 1
Original page content provided by Stack Overflow; translation supported by Tencent Cloud's translation engine.
Original link: https://stackoverflow.com/questions/50583974