日志收集、消息队列
本课程将通过一个完整的实战项目,详细介绍如何使用Kafka构建日志收集和消息队列系统。我们将从项目规划、架构设计到代码实现,全面覆盖实际应用中的各个环节,帮助你将前面所学的理论知识转化为实际应用能力。
在现代分布式系统中,日志收集和消息队列是两个非常重要的组件:
项目目标:构建一个基于Kafka的日志收集和消息处理系统,实现以下功能:
Kafka在本项目中扮演着核心角色,主要负责:
| 组件 | 技术选型 | 作用 |
|---|---|---|
| 日志采集 | Filebeat | 轻量级日志采集器,部署在应用服务器上 |
| 消息队列 | Kafka | 存储和转发日志消息 |
| 数据处理 | Logstash | 对日志进行过滤、转换等处理 |
| 存储与分析 | Elasticsearch + Kibana | 存储日志并提供查询分析界面 |
// server.properties 核心配置
// 1. 基本配置
broker.id=0
listeners=PLAINTEXT://:9092
log.dirs=/path/to/kafka/logs
// 2. 副本配置
default.replication.factor=3
min.insync.replicas=2
// 3. 性能优化
num.partitions=8
topic.num.partitions=8
// 4. 日志配置
log.retention.hours=168 // 7天
log.segment.bytes=1073741824 // 1GB
// 5. 连接配置
zookeeper.connect=zookeeper1:2181,zookeeper2:2181,zookeeper3:2181
zookeeper.connection.timeout.ms=6000
// 创建日志主题
bin/kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--topic application-logs \
--partitions 8 \
--replication-factor 3 \
--config retention.ms=604800000 // 7天
// 创建错误日志主题
bin/kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--topic error-logs \
--partitions 4 \
--replication-factor 3 \
--config retention.ms=2592000000 // 30天
// filebeat.yml 配置
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/app/*.log
multiline.pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}'
multiline.negate: true
multiline.match: after
output.kafka:
hosts: ["kafka1:9092", "kafka2:9092", "kafka3:9092"]
topic: 'application-logs'
partition.round_robin:
reachable_only: false
required_acks: 1
compression: gzip
max_message_bytes: 1000000
// LogConsumer.java
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.*;
public class LogConsumer {
public static void main(String[] args) {
// 1. 配置消费者
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092,kafka3:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "log-processing-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
// 2. 创建消费者
KafkaConsumer consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("application-logs", "error-logs"));
// 3. 消费消息
try {
while (true) {
ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord record : records) {
// 处理日志消息
processLog(record.value());
}
// 手动提交偏移量
consumer.commitSync();
}
} finally {
consumer.close();
}
}
private static void processLog(String logMessage) {
// 这里实现日志处理逻辑
System.out.println("Processing log: " + logMessage);
// 可以进行日志解析、过滤、转换等操作
}
}
# log_producer.py
from kafka import KafkaProducer
import json
import time
# 创建生产者
producer = KafkaProducer(
bootstrap_servers=['kafka1:9092', 'kafka2:9092', 'kafka3:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
key_serializer=lambda k: k.encode('utf-8'),
acks='all', # 最高可靠性
retries=5,
batch_size=16384,
linger_ms=10
)
def send_log(log_level, message, service_name):
"""发送日志到Kafka"""
log_data = {
'timestamp': time.time(),
'level': log_level,
'message': message,
'service': service_name,
'host': 'app-server-01'
}
topic = 'error-logs' if log_level == 'ERROR' else 'application-logs'
# 发送消息
future = producer.send(
topic=topic,
key=service_name,
value=log_data
)
# 等待发送完成
try:
record_metadata = future.get(timeout=10)
print(f"Log sent to {record_metadata.topic}:{record_metadata.partition}:{record_metadata.offset}")
except Exception as e:
print(f"Error sending log: {e}")
# 示例用法
if __name__ == "__main__":
send_log('INFO', 'Application started successfully', 'user-service')
send_log('ERROR', 'Database connection failed', 'payment-service')
send_log('WARN', 'Disk space running low', 'storage-service')
# 关闭生产者
producer.close()
问题:Kafka broker无法启动,报错"Connection to Zookeeper failed"
解决方案:
问题:生产者发送的消息在某些情况下丢失
解决方案:
问题:消费者处理消息的速度跟不上生产速度
解决方案:
完成基础项目后,可以考虑以下扩展方向:
通过本实战项目的学习,你应该已经掌握了:
Kafka作为一个强大的分布式消息系统,在现代数据架构中发挥着越来越重要的作用。通过不断实践和学习,你将能够更加熟练地使用Kafka解决实际业务问题,成为真正的Kafka高手!