When I was originally tasked with enabling Logstash and collecting log files, I realized I would easily overwhelm a single Elasticsearch index. With that in mind, I decided to break the mountain down a bit by using multiple indexers, each writing to its own index. Knowing that [Kibana 3][kibana] can easily specify a different index per dashboard helps greatly.

logstash_indexer "logstash_indexer_" + "systemlogs" do
  inputs([
          { :redis => {
            :type => "redis-int",
            :host => "127.0.0.1",
            :data_type => "list",
            :key => "systemlogs:logstash",
            :threads => 8,
            :format => "json_event"
            } } ,
          { :udp => {
              :port => 514,
              :type => "syslog"
            } } ,
          { :tcp => {
              :port => 514,
              :type => "syslog"
            } }
         ])
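  # Grok patterns for Linux syslog lines (these duplicate the patterns Logstash ships in its bundled linux-syslog file)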
  patterns([
           :linuxsyslog => {
             :SYSLOGBASE2 => "(?:%{SYSLOGTIMESTAMP:timestamp}|%{TIMESTAMP_ISO8601:timestamp8601}) (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:logsource} %{SYSLOGPROG}:",
             :SYSLOGPAMSESSION => "%{SYSLOGBASE} (?=%{GREEDYDATA:message})%{WORD:pam_module}\(%{DATA:pam_caller}\): session %{WORD:pam_session_state} for user %{USERNAME:username}(?: by %{GREEDYDATA:pam_by})?",
             :CRON_ACTION => "[A-Z ]+",
             :CRONLOG => "%{SYSLOGBASE} \(%{USER:user}\) %{CRON_ACTION:action} \(%{DATA:message}\)",
             :SYSLOGLINE => "%{SYSLOGBASE2} %{GREEDYDATA:message}",
             # IETF 5424 syslog(8) format (see http://www.rfc-editor.org/info/rfc5424)
             :SYSLOG5424PRI => "(?:\<%{NONNEGINT}\>)",
             :SYSLOG5424SD => "(?:\[%{DATA}\]+|-)",
             :SYSLOG5424LINE => "%{SYSLOG5424PRI:syslog5424_pri}%{NONNEGINT:syslog5424_ver} (%{TIMESTAMP_ISO8601:syslog5424_ts}|-) (%{HOSTNAME:syslog5424_host}|-) (%{WORD:syslog5424_app}|-) (%{WORD:syslog5424_proc}|-) (%{WORD:syslog5424_msgid}|-) %{SYSLOG5424SD:syslog5424_sd} %{GREEDYDATA:syslog5424_msg}"
           }])
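  # Parse syslog lines, decode the PRI value into facility/severity, set the timestamp, then rewrite @source_host and @message from the parsed fields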
  filters([
           { :grok => {
               :type => "syslog",
               :pattern => [ "<%{POSINT:syslog_pri}>%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" ],
             } },
           { :syslog_pri => {
               :type => "syslog"
             } },
           { :date => {
               :type => "syslog",
               :match => [ "MMM  d HH:mm:ss", "MMM dd HH:mm:ss", "ISO8601" ]
             } },
          { :mutate => {
             :type => "syslog",
             :exclude_tags => "_grokparsefailure",
             :replace => [ "@source_host", "%{syslog_hostname}" ]
           } },
          { :mutate => {
             :type => "syslog",
             :exclude_tags => "_grokparsefailure",
             :replace => [ "@message", "%{syslog_message}" ]
           } }
          ])
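  # Write each day's events to its own systemlogs-YYYY.MM.dd index in Elasticsearch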
  output([
          :elasticsearch_http => {
            :host => "192.168.1.1",
            :port => 9200,
            :index => "systemlogs-%{+YYYY.MM.dd}",
            :flush_size => 100
          }
         ])
  action :create
end

* Graphite Indexer

I modified Carbon so its log output was JSON, which meant I did not have to write grok filters. I then made a Logstash indexer to collect the logs, used Beaver to push the logs to Logstash, and wrote a cookbook to deploy Beaver (a rough sketch of that recipe is below).
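
The Beaver cookbook is not reproduced here, but a minimal sketch of the kind of recipe it contains might look like the following. The python_pip resource (from the python cookbook), the Carbon log path, the template name, and the Redis endpoint are assumptions for illustration; the Redis key matches the graphite:logstash list the indexer below reads from.

include_recipe "python"

# Install Beaver via pip (assumes the python cookbook's python_pip resource)
python_pip "beaver" do
  action :install
end

# Render the Beaver config; the template, log path, and Redis endpoint are placeholders
template "/etc/beaver/conf" do
  source "beaver.conf.erb"
  mode "0644"
  variables(
    :redis_url       => "redis://logs.example.com:6379/0",  # assumed Redis endpoint
    :redis_namespace => "graphite:logstash",                # the list the graphite indexer reads
    :files           => ["/var/log/carbon/console.log"]     # assumed Carbon log location
  )
  notifies :restart, "service[beaver]"
end

# Assumes an init script for Beaver is shipped elsewhere in the cookbook
service "beaver" do
  action [:enable, :start]
end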

logstash_indexer "logstash_indexer_" + "graphite" do
  inputs([
          :redis => {
            :type => "redis-int",
            :host => "127.0.0.1",
            :data_type => "list",
            :threads => 16,
            :key => "graphite:logstash",
            :format => "json"
          }])
  # Decode the JSON payload for each source type, then geo-locate Apache clients
  filters([
           { :json => {
               :source => "message",
               :target => "data",
               :tags => "json"
             } },
           { :json => {
               :source => "@fields",
               :target => "data",
               :type => "carbonrelay"
             } },
           { :json => {
               :source => "@fields",
               :target => "data",
               :type => "carboncache"
             } },
           { :json => {
               :source => "@message",
               :target => "data",
               :type => "apache"
             } },
           { :geoip => {
               :type => "apache",
               :field => "client",
               :database => "/opt/logstash/server/lib/GeoLiteCity.dat"
             } }
          ])
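  # Write each day's events to its own graphite-YYYY.MM.dd index in Elasticsearch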
  output([
          :elasticsearch_http => {
            :host => "192.168.1.1",
            :port => 9200,
            :index => "graphite-%{+YYYY.MM.dd}",
            :flush_size => 100,
          }])
  action :create
end

This post was originally meant to be published in August; I need to update my fork to install a current version of Logstash and use the built-in patterns instead of re-deploying the same ones.
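
Concretely, the grok filter in the systemlogs indexer only uses core patterns, and the whole linuxsyslog block duplicates Logstash's bundled linux-syslog patterns file, so the simplification is mostly deleting patterns([...]) and letting grok reference the shipped patterns. A rough sketch is below, trimmed to the pieces that change; the syslog_pri, date, and mutate filters are omitted because they would need adjusting for the field names %{SYSLOGLINE} captures, and I have not verified any of this against the updated fork.

logstash_indexer "logstash_indexer_" + "systemlogs" do
  # Same syslog listeners as above, trimmed to what the change affects
  inputs([
          { :udp => { :port => 514, :type => "syslog" } },
          { :tcp => { :port => 514, :type => "syslog" } }
         ])
  filters([
           { :grok => {
               :type => "syslog",
               # SYSLOGLINE comes from Logstash's bundled linux-syslog patterns file,
               # so no patterns([...]) block has to be re-deployed
               :pattern => [ "%{SYSLOGLINE}" ]
             } }
          ])
  output([
          :elasticsearch_http => {
            :host => "192.168.1.1",
            :port => 9200,
            :index => "systemlogs-%{+YYYY.MM.dd}",
            :flush_size => 100
          }
         ])
  action :create
end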