首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >并发性访谈

并发性访谈
EN

Code Review用户
提问于 2015-07-17 13:26:39
回答 1查看 503关注 0票数 12

不久前,我接受了一次采访,他们提出了一个问题,总之:

  1. 查看某个目录,并处理传入的JSON文件
  2. 这些JSON文件有不同的“类型”字段
  3. 每秒钟报告一次处理每个“类型”的文件的数量,以及所有文件的平均处理时间(从添加到磁盘,到添加到和)。

这个解决方案应该优化到最小的“处理时间”,这就是我选择并发方法的原因。

在我提交了解决方案后,我有一个月没有收到回复,被拒绝了,对我的解决方案没有任何反馈。因为我以前从来没有做过这样的事情,我希望它有问题,我非常感谢任何反馈。

下面是我表示Event的方式:

代码语言:javascript
复制
type Event struct {
    Type string
    CreatedAt time.Time
}

这是我的主回路:

代码语言:javascript
复制
func main() {
    config := Config{}
    config.readFrom("./config.json")

    fmt.Println("Monitoring...")
    fmt.Println("Press Enter to Exit")
    fmt.Println("--------------------\n")

    foundEvents := make(chan Event)
    normalizedEvents := make(chan Event)
    ticker := time.NewTicker(1 * time.Second)

    go discoverNewEvents(config, foundEvents)
    go normalizeEvents(foundEvents, normalizedEvents)
    go outputOnTick(normalizedEvents, ticker)

    bufio.NewReader(os.Stdin).ReadString('\n')
}

下面是discoverNewEvents,它监视目录并反序列化json

代码语言:javascript
复制
func discoverNewEvents(appConfig Config, out chan Event) {
    for _ = range time.Tick(1 * time.Microsecond) {
        filesInfo, _ := ioutil.ReadDir(appConfig.InputDirectory)

        for _, fileInfo := range filesInfo {
            inputPath := path.Join(appConfig.InputDirectory, fileInfo.Name())
            processedPath := path.Join(appConfig.ProcessedDirectory, fileInfo.Name())
            rawJson, _ := ioutil.ReadFile(inputPath)

            event := Event{}
            _ = json.Unmarshal(rawJson, &event)

            event.CreatedAt = fileInfo.ModTime()

            os.Rename(inputPath, processedPath)
            out <- event
        }
    }
}

下面是normalizeEvents,它做了一些轻微的数据规范化(使“类型”变成相同的情况):

代码语言:javascript
复制
func normalizeEvents(in chan Event, out chan Event) {
    for event := range in {
        normalized := Event{
            Type: strings.ToLower(event.Type),
            CreatedAt: event.CreatedAt }

        out <- normalized
    }
}

下面是outputOnTickselects在1秒内得出的结果:

代码语言:javascript
复制
func outputOnTick(in chan Event, ticker *time.Ticker) {
    results := newAggregator()

    for {
        select {
        case event := <-in:
            results.Consider(event)
        case <-ticker.C:
            fmt.Println(results.String())
            results.Init()
        }
    }
}

下面是结果聚合器,它用作事件和处理信息的数据库。它还懒散地计算结果字符串(在处理结束时):

代码语言:javascript
复制
type ResultAggregator struct {
    processingTimeSum time.Duration
    processingTimeCount int64
    normalizedTypeToCount map[string]int64
    mutex sync.Mutex
}

func newAggregator() ResultAggregator {
    aggregator := ResultAggregator{}
    aggregator.Init()
    return aggregator
}

func (self *ResultAggregator) Init() {
    self.mutex.Lock()
    defer self.mutex.Unlock()

    self.processingTimeSum = 0
    self.processingTimeCount = 0
    self.normalizedTypeToCount = map[string]int64{}
}

func (self *ResultAggregator) Consider(event Event) {
    self.mutex.Lock()
    defer self.mutex.Unlock()

    self.normalizedTypeToCount[event.Type] += 1
    self.processingTimeSum += time.Since(event.CreatedAt)
    self.processingTimeCount += 1
}

func (self *ResultAggregator) String() string {
    self.mutex.Lock()
    defer self.mutex.Unlock()

    return fmt.Sprintf(
        "DoorCnt:%d, ImgCnt:%d, AlarmCnt:%d, avgProcessingTime: %dms",
        self.normalizedTypeToCount["door"],
        self.normalizedTypeToCount["img"],
        self.normalizedTypeToCount["alarm"],
        self.averageProcessingTime())
}

func (self *ResultAggregator) averageProcessingTime() int64 {
    if self.processingTimeCount == 0 {
        return 0
    }

    return self.processingTimeSum.Nanoseconds() / (1000000 * self.processingTimeCount)
}

任何反馈都是值得赞赏的,但我想知道一些具体的事情:

  1. 正确性
  2. 成语我有一个main.go,它包含所有东西,但结果聚合器除外,它位于它自己的文件中。
  3. 并发--我正确地使用了Mutexes吗?大猩猩有什么问题吗?这里还有什么不对劲的地方(例如,意外的比赛条件)?
EN

回答 1

Code Review用户

发布于 2015-08-28 16:29:25

我看到的主要问题是,您似乎正忙着等待目录,即定期读取目录内容,以查看是否有新文件。

注意-我还没有编写一个golang程序,所以如果我错了,请纠正我,但是您是否试图每微秒轮询该目录?

正确的方法是使用来自操作系统的文件系统通知。例如,查看这个库:

https://github.com/go-fsnotify/fsnotify

使用文件系统通知,您的程序可以保持空闲状态,直到发生有趣的事情,这时您在通道上接收到一个事件。

这是一个有趣的架构问题,稍后我可能会更详细地介绍您的程序的其余部分。

其他一些小事..。

变量filesInfo应该被命名为fileInfos,即“S”应该在末尾。filesInfo传递关于一堆文件的单一信息的概念,而fileInfos则更清楚地传递了多个文件信息。

此外,“信息”是一个非常通用的术语。一切都是信息。看来fileInfo实际上是一个“目录条目”。您将看到POSIX中常用的术语"dirent“来描述目录条目,例如,用于readdir()系统调用的这个手册页:

http://man7.org/linux/man-pages/man3/readdir.3.html

票数 2
EN
页面原文内容由Code Review提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codereview.stackexchange.com/questions/97229

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档