Producer消息发送
本课程将深入讲解Kafka Producer的详细配置和使用技巧。Producer是Kafka消息系统的入口,负责将业务数据发送到Kafka集群。我们将学习Producer的API使用、配置参数、错误处理、性能优化等关键内容。
Kafka提供了两种主要的Producer API:同步发送和异步发送。
同步发送会阻塞当前线程,直到收到Broker的确认响应。
// Java同步发送示例
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all");
Producer producer = new KafkaProducer<>(props);
// 创建消息
ProducerRecord record = new ProducerRecord<>(
"test-topic", "key1", "Hello Kafka!");
try {
// 同步发送
RecordMetadata metadata = producer.send(record).get();
System.out.println("消息发送成功,分区:" + metadata.partition() +
", 偏移量:" + metadata.offset());
} catch (Exception e) {
System.err.println("消息发送失败:" + e.getMessage());
} finally {
producer.close();
}
异步发送不会阻塞当前线程,通过回调函数处理发送结果。
// Java异步发送示例
Producer producer = new KafkaProducer<>(props);
ProducerRecord record = new ProducerRecord<>(
"test-topic", "key2", "Hello Kafka Async!");
// 异步发送,使用回调函数
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.println("消息发送成功,分区:" + metadata.partition() +
", 偏移量:" + metadata.offset());
} else {
System.err.println("消息发送失败:" + exception.getMessage());
}
}
});
// 可以继续执行其他操作
producer.close();
Producer的性能和可靠性很大程度上取决于配置参数的选择。
| 配置项 | 说明 | 推荐值 | 影响 |
|---|---|---|---|
acks |
消息确认机制 | all(生产环境) | 可靠性 vs 性能 |
retries |
重试次数 | Integer.MAX_VALUE | 故障恢复能力 |
retry.backoff.ms |
重试间隔 | 100 | 重试策略 |
enable.idempotence |
幂等性 | true | 避免重复消息 |
| 配置项 | 说明 | 推荐值 | 影响 |
|---|---|---|---|
batch.size |
批次大小 | 16384(16KB) | 吞吐量 vs 延迟 |
linger.ms |
批次等待时间 | 5-100 | 延迟 vs 吞吐量 |
buffer.memory |
缓冲区大小 | 33554432(32MB) | 内存使用 |
compression.type |
压缩类型 | snappy | 网络带宽 vs CPU |
| 配置项 | 说明 | 推荐值 | 影响 |
|---|---|---|---|
max.request.size |
最大请求大小 | 1048576(1MB) | 大消息处理 |
request.timeout.ms |
请求超时时间 | 30000 | 网络容错 |
max.block.ms |
阻塞最大时间 | 60000 | 发送阻塞 |
connections.max.idle.ms |
连接空闲时间 | 540000 | 连接复用 |
分区策略决定了消息如何分配到不同的Partition,影响消息的顺序性和负载均衡。
// 自定义分区器示例
public class CustomPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
List partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
// 根据业务逻辑选择分区
if (key == null) {
// 没有Key,使用轮询
return ThreadLocalRandom.current().nextInt(numPartitions);
} else {
// 有Key,根据业务规则选择分区
String keyStr = key.toString();
// 示例:根据用户ID选择分区
if (keyStr.startsWith("user-")) {
String userId = keyStr.substring(5);
return Math.abs(userId.hashCode()) % numPartitions;
} else {
// 其他类型使用默认哈希
return Math.abs(keyStr.hashCode()) % numPartitions;
}
}
}
@Override
public void close() {}
@Override
public void configure(Map configs) {}
}
// 使用自定义分区器
props.put("partitioner.class", "com.example.CustomPartitioner");
| 配置项 | 功能说明 | 推荐值 |
|---|---|---|
| 配置1 | 功能描述 | 推荐配置 |
| 配置2 | 功能描述 | 推荐配置 |
| 配置3 | 功能描述 | 推荐配置 |
通过本课程的学习,你应该已经掌握了Kafka 生产者的核心知识。生产者是Kafka的重要功能,在分布式系统中有广泛应用。继续深入学习和实践,你将能够更好地运用Kafka构建高性能的消息系统。