Implementing LogCollect in Go: A High-Volume Log Collection System (Part 1: Building the LogAgent)

Part 2: Implementing LogCollect in Go: A High-Volume Log Collection System (Part 2: Developing the LogTransfer)

Project architecture diagram:

0 Project Background and Technology Choices
Background

As a company grows and its business becomes more complex, every business system ends up with its own logs. At that point we should collect the logs of the different business lines in real time, store them in a central log-collection service, and finally present them through a web page.

Solution

- Collect the logs on each machine in real time and store them centrally
- Build an index over those logs, so a search immediately finds the matching entries
- Provide a friendly web UI, so log search can be done entirely from the browser

Requirements the system must meet:
- Very large real-time log volume: several billion log lines per day
- Near-real-time collection, with latency kept at the minute level
- Horizontal scalability
Technology Choice and Design
① Technology choice

From the early ELK stack (Elasticsearch, Logstash, Kibana) to today's EFK (Elasticsearch, Filebeat or Fluentd, Kibana): ELK deploys Logstash on every server, which is heavyweight, so it evolved into EFK, where a lightweight Filebeat runs on each client and ships data to Logstash, with the data finally landing in Elasticsearch and Kibana providing the search UI. Logstash itself mainly collects, parses, and transforms logs.

- Pro: a ready-made solution that can be used as-is
- Cons: high operational cost; every new log-collection item requires a manual configuration change; Logstash's state cannot be observed accurately; no room for custom development and maintenance

② Design

Component overview:

- Log Agent: log-collection client that gathers the logs on each server
- Kafka: high-throughput distributed message queue
- Elasticsearch: open-source search engine exposing an HTTP RESTful web interface
- Flink, Spark: distributed computing frameworks that can process large amounts of data in a distributed fashion
1 Development

1.1 Collecting Log Data into Kafka
① Setting up Kafka with docker-compose

vim docker-compose.yml

docker-compose.yml:

```yaml
version: "3"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.2.0
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-kafka:6.2.0
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Change KAFKA_ADVERTISED_LISTENERS to your own host machine's IP;
      # for example, my local Mac's IP is 192.168.0.101
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.0.101:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    depends_on:
      - zookeeper
```
```bash
# From the directory containing docker-compose.yml, run:
docker-compose up -d

# Check the result; a status of "Up" means the deployment succeeded
docker-compose ps
```

② Create the topic and work with Kafka from Go
```bash
# 1. Create the topic
docker-compose exec kafka kafka-topics --create --topic nginx_log --partitions 1 --replication-factor 1 --bootstrap-server 192.168.0.101:9092

# 2. List the topics
docker-compose exec kafka kafka-topics --list --zookeeper zookeeper:2181
```
```bash
# Go client library for Kafka
go get github.com/IBM/sarama
```

```go
package main

import (
	"fmt"
	"time"

	"github.com/IBM/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll          // wait until both the leader and the followers have acknowledged
	config.Producer.Partitioner = sarama.NewRandomPartitioner // pick a random partition for each message
	config.Producer.Return.Successes = true                   // successfully delivered messages are returned on the Successes channel

	// connect to Kafka
	client, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		fmt.Println("producer close, err:", err)
		return
	}
	defer client.Close()

	for {
		// build a message
		msg := &sarama.ProducerMessage{}
		msg.Topic = "nginx_log"
		msg.Value = sarama.StringEncoder("this is a good test, my message is good")

		// send it
		pid, offset, err := client.SendMessage(msg)
		if err != nil {
			fmt.Println("send message failed,", err)
			return
		}
		fmt.Printf("pid:%v offset:%v\n", pid, offset)
		time.Sleep(10 * time.Millisecond)
	}
}
```
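To check that messages are actually arriving, you can read the topic with the console consumer that ships inside the Kafka container (container, topic, and broker address per the compose file above):

```bash
docker-compose exec kafka kafka-console-consumer --bootstrap-server 192.168.0.101:9092 --topic nginx_log --from-beginning
```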
1.2 A Simple First Version of the LogAgent

- Initialize the LogAgent's parameters from the log_agent.conf configuration file
- Determine where the LogAgent's own working log (log_agent.log) is stored
- tail the nginx_log.log file to read new log lines
- Send what was read to Kafka over the Kafka connection
- Consume the corresponding messages from Kafka

① Code structure

```
.
├─conf
│    log_agent.conf
│
├─kafka
│    kafka.go
│    └─consumer
│         consumer.go
│
├─logs
│    log_agent.log
│
├─main
│    config.go
│    log.go
│    main.go
│    server.go
│
├─tailf
│    tail.go
│
├─go.mod
└─go.sum
```

② Code
1. conf/log_agent.conf: the LogAgent's configuration file

```ini
[logs]
log_level = debug
log_path = /Users/xxx/GolandProjects/LogAgent/log/log_agent.log

[collect]
log_path = /Users/xxx/GolandProjects/LogAgent/nginx_log.log
topic = nginx_log
chan_size = 100

[kafka]
server_addr = localhost:9092
```

2. kafka/consumer/consumer.go: a Kafka consumer, used to consume the data sent to the Kafka partitions

```go
package main

import (
	"fmt"

	"github.com/IBM/sarama"
)

// kafka consumer
func main() {
	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
	if err != nil {
		fmt.Printf("fail to start consumer, err:%v\n", err)
		return
	}
	partitionList, err := consumer.Partitions("nginx_log") // get all partitions of the topic
	if err != nil {
		fmt.Printf("fail to get list of partition:err%v\n", err)
		return
	}
	fmt.Println(partitionList)
	for partition := range partitionList { // iterate over all partitions
		// create a partition consumer for each partition
		pc, err := consumer.ConsumePartition("nginx_log", int32(partition), sarama.OffsetNewest)
		if err != nil {
			fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
			return
		}
		defer pc.AsyncClose()
		// consume messages from each partition asynchronously
		go func(sarama.PartitionConsumer) {
			for msg := range pc.Messages() {
				fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v\n", msg.Partition, msg.Offset, msg.Key, string(msg.Value))
			}
		}(pc)
	}
	// block forever (for demo purposes)
	select {}
}
```
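Since consumer.go is its own package main, it can be run directly alongside the producer to watch messages arrive (path per the code structure above):

```bash
go run kafka/consumer/consumer.go
```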
3. kafka/kafka.go: initializes Kafka and sends data to it

```go
package kafka

import (
	"github.com/IBM/sarama"
	"github.com/astaxie/beego/logs"
)

var (
	client sarama.SyncProducer
)

func InitKafka(addr string) (err error) {
	// Kafka producer configuration
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll          // wait until both the leader and the followers have acknowledged
	config.Producer.Partitioner = sarama.NewRandomPartitioner // pick a random partition for each message
	config.Producer.Return.Successes = true                   // successfully delivered messages are returned on the Successes channel

	// create the producer
	client, err = sarama.NewSyncProducer([]string{addr}, config)
	if err != nil {
		logs.Error("failed to initialize the Kafka producer:", err)
		return
	}
	logs.Debug("initialized the Kafka producer successfully, addr:", addr)
	return
}

func SendToKafka(data, topic string) (err error) {
	msg := &sarama.ProducerMessage{}
	msg.Topic = topic
	msg.Value = sarama.StringEncoder(data)

	pid, offset, err := client.SendMessage(msg)
	if err != nil {
		logs.Error("failed to send message, err:%v, data:%v, topic:%v", err, data, topic)
		return
	}
	logs.Debug("send success, pid:%v, offset:%v, topic:%v", pid, offset, topic)
	return
}
```
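These two functions are wired up by main.go and server.go below; in isolation they can be exercised with a minimal sketch like this (assumes a broker on localhost:9092 and the nginx_log topic created earlier):

```go
package main

import (
	"fmt"

	"LogAgent/kafka"
)

func main() {
	// initialize the producer, then push a single test line
	if err := kafka.InitKafka("localhost:9092"); err != nil {
		fmt.Println("init failed:", err)
		return
	}
	if err := kafka.SendToKafka("hello from LogAgent", "nginx_log"); err != nil {
		fmt.Println("send failed:", err)
	}
}
```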
4. main/config.go: parses the log_agent.conf file

```go
package main

import (
	"errors"
	"fmt"

	"LogAgent/tailf"

	"github.com/astaxie/beego/config"
)

var (
	logConfig *Config
)

// logging configuration
type Config struct {
	logLevel    string
	logPath     string
	chanSize    int
	KafkaAddr   string
	CollectConf []tailf.CollectConf
}

// load the log-collection section
func loadCollectConf(conf config.Configer) (err error) {
	var c tailf.CollectConf

	c.LogPath = conf.String("collect::log_path")
	if len(c.LogPath) == 0 {
		err = errors.New("invalid collect::log_path")
		return
	}

	c.Topic = conf.String("collect::topic")
	if len(c.Topic) == 0 {
		err = errors.New("invalid collect::topic")
		return
	}

	logConfig.CollectConf = append(logConfig.CollectConf, c)
	return
}

// load and parse the LogAgent's initial configuration
func loadInitConf(confType, filename string) (err error) {
	conf, err := config.NewConfig(confType, filename)
	if err != nil {
		fmt.Printf("failed to open config file:%v\n", err)
		return
	}

	logConfig = &Config{}

	// log level
	logConfig.logLevel = conf.String("logs::log_level")
	if len(logConfig.logLevel) == 0 {
		logConfig.logLevel = "debug"
	}

	// path of the LogAgent's own log
	logConfig.logPath = conf.String("logs::log_path")
	if len(logConfig.logPath) == 0 {
		logConfig.logPath = "/Users/xxx/GolandProjects/LogAgent/log/log_agent.log"
	}

	// channel size
	logConfig.chanSize, err = conf.Int("collect::chan_size")
	if err != nil {
		logConfig.chanSize = 100
	}

	// Kafka
	logConfig.KafkaAddr = conf.String("kafka::server_addr")
	if len(logConfig.KafkaAddr) == 0 {
		err = fmt.Errorf("failed to read kafka::server_addr")
		return
	}

	err = loadCollectConf(conf)
	if err != nil {
		fmt.Printf("failed to load the collect conf:%v", err)
		return
	}
	return
}
```

5. main/log.go: initializes the LogAgent's own logging
```go
package main

import (
	"encoding/json"
	"fmt"

	"github.com/astaxie/beego/logs"
)

func convertLogLevel(level string) int {
	switch level {
	case "debug":
		return logs.LevelDebug
	case "warn":
		return logs.LevelWarn
	case "info":
		return logs.LevelInfo
	case "trace":
		return logs.LevelTrace
	}
	return logs.LevelDebug
}

func initLogger() (err error) {
	config := make(map[string]interface{})
	config["filename"] = logConfig.logPath
	config["level"] = convertLogLevel(logConfig.logLevel)

	configStr, err := json.Marshal(config)
	if err != nil {
		fmt.Println("failed to marshal logger config:", err)
		return
	}
	_ = logs.SetLogger(logs.AdapterFile, string(configStr))
	return
}
```

6. main/main.go: the program entry point
```go
package main

import (
	"fmt"

	"LogAgent/kafka"
	"LogAgent/tailf"

	"github.com/astaxie/beego/logs"
)

func main() {
	fmt.Println("starting")

	// read the LogAgent's config file
	filename := "/Users/xxx/GolandProjects/LogAgent/conf/log_agent.conf"
	err := loadInitConf("ini", filename)
	if err != nil {
		fmt.Printf("failed to load config file:%v\n", err)
		panic("failed to load config file")
	}

	// initialize logging
	err = initLogger()
	if err != nil {
		fmt.Printf("failed to initialize logging:%v\n", err)
		panic("failed to initialize logging")
	}
	logs.Debug("config loaded successfully: %v", logConfig)

	// initialize tailf: the paths of the log files to collect, the channel size, etc.
	err = tailf.InitTail(logConfig.CollectConf, logConfig.chanSize)
	if err != nil {
		logs.Error("failed to initialize tailf:", err)
		return
	}
	logs.Debug("tailf initialized successfully!")

	// initialize Kafka
	err = kafka.InitKafka(logConfig.KafkaAddr)
	if err != nil {
		logs.Error("failed to initialize the Kafka producer:", err)
		return
	}
	logs.Debug("Kafka initialized successfully!")

	// run
	err = serverRun()
	if err != nil {
		logs.Error("serverRun failed:", err)
	}
	logs.Info("exiting")
}
```

7. main/server.go: sends the data to Kafka
```go
package main

import (
	"fmt"
	"time"

	"LogAgent/kafka"
	"LogAgent/tailf"

	"github.com/astaxie/beego/logs"
)

func serverRun() (err error) {
	for {
		msg := tailf.GetOneLine()
		err = sendToKafka(msg)
		if err != nil {
			logs.Error("failed to send message to Kafka, err:%v", err)
			time.Sleep(time.Second)
			continue
		}
	}
}

func sendToKafka(msg *tailf.TextMsg) (err error) {
	fmt.Printf("read msg:%s, topic:%s\n", msg.Msg, msg.Topic) // also print the message to the terminal
	_ = kafka.SendToKafka(msg.Msg, msg.Topic)
	return
}
```

8. tailf/tail.go: reads the log lines from nginx_log.log and hands them over for delivery to Kafka
```go
package tailf

import (
	"fmt"
	"time"

	"github.com/astaxie/beego/logs"
	"github.com/hpcloud/tail"
)

// the collect configuration lives in the tailf package so other packages can import it
type CollectConf struct {
	LogPath string
	Topic   string
}

// one tail instance together with its collect config
type TailObj struct {
	tail *tail.Tail
	conf CollectConf
}

// a single log message
type TextMsg struct {
	Msg   string
	Topic string
}

// manages all tail objects in the system
type TailObjMgr struct {
	tailsObjs []*TailObj
	msgChan   chan *TextMsg
}

// global variables
var (
	tailObjMgr *TailObjMgr
)

func GetOneLine() (msg *TextMsg) {
	msg = <-tailObjMgr.msgChan
	return
}

func InitTail(conf []CollectConf, chanSize int) (err error) {
	// validate the configuration
	if len(conf) == 0 {
		err = fmt.Errorf("invalid log collect conf:%v", conf)
		return
	}

	tailObjMgr = &TailObjMgr{
		msgChan: make(chan *TextMsg, chanSize), // the message channel
	}

	// create a tail for every configured file
	for _, v := range conf {
		fmt.Println(v)
		tails, errTail := tail.TailFile(v.LogPath, tail.Config{
			ReOpen:    true,
			Follow:    true,
			Location:  &tail.SeekInfo{Offset: 0, Whence: 0},
			MustExist: false,
			Poll:      true,
		})
		if errTail != nil {
			err = errTail
			fmt.Println("failed to tail the file:", err)
			return
		}

		// store the config with the tail instance
		obj := &TailObj{
			conf: v,
			tail: tails,
		}
		tailObjMgr.tailsObjs = append(tailObjMgr.tailsObjs, obj)

		go readFromTail(obj)
	}
	return
}

// read log lines from tail
func readFromTail(tailObj *TailObj) {
	for true {
		msg, ok := <-tailObj.tail.Lines
		if !ok {
			logs.Warn("Tail file close reopen, filename:%s\n", tailObj.tail.Filename)
			time.Sleep(100 * time.Millisecond)
			continue
		}
		textMsg := &TextMsg{
			Msg:   msg.Text,
			Topic: tailObj.conf.Topic,
		}
		// push onto the channel
		tailObjMgr.msgChan <- textMsg
	}
}
```

③ Results

Consumption result:
tailf reads the log lines from nginx_log.log and sends them to Kafka, where the Kafka consumer consumes them.

If you suddenly cannot reach the Kafka instance inside Docker anymore, your host machine's IP has probably changed. Run `docker-compose down` to take the deployment down, update the host IP that Kafka binds to in docker-compose.yml, and redeploy with `docker-compose up -d`.
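To see the pipeline end to end, append a line to the collected file (the nginx_log.log path configured above; the sample text here is arbitrary) and watch it show up in the consumer:

```bash
echo "GET /index.html 200" >> /Users/xxx/GolandProjects/LogAgent/nginx_log.log
```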
1.3 Introducing etcd to Create Multiple Tail Tasks

① Environment preparation: running etcd in Docker, and the project structure
1. Starting an etcd cluster in Docker

Create a Docker network so the etcd cluster members can talk to each other:

```bash
docker network create etcd-network
```

Start etcd1, the first etcd node:
```bash
docker run -d --name etcd1 --network etcd-network -p 2379:2379 -p 2380:2380 quay.io/coreos/etcd:v3.4.13 etcd \
  --name etcd1 \
  --advertise-client-urls http://0.0.0.0:2379 \
  --listen-client-urls http://0.0.0.0:2379 \
  --initial-advertise-peer-urls http://0.0.0.0:2380 \
  --listen-peer-urls http://0.0.0.0:2380 \
  --initial-cluster-token etcd-cluster-1 \
  --initial-cluster etcd1=http://0.0.0.0:2380 \
  --initial-cluster-state new
```

Start etcd2:
```bash
docker run -d --name etcd2 --network etcd-network -p 22379:2379 -p 22380:2380 quay.io/coreos/etcd:v3.4.13 etcd \
  --name etcd2 \
  --advertise-client-urls http://0.0.0.0:22379 \
  --listen-client-urls http://0.0.0.0:22379 \
  --initial-advertise-peer-urls http://0.0.0.0:22380 \
  --listen-peer-urls http://0.0.0.0:22380 \
  --initial-cluster-token etcd-cluster-1 \
  --initial-cluster etcd1=http://etcd1:2380,etcd2=http://0.0.0.0:22380 \
  --initial-cluster-state existing
```

Start etcd3:
```bash
docker run -d --name etcd3 --network etcd-network -p 32379:2379 -p 32380:2380 quay.io/coreos/etcd:v3.4.13 etcd \
  --name etcd3 \
  --advertise-client-urls http://0.0.0.0:32379 \
  --listen-client-urls http://0.0.0.0:32379 \
  --initial-advertise-peer-urls http://0.0.0.0:32380 \
  --listen-peer-urls http://0.0.0.0:32380 \
  --initial-cluster-token etcd-cluster-1 \
  --initial-cluster etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://0.0.0.0:32380 \
  --initial-cluster-state existing
```

With this we have a 3-node etcd cluster running in Docker, exposing ports 2379, 22379, and 32379 respectively. You can use the docker ps command to see the running containers and docker logs container_name to inspect each etcd container's logs.
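You can also ask the cluster itself whether all three members joined, for example (container names as above):

```bash
docker exec etcd1 etcdctl member list
```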
2. Project structure

```
.
│  go.mod
│  go.sum
│
├─conf
│      log_agent.conf
│
├─kafka
│      kafka.go
│
├─logs
│      log_agent.log
│
├─main
│      config.go
│      etcd.go
│      ip.go
│      log.go
│      main.go
│      server.go
│
├─tailf
│      tail.go
│
└─tools
    └─SetConf
            main.go
```
② Code

1. tools/SetConf/main.go: stores the collect configuration in etcd
```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	"LogAgent/tailf"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// the etcd key prefix for this agent's config
const (
	EtcdKey = "/backend/logagent/config/192.168.0.101"
)

func SetLogConfToEtcd() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"localhost:2379", "localhost:22379", "localhost:32379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		fmt.Println("connect failed, err:", err)
		return
	}
	fmt.Println("connect succ")
	defer cli.Close()

	var logConfArr []tailf.CollectConf
	logConfArr = append(logConfArr,
		tailf.CollectConf{
			LogPath: "/Users/xxx/GolandProjects/LogAgent/mysql_log.log",
			Topic:   "mysql_log",
		},
	)
	logConfArr = append(logConfArr,
		tailf.CollectConf{
			LogPath: "/Users/xxx/GolandProjects/LogAgent/nginx_log.log",
			Topic:   "nginx_log",
		},
	)

	// marshal to JSON
	data, err := json.Marshal(logConfArr)
	if err != nil {
		fmt.Println("json failed, ", err)
		return
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	_, err = cli.Put(ctx, EtcdKey, string(data))
	cancel()
	if err != nil {
		fmt.Println("put failed, err:", err)
		return
	}

	ctx, cancel = context.WithTimeout(context.Background(), time.Second)
	resp, err := cli.Get(ctx, EtcdKey)
	cancel()
	if err != nil {
		fmt.Println("get failed, err:", err)
		return
	}
	for _, ev := range resp.Kvs {
		fmt.Printf("%s : %s\n", ev.Key, ev.Value)
	}
}

func main() {
	SetLogConfToEtcd()
}
```
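Run it once from the project root and check that the key landed in etcd (a sketch, assuming the module layout above):

```bash
go run tools/SetConf/main.go
docker exec etcd1 etcdctl get --prefix /backend/logagent/config/
```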
Note: run this program first, so that the corresponding key-value pair is stored in etcd, and only then start the LogAgent, because the LogAgent reads its configuration from etcd.

2. main/etcd.go: initializes the etcd connection and fetches the configuration from etcd

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"strings"
	"time"

	"LogAgent/tailf"

	"github.com/astaxie/beego/logs"
	clientv3 "go.etcd.io/etcd/client/v3"
)

type EtcdClient struct {
	client *clientv3.Client
}

var (
	etcdClient *EtcdClient
)

func initEtcd(addr string, key string) (collectConf []tailf.CollectConf, err error) {
	// connect to etcd
	cli, err := clientv3.New(clientv3.Config{
		//Endpoints: []string{"localhost:2379", "localhost:22379", "localhost:32379"},
		Endpoints:   []string{addr},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		logs.Error("failed to connect to etcd:", err)
		return
	}
	etcdClient = &EtcdClient{
		client: cli,
	}

	// if the key does not end with "/", append one
	if strings.HasSuffix(key, "/") == false {
		key = key + "/"
	}

	for _, ip := range localIPArray {
		etcdKey := fmt.Sprintf("%s%s", key, ip)
		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
		resp, err := cli.Get(ctx, etcdKey)
		if err != nil {
			logs.Error("etcd get failed:", err)
			cancel()
			continue
		}
		cancel()

		logs.Debug("resp from etcd:%v", resp.Kvs)
		for _, v := range resp.Kvs {
			if string(v.Key) == etcdKey {
				// unmarshal the JSON fetched from etcd into the struct slice
				err = json.Unmarshal(v.Value, &collectConf)
				if err != nil {
					logs.Error("failed to unmarshal:", err)
					continue
				}
				logs.Debug("collect conf set to %v", collectConf)
			}
		}
	}
	logs.Debug("connected to etcd successfully")
	return
}
```

3. main/ip.go: collects the IPs of all local network interfaces, which are used to build the etcd keys. So that newly added servers do not require manually configuring an IP, all local IPs are stored in localIPArray.
```go
package main

import (
	"fmt"
	"net"
)

var (
	localIPArray []string
)

func init() {
	addrs, err := net.InterfaceAddrs()
	if err != nil {
		panic(fmt.Sprintf("failed to get the local NIC IPs, %v", err))
	}
	for _, addr := range addrs {
		if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
			if ipnet.IP.To4() != nil {
				localIPArray = append(localIPArray, ipnet.IP.String())
			}
		}
	}
	fmt.Println(localIPArray)
}
```

4. main/config.go
```go
package main

import (
	"errors"
	"fmt"

	"LogAgent/tailf"

	"github.com/astaxie/beego/config"
)

var (
	logConfig *Config
)

// logging configuration
type Config struct {
	logLevel    string
	logPath     string
	chanSize    int
	KafkaAddr   string
	CollectConf []tailf.CollectConf
	etcdAddr    string
	etcdKey     string
}

// load the log-collection section
func loadCollectConf(conf config.Configer) (err error) {
	var c tailf.CollectConf

	c.LogPath = conf.String("collect::log_path")
	if len(c.LogPath) == 0 {
		err = errors.New("invalid collect::log_path")
		return
	}

	c.Topic = conf.String("collect::topic")
	if len(c.Topic) == 0 {
		err = errors.New("invalid collect::topic")
		return
	}

	logConfig.CollectConf = append(logConfig.CollectConf, c)
	return
}

// load and parse the LogAgent's initial configuration
func loadInitConf(confType, filename string) (err error) {
	conf, err := config.NewConfig(confType, filename)
	if err != nil {
		fmt.Printf("failed to open config file:%v\n", err)
		return
	}

	logConfig = &Config{}

	// log level
	logConfig.logLevel = conf.String("logs::log_level")
	if len(logConfig.logLevel) == 0 {
		logConfig.logLevel = "debug"
	}

	// path of the LogAgent's own log
	logConfig.logPath = conf.String("logs::log_path")
	if len(logConfig.logPath) == 0 {
		logConfig.logPath = "/Users/xxx/GolandProjects/LogAgent/log/log_agent.log"
	}

	// channel size
	logConfig.chanSize, err = conf.Int("collect::chan_size")
	if err != nil {
		logConfig.chanSize = 100
	}

	// Kafka
	logConfig.KafkaAddr = conf.String("kafka::server_addr")
	if len(logConfig.KafkaAddr) == 0 {
		err = fmt.Errorf("failed to read kafka::server_addr")
		return
	}

	err = loadCollectConf(conf)
	if err != nil {
		fmt.Printf("failed to load the collect conf:%v", err)
		return
	}

	// etcd
	logConfig.etcdAddr = conf.String("etcd::addr")
	if len(logConfig.etcdAddr) == 0 {
		err = fmt.Errorf("failed to read etcd::addr")
		return
	}
	logConfig.etcdKey = conf.String("etcd::configKey")
	if len(logConfig.etcdKey) == 0 {
		err = fmt.Errorf("failed to read etcd::configKey")
		return
	}
	return
}
```
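loadInitConf now also expects an [etcd] section that the log_agent.conf shown earlier does not have. It would look something like this (key names per the code above; the values are placeholders for your own setup):

```ini
[etcd]
addr = localhost:2379
configKey = /backend/logagent/config/
```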
5. tailf/tail.go: modify tail.go to add the JSON tags used for unmarshalling

```go
package tailf

import (
	"fmt"
	"time"

	"github.com/astaxie/beego/logs"
	"github.com/hpcloud/tail"
)

// the collect configuration lives in the tailf package so other packages can import it
type CollectConf struct {
	LogPath string `json:"logpath"`
	Topic   string `json:"topic"`
}

// one tail instance together with its collect config
type TailObj struct {
	tail *tail.Tail
	conf CollectConf
}

// a single log message
type TextMsg struct {
	Msg   string
	Topic string
}

// manages all tail objects in the system
type TailObjMgr struct {
	tailsObjs []*TailObj
	msgChan   chan *TextMsg
}

// global variables
var (
	tailObjMgr *TailObjMgr
)

func GetOneLine() (msg *TextMsg) {
	msg = <-tailObjMgr.msgChan
	return
}

func InitTail(conf []CollectConf, chanSize int) (err error) {
	// validate the configuration
	if len(conf) == 0 {
		err = fmt.Errorf("invalid log collect conf:%v", conf)
		return
	}

	tailObjMgr = &TailObjMgr{
		msgChan: make(chan *TextMsg, chanSize), // the message channel
	}

	// create a tail for every configured file
	for _, v := range conf {
		fmt.Println(v)
		tails, errTail := tail.TailFile(v.LogPath, tail.Config{
			ReOpen:    true,
			Follow:    true,
			Location:  &tail.SeekInfo{Offset: 0, Whence: 0},
			MustExist: false,
			Poll:      true,
		})
		if errTail != nil {
			err = errTail
			fmt.Println("failed to tail the file:", err)
			return
		}

		// store the config with the tail instance
		obj := &TailObj{
			conf: v,
			tail: tails,
		}
		tailObjMgr.tailsObjs = append(tailObjMgr.tailsObjs, obj)

		go readFromTail(obj)
	}
	return
}

// read log lines from tail
func readFromTail(tailObj *TailObj) {
	for true {
		msg, ok := <-tailObj.tail.Lines
		if !ok {
			logs.Warn("Tail file close reopen, filename:%s\n", tailObj.tail.Filename)
			time.Sleep(100 * time.Millisecond)
			continue
		}
		textMsg := &TextMsg{
			Msg:   msg.Text,
			Topic: tailObj.conf.Topic,
		}
		// push onto the channel
		tailObjMgr.msgChan <- textMsg
	}
}
```
6. main/main.go: call initEtcd before InitTail, otherwise the values cannot be fetched from etcd

```go
package main

import (
	"fmt"

	"LogAgent/kafka"
	"LogAgent/tailf"

	"github.com/astaxie/beego/logs"
)

func main() {
	fmt.Println("starting")

	// read the initial config file
	filename := "/Users/xxx/GolandProjects/LogAgent/conf/log_agent.conf"
	err := loadInitConf("ini", filename)
	if err != nil {
		fmt.Printf("failed to load config file:%v\n", err)
		panic("failed to load config file")
	}

	// initialize logging
	err = initLogger()
	if err != nil {
		fmt.Printf("failed to initialize logging:%v\n", err)
		panic("failed to initialize logging")
	}
	logs.Debug("config loaded successfully: %v", logConfig)

	// initialize etcd
	collectConf, err := initEtcd(logConfig.etcdAddr, logConfig.etcdKey)
	if err != nil {
		logs.Error("failed to initialize etcd:", err)
	}
	logs.Debug("etcd initialized successfully!")

	// initialize tailf
	err = tailf.InitTail(collectConf, logConfig.chanSize)
	if err != nil {
		logs.Error("failed to initialize tailf:", err)
		return
	}
	logs.Debug("tailf initialized successfully!")

	// initialize Kafka
	err = kafka.InitKafka(logConfig.KafkaAddr)
	if err != nil {
		logs.Error("failed to initialize the Kafka producer:", err)
		return
	}
	logs.Debug("Kafka initialized successfully!")

	// run
	err = serverRun()
	if err != nil {
		logs.Error("serverRun failed:", err)
	}
	logs.Info("exiting")
}
```

Results:

- When the corresponding log files do not exist
- When the log files exist and contain content
1.4 Watching for etcd Configuration Changes

In a real production environment new servers are added all the time. We use the earlier ip.go to enumerate all IP nodes and monitor them in real time, and we extend the EtcdClient struct with a keys field.

① Modify main/etcd.go

Add the initEtcdWatcher and watchKey functions to main/etcd.go, and call initEtcdWatcher from initEtcd:
```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"strings"
	"time"

	"LogAgent/tailf"

	"github.com/astaxie/beego/logs"
	clientv3 "go.etcd.io/etcd/client/v3"
)

type EtcdClient struct {
	client *clientv3.Client
	keys   []string
}

var (
	etcdClient *EtcdClient
)

func initEtcd(addr string, key string) (collectConf []tailf.CollectConf, err error) {
	// connect to etcd
	cli, err := clientv3.New(clientv3.Config{
		//Endpoints: []string{"localhost:2379", "localhost:22379", "localhost:32379"},
		Endpoints:   []string{addr},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		logs.Error("failed to connect to etcd:", err)
		return
	}
	etcdClient = &EtcdClient{
		client: cli,
	}

	// if the key does not end with "/", append one
	if strings.HasSuffix(key, "/") == false {
		key = key + "/"
	}

	for _, ip := range localIPArray {
		etcdKey := fmt.Sprintf("%s%s", key, ip)
		// remember the key so the watcher below can monitor it
		etcdClient.keys = append(etcdClient.keys, etcdKey)

		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
		resp, err := cli.Get(ctx, etcdKey)
		if err != nil {
			logs.Error("etcd get failed:", err)
			cancel()
			continue
		}
		cancel()

		logs.Debug("resp from etcd:%v", resp.Kvs)
		for _, v := range resp.Kvs {
			if string(v.Key) == etcdKey {
				// unmarshal the JSON fetched from etcd into the struct slice
				err = json.Unmarshal(v.Value, &collectConf)
				if err != nil {
					logs.Error("failed to unmarshal:", err)
					continue
				}
				logs.Debug("collect conf set to %v", collectConf)
			}
		}
	}
	logs.Debug("connected to etcd successfully")

	initEtcdWatcher(addr)
	return
}

// start one watcher per configured etcd key
func initEtcdWatcher(addr string) {
	for _, key := range etcdClient.keys {
		go watchKey(addr, key)
	}
}

func watchKey(addr string, key string) {
	// connect to etcd
	cli, err := clientv3.New(clientv3.Config{
		//Endpoints: []string{"localhost:2379", "localhost:22379", "localhost:32379"},
		Endpoints:   []string{addr},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		logs.Error("failed to connect to etcd:", err)
		return
	}

	logs.Debug("start watching key:", key)
	// watch the key
	wch := cli.Watch(context.Background(), key)
	for resp := range wch {
		for _, ev := range resp.Events {
			fmt.Printf("Type: %v, Key:%v, Value:%v\n", ev.Type, string(ev.Kv.Key), string(ev.Kv.Value))
		}
	}
}
```

② Modify tailf/tail.go
```go
package tailf

import (
	"time"

	"github.com/astaxie/beego/logs"
	"github.com/hpcloud/tail"
)

// status constants
const (
	StatusNormal = 1 // running normally
	StatusDelete = 2 // marked for deletion
)

// the collect configuration lives in the tailf package so other packages can import it
type CollectConf struct {
	LogPath string `json:"logpath"`
	Topic   string `json:"topic"`
}

// one tail instance together with its collect config
type TailObj struct {
	tail     *tail.Tail
	conf     CollectConf
	status   int
	exitChan chan int
}

// a single log message
type TextMsg struct {
	Msg   string
	Topic string
}

// manages all tail objects in the system
type TailObjMgr struct {
	tailsObjs []*TailObj
	msgChan   chan *TextMsg
}

// global variables
var (
	tailObjMgr *TailObjMgr
)

func GetOneLine() (msg *TextMsg) {
	msg = <-tailObjMgr.msgChan
	return
}

// initialize tail
func InitTail(conf []CollectConf, chanSize int) (err error) {
	tailObjMgr = &TailObjMgr{
		msgChan: make(chan *TextMsg, chanSize), // the message channel
	}

	// validate the configuration
	if len(conf) == 0 {
		logs.Error("invalid log collect conf: ", conf)
	}

	// create a task for every configured file
	for _, v := range conf {
		createNewTask(v)
	}
	return
}

// read log lines from tail
func readFromTail(tailObj *TailObj) {
	for true {
		select {
		case msg, ok := <-tailObj.tail.Lines:
			if !ok {
				logs.Warn("Tail file close reopen, filename:%s\n", tailObj.tail.Filename)
				time.Sleep(100 * time.Millisecond)
				continue
			}
			textMsg := &TextMsg{
				Msg:   msg.Text,
				Topic: tailObj.conf.Topic,
			}
			// push onto the channel
			tailObjMgr.msgChan <- textMsg
		// if something arrives on exitChan, this config item was removed: stop
		case <-tailObj.exitChan:
			logs.Warn("tail obj exiting, conf:%v", tailObj.conf)
			return
		}
	}
}

// apply a new etcd configuration
func UpdateConfig(confs []CollectConf) (err error) {
	// create new tail tasks
	for _, oneConf := range confs {
		// check whether an already-running instance uses the same path
		var isRunning = false
		for _, obj := range tailObjMgr.tailsObjs {
			// the same path means it is the same instance
			if oneConf.LogPath == obj.conf.LogPath {
				isRunning = true
				obj.status = StatusNormal
				break
			}
		}
		if isRunning {
			continue
		}
		// the config item does not exist yet: create a new tail task
		createNewTask(oneConf)
	}

	// walk all objects and check for deletions
	var tailObjs []*TailObj
	for _, obj := range tailObjMgr.tailsObjs {
		obj.status = StatusDelete
		for _, oneConf := range confs {
			if oneConf.LogPath == obj.conf.LogPath {
				obj.status = StatusNormal
				break
			}
		}
		// if the status is "delete", signal the task to exit via exitChan
		// and drop it from the manager
		if obj.status == StatusDelete {
			obj.exitChan <- 1
			continue
		}
		// keep the object in the temporary slice
		tailObjs = append(tailObjs, obj)
	}
	// replace tailsObjs with the temporary slice
	tailObjMgr.tailsObjs = tailObjs
	return
}

func createNewTask(conf CollectConf) {
	// initialize the tail instance
	tails, errTail := tail.TailFile(conf.LogPath, tail.Config{
		ReOpen:    true,
		Follow:    true,
		Location:  &tail.SeekInfo{Offset: 0, Whence: 2},
		MustExist: false,
		Poll:      true,
	})
	if errTail != nil {
		logs.Error("failed to tail file [%s]: %v", conf.LogPath, errTail)
		return
	}

	// store the config with the tail instance
	obj := &TailObj{
		conf:     conf,
		exitChan: make(chan int, 1),
	}
	obj.tail = tails
	tailObjMgr.tailsObjs = append(tailObjMgr.tailsObjs, obj)

	go readFromTail(obj)
}
```
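As written, watchKey only prints the events it receives. To actually apply a changed configuration you would unmarshal the watched value and hand it to tailf.UpdateConfig; a minimal sketch of that event loop (this wiring is an assumption, not part of the original code):

```go
// inside watchKey, replacing the print-only loop -- a sketch
for resp := range wch {
	for _, ev := range resp.Events {
		if ev.Type == clientv3.EventTypePut {
			var confs []tailf.CollectConf
			if err := json.Unmarshal(ev.Kv.Value, &confs); err != nil {
				logs.Error("failed to unmarshal watched config:", err)
				continue
			}
			// hand the new config to the tailf package
			if err := tailf.UpdateConfig(confs); err != nil {
				logs.Error("failed to update config:", err)
			}
		}
	}
}
```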
③ Testing the etcd watch mechanism

Run the command below, replacing key1 with your real key and the value with the configuration you actually want, for example:

```bash
docker exec etcd1 etcdctl put /backend/logagent/config/192.168.0.103 "[{\"logpath\":\"/Users/xxx/GolandProjects/LogCollect/LogAgent/mysql_log.log\",\"topic\":\"mysql_log\"},{\"logpath\":\"/Users/xxx/GolandProjects/LogCollect/LogAgent/nginx_log.log\",\"topic\":\"nginx_log\"}]"
```

This command talks to the etcd instance running in Docker and writes a new key under /backend/logagent/config/ whose value is the JSON array of collect configurations (note that the quotes in the JSON must be escaped).
```bash
# list all keys in etcd
docker exec etcd1 etcdctl get --prefix --keys-only

# add a key-value pair to etcd
docker exec etcd1 etcdctl put key1 value1

# delete a key from etcd
docker exec etcd1 etcdctl del key1

# get the value of a key from etcd
docker exec etcd1 etcdctl get key1
```

After running these operations, watch the log output: the LogAgent's log shows that the change in etcd was picked up successfully.

Reference: https://blog.csdn.net/qq_43442524/article/details/105024906