<返回目录     Powered by claud/xia兄

第11课: 分布式事务

Seata、TCC、Saga 事务一致性

课程概述

本课程深入讲解分布式事务的核心概念、技术选型、实战应用和最佳实践。通过丰富的代码示例和真实场景,帮助你全面掌握分布式事务在微服务架构中的应用。

核心知识点

技术架构

# 分布式事务技术架构图
┌─────────────────────────────────────────┐
│          微服务应用层                    │
│  ┌──────┐  ┌──────┐  ┌──────┐          │
│  │服务A │  │服务B │  │服务C │          │
│  └──┬───┘  └──┬───┘  └──┬───┘          │
│     │         │         │               │
│     └─────────┼─────────┘               │
│               │                         │
├───────────────┼─────────────────────────┤
│          分布式事务中间件层                │
│               │                         │
│     ┌─────────▼─────────┐              │
│     │   核心组件         │              │
│     │   - 数据采集       │              │
│     │   - 数据处理       │              │
│     │   - 数据存储       │              │
│     │   - 数据展示       │              │
│     └───────────────────┘              │
└─────────────────────────────────────────┘

代码示例 1:基础配置

// pom.xml 依赖配置
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-example</artifactId>
</dependency>

# application.yml 配置
spring:
  application:
    name: demo-service
  cloud:
    example:
      enabled: true
      config:
        key1: value1
        key2: value2

// 启动类配置
@SpringBootApplication
@EnableExampleFeature
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

代码示例 2:核心功能实现

@Service
public class ExampleService {

    @Autowired
    private ExampleClient exampleClient;

    // 核心业务方法
    public Result processData(Request request) {
        // 1. 数据验证
        validateRequest(request);
        
        // 2. 业务处理
        Data data = processBusinessLogic(request);
        
        // 3. 结果返回
        return buildResult(data);
    }

    private void validateRequest(Request request) {
        if (request == null) {
            throw new IllegalArgumentException("请求不能为空");
        }
        // 更多验证逻辑
    }

    private Data processBusinessLogic(Request request) {
        // 业务逻辑实现
        return new Data();
    }

    private Result buildResult(Data data) {
        Result result = new Result();
        result.setCode(200);
        result.setData(data);
        return result;
    }
}

代码示例 3:高级特性

@Configuration
public class AdvancedConfig {

    // 自定义配置
    @Bean
    public CustomProcessor customProcessor() {
        CustomProcessor processor = new CustomProcessor();
        processor.setThreadPoolSize(10);
        processor.setQueueCapacity(100);
        processor.setTimeout(5000);
        return processor;
    }

    // 拦截器配置
    @Bean
    public Interceptor customInterceptor() {
        return new Interceptor() {
            @Override
            public void preProcess(Context context) {
                System.out.println("前置处理: " + context);
            }

            @Override
            public void postProcess(Context context) {
                System.out.println("后置处理: " + context);
            }
        };
    }
}

代码示例 4:异常处理

@ControllerAdvice
public class GlobalExceptionHandler {

    @ExceptionHandler(BusinessException.class)
    public ResponseEntity<ErrorResponse> handleBusinessException(
            BusinessException ex) {
        ErrorResponse error = new ErrorResponse();
        error.setCode(ex.getCode());
        error.setMessage(ex.getMessage());
        error.setTimestamp(System.currentTimeMillis());
        return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(error);
    }

    @ExceptionHandler(Exception.class)
    public ResponseEntity<ErrorResponse> handleException(Exception ex) {
        ErrorResponse error = new ErrorResponse();
        error.setCode(500);
        error.setMessage("系统内部错误");
        error.setTimestamp(System.currentTimeMillis());
        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(error);
    }
}

代码示例 5:性能优化

@Component
public class PerformanceOptimizer {

    // 缓存配置
    @Cacheable(value = "dataCache", key = "#id")
    public Data getDataById(Long id) {
        return dataRepository.findById(id);
    }

    // 异步处理
    @Async
    public CompletableFuture<Result> asyncProcess(Request request) {
        Result result = processData(request);
        return CompletableFuture.completedFuture(result);
    }

    // 批量处理
    public List<Result> batchProcess(List<Request> requests) {
        return requests.parallelStream()
            .map(this::processData)
            .collect(Collectors.toList());
    }
}

代码示例 6:监控指标

@Component
public class MetricsCollector {

    @Autowired
    private MeterRegistry meterRegistry;

    // 计数器
    public void recordRequest(String endpoint) {
        Counter counter = Counter.builder("api.requests")
            .tag("endpoint", endpoint)
            .register(meterRegistry);
        counter.increment();
    }

    // 计时器
    public void recordLatency(String operation, long duration) {
        Timer timer = Timer.builder("operation.latency")
            .tag("operation", operation)
            .register(meterRegistry);
        timer.record(duration, TimeUnit.MILLISECONDS);
    }

    // 仪表盘
    public void recordGauge(String metric, double value) {
        Gauge.builder(metric, () -> value)
            .register(meterRegistry);
    }
}

代码示例 7:集成测试

@SpringBootTest
@AutoConfigureMockMvc
public class IntegrationTest {

    @Autowired
    private MockMvc mockMvc;

    @Test
    public void testEndpoint() throws Exception {
        mockMvc.perform(get("/api/data/1")
                .contentType(MediaType.APPLICATION_JSON))
            .andExpect(status().isOk())
            .andExpect(jsonPath("$.code").value(200))
            .andExpect(jsonPath("$.data").exists());
    }

    @Test
    public void testErrorHandling() throws Exception {
        mockMvc.perform(post("/api/data")
                .contentType(MediaType.APPLICATION_JSON)
                .content("{}"))
            .andExpect(status().isBadRequest())
            .andExpect(jsonPath("$.code").value(400));
    }
}

代码示例 8:生产环境配置

# application-prod.yml
spring:
  cloud:
    example:
      enabled: true
      config:
        # 连接池配置
        pool:
          max-size: 50
          min-idle: 10
          max-wait: 3000
        # 超时配置
        timeout:
          connect: 5000
          read: 10000
          write: 10000
        # 重试配置
        retry:
          max-attempts: 3
          backoff-delay: 1000
        # 熔断配置
        circuit-breaker:
          enabled: true
          failure-threshold: 50
          wait-duration: 10000

# 日志配置
logging:
  level:
    root: INFO
    com.example: DEBUG
  pattern:
    console: "%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n"

代码示例 9:安全配置

@Configuration
@EnableWebSecurity
public class SecurityConfig {

    @Bean
    public SecurityFilterChain filterChain(HttpSecurity http) throws Exception {
        http
            .csrf().disable()
            .authorizeHttpRequests(auth -> auth
                .requestMatchers("/api/public/**").permitAll()
                .requestMatchers("/api/admin/**").hasRole("ADMIN")
                .anyRequest().authenticated()
            )
            .oauth2ResourceServer(oauth2 -> oauth2
                .jwt(jwt -> jwt.decoder(jwtDecoder()))
            );
        return http.build();
    }

    @Bean
    public JwtDecoder jwtDecoder() {
        return NimbusJwtDecoder.withJwkSetUri("https://auth.example.com/.well-known/jwks.json").build();
    }
}

代码示例 10:Docker 部署

# Dockerfile
FROM openjdk:17-jdk-slim
WORKDIR /app
COPY target/demo-service.jar app.jar
EXPOSE 8080
ENTRYPOINT ["java", "-jar", "app.jar"]

# docker-compose.yml
version: '3.8'
services:
  demo-service:
    build: .
    ports:
      - "8080:8080"
    environment:
      - SPRING_PROFILES_ACTIVE=prod
      - JAVA_OPTS=-Xmx512m -Xms256m
    networks:
      - microservices
    restart: unless-stopped

networks:
  microservices:
    driver: bridge
最佳实践:

实践练习

练习任务:
  1. 基础搭建:搭建分布式事务的基础环境和配置
  2. 功能实现:实现核心功能和业务逻辑
  3. 性能测试:进行压力测试,优化性能瓶颈
  4. 异常处理:完善异常处理和错误恢复机制
  5. 监控集成:集成监控系统,收集关键指标
  6. 安全加固:实施安全措施,保护系统安全
  7. 文档编写:编写技术文档和使用手册
  8. 生产部署:部署到生产环境,验证稳定性

常见问题

Q: 如何选择合适的技术方案?

A: 根据业务规模、团队技术栈、性能要求等因素综合考虑。小规模项目选择简单方案,大规模项目选择成熟稳定的方案。

Q: 如何保证系统的高可用性?

A: 采用集群部署、负载均衡、熔断降级、限流保护等措施,确保单点故障不影响整体服务。

Q: 如何进行性能优化?

A: 使用缓存减少数据库访问、异步处理提升响应速度、批量操作减少网络开销、合理配置线程池和连接池。

总结

分布式事务是微服务架构的重要组成部分。通过本课学习,你应该掌握: