Collecting ONSSI Ocularis CS RC-C Logs with nxlog / Logstash / Elasticsearch / Kibana3

Edit: This post is pretty old and Elasticsearch/Logstash/Kibana have evolved a lot since it was written.

That’s quite a title. I work with an ONSSI Ocularis CS setup. Originally installed with NetDVMS, but upgraded to RC-C.

This post builds upon a couple earlier posts
http://www.ragingcomputer.com/2014/02/logstash-elasticsearch-kibana-for-windows-event-logs
http://www.ragingcomputer.com/2014/02/sending-windows-event-logs-to-logstash-elasticsearch-kibana-with-nxlog

What does all this mean? This heavily redacted screenshot should give some idea.
kibana-ocularis-logs
Number of overall motion events over time, same for failure events. Top list of cameras with motion events, top list of cameras with failure events.

You can see we’ve got a few failed cameras. Likely a power surge or network failure. Having this information will lower the time to repair, minimizing camera down time!

We’ll pretend my logstash computer is 10.3.2.1

Port 3515 is listening for WindowsEventLog

Port 5515 is listening for onssi_recording logs
“C:\Program Files\OnSSI\NetDVMS\RecordingServer*.log”

Port 5516 is listening for onssi_islog logs
“C:\Program Files\OnSSI\NetDVMS\ISLog*.log”

Port 5517 is listening for onssi_logcheck logs
“C:\Program Files\OnSSI\NetDVMS\LogCheck*.log”

This isn’t my complete configuration, but it’s all the parts needed to monitor the windows event logs and ONSSI RC-C logs.

nxlog.conf
[text]
## Please set the ROOT to the folder your nxlog was installed into,
## otherwise it will not start.

define ROOT C:Program Filesnxlog
#define ROOT C:Program Files (x86)nxlog

Moduledir %ROOT%modules
CacheDir %ROOT%data
Pidfile %ROOT%datanxlog.pid
SpoolDir %ROOT%data
LogFile %ROOT%datanxlog.log

<Extension json>
Module xm_json
</Extension>

# Nxlog internal logs
<Input internal>
Module im_internal
Exec $EventReceivedTime = integer($EventReceivedTime) / 1000000; to_json();
</Input>

# Windows Event Log
<Input eventlog>
Module im_msvistalog
Exec $EventReceivedTime = integer($EventReceivedTime) / 1000000; to_json();
</Input>

<Input file_recording>
Module im_file
File "C:\Program Files\OnSSI\NetDVMS\RecordingServer*.log"
SavePos TRUE
Exec $Hostname = hostname(); $raw_event = $Hostname + " " + $raw_event;
</Input>
<Input file_islog>
Module im_file
File "C:\Program Files\OnSSI\NetDVMS\ISLog*.log"
SavePos TRUE
Exec $Hostname = hostname(); $raw_event = $Hostname + " " + $raw_event;
</Input>
<Input file_logcheck>
Module im_file
File "C:\Program Files\OnSSI\NetDVMS\LogCheck*.log"
SavePos TRUE
Exec $Hostname = hostname(); $raw_event = $Hostname + " " + $raw_event;
</Input>

<Output out>
Module om_tcp
Host 10.3.2.1
Port 3515
</Output>
<Output out_recording>
Module om_tcp
Host 10.3.2.1
Port 5515
</Output>
<Output out_islog>
Module om_tcp
Host 10.3.2.1
Port 5516
</Output>
<Output out_logcheck>
Module om_tcp
Host 10.3.2.1
Port 5517
</Output>

<Route 1>
Path internal, eventlog => out
</Route>
<Route 2>
Path file_recording => out_recording
</Route>
<Route 3>
Path file_islog => out_islog
</Route>
<Route 4>
Path file_logcheck => out_logcheck
</Route>
[/text]

logstash.conf
[text]
input {
tcp {
type => "WindowsEventLog"
port => 3515
codec => "line"
}

tcp {
type => "onssi_recording"
port => 5515
codec => "line"
}
tcp {
type => "onssi_islog"
port => 5516
codec => "line"
}
tcp {
type => "onssi_logcheck"
port => 5517
codec => "line"
}
}

filter{
if [type] == "WindowsEventLog" {
json{
source => "message"
}
if [SourceModuleName] == "eventlog" {
mutate {
replace => [ "message", "%{Message}" ]
}
mutate {
remove_field => [ "Message" ]
}
}
}
}

output {
redis { host => "127.0.0.1" data_type => "list" key => "logstash" }
}
[/text]

logstash-index.conf
[text]
input {
redis {
host => "127.0.0.1"
data_type => "list"
key => "logstash"
codec => json
}
}

filter {
grok {
match => [ "host", "^(?<host2>[0-2]?[0-9]?[0-9].[0-2]?[0-9]?[0-9].[0-2]?[0-9]?[0-9].[0-2]?[0-9]?[0-9]):.*" ]
}
mutate {
replace => [ "host", "%{host2}" ]
}
mutate {
remove_field => [ "host2" ]
}

if [type] == "WindowsEventLog" {
mutate {
lowercase => [ "EventType", "FileName", "Hostname", "Severity" ]
}
mutate {
rename => [ "Hostname", "source_host" ]
}
mutate {
gsub => ["source_host",".example.com",""]
}
date {
match => [ "EventTime", "YYYY-MM-dd HH:mm:ss" ]
}
mutate {
rename => [ "Severity", "eventlog_severity" ]
rename => [ "SeverityValue", "eventlog_severity_code" ]
rename => [ "Channel", "eventlog_channel" ]
rename => [ "SourceName", "eventlog_program" ]
rename => [ "SourceModuleName", "nxlog_input" ]
rename => [ "Category", "eventlog_category" ]
rename => [ "EventID", "eventlog_id" ]
rename => [ "RecordNumber", "eventlog_record_number" ]
rename => [ "ProcessID", "eventlog_pid" ]
}

if [SubjectUserName] =~ "." {
mutate {
replace => [ "AccountName", "%{SubjectUserName}" ]
}
}
if [TargetUserName] =~ "." {
mutate {
replace => [ "AccountName", "%{TargetUserName}" ]
}
}
if [FileName] =~ "." {
mutate {
replace => [ "eventlog_channel", "%{FileName}" ]
}
}

mutate {
lowercase => [ "AccountName", "eventlog_channel" ]
}

mutate {
remove => [ "SourceModuleType", "EventTimeWritten", "EventReceivedTime", "EventType" ]
}
}

if [type] =~ "^onssi.*" {
if [message] =~ /^#/ {
drop { }
}
mutate {
replace => [ "orig_msg", "%{message}" ]
}
grok {
match => [ "message", "(?<source_host>.*?) (?<datetime>[0-9][0-9][0-9][0-9]-[0-1][0-9]-[0-3][0-9] [0-9][0-9]:[0-9][0-9]:[0-9][0-9]) (?<msg>.*?)s+(?<msgid>#.*)" ]
}
mutate {
lowercase => [ "source_host" ]
}
date {
match => [ "datetime", "YYYY-MM-dd HH:mm:ss" ]
}
mutate {
replace => [ "message", "%{msg}" ]
}
mutate {
remove_field => [ "msg", "datetime", "msgid" ]
}

if [type] == "onssi_recording" {
grok {
match => [ "message", "(?<category>.*?)ss+(?<msg2>.*)" ]
}
mutate {
replace => [ "message", "%{msg2}" ]
}
mutate {
remove_field => [ "msg2" ]
}
mutate {
lowercase => [ "category" ]
}
if [category] =~ ".*" {
grok {
match => [ "message", "(?<camera>.*Cameras?[0-9]+)s+(?<event_message>.*)" ]
}
mutate {
gsub => ["camera"," ","_"]
gsub => ["camera","[",""]
gsub => ["camera","]",""]
gsub => ["camera","-",""]
gsub => ["category"," ","_"]
}
}
}

if "_grokparsefailure" not in [tags] {
mutate {
remove_field => [ "orig_msg" ]
}
}

}
}

output {
elasticsearch {
host => "127.0.0.1"
cluster => "logcatcher"
}
}
[/text]

One thought on “Collecting ONSSI Ocularis CS RC-C Logs with nxlog / Logstash / Elasticsearch / Kibana3

  1. Hi,

    Is there any way you are willing to assist with a project in OnSSI CS?

    I have multiple servers and this looks like a great solution.

    Like

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

%d bloggers like this: