logstash简介
logstash是一种分布式日志收集框架,开发语言是JRuby,当然是为了与Java平台对接,不过与Ruby语法兼容良好,非常简洁强大,经常与ElasticSearch,Kibana配置,组成著名的ELK技术栈,非常适合用来做日志数据的分析。
当然它可以单独出现,作为日志收集软件,你可以收集日志到多种存储系统或临时中转系统,如MySQL,redis,kakfa,HDFS, lucene,solr等,并不一定是ElasticSearch。
logstash安装
logstash是用JRuby语言开发的,所以要先安装JDK。
下载:
https://www.elastic.co/downloads/logstash
解压:
tar -zxvf logstash-2.3.1.tar.gz
logstash的数据处理模型
1. input => output
2. input => filter => output
其中input常用的输入源有:
file,syslog,redis,log4j,apache log或nginx log,或者其他一些自定义的log格式,业务log,搜索log,订单log等
filter常用的选项有:
grok:支持正则提取任何非结构化数据或结构化数据,其中logstash内置120多种正则,比如常见的时间,ip,用户名,等等也支持自定义正则解析
mutate:修改字段名,删除,更新等操作,转换字段类型等
drop: 删除某些时间,如debug
clone:拷贝一份事件副本,用来添加或删除字段
geoip : 通过ip获取地理位置信息,在做kibana区域统计图非常炫
ruby: 支持原生的ruby代码,操作事件,实现强大的其他功能
output常用的输出有:
elasticsearch 比较常用
file:写入文件
redis:写入队列
hdfs:写入HDFS,需插件支持
zabbix: zabbix监控
mongodb:写入mongodb库
除此之外,还有个编码插件codecs也比较常用,常用来处理json数据或者多行数据源。
简单例子
1.使用命令行命令调试
liguodong@ubuntu:~/install/logstash$ bin/logstash -e "input{stdin{}} output{stdout{}}"
Settings: Default pipeline workers: 1
Pipeline main started
HELLO
2017-02-21T01:49:16.342Z ubuntu HELLO
liguodong
2017-02-21T01:49:22.822Z ubuntu liguodong
hello world
2017-02-21T01:49:39.748Z ubuntu hello world
2.配置文件方式
命令行参数仅适合简单的配置,如果配置比较多,我们一般会写入一个以.conf结尾的配置文件里,然后使用 -f 命令加载,将(1)中的配置,写入hello.conf,然后使用bin/logstash -f hello.conf 执行加载,即可达到同样效果。
案例
1. 采集网站日志推送到Kafka集群
flow-log-kafka.conf
input {
file {
path => "/home/liguodong/data/game/basedir/*/*.log"
discover_interval => 5
start_position => "beginning"
}
}
output {
kafka {
topic_id => "access"
codec => plain {
format => "%{message}"
charset => "UTF-8"
}
bootstrap_servers => "192.168.133.252:9092"
}
}
先启动zookeeper服务
bin/zookeeper-server-start.sh config/zookeeper.properties
再启动Kafka
bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &
然后创建topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic access
启动消费者
bin/kafka-console-consumer.sh –zookeeper localhost:2181 –topic access –from-beginning
启动logstash
bin/logstash agent -f conf/flow-log-kafka.conf
然后在目录/home/liguodong/data/game/basedir/*/
下模拟生成日志文件。
就能够看到kafka,消费者消费数据了。
2. 消费kafka数据存储到ElasticSearch
flow-kafka-es.conf
input {
kafka {
type => "level-one"
auto_offset_reset => "smallest"
codec => plain {
charset => "UTF-8"
}
group_id => "es"
topic_id => "access"
zk_connect => "192.168.133.252:2181"
}
}
filter {
mutate {
split => { "message" => " " }
add_field => {
"event_type" => "%{message[3]}"
"current_map" => "%{message[4]}"
"current_X" => "%{message[5]}"
"current_y" => "%{message[6]}"
"user" => "%{message[7]}"
"item" => "%{message[8]}"
"item_id" => "%{message[9]}"
"current_time" => "%{message[12]}"
}
remove_field => [ "message" ]
}
}
output {
elasticsearch {
index => "level-one-%{+YYYY.MM.dd}"
codec => plain {
charset => "UTF-8"
}
hosts => ["192.168.133.252:9200"]
}
}
先启动zookeeper服务
bin/zookeeper-server-start.sh config/zookeeper.properties
再启动Kafka
bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &
启动es
bin/elasticsearch -d
启动logstash
bin/logstash agent -f conf/flow-kafka-es.conf
通过es插件访问head管理页面查看数据
http://192.168.133.252:9200/_plugin/head
原文链接:https://blog.csdn.net/scgaliguodong123_/article/details/56011795
本站声明:网站内容来源于网络,如有侵权,请联系我们,我们将及时处理。
还没有人抢沙发呢~