
Logstash: Build Custom JSON Parser


This section is intended for advanced programmers who want to build their own JSON parser. It describes how to build a Logstash parser for a sample device. We use the Linux device as an example throughout.

Note: This section is optional: you do not need to build a custom JSON parser from scratch to ingest logs from Logstash into NetWitness Platform.

Major sections in this document:

  • Configure a filter by defining several required pieces of metadata
  • Examine a sample log message from the Linux device
  • Walk through creating the parser, based on the sample log message
  • View the parsed meta from the sample log message, as it appears on the Log Decoder

Sample JSON Log Received on Log Decoder

Let's examine a sample log and discuss its contents.

<13>1 - Centos7 linux - LOGSTASH001 [lc@36807 lc.ctime="1585886465037" lc.cid="Centos7" lc.ctype="logstash"] {"message": "msg='op=PAM:accounting grantors=pam_access,pam_unix,pam_localuser acct=root exe=/usr/sbin/crond hostname=? addr=? terminal=cron res=success'", "user":{ "email":"john.deaux@test.com", "username":"CORP\\deauxj" }, "host": { "name": "Centos7", "hostname": "Centos7", "containerized": false, "architecture": "x86_64", "id": "d1059ac783b24eb7bbde70a41fa572c9", "os": { "name": "CentOS Linux", "kernel": "3.10.0-1062.el7.x86_64", "version": "7 (Core)", "codename": "Core", "platform": "centos", "family": "redhat" } }, "@timestamp": "2020-04-03T04:01:05.037Z", "files": [ "test1.log", "test2.log", "test3.log" ],"machine_details" : { "1" : { "hostname" : "USXXLinux" }, "2" : { "hostname" : "USXXWindows" }}}

The first portion of the log is the RFC-5424 header:

<13>1 - Centos7 linux - LOGSTASH001 [lc@36807 lc.ctime="1585886465037" lc.cid="Centos7" lc.ctype="logstash"]

This header contains the information that we set in our fields earlier in this guide:

  • nw_source_host: Centos7 (Hostname)
  • nw_type: linux (Device Type)
  • nw_msgid: LOGSTASH001 (Message ID)

The remainder of the log is the JSON Payload.

JSON Payload

{
    "message": "msg='op=PAM:accounting grantors=pam_access,pam_unix,pam_localuser acct=root exe=/usr/sbin/crond hostname=? addr=? terminal=cron res=success'",
    "user": {
        "email": "john.deaux@test.com",
        "username": "CORP\\deauxj"
    },
    "host": {
        "name": "Centos7",
        "hostname": "Centos7",
        "containerized": false,
        "architecture": "x86_64",
        "id": "d1059ac783b24eb7bbde70a41fa572c9",
        "os": {
            "name": "CentOS Linux",
            "kernel": "3.10.0-1062.el7.x86_64",
            "version": "7 (Core)",
            "codename": "Core",
            "platform": "centos",
            "family": "redhat"
        }
    },
    "@timestamp": "2020-04-03T04:01:05.037Z",
    "files": [
        "test1.log",
        "test2.log",
        "test3.log"
    ],
    "machine_details": {
        "1": { "hostname": "USXXLinux" },
        "2": { "hostname": "USXXWindows" }
    }
}

Create the JSON Parser for a Linux Device

Now that we have the sample log from the Linux device, we can construct a filter plugin for this device.

Initial Parser to Match Message ID and Device Type

The parser name should match the device type. We name this initial parser v20_linuxmsg.xml. Its MESSAGE element matches the message ID from the event, and its content attribute assigns the remainder of the message to a variable, logstash_json_payload, which represents the JSON payload. We parse the payload later in the process.

Message ID and Device Type Parsing

<?xml version="1.0" encoding="ISO-8859-1"?>
<DEVICEMESSAGES
    name="linux"
    displayname="Linux"
    group="Unix">
    <VERSION device="2.0"/>
 
    <MESSAGE
        id1="LOGSTASH001"
        id2="LOGSTASH001"
        content="&lt;logstash_json_payload&gt;" />
 
    <!-- Additional logic to parse JSON payload -->
 
</DEVICEMESSAGES>

Map Payload Contents to Datatypes

We create datatypes to map each element from the payload to meta that can be saved to the NetWitness database.

The entire payload is assigned to the FileBeatsEvent datatype.

<VARTYPE name="logstash_json_payload" dataType="FileBeatsEvent"/>

The timestamp is parsed and assigned to the InternetTime datatype.

<DataType name="InternetTime" dateTime="%W-%M-%DT%H:%T:%S.%V%E" />

Parse the Message String

Next, we parse the message string from the log, using the FineParse type defined below.

"message": "msg='op=PAM:accounting grantors=pam_access,pam_unix,pam_localuser acct=\"root\"
    exe=\"/usr/sbin/crond\" hostname=? addr=? terminal=cron res=success'",

The following code extracts the values from the string and saves them to meta keys:

  • op is saved to operation.id
  • acct is saved to service.account
  • res is saved to result
  • exe is saved to process.src

Note that the search flag is set to true so that key/value pairs are parsed regardless of their order in the string.

Extract Values and Save to Meta

<DataType name="TagValParse" regex="(?: |^)(?:exe=(\S+)|acct=(\S+)|res=(\S+)|op=(\S+))" search="true">
    <Capture index="1" meta="process.src" />
    <Capture index="2" meta="service.account" />
    <Capture index="3" meta="result" />
    <Capture index="4" meta="operation.id" />
</DataType>
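
Because each key/value pair is one alternative in a single regular expression, the pattern is straightforward to extend. The sketch below also captures the terminal= pair into a hypothetical terminal meta key; this extra capture is illustrative only and is not part of the parser built in this document:

<DataType name="TagValParse" regex="(?: |^)(?:exe=(\S+)|acct=(\S+)|res=(\S+)|op=(\S+)|terminal=(\S+))" search="true">
    <Capture index="1" meta="process.src" />
    <Capture index="2" meta="service.account" />
    <Capture index="3" meta="result" />
    <Capture index="4" meta="operation.id" />
    <!-- Illustrative only: "terminal" is a hypothetical meta key -->
    <Capture index="5" meta="terminal" />
</DataType>

Against the sample string, this would additionally produce terminal: cron.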

The following code assigns the whole message key to the message meta key and maps it to the FineParse type, which in turn hands the inner string to the TagValParse type. The same FileBeatsEvent datatype also captures the user email, the domain-qualified username (through the DomainUser type, shown after the code), the files array, and the machine_details hostnames.

<DataType name="FineParse" regex="msg='(.*)'">
    <Capture index="1" type="TagValParse" />
</DataType>
 
<DataType name="FileBeatsEvent" type="ElasticCommonSchemaSubset">
    <Capture key="/message" type="FineParse" meta="message"/>
    <Capture key="/user/email" meta="email"    />
    <Capture key="/user/username" type="DomainUser"/>
    <Capture key="/files/" meta="sourcefile" />
    <Capture key="/machine_details//hostname"   meta="host.dst"/>
</DataType>
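
The DomainUser type referenced above splits a domain-qualified account such as CORP\deauxj into its parts. Its definition, reproduced here from the complete parser listing later in this document, is:

<DataType name="DomainUser" regex="(?:(\w+)\\)?(\w+)">
    <Capture index="0" meta="user" />
    <Capture index="1" meta="domain" />
    <Capture index="2" meta="username" />
</DataType>

Index 0 refers to the entire match (CORP\deauxj), index 1 to the optional domain (CORP), and index 2 to the username (deauxj).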

Given the previous string and the code above, the output on the Log Decoder is as follows:

message: msg='op=PAM:accounting grantors=pam_access,pam_unix,pam_localuser acct=root exe=/usr/sbin/crond
   hostname=? addr=? terminal=cron res=success'
service.account: root
process.src: /usr/sbin/crond
result: success
operation.id: PAM:accounting

Parse an Array in JSON

In our sample JSON log file from earlier, one section contained an array object:

"files": [
   "test1.log",
   "test2.log",
   "test3.log"
]

To fetch all the values of an array, define a capture key that ends with a trailing forward slash, for example /files/.

<DataType name="FileBeatsEvent" type="ElasticCommonSchemaSubset">
   <Capture key="/files/" meta="sourcefile" />
</DataType>

Using the code above on the sample array, the following would be the output on the Log Decoder:

sourcefile: test1.log
sourcefile: test2.log
sourcefile: test3.log

Parse a Nested JSON Object

Let's look at an example nested object from our sample log file from earlier:

"host": {
   "name": "Centos7",
   "hostname": "Centos7",
   "containerized": false,
   "architecture": "x86_64",
   "id": "d1059ac783b24eb7bbde70a41fa572c9",
   "os": {
     "name": "CentOS Linux",
     "kernel": "3.10.0-1062.el7.x86_64",
     "version": "7 (Core)",
     "codename": "Core",
     "platform": "centos",
     "family": "redhat"
   }
}

To fetch nested values, you need to build a path that contains the keys from each nested level. For example, to fetch the OS name from our example, you use the following code:

<DataType name="ElasticCommonSchemaSubset">
   <Capture key="/host/os/name" meta="OS" />
</DataType>

Using the code above on the sample nested object, the following would be the output on the Log Decoder:

OS: CentOS Linux
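
Any other nested value can be reached the same way by extending the path. The sketch below also captures the kernel string into a hypothetical kernel.version meta key; this extra capture is illustrative only and is not part of the parser built in this document:

<DataType name="ElasticCommonSchemaSubset">
   <Capture key="/host/os/name"   meta="OS" />
   <!-- Illustrative only: "kernel.version" is a hypothetical meta key -->
   <Capture key="/host/os/kernel" meta="kernel.version" />
</DataType>

On the sample nested object, the additional capture would yield kernel.version: 3.10.0-1062.el7.x86_64.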

Capture Data That Has Varying Parent Key

When capturing structured data such as JSON, you can provide a field-name path in the key attribute instead of a numbered capture index. For example, assume we want to capture the hostname values from machine_details while ignoring the numeric parent keys:

"machine_details": {
   "1": { "hostname": "USXXLinux"},
   "2": { "hostname": "USXXWindows"}
}

To fetch the required values, which have a varying parent key name, we leave the parent key segment empty in the path:

<DataType name="FileBeatsEvent" type="ElasticCommonSchemaSubset">
   <Capture key="/machine_details//hostname" meta="host.dst" />
</DataType>

Using the code above on the sample, the following would be the output on the Log Decoder:

host.dst: USXXLinux
host.dst: USXXWindows

The Parsed Example Event on the Log Decoder

Assuming the sample log message from the beginning of this document, and using the parser that we have built, this section shows the event as it would appear on the Log Decoder.

For reference, here is the sample log again:

<13>1 - Centos7 linux - LOGSTASH001 [lc@36807 lc.ctime="1585886465037" lc.cid="Centos7" lc.ctype="logstash"] {"message": "msg='op=PAM:accounting grantors=pam_access,pam_unix,pam_localuser acct=root exe=/usr/sbin/crond hostname=? addr=? terminal=cron res=success'", "user":{ "email":"john.deaux@test.com", "username":"CORP\\deauxj" }, "host": { "name": "Centos7", "hostname": "Centos7", "containerized": false, "architecture": "x86_64", "id": "d1059ac783b24eb7bbde70a41fa572c9", "os": { "name": "CentOS Linux", "kernel": "3.10.0-1062.el7.x86_64", "version": "7 (Core)", "codename": "Core", "platform": "centos", "family": "redhat" } }, "@timestamp": "2020-04-03T04:01:05.037Z", "files": [ "test1.log", "test2.log", "test3.log" ],"machine_details" : { "1" : { "hostname" : "USXXLinux" }, "2" : { "hostname" : "USXXWindows" }} }
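
Walking the parser built in this document against this sample, the meta written to the Log Decoder would include the following (this listing is derived from the captures above; the exact set on your system also depends on your index and table-map configuration):

alias.host: Centos7
hardware.id: d1059ac783b24eb7bbde70a41fa572c9
OS: CentOS Linux
message: msg='op=PAM:accounting grantors=pam_access,pam_unix,pam_localuser acct=root exe=/usr/sbin/crond hostname=? addr=? terminal=cron res=success'
operation.id: PAM:accounting
service.account: root
process.src: /usr/sbin/crond
result: success
email: john.deaux@test.com
user: CORP\deauxj
domain: CORP
username: deauxj
sourcefile: test1.log
sourcefile: test2.log
sourcefile: test3.log
host.dst: USXXLinux
host.dst: USXXWindows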

Example Parser Listing

The following code represents the complete parser, including the components we built earlier in this document.

<?xml version="1.0" encoding="ISO-8859-1"?>
 
<DEVICEMESSAGES
   name="linux"
   displayname="Linux"
   group="Unix">
   <VERSION device="2.0" />
 
<MESSAGE
      id1="LOGSTASH001"
      id2="LOGSTASH001"
      content="&lt;logstash_json_payload&gt;" />
 
<VARTYPE name="logstash_json_payload" dataType="FileBeatsEvent"/>
 
<DataType name="InternetTime" dateTime="%W-%M-%DT%H:%T:%S.%V%E" />
 
<DataType name="CollectionTime" type="InternetTime" meta="lc.ctime"/>
 
<DataType name="ElasticCommonSchemaSubset" format="JSON">
   <Capture key="/@timestamp">     type="CollectionTime" />
   <Capture key="/host/hostname">  meta="alias.host" />
   <Capture key="/host/id">        meta="hardware.id" />
   <Capture key="/host/os/name">   meta="OS" />
</DataType>
 
<DataType name="DomainUser" regex="(?:(\w+)\\)?(\w+)">
   <Capture index="0" meta="user" />
   <Capture index="1" meta="domain" />
   <Capture index="2" meta="username" />
</DataType>
 
<DataType name="TagValParse" regex="(?: |^)(?:exe=(\S+)|acct=(\S+)|res=(\S+)|op=(\S+))" search="true">
   <Capture index="1" meta="process.src" />
   <Capture index="2" meta="service.account" />
   <Capture index="3" meta="result" />
   <Capture index="4" meta="operation.id" />
</DataType>
 
<DataType name="FineParse" regex="msg='(.*)'">
   <Capture index="1" type="TagValParse" />
</DataType>
 
<DataType name="FileBeatsEvent" type="ElasticCommonSchemaSubset">
   <Capture key="/message" type="FineParse" meta="message"/>
   <Capture key="/user/email" meta="email" />
   <Capture key="/user/username" type="DomainUser"/>
   <Capture key="/files/" meta="sourcefile" />
   <Capture key="/machine_details//hostname" meta="host.dst" />
</DataType>
 
</DEVICEMESSAGES>

Deploy the JSON Parser

After you have built or changed a JSON parser, you need to upload it to the NetWitness Platform Log Decoder.

  1. SSH to the Log Decoder system.
  2. Copy the custom parser file to the following folder:

    /etc/netwitness/ng/envision/etc/devices/eventsource

    where eventsource is the name of the event source. Create the folder if it does not already exist.

    For example, for our Linux device we create the linux folder under the /etc/netwitness/ng/envision/etc/devices directory and copy the v20_linuxmsg.xml parser file into /etc/netwitness/ng/envision/etc/devices/linux.

  3. To load the new parser into memory, reload the parsers on the Log Decoder.

Reload Parsers from REST

From a browser, run the REST reload command by entering the following URL:

http://<logdecoder_ip>:50102/decoder/parsers?msg=reload

For example, if your Log Decoder IP address is 10.10.100.101, use the following string:

http://10.10.100.101:50102/decoder/parsers?msg=reload

If the call is successful, you should see a REST response, "The parsers have been reloaded."

Reload Parsers from NetWitness Platform UI

You can also reload your parsers from the UI as follows.

  1. In the NetWitness Platform UI, navigate to (Admin) > Services.

    The Services view is displayed.

  2. Select the Log Decoder for which you want to reload the parsers, and click View > Explore.
  3. In the left pane, navigate to decoder > parsers.
  4. Right-click parsers and select Properties.
  5. From the drop-down menu in the Properties panel, select reload.

  6. Click Send.
