首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Apache束Min,最大值和平均值

Apache束Min,最大值和平均值
EN

Stack Overflow用户
提问于 2019-01-28 09:46:03
回答 1查看 1.5K关注 0票数 3

从这个link中,Guillem Xercavins编写了一个计算最小和最大值的自定义类。

代码语言:javascript
复制
class MinMaxFn(beam.CombineFn):
  # initialize min and max values (I assumed int type)
  def create_accumulator(self):
    return (sys.maxint, 0)

  # update if current value is a new min or max
  def add_input(self, min_max, input):
    (current_min, current_max) = min_max
    return min(current_min, input), max(current_max, input)

  def merge_accumulators(self, accumulators):
    return accumulators

  def extract_output(self, min_max):
    return min_max

我还需要计算平均值,我发现了如下示例代码:

代码语言:javascript
复制
class MeanCombineFn(beam.CombineFn):
  def create_accumulator(self):
    """Create a "local" accumulator to track sum and count."""
    return (0, 0)

  def add_input(self, (sum_, count), input):
    """Process the incoming value."""
    return sum_ + input, count + 1

  def merge_accumulators(self, accumulators):
    """Merge several accumulators into a single one."""
    sums, counts = zip(*accumulators)
    return sum(sums), sum(counts)

  def extract_output(self, (sum_, count)):
    """Compute the mean average."""
    if count == 0:
      return float('NaN')
    return sum_ / float(count)

您知道如何将平均方法合并到MinMax中吗?这样我就只能有一个类,它能够计算最小值、最大值和平均值,并生成一组键和值--数组为3个值?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-01-29 02:23:53

下面是组合类的解决方案,加上中位数

代码语言:javascript
复制
import numpy as np

class MinMaxMeanFn(beam.CombineFn):

    def create_accumulator(self):
        # sum, min, max, count, median
        return (0.0, 999999999.0, 0.0, 0, [])

    def add_input(self, cur_data, input):
        (cur_sum, cur_min, cur_max, count, cur_median) = cur_data
        if type(input) == list:
            cur_count = len(input)
            sum_input = sum(input)
            min_input = min(input)
            max_input = max(input)
        else:
            sum_input = input
            cur_count = 1
        return cur_sum + sum_input, min(min_input, cur_min), max(max_input, cur_max), count + cur_count, cur_median + input

    def merge_accumulators(self, accumulators):
        sums, mins, maxs, counts, medians = zip(*accumulators)
        return sum(sums), min(mins), max(maxs), sum(counts), medians

    def extract_output(self, cur_data):
        (sum, min, max, count, medians) = cur_data
        avg = sum / count if count else float('NaN')
        med = np.median(medians)
        return  {
            "max": max,
            "min": min,
            "avg": avg,
            "count": count,
            "median": med
        }

示例用法:

代码语言:javascript
复制
( input |'Format Price' >> beam.ParDo(FormatPriceDoFn())
                        |'Group Price by ID' >> beam.GroupByKey()
                        |'Compute price statistic for each ID' >> beam.CombinePerKey(MinMaxMeanFn()))

*我没有测试CombinePerKey是否在没有GroupByKey的情况下工作,可以自由地测试它。

票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/54399205

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档