7.1 Action Abort(终止流程)KisFlow Action 是指在执行Function的时候,同时可以控制Flow的调度逻辑,KisFlow提供一些Action动作让开发者做选择,本节先介绍最简单的 7.1.3 Next方法实现接下来,我们需要给KisFlow模块实现这个接口,首先需要给KisFlow添加一个Action{}成员,表示每次执行完Function之后所携带的动作。 kis-flow/flow/kis_flow.go// KisFlow 用于贯穿整条流式计算的上下文环境type KisFlow struct {// 基础信息Id string kis-flow/flow/kis_flow.go// Run 启动KisFlow的流式计算, 从起始Function开始执行流func (flow *KisFlow) Run(ctx context.Context 最后就是将Action的Abort状态传递给KisFlow的Abort状态。
1.3 KisFlow系统定位KisFlow为业务上游计算层,上层接数仓/其他业务方ODS层、下游接本业务存储数据中心。 1.4 KisFlow整体架构图层级层级说明包括子模块流式计算层为KisFlow上游计算层,直接对接业务存储及数仓ODS层,如上游可以为Mysql Binlog、日志、接口数据等,为被动消费模式,提供KisFlow KisFlow:分布式批量消费者,一个KisFlow是由多个KisFunction组合。 KisConnectors:计算数据流流中间状态持久存储及连接器。 (2) = KisFlow(1) + KisFlow(2)KisFlow(3) = KisFlow(1) + KisFlow(2) + KisFlow(3)1.5 KisFlow关键模块1.5.1 KisConfigKisConfig 2、一个KisFunction可以随时动态的加入到某个KisFlow中,且KisFlow和KisFlow之间的关系可以通过KisFunction的Load和Save节点的加入,进行动态的并流和分流动作。
9.1 多副本能力KisFlow如果在执行流体中,需要被多个Goroutine来并发使用,可能需要同一个配置的创建多个Flow来匹配多个并发的计算流,所以Flow需要一个创建副本的能力。 // +++++++++++++++++++++++++// Fork 得到Flow的一个副本(深拷贝)Fork(ctx context.Context) Flow}Fork()会根据一个已有的KisFlow 实例,完全克隆一个资源隔离的但是具有相同配置的KisFlow实例。 具体的实现方法如下:kis-flow/flow/kis_flow.go// Fork 得到Flow的一个副本(深拷贝)func (flow *KisFlow) Fork(ctx context.Context 9.3 【V0.8】源代码https://github.com/aceld/kis-flow/releases/tag/v0.8KisFlow开源项目地址:https://github.com/aceld
KisFlow源代码:https://github.com/aceld/kis-flow本章将设计KisFlow的Connector模块,期功能及作用主要为挂载在某个Function下,执行第三方存储引擎的逻辑 5.1 Connector定义KisFlow中提供Connector,来给开发者定义第三方存储引擎的自定义读写插件模式。 之后,在KisFlow中实现这两个方法。 创建一个 KisFlow 配置实例myFlowConfig1 := config.NewFlowConfig("flowName1", common.FlowEnable)// 5. 创建一个KisFlow对象flow1 := flow.NewKisFlow(myFlowConfig1)// 6.
string = "kisflow_data_total"COUNTER_KISFLOW_DATA_TOTAL_HELP string = "KisFlow全部Flow的数据总量")10.3.2 DataTotal KisFlow全部Flow的数据总量# TYPE kisflow_data_total counterkisflow_data_total 10# ... ...其中我们会发现,kisflow_data_total string = "kisflow_data_total"COUNTER_KISFLOW_DATA_TOTAL_HELP string = "KisFlow全部Flow的数据总量" // +++ string = "kisflow_data_total"COUNTER_KISFLOW_DATA_TOTAL_HELP string = "KisFlow全部Flow的数据总量"GANGE_FLOW_DATA_TOTAL_NAME string = "kisflow_data_total"COUNTER_KISFLOW_DATA_TOTAL_HELP string = "KisFlow全部Flow的数据总量"GANGE_FLOW_DATA_TOTAL_NAME
kis-flow源代码:https://github.com/aceld/kis-flow4.1 Router现在,将KisFlow提供对外Function开放注册能力,首先我们要定义一些注册函数原型, map[string]FaaS// flowRouter// key: Flow Name// value: Flowtype flowRouter map[string]FlowFaaS:是开发者给KisFlow 创建一个 KisFlow 配置实例myFlowConfig1 := config.NewFlowConfig("flowName1", common.FlowEnable)// 3. 创建一个KisFlow对象flow1 := flow.NewKisFlow(myFlowConfig1)// 4. 好了,现在Function的业务能力已经开放给开发者了,接下来我们来继续完善KisFlow的能力。
而且KisFlow支持批量数据的流逝计算处理。 3.2 KisFlow数据流处理在KisFlow模块中,新增一些存放数据的成员,如下:kis-flow/flow/kis_flow.go// KisFlow 用于贯穿整条流式计算的上下文环境type KisFlow 在kis-flow/flow/kis_flow_data.go中实现KisFlow的该接口。 3.2.3 KisFlow内部数据提交现在开发者可以通过CommitRow()将数据提交到buffer中,但是在KisFlow内部需要一个内部接口来将buffer提交到KisFlow的data中,作为之后当前 kis-flow/flow/kis_flow.go// Run 启动KisFlow的流式计算, 从起始Function开始执行流func (flow *KisFlow) Run(ctx context.Context
8.1 Flow Cache 数据流缓存KisFlow也提供流式计算中的共享缓存,采用简单的本地缓存供开发者按需使用,有关本地缓存的第三方技术依赖选型: https://github.com/patrickmn (*MyStruct)// ...}}详细参考:https://github.com/patrickmn/go-cache8.1.2 KisFlow集成go-cache能力(1) Flow提供抽象层接口在 kis-flow/flow/kis_flow.go// KisFlow 用于贯穿整条流式计算的上下文环境type KisFlow struct { // ... ... // ... ... 8.2.1 Flow添加MetaData首先,KisFlow的成员新增metaData map[string]interface{}和对应的读写锁。 kis-flow/flow/kis_flow.go// KisFlow 用于贯穿整条流式计算的上下文环境type KisFlow struct {// ... ... // ... ...
kis-flow源代码:https://github.com/aceld/kis-flow首先我们要先定义KisFlow的核心结构体,KisFlow结构体,通过上述的设计理念,我们得知,KisFlow表示整个一条数据计算流的结构 KisID为Function的实例ID,用于KisFlow内部区分不同的实例对象。 KisFlow.func NewKisFlow(conf *config.KisFlowConfig) kis.Flow {flow := new(KisFlow)// 基础信息flow.Id = id.KisID 创建一个 KisFlow 配置实例myFlowConfig1 := config.NewFlowConfig("flowName1", common.FlowEnable)// 3. 创建一个KisFlow对象flow1 := flow.NewKisFlow(myFlowConfig1)// 4.
KisFlow源代码:https://github.com/aceld/kis-flow2. V0.1-项目构建及基础模块定义首先我们创建我们的项目,项目的主文件目录就叫KisFlow,且在Github上创建对应的仓库: https://github.com/aceld/kis-flow 然后将项目代码 flow/为存放KisFlow的核心代码。 function/为存放KisFunction的核心代码。 conn/为存放KisConnector的核心代码。 example/为我们针对KisFlow的一些测试案例和test单元测试案例等,能够及时验证我们的项目效果。 kis/来存放所有模块的抽象层。 结构体定义接下来我们根据上述的配置协议,来定义KisFlow的策略配置结构体,并且提供一些响应的初始化方法。
https://github.com/aceld/kis-flow6.1 配置的导入现在每次建立Flow和Function等,都需要一系列繁琐的添加,不是很方便,接下来,我们可以通过批量读写配置文件,构建KisFlow 中的结构关系,并且也可以将KisFlow的结构导出到本地文件中。 6.1.1 创建配置文件首先我们在kis-flow/test/load\_conf/下创建需要加载的kisflow业务配置文件。 kis-flow/flow/kis_flow.go// KisFlow 用于贯穿整条流式计算的上下文环境type KisFlow struct { // 基础信息 Id string 下面想Funcs成员赋值的代码做一个简单的修正// appendFunc 将Function添加到Flow中, 链表操作func (flow \*KisFlow) appendFunc(function
接下来我们来增强KisFlow中Function对业务数据处理的聚焦,将之前Function的写法:func FuncDemo3Handler(ctx context.Context, flow kis.Flow 本章将实现KisFlow上述功能。 KisFlow会提供一个默认的Serialize给每个FaaS函数,开发者也可以自定义自己的Serialize来对FaaS传递的形参进行自定义的数据序列化动作。 11.2.2 KisFlow默认的Serialize序列化KisFlow提供一个默认的Serialize序列化实例,主要以Json格式为主,在kis-flow/下创建serialize/文件夹,在kis-flow 注意:KisFlow目前的默认序列化只实现了json格式的序列化,开发者可以参考DefaultSerialize{} 来实现自己其他格式的数据序列化和反序列化动作。