<返回目录     Powered by claud/xia兄

第13课: 事务处理

什么是事务?

事务是一组数据库操作的逻辑单元,要么全部成功,要么全部失败。MongoDB 4.0+支持多文档ACID事务。

单文档事务

// MongoDB的单文档操作天然具有原子性
db.accounts.updateOne(
    { _id: "account1" },
    { $inc: { balance: -100 } }
)

// 即使包含多个字段更新,也是原子操作
db.users.updateOne(
    { _id: "user1" },
    {
        $set: { status: "active" },
        $inc: { loginCount: 1 },
        $currentDate: { lastLogin: true }
    }
)

多文档事务

// 使用session进行多文档事务
const session = db.getMongo().startSession();

session.startTransaction();

try {
    const accountsCol = session.getDatabase("bank").accounts;

    // 从账户A扣款
    accountsCol.updateOne(
        { _id: "accountA" },
        { $inc: { balance: -100 } },
        { session }
    );

    // 向账户B加款
    accountsCol.updateOne(
        { _id: "accountB" },
        { $inc: { balance: 100 } },
        { session }
    );

    // 提交事务
    session.commitTransaction();
    print("转账成功");
} catch (error) {
    // 回滚事务
    session.abortTransaction();
    print("转账失败: " + error);
} finally {
    session.endSession();
}

事务选项

// 配置事务选项
session.startTransaction({
    readConcern: { level: "snapshot" },
    writeConcern: { w: "majority" },
    readPreference: "primary",
    maxCommitTimeMS: 30000  // 最大提交时间30秒
});

事务隔离级别

// MongoDB使用snapshot隔离级别
// 事务开始时创建数据快照,事务中读取的都是快照数据

// 读关注级别
// - local: 读取本地最新数据(默认)
// - majority: 读取已被大多数节点确认的数据
// - snapshot: 读取事务开始时的快照数据

session.startTransaction({
    readConcern: { level: "snapshot" }
});

事务和副本集

// 事务要求:
// 1. MongoDB 4.0+
// 2. 副本集或分片集群(不支持单机)
// 3. 存储引擎:WiredTiger

// 检查是否支持事务
db.adminCommand({ getParameter: 1, featureCompatibilityVersion: 1 })

跨分片事务

// MongoDB 4.2+支持跨分片事务
// 性能开销更大,应谨慎使用

const session = db.getMongo().startSession();
session.startTransaction();

try {
    // 操作分片1的数据
    session.getDatabase("db1").collection1.updateOne(..., { session });

    // 操作分片2的数据
    session.getDatabase("db2").collection2.updateOne(..., { session });

    session.commitTransaction();
} catch (error) {
    session.abortTransaction();
} finally {
    session.endSession();
}

事务重试

// 事务可能因为临时错误失败,需要重试
function runTransactionWithRetry(txnFunc, session) {
    while (true) {
        try {
            txnFunc(session);
            break;
        } catch (error) {
            if (error.hasOwnProperty("errorLabels") &&
                error.errorLabels.includes("TransientTransactionError")) {
                print("临时错误,重试事务...");
                continue;
            } else {
                throw error;
            }
        }
    }
}

function commitWithRetry(session) {
    while (true) {
        try {
            session.commitTransaction();
            print("事务提交成功");
            break;
        } catch (error) {
            if (error.hasOwnProperty("errorLabels") &&
                error.errorLabels.includes("UnknownTransactionCommitResult")) {
                print("提交结果未知,重试...");
                continue;
            } else {
                throw error;
            }
        }
    }
}

实战示例:电商订单

// 创建订单并扣减库存(原子操作)
function createOrder(userId, productId, quantity) {
    const session = db.getMongo().startSession();

    session.startTransaction({
        readConcern: { level: "snapshot" },
        writeConcern: { w: "majority" }
    });

    try {
        const ordersCol = session.getDatabase("shop").orders;
        const productsCol = session.getDatabase("shop").products;

        // 1. 检查库存
        const product = productsCol.findOne(
            { _id: productId },
            { session }
        );

        if (!product || product.stock < quantity) {
            throw new Error("库存不足");
        }

        // 2. 扣减库存
        const updateResult = productsCol.updateOne(
            {
                _id: productId,
                stock: { $gte: quantity }
            },
            {
                $inc: { stock: -quantity, sales: quantity }
            },
            { session }
        );

        if (updateResult.modifiedCount === 0) {
            throw new Error("库存扣减失败");
        }

        // 3. 创建订单
        ordersCol.insertOne({
            userId: userId,
            productId: productId,
            quantity: quantity,
            amount: product.price * quantity,
            status: "pending",
            createdAt: new Date()
        }, { session });

        // 4. 提交事务
        session.commitTransaction();
        print("订单创建成功");
        return true;

    } catch (error) {
        session.abortTransaction();
        print("订单创建失败: " + error.message);
        return false;
    } finally {
        session.endSession();
    }
}

// 调用
createOrder("user123", "prod456", 2);

事务性能优化

乐观锁模式

// 使用版本号实现乐观锁(性能更好)
// 1. 读取文档和版本号
const account = db.accounts.findOne({ _id: "account1" });

// 2. 更新时检查版本号
const result = db.accounts.updateOne(
    {
        _id: "account1",
        version: account.version  // 版本号必须匹配
    },
    {
        $inc: { balance: -100, version: 1 }
    }
);

// 3. 检查是否更新成功
if (result.modifiedCount === 0) {
    print("更新失败,数据已被其他操作修改");
} else {
    print("更新成功");
}

事务限制

何时使用事务

// 需要事务的场景:
// 1. 转账操作
// 2. 订单和库存同步
// 3. 多表关联更新
// 4. 需要严格一致性的操作

// 不需要事务的场景:
// 1. 单文档操作(天然原子性)
// 2. 可以容忍最终一致性
// 3. 使用嵌入式文档设计
// 4. 性能要求极高的场景
最佳实践:
练习题:
  1. 实现一个转账事务(从账户A转账到账户B)
  2. 实现订单创建和库存扣减的事务
  3. 使用乐观锁实现并发更新控制
  4. 实现事务重试机制
  5. 测试事务回滚功能
  6. 比较事务和非事务操作的性能差异