input{ kafka { bootstrap_servers => "127.0.0.1:9092" client_id => "nginxlog" auto_offset_reset => "latest" consumer_threads => decorate_events => true topics => ["nginx_log"] codec => "json" type => "nginx_log" } } filter{ mutate { gsub => ["message", "\\x22", '"'] gsub => ["message", "\\x09", ''] } json { source => "message" remove_field=>["message","beat","@version","@timestamp"] } if [type] == "nginx_log" { ruby { code => ' -- 获取白名单 file = File.open("/usr/local/logstash/config/white.txt", "r") text = file.read file.close -- 判断日志中request_uri属性是否在白名单中 -- 也可直接将不在白名单的日志排除 event.cancel if !text.include?(event.get("request_uri")) if !text.include?(event.get("request_uri")) then -- 如果不存在就增加一个属性es_flag=0表示该日志没用 event.set("es_flag","") else -- 如果不存在就增加一个属性es_flag=1表示该日志有用 event.set("es_flag","") end ' } } } } output { if [type] == "nginx_log" { -- 判断es_flag=1放到nginx-log-yes索引中 if [es_flag] =="" { elasticsearch { hosts => "127.0.0.1:9200" index => "nginx-log-yes" } } -- 判断es_flag=0放到nginx-log-no索引中 else { elasticsearch { hosts => "127.0.0.1:9200" index => "nginx-log-no" } } } }
Lostash event API说明
除了基本的get和set外,还提供了丰富的接口。我们能用到的方法包括:
删除事件:cancel
取消删除事件:uncancel
是否删除:cancelled?
是否包含字段:include?
删除字段:remove
事件转字符串:to_s
事件转hash字典(不含metadata字段):to_hash
事件转hash字典(含metadata字段):to_hash_with_metadata
事件转json字符串:to_json
增加tag:tag
取事件时间戳:timestamp
测试配置文件
input{ stdin{ codec=>json } }filter{ ruby{ code=>' event.cancel event.set("cancelled",event.cancelled?) event.uncancel event.set("include",event.include?("hello")) event.remove("hello") event.set("to_s",event.to_s) event.set("to_hash",event.to_hash) event.set("to_hash_with_metadata",event.to_hash_with_metadata) event.set("to_json",event.to_json) event.tag("_test_tag") event.set("timestamp",event.timestamp) ' } }output{ stdout{ codec=>rubydebug } }
启动logstash,然后输入如下,查看结果
{"hello":"world"}