首页
学习
活动
专区
圈层
工具
发布
    • 综合排序
    • 最热优先
    • 最新优先
    时间不限
  • 来自专栏KisFlow-Golang流式计算框架

    Golang框架实战-KisFlow流式实时计算框架(5)-Function调度

    创建kis-flow/kis/router.go,定义原型如下:kis-flow/kis/router.gopackage kisimport "context"// FaaS Function as kis-flow/kis/pool.gopackage kisimport ("context""errors""fmt""kis-flow/log""sync")var _poolOnce sync.Once kis-flow/function/kis_function_c.gopackage functionimport ("context""kis-flow/kis""kis-flow/log")type 4.4.1 自定义FaaSkis-flow/test/kis_pool_test.gopackage testimport ("context""fmt""kis-flow/common""kis-flow /config""kis-flow/flow""kis-flow/kis""testing")func funcName1Handler(ctx context.Context, flow kis.Flow

    25610编辑于 2024-04-25
  • 来自专栏KisFlow-Golang流式计算框架

    Golang框架实战-KisFlow流式计算框架(9)-Cache/Params 数据缓存与数据参数

    kis-flow/flow/kis_flow.go// KisFlow 用于贯穿整条流式计算的上下文环境type KisFlow struct {// ... ... // ... ... 最后来实现,如下:kis-flow/flow/kis_flow_data.go// GetMetaData 得到当前Flow对象的临时数据func (flow *KisFlow) GetMetaData 其次,给Connector抽象层,提供获取和设置MetaData的接口,如下:kis-flow/kis/connector.gotype Connector interface {// Init 初始化 /test/kis_params_test.gopackage testimport ("context""kis-flow/common""kis-flow/file""kis-flow/kis""kis-flow /aceld KisFlow开源项目地址:https://github.com/aceld/kis-flow

    31110编辑于 2024-07-18
  • 来自专栏KisFlow-Golang流式计算框架

    Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)

    kis-flow源代码:https://github.com/aceld/kis-flow首先我们要先定义KisFlow的核心结构体,KisFlow结构体,通过上述的设计理念,我们得知,KisFlow表示整个一条数据计算流的结构 kis-flow/kis/function.gopackage kisimport ("context""kis-flow/config")// Function 流式计算基础计算模块,KisFunction kis-flow/config""kis-flow/id""kis-flow/kis")type BaseFunction struct {// Id , KisFunction的实例ID,用于KisFlow kis-flow/kis/flow.gopackage kisimport ("context""kis-flow/config")type Flow interface { // TODO}在kis-flow kis-flow/test/kis_function_test.gopackage testimport ("context""kis-flow/common""kis-flow/config""kis-flow

    23810编辑于 2024-04-19
  • 来自专栏KisFlow-Golang流式计算框架

    Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)

    2.1 KisLogger2.1.1 Logger抽象接口将Logger的定义在kis-flow/log/目录下,创建kis_log.go文件:kis-flow/log/kis_log.gopackage kis-flow/test/kis_log_test.gopackage testimport ("context""kis-flow/log""testing")func TestKisLogger( 结构体定义kis-flow/config/kis_func_config.gopackage configimport ("kis-flow/common""kis-flow/log")// FParam kis-flow/config/kis_flow_config.gopackage configimport "kis-flow/common"// KisFlowFunctionParam 一个Flow kis-flow/config/kis_conn_config.gopackage configimport ("errors""fmt""kis-flow/common")// KisConnConfig

    28410编辑于 2024-04-18
  • 来自专栏KisFlow-Golang流式计算框架

    Golang框架实战-KisFlow流式计算框架(11)-Prometheus Metrics统计

    kis-flow/下新建kis-flow/metrics/目录,作为KisFlow统计指标的代码部分。 /promhttp""kis-flow/common""kis-flow/log""net/http")// RunMetricsService 启动Prometheus监控服务func RunMetricsService 在kis-flow/test/下创建prometheus_server_test.go文件:kis-flow/test/prometheus_server_test.gopackage testimport /common""kis-flow/file""kis-flow/kis""kis-flow/test/caas""kis-flow/test/faas""testing""time")func TestMetricsDataTotal /kis_metrics_test.gopackage testimport ("context""kis-flow/common""kis-flow/file""kis-flow/kis""kis-flow

    39010编辑于 2024-07-22
  • 来自专栏KisFlow-Golang流式计算框架

    Golang框架实战-KisFlow流式计算框架(7)-配置导入与导出

    kis-flow/test/kis_config_import_test.gopackage testimport ( "context" "kis-flow/common" "kis-flow /file" "kis-flow/kis" "kis-flow/test/caas" "kis-flow/test/faas" "testing")func TestConfigImportYmal /yaml.v3" "io/ioutil" "kis-flow/common" "kis-flow/kis")// ConfigExportYaml 将flow配置输出,且存储本地func /kis/flow.gopackage kisimport ( "context" "kis-flow/common" "kis-flow/config")type Flow interface package testimport ( "kis-flow/common" "kis-flow/file" "kis-flow/kis" "kis-flow/test/caas

    25010编辑于 2024-06-27
  • 来自专栏KisFlow-Golang流式计算框架

    Golang框架实战-KisFlow流式计算框架(12)-基于反射自适应注册FaaS形参类型

    11.1.1 FaaSDesc 回调自描述类型在kis-flow/kis/下,新创建一个文件faas.go,定义如下结构体:kis-flow/kis/faas.go// FaaS Function as 11.2 FaaS形参的自定义数据类型序列化11.2.1 Serialize序列化接口首先,我们定义一个数据序列化接口,在kis-flow/kis/下创建serialize.go 文件,如下:kis-flow 11.2.2 KisFlow默认的Serialize序列化KisFlow提供一个默认的Serialize序列化实例,主要以Json格式为主,在kis-flow/下创建serialize/文件夹,在kis-flow /test/faas/faas_stu_score_avg.gopackage faasimport ("context""kis-flow/kis""kis-flow/serialize""kis-flow ("context""kis-flow/common""kis-flow/config""kis-flow/file""kis-flow/flow""kis-flow/kis""kis-flow/test

    28510编辑于 2024-07-23
  • 来自专栏KisFlow-Golang流式计算框架

    Golang框架实战-KisFlow流式计算框架(4)-数据流

    kis-flow源代码:https://github.com/aceld/kis-flow3.1 数据类型定义KisFlow中可以传递任意类型数据作为Flow的数据源。 首先需要对KisFlow中内部支持的数据类型做一个基本的定义,我们将这部分的定义代码写在kis-flow/common/中的data_type.go 文件中。 /kis/flow.gopackage kisimport ("context""kis-flow/common""kis-flow/config")type Flow interface {// Run 在kis-flow/flow/kis_flow_data.go中实现KisFlow的该接口。 kis-flow/kis/flow.gopackage kisimport ("context""kis-flow/common""kis-flow/config")type Flow interface

    45910编辑于 2024-04-23
  • 来自专栏KisFlow-Golang流式计算框架

    Golang框架实战-KisFlow流式计算框架(6)-Connector

    5.3.1 KisConnector 定义kis-flow/conn/kis_connector.gopackage connimport ("context""kis-flow/common""kis-flow kis-flow/kis/flow.gopackage kisimport ("context""kis-flow/common""kis-flow/config")type Flow interface 5.6.1 单元测试创建kis-flow/test/kis_connector_test.go文件:package testimport ("context""kis-flow/common""kis-flow /config""kis-flow/flow""kis-flow/kis""kis-flow/test/caas""kis-flow/test/faas""testing")func TestNewKisConnector (2) FuncName2 的回调业务kis-flow/test/faas/faas_demo2.gopackage faasimport ("context""fmt""kis-flow/kis""kis-flow

    28310编辑于 2024-06-06
  • 来自专栏KisFlow-Golang流式计算框架

    Golang框架实战-KisFlow流式计算框架(8)-KisFlow Action

    kis-flow/kis/flow.gotype Flow interface {// Run 调度Flow,依次调度Flow中的Function并且执行Run(ctx context.Context) Action的模块定义如下,在kis-flow/kis/下创建action.go文件,实现:kis-flow/kis/action.gopackage kis// Action KisFlow执行流程Actionstype kis-flow/test/kis_action_test.gopackage testimport ("context""kis-flow/common""kis-flow/file""kis-flow /kis""kis-flow/test/caas""kis-flow/test/faas""testing")func TestActionAbort(t *testing.T) {ctx := context.Background /aceld KisFlow开源项目地址:https://github.com/aceld/kis-flow

    33910编辑于 2024-07-18
  • 来自专栏KisFlow-Golang流式计算框架

    Golang框架实战-KisFlow流式计算框架(10)-Flow多副本

    9.1.1 Flow新增接口首先,给Flow的抽象层新增一个接口Fork(),原型如下:kis-flow/kis/flow.gotype Flow interface {// Run 调度Flow,依次调度 具体的实现方法如下:kis-flow/flow/kis_flow.go// Fork 得到Flow的一个副本(深拷贝)func (flow *KisFlow) Fork(ctx context.Context 上述代码为了调试,给Flow新增了一个打印全部FuncParams信息的接口GetFuncParamsAllFuncs(),具体的实现方式如下:kis-flow/kis/flow.gotype Flow 加载配置文件并构建Flowif err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err ! /kis-flow

    30710编辑于 2024-07-19
领券