I'm new to Flink and am implementing a pattern-recognition module (pattern matching done without CEP) that reads a JSON stream from an Event Hub topic and, when a pattern matches, pushes the event to another Event Hub topic. The module receives a JSON payload, and checkpointing is configured as below:

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
StateBackend stateBackend = new RocksDBStateBackend(incrementalCheckpointPath, true);
env.setStateBackend(stateBackend);
env.getCheckpointConfig().setCheckpointInterval(12000);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
env.getCheckpointConfig().setCheckpointTimeout(120000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(
        org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

However, when a task manager fails, Flink tries to recover state from the state backend (I use RocksDB), and the restore fails with the error below. I am using Flink 1.10.0 and Java 1.8.
05:39:14.260 [Source: Custom Source -> Flat Map (5/12)] WARN org.apache.flink.streaming.api.operators.BackendRestorerProcedure cisco - Exception while restoring operator state backend for StreamSource_1171dea6747ab509fdaefbe74f7195af_(5/12) from alternative (1/1), will retry while more alternatives are available.
org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86) ~[flink-runtime_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createOperatorStateBackend(RocksDBStateBackend.java:565) ~[flink-statebackend-rocksdb_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:243) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:252) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:139) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) [flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449) [flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461) [flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) [flink-runtime_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) [flink-runtime_2.12-1.10.0.jar:1.10.0]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
Caused by: java.io.IOException: Stream Closed
at java.io.FileInputStream.read0(Native Method) ~[?:1.8.0_252]
at java.io.FileInputStream.read(FileInputStream.java:207) ~[?:1.8.0_252]
at org.apache.flink.core.fs.local.LocalDataInputStream.read(LocalDataInputStream.java:68) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.core.fs.FSDataInputStreamWrapper.read(FSDataInputStreamWrapper.java:51) ~[flink-core-1.10.0.jar:1.10.0]
at java.io.DataInputStream.readInt(DataInputStream.java:389) ~[?:1.8.0_252]
at org.apache.flink.util.LinkedOptionalMapSerializer.readOptionalMap(LinkedOptionalMapSerializer.java:86) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshotData.readDefaultKryoSerializerClasses(KryoSerializerSnapshotData.java:208) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshotData.createFrom(KryoSerializerSnapshotData.java:72) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshot.readSnapshot(KryoSerializerSnapshot.java:77) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:174) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.readNestedSerializerSnapshots(NestedSerializersSnapshotDelegate.java:182) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.readSnapshot(CompositeTypeSerializerSnapshot.java:149) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:174) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:219) ~[flink-runtime_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:119) ~[flink-runtime_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:83) ~[flink-runtime_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83) ~[flink-runtime_2.12-1.10.0.jar:1.10.0]
... 15 more
05:39:14.261 [flink-akka.actor.default-dispatcher-95] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor cisco - Un-registering task and sending final execution state CANCELED to JobManager for task Source: Custom Source -> Flat Map (5/12) 0b418e2ffcd028a58f39029d3f8be08e.
05:39:14.261 [Source: Custom Source -> Flat Map (3/12)] WARN org.apache.flink.streaming.api.operators.BackendRestorerProcedure cisco - Exception while restoring operator state backend for StreamSource_1171dea6747ab509fdaefbe74f7195af_(3/12) from alternative (1/1), will retry while more alternatives are available.
org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86) ~[flink-runtime_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createOperatorStateBackend(RocksDBStateBackend.java:565) ~[flink-statebackend-rocksdb_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:243) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:252) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:139) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) [flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449) [flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461) [flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) [flink-runtime_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) [flink-runtime_2.12-1.10.0.jar:1.10.0]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
Caused by: java.io.IOException: Stream Closed
at java.io.FileInputStream.read0(Native Method) ~[?:1.8.0_252]
at java.io.FileInputStream.read(FileInputStream.java:207) ~[?:1.8.0_252]
at org.apache.flink.core.fs.local.LocalDataInputStream.read(LocalDataInputStream.java:68) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.core.fs.FSDataInputStreamWrapper.read(FSDataInputStreamWrapper.java:51) ~[flink-core-1.10.0.jar:1.10.0]
at java.io.DataInputStream.readInt(DataInputStream.java:389) ~[?:1.8.0_252]
at org.apache.flink.util.LinkedOptionalMapSerializer.readOptionalMap(LinkedOptionalMapSerializer.java:86) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshotData.readDefaultKryoSerializerClasses(KryoSerializerSnapshotData.java:208) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshotData.createFrom(KryoSerializerSnapshotData.java:72) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshot.readSnapshot(KryoSerializerSnapshot.java:77) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:174) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.readNestedSerializerSnapshots(NestedSerializersSnapshotDelegate.java:182) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.readSnapshot(CompositeTypeSerializerSnapshot.java:149) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:174) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:219) ~[flink-runtime_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:119) ~[flink-runtime_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:83) ~[flink-runtime_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83) ~[flink-runtime_2.12-1.10.0.jar:1.10.0]
... 15 more
05:39:14.262 [flink-akka.actor.default-dispatcher-95] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor cisco - Un-registering task and sending final execution state CANCELED to JobManager for task Source: Custom Source -> Flat Map (3/12) a973d1d62f5086d1126d83d81278cc0a.
05:39:14.283 [Source: Custom Source -> Flat Map (1/12)] WARN org.apache.flink.streaming.api.operators.BackendRestorerProcedure cisco - Exception while restoring operator state backend for StreamSource_1171dea6747ab509fdaefbe74f7195af_(1/12) from alternative (1/1), will retry while more alternatives are available.
org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86) ~[flink-runtime_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createOperatorStateBackend(RocksDBStateBackend.java:565) ~[flink-statebackend-rocksdb_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:243) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:252) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:139) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454) ~[flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) [flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449) [flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461) [flink-streaming-java_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) [flink-runtime_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) [flink-runtime_2.12-1.10.0.jar:1.10.0]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
Caused by: java.io.IOException: Stream Closed
at java.io.FileInputStream.read0(Native Method) ~[?:1.8.0_252]
at java.io.FileInputStream.read(FileInputStream.java:207) ~[?:1.8.0_252]
at org.apache.flink.core.fs.local.LocalDataInputStream.read(LocalDataInputStream.java:68) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.core.fs.FSDataInputStreamWrapper.read(FSDataInputStreamWrapper.java:51) ~[flink-core-1.10.0.jar:1.10.0]
at java.io.DataInputStream.readInt(DataInputStream.java:389) ~[?:1.8.0_252]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:165) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.readNestedSerializerSnapshots(NestedSerializersSnapshotDelegate.java:182) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.readSnapshot(CompositeTypeSerializerSnapshot.java:149) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:174) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76) ~[flink-core-1.10.0.jar:1.10.0]
at org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:219) ~[flink-runtime_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:119) ~[flink-runtime_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:83) ~[flink-runtime_2.12-1.10.0.jar:1.10.0]
at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83) ~[flink-runtime_2.12-1.10.0.jar:1.10.0]
... 15 more
05:39:14.283 [flink-akka.actor.default-dispatcher-95] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor cisco - Un-registering task and sending final execution state CANCELED to JobManager for task Source: Custom Source -> Flat Map (1/12) c1a83f3812be2a4099737d6eee5b41d0.
05:39:14.441 [flink-akka.actor.default-dispatcher-95] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor cisco - Un-registering task and sending final execution state CANCELED to JobManager for task Sink: Cassandra Sink (1/4) caadf9ad0d011d308659cf47a3b74cc4.
05:40:36.616 [flink-akka.actor.default-dispatcher-95] INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl cisco - Free slot TaskSlot(index:2, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=85.333mb (89478482 bytes), taskOffHeapMemory=0 bytes, managedMemory=136.533mb (143165578 bytes), networkMemory=34.133mb (35791394 bytes)}, allocationId: f5741b19f3f1281ae65d67994dba045b, jobId: a0d922bbf1c20ed9417415827c32e1a3).
05:40:36.617 [flink-akka.actor.default-dispatcher-95] INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl cisco - Free slot TaskSlot(index:0, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=85.333mb (89478482 bytes), taskOffHeapMemory=0 bytes, managedMemory=136.533mb (143165578 bytes), networkMemory=34.133mb (35791394 bytes)}, allocationId: 5a92c83b6a105b726105cb0432980be6, jobId: a0d922bbf1c20ed9417415827c32e1a3).
05:40:36.618 [flink-akka.actor.default-dispatcher-95] INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl cisco - Free slot TaskSlot(index:1, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=85.333mb (89478482 bytes), taskOffHeapMemory=0 bytes, managedMemory=136.533mb (143165578 bytes), networkMemory=34.133mb (35791394 bytes)}, allocationId: dd952690f30c88860b451b1ce4e2fc6d, jobId: a0d922bbf1c20ed9417415827c32e1a3).
05:40:36.618 [flink-akka.actor.default-dispatcher-95] INFO org.apache.flink.runtime.taskexecutor.JobLeaderService cisco - Remove job a0d922bbf1c20ed9417415827c32e1a3 from job leader monitoring.
05:40:36.618 [flink-akka.actor.default-dispatcher-95] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor cisco - Close JobManager connection for job a0d922bbf1c20ed9417415827c32e1a3.
05:40:36.621 [flink-akka.actor.default-dispatcher-110] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor cisco - Close JobManager connection for job a0d922bbf1c20ed9417415827c32e1a3.
05:40:36.621 [flink-akka.actor.default-dispatcher-110] INFO org.apache.flink.runtime.taskexecutor.JobLeaderService cisco - Cannot reconnect to job a0d922bbf1c20ed9417415827c32e1a3 because it is not registered.

Please help me figure out what I am doing wrong, and let me know if any further information is needed.
Below is the code for the BroadcastProcessFunction and the Cassandra sink I use to persist the state of incoming signals for auditing purposes.
================================Source Function To Read Patterns from API Call=================================================
public class PatternSource extends RichSourceFunction<Map<String, Map<String, Pattern>>> {
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Map<String, Map<String, Pattern>>> ctx) throws Exception {
        // getGlobalJobParameters() returns a GlobalJobParameters object, not a String;
        // pull the URL out of its parameter map (key name "patternUrl" assumed here)
        String patternUrl = getRuntimeContext().getExecutionConfig()
                .getGlobalJobParameters().toMap().get("patternUrl");
        Map<String, Map<String, Pattern>> patterns = getPatternData(patternUrl);
        ctx.collect(patterns);
        while (isRunning) {
            Thread.sleep(10000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
=================================================================================================================================
====================================================BroadcastProcessFunction Class================================================
public static final MapStateDescriptor<String, Map<String, Pattern>> patternPatternDescriptor =
        new MapStateDescriptor<>("PatternPatternDescriptor",
                BasicTypeInfo.STRING_TYPE_INFO,
                new MapTypeInfo<>(String.class, Pattern.class));

public class PatternDetection extends BroadcastProcessFunction<Tuple2<String, InputSignal>, Tuple2<String, Map<String, Pattern>>, Tuple2<String, InputSignal>> {

    @Override
    public void processElement(Tuple2<String, InputSignal> input, ReadOnlyContext ctx, Collector<Tuple2<String, InputSignal>> out) throws Exception {
        InputSignal signal = input.f1;
        JSONObject inputSignalPayload = new JSONObject(signal.getSignalPayload());
        String sourceName = signal.getSignalHeader().getSignalSource().toUpperCase();
        Map<String, Pattern> patternList = ctx.getBroadcastState(patternPatternDescriptor).get(sourceName);
        if (patternList == null) {
            return;
        }
        for (Pattern pattern : patternList.values()) {
            /* Custom JSON matcher built from the pattern definition */
            Matcher<?> jsonMatcher = this.buildMatcher(pattern.getPatternDefinition());
            if (jsonMatcher != null && jsonMatcher.matches(Arrays.asList(inputSignalPayload))) {
                ctx.output(validSignalOutput, inputSignalPayload);
            }
        }
    }

    @Override
    public void processBroadcastElement(Tuple2<String, Map<String, Pattern>> patternCondition, Context ctx, Collector<Tuple2<String, InputSignal>> out) throws Exception {
        String signalSource = patternCondition.f0.toUpperCase();
        BroadcastState<String, Map<String, Pattern>> state = ctx.getBroadcastState(patternPatternDescriptor);
        // Store the incoming patterns so processElement can read them later
        state.put(signalSource, patternCondition.f1);
    }
}
======================================================================================================================================
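For context, a minimal sketch of how a BroadcastProcessFunction like the one above is typically wired up; the stream variables `signalStream` and `patternStream` are assumptions (they do not appear in the question), and only the connect/broadcast pattern itself is the point:

```java
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;

// Hypothetical wiring; signalStream and patternStream are assumed to exist
// with the element types used by PatternDetection above.
BroadcastStream<Tuple2<String, Map<String, Pattern>>> broadcastPatterns =
        patternStream.broadcast(patternPatternDescriptor);

DataStream<Tuple2<String, InputSignal>> matched =
        signalStream
                .connect(broadcastPatterns)          // non-broadcast side first
                .process(new PatternDetection());    // runs on every parallel instance
```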
====================================================Cassandra Sink====================================================================
public static void createInputSignalSink(DataStream<InputSignalSignalHistory> dataStream, Properties properties, int parallelism) {
    try {
        log.info(LogMessageBuilder.buildLogMessage("Inserting InputSignal signal history to cassandra database"));
        CassandraSink.addSink(dataStream)
                .setClusterBuilder(buildClusterBuilder(properties))
                .setMapperOptions(() -> new Option[]{Option.saveNullFields(true)})
                .build()
                .setParallelism(parallelism);
        log.info(LogMessageBuilder.buildLogMessage("Cassandra sink cluster builder is ready"));
    } catch (Exception exp) {
        exp.printStackTrace();
    }
}
=====================================================================================================================================
Posted 2021-01-29 17:36:08
I have read that Flink maintains state internally even without explicitly implementing state.
Some built-in operators maintain state implicitly, for example
some sources and sinks.
From the stack trace you shared, it appears the checkpoint contains state for a source operator that cannot be restored; presumably your custom source.
It would be easier to diagnose if you shared the code of your custom source, but the first thing I would check is whether you have implemented the CheckpointedFunction interface correctly -- in particular the initializeState(FunctionInitializationContext context) method.
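To illustrate that suggestion, here is a hedged sketch (not the asker's actual source) of a source like PatternSource extended with CheckpointedFunction. The state name "patterns", the field names, and the choice to snapshot the last fetched pattern map are all assumptions for illustration:

```java
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class PatternSource extends RichSourceFunction<Map<String, Map<String, Pattern>>>
        implements CheckpointedFunction {

    private volatile boolean isRunning = true;
    // Operator state holding the last emitted pattern map (hypothetical design)
    private transient ListState<Map<String, Map<String, Pattern>>> checkpointedPatterns;
    private Map<String, Map<String, Pattern>> lastPatterns;

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Replace the previous snapshot with the current pattern map
        checkpointedPatterns.clear();
        if (lastPatterns != null) {
            checkpointedPatterns.add(lastPatterns);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Map<String, Map<String, Pattern>>> descriptor =
                new ListStateDescriptor<>("patterns",
                        TypeInformation.of(new TypeHint<Map<String, Map<String, Pattern>>>() {}));
        checkpointedPatterns = context.getOperatorStateStore().getListState(descriptor);
        if (context.isRestored()) {
            // Recover the last pattern map after a failure instead of starting empty
            for (Map<String, Map<String, Pattern>> restored : checkpointedPatterns.get()) {
                lastPatterns = restored;
            }
        }
    }

    // run() and cancel() as in the question, assigning lastPatterns before collect()
}
```

Getting initializeState right matters because it is invoked on every restore attempt; if the declared state or its serializer does not line up with what was snapshotted, restore fails in exactly the code path shown in the stack trace.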
https://stackoverflow.com/questions/65956349