首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何延迟实时流?

如何延迟实时流?
EN

Stack Overflow用户
提问于 2019-03-08 11:34:29
回答 1查看 354关注 0票数 2

我正试图在Go中构建一个服务,该服务将实时流(socketio/signalR)延迟7分钟。它还应该允许一个无延迟的流.因此,Go服务应该有类似缓冲区或队列的东西,它强制数据在允许使用之前等待指定的持续时间。你要怎么做这样的事?延迟的溪流会是一条独立的峡谷吗?应该使用什么数据结构来延迟数据?

我目前的想法是使用time包等待/滴答7分钟,然后才允许使用数据,但是这种阻塞行为在这种情况下可能不是最优的。

这是一些代码来解释我想做什么。FakeStream是一个模拟函数,它模拟我从外部服务获得的实时流数据。

代码语言:javascript
复制
package main

import (
    "fmt"
    "time"
)

func DelayStream(input chan string, output chan string, delay string) {

    // not working for some reason
    // delayDuration, _ := time.ParseDuration(delay)
    // fmt.Println(delayDuration.Seconds())

    if delay == "5s" {
        fmt.Println("sleeping")
        time.Sleep(5 * time.Second)
    }
    data := <-input
    output <- data
}

func FakeStream(live chan string) {

    ticks := time.Tick(2 * time.Second)
    for now := range ticks {
        live <- fmt.Sprintf("%v", now.Format(time.UnixDate))
    }
}

func main() {
    liveData := make(chan string)
    delayedData := make(chan string)

    go FakeStream(liveData)
    go DelayStream(liveData, delayedData, "5s")

    for {
        select {
        case live := <-liveData:
            fmt.Println("live: ", live)
        case delayed := <-delayedData:
            fmt.Println("delayed: ", delayed)
        }
    }
}

由于某种原因,延迟通道只输出一次,并且没有输出预期的数据。它应该在实时频道中输出第一件东西,但它没有。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-03-08 13:59:25

您需要一个足够大小的缓冲区。对于简单的情况,缓冲的Go通道可以工作。

扪心自问--在这段时间里,你需要存储多少数据--你应该有一个合理的上限。例如,如果您的流每秒传送多达N个数据包,那么要延迟7分钟,您将需要存储420 N数据包。

扪心自问--如果延迟窗口中到达的数据比预期的多,会发生什么?您可以丢弃新的数据,或者丢弃旧的数据,或者只是阻塞输入流。其中哪一个对您的方案是可行的?每种方案的结果都略有不同。

问问自己-延迟是如何计算出来的?从流产生的那一刻起?从每个包裹到达的那一刻起?每个数据包的延迟是分开的,还是仅针对流中的第一个数据包?

为了开发一些示例代码,您需要在这里大大缩小设计选择范围。

对于这些设计选择的某些子集,下面是为每条消息在通道之间添加延迟的简单方法:

代码语言:javascript
复制
package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    // in is a channel of strings with a buffer size of 10
    in := make(chan string, 10)

    // out is an unbuffered channel
    out := make(chan string)

    // this goroutine forwards messages from in to out, ading a delay
    // to each message.
    const delay = 3 * time.Second
    go func() {
        for msg := range in {
            time.Sleep(delay)
            out <- msg
        }
        close(out)
    }()

    var wg sync.WaitGroup
    wg.Add(1)
    // this goroutine drains the out channel
    go func() {
        for msg := range out {
            fmt.Printf("Got '%s' at time %s\n", msg, time.Now().Format(time.Stamp))
        }
        wg.Done()
    }()

    // Send some messages into the in channel
    fmt.Printf("Sending '%s' at time %s\n", "joe", time.Now().Format(time.Stamp))
    in <- "joe"

    time.Sleep(2 * time.Second)
    fmt.Printf("Sending '%s' at time %s\n", "hello", time.Now().Format(time.Stamp))
    in <- "hello"

    time.Sleep(4 * time.Second)
    fmt.Printf("Sending '%s' at time %s\n", "bye", time.Now().Format(time.Stamp))
    in <- "bye"
    close(in)

    wg.Wait()
}
票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/55062416

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档