← 返回目录

第3课: 生产者

Producer消息发送

课程简介

本课程将深入讲解Kafka Producer的详细配置和使用技巧。Producer是Kafka消息系统的入口,负责将业务数据发送到Kafka集群。我们将学习Producer的API使用、配置参数、错误处理、性能优化等关键内容。

核心知识点

Producer API详解

Kafka提供了两种主要的Producer API:同步发送和异步发送。

1. 同步发送

同步发送会阻塞当前线程,直到收到Broker的确认响应。

// Java同步发送示例
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all");

Producer producer = new KafkaProducer<>(props);

// 创建消息
ProducerRecord record = new ProducerRecord<>(
    "test-topic", "key1", "Hello Kafka!");

try {
    // 同步发送
    RecordMetadata metadata = producer.send(record).get();
    System.out.println("消息发送成功,分区:" + metadata.partition() + 
                       ", 偏移量:" + metadata.offset());
} catch (Exception e) {
    System.err.println("消息发送失败:" + e.getMessage());
} finally {
    producer.close();
}

2. 异步发送

异步发送不会阻塞当前线程,通过回调函数处理发送结果。

// Java异步发送示例
Producer producer = new KafkaProducer<>(props);

ProducerRecord record = new ProducerRecord<>(
    "test-topic", "key2", "Hello Kafka Async!");

// 异步发送,使用回调函数
producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            System.out.println("消息发送成功,分区:" + metadata.partition() + 
                           ", 偏移量:" + metadata.offset());
        } else {
            System.err.println("消息发送失败:" + exception.getMessage());
        }
    }
});

// 可以继续执行其他操作
producer.close();

Producer关键配置详解

Producer的性能和可靠性很大程度上取决于配置参数的选择。

1. 可靠性配置

配置项 说明 推荐值 影响
acks 消息确认机制 all(生产环境) 可靠性 vs 性能
retries 重试次数 Integer.MAX_VALUE 故障恢复能力
retry.backoff.ms 重试间隔 100 重试策略
enable.idempotence 幂等性 true 避免重复消息

2. 性能配置

配置项 说明 推荐值 影响
batch.size 批次大小 16384(16KB) 吞吐量 vs 延迟
linger.ms 批次等待时间 5-100 延迟 vs 吞吐量
buffer.memory 缓冲区大小 33554432(32MB) 内存使用
compression.type 压缩类型 snappy 网络带宽 vs CPU

3. 网络配置

配置项 说明 推荐值 影响
max.request.size 最大请求大小 1048576(1MB) 大消息处理
request.timeout.ms 请求超时时间 30000 网络容错
max.block.ms 阻塞最大时间 60000 发送阻塞
connections.max.idle.ms 连接空闲时间 540000 连接复用

分区策略详解

分区策略决定了消息如何分配到不同的Partition,影响消息的顺序性和负载均衡。

1. 默认分区策略

默认策略(DefaultPartitioner):

2. 自定义分区策略

// 自定义分区器示例
public class CustomPartitioner implements Partitioner {
    
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, 
                        Object value, byte[] valueBytes, Cluster cluster) {
        
        List partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        
        // 根据业务逻辑选择分区
        if (key == null) {
            // 没有Key,使用轮询
            return ThreadLocalRandom.current().nextInt(numPartitions);
        } else {
            // 有Key,根据业务规则选择分区
            String keyStr = key.toString();
            
            // 示例:根据用户ID选择分区
            if (keyStr.startsWith("user-")) {
                String userId = keyStr.substring(5);
                return Math.abs(userId.hashCode()) % numPartitions;
            } else {
                // 其他类型使用默认哈希
                return Math.abs(keyStr.hashCode()) % numPartitions;
            }
        }
    }
    
    @Override
    public void close() {}
    
    @Override
    public void configure(Map configs) {}
}

// 使用自定义分区器
props.put("partitioner.class", "com.example.CustomPartitioner");

3. 分区策略选择原则

配置参考

配置项 功能说明 推荐值
配置1 功能描述 推荐配置
配置2 功能描述 推荐配置
配置3 功能描述 推荐配置
重要提示:

性能优化建议

实践练习

练习任务:
  1. 完成生产者的基础操作练习
  2. 实现一个实际应用场景
  3. 进行性能测试和调优
  4. 排查和解决常见问题
  5. 探索更多高级特性
  6. 参考官方文档深入学习
  7. 搭建测试环境验证
  8. 总结学习经验和技巧

总结

通过本课程的学习,你应该已经掌握了Kafka 生产者的核心知识。生产者是Kafka的重要功能,在分布式系统中有广泛应用。继续深入学习和实践,你将能够更好地运用Kafka构建高性能的消息系统。