如何将可观测到的块划分为块,其中每个块长度为n。
例如:
可观测的发射事件a, b, c, d, e, f ..和长度3
因此,a;b;c,d,e,f,.
splitIntoChunks: int ->可观测<‘a> ->可观测<’a list>
splitIntoChunks len observable = ?
发布于 2021-01-01 00:03:11
有一个名为Seq.chunkBySize的内置函数可以执行此操作。
发布于 2020-12-30 23:10:10
为了处理可观测数据,FSharp.Control.Reactive提供了反应性扩展 对于.NET的包装器,因此
#r "nuget: System.Reactive"
#r "nuget: FSharp.Control.Reactive"
open FSharp.Control.Reactive
Observable.generate
(0,1)
(fun _ -> true)
(fun (l, c) -> (c, l + c))
snd
|> Observable.bufferCount 3
|> Observable.take 5
|> Observable.subscribe (printfn "%A")收益率
seq [1; 1; 2]
seq [3; 5; 8]
seq [13; 21; 34]
seq [55; 89; 144]
seq [233; 377; 610]如果您需要一个列表,只需在订阅之前添加|> Observable.map Seq.toList即可。
第一版,包括NIH
为了处理可观察到的问题,我会利用反应性扩展 对于.NET来创建瘦包装。
#r "nuget: System.Reactive"
open System.Reactive.Linq
let chunkBySize (chunkSize : int) observable =
Observable.Buffer(observable, chunkSize)
let take (count : int) observable =
Observable.Take(observable, count)像这样使用它们
Observable.Generate(
(0,1),
(fun _ -> true),
(fun (l, c) -> (c, l + c)),
fun (_, r) -> r)
|> chunkBySize 3
|> take 5
|> Observable.subscribe (printfn "%A")产生与上面相同的结果。
发布于 2020-12-30 20:19:20
也许如下所示(未经测试并基于http://fssnip.net/7Z/title/Sliding-window-for-Observable)。
open System
module Observable =
/// Returns an observable that yields chunks of
/// containing elements drawn from the input observable.
/// Each chunk is returned as a fresh array.
let chunkBySize (count:int) (source:IObservable<_>) =
{ new IObservable<_> with
member x.Subscribe observer =
// Start an agent that remembers chunks of length
// smaller than the count (new agent for every observer)
let agent = MailboxProcessor.Start(fun agent ->
// The parameter 'lists' contains partial lists and their lengths
let rec loop lists = async {
// Receive the next value
let! value = agent.Receive()
let chunks =
((0, []) :: lists)
|> List.chunkBySize count
// Send all chunks to the observer (as arrays)
for c in chunks do
observer.OnNext(c |> Array.ofSeq |> Array.rev)
}
// Start with an empty list of partial lists
loop []
)
// Send incoming values to the agent
source.Subscribe (agent.Post)
}https://stackoverflow.com/questions/65512035
复制相似问题