事务是一组数据库操作的逻辑单元,要么全部成功,要么全部失败。MongoDB 4.0+支持多文档ACID事务。
// MongoDB的单文档操作天然具有原子性
db.accounts.updateOne(
{ _id: "account1" },
{ $inc: { balance: -100 } }
)
// 即使包含多个字段更新,也是原子操作
db.users.updateOne(
{ _id: "user1" },
{
$set: { status: "active" },
$inc: { loginCount: 1 },
$currentDate: { lastLogin: true }
}
)
// 使用session进行多文档事务
const session = db.getMongo().startSession();
session.startTransaction();
try {
const accountsCol = session.getDatabase("bank").accounts;
// 从账户A扣款
accountsCol.updateOne(
{ _id: "accountA" },
{ $inc: { balance: -100 } },
{ session }
);
// 向账户B加款
accountsCol.updateOne(
{ _id: "accountB" },
{ $inc: { balance: 100 } },
{ session }
);
// 提交事务
session.commitTransaction();
print("转账成功");
} catch (error) {
// 回滚事务
session.abortTransaction();
print("转账失败: " + error);
} finally {
session.endSession();
}
// 配置事务选项
session.startTransaction({
readConcern: { level: "snapshot" },
writeConcern: { w: "majority" },
readPreference: "primary",
maxCommitTimeMS: 30000 // 最大提交时间30秒
});
// MongoDB使用snapshot隔离级别
// 事务开始时创建数据快照,事务中读取的都是快照数据
// 读关注级别
// - local: 读取本地最新数据(默认)
// - majority: 读取已被大多数节点确认的数据
// - snapshot: 读取事务开始时的快照数据
session.startTransaction({
readConcern: { level: "snapshot" }
});
// 事务要求:
// 1. MongoDB 4.0+
// 2. 副本集或分片集群(不支持单机)
// 3. 存储引擎:WiredTiger
// 检查是否支持事务
db.adminCommand({ getParameter: 1, featureCompatibilityVersion: 1 })
// MongoDB 4.2+支持跨分片事务
// 性能开销更大,应谨慎使用
const session = db.getMongo().startSession();
session.startTransaction();
try {
// 操作分片1的数据
session.getDatabase("db1").collection1.updateOne(..., { session });
// 操作分片2的数据
session.getDatabase("db2").collection2.updateOne(..., { session });
session.commitTransaction();
} catch (error) {
session.abortTransaction();
} finally {
session.endSession();
}
// 事务可能因为临时错误失败,需要重试
function runTransactionWithRetry(txnFunc, session) {
while (true) {
try {
txnFunc(session);
break;
} catch (error) {
if (error.hasOwnProperty("errorLabels") &&
error.errorLabels.includes("TransientTransactionError")) {
print("临时错误,重试事务...");
continue;
} else {
throw error;
}
}
}
}
function commitWithRetry(session) {
while (true) {
try {
session.commitTransaction();
print("事务提交成功");
break;
} catch (error) {
if (error.hasOwnProperty("errorLabels") &&
error.errorLabels.includes("UnknownTransactionCommitResult")) {
print("提交结果未知,重试...");
continue;
} else {
throw error;
}
}
}
}
// 创建订单并扣减库存(原子操作)
function createOrder(userId, productId, quantity) {
const session = db.getMongo().startSession();
session.startTransaction({
readConcern: { level: "snapshot" },
writeConcern: { w: "majority" }
});
try {
const ordersCol = session.getDatabase("shop").orders;
const productsCol = session.getDatabase("shop").products;
// 1. 检查库存
const product = productsCol.findOne(
{ _id: productId },
{ session }
);
if (!product || product.stock < quantity) {
throw new Error("库存不足");
}
// 2. 扣减库存
const updateResult = productsCol.updateOne(
{
_id: productId,
stock: { $gte: quantity }
},
{
$inc: { stock: -quantity, sales: quantity }
},
{ session }
);
if (updateResult.modifiedCount === 0) {
throw new Error("库存扣减失败");
}
// 3. 创建订单
ordersCol.insertOne({
userId: userId,
productId: productId,
quantity: quantity,
amount: product.price * quantity,
status: "pending",
createdAt: new Date()
}, { session });
// 4. 提交事务
session.commitTransaction();
print("订单创建成功");
return true;
} catch (error) {
session.abortTransaction();
print("订单创建失败: " + error.message);
return false;
} finally {
session.endSession();
}
}
// 调用
createOrder("user123", "prod456", 2);
// 使用版本号实现乐观锁(性能更好)
// 1. 读取文档和版本号
const account = db.accounts.findOne({ _id: "account1" });
// 2. 更新时检查版本号
const result = db.accounts.updateOne(
{
_id: "account1",
version: account.version // 版本号必须匹配
},
{
$inc: { balance: -100, version: 1 }
}
);
// 3. 检查是否更新成功
if (result.modifiedCount === 0) {
print("更新失败,数据已被其他操作修改");
} else {
print("更新成功");
}
// 需要事务的场景:
// 1. 转账操作
// 2. 订单和库存同步
// 3. 多表关联更新
// 4. 需要严格一致性的操作
// 不需要事务的场景:
// 1. 单文档操作(天然原子性)
// 2. 可以容忍最终一致性
// 3. 使用嵌入式文档设计
// 4. 性能要求极高的场景