
How Flink Operator State Is Stored, for the Busy Reader

shengjk1
Published 2020-04-08 15:54:39
From the column: 码字搬砖
  • 1. Preface
  • 2. Deep Dive
    • 2.1 Conclusion
    • 2.2 Classes related to operator state
    • 2.3 A concrete walkthrough with FlinkKafkaConsumerBase

1. Preface

Recently I had the chance to act as an interviewer. During the interviews I noticed that many candidates know Keyed State is stored in RocksDB (when the StateBackend is set to RocksDBStateBackend), yet they assume Operator State is stored in RocksDB as well, including some who had read this part of the source code and some who already publish courses on it.

2. Deep Dive

2.1 Conclusion

Operator State is independent of the configured StateBackend, and it is not stored in RocksDB. It is kept in memory first; then, during a checkpoint, it is written directly to a file (either a local file or an HDFS file) through a stream.
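
Before diving into the source, the whole idea fits in a short sketch. This is plain Java rather than Flink code, with invented names, but the shape mirrors what follows: the state is an ordinary heap list, a checkpoint streams its elements to an output stream, and a restore streams them back in. No key/value store is involved at any point.

```java
import java.io.*;
import java.util.*;

// Minimal sketch (not Flink code) of the idea behind operator state:
// the state lives in a plain heap list, and a checkpoint just streams
// its elements out to a file. RocksDB plays no role here.
public class OperatorStateSketch {
    // in-memory operator state, analogous to PartitionableListState's internalList
    private final List<Long> internalList = new ArrayList<>();

    public void add(long value) { internalList.add(value); }

    public List<Long> get() { return internalList; }

    // "checkpoint": serialize every element directly to the output stream
    public void snapshot(DataOutputStream out) throws IOException {
        out.writeInt(internalList.size());
        for (long v : internalList) out.writeLong(v);
    }

    // "restore": read the elements from the stream back into memory
    public static OperatorStateSketch restore(DataInputStream in) throws IOException {
        OperatorStateSketch s = new OperatorStateSketch();
        int n = in.readInt();
        for (int i = 0; i < n; i++) s.add(in.readLong());
        return s;
    }

    public static void main(String[] args) throws IOException {
        OperatorStateSketch state = new OperatorStateSketch();
        state.add(42L);
        state.add(7L);

        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        state.snapshot(new DataOutputStream(bos));

        OperatorStateSketch restored =
            restore(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
        System.out.println(restored.get()); // the round-trip preserves the state
    }
}
```

The rest of this article is essentially tracing where Flink's real versions of `add`, `snapshot`, and `restore` live.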

2.2 Classes related to operator state

They all live under the flink-runtime module.

2.3 A concrete walkthrough with FlinkKafkaConsumerBase

First, let's take a look at AbstractStateBackend:

	@Override
	public abstract <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
		Environment env,
		JobID jobID,
		String operatorIdentifier,
		TypeSerializer<K> keySerializer,
		int numberOfKeyGroups,
		KeyGroupRange keyGroupRange,
		TaskKvStateRegistry kvStateRegistry,
		TtlTimeProvider ttlTimeProvider,
		MetricGroup metricGroup,
		@Nonnull Collection<KeyedStateHandle> stateHandles,
		CloseableRegistry cancelStreamRegistry) throws IOException;

	@Override
	public abstract OperatorStateBackend createOperatorStateBackend(
		Environment env,
		String operatorIdentifier,
		@Nonnull Collection<OperatorStateHandle> stateHandles,
		CloseableRegistry cancelStreamRegistry) throws Exception;

It declares two methods. The first, createKeyedStateBackend, returns an AbstractKeyedStateBackend, which has two subclasses: HeapKeyedStateBackend and RocksDBKeyedStateBackend.

The second, createOperatorStateBackend, returns an OperatorStateBackend, which has only one implementation: DefaultOperatorStateBackend.

When FlinkKafkaConsumerBase restores from a checkpoint, following createOperatorStateBackend we can see that the build method of DefaultOperatorStateBackendBuilder is executed first:

@Override
	public DefaultOperatorStateBackend build() throws BackendBuildingException {
		Map<String, PartitionableListState<?>> registeredOperatorStates = new HashMap<>();
		Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates = new HashMap<>();
		CloseableRegistry cancelStreamRegistryForBackend = new CloseableRegistry();
		AbstractSnapshotStrategy<OperatorStateHandle> snapshotStrategy =
			new DefaultOperatorStateBackendSnapshotStrategy(
				userClassloader,
				asynchronousSnapshots,
				registeredOperatorStates,
				registeredBroadcastStates,
				cancelStreamRegistryForBackend);
		OperatorStateRestoreOperation restoreOperation = new OperatorStateRestoreOperation(
			cancelStreamRegistry,
			userClassloader,
			registeredOperatorStates,
			registeredBroadcastStates,
			restoreStateHandles
		);
		try {
			// OperatorState restore
			restoreOperation.restore();
		} catch (Exception e) {
			IOUtils.closeQuietly(cancelStreamRegistryForBackend);
			throw new BackendBuildingException("Failed when trying to restore operator state backend", e);
		}
		// once restoreOperation.restore() has run, the populated registeredOperatorStates is passed into DefaultOperatorStateBackend
		return new DefaultOperatorStateBackend(
			executionConfig,
			cancelStreamRegistryForBackend,
			registeredOperatorStates,
			registeredBroadcastStates,
			new HashMap<>(),
			new HashMap<>(),
			snapshotStrategy
		);
	}

Let's take a closer look at what restore actually does:

@Override
	public Void restore() throws Exception {
		if (stateHandles.isEmpty()) {
			return null;
		}

		for (OperatorStateHandle stateHandle : stateHandles) {

			if (stateHandle == null) {
				continue;
			}
			// open the input stream of the state file
			FSDataInputStream in = stateHandle.openInputStream();
			closeStreamOnCancelRegistry.registerCloseable(in);

			ClassLoader restoreClassLoader = Thread.currentThread().getContextClassLoader();

			try {
				Thread.currentThread().setContextClassLoader(userClassloader);
				OperatorBackendSerializationProxy backendSerializationProxy =
					new OperatorBackendSerializationProxy(userClassloader);

				backendSerializationProxy.read(new DataInputViewStreamWrapper(in));

				List<StateMetaInfoSnapshot> restoredOperatorMetaInfoSnapshots =
					backendSerializationProxy.getOperatorStateMetaInfoSnapshots();

				// Recreate all PartitionableListStates from the meta info
				for (StateMetaInfoSnapshot restoredSnapshot : restoredOperatorMetaInfoSnapshots) {

					final RegisteredOperatorStateBackendMetaInfo<?> restoredMetaInfo =
						new RegisteredOperatorStateBackendMetaInfo<>(restoredSnapshot);

					if (restoredMetaInfo.getPartitionStateSerializer() instanceof UnloadableDummyTypeSerializer) {

						// must fail now if the previous typeSerializer cannot be restored because there is no typeSerializer
						// capable of reading previous state
						// TODO when eager state registration is in place, we can try to get a convert deserializer
						// TODO from the newly registered typeSerializer instead of simply failing here

						throw new IOException("Unable to restore operator state [" + restoredSnapshot.getName() + "]." +
							" The previous typeSerializer of the operator state must be present; the typeSerializer could" +
							" have been removed from the classpath, or its implementation have changed and could" +
							" not be loaded. This is a temporary restriction that will be fixed in future versions.");
					}

					PartitionableListState<?> listState = registeredOperatorStates.get(restoredSnapshot.getName());

					if (null == listState) {
						listState = new PartitionableListState<>(restoredMetaInfo);

						registeredOperatorStates.put(listState.getStateMetaInfo().getName(), listState);
					} else {
						// TODO with eager state registration in place, check here for typeSerializer migration strategies
					}
				}

				// ... and then get back the broadcast state.
				List<StateMetaInfoSnapshot> restoredBroadcastMetaInfoSnapshots =
					backendSerializationProxy.getBroadcastStateMetaInfoSnapshots();

				for (StateMetaInfoSnapshot restoredSnapshot : restoredBroadcastMetaInfoSnapshots) {

					final RegisteredBroadcastStateBackendMetaInfo<?, ?> restoredMetaInfo =
						new RegisteredBroadcastStateBackendMetaInfo<>(restoredSnapshot);

					if (restoredMetaInfo.getKeySerializer() instanceof UnloadableDummyTypeSerializer ||
						restoredMetaInfo.getValueSerializer() instanceof UnloadableDummyTypeSerializer) {

						// must fail now if the previous typeSerializer cannot be restored because there is no typeSerializer
						// capable of reading previous state
						// TODO when eager state registration is in place, we can try to get a convert deserializer
						// TODO from the newly registered typeSerializer instead of simply failing here

						throw new IOException("Unable to restore broadcast state [" + restoredSnapshot.getName() + "]." +
							" The previous key and value serializers of the state must be present; the serializers could" +
							" have been removed from the classpath, or their implementations have changed and could" +
							" not be loaded. This is a temporary restriction that will be fixed in future versions.");
					}

					BackendWritableBroadcastState<?, ?> broadcastState = registeredBroadcastStates.get(restoredSnapshot.getName());

					if (broadcastState == null) {
						broadcastState = new HeapBroadcastState<>(restoredMetaInfo);

						registeredBroadcastStates.put(broadcastState.getStateMetaInfo().getName(), broadcastState);
					} else {
						// TODO with eager state registration in place, check here for typeSerializer migration strategies
					}
				}

				// Restore all the states
				for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> nameToOffsets :
					stateHandle.getStateNameToPartitionOffsets().entrySet()) {

					final String stateName = nameToOffsets.getKey();

					PartitionableListState<?> listStateForName = registeredOperatorStates.get(stateName);
					if (listStateForName == null) {
						BackendWritableBroadcastState<?, ?> broadcastStateForName = registeredBroadcastStates.get(stateName);
						Preconditions.checkState(broadcastStateForName != null, "Found state without " +
							"corresponding meta info: " + stateName);
						deserializeBroadcastStateValues(broadcastStateForName, in, nameToOffsets.getValue());
					} else {
						deserializeOperatorStateValues(listStateForName, in, nameToOffsets.getValue());
					}
				}

			} finally {
				Thread.currentThread().setContextClassLoader(restoreClassLoader);
				if (closeStreamOnCancelRegistry.unregisterCloseable(in)) {
					IOUtils.closeQuietly(in);
				}
			}
		}
		return null;
	}

The comments already make this clear. In short, restore reads the checkpoint file into memory through a stream, performs the necessary bookkeeping to rebuild the state meta info, and passes the resulting registeredOperatorStates to DefaultOperatorStateBackend. Later, when initializeState runs:

public final void initializeState(FunctionInitializationContext context) throws Exception {
.....
		this.unionOffsetStates = stateStore.getUnionListState(new ListStateDescriptor<>(
				OFFSETS_STATE_NAME,
				TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {})));
.....
	}

First of all, the union ListState is obtained from the OperatorStateStore, whose only implementation is DefaultOperatorStateBackend. From its source we can see that the state is fetched directly from registeredOperatorStates, and what is ultimately returned is a PartitionableListState.
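
That lookup can be sketched outside Flink as a lazy, name-keyed registry of heap lists. The class below is invented for illustration (the state name only echoes the consumer's OFFSETS_STATE_NAME): if restore pre-populated a name, the caller gets the restored list back; otherwise an empty one is registered on first request.

```java
import java.util.*;

// Simplified sketch of how DefaultOperatorStateBackend hands out list state:
// a name-keyed registry of heap lists, filled either by restore or on first request.
public class StateRegistrySketch {
    private final Map<String, List<String>> registeredOperatorStates = new HashMap<>();

    // analogous to getUnionListState(descriptor): return the restored list
    // if one exists under this name, otherwise register a fresh empty one
    public List<String> getListState(String name) {
        return registeredOperatorStates.computeIfAbsent(name, k -> new ArrayList<>());
    }

    // simulate a restore that pre-populates the registry
    public void preload(String name, List<String> restored) {
        registeredOperatorStates.put(name, new ArrayList<>(restored));
    }

    public static void main(String[] args) {
        StateRegistrySketch backend = new StateRegistrySketch();
        backend.preload("topic-partition-offset-states", Arrays.asList("p0=42", "p1=7"));

        // initializeState: the consumer gets back exactly what restore loaded
        List<String> offsets = backend.getListState("topic-partition-offset-states");
        System.out.println(offsets); // [p0=42, p1=7]
    }
}
```

This is why the state the Kafka consumer sees after a restart is simply the in-memory list that restore rebuilt; no backend lookup happens at read time.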

And when snapshotState runs:

public final void snapshotState(FunctionSnapshotContext context) throws Exception {
		...
					unionOffsetStates.add(
							Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue()));
				...
	}

unionOffsetStates.add actually calls the add method of PartitionableListState:

@Override
	public void add(S value) {
		Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
		internalList.add(value);
	}

The internalList here is simply an ArrayList. When a checkpoint is triggered, snapshot in DefaultOperatorStateBackendSnapshotStrategy is then invoked:

...
PartitionableListState<?> value = entry.getValue();
// write the operator state to the checkpoint stream (a local file or HDFS)
long[] partitionOffsets = value.write(localOut);
...

Writing the PartitionableListState to the file in turn calls PartitionableListState's own write method:

public long[] write(FSDataOutputStream out) throws IOException {

		long[] partitionOffsets = new long[internalList.size()];

		DataOutputView dov = new DataOutputViewStreamWrapper(out);

		for (int i = 0; i < internalList.size(); ++i) {
			S element = internalList.get(i);
			partitionOffsets[i] = out.getPos();
			getStateMetaInfo().getPartitionStateSerializer().serialize(element, dov);
		}

		return partitionOffsets;
	}
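
Note what write returns: one byte offset per element. Those offsets are what make the list re-partitionable across parallel subtasks: on restore (or rescale), any subset of elements can be re-read by seeking to its recorded offsets. A self-contained sketch of the same idea in plain Java (names invented, with writeUTF standing in for the element serializer):

```java
import java.io.*;
import java.util.*;

// Sketch of the PartitionableListState.write idea: serialize each element to the
// stream and remember the byte offset at which it starts. Any subset of offsets
// can later be used to re-read just those elements, which is what allows
// per-element redistribution across parallel subtasks.
public class OffsetWriteSketch {

    // write: returns one byte offset per element, mirroring the Flink method above
    static long[] write(List<String> internalList, ByteArrayOutputStream bos) throws IOException {
        long[] partitionOffsets = new long[internalList.size()];
        DataOutputStream out = new DataOutputStream(bos);
        for (int i = 0; i < internalList.size(); i++) {
            out.flush();
            partitionOffsets[i] = bos.size();      // position before the element, like out.getPos()
            out.writeUTF(internalList.get(i));     // stand-in for the element serializer
        }
        out.flush();
        return partitionOffsets;
    }

    // read one element back given its recorded offset
    static String readAt(byte[] data, long offset) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        in.skipBytes((int) offset);                // "seek" to the element
        return in.readUTF();
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        long[] offsets = write(Arrays.asList("p0=42", "p1=7", "p2=99"), bos);
        byte[] data = bos.toByteArray();

        // restore only the second element, as a rescaled subtask might
        System.out.println(readAt(data, offsets[1])); // p1=7
    }
}
```

So the full picture matches the conclusion in section 2.1: operator state is a heap-resident list, checkpointed by streaming its elements (plus their offsets) straight into a file, with the configured StateBackend playing no part in where that state lives.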
Originally published 2020/04/05 on the author's personal site/blog.