200字范文,内容丰富有趣,生活中的好帮手!
200字范文 > SpringBoot + Kafka + ELK 完成海量日志收集(超详细)

SpringBoot + Kafka + ELK 完成海量日志收集(超详细)

时间:2021-06-20 07:34:13

相关推荐

SpringBoot + Kafka + ELK 完成海量日志收集(超详细)

上一篇:深夜看了张一鸣的微博,让我越想越后怕

整体流程大概如下:

服务器准备

在这先列出各服务器节点,方便同学们在下文中对照节点查看相应内容

SpringBoot项目准备

引入log4j2替换SpringBoot默认log,demo项目结构如下:

pom

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><!--排除spring-boot-starter-logging--><exclusions><exclusion><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-logging</artifactId></exclusion></exclusions></dependency><!--log4j2--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-log4j2</artifactId></dependency><dependency><groupId>com.lmax</groupId><artifactId>disruptor</artifactId><version>3.3.4</version></dependency></dependencies>

log4j2.xml

<?xmlversion="1.0"encoding="UTF-8"?><Configurationstatus="INFO"schema="Log4J-V2.0.xsd"monitorInterval="600"><Properties><Propertyname="LOG_HOME">logs</Property><propertyname="FILE_NAME">collector</property><propertyname="patternLayout">[%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}][%level{length=5}][%thread-%tid][%logger][%X{hostName}][%X{ip}][%X{applicationName}][%F,%L,%C,%M][%m]##'%ex'%n</property></Properties><Appenders><Consolename="CONSOLE"target="SYSTEM_OUT"><PatternLayoutpattern="${patternLayout}"/></Console><RollingRandomAccessFilename="appAppender"fileName="${LOG_HOME}/app-${FILE_NAME}.log"filePattern="${LOG_HOME}/app-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log"><PatternLayoutpattern="${patternLayout}"/><Policies><TimeBasedTriggeringPolicyinterval="1"/><SizeBasedTriggeringPolicysize="500MB"/></Policies><DefaultRolloverStrategymax="20"/></RollingRandomAccessFile><RollingRandomAccessFilename="errorAppender"fileName="${LOG_HOME}/error-${FILE_NAME}.log"filePattern="${LOG_HOME}/error-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log"><PatternLayoutpattern="${patternLayout}"/><Filters><ThresholdFilterlevel="warn"onMatch="ACCEPT"onMismatch="DENY"/></Filters><Policies><TimeBasedTriggeringPolicyinterval="1"/><SizeBasedTriggeringPolicysize="500MB"/></Policies><DefaultRolloverStrategymax="20"/></RollingRandomAccessFile></Appenders><Loggers><!--业务相关异步logger--><AsyncLoggername="com.bfxy.*"level="info"includeLocation="true"><AppenderRefref="appAppender"/></AsyncLogger><AsyncLoggername="com.bfxy.*"level="info"includeLocation="true"><AppenderRefref="errorAppender"/></AsyncLogger><Rootlevel="info"><Appender-Refref="CONSOLE"/><Appender-Refref="appAppender"/><AppenderRefref="errorAppender"/></Root></Loggers></Configuration>

IndexController

测试Controller,用以打印日志进行调试

@Slf4j@RestControllerpublicclassIndexController{@RequestMapping(value="/index")publicStringindex(){InputMDC.putMDC();log.info("我是一条info日志");log.warn("我是一条warn日志");log.error("我是一条error日志");return"idx";}@RequestMapping(value="/err")publicStringerr(){InputMDC.putMDC();try{inta=1/0;}catch(Exceptione){log.error("算术异常",e);}return"err";}}

InputMDC

用以获取log中的[%X{hostName}][%X{ip}][%X{applicationName}]三个字段值

@ComponentpublicclassInputMDCimplementsEnvironmentAware{privatestaticEnvironmentenvironment;@OverridepublicvoidsetEnvironment(Environmentenvironment){InputMDC.environment=environment;}publicstaticvoidputMDC(){MDC.put("hostName",NetUtil.getLocalHostName());MDC.put("ip",NetUtil.getLocalIp());MDC.put("applicationName",environment.getProperty("spring.application.name"));}}

NetUtil

publicclassNetUtil{publicstaticStringnormalizeAddress(Stringaddress){String[]blocks=address.split("[:]");if(blocks.length>2){thrownewIllegalArgumentException(address+"isinvalid");}Stringhost=blocks[0];intport=80;if(blocks.length>1){port=Integer.valueOf(blocks[1]);}else{address+=":"+port;//usedefault80}StringserverAddr=String.format("%s:%d",host,port);returnserverAddr;}publicstaticStringgetLocalAddress(Stringaddress){String[]blocks=address.split("[:]");if(blocks.length!=2){thrownewIllegalArgumentException(address+"isinvalidaddress");}Stringhost=blocks[0];intport=Integer.valueOf(blocks[1]);if("0.0.0.0".equals(host)){returnString.format("%s:%d",NetUtil.getLocalIp(),port);}returnaddress;}privatestaticintmatchedIndex(Stringip,String[]prefix){for(inti=0;i<prefix.length;i++){Stringp=prefix[i];if("*".equals(p)){//*,assumedtobeIPif(ip.startsWith("127.")||ip.startsWith("10.")||ip.startsWith("172.")||ip.startsWith("192.")){continue;}returni;}else{if(ip.startsWith(p)){returni;}}}return-1;}publicstaticStringgetLocalIp(StringipPreference){if(ipPreference==null){ipPreference="*>10>172>192>127";}String[]prefix=ipPreference.split("[>]+");try{Patternpattern=pile("[0-9]+\\.[0-9]+\\.[0-9]+\\.[0-9]+");Enumeration<NetworkInterface>interfaces=NetworkInterface.getNetworkInterfaces();StringmatchedIp=null;intmatchedIdx=-1;while(interfaces.hasMoreElements()){NetworkInterfaceni=interfaces.nextElement();Enumeration<InetAddress>en=ni.getInetAddresses();while(en.hasMoreElements()){InetAddressaddr=en.nextElement();Stringip=addr.getHostAddress();Matchermatcher=pattern.matcher(ip);if(matcher.matches()){intidx=matchedIndex(ip,prefix);if(idx==-1)continue;if(matchedIdx==-1){matchedIdx=idx;matchedIp=ip;}else{if(matchedIdx>idx){matchedIdx=idx;matchedIp=ip;}}}}}if(matchedIp!=null)returnmatchedIp;return"127.0.0.1";}catch(Exceptione){return"127.0.0.1";}}publicstaticStringgetLocalIp(){returngetLocalIp("*>10>172>192>127");}publicstaticStringremoteAddress(SocketChannelchannel){SocketAddressaddr=channel.socket().getRemoteSocketAddress();Stringres=String.format("%s",addr);returnres;}publicstaticStringlocalAddress(SocketChannelchannel){SocketAddressaddr=channel.socket().getLocalSocketAddress();Stringres=String.format("%s",addr);returnaddr==null?res:res.substring(1);}publicstaticStringgetPid(){RuntimeMXBeanruntime=ManagementFactory.getRuntimeMXBean();Stringname=runtime.getName();intindex=name.indexOf("@");if(index!=-1){returnname.substring(0,index);}returnnull;}publicstaticStringgetLocalHostName(){try{return(InetAddress.getLocalHost()).getHostName();}catch(UnknownHostExceptionuhe){Stringhost=uhe.getMessage();if(host!=null){intcolon=host.indexOf(':');if(colon>0){returnhost.substring(0,colon);}}return"UnknownHost";}}}

启动项目,访问/index/ero接口,可以看到项目中生成了app-collector.logerror-collector.log两个日志文件:

我们将Springboot服务部署在192.168.11.31这台机器上。

Kafka安装和启用

kafka下载地址:

/downloads.html

kafka安装步骤:首先kafka安装需要依赖与zookeeper,所以小伙伴们先准备好zookeeper环境(三个节点即可),然后我们来一起构建kafka broker。

另外,Kafka 系列面试题和答案全部整理好了,微信搜索互联网架构师,在后台发送:面试,可以在线阅读。

##解压命令:tar-zxvfkafka_2.12-2.1.0.tgz-C/usr/local/##改名命令:mvkafka_2.12-2.1.0/kafka_2.12##进入解压后的目录,修改server.properties文件:vim/usr/local/kafka_2.12/config/server.properties##修改配置:broker.id=0port=9092host.name=192.168.11.51advertised.host.name=192.168.11.51log.dirs=/usr/local/kafka_2.12/kafka-logsnum.partitions=2zookeeper.connect=192.168.11.111:2181,192.168.11.112:2181,192.168.11.113:2181##建立日志文件夹:mkdir/usr/local/kafka_2.12/kafka-logs##启动kafka:/usr/local/kafka_2.12/bin/kafka-server-start.sh/usr/local/kafka_2.12/config/server.properties&

创建两个topic

##创建topickafka-topics.sh--zookeeper192.168.11.111:2181--create--topicapp-log-collector--partitions1--replication-factor1kafka-topics.sh--zookeeper192.168.11.111:2181--create--topicerror-log-collector--partitions1--replication-factor1

我们可以查看一下topic情况

kafka-topics.sh--zookeeper192.168.11.111:2181--topicapp-log-test--describe

可以看到已经成功启用了app-log-collectorerror-log-collector两个topic

filebeat安装和启用

filebeat下载

cd/usr/local/softwaretar-zxvffilebeat-6.6.0-linux-x86_64.tar.gz-C/usr/local/cd/usr/localmvfilebeat-6.6.0-linux-x86_64/filebeat-6.6.0

配置filebeat,可以参考下方yml配置文件

vim/usr/local/filebeat-5.6.2/filebeat.yml######################FilebeatConfigurationExample#########################filebeat.prospectors:-input_type:logpaths:##app-服务名称.log,为什么写死,防止发生轮转抓取历史数据-/usr/local/logs/app-collector.log#定义写入ES时的_type值document_type:"app-log"multiline:#pattern:'^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})'#指定匹配的表达式(匹配以-11-1508:04:23:889时间格式开头的字符串)pattern:'^\['#指定匹配的表达式(匹配以"{开头的字符串)negate:true#是否匹配到match:after#合并到上一行的末尾max_lines:2000#最大的行数timeout:2s#如果在规定时间没有新的日志事件就不等待后面的日志fields:logbiz:collectorlogtopic:app-log-collector##按服务划分用作kafkatopicevn:dev-input_type:logpaths:-/usr/local/logs/error-collector.logdocument_type:"error-log"multiline:#pattern:'^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})'#指定匹配的表达式(匹配以-11-1508:04:23:889时间格式开头的字符串)pattern:'^\['#指定匹配的表达式(匹配以"{开头的字符串)negate:true#是否匹配到match:after#合并到上一行的末尾max_lines:2000#最大的行数timeout:2s#如果在规定时间没有新的日志事件就不等待后面的日志fields:logbiz:collectorlogtopic:error-log-collector##按服务划分用作kafkatopicevn:devoutput.kafka:enabled:truehosts:["192.168.11.51:9092"]topic:'%{[fields.logtopic]}'partition.hash:reachable_only:truecompression:gzipmax_message_bytes:1000000required_acks:1logging.to_files:true

filebeat启动:

检查配置是否正确

cd/usr/local/filebeat-6.6.0./filebeat-cfilebeat.yml-configtest##ConfigOK

启动filebeat

/usr/local/filebeat-6.6.0/filebeat&

检查是否启动成功

ps-ef|grepfilebeat

可以看到filebeat已经启动成功

然后我们访问192.168.11.31:8001/index和192.168.11.31:8001/err,再查看kafka的logs文件,可以看到已经生成了app-log-collector-0和error-log-collector-0文件,说明filebeat已经帮我们把数据收集好放到了kafka上。

logstash安装

我们在logstash的安装目录下新建一个文件夹

mkdirscrpit

然后cd进该文件,创建一个logstash-script.conf文件

cdscrpitvimlogstash-script.conf## multiline 插件也可以用于其他类似的堆栈式信息,比如 linux 的内核日志。input{kafka{##app-log-服务名称topics_pattern=>"app-log-.*"bootstrap_servers=>"192.168.11.51:9092"codec=>jsonconsumer_threads=>1##增加consumer的并行消费线程数decorate_events=>true#auto_offset_rest=>"latest"group_id=>"app-log-group"}kafka{##error-log-服务名称topics_pattern=>"error-log-.*"bootstrap_servers=>"192.168.11.51:9092"codec=>jsonconsumer_threads=>1decorate_events=>true#auto_offset_rest=>"latest"group_id=>"error-log-group"}}filter{##时区转换ruby{code=>"event.set('index_time',event.timestamp.time.localtime.strftime('%Y.%m.%d'))"}if"app-log"in[fields][logtopic]{grok{##表达式,这里对应的是Springboot输出的日志格式match=>["message","\[%{NOTSPACE:currentDateTime}\]\[%{NOTSPACE:level}\]\[%{NOTSPACE:thread-id}\]\[%{NOTSPACE:class}\]\[%{DATA:hostName}\]\[%{DATA:ip}\]\[%{DATA:applicationName}\]\[%{DATA:location}\]\[%{DATA:messageInfo}\]##(\'\'|%{QUOTEDSTRING:throwable})"]}}if"error-log"in[fields][logtopic]{grok{##表达式match=>["message","\[%{NOTSPACE:currentDateTime}\]\[%{NOTSPACE:level}\]\[%{NOTSPACE:thread-id}\]\[%{NOTSPACE:class}\]\[%{DATA:hostName}\]\[%{DATA:ip}\]\[%{DATA:applicationName}\]\[%{DATA:location}\]\[%{DATA:messageInfo}\]##(\'\'|%{QUOTEDSTRING:throwable})"]}}}##测试输出到控制台:output{stdout{codec=>rubydebug}}## elasticsearch:output{if"app-log"in[fields][logtopic]{##es插件elasticsearch{#es服务地址hosts=>["192.168.11.35:9200"]#用户名密码user=>"elastic"password=>"123456"##索引名,+号开头的,就会自动认为后面是时间格式:##javalog-app-service-.01.23index=>"app-log-%{[fields][logbiz]}-%{index_time}"#是否嗅探集群ip:一般设置true;http://192.168.11.35:9200/_nodes/http?pretty#通过嗅探机制进行es集群负载均衡发日志消息sniffing=>true#logstash默认自带一个mapping模板,进行模板覆盖template_overwrite=>true}}if"error-log"in[fields][logtopic]{elasticsearch{hosts=>["192.168.11.35:9200"]user=>"elastic"password=>"123456"index=>"error-log-%{[fields][logbiz]}-%{index_time}"sniffing=>truetemplate_overwrite=>true}}}

启动logstash

/usr/local/logstash-6.6.0/bin/logstash-f/usr/local/logstash-6.6.0/script/logstash-script.conf&

等待启动成功,我们再次访问192.168.11.31:8001/err

可以看到控制台开始打印日志

ElasticSearch与Kibana

ES和Kibana的搭建之前没写过博客,网上资料也比较多,大家可以自行搜索。

搭建完成后,访问Kibana的管理页面192.168.11.35:5601,选择Management -> Kinaba - Index Patterns

然后Create index pattern

index pattern 输入app-log-*

Time Filter field name 选择 currentDateTime

这样我们就成功创建了索引。

我们再次访问192.168.11.31:8001/err,这个时候就可以看到我们已经命中了一条log信息

里面展示了日志的全量信息

到这里,我们完整的日志收集及可视化就搭建完成了!

原文链接:/lt326030434/article/details/107361190

感谢您的阅读,也欢迎您发表关于这篇文章的任何建议,关注我,技术不迷茫!小编到你上高速。

· END ·

最后,关注公众号互联网架构师,在后台回复:2T,可以获取我整理的 Java 系列面试题和答案,非常齐全。

正文结束

推荐阅读 ↓↓↓

1.不认命,从流水线工人,到谷歌上班的程序媛,一位湖南妹子的励志故事

2.如何才能成为优秀的架构师?

3.从零开始搭建创业公司后台技术栈

4.程序员一般可以从什么平台接私活?

5.37岁程序员被裁,120天没找到工作,无奈去小公司,结果懵了...

6.IntelliJ IDEA .3 首个最新访问版本发布,新特性抢先看

7.这封“领导痛批95后下属”的邮件,句句扎心!

8.15张图看懂瞎忙和高效的区别!

一个人学习、工作很迷茫?

点击「阅读原文」加入我们的小圈子!

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。