
RSA NetWitness Platform

14 Posts authored by: David Waugh Employee

This post is completely unsupported by RSA Support and indeed RSA, but it might be interesting if you want to try it. 


In Netwitness 10.X the current weakness in the topology is that the SA Server is a single point of failure, yet it is also the component that monitors everything else in your environment. If the monitoring on your SA Server has a problem, would you actually be aware of it?


I've posted in the past about monitoring your Netwitness Infrastructure with Nagios, but what about Zabbix? This is a more modern monitoring system and I gave it a go on my Netwitness environment. More information on Zabbix can be found at Zabbix :: The Enterprise-Class Open Source Network Monitoring Solution 


Monitoring involves installing the Zabbix Agent onto each server. This can be done by copying the CentOS 6 rpm into /var/netwitness/srv/www/rsa/updates/10.6.3 and then running the following.


cd /var/netwitness/srv/www/rsa/updates/10.6.3
createrepo .

Instructions for obtaining the Zabbix agent for Centos 6 can be found at: How to Install Zabbix Agent on CentOS/RHEL 7/6/5 


In the file /etc/puppet/modules/base/manifests/init.pp add the following lines:


package { 'zabbix-agent' :
  ensure => installed,
}
# Configure Zabbix Agent for PreShared Key Encrypted Connections
file { 'zabbix_agentd.conf' :
  path   => "/etc/zabbix/zabbix_agentd.conf",
  mode   => 0644,
  ensure => present,
  owner  => "root",
  group  => "root",
  source => "puppet:///modules/base/zabbix_agentd.conf",
  notify => Service['zabbix-agent'],
}
file { 'zabbix_agentd.psk' :
  path   => "/etc/zabbix/zabbix_agentd.psk",
  mode   => 0644,
  ensure => present,
  owner  => "root",
  group  => "root",
  source => "puppet:///modules/base/zabbix_agentd.psk",
  notify => Service['zabbix-agent'],
}
# Allow the Zabbix Server to reach the agent
firewall { '1 Allow Zabbix Agent':
  port   => [10050],
  proto  => tcp,
  action => accept,
  # address of your Zabbix Server
  source => '',
}

This copies a standard agent configuration file and a pre-shared key, which is used to encrypt communication between the Zabbix Server and the Zabbix Agent. It also opens a firewall port so that the Zabbix Server can reach each Security Analytics appliance on TCP port 10050.


The following files should be copied to /etc/puppet/modules/base/files


The advantages of monitoring are:


  • Ability to make nice graphs
  • Ability to have a Map of your infrastructure to see any problems easily.





I've also copied a few Zabbix Checks that can be run by using the check type "SSH Agent". Note in this example I used the root account, but best practise would be to create a specific Zabbix account on each system. Again this could be done using puppet.


Unfortunately it's not currently easy to see the maximum sessions behind on an ESA. This script enables it to be monitored.



./ -w VALUE -c VALUE | -h

This plug-in is used to be alerted when maximum ESA behind sessions is reached

-w/c Sessions behind integer
To warn when 200 sessions behind and critical when 300 sessions behind
example: ./ -w 200 -c 300
 ./ -w 1 -c 2
CRITICAL behind (>2), Sessions behind: : 26167 |Sessions behind=26167 ;;;
[root@rsaesa ~]# ./ -w 1 -c 2
CRITICAL behind (>2), Sessions behind: : 35446 |Sessions behind=35446 ;;;
[root@rsaesa ~]# ./ -w 1 -c 2
CRITICAL behind (>2), Sessions behind: : 35446 |Sessions behind=35446 ;;;
[root@rsaesa ~]# ./ -w 1 -c 2
CRITICAL behind (>2), Sessions behind: : 30390 |Sessions behind=30390 ;;;
[root@rsaesa ~]# ./ -w 1 -c 2
CRITICAL behind (>2), Sessions behind: : 30390 |Sessions behind=30390 ;;;
[root@rsaesa ~]# ./ -w 1 -c 40000
CRITICAL behind (>40000), Sessions behind: : 52687 |Sessions behind=52687 ;;;
[root@rsaesa ~]# ./ -w 1 -c 50000
CRITICAL behind (>50000), Sessions behind: : 52687 |Sessions behind=52687 ;;;
[root@rsaesa ~]# ./ -w 1 -c 50000
CRITICAL behind (>50000), Sessions behind: : 50476 |Sessions behind=50476 ;;;
[root@rsaesa ~]# ./ -w 1 -c 50000
CRITICAL behind (>50000), Sessions behind: : 50476 |Sessions behind=50476 ;;;
[root@rsaesa ~]# ./ -w 1 -c 50000
CRITICAL behind (>50000), Sessions behind: : 54902 |Sessions behind=54902 ;;;
[root@rsaesa ~]# ./ -w 1 -c 50000
CRITICAL behind (>50000), Sessions behind: : 60099 |Sessions behind=60099 ;;;
[root@rsaesa ~]# ./ -w 1 -c 500000
WARNING behind (>1), Sessions behind: : 60099 |Sessions behind=60099 ;;;

If no values for WARN or CRITICAL are specified then a warning value of 500 and a critical value of 1000 are assumed. These should be adjusted to match what is normal in your environment.
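The threshold logic described above can be sketched in Python. This is only an illustration of the warning/critical comparison and the Nagios-style exit codes; it is not the plug-in itself, the function name `check_sessions_behind` is hypothetical, and the OK message format is an assumption (the sample output only shows WARNING and CRITICAL lines).

```python
def check_sessions_behind(sessions_behind, warn=500, crit=1000):
    """Return (exit_code, message) using the Nagios plug-in convention.

    Exit codes: 0 = OK, 1 = WARNING, 2 = CRITICAL.
    The defaults mirror the 500/1000 values mentioned in the post.
    """
    perfdata = "Sessions behind=%d" % sessions_behind
    if sessions_behind > crit:
        status, code, limit = "CRITICAL", 2, crit
    elif sessions_behind > warn:
        status, code, limit = "WARNING", 1, warn
    else:
        # Assumed format for the OK case; not shown in the original output.
        return 0, "OK, Sessions behind: %d |%s" % (sessions_behind, perfdata)
    message = "%s behind (>%d), Sessions behind: %d |%s" % (
        status, limit, sessions_behind, perfdata)
    return code, message


if __name__ == "__main__":
    code, message = check_sessions_behind(60099, warn=1, crit=500000)
    print(message)
```

Checking the critical threshold first matters: a value above both thresholds should be reported as CRITICAL, not WARNING.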

Last Updated: 12:41 February 27th 2017

Latest Version: 17


I had a customer who wished to extract the raw event time for particular logs, because they use this raw event time for further analysis of events in a third party system. The raw event time may differ greatly from the actual log decoder processing time, especially if there is a delay in the logs reaching the log decoder, perhaps due to network latency or file collection delays.


Currently they use the event.time field. However this has some limitations:

  • If the timestamps are incomplete then this field is empty. For example many Unix systems generate a timestamp that does not contain the year.
  • Even for the same device types, event source date formats can be different. For example a US-based system may log the date in MM/DD/YY format, whereas a UK-based system may log the date in DD/MM/YY format. A date of 1/2/2017 could be interpreted as either the 1st February 2017 or the 2nd January 2017.
  • The event.time field is actually a 64 bit TimeT field which can not be manipulated within the 32 bit LUA engine that currently ships with the product.


All these issues are being addressed in future releases of the product, but the method outlined here gives something that can be used today.


Create some new meta keys for our Concentrators


We add the following to the index-concentrator-custom.xml files:


<key description="Epoch Time" level="IndexValues" name="epoch.time" format="UInt32" defaultAction="Open" valueMax="100000" />
<key description="Event Time String" level="IndexValues" name="eventtimestr" format="Text" valueMax="2500000"/>
<key description="UTCAdjust" level="IndexValues" name="UTCAdjust" format="Float32" valueMax="1000"/>

The meta key epoch.time will be used to store the raw event time in Unix Epoch format. This is seconds since 1970.

The meta key eventtimestr will be used to store a timestamp that we create in the next step.

The meta key UTCAdjust will hold how many hours to add or remove from our timestamp.


Create a  Feed to tag events with a timestamp

Within the Netwitness LUA parser we are restricted in which functions we can use. The functions we would normally use to read the current time are not available, so we need another method of getting a timestamp for our logs.


To do this, create a cronjob on the SA Server that will run every minute and populate the following CSV file.


#!/bin/bash
# Script to write a timestamp in a feed file
devicetypes="rsasecurityanalytics rhlinux securityanalytics infobloxnios apache snort squid lotusdomino rsa_security_analytics_esa websense netwitnessspectrum bluecoatproxyav alcatelomniswitch vssmonitoring voyence symantecintruder sophos radwaredp ironmail checkpointfw1 websense rhlinux damballa snort cacheflowelff winevent_nic websense82 fortinet unknown"

# Rewrite the feed once a second for a minute; cron restarts the script every minute
for i in {1..60}
do
  mydate=$(date -u)
  echo "#Device Type, Timestamp" >/var/netwitness/srv/www/feeds/timestamp.csv
  for j in $devicetypes
  do
    echo "$j,$mydate" >>/var/netwitness/srv/www/feeds/timestamp.csv
  done
  sleep 1
done

This will generate a CSV file that we can use as a feed with the following format

checkpointfw1,Wed Jan 25 09:17:49 UTC 2017
citrixns,Wed Jan 25 09:17:49 UTC 2017
websense,Wed Jan 25 09:17:49 UTC 2017
rhlinux,Wed Jan 25 09:17:49 UTC 2017
damballa,Wed Jan 25 09:17:49 UTC 2017
snort,Wed Jan 25 09:17:49 UTC 2017
cacheflowelff,Wed Jan 25 09:17:49 UTC 2017
winevent_nic,Wed Jan 25 09:17:49 UTC 2017

Here the first column of the CSV is our device.type and the part after the comma is our UTC timestamp.
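If you would rather generate the feed file from Python than from the shell, the same layout could be produced along these lines. This is a sketch only: the function name `build_timestamp_feed` is hypothetical, Python 3 is assumed, and the day of the month is zero padded here whereas `date -u` pads single digit days with a space.

```python
from datetime import datetime, timezone


def build_timestamp_feed(device_types, now=None):
    """Return the feed text: a comment header plus one row per device type."""
    if now is None:
        now = datetime.now(timezone.utc)
    # Same shape as the `date -u` output, e.g. "Wed Jan 25 09:17:49 UTC 2017"
    stamp = now.strftime("%a %b %d %H:%M:%S UTC %Y")
    lines = ["#Device Type, Timestamp"]
    # De-duplicate so each device type appears only once in the feed
    for device_type in sorted(set(device_types)):
        lines.append("%s,%s" % (device_type, stamp))
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    print(build_timestamp_feed(["rhlinux", "snort", "websense"]), end="")
```

Writing the returned text to a temporary file and renaming it into place would avoid the feed being read while it is half written.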


We use this as a feed which we push to our Log decoders.


The CSV file is created every minute, and we also refresh the feed every minute. This means that potentially this timestamp could be 2 minutes out of date compared with our logs.


Here is an example of the timestamp visible in our websense logs:

eventtimestr holds our dummy timestamp

epoch.time holds the actual epoch time that the raw log was generated. 



Create an App Rule to Tag Session without a UTC Time with an alert.

Create an App Rule on your log decoders that generates an alert if no UTCAdjust meta key exists. This saves you from having to define a UTC offset of 0 for devices that are already logging in UTC.


It is important that the following are entered for the rule:

Rule Name: UTC Offset Not Specified

Condition: UTCAdjust !exists

Alert on: Alert

The Alert box is ticked.


Create a feed to specify how to adjust the calculated time on a per device ip and device type setting

Create a CSV file with the following columns:

#DeviceIP,DeviceType,UTC Offset
,rhlinux,0
,snort,1.0
,securityanalytics,-1.5

Copy the attached UTCAdjust.xml. This is the feed definition file for a Multi-indexed feed that uses the device.ip and device.type. We are unable to do this through the GUI.

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<FDF xmlns:xsi="" xsi:noNamespaceSchemaLocation="feed-definitions.xsd">
  <FlatFileFeed comment="#" separator="," path="UTCAdjust.csv" name="UTCAdjust">
    <MetaCallback name="Device IP" valuetype="IPv4">
      <Meta name="device.ip"/>
    </MetaCallback>
    <MetaCallback name="DeviceType" valuetype="Text" ignorecase="true">
      <Meta name="device.type"/>
    </MetaCallback>
    <LanguageKeys>
      <LanguageKey valuetype="Float32" name="UTCAdjust"/>
    </LanguageKeys>
    <Fields>
      <Field type="index" index="1" key="Device IP"/>
      <Field type="index" index="2" key="DeviceType"/>
      <Field key="UTCAdjust" type="value" index="3"/>
    </Fields>
  </FlatFileFeed>
</FDF>


Run the script to generate the feed. This generates the actual feed and then copies it to the correct directories on any log and packet decoders. (There really isn't any reason to copy it to a packet decoder). This script will download the feed from a CSV file hosted on a webserver. This script could be scheduled as a cronjob depending on how often it needs to be updated.


#!/bin/bash
# Download the latest offsets
wget http://localhost/feeds/UTCAdjust.csv -O /root/feeds/UTCAdjust.csv --no-check-certificate

# Build a .feed file from every feed definition (xml) under /root/feeds
find /root/feeds | grep xml >/tmp/feeds
for feed in $(cat /tmp/feeds)
do
  FEEDDIR=$(dirname $feed)
  FEEDNAME=$(basename $feed)
  cd $FEEDDIR
  NwConsole -c "feed create $FEEDNAME" -c "exit"
done

# Copy the feed to each decoder
scp *.feed root@
scp *.feed root@
scp *.feed root@

# Tell each decoder to reload its feeds
NwConsole -c "login admin netwitness" -c "/decoder/parsers feed op=notify" -c "exit"
NwConsole -c "login admin netwitness" -c "/decoder/parsers feed op=notify" -c "exit"
NwConsole -c "login admin netwitness" -c "/decoder/parsers feed op=notify" -c "exit"


Use a LUA parser to extract the event time and then calculate Epoch time.


We then use a LUA parser to extract the raw log and then calculate epoch time. Our approach to do this is as follows:


  • Define the device types that we are interested in
  • Create a regular expression to extract the timestamps for the logs we are interested in
  • From this timestamp add additional information to calculate the epoch time. For example for timestamps without a year, I assume the current year and then check that this does not create a date that is too far into the future. If the date is too far in the future, then I use the previous year. This should account for logs around the December 31st / 1st January boundary.
  • Finally adjust the calculated time depending on the UTC Offset feed.
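The year inference and offset adjustment in the steps above can be illustrated in Python. This is a sketch of the idea only and not the LUA parser: the function name `raw_time_to_epoch`, the "Mon DD HH:MM:SS" input format and the one day tolerance for "too far into the future" are all assumptions made for the example. It also does not handle a 29th February timestamp that has to be moved back to a non-leap year.

```python
import calendar
from datetime import datetime, timedelta


def raw_time_to_epoch(stamp, now, utc_adjust_hours=0.0,
                      max_future=timedelta(days=1)):
    """Convert a year-less timestamp such as 'Jan 24 10:23:36' to epoch seconds.

    now              -- current time as a naive datetime in UTC
    utc_adjust_hours -- hours to add to the timestamp (the UTCAdjust value)
    max_future       -- how far ahead of now a date may be before we
                        assume it belongs to the previous year
    """
    # Assume the current year first
    parsed = datetime.strptime("%d %s" % (now.year, stamp),
                               "%Y %b %d %H:%M:%S")
    # A date too far in the future must be from last year
    if parsed - now > max_future:
        parsed = parsed.replace(year=now.year - 1)
    # Finally apply the per-device UTC offset
    parsed += timedelta(hours=utc_adjust_hours)
    return calendar.timegm(parsed.timetuple())


if __name__ == "__main__":
    print(raw_time_to_epoch("Jan 24 10:23:36", datetime(2017, 1, 25, 9, 0, 0)))
```

A log stamped "Dec 31 23:59:50" that arrives a few minutes into the new year is the case the future check exists for: it is placed in the previous year rather than nearly twelve months ahead.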


I've attached and copied the code below. This should be placed in /etc/netwitness/ng/parsers on your log decoders.

Currently this LUA parser supports:


  • windows event logs
  • damballa logs
  • snort logs
  • rhlinux
  • websense


The parser could be expanded further to account for different timestamp formats for particular device.ip values. You could create another feed to tag the locale of your devices and then use this within the LUA parser to make decisions about how to process dates.


Here is the finished result:



I can investigate on epoch.time.


For example, 1485253416 converts (using Epoch Converter - Unix Timestamp Converter) to:

GMT: Tue, 24 Jan 2017 10:23:36 GMT


and all the logs have an event time in them of 24th January 10:23:36



I have a customer who uses something called a "Data Diode" to enforce one way connectivity through their network.

One result of this is that any syslog that is being sent through the diode gets its device IP changed.


For example, any message sent through the diode has the source IP address and sequence numbers prepended.


Original Message:


Jan 11 18:01:13: %LINEPROTO-5-UPDOWN: Line protocol on Interface GigabitEthernet13/30, changed state to up


Message after passing through the Data Diode, as seen by RSA Netwitness for Logs:


1515391: Jan 11 18:01:13: %LINEPROTO-5-UPDOWN: Line protocol on Interface GigabitEthernet13/30, changed state to up


Unfortunately this means that the device.ip is populated with the Data Diode address rather than the original source address. The LUA parser below checks logs coming from the same IP as the Data Diode, and if they have this header format, the IP address is extracted and stored in device.ip. 


This method works where logs are parsed by Security Analytics even with this additional header.


1) Edit DataDiode.lua on line 41. This line holds the IP address to check against, in decimal format. In the file it is 3232267011.
2) Add the address of the datadiode in decimal format. (If you are lazy just use
3) Copy the parser to /etc/netwitness/ng/parsers on your logdecoders
4) Reload your parsers (with your script that you use)
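If you need to work out the decimal value for step 2, the conversion is simple enough to do in a few lines of Python. This is a small helper sketch (the function names are hypothetical); 192.168.123.3 is used in the example only because it is the dotted form of the 3232267011 value that ships in the file.

```python
def ipv4_to_decimal(dotted):
    """Convert a.b.c.d to a*256^3 + b*256^2 + c*256 + d."""
    octets = [int(part) for part in dotted.split(".")]
    if len(octets) != 4 or any(o < 0 or o > 255 for o in octets):
        raise ValueError("not a dotted IPv4 address: %r" % dotted)
    a, b, c, d = octets
    return a * 256 ** 3 + b * 256 ** 2 + c * 256 + d


def decimal_to_ipv4(value):
    """Convert the decimal form back to dotted notation."""
    return ".".join(str((value >> shift) & 0xFF) for shift in (24, 16, 8, 0))


if __name__ == "__main__":
    print(ipv4_to_decimal("192.168.123.3"))
    print(decimal_to_ipv4(3232267011))
```

The standard library gives the same answer with `int(ipaddress.IPv4Address("192.168.123.3"))` if you prefer not to write the arithmetic yourself.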

local lua_DataDiode = nw.createParser("DataDiode", "Registers the IP given by the Data Diode into device.ip")

--[[
Takes a message from a data diode and adds the IP address supplied into device.ip
Sample Message: 8936: 008934: Jan 11 17:55:03.566: %SYS-6-LOGGINGHOST_STARTSTOP: Logging to host Port 514 started - CLI initiated

There can be any number of sequence numbers after the IP and the messages might be coming from any number of different event sources
]]--

-- Reset the per-session state
function lua_DataDiode:sessionBegin()
   deviceip = nil
   devicetype = nil
end

function lua_DataDiode:CheckDevice(index,dtype)
   if dtype == 'checkpointfw1' then
      devicetype = dtype
   end
end

function lua_DataDiode:CheckDeviceIP(index,dip)
   -- Data Diode Device IP is
   -- nw.logInfo("DataDiode: DeviceIP: " .. dip .. type(dip))
   -- Note IP Addresses are stored in DECIMAL notation so need to convert dotted value to DECIMAL
   -- To convert a.b.c.d is a*256^3 + b*256^2 + c*256 + d
   --- is therefore 3232267011
   if dip == 3232267011 then
      deviceip = dip
      --nw.logInfo("DataDiode: Matched")
   end
end

function lua_DataDiode:register()

   if deviceip then
      -- Reparse the message:
      -- nw.logInfo("DataDiode: DeviceIP has matched" )
      local fullPayload = nw.getPayload():tostring()
      -- Dots are escaped so that only a dotted IP address matches
      local o1,o2,o3,o4,sequence,rubbish = string.match(fullPayload,"(%d+)%.(%d+)%.(%d+)%.(%d+)%s(%d+):(.*)")

      --nw.logInfo("DataDiode: DeviceIP has matched" )

      -- Check we have an IP address
      if o1 and o2 and o3 and o4 then
         -- host now holds the original source address in decimal form
         host = o1*256^3 + o2*256^2 + o3*256 + o4
         --nw.logInfo("DataDiode: Registered New Device IP: " .. host)
      end
   end
end

lua_DataDiode:setCallbacks({
   [nwevents.OnSessionBegin] = lua_DataDiode.sessionBegin,
   --[nwlanguagekey.create("device.type", nwtypes.Text)] = lua_DataDiode.CheckDevice,
   [nwlanguagekey.create("device.ip", nwtypes.IPv4)] = lua_DataDiode.CheckDeviceIP,
})


A colleague here at RSA posed an interesting problem so I thought I would share with you how I solved it.


Imagine the following scenario. A windows machine has visited a potentially suspicious website. This is detected by an ESA rule and an incident is created. Normal SOC procedures say that when such an incident happens the ECAT Agent should be installed on the windows machine so that further analysis can take place. It is a busy day in the SOC and unfortunately there is a long gap between the incident being raised and the agent actually being installed. Can this be automated?


The answer is yes and here is how....


First of all, I have a centos 6 server where I will run the commands from. This is to avoid installing any extra packages on the ESA Server that could potentially cause problems.


On the centos 6 server I have installed the openvas-smb package. This provides the winexe linux command.

This is a tool that allows you to run psexec type commands from a linux machine.


[root@centos6 ~]# yum install wmi
Loaded plugins: fastestmirror, refresh-packagekit
Setting up Install Process
Loading mirror speeds from cached hostfile
* atomic:
* base:
* centosplus:
* contrib:
* epel:
* extras:
* updates:
Package wmi is obsoleted by openvas-smb, trying to install instead
Resolving Dependencies
--> Running transaction check
---> Package openvas-smb.x86_64 will be installed
--> Processing Dependency: for package:
--> Processing Dependency: for package:
--> Processing Dependency: for package:
--> Processing Dependency: for package:
--> Processing Dependency: for package:
--> Processing Dependency: for package:
--> Processing Dependency: atomic-gnutls3-gnutls for package:
--> Processing Dependency: atomic-glib2-glib2 for package:
--> Processing Dependency: for package:
--> Processing Dependency: for package:
--> Processing Dependency: for package:
--> Processing Dependency: for package:
--> Processing Dependency: for package:
--> Processing Dependency: for package:
--> Processing Dependency: for package:
--> Processing Dependency: for package:
--> Processing Dependency: for package:
--> Processing Dependency: for package:
--> Running transaction check
---> Package atomic-glib2-glib2.x86_64 will be installed
---> Package atomic-gnutls3-gnutls.x86_64 will be installed
--> Processing Dependency: for package:
--> Processing Dependency: for package:
---> Package heimdal-libs.x86_64 0:1.6.0-0.9.20140621gita5adc06.el6 will be installed
--> Running transaction check
---> Package nettle.x86_64 will be installed
--> Finished Dependency Resolution

Dependencies Resolved

Package Arch Version Repository Size
openvas-smb x86_64 atomic 3.3 M
Installing for dependencies:
atomic-glib2-glib2 x86_64 atomic 2.2 M
atomic-gnutls3-gnutls x86_64 atomic 610 k
heimdal-libs x86_64 1.6.0-0.9.20140621gita5adc06.el6 epel 1.0 M
nettle x86_64 atomic 307 k

Transaction Summary
Install 5 Package(s)

Total download size: 7.4 M
Installed size: 28 M
Is this ok [y/N]: y
Downloading Packages:
(1/5): | 2.2 MB 00:02
(2/5): | 610 kB 00:00
(3/5): heimdal-libs-1.6.0-0.9.20140621gita5adc06.el6.x86_64.rpm | 1.0 MB 00:01
(4/5): | 307 kB 00:00
(5/5): | 3.3 MB 00:01
Total 1.1 MB/s | 7.4 MB 00:06
Running rpm_check_debug
Running Transaction Test
Transaction Test Succeeded
Running Transaction
Warning: RPMDB altered outside of yum.
Installing : 1/5
Installing : heimdal-libs-1.6.0-0.9.20140621gita5adc06.el6.x86_64 2/5
Installing : 3/5
Installing : 4/5
Installing : 5/5
Verifying : 1/5
Verifying : heimdal-libs-1.6.0-0.9.20140621gita5adc06.el6.x86_64 2/5
Verifying : 3/5
Verifying : 4/5
Verifying : 5/5


Dependency Installed:
atomic-glib2-glib2.x86_64 atomic-gnutls3-gnutls.x86_64 heimdal-libs.x86_64 0:1.6.0-0.9.20140621gita5adc06.el6



After installing the package I then created a file called credentials.cfg containing the account I would use to run the commands on my windows target machine.




I was then able to run commands from my centos 6 server on a windows machine as follows:


[root@centos6 ~]# winexe -A credentials.cfg // 'cmd.exe /c echo "hello"'
You have mail in /var/spool/mail/root
[root@centos6 ~]# winexe -A credentials.cfg // 'hostname'


This now gives us the capability of running commands from our Centos 6 machine onto a windows machine.


The next part of the puzzle was to allow the ESA Server to be able to connect to our Centos 6 Server to be able to run commands. We set up passwordless SSH keys to do this. The important thing here is that when scripts run on the ESA Server, they are run under the user "notification" so we have to set this up as follows:


SSH onto the ESA Server as root, then type sudo su notification

This switches us to the user notification.


[notification@rsaesa tmp]$ whoami
[notification@rsaesa tmp]$ cd ~
[notification@rsaesa .ssh]$ ssh-keygen
Generating public/private rsa key pair.
Enter file in which to save the key (/home/notification/.ssh/id_rsa):
Enter passphrase (empty for no passphrase):
Enter same passphrase again:
Your identification has been saved in /home/notification/.ssh/id_rsa.
Your public key has been saved in /home/notification/.ssh/
The key fingerprint is:
d9:ed:61:48:db:c1:aa:75:d5:b7:64:3f:ac:1c:c4:2b notification@rsaesa
The key's randomart image is:
+--[ RSA 2048]----+
| |
| .. . |
| . oo.oo|
| + *.o= +|
| S *E*o +.|
| o +o.o .|
| . .o |
| |
| |
[notification@rsaesa .ssh]$ ssh-copy-id root@centos6.waugh.local
root@centos6.waugh.local's password:
Now try logging into the machine, with "ssh 'root@centos6.waugh.local'", and check in:


to make sure we haven't added extra keys that you weren't expecting.

[notification@rsaesa .ssh]$ ssh root@centos6.waugh.local "winexe -A credentials.cfg // 'hostname'"
[notification@rsaesa .ssh]$ exit


Here we can see that we can run a command on the ESA Server, which will run a command on our Centos 6 Server which will then actually run a command on our target windows machine.


We now define our ESA Rule. My ESA Rule fires whenever a suspiciousIP is detected.



My ESA Script is as follows (Attached as file ESAScript to import into the Global Notifications Page). Note this is python so indentations matter. Please import the file into your system to see the format of the script.


#!/usr/bin/env python
from smtplib import SMTP
import subprocess
import datetime
import json
import sys

def dispatch(alert):
    """
    The default dispatch just prints the 'last' alert to /tmp/esa_alert.json. Alert details
    are available in the Python hash passed to this method e.g. alert['id'], alert['severity'],
    alert['module_name'], alert['events'][0], etc.
    These can be used to implement the external integration required.
    """
    with open("/tmp/esa_alert.json", mode='w') as alert_file:
        alert_file.write(json.dumps(alert, indent=True))

def read():
    # Define our metakey tuples
    # Each tuple is ["metakey", "meta key description", "value"]
    metakeys = [["device_type", "Device Type: ", ""],
                ["device_ip", "Device IP: ", ""],
                ["device_class", "Device Class: ", ""],
                ["ip_src", "Source IP: ", ""],
                ["ip_dst", "Dest IP: ", ""],
                ["ip_srcport", "Source Port: ", ""],
                ["ip_dstport", "Dest Port: ", ""],
                ["ec_activity", "Activity: ", ""],
                ["ec_subject", "Subject: ", ""],
                ["ec_theme", "Theme: ", ""],
                ["event_description", "Description: ", ""],
                ["event_cat_name", "Category: ", ""],
                ["msg_id", "Message ID: ", ""],
                ["event_source_id", "Concentrator: ", ""]]

    # Keys we want in our Subject
    subj_metakeys = [["device_type", "Device Type: ", ""],
                     ["msg_id", "Message ID: ", ""]]

    sa_server = ''
    brokerid = '6'
    smtp_server = ''
    smtp_port = '25'
    smtp_user = ''
    smtp_pass = ''
    from_addr = "RSA Security Analytics <>"
    to_addr = ['']

    # Get data from JSON
    with open('/tmp/esa_alert.json') as alert_file:
        esa_alert = json.loads(alert_file.read())

    # Extract Variables (Add as required)
    try:
        module_name = esa_alert["module_name"]
    except KeyError:
        module_name = "null"

    try:
        sessionid = str(esa_alert["events"][0]["sessionid"])
    except KeyError:
        sessionid = "null"

    try:
        ip_src = str(esa_alert["events"][0]["ip_src"])
    except KeyError:
        ip_src = "null"

    # Extract Values for each of our Variables
    for tuples in metakeys:
        try:
            tuples[2] = str(esa_alert["events"][0][tuples[0]])
        except KeyError:
            tuples[2] = "null"

    # Extract Our Subject Variables
    for tuples in subj_metakeys:
        try:
            tuples[2] = str(esa_alert["events"][0][tuples[0]])
        except KeyError:
            tuples[2] = "null"

    try:
        event_source_id = esa_alert["events"][0]["event_source_id"]
    except KeyError:
        event_source_id = "null"

    # Work out Concentrator ID depending on Event Source
    concid = brokerid  # fall back to the broker when no concentrator matches
    if event_source_id.startswith(""):
        concid = ''  # ID of this concentrator
    elif event_source_id.startswith(""):
        concid = ''  # ID of this concentrator
    elif event_source_id.startswith("CONC3"):
        concid = ''  # ID of this concentrator
    elif event_source_id.startswith("CONC4"):
        concid = ''  # ID of this concentrator

    # Runs command
    mycommand = "ssh root@centos6.waugh.local \"winexe -A credentials.cfg //" + ip_src + " \'hostname\'\""
    myresult = ""

    # Runs a System Command and collects its output as text
    p = subprocess.Popen(mycommand, shell=True, stdout=subprocess.PIPE,
                         stderr=subprocess.STDOUT, universal_newlines=True)
    for line in p.stdout.readlines():
        myresult = myresult + line + "\n"
    retval = p.wait()

    date = datetime.datetime.now().strftime("%d/%m/%Y %H:%M")
    subj = (module_name) + " :: " + (date)
    for subj_meta in subj_metakeys:
        subj += ":: " + (subj_meta[2])

    header = "Use the Investigation tab within Security Analytics to view more details related to this alert.\n\n"
    header += ""
    header += str(concid)
    header += "/navigate/event/DETAILS/"
    header += str(sessionid)
    header += "\n\n"

    message_text = "Alert Name: \t\t%s\n" % (module_name)
    message_text += "Date/Time: \t\t%s\n" % (date)
    message_text += "Command: \t\t%s\n" % (mycommand)
    message_text += "Return Value: \t\t%s\n" % (retval)
    message_text += "Result: \t\t%s\n" % (myresult)

    for meta in metakeys:
        message_text += meta[1] + "\t\t%s\n" % (meta[2])

    body_text = header + message_text
    msg = "From: %s\nTo: %s\nSubject: %s\nDate: %s\n\n%s\n" % (from_addr, to_addr, subj, date, body_text)

    # Sends Email
    smtp = SMTP()
    smtp.connect(smtp_server, int(smtp_port))
    smtp.sendmail(from_addr, to_addr, msg)
    smtp.quit()

if __name__ == "__main__":
    dispatch(json.loads(sys.argv[1]))
    read()
    sys.exit(0)


You can import the rule as follows:


After this, when the ESA Rule is triggered, the command will be run and you will also get an email of the results of the command.
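One thing worth being careful about is that the script places ip_src straight into a shell command line. A possible way to harden this, sketched below, is to check that the value really is an IP address and to pass the command to ssh as an argument list instead of a shell string. The function names are hypothetical, Python 3 is assumed, and this is a suggestion of mine and not part of the attached ESAScript.

```python
import ipaddress
import subprocess


def build_remote_command(ip_src, jump_host="root@centos6.waugh.local",
                         remote_cmd="hostname"):
    """Return the ssh argument list that runs winexe against ip_src."""
    # Raises ValueError for anything that is not a valid IP address,
    # including the "null" placeholder used when the meta key is missing.
    ipaddress.ip_address(ip_src)
    winexe = "winexe -A credentials.cfg //%s '%s'" % (ip_src, remote_cmd)
    return ["ssh", jump_host, winexe]


def run_remote_command(argv):
    """Run the command without a local shell; return (exit code, output)."""
    proc = subprocess.run(argv, stdout=subprocess.PIPE,
                          stderr=subprocess.STDOUT, universal_newlines=True)
    return proc.returncode, proc.stdout


if __name__ == "__main__":
    print(build_remote_command("10.0.0.5"))
```

The remote side still interprets the winexe string with its own shell, which is why the address is validated before it is placed into that string.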


Sample Email Body:




Here what we have achieved is that we run the hostname command on the machine when the ESA Rule triggers.


If you wanted to install ECAT for example we need to be a bit more inventive.


On our centos command server we use the following file 



#Mount the remote machine to copy over ECAT Agent
mount -t cifs //$1/c$ /mnt/temp-mount -o credentials=~/credentials.cfg
#Mount our ECAT source directory where the Agent.exe package lives
mount -t cifs //$/Packages/ECAT /mnt/ecat -o credentials=~/credentials.cfg
cp -rf /mnt/ecat/ecatagent43proxy.exe /mnt/temp-mount/ecatagent43proxy.exe
winexe -A credentials.cfg //$1 'cmd.exe /c "c:\ecatagent43proxy.exe"'
rm /mnt/temp-mount/ecatagent43proxy.exe
umount /mnt/temp-mount


To install ECAT on a remote machine we would run ./


We now incorporate this into our ECATInstall Script, by replacing the existing command line with:

# Runs command
mycommand = "ssh root@centos6.waugh.local \"./ " + ip_src + "\""


When we ping a suspiciousIP, ECAT then gets installed on the windows machine.

Building on the excellent work in Security Analytics Log Parser, I had a minor irritation in that the query time was not particularly useful if you had a lot of concentrators.


A typical log message would look like:


Nov 20 07:08:46 mybroker NwBroker[3306]: [SDK-Query] [audit] User admin (session 3421280, has finished query (channel 3424049, queued 00:00:00, execute 00:20:01, id1=5729990302 id2=19760696376362 threshold=0 query="select time, ip.src, ip.dst, tcp.srcport, tcp.dstport, service,, directory, filename, query, referer, sessionid where (time='2016-Sep-01 00:00:00'-'2016-Nov-19 06:47:11') && ((( contains '' || contains '' )))"


Unfortunately the part in red got put into the query.time meta key so it was not particularly useful.


I wrote a LUA parser to extract the data from query.time so that the time the query took is put into an Integer meta key called execute.time.


Here is the LUA parser. Perhaps it will be useful for someone...


With this parser you can create reports that show your longest running queries, and who ran them.
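To show the kind of conversion involved, here is a small Python sketch that pulls the execute value out of a raw broker log line like the sample above and turns HH:MM:SS into a number of seconds. It is an illustration only, not the LUA parser: the function name `execute_seconds` is hypothetical and it only handles the single "execute" field, not the per-concentrator times.

```python
import re

# Matches the "execute HH:MM:SS" part of an SDK-Query audit line
_EXECUTE_RE = re.compile(r"execute (\d+):(\d{2}):(\d{2})")


def execute_seconds(log_line):
    """Return the query execution time in seconds, or None if not present."""
    match = _EXECUTE_RE.search(log_line)
    if match is None:
        return None
    hours, minutes, seconds = (int(group) for group in match.groups())
    return hours * 3600 + minutes * 60 + seconds


if __name__ == "__main__":
    sample = "has finished query (channel 3424049, queued 00:00:00, execute 00:20:01, id1=5729990302"
    print(execute_seconds(sample))
```

Storing the value as a plain integer is what makes it possible to sort and threshold on it in reports.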




I've updated the parser so that it has the following features:

  • Able to parse all concentrator times even if the length of querytime exceeds 256 characters, so no statistics are lost
  • Added a key to store the slowest concentrator for each query so you can identify if one particular concentrator is always the slowest. We only consider query times greater than 5 seconds for this feature.

Hermes Bojaxi posted the following useful information which is well worth sharing. Normally a Maintenance Job runs every week which will shrink the Database Transaction logs. If for some reason the maintenance job repeatedly fails you may have very large database transaction logs.


You can shrink transaction logs with this script which is actually inside the Maintenance job. The script should be run against either the ECAT$PRIMARY or ECAT$SECONDARY databases.

DECLARE @LogicalLogFileName nvarchar(50)
, @SQL nvarchar(1000)
, @Continue bigint = 100
, @CurrentDate datetime2(3) = getutcdate()
-- Shrink Log file
SELECT @LogicalLogFileName = FILE_NAME(2) ;
SET @SQL = 'DBCC SHRINKFILE('+@LogicalLogFileName+', 1)' ;
EXEC ( @SQL) ;


After running this script go to database -> Tasks -> Shrink -> Files and choose the option to release unused space.


Further information:


Shrinking the Transaction Log 

The EsperTech Esper EPL Online can be a bit daunting if you are new to writing rules for your ESA, so here is a quick example that will hopefully get you started.



The screen is divided into three vertical areas:


- EPL Statements: - this is where you define what your events will look like and the ESA rule that will work on your Events

- Time and Event Sequence: -this is where you enter your events and advance time

- Scenario Results: - where the results of running the ESA rule against sequence of events is shown.


Here is an example to paste into the different areas so that you can see the results:


Paste into the EPL Statement Area:


create schema Event(user_dst string,device_ip string, ec_outcome string, ec_theme string, medium integer, ec_activity string);

@Name('Out') select * from Event;

@Name('Result') SELECT * FROM Event(
medium = 32
AND ec_activity = 'Logon'
AND ec_theme = 'Authentication'
AND ec_outcome = 'Failure'
).std:groupwin(user_dst).win:time_length_batch(60 sec,3)



Paste into the Time and Event Sequence Area:


Event={user_dst='joe', device_ip='', ec_outcome='Failure', ec_theme='Authentication', medium=32, ec_activity='Logon'}
Event={user_dst='joe', device_ip='', ec_outcome='Failure', ec_theme='Authentication', medium=32, ec_activity='Logon'}
Event={user_dst='joe', device_ip='', ec_outcome='Failure', ec_theme='Authentication', medium=32, ec_activity='Logon'}


In this example we have a simple ESA rule that detects a brute force login attempt. We are looking for three failed logins within a 60 second period, grouped on the same user_dst. Our events describe a user joe logging in from three different devices. Using t=t+n between events advances the time by n seconds.


The statement

@Name('Out') select * from Event;

will print out any event.


The statement 

@Name('Result') SELECT * FROM Event( .....

will only print out events that match our criteria.
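If it helps to reason about the rule outside of the online tool, the intent (three matching failures for the same user_dst within 60 seconds) can be modelled in a few lines of Python. This is a simplified model of the intent only and an assumption on my part: it does not reproduce exactly how Esper's time_length_batch window releases its batches, and the function names are hypothetical.

```python
from collections import defaultdict, deque


def matches(event):
    """Same filter as the Event(...) criteria in the EPL statement."""
    return (event.get("medium") == 32
            and event.get("ec_activity") == "Logon"
            and event.get("ec_theme") == "Authentication"
            and event.get("ec_outcome") == "Failure")


def detect_bruteforce(timed_events, window=60, count=3):
    """timed_events is a list of (time_in_seconds, event_dict) in time order.

    Returns a list of (user_dst, time) for each group of `count`
    matching events seen within `window` seconds.
    """
    recent = defaultdict(deque)  # user_dst -> times of recent failures
    alerts = []
    for now, event in timed_events:
        if not matches(event):
            continue
        times = recent[event["user_dst"]]
        times.append(now)
        # Drop failures that have fallen out of the time window
        while times and now - times[0] > window:
            times.popleft()
        if len(times) >= count:
            alerts.append((event["user_dst"], now))
            times.clear()  # start a fresh group after alerting
    return alerts


if __name__ == "__main__":
    failure = {"user_dst": "joe", "ec_outcome": "Failure",
               "ec_theme": "Authentication", "medium": 32,
               "ec_activity": "Logon"}
    print(detect_bruteforce([(0, failure), (10, failure), (20, failure)]))
```

Spreading the same three events further apart than the window, for example at 0, 50 and 100 seconds, produces no alert in this model.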

(I've only managed to get this method working with UDP)


Customers often want to forward syslog traffic that is being sent to Netwitness for Logs to another syslog destination, so that the syslog traffic is processed by both Netwitness and the other syslog receiver.


There are many ways that this can be achieved (such as Configure Syslog Forwarding to Destination - RSA Security Analytics Documentation ) but here is another method.


1) On the Remote Log Collector go to Event Sources -> Syslog and Config.

2) Disable the current syslog sources. These will typically be udp514 and tcp514. To disable them, edit each source and untick the Enabled box.


3) Set up new TCP and UDP listeners on a different port. In this case we use UDP 5514 and TCP 5514.

4) Stop and start the Syslog Collection Method

Make sure that in explore view under logcollection/syslog/eventsources/syslog-tcp and syslog udp that you set the following headers to true. This will ensure that when security analytics processes the message the original IP is seen.



5) Confirm with netstat -na |grep -i 514 that the system is only listening on your new ports 5514

 netstat -na |grep -i 514
tcp 0 0* LISTEN
tcp 1 0 CLOSE_WAIT
tcp 0 0 :::5514 :::* LISTEN
udp 0 0*
udp 0 0*
udp 0 0 :::5514 :::*


6) Edit the file /etc/rsyslog.conf


Uncomment the following lines:

# Provides UDP syslog reception
$ModLoad imudp
$UDPServerRun 514

# Provides TCP syslog reception
$ModLoad imtcp
$InputTCPServerRun 514
And add the line

$PreserveFQDN on


7) At the end of the file add the following lines:


*.* @@




*.* @


This will forward all TCP traffic to port 514 or all UDP traffic to


Also add one of the following lines


*.* @@localhost:5514




*.* @localhost:5514


This will forward all syslog traffic to our log collector for processing. The difference is that if @ is used the traffic gets forwarded as UDP, and if @@ is used the traffic gets forwarded as TCP. In both cases the target is port 5514, where the log collector now listens (rsyslog itself is listening on 514). Forwarding as TCP is better as large syslog messages can be transferred and TCP has more robust packet delivery than UDP.


8) Restart the rsyslog service with service rsyslog restart
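
Once rsyslog has been restarted it is handy to push a known test message through the chain and look for it on both receivers. The following is a minimal Python sketch for that, not part of the original procedure. The function names, the tag fwdtest and the default target 127.0.0.1 are assumptions for illustration; it sends to port 514, where rsyslog is listening after step 6.

```python
import socket


def build_syslog_message(text, facility=1, severity=6, tag="fwdtest"):
    """Build a minimal BSD-style syslog line: <PRI>tag: text."""
    pri = facility * 8 + severity  # PRI is facility * 8 + severity
    return f"<{pri}>{tag}: {text}"


def send_udp(message, host="127.0.0.1", port=514):
    """Send one syslog datagram. UDP gives no delivery confirmation."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto(message.encode("utf-8"), (host, port))


print(build_syslog_message("rsyslog forwarding test"))
```

Running send_udp(build_syslog_message("rsyslog forwarding test")) on the Remote Log Collector should result in the message appearing both in Netwitness and on the other syslog receiver if the forwarding lines are working.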

The official Rapid 7 Nexpose Guide (Rapid7 NeXpose Event Source Configuration Guide) was unfortunately short of a few details, so here I describe how I integrated the Windows version of Rapid 7 Nexpose into Security Analytics.


I was using Nexpose 5.17.1 on a Windows 2008 Server.

The screenshots have been taken from Security Analytics 10.6.1


This document assumes that the reader is familiar with installing the SFTP Agent and setting it up.


  1. On your Nexpose Server, create a CSV Report in Nexpose using the "Basic Vulnerability Check Results (CSV)" template.
  2. This will output a CSV Report of the scan. However, the file will be gz compressed, which is not compatible with sending it to the Log Collector. As a result we will uncompress it using 7-Zip. Install 7-Zip on your server.
  3. Still on your Nexpose Server, create a directory called NexposeScripts and populate it with the contents of the file that is attached to this document. Create a scheduled task in Windows to run the batch file Nexpose.bat every 5 minutes.
  4. On the Nexpose Server install the RSA SFTP Agent and use the attached sftagent.conf to process the Nexpose log messages.
  5. On the Log Collector copy the file rapid7.xml to /etc/netwitness/ng/logcollection/content/collection/file and restart the logcollector service.
  6. On the Log Decoder make a directory called /etc/netwitness/ng/envision/etc/devices/rapid7 and copy the files v20_rapid7msg.xml and rapid7.ini into this directory.
  7. Restart the Log Decoder.


The parser makes use of the vuln_ref reference key so make sure that in your table-map-custom.xml file you have the line

<mapping envisionName="vuln_ref" nwName="vuln.ref" flags="None" format="Text"/>


If everything is set up correctly then you should see a new rapid7 device type, with Threat Category, Threat Description and also the Vuln Ref key populated with CVE numbers.



Note: by default, the script Nexpose.bat will leave the report.csv.gz files in their original directory. If you want them to be deleted after processing, add the final line shown below (the del command) to c:\nexposescripts\nexpose.bat


cscript nexpose-audits.vbs
cscript nexpose-authevents.vbs
cscript nexpose-nscevents.vbs
cd "C:\Program Files\rapid7\nexpose\nsc\htroot\reports"
for /R %%f in (report.csv.gz) do "c:\program files\7-Zip\7z.exe" e -y "%%f"
for /R %%f in (report.csv.gz) do del /q "%%f"
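
For anyone who would rather not depend on 7-Zip, the uncompress step can also be sketched in Python using only the standard library. This is an illustrative alternative and not what the attached batch file does. The function name extract_reports is made up, and unlike 7z e, which extracts into the current directory, this sketch writes each report.csv next to its archive.

```python
import gzip
import shutil
from pathlib import Path


def extract_reports(root, name="report.csv.gz", delete_source=False):
    """Decompress every `name` found under `root`; return the CSV paths."""
    extracted = []
    # sorted() materialises the matches before any files are created or removed.
    for archive in sorted(Path(root).rglob(name)):
        target = archive.with_suffix("")  # report.csv.gz -> report.csv
        with gzip.open(archive, "rb") as src, open(target, "wb") as dst:
            shutil.copyfileobj(src, dst)
        if delete_source:
            archive.unlink()  # same effect as the optional del line
        extracted.append(target)
    return extracted
```

Calling extract_reports with the Nexpose reports directory and delete_source=True would mirror the two for /R lines above.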

I had a customer who was trying to investigate source and destination IP addresses but was having to manually do a reverse DNS lookup on each IP address to find out the corresponding hostname.


This was a similar situation to my post:

User Agent to Device/OS/Application


This script is only provided as a proof of concept so I would strongly recommend testing it first in a test environment. Please be aware that it will perform a large number of reverse DNS lookups.


The following script will create a feed of reverse DNS names, with one line per IP address in the form ip,hostname.


Where an IP address resolves to multiple domain names, the domain names are separated by commas on the same line.


The script is designed to be placed on a CentOS 6 web server where it will write the feed to /var/www/html/RDNS-src.csv

It is designed to be run as a cron job.


The script looks for all source ip addresses, but can also be modified to look for destination ip addresses.


#Copy the existing Feed to a backup location

mv /var/www/html/RDNS-src.csv /var/www/html/RDNS-src.csv.bak

# We keep all IP Addresses that we have processed in /tmp/ipprocessed.txt

touch /tmp/ipprocessed.txt


# First Get a list of ip.src from our Broker.

curl -s --user 'admin:netwitness' '' |grep field |cut -f 2 -d ">" |cut -d "<" -f1 |grep -v rsyslogd | grep -v pts |grep -v ignored |grep -v \(\) >/tmp/RDNS-src.txt


while read p; do

  cmd=$(grep -ci "$p" /tmp/ipprocessed.txt)

  escape_p=$(echo "$p" |sed 's/\[/\\[/')

  cmd2=$(grep -ci "$escape_p" /tmp/ipprocessed.txt)

  if [ $cmd == "0" ] && [ $cmd2 == "0" ]; then

    #echo  "IP SRC "$p"  not previously seen so process it"

    #echo  "$p" >>/tmp/ipprocessed.txt

    OUTPUT=$(host $p  |grep -v "not found" |grep "domain name pointer"   |cut -d" " -f 5 | rev | cut -c 2- | rev |sed -n -e 'H;${x;s/\n/,/g;s/^,//;p;}')



   if [ "$OUTPUT" != "" ]; then

    echo "$p",$OUTPUT >>/var/www/html/RDNS-src.csv

   fi

  #else

  #  echo "IP SRC already processed"

  fi

done </tmp/RDNS-src.txt
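
The lookup and feed-building part of the script can also be sketched in Python, which avoids parsing the output of the host command. This is a rough equivalent under my own assumptions and not a drop-in replacement: the function names are made up, it uses socket.gethostbyaddr rather than host, and fetching the ip.src list from the Broker is left out.

```python
import socket


def reverse_names(ip):
    """Return the PTR names for `ip`, or an empty list if none resolve."""
    try:
        primary, aliases, _ = socket.gethostbyaddr(ip)
    except OSError:  # covers socket.herror and socket.gaierror
        return []
    return [primary, *aliases]


def build_feed(ips, resolver=reverse_names):
    """Return feed lines of the form ip,name1,name2 for resolvable IPs."""
    seen = set()
    lines = []
    for ip in ips:
        if ip in seen:  # look each address up only once
            continue
        seen.add(ip)
        names = resolver(ip)
        if names:
            lines.append(",".join([ip, *names]))
    return lines


# Example with a fake resolver so that no real DNS lookups are made.
fake = {"192.0.2.10": ["host-a.example", "host-b.example"]}
print(build_feed(["192.0.2.10", "192.0.2.11", "192.0.2.10"],
                 resolver=lambda ip: fake.get(ip, [])))
# ['192.0.2.10,host-a.example,host-b.example']
```

The resolver parameter is there so the feed logic can be tested in a lab without generating a large number of reverse DNS lookups.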

I had a request from a customer to parse out some messages from a mail conversation.


Basically the email contains the following headers:


Received-SPF: pass ( is whitelisted); client-ip=; helo=ECAT.waugh.local; envelope-from=david.waugh@waugh.local; x-software=spfmilter 2.001 with libspf2-1.2.10;
Received: from ECAT.waugh.local ([])
  by (8.13.8/8.13.8) with ESMTP id u3RCCOxW024832
  for <>; Wed, 27 Apr 2016 12:12:24 GMT
x-metascan-quarantine-id: e3c4973b-d836-4f37-a47d-62271c21a5cc
Received: from UKXXWAUGHDL1C ([]) by ECAT.waugh.local with ESMTP ; Wed, 27 Apr 2016 13:12:23 +0100
From: "10.5 Test" <>
To: <>
Subject: RE: This is a test through my mail system
Date: Wed, 27 Apr 2016 13:12:23 +0100
Message-ID: <0bd101d1a07e$0e479230$2ad6b690$@waugh.local>
MIME-Version: 1.0
Content-Type: multipart/alternative;
X-Mailer: Microsoft Outlook 14.0
Thread-Index: AdGget+xlnYyEWhiQrysrAxYJ7qXxgAApK1AAAAdpuAAAAfMIA==
Content-Language: en-gb
X-Virus-Scanned: clamav-milter devel-clamav-0.98-dmgxar-126-gfde6749 at infra1
X-Virus-Status: Clean


The header of interest here is the one called Received-SPF:


I created a parser based on Detecting Sinkholed Domains With The X-Factor Parser


On my Packet Decoder I created a parser called SPF.parser in /etc/netwitness/ng/parsers

containing the following:


<?xml version="1.0" encoding="utf-8"?>
<parsers xmlns:xsi="" xsi:noNamespaceSchemaLocation="parsers.xsd">
  <parser name="SPF Factor" desc="This extracts the SPF string from a Header">
    <declaration>
      <token name="tXfactor" value="Received-SPF:" options="linestart" />
      <number name="vPosition" scope="stream" />
      <string name="vXfactor" scope="stream" />
      <meta name="meta" key="xfactor" format="Text" />
    </declaration>
    <match name="tXfactor">
      <find name="vPosition" value="&#x0d;&#x0a;" length="512">
        <read name="vXfactor" length="$vPosition">
          <register name="meta" value="$vXfactor" />
        </read>
      </find>
    </match>
  </parser>
</parsers>







Whenever the Received-SPF header is seen, the rest of that header line is put into the xfactor meta key.
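
To illustrate what the parser extracts, here is a small Python sketch of the same logic: find the Received-SPF: token at the start of a line, search up to 512 bytes for the CRLF, and keep everything in between. It is only a model of my understanding of the match, find and read steps, not how the Decoder implements them, and the function names are made up.

```python
TOKEN = b"Received-SPF:"
MAX_SEARCH = 512  # the parser searches at most this far for the line end


def _line_starts(payload):
    """Yield offset 0 and every offset that follows a CRLF."""
    yield 0
    pos = payload.find(b"\r\n")
    while pos != -1:
        yield pos + 2
        pos = payload.find(b"\r\n", pos + 2)


def extract_spf(payload):
    """Return the text after each Received-SPF: token, up to the CRLF."""
    values = []
    for line_start in _line_starts(payload):
        if not payload.startswith(TOKEN, line_start):
            continue
        begin = line_start + len(TOKEN)
        end = payload.find(b"\r\n", begin, begin + MAX_SEARCH)
        if end != -1:  # no CRLF within 512 bytes means nothing is registered
            values.append(payload[begin:end].decode("latin-1"))
    return values


sample = b"Received-SPF: pass (example)\r\nSubject: test\r\n"
print(extract_spf(sample))  # [' pass (example)']
```

Note that the value keeps the space that follows the colon, because the read starts immediately after the token.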




If your SPF fields are from a different mail provider, then you could adjust the parser accordingly.

For example if your messages had the following header:

Authentication-Results:; spf=pass


Then the line in the parser could be changed from:

<token name="tXfactor" value="Received-SPF:" options="linestart" />

to:

<token name="tXfactor" value="Authentication-Results:; spf=" options="linestart" />


If you wanted to put the result into a different meta key (for example result) then change

<meta name="meta" key="xfactor" format="Text" />

to:

<meta name="meta" key="result" format="Text" />

I had a question about how to disable root login on Security Analytics for SSH.


Note: before proceeding, make sure that you have iDRAC access set up to your appliances. Although I have tested this script and found it working in my test environment, it does have the potential to lock you out of your appliances. Please take the necessary precautions to prevent this.


You may also wish to explore STIG hardening: Configure DISA STIG Hardening - RSA Security Analytics Documentation


The steps to do this are widely available and an example is

How do I disable SSH login for the root user? - Media Temple


This article will demonstrate how we implement these steps in puppet so that they will automatically be propagated through the Security Analytics Deployment.

  1. Changes to SSH Config are made in the file /etc/puppet/modules/ssh/manifests/init.pp on the SA Server.
  2. Make a backup of this file and copy it to a different directory
  3. Replace the existing /etc/puppet/modules/ssh/manifests/init.pp with the one attached. It contains the following content. Make sure that you amend the password for your emergencyroot user.


The changes will then be propagated throughout the deployment on the next Puppet run so could take up to 30 minutes to take effect.


class ssh {
  service { 'sshd':
    enable      =>  true,
    ensure      =>  running,
  }

  exec { 'fix-ssh':
    #path        => ["/bin", "/usr/bin"],
    command     => "/etc/puppet/scripts/",
  }

  augeas { "sshd_config_X11Forwarding":
    context => "/files/etc/ssh/sshd_config",
    changes => "set X11Forwarding no",
  }

  # Fixes to disable root login and to create an Emergency Root User

  group { 'emergencyroot' :
    ensure          => 'present',
    gid             => '10018',
  }

  user { 'emergencyroot' :
    ensure          => 'present',
    home            => '/home/emergencyroot',
    gid             => '10018',
    # Puppet expects the password in the encrypted (hashed) format used by the system
    password        => 'myverysecurepassword',
    uid             => '10018',
    shell           => '/bin/bash',
  }

  augeas { "Disable_Root_Login":
    context => "/files/etc/ssh/sshd_config",
    changes => "set PermitRootLogin no",
    notify  =>  Service['sshd'],
  }

  augeas { "sudo-emergencyroot":
    context => "/files/etc/sudoers",
    changes => [
      "set spec[user = 'emergencyroot']/user emergencyroot",
      "set spec[user = 'emergencyroot']/host_group/host ALL",
      "set spec[user = 'emergencyroot']/host_group/command ALL",
      "set spec[user = 'emergencyroot']/host_group/command/runas_user ALL",
    ],
    notify  =>  Service['sshd'],
  }

  file { '/etc/ssh/sshd_config':
    ensure    =>  present,
    owner     =>  'root',
    group     =>  'root',
    mode      =>  600,
    #source    =>  'puppet:///modules/ssh/sshd_config',
    notify    =>  Service['sshd'],
  }
}
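
After the Puppet run it is worth confirming on an appliance that the setting really did land in /etc/ssh/sshd_config before you close your existing root session. Here is a minimal Python sketch for that check. The function name effective_setting is made up, and it deliberately ignores Match blocks, so treat it as a quick sanity check rather than a full sshd_config parser.

```python
def effective_setting(config_text, keyword):
    """Return the first uncommented value of `keyword`, or None.

    sshd uses the first value it obtains for a keyword, and keywords
    are case-insensitive.
    """
    for raw in config_text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        parts = line.split(None, 1)
        if parts[0].lower() == keyword.lower():
            return parts[1].strip() if len(parts) > 1 else ""
    return None


sample = "#PermitRootLogin yes\nPermitRootLogin no\nX11Forwarding no\n"
print(effective_setting(sample, "PermitRootLogin"))  # no
```

On an appliance you would pass in the contents of /etc/ssh/sshd_config and expect to get back no for both PermitRootLogin and X11Forwarding.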


I had a look in my test system today and noticed that my ESA Trial Rules had been disabled.


esa Trial Rules Disabled.png


Further investigation in health and wellness showed that a particular rule was using much more memory than any others. As this is a test VM system, the virtual memory available is much less than available in a physical appliance. Here we can see that this rule is using 311 MB!


Bad ESA Rule.png


This was not what I intended for this rule, so looking at the EPL for the rule reveals the following:


module UserAgent_Alert;


@Hint('reclaim_group_aged=60 minutes,reclaim_group_freq=1')

// The stage setters - the output of these statements is for Esper to retain (internally)

CREATE WINDOW minutes) (client string);

INSERT INTO UserAgentWatchList SELECT client from Event(client IS NOT NULL AND alert_id IS NOT "Client Ignore");

// The real “alerter”. The annotation identifies it as the one that ESA needs to watch for.



@Name('UserAgent Watchlist')

@Description('This alert will trigger on new User Agents but only 1 alert per individual IP address.')

SELECT * FROM Event(client is NOT NULL AND alert_id IS NOT "Client Ignore" ) WHERE client NOT in (SELECT client FROM UserAgentWatchList) ;


Here the issue is that I put every client into the UserAgentWatchList and not just new ones. This meant that duplicate values were stored in this named window, so it kept growing.


The correct rule is the following. In this example I only add a client to the named window UserAgentWatchListLowMem if it is not already in the named window.


@Hint('reclaim_group_aged=60 minutes,reclaim_group_freq=1')

// The stage setters - the output of these statements is for Esper to retain (internally)

CREATE WINDOW minutes) (client string);

On Event(client IS NOT NULL AND alert_id IS NOT "Client Ignore") as myevent

merge UserAgentWatchListLowMem WatchListClient

where WatchListClient.client=myevent.client

when not matched

then insert select client;


// The real “alerter”. The annotation identifies it as the one that ESA needs to watch for.



@Name('UserAgent WatchlistLowMem')

@Description('This alert will trigger on new User Agents but only 1 alert per individual IP address.')

SELECT * FROM Event(client is NOT NULL AND alert_id IS NOT "Client Ignore" ) WHERE client NOT in (SELECT client FROM UserAgentWatchListLowMem) ;
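
The difference between the two rules can be shown with a few lines of Python. This is just an analogy using plain lists in place of the named windows, with made-up client values, to show why the first version stores far more rows.

```python
clients = ["Mozilla/5.0", "curl/7.19", "Mozilla/5.0", "Mozilla/5.0"]

# First rule: every matching event is inserted, so repeats pile up.
insert_always = []
for client in clients:
    insert_always.append(client)

# Corrected rule: insert only when no row with this client exists yet,
# which is what the merge ... when not matched clause does.
insert_if_missing = []
for client in clients:
    if client not in insert_if_missing:
        insert_if_missing.append(client)

print(len(insert_always), len(insert_if_missing))  # 4 2
```

With real traffic the first count grows with every event, while the second only grows with the number of distinct clients.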


You can check memory utilisation of your Named Windows using this article.

How do I look in the Named Windows of my ESA Rules
