ESA Rule Writing Best Practices

Document created by RSA Information Design and Development on May 14, 2019Last modified by RSA Product Team on Nov 5, 2019
Version 24Show Document
  • View in full screen mode

When working with RSA Live ESA or the ESA Rule Builder, you should not need to know the EPL syntax used within the rules. However, if your use case exceeds the capabilities of either of these, you should become familiar with at least the basics of the EsperTech EPL language used with ESA.

Note: NetWitness Platform 11.3 users Esper 7.1. Earlier releases use Esper 5.3.

Inefficiently written EPL rules can have a detrimental impact on how the ESA appliance functions. Therefore, it is important to write effective EPL rules. See the following topics in the Alerting Using ESA Guide for details:

This document contains hints, use cases and FAQs related to the development of statements using the Esper event processing language (EPL) with NetWitness ESA.

Performance Hints

The following table outlines a few of the areas where you can make performance optimizations to the most common statement syntax.

RecommendationExample RuleEsper Documentation

Use group by for aggregation

Successful Logins

Use only group by for aggregation. Avoid use of the view for std:groupwin

Documentation links:

Use hint with view .std:groupwin

HTTP Uploads - Office Documents Volume – High

If you must use the view std:groupwin for aggregation, add a Hint to reclaim groups. If you are also using a time batch window view, then the time in seconds should be double the time window value.

Documentation Links:

Avoid the creation of many grouping windows

Excessive upload to File Cloud Services – High

Avoid creating many grouping windows (e.g. hundreds of thousands) over a long period of time if many unique values are expected.

Avoid very general statements

-

Avoid alerts or general filter conditions that could cause hundreds of thousands of events to be added to a window. You could use the filter on the Investigation over the proposed time window to gauge the level of events that may match.

For health and wellness type alerts, see: Health and Wellness Alarms View and Health and Wellness Monitoring View.

Write strict MATCH RECOGNIZE patterns

Account Created And Deleted In Short Period Of Time

Avoid using quantifiers of + or * within MATCH RECOGNIZE statements if possible. The system will need to track fewer states. Consider writing a PATTERN-based statement if you need to use a loose pattern based on requirements.

Documentation Links:

Suppress Duplicate Matches

Detecting Dyreza infections

Use every-distinct for pattern matches or @SuppressOverlappingMatches annotation.

Use output rate limiting;

Sources Communicating With Known Bad Hosts – High

Use output rate limiting such as OUTPUT FIRST EVERY X MINUTES to suppress generation of alerts if use case allows. This saves database space.

Keep in mind that .win:time() behaves differently than .win:time_length_batch Events are not removed from the window after logic is triggered and may create additional alerts with the same events.

There are different ways to control generation of alerts within ESA. ‘output first every x minutes’ only alerts on the first set of events that match the statement. Subsequent alerts during that suppression timeframe will not be stored.

Add functions such as .toLowerCase to meta keys only as needed

 

Using functions adds processing overhead: use them only when needed. For example, you do not need case-insensitive matching for meta keys storing IP addresses. Also, there are keys such as device.type with known, static case, where using a function would not produce different results.

For more performance tips, see the following Esper Reference documentation:

Use group by for aggregation: Successful Logins

Consider the following statements: one using groupwin, and the other without groupwin.

Using view groupwin:

@RSAAlert

SELECT * FROM
Event(
medium = 32
AND
ec_activity='Logon'
AND
ec_outcome='Success'
AND
ip_dst IS NOT NULL
AND
user_dst is NOT NULL
).std:groupwin(user_dst)
.win:time_batch(60 seconds)
group by user_dst
having count(distinct ip_dst) >= 2;

Without view groupwin:

@RSAAlert

SELECT * FROM
Event(
medium = 32
AND
ec_activity='Logon'
AND
ec_outcome='Success'
AND
ip_dst IS NOT NULL
AND
user_dst is NOT NULL
).win:time_batch(60 seconds)
group by user_dst
having count(distinct ip_dst) >= 2;

The second statement (without groupwin) works the same as the first, but uses less memory.

Use hint with view .std:groupwin: HTTP Uploads - Office Documents Volume - High

The @Hint("reclaim_group_aged=age_in_seconds") hint instructs the engine to discard an aggregation state that has not been updated for age_in_seconds seconds. The age_in_seconds value should match the time window added to the statement.

@Hint('reclaim_group_aged=120')

 

SELECT * FROM Event( medium = 32
AND (msg_id.toLowerCase() = 'post')
AND user_dst IS NOT NULL
AND extension IN ('pdf','doc','xls','docx','xslx','ppt','pptx') AND rbytes > 10000
AND device_type='cacheflowelff'
AND alert IN ('VIP', 'CritAsset')
)
.win:time_batch(60 seconds)
GROUP BY user_dst HAVING COUNT(*) = 20;

Write strict MATCH RECOGNIZE patterns: Account Created And Deleted In Short Period Of Time

The pattern defined for the match recognize statement should be strict. This means you should try to eliminate repetition operators such as + or *.

From:

SELECT * FROM Event(

   medium = 32 AND
   ec_subject='User' AND
   ec_outcome='Success' AND
   user_src is NOT NULL AND
   ( ec_activity='Create' OR ec_activity='Delete' )

).win:time(7200 seconds) match_recognize

( partition by
   user_src measures C as c,
   D as d pattern (C+ D)
   define C as C.ec_activity='Create' ,
   D as D.ec_activity='Delete'

);

To:

SELECT * FROM Event(

   medium = 32 AND
   ec_subject='User' AND
   ec_outcome='Success' AND
   user_src is NOT NULL AND
   ( ec_activity='Create' OR ec_activity='Delete' )

).win:time(7200 seconds) match_recognize

( partition by
   user_src measures C as c,
   D as d pattern (C D)
   define C as C.ec_activity='Create' ,
   D as D.ec_activity='Delete'

);

Suppress Duplicate Matches: Administrative Activity followed by User Creation

In the following statement, a new thread is created for every a event. This means multiple a events will match with the same b event. This could result in an unexpected and undesirable number of alerts for the same user during the time window.

SELECT * FROM PATTERN [ every a =
Event(device_class='Web Logs'

AND host_dst = 'icanhazip.com')
-> b = Event(category LIKE '%Botnet%' AND device_class='Web Logs'
AND user_dst=a.user_dst) where timer:within(300 seconds)

Instead, we recommend using the hint @SuppressOverlappingMatches with the PATTERN syntax using every.

SELECT * FROM PATTERN @SuppressOverlappingMatches [ every
a = Event(device_class='Web Logs'

AND host_dst = 'icanhazip.com')
-> b = Event(category LIKE '%Botnet%' AND device_class='Web Logs'
AND user_dst=a.user_dst) where timer:within(300 seconds)

Use output rate limiting: Sources Communicating With Known Bad Hosts – High

If a rule is triggering frequently, you may only want to store the first occurrence within a time period per unique meta value. This means the alerts are not stored and taking up space in the database. This is separate from notification suppression, which can be done within the UI and does not influence alert storage.

The alerts below are suppressed per user_dst using output first every syntax. Only the first alert within the 60 minute time window will be stored in the database and alerted. Allow constituent events to be retained within the alert by using window aggregation window(*). The result without window aggregation would be only the first of the 20 events per user_dst. If you do not need to maintain all events for analysis, then use select * instead of window aggregation.

From:

@Hint('reclaim_group_aged=300')

SELECT * FROM Event( medium = 32
AND alert_id LIKE 'known_bad%'
AND alert IN ('VIP' , 'CritAsset')
).std:groupwin(ip_src)
.win:time(1 Minutes)
GROUP BY ip_src HAVING COUNT(*) >= 20;

To:

@Hint('reclaim_group_aged=300')

 

SELECT window(*) FROM Event( medium = 32
AND alert_id LIKE 'known_bad%'
AND alert IN ('VIP' , 'CritAsset')
).std:groupwin(ip_src)
.win:time(1 Minutes)
GROUP BY ip_src HAVING COUNT(*) >= 20
ouptut first every 60 minutes;

Use Cases

The solutions are explained through a series of comments within the solution boxes. Comments are surrounded by /* */.

Alert if events occur within a time interval and the absence of another event is detected

Statement Summary:

Alert after receiving 10 different IDS events from the same source within 10 minutes, but only if within those 10 minutes, we do not see a TCP RST sent from the destination IP.

This is an example of correlating packet and log data. Our F5's will do a TCP RST on the inbound web requests for unknown paths, so in this instance, we only want to be alerted when a source receives 10 unique attacks to a single destination and that destination has not responded to the web requests.

Solution:

/* Required annotation to trigger an alert with the advanced statement */

@RSAAlert

/* Instruct the engine remove duplicate matches to the first ‘a’ event */

SELECT * FROM pattern @SuppressOverlappingMatches
[

/* Intrushield policy event */

every a=Event (
device_type IN ( 'intrushield' )
AND ip_src is not null
AND ip_dst is not null
AND policy_name is not null
AND policy_name NOT LIKE '%P2P%'
)
-> (

/* The 10 minute time window following the first event */ timer:interval(600 seconds)

AND

/* 9 more Instrushield policy violations each with a unique policy_name and the same ip_src and ip_dst. */

[9] b= Event (
device_type IN ( 'intrushield' )
AND ip_src = a.ip_src
AND ip_dst = a.ip_dst
AND policy_name is not null
AND policy_name NOT LIKE '%P2P%'
AND policy_name != a.policy_name
)

/* Both the statement for event b and event c must evaluate to true for the syntax to match. In other words, no TCP RST can occur to match the pattern. */

AND NOT
c=Event (medium=1 AND tcp_flags_seen ='rst' AND ip_dst=a.ip_dst)
)
]

/* Ensure all b events for unique policy names between them. */

where b.distinctOf(i => i.policy_name).countOf() = 9;

Correlate events that arrive out of order

Statement Summary:

Correlate 3 events that populate the same ip_dst and occur within 30 minutes of each other, in any order.

For details on inner joins, see the following Esper Reference documentation:

/*
Intrusion Detection with Nonstandard HTTPS Traffic and ECAT Alert Single host generates IPS alert on destination IP on port TCP/443 accompanied by traffic to TCP/443 that is not HTTPS with the target host generating an ECAT alert within 5 minutes.
*/

/* Create a window to store the IPS, nonstandard traffic and ECAT alerts */

@Name('create')
Create Window HttpsJoinedWindow.win:time(15 minutes)(device_class string, ip_dstport integer, service integer , tcp_dstport integer, device_type string, ip_dst string);

/* Insert into the window the IPS, nonstandard traffic and ECAT alerts */

@Name('insert')
INSERT INTO HttpsJoinedWindow
SELECT * FROM
Event
(
(ip_dst IS NOT NULL and device_class IN ('IPS', 'IDS', 'Firewall') AND ip_dstport=443) OR
(ip_dst IS NOT NULL and service!=443 and tcp_dstport=443) OR
(ip_dst IS NOT NULL and device_type='rsaecat')
);

/* Alert to the combination of all three events: IPS, nonstandard traffic and ECAT alerts */

@RSAAlert

INSERT INTO HttpsIntrusionTrigger
SELECT * FROM
HttpsJoinedWindow(ip_dst IS NOT NULL and device_class IN ('IPS', 'IDS', 'Firewall') AND ip_dstport=443) as s1,
HttpsJoinedWindow(ip_dst IS NOT NULL and service!=443 and tcp_dstport=443) as s2,
HttpsJoinedWindow(ip_dst IS NOT NULL and device_type='rsaecat') as s3 where s1.ip_dst = s2.ip_dst and s1.ip_dst = s3.ip_dst;

/* Delete all events from the joined window that caused the alert so they won't be reused */

@Name('delete')
on HttpsIntrusionTrigger delete from HttpsJoinedWindow as j where s1.ip_dst=j.ip_dst;

Only fire rule within business hours

Statement Summary:

We want a rule to only fire if the event occurs within business hours.

For details on inner joins, see the following Esper Reference documentation:

Define non-working hours:

  • Set the working hours as '09:00' – '18:00'
  • Any event.cat.name LIKE system.config% after the working hours will trigger.

create context NotWorkingHours start (0, 18, *, *, *) end (0, 9, *, *, *); context
NotWorkingHours select * from Event(event_cat_name LIKE ‘system.config%’);

Administrative Activity followed by User Creation

Solution: Added parenthesis around the second and third events to require both the 2nd and 3rd events to occur after the 1st event.

For details, see the following Esper Reference documentation:

In the following example, we:

  • Moved the time window of 60 minutes to the pattern statement, so all 3 events would need to occur within that time window
  • Added @SuppressOverlappingMatches. Use the @SuppressOverlappingMatches pattern-level annotation to instruct the engine to discard all but the first match among multiple, overlapping matches.

SELECT * from pattern @SuppressOverlappingMatches [

every e1=Event(device_class IN ('Windows Hosts') AND reference_id IN ('4720', '4722',

'4724','4738') )

->

(

timer:interval(1 Seconds) AND e2=Event(event_cat_name LIKE 'Config.Changes%' AND user_dst=e1.user_src)

-> timer:interval(1 Seconds) AND e3=Event(device_class IN ('Windows Hosts') AND reference_id IN ('4726') AND user_src=e2.user_dst)

)

]

Leverage enrichments (CSV files, database) within ESA Rules such as domain or IPs

The NetWitness Platform has an Enrichments feature. After you define an enrichment, you can use it within a rule condition using the Rule Builder or through an Advanced rule. Below is a sample rule that is a part of the base ESA installation. This demonstrates using the Rule Builder to whitelist German sources.

You can create a Named Window to store IPs and update the window based on matching filter criteria. Only trigger if a second event occurs and the IP is on the watchlist. The user is only kept on the watchlist for 15 minutes. User can delete from a named window based on a triggering event.

For details, see the following Esper Reference documentation:

create window WatchListIPs.win:time(15 min) (ip_src string);

insert into WatchListIPs select ip_src from Event(category LIKE '%scan%');

@RSAAlert

select * from Event(category LIKE '%malicious%') WHERE ip_src in (SELECT ip_src from WatchListIPs );

Compute simple math functions in a Time Window

This section describes how to compute percentages, rations, averages, counts, min, and max within a given time window.

Note: Computations over a large number of events and/or time periods are performance- and memory-intensive. Use caution when deploying such rules.

One way is to use named windows. However, this stores events in memory, which may cause issues if storing over a long period of time or a large number of events.

CREATE WINDOW SizePerIP.win:length(100) (ip_src string,size long);

INSERT INTO SizePerIP SELECT ip_src AS ip_src, sum(size) AS size FROM Event.win:time_batch(1 minute) GROUP BY ip_src;

@RSAAlert(oneInSeconds=0)
SELECT ip_src FROM SizePerIP GROUP BY ip_src HAVING size > avg(size)*2;

Using a non-overlapping context does not retain events in memory, and thus is the preferred solution.

/* Create a non-overlapping context to store data by second */

create context Per Second start @now end after 1 second;

/* Sum session size per second */

context PerSecond insert into OneSecondBucket select
ip_src, sum(size) as size from Event group by ip_src output
snapshot when terminated;

/* Alert if the total size for one second within an hour is two times greater than average */

@RSAAlert
select ip_src from OneSecondBucket.win:time(1 hour) group by ip_src HAVING size >
avg(size)*2;

Frequently Asked Questions

Question: What Regex filters are supported?

Answer: See: The 'regexp' Keyword (Esper 7.1) or The 'regexp' Keyword (Esper 5.3).

The rexexp function matches the entire region against the pattern via java.util.regex.Matcher.matches() method. For more details, consult the Java API documentation or refer tohttp://www.regular-expressions.info/refflavors.htmlRegular Expression Flavors.

Question: What is the difference between the count vs. time length batch?

Answer: Consider the following example.

SELECT * FROM Event(filter_criteria)

.win:time_batch(1 minute)

GROUP BY ip_src

HAVING count(*) > 10;

When the time window of 1 minute is reached, the system outputs everything that matches an ip_src.

The HAVING count clause instructs the engine to only output after the time window if the count of events is greater than 10.

The GROUP BY ip_src aggregation clause instructs the count to apply to only a single ip_src, rather than across all ip_src values that match the filter criteria.

Question: Can the parameters of ESA Live rules be changed?

Answer: You can't change the ESA rules that are provided on Live. However, you can view a rule's syntax, copy it to a new rule, and use this source to tune the rule for your needs.

Question: How can I export alerts from the ESA Mongo Database?

Answer: You can use the following command.

mongoexport -d respond-server --authenticationDatabase admin -u deploy_admin -p netwitness -c alert --out alerts_by_rule_time.csv

Question: I have few ESA Rules where I filter many source IP addresses. I want to create one List and use it List into all my ESA Rules. Can I do it? How?

Answer: From 11.1 onwards, you can use Context Hub Lists in ESA Rules. Details are provided in the Configure Context Hub List as an Enrichment Source topic.

Question: How I can exclude a subnet in a ESA Rule? Example: ip_src IS NOT "10.0.0.0/8"

Answer: Use ip_src NOT REGEXP “10\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}”

Question: : How can I specify an IP range in a list to be used in an ESA Rule?

Answer: This has been answered in a blog post on RSA Link: https://community.rsa.com/docs/DOC-85694. The contents of that blog post are reproduced here:

{1,3} represents 3 digit number

[0-9] represents range number starting from 0 to 9

[0-9]{1,3} represents 3 digit number where each digit starts from 0 to 9.

Example: 000,001,002,....,997,998,999

Let me start below regex. I have highlighted the last 2 octets in red color and the first two octets highlighted in black color.

(10\.[0-9]{1,3}|172\.(3[01]|2[0-9]|1[6-9])|192\.168)\.[0-9]{1,3}\.[0-9]{1,3}

  • The simple [0-9]{1,3}\.[0-9]{1,3} specifies entire range of 000.000 to 999.999
  • 10\.[0-9]{1,3} specifies 10.000 to 10.999
  • 172\.(3[01]|2[0-9]|1[6-9]) means 172.16 to 172.19 then 172.20-172.29 then 172.30 to 172.31
  • 192\.168 represents 192.168

Thus, this example regex cover the following Private IP ranges:

  • 10.0.0.0-10.255.255.255
  • 172.16.0.0-172.31.255.255
  • 192.168.0.0-192.168.255.255

Multi-Valued Meta

RSA NetWitness supports multi-valued meta keys. A multi-valued meta key is the meta field with the array type.

Note the following:

  • Multi-valued meta keys are typed differently between the default Esper implementation and the one developed for ESA.
  • Esper within ESA uses vectors, while the default implementation assumes the array type.
  • Vectors are needed to support grouping and aggregation for mutli-valued meta.
  • Custom functions have been developed to support equivalent functions to the default implementation.
  • To see if a particular key is typed as multi-valued, go to Configure > ESA Rules > Settings > Meta Key References, then filter by String Array:

For more details about strings and string arrays in ESA, see the ESA Configuration Guide.

Convert Vector to an Array

To use standard Esper syntax within EPL, you could convert the ESA Vector to an array. This is especially helpful when working with lambda expressions or contained event selection.

    

Note: This functionality is supported on RSA NetWitness versions 11.3 and higher.

    

Available functions:     

  • asStringArray(meta_key): Converts an ESA vector to a string array     
  • asArray(meta_key): Converts an ESA vector to an object-based array   

 Example:   

@RSAAlert
   SELECT * FROM Event (
     (asStringArray(alias_host)).anyOf(v => v = 'bad.malwaredomain.com')
)

Matching Against Multi-Valued Meta

You can do an exact match for values in mutli-valued meta keys:

  • Use ANY(meta) if you want at least one of the values in the meta key to match the value:

    @RSAAlert SELECT * FROM Event (
       ‘Bo’ = ANY(username) OR ‘Joe’ = ANY(username)
    )
  • Use ALL(meta) if you want every value within the meta key to exactly match:

    @RSAAlert SELECT * FROM Event (
       ‘Bo’ = ALL(username) OR ‘Joe’ = ALL(username)
    )

Case-Insensitive Matching Against Multi-Valued Meta

For ESA version 10.6 and later, the following custom functions are available:

  • isOneOfIgnoreCase: triggers when at least of one the lower-case list of values are contained within the values of the meta key, no matter the case.

    @RSAAlert SELECT * from Event(
       isOneOfIgnoreCase(username,{'alpha','beta','gamma'})
    )

    So, for example, if the username meta key contains the value Beta, the alert is triggered.

  • isNotOneOfIgnoreCase: triggers if none of the lower-case list of values are contained within the values of the meta key, no matter the case.

    @RSAAlert SELECT * from Event(
       isNotOneOfIgnoreCase(username,{'alpha','beta','gamma'})
    )

    For example, if the username meta key does not contain alpha, beta or gamma—no matter the case for any of those strings—the alert is triggered. So, if the username meta key contains Alpha or BETA or gAmmA, the alert is not triggered.

Fuzzy Comparison of Multi-Valued Meta

RSA NetWitness contains custom Esper functions developed by RSA that you can use for fuzzy matching.

  • matchLike: Performs a LIKE comparison against multi-valued meta keys.

    @RSAAlert SELECT * from Event(
       matchLike(alias_host, '%www.xn-%')
    )

    The above example triggers if any of the values in the alias_host meta key contain a sub-string of www.xn-

  • matchRegex: Performs regular expression comparison against multi-valued meta keys.

    @RSAAlert SELECT * from Event(
       matchRegex(alias_host, '.*www.xn-.*')
    )

    The above example functions the same as the matchLike example: it triggers if any of the values in the alias_host meta key contain a sub-string of www.xn-

Intersection of Multi-Valued Meta

Intersection is an expression comparing two multi-valued meta keys and returning true if there is a least one element common between them.

RSA NetWitness includes a custom intersection function. Additionally, you can use other Java-supported syntax, such as getIntersection(alias_host, e1.alias_host).contains("abc")

@RSAAlert SELECT * from PATTERN [
   /* Statement: FireEye WebMPS Exploit.Kit */
      e1=Event(policy_name .toLowerCase() LIKE 'exploit.kit%' A

   AND user_agent .toLowerCase() NOT IN ( 'sam proxy check' )

   AND url IS NOT NULL )
   ->
   /* Statement: followed by Proxy Log NOT 403 */
      e2=Event(device_type .toLowerCase() IN ( 'mcafeewg' )

   AND result_code .toLowerCase() NOT IN ( '403' )

   AND url IS NOT NULL

   AND ip_src=e1.ip_src AND getIntersection(url, e1.url).size() > 0)

where time:within(60 Minutes)

Examples

The following examples illustrate various concepts to consider and use when you write or extend ESA Rules in the RSA NetWitness Platform.

Multiple Different Policies Violation

If you need to aggregate individual values stored within a mutli-valued meta key, you should use contained event selection.                                                           

/*
Load previously created and enabled context hub lists with policy names
*/
@UsesEnrichment(name="policy1")
@UsesEnrichment(name="policy2")
/*
The multi-valued meta for email_src will be broken into individual strings and the type must be defined
*/
CREATE SCHEMA EmailContainer(email_str string);
/*
Create the window to hold the policy names by type (policy1 or policy2)
*/
@Name('Create')
CREATE WINDOW PolicyViolations.win:time(1 hour) (policy1 string, policy2 string, policy_name string, email_str string);
/*
Insert into the window the policy names, email source string and whether the event matches to policy1 or policy 2
*/
@Name('Filter')
ON Event (
device_type = 'symantecdlp' AND
email_src IS NOT NULL AND
(
   EXISTS (SELECT * FROM policy1 WHERE (LIST = Event.policy_name))
   OR EXISTS (SELECT * FROM policy2 WHERE (LIST = Event.policy_name))
)
) [select policy_name, email_str from asStringArray(email_src)@type(EmailContainer)] as e
INSERT INTO PolicyViolations select policy_name, "policy1" as policy1, NULL as policy2, email_str where EXISTS (SELECT * FROM policy1 WHERE (LIST = e.policy_name))
INSERT INTO PolicyViolations select policy_name, NULL as policy1, "policy2" as policy2, email_str where EXISTS (SELECT * FROM policy2 WHERE (LIST = e.policy_name));
 
/*
Alert when policy1 violations are at least 1 and policy2 violations with at least 3
*/
@RSAAlert
@Name('Alert')
INSERT INTO PolicyAlert
SELECT *
FROM PolicyViolations as pv
GROUP BY email_str
HAVING COUNT(policy1) >= 1 AND
COUNT(policy2) >= 3;
 
/*
Delete all events from the window that caused the alert so they won't be reused in subsequent alerts
*/
@Name('Delete')
on PolicyAlert as pa delete from PolicyViolations as pv where pa.email_str =pv.email_str;

 

   

 

Multiple Different Policies Violation

If you need to aggregate individual values stored within a mutli-valued meta key, you should use contained event selection.

/*
Load previously created and enabled context hub lists with policy names
*/
@UsesEnrichment(name="policy1")
@UsesEnrichment(name="policy2")
/*
The multi-valued meta for email_src will be broken into individual strings and the type must be defined
*/
CREATE SCHEMA EmailContainer(email_str string);
/*
Create the window to hold the policy names by type (policy1 or policy2)
*/
@Name('Create')
CREATE WINDOW PolicyViolations.win:time(1 hour) (policy1 string, policy2 string, policy_name string, email_str string);
/*
Insert into the window the policy names, email source string and whether the event matches to policy1 or policy 2
*/
@Name('Filter')
ON Event (
device_type = 'symantecdlp' AND
email_src IS NOT NULL AND
(
   EXISTS (SELECT * FROM policy1 WHERE (LIST = Event.policy_name))
   OR EXISTS (SELECT * FROM policy2 WHERE (LIST = Event.policy_name))
)
) [select policy_name, email_str from asStringArray(email_src)@type(EmailContainer)] as e
INSERT INTO PolicyViolations select policy_name, "policy1" as policy1, NULL as policy2, email_str where EXISTS (SELECT * FROM policy1 WHERE (LIST = e.policy_name))
INSERT INTO PolicyViolations select policy_name, NULL as policy1, "policy2" as policy2, email_str where EXISTS (SELECT * FROM policy2 WHERE (LIST = e.policy_name));
 
/*
Alert when policy1 violations are at least 1 and policy2 violations with at least 3
*/
@RSAAlert
@Name('Alert')
INSERT INTO PolicyAlert
SELECT *
FROM PolicyViolations as pv
GROUP BY email_str
HAVING COUNT(policy1) >= 1 AND
COUNT(policy2) >= 3;
 
/*
Delete all events from the window that caused the alert so they won't be reused in subsequent alerts
*/
@Name('Delete')
on PolicyAlert as pa delete from PolicyViolations as pv where pa.email_str =pv.email_str;

The above example shows the individual string for the email_src meta key being split into a named window. The filter criteria for matching policy_name to either of the Context Hub lists for policy1 and policy2 must be met before the insertion into the named window occurs. On the alert statement, if at least one event matches policy1 and 3 or more events match policy2 for the individual email_str, then an alert is generated.

The contained event selection statement uses a custom RSA function of asStringArray() since multi-valued meta in ESA is typed as a vector. It must first be converted to an array in order to use this syntax.

Note: The custom function is only available in RSA NetWitness 11.3 and higher.

For details on contained-event selection, , see the following Esper Reference documentation:

For more details on using Context Hub Lists in ESA Rules, see Context Hub Lists in ESA Rules.

DLP Personal Email SMTP Monitor

If you need a timed, data window over a long time frame, it is better, in terms of memory usage, to use a named window instead.

The following example contains rewritten code for a rule that stored 12 hours of events in memory, to use instead a named data window instead.

If you need to hold a lot of events over a long time period, you can conserve memory usage by using a named window instead of a data window. The following example creates a window and stores email_src from the event stream matching the filter criteria for 12 hours. The example uses syntax for contained event selection, and splits the multi-valued meta key email_src into individual strings for analysis within the named window.

/*
The multi-valued meta for email_src will be broken into individual strings and the type must be defined
*/
CREATE SCHEMA PersonalEmailContainer(email_str string);
/*
Create the window to hold the email addresses violating the policy
*/
@Name('Create')
CREATE WINDOW PersonalEmail.win:time(12 hours) (email_str string);
/*
Create the window to hold the email addresses
*/
@Name('Filter')
INSERT INTO PersonalEmail
SELECT * FROM Event (
device_type = 'symantecdlp' AND
policy_name = 'personal_email_smtp_monitor' AND
(isNotOneOfIgnoreCase(email_src,{ 'postmaster@abc.com', 'mail delivery subsystem', 'donotreply@abc.com' }))
) [asStringArray(email_src)@type(PersonalEmailContainer)] ;
/*
Alert when the email address is reported at least 10 times within a 12 hour period
*/
@RSAAlert
@Name('Alert')
INSERT INTO PersonalEmailAlert
SELECT *
FROM PersonalEmail as pe
GROUP BY email_str
HAVING COUNT(*) >= 10
OUTPUT FIRST EVERY 12 hours;
/*
Delete all events from the window that caused the alert so they won't be reused in subsequent alerts
*/
@Name('Delete')
on PersonalEmailAlert as pe delete from PersonalEmail as pe where pe.email_str =pe.email_str;

HTTP Internal Vulnerability Scan

This rule filters for internal log events from Intrushield and Sourcefire event sources, over ports 80, 443 or 8080, that are non-informational. After the events are filtered from the stream, they are stored in a data window for 15 minutes and aggregated by ip_src. When there is a count of 5 or more unique msg_ids across a single ip_src, then an alert is generated.

Alerts are suppressed with the syntax output first every 12 hours. This means that only the first alert per ip_src is generated within the specified, 12-hour time period. The syntax for SELECT window(*) means to select all events within the data window:; otherwise, only the last event within the window will be returned.

@RSAAlert
SELECT window(*) FROM
Event (
   device_type IN ('intrushield', 'sourcefire') AND
   direction = 'lateral' AND
   msg_id IS NOT NULL AND
   severity != 'Informational' AND
   (ip_dstport in (80, 443, 8080) OR ip_srcport in (80, 443, 8080))
).win:time_batch(15 minutes)
GROUP BY ip_src
HAVING count(distinct msg_id) >= 5
OUTPUT first every 12 hours;

Documentation Links:

IS - 200000701.0001 - BitSight Alert {SI}

This alert is very general, which means that depending on the amount of traffic, there could be a performance impact. If so, adding to a report or report alert may be a better option. The view for unique seems unnecessary, because only the first event per ip_src is returned over the 24-hour period.

The following example removes the view from the alert.

SELECT * FROM Event (
  device_type = 'bitsight'
)
GROUP BY ip_src
OUTPUT first every 24 hours;

Ignore Case Function

The custom function isOneOfIgnoreCase removes case sensitivity from a list of values.

In the following example, we have the following structures:

  • LIST: a column in a Context Hub list
  • Event.alias_host: an array of meta values

The following code performs a case-insensitive comparison of the values in the array and the list. It is selecting from the Context Hub list, mylist, and returning values from the list that match with values in alias_host. If the value does not exist (that is, it is not whitelisted), then the alert may be generated.

AND NOT EXISTS (
  SELECT * FROM mylist
    where isOneOfIgnoreCase(Event.alias_host,{LIST}
)

So, are all of the following basically one, big rule? The lc_cid rule name is used in the other rules, to define the input to those rules?

@RSAAlert
SELECT * FROM Event (
   lc_cid = 'tsap-del-dc-cris-nget-ridc-vlc-01'
   AND 'other src' = ANY(netname)
   AND ip_dst NOT IN ( '164.100.14.3','164.100.9.3', '164.100.2.3')
   AND ip_src NOT LIKE '164.100.%'
   AND (event_time * 1000).getHourOfDay() IN (4,5,7,8,9,10,12,13)
)
.win:time_batch(2 Minutes)
GROUP BY ip_dst, ip_dstport
HAVING COUNT(distinct ip_src) >= 3500;
@RSAAlert
@Name ("Log Collector ID {lc_cid}")
SELECT * FROM Event (
   medium = 32
   AND device_type = 'checkpointfw1'
   AND direction = 'inbound'
   AND 'drop' = ANY( action )
)
.win:time_batch(5 Minutes)
GROUP BY lc_cid, ip_dst
HAVING COUNT(*) >= 100;
@RSAAlert
SELECT * FROM Event (
   lc_cid = 'tsap-del-dc-nic-vlc-01'
   AND device_type = 'checkpointfw1'
   AND direction = 'inbound'
   AND isOneOfIgnoreCase(action,{ 'drop', 'accept' })
).win:time(2 Minutes)
MATCH_RECOGNIZE (
PARTITION BY ip_src, ip_dst
MEASURES E1 as e1_data , E2 as e2_data
   PATTERN (E1{10} E2)
   DEFINE
      E1 as (isOneOfIgnoreCase(E1.action,{ 'drop' })),
      E2 as (isOneOfIgnoreCase(E2.action,{ 'accept' }))
);
@RSAAlert
SELECT * FROM Event(
/* Statement: device class is firewall and lc_cid is PNB */
   lc_cid = 'tsap-del-dc-pnb-vlc-01'
   AND device_class = 'firewall'
   AND direction = 'outbound'
   AND ip_dstport IN ( 135 , 137 , 138 , 139 , 445 )
)
.win:time_batch(2 Minutes)
GROUP BY ip_src
HAVING COUNT(*) >= 5;


You are here

Table of Contents > Content Development > Procedures > ESA > ESA Rule Writing Best Practices

Attachments

    Outcomes