我正试图在Go中构建一个服务,该服务将实时流(socketio/signalR)延迟7分钟。它还应该允许一个无延迟的流.因此,Go服务应该有类似缓冲区或队列的东西,它强制数据在允许使用之前等待指定的持续时间。你要怎么做这样的事?延迟的溪流会是一条独立的峡谷吗?应该使用什么数据结构来延迟数据?
我目前的想法是使用time包等待/滴答7分钟,然后才允许使用数据,但是这种阻塞行为在这种情况下可能不是最优的。
这是一些代码来解释我想做什么。FakeStream是一个模拟函数,它模拟我从外部服务获得的实时流数据。
package main
import (
"fmt"
"time"
)
func DelayStream(input chan string, output chan string, delay string) {
// not working for some reason
// delayDuration, _ := time.ParseDuration(delay)
// fmt.Println(delayDuration.Seconds())
if delay == "5s" {
fmt.Println("sleeping")
time.Sleep(5 * time.Second)
}
data := <-input
output <- data
}
func FakeStream(live chan string) {
ticks := time.Tick(2 * time.Second)
for now := range ticks {
live <- fmt.Sprintf("%v", now.Format(time.UnixDate))
}
}
func main() {
liveData := make(chan string)
delayedData := make(chan string)
go FakeStream(liveData)
go DelayStream(liveData, delayedData, "5s")
for {
select {
case live := <-liveData:
fmt.Println("live: ", live)
case delayed := <-delayedData:
fmt.Println("delayed: ", delayed)
}
}
}由于某种原因,延迟通道只输出一次,并且没有输出预期的数据。它应该在实时频道中输出第一件东西,但它没有。
发布于 2019-03-08 13:59:25
您需要一个足够大小的缓冲区。对于简单的情况,缓冲的Go通道可以工作。
扪心自问--在这段时间里,你需要存储多少数据--你应该有一个合理的上限。例如,如果您的流每秒传送多达N个数据包,那么要延迟7分钟,您将需要存储420 N数据包。
扪心自问--如果延迟窗口中到达的数据比预期的多,会发生什么?您可以丢弃新的数据,或者丢弃旧的数据,或者只是阻塞输入流。其中哪一个对您的方案是可行的?每种方案的结果都略有不同。
问问自己-延迟是如何计算出来的?从流产生的那一刻起?从每个包裹到达的那一刻起?每个数据包的延迟是分开的,还是仅针对流中的第一个数据包?
为了开发一些示例代码,您需要在这里大大缩小设计选择范围。
对于这些设计选择的某些子集,下面是为每条消息在通道之间添加延迟的简单方法:
package main
import (
"fmt"
"sync"
"time"
)
func main() {
// in is a channel of strings with a buffer size of 10
in := make(chan string, 10)
// out is an unbuffered channel
out := make(chan string)
// this goroutine forwards messages from in to out, ading a delay
// to each message.
const delay = 3 * time.Second
go func() {
for msg := range in {
time.Sleep(delay)
out <- msg
}
close(out)
}()
var wg sync.WaitGroup
wg.Add(1)
// this goroutine drains the out channel
go func() {
for msg := range out {
fmt.Printf("Got '%s' at time %s\n", msg, time.Now().Format(time.Stamp))
}
wg.Done()
}()
// Send some messages into the in channel
fmt.Printf("Sending '%s' at time %s\n", "joe", time.Now().Format(time.Stamp))
in <- "joe"
time.Sleep(2 * time.Second)
fmt.Printf("Sending '%s' at time %s\n", "hello", time.Now().Format(time.Stamp))
in <- "hello"
time.Sleep(4 * time.Second)
fmt.Printf("Sending '%s' at time %s\n", "bye", time.Now().Format(time.Stamp))
in <- "bye"
close(in)
wg.Wait()
}https://stackoverflow.com/questions/55062416
复制相似问题