我遇到了一个问题,在并发运行时,Firestore事务会冻结。作为背景,我有一个Firestore集合,其中每个文档都有一个美元金额和一个时间。我正在创建一个函数,从这个集合中释放一个所需的美元金额,从最古老的文档开始。
例如,释放$150的函数调用将遍历集合,将美元金额从集合中删除,直到总共删除$150。我使用递归函数这样做:( 1)查找最老的美元金额;( 2)从该数字中删除输入的金额(即$150);如果输入的数量大于该数字,则删除该数字;如果仍有需要删除的剩余金额,则返回。我为步骤(2)使用了一个Firestore事务,因为这个集合有可能同时由多个用户进行更改(并且注意,如果我将(1)和(2)组合在一起以将查询包含在事务中,代码行为将保持不变)。
下面的代码正确地更新集合。然而,如果在前面的实例已经运行时调用它需要非常长的时间:如果我调用它一次,然后在第一个调用完成之前再次调用它,它就会冻结,并且有可能花费20-30分钟,而不是通常的1-5秒。虽然很明显,并发事务之间的争用导致冻结(当我删除事务的写入部分时,没有冻结),但未知的问题是是什么具体导致了冻结,以及如何修复它。
加法:看来这种冻结可能与https://github.com/firebase/firebase-tools/issues/2452有关。与那篇文章一致,我面临30秒的冻结每个事务,如果一个版本有多个事务,这就会变成很多分钟。
function releaseAmountFromStack(amount) {
return new Promise((resolve, reject) => {
let db = admin.firestore();
let stackRef = db.collection("stack");
stackRef.orderBy("expirationTime", "asc").limit(1)
.get().then((querySnapshot) => {
if(querySnapshot.empty) {
return reject("None left in stack");
}
let itemToRelease = querySnapshot.docs[0];
releaseItem(itemToRelease.ref, amount)
.then((actualReleaseAmount) => {
// If there is still more to release, trigger the next recursion
// If the full amount has been released, return it
if (amount > actualReleaseAmount) {
releaseAmountFromStack(amount-actualReleaseAmount)
.then((nextActualReleaseAmount) => {
return resolve(actualReleaseAmount + nextActualReleaseAmount);
})
.catch(() => {
return resolve(actualReleaseAmount);
});
} else {
return resolve(actualReleaseAmount);
}
});
});
});
}
function releaseItem(itemRef, amountToRelease) {
let db = admin.firestore();
return db.runTransaction((transaction) => {
return transaction.get(itemRef).then((itemDoc) => {
let itemAmount = itemDoc.data().amount;
let actualReleaseAmount = Math.min(amountToRelease, itemAmount);
// If item is exhausted, delete it. Else, update amount
if (actualReleaseAmount >= itemAmount) {
transaction.delete(itemDoc.ref);
} else {
transaction.set(itemDoc.ref, {
amount: admin.firestore.FieldValue.increment(-1*Number(actualReleaseAmount)),
}, {merge: true});
}
return actualReleaseAmount;
});
});
}以下是到目前为止调试过程中的一些有用的事实。非常感谢。
在冻结过程中,
中释放出所需的数量。
发布于 2021-08-25 20:57:53
我知道是什么导致了难以捉摸的冰冻。我注意到每个事务的冻结时间始终是30秒,这促使我搜索30秒事务冻结的讨论。我发现(https://github.com/firebase/firebase-tools/issues/2452),这表明这是Firebase模拟器如何处理并发事务的一个问题。实际上,当我部署代码而不是使用模拟器时,冻结就不再存在了!
TLDR: Firebase仿真器对并发事务有一个非自然的延迟。
发布于 2021-08-24 16:34:00
首先,让我们修复您的releaseAmountFromStack函数,这样您就不会为返回承诺的API (称为Explicit Promise Construction Antipattern)使用允诺构造函数。如果您的stackRef查询或releaseItem函数抛出错误,您的代码将遇到一个UnhandledPromiseRejection,因为两个承诺链都没有catch处理程序。
function releaseAmountFromStack(amount) {
const db = admin.firestore();
const stackRef = db.collection("stack");
return stackRef
.orderBy("expirationTime", "asc")
.limit(1)
.get()
.then((querySnapshot) => {
if(querySnapshot.empty) {
return Promise.reject("Out of stock");
}
const itemToRelease = querySnapshot.docs[0];
return releaseItem(itemToRelease.ref, amount);
})
.then((releasedAmount) => {
const amountLeft = amount - releasedAmount;
if (amountLeft <= 0)
return releasedAmount;
return releaseAmountFromStack(amountLeft)
.then((nextReleasedAmount) => nextReleasedAmount + releasedAmount)
.catch((err) => {
if (err === "Out of stock") {
return releasedAmount;
} else {
// rethrow unexpected errors
throw err;
}
});
});
}由于该函数涉及嵌套的承诺链,因此切换到async/await语法可以将其简化为:
async function releaseAmountFromStack(amount) {
const db = admin.firestore();
const stackRef = db.collection("stack");
const querySnapshot = await stackRef
.orderBy("expirationTime", "asc")
.limit(1)
.get();
if (querySnapshot.empty)
return 0; // out of stock
const itemToRelease = querySnapshot.docs[0];
const releasedAmount = await releaseItem(itemToRelease.ref, amount);
const amountLeft = amount - releasedAmount;
if (amountLeft <= 0) {
// nothing left to release, return released amount
return releasedAmount;
}
// If here, there is more to release, trigger the next recursion
const nextReleasedAmount = await releaseAmountFromStack(amountLeft);
return nextReleasedAmount + releasedAmount;
}注意:在上面的扁平版本中,我们不需要处理,因为我们可以只返回0。这意味着任何不相关的错误都将被正常抛出。
接下来,我们可以继续讨论releaseItem,这是导致您的问题的实际原因。在这里,您没有处理另一个实例正在删除正在读取的项的情况,您只是假设它存在,然后处理NaN和/或负余额。作为最终发生的事件的示例(),您可以得到以下事件序列(因为您使用的是FieldValue.increment() -也添加了一个服务器客户端进行事务处理),所以比这更复杂:
Client A Server Client B
Release 50 Release 80
┌┐ Get Doc #1 ┌┐ ┌┐
││ ────────────────► ││ ││
││ ││ ││
││ ││ ││
││ Doc #1 Snapshot ││ Get Doc #1 ││
││ { amount: 10 } ││ ◄──────────────── ││
││ ◄──────────────── ││ ││
││ ││ ││
││ ││ Doc #1 Snapshot ││
││ ││ { amount: 10 } ││
││ Result: Delete Doc #1 ││ ────────────────► ││
││ ────────────────► ││ ││
││ ACCEPTED ││ ││
││ ◄──────────────── ││ ││
││ ││ Result: Delete Doc #1 ││
││ Get Doc #2 ││ ◄──────────────── ││
││ ────────────────► ││ ││
││ ││ REJECTED ││
││ ││ New Doc #1 Snapshot ││
││ ││ <null> ││
││ Doc #2 Snapshot ││ ────────────────► ││
││ { amount: 15 } ││ ││
││ ◄──────────────── ││ ││
││ ││ ││
││ ││ Result: Set Doc #1 to -10 ││
││ ││ ◄──────────────── ││
││ Result: Delete Doc #2 ││ ▲ ││
││ ────────────────► ││ │ ││
││ ACCEPTED ││ ERROR ││
││ ◄──────────────── ││ ││
└┘ └┘ └┘因为您使用的是事务,所以您已经知道了amount的当前值,所以您可以在客户机上进行计算,并编写新的金额,而不是使用FieldValue.increment()。在不需要知道当前值的情况下,该操作符更适合于对值进行简单更新。
function releaseItem(itemRef, amountToRelease) {
const db = admin.firestore();
return db.runTransaction((transaction) => {
return transaction.get(itemRef).then((itemDoc) => {
if (!itemDoc.exists) {
// target has been deleted, do nothing & return
// amount that was released (0)
return 0;
}
const itemAmount = itemDoc.get("amount");
const actualReleaseAmount = Math.min(amountToRelease, itemAmount);
if (actualReleaseAmount >= itemAmount) {
// exhausted supply. delete item
transaction.delete(itemDoc.ref);
} else {
// have leftover supply. update amount
transaction.set(itemDoc.ref, {
amount: itemAmount - actualReleaseAmount,
}, {merge: true});
}
// return amount that was released
return actualReleaseAmount;
});
});
}这不会完全解决您的问题,因为如果两个或多个客户端同时尝试从同一个堆栈中删除项,则仍然存在数据争用问题。作为这方面的一个示例,请参见以下事件流(同样,计时将是事情如何发展的一个主要因素):
Client A Server Client B
Release 50 Release 80
┌┐ Get Doc #1 ┌┐ ┌┐
││ ────────────────► ││ ││
││ ││ ││
││ ││ ││
││ Doc #1 Snapshot ││ Get Doc #1 ││
││ { amount: 10 } ││ ◄──────────────── ││
││ ◄──────────────── ││ ││
││ ││ ││
││ ││ Doc #1 Snapshot ││
││ ││ { amount: 10 } ││
││ Result: Delete Doc #1 ││ ────────────────► ││
││ ────────────────► ││ ││
││ ACCEPTED ││ ││
││ ◄──────────────── ││ ││
││ ││ Result: Delete Doc #1 ││
││ Get Doc #2 ││ ◄──────────────── ││
││ ────────────────► ││ ││
││ ││ REJECTED ││
││ ││ New Doc #1 Snapshot ││
││ ││ <null> ││
││ Doc #2 Snapshot ││ ────────────────► ││
││ { amount: 15 } ││ ││
││ ◄──────────────── ││ Result: Cancelled ││
││ ││ ◄──────────────── ││
││ ││ ││
││ ││ ││
││ Result: Delete Doc #2 ││ Get Doc #2 ││
││ ────────────────► ││ ◄──────────────── ││
││ ACCEPTED ││ ││
││ ◄──────────────── ││ ││
││ ││ Doc #2 Snapshot ││
││ Get Doc #3 ││ <null> ││
││ ────────────────► ││ ────────────────► ││
││ ││ ││
││ ││ Result: Cancelled ││
││ ││ ◄──────────────── ││
││ Doc #3 Snapshot ││ ││
││ { amount: 10 } ││ ││
││ ◄──────────────── ││ Get Doc #3 ││
││ ││ ◄──────────────── ││
││ ││ ││
││ ││ ││
││ Result: Delete Doc #3 ││ Doc #3 Snapshot ││
││ ────────────────► ││ <null> ││
││ ACCEPTED ││ ────────────────► ││
││ ◄──────────────── ││ ││
││ ││ Result: Cancelled ││
││ Get Doc #4 ││ ◄──────────────── ││
└┘ ────────────────► └┘ └┘一种解决方法是一次提取多个项,根据需要使用它们并将更改写回,所有这些都在事务中完成。然而,没有处理这个问题,只是降低了客户一次又一次地争夺同一份文件的可能性。
async function releaseAmountFromStack(amount) {
const db = admin.firestore();
const stackQuery = db
.collection("stack")
.orderBy("expirationTime", "asc")
.limit(1);
const releasedAmount = await _releaseAmountFromStack(stackQuery, amount);
const amountLeft = amount - releasedAmount;
if (amountLeft <= 0) {
// nothing left to release, return released amount
return releasedAmount;
}
// If here, there is more to release, trigger the next recursion
const nextReleasedAmount = await releaseAmountFromStack(amountLeft)
.catch((err) => {
if (err !== "Empty stack") throw err;
return 0;
});
return nextReleasedAmount + releasedAmount;
}
function _releaseAmountFromStack(query, amountToRelease) => {
return db.runTransaction((transaction) => {
return transaction.get(query).then((querySnapshot) => {
if (!querySnapshot.empty) {
// nothing in stack, return released amount (0)
return Promise.reject("Empty stack");
}
let remainingAmountToRelease = amountToRelease;
for (const doc of querySnapshot.docs) {
const itemAmount = doc.get("amount");
const amountChange = Math.min(itemAmount, remainingAmountToRelease);
if (amountChange >= itemAmount) {
transaction.delete(doc.ref);
} else {
remainingAmountToRelease -= amountChange;
transaction.set(doc.ref, {
amount: itemAmount - amountChange
}, { merge: true });
}
if (remainingAmountToRelease <= 0) break; // stop iterating early
}
// return amount that was released
return /* totalAmountReleased = */ amountToRelease - remainingAmountToRelease;
});
});
}https://stackoverflow.com/questions/68822038
复制相似问题