
现代分布式系统暴露的API会触发状态变更操作,如支付、订单、账户获取流程或账户更新。在这样的环境中,由于网络重试、Kafka重平衡器发出多个请求、负载均衡器等因素,启动重复事务的概率相当高且无法避免。如果没有适当的防护措施,这些重复事务/请求可能导致数据不一致、财务差异和业务不变量的变化。
幂等性(Idempotency) 是一种成熟的技术,用于确保同一请求的重复执行产生单一、一致的结果。虽然幂等性可以在应用层使用内存缓存或请求去重逻辑来强制执行,但这些方法在水平扩展的微服务架构中会失效,因为多个应用实例可能同时处理请求,并且跨越多个不同区域。
像MySQL(使用InnoDB存储引擎)这样的关系数据库提供了事务保证和行级锁定机制,可用于实现健壮的、跨实例的幂等性。通过持久化幂等键并通过悲观锁定强制执行独占访问,系统可以确保只允许一个请求执行业务逻辑,而后续的重复请求会优雅地失败。
上述大多数实现在分布式系统中都是不充分的,因为它们不能跨应用实例协调状态,并且在崩溃恢复或重新部署时会失效。
因此,本设计要解决的问题是实现一个数据库支持的幂等性检查,使用MySQL行级锁定通过幂等键进行识别,确保在分布式Spring Boot应用实例中保持精确一次的业务执行语义。
关系数据库已经通过事务和行级锁定提供了强大的一致性保证。
通过利用以下语义:
... FOR UPDATE通过使用这种机制,我们构建了:
这种方法在支付、钱包和账户获取等事务敏感领域以及许多其他用例中都能完美工作。
核心思想:
CREATE TABLE idempotency_key (
idem_key VARCHAR(128) NOTNULL,
status ENUM('IN_PROGRESS','COMPLETED','FAILED') NOTNULL,
request_hash CHAR(64) NULL,
response_json JSONNULL,
created_at TIMESTAMPNOTNULLDEFAULTCURRENT_TIMESTAMP,
updated_at TIMESTAMPNOTNULLDEFAULTCURRENT_TIMESTAMPONUPDATECURRENT_TIMESTAMP,
PRIMARY KEY (idem_key)
) ENGINE=InnoDB;
工作原理:
idem_key)保证每个幂等键只有一行。PESSIMISTIC_WRITE在MySQL(InnoDB)中变成 SELECT .. FOR UPDATE,在提交/回滚前阻止同一键上的并发调用者。重要考虑:
innodb_lock_wait_timeout 行为;决定如果请求已经是 IN_PROGRESS 是否返回409/429/422。步骤1:开始事务。
步骤2:锁定或插入幂等记录。
SELECT * FROM idempotency_keys WHERE key = ? FOR UPDATE;COMPLETED"状态,返回存储的记录。IN_PROGRESS,根据策略阻止或拒绝。IN_PROGRESS"的新记录。步骤3:执行业务逻辑。
步骤4:将记录标记为已完成。
COMPLETED"状态,并可选择在表中存储响应引用步骤5:提交事务。
COMPLETED"状态后会恢复(如果被阻止)安全处理并发请求:
当两个相同的请求同时到达时:
SELECT FOR UPDATE 上被阻止/拒绝这保证了业务级别的精确一次行为语义。
策略:
@TransactionalFOR UPDATE)要实现的典型组件:
IdempotencyEntityIdempotencyRepositoryIdempotencyService这种方法与现有的Spring事务管理自然集成。
spring.application.name=IdempotencyCheck
spring.datasource.url=jdbc:mysql://localhost:3306/product?useSSL=false&serverTimezone=UTC&allowPublicKeyRetrieval=true
spring.datasource.username=root
spring.datasource.password=password
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.jpa.hibernate.ddl-auto=update
spring.jpa.show-sql=true
spring.jpa.properties.hibernate.format_sql=true
spring.jpa.open-in-view=false
spring.datasource.hikari.auto-commit=false
spring.datasource.hikari.maximum-pool-size=10
spring.datasource.hikari.minimum-idle=2
spring.datasource.hikari.idle-timeout=30000
spring.datasource.hikari.connection-timeout=20000
spring.datasource.hikari.max-lifetime=1800000
这里有两个重要的属性需要考虑:
SELECT ... FOR UPDATE 模式有好处)包含一个枚举来保存表的当前状态:IN_PROGRESS、COMPLETED 或 FAILED,供调用者采取进一步操作。对于本文,我们将抛出一个冲突异常以简化。
package repository;
import jakarta.persistence.*;
import lombok.Getter;
import lombok.Setter;
@Entity
@Table(name = "idempotency_key")
@Getter@Setter
publicclass IdempotencyKeyEntity {
@Id
@Column(name="idem_key", length = 128)
private String key;
@Enumerated(EnumType.STRING)
private Status status;
@Column(name = "request_hash", length = 64)
private String requestHash;
@Column(name = "response_json", columnDefinition = "json")
private String responseJson;
publicenum Status { IN_PROGRESS, COMPLETED, FAILED }
}
该方法通过获取相应数据库行的悲观写锁来检索幂等记录。该锁确保一次只有一个事务可以读取或修改记录,防止并发请求同时处理相同的幂等键。
import jakarta.persistence.LockModeType;
import org.springframework.data.jpa.repository.Lock;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import org.springframework.stereotype.Repository;
import java.util.Optional;
@Repository
publicinterface IdempotencyRepositoryImpl {
@Lock(LockModeType.PESSIMISTIC_WRITE)
@Query("select idem from IdempotencyKeyEntity idem where idem.key = :key")
Optional<IdempotencyKeyEntity> lockByKey(@Param("key") String key);
}
根据状态从数据库获取锁,如果行已经有完成状态,从哈希返回响应。
@Transactional
public Optional<String> getCompletedResponse(String key, String requestHash){
//Find the key or else create a record and insert it into the database and return the entity
repo.findById(key).orElseGet(() -> {
IdempotencyKeyEntity entity = new IdempotencyKeyEntity();
entity.setKey(key);
entity.setStatus(IdempotencyKeyEntity.Status.IN_PROGRESS);
entity.setRequestHash(requestHash);
try{
return repo.saveAndFlush(entity);
}catch (DataIntegrityViolationException exception){
returnnull;
}
});
//Lock the row only one thread at a time
IdempotencyKeyEntity locked = repo.lockByKey(key)
.orElseThrow(() -> new IllegalStateException("Row must exist"));
//If already completed return the cached response
if (locked.getStatus() == IdempotencyKeyEntity.Status.COMPLETED) {
if (!Objects.equals(locked.getRequestHash(), requestHash)) {
thrownew ResponseStatusException(HttpStatus.CONFLICT,
"Idempotency-Key reuse with different request");
}
return Optional.ofNullable(locked.getResponseJson());
}
// If in progress and hash differs conflict (Not blocking here)
if (locked.getRequestHash() != null &&
!Objects.equals(locked.getRequestHash(), requestHash)) {
thrownew ResponseStatusException(HttpStatus.CONFLICT,
"Idempotency-Key reuse with different request");
}
//Not yet completed, the caller should do the work and mark it completed
return Optional.empty();
}
代码块将记录标记为已完成:
@Transactional
public void completed(String key, String responseJson) {
IdempotencyKeyEntity locked = repo.lockByKey(key)
.orElseThrow(() -> new IllegalStateException("Row must exist"));
locked.setStatus(IdempotencyKeyEntity.Status.COMPLETED);
locked.setResponseJson(responseJson);
repo.save(locked);
}
如果事务失败,其他等待的线程可以做这项工作。
@Transactional
public void failed(String key) {
IdempotencyKeyEntity locked = repo.lockByKey(key)
.orElseThrow(() -> new IllegalStateException("Row must exist"));
locked.setStatus(IdempotencyKeyEntity.Status.FAILED);
repo.save(locked);
}
@RestController
@RequiredArgsConstructor
publicclass IdempotentController {
privatefinal IdempotencyService idempotencyService;
@PostMapping("/payments")
public ResponseEntity<String> createPayments(@RequestHeader("Idempotency-Key") String idemKey,
@RequestBody PaymentRequest req){
//Using Google Guava for 256 hashing
String hashReq = Hashing.sha256()
.hashString(req.toString(), StandardCharsets.UTF_8)
.toString();
//check for cachedInDB
Optional<String> cachedInDB = idempotencyService.getCompletedResponse(idemKey, hashReq);
if(cachedInDB.isPresent()){
return ResponseEntity.ok(cachedInDB.get());
}
//Do the business logic
try{
String results = idempotencyService.doWork();
//Mark the state as completed for the idempotent key
idempotencyService.completed(idemKey, results);
return ResponseEntity.ok(results);
}catch (Exception ex){
//if the transaction fails, mark the idempotent key as failed to be processed later by other threads
idempotencyService.failed(idemKey);
throw ex;
}
}
}
Idempotency-Key 头处理幂等请求。控制器本身保持最小化,将所有并发和状态管理控制委托给 IdempotentService。COMPLETED,响应被持久化以便在未来的重试中安全重放。如果失败,键被标记为 FAILED,允许后续请求安全地重试操作。通过将幂等性强制执行隔离在服务层,并保持控制器专注于HTTP请求,这种设计确保并发请求被连贯处理,并根据需要在分布式Spring Boot实例中重试。
注意:对于简单的读密集型或最终一致的工作负载可能过于复杂
对于极高吞吐量的系统,分区策略或短命事务可以帮助保持性能
幂等性是可靠分布式系统的基础要求。通过利用MySQL行级锁定和事务保证,Spring Boot应用可以安全地处理重试、重复和并发请求,而不会引入不必要的复杂性。
这种模式在简单性、正确性和操作可靠性之间取得了平衡,使其成为事务敏感云原生应用的强有力选择。
Github链接:https://github.com/balakumaran-sugumar/idempotency
翻译:https://dzone.com/articles/implementing-idempotency-spring-boot-mysql