AWS offers hundreds of cloud services that organizations use to power their business. Monitoring each of these services individually for detection and response is challenging at best. Thankfully, CloudTrail centralizes AWS logs into a single format. This lets security teams write multiple detections with minimal variation between them, making it easy to write, test, and deploy new detections in minutes. This blog walks through five critical detections across AWS services that leverage CloudTrail.
Organizations typically use S3 buckets to house sensitive customer data or internal information. Security teams often want to establish policies that control user access permissions, particularly permissions to manipulate stored data. The sample log below shows a read request (GetBucketLocation) against an S3 bucket:
{
"additionalEventData": {
"AuthenticationMethod": "AuthHeader",
"CipherSuite": "ECDHE-RSA-AES128-GCM-SHA256",
"SignatureVersion": "SigV4",
"bytesTransferredIn": 0,
"bytesTransferredOut": 137,
"x-amz-id-2": "VO/HeE9Ls3CGevx5Do/KfpQ6Pym2c31jTwEzsHUzseSEmSNBmQqJlPMg4aOkXa3B8M+eSvmBFMg="
},
"awsRegion": "us-west-2",
"eventID": "d5a31d9e-26a8-4375-a68b-d5a6f3d34ee6",
"eventName": "GetBucketLocation",
"eventSource": "s3.amazonaws.com",
"eventTime": "2021-11-05 06:50:24",
"eventType": "AwsApiCall",
"eventVersion": "1.08",
"managementEvent": true,
"p_any_aws_account_ids": [
""
],
"p_any_aws_arns": [
"arn:aws:iam:::role/PantherLogProcessingRole-sensorfu-observation-logs",
"arn:aws:s3:::sensorfu-template-sensorfubucket-14qxo63srxtd5",
"arn:aws:sts:::assumed-role/PantherLogProcessingRole-sensorfu-observation-logs/12343453453452"
],
"p_any_ip_addresses": [
"127.0.30.231"
],
"p_event_time": "2021-11-05 06:50:24",
"p_log_type": "AWS.CloudTrail",
"p_parse_time": "2021-11-05 06:55:07.416",
"p_row_id": "dac4e40798bce29ea6cfad9f0d83b901",
"p_source_id": "c13ed2bf-4de2-4c80-9f5b-e50f03163d28",
"p_source_label": "Cloudtrail-Dev",
"readOnly": true,
"recipientAccountId": "",
"requestID": "TYFYWW752Y2X5PHP",
"requestParameters": {
"Host": "s3.us-west-2.amazonaws.com",
"bucketName": "sensorfu-template-sensorfubucket-14qxo63srxtd5",
"location": ""
},
"resources": [
{
"accountId": "",
"arn": "arn:aws:s3:::sensorfu-template-sensorfubucket-14qxo63srxtd5",
"type": "AWS::S3::Bucket"
}
],
"sourceIPAddress": "127.0.30.231",
"userAgent": "[aws-sdk-go/1.38.37 (go1.16; linux; amd64) exec-env/AWS_Lambda_go1.x]",
"userIdentity": {
"accessKeyId": "ASIAY7IURJ3CWMBEKMGI",
"accountId": "",
"arn": "arn:aws:sts:::assumed-role/PantherLogProcessingRole-sensorfu-observation-logs/1636095024272525161",
"principalId": "AROAY7IURJ3CUUUECXF3R:13453455024272525161",
"sessionContext": {
"attributes": {
"creationDate": "2021-11-05T06:50:24Z",
"mfaAuthenticated": "false"
},
"sessionIssuer": {
"accountId": "",
"arn": "arn:aws:iam:::role/PantherLogProcessingRole-sensorfu-observation-logs",
"principalId": "AROAY7IURJ3CUUUECXF3R",
"type": "Role",
"userName": "PantherLogProcessingRole-sensorfu-observation-logs"
},
"webIdFederationData": {}
},
"type": "AssumedRole"
}
}
Security teams could create checks that monitor for GET or PUT requests into S3 buckets that contain highly sensitive information. The “eventName” field in CloudTrail makes it easy to determine which API call was made (for example, GetObject or PutObject). We can therefore create a detection that first verifies that an API call was made to S3 and then checks whether the call targets a sensitive bucket. Here’s what the respective code would look like:
from panther_base_helpers import deep_get

# Sensitive S3 buckets (this list could also live in a shared library)
SECURE_BUCKETS = ["sensitivebucket1", "sensitivebucket2", "sensitivebucket3"]


def rule(event):
    # Skip non-S3 logs
    if event.get("eventSource") != "s3.amazonaws.com":
        return False
    # Skip anything that is not an API call
    if event.get("eventType") != "AwsApiCall":
        return False
    # CloudTrail event names are API names such as GetObject or PutObject,
    # so match on the Get/Put prefix and then check for a sensitive bucket
    return (
        event.get("eventName", "").startswith(("Get", "Put"))
        and deep_get(event, "requestParameters", "bucketName") in SECURE_BUCKETS
    )
A security engineer could accomplish this with the final check alone. However, the earlier checks let Panther’s detection engine quickly pass over event logs that don’t apply. By monitoring PUT and GET requests, security teams are alerted quickly when a potential intruder calls into their most sensitive S3 buckets.
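One way to sanity-check a rule like this before deploying it is to run it against hand-built events. The sketch below is self-contained and makes several assumptions: `deep_get` is a local stand-in written for this example rather than Panther's helper, plain dictionaries stand in for Panther's event object, `make_event` is a hypothetical test helper, and the rule shown is a variant that matches CloudTrail event names beginning with `Get` or `Put`.

```python
# Placeholder bucket names, as in the example above
SECURE_BUCKETS = ["sensitivebucket1", "sensitivebucket2", "sensitivebucket3"]


def deep_get(data, *keys, default=None):
    """Local stand-in for a nested-lookup helper (assumption, not Panther's code)."""
    for key in keys:
        if not isinstance(data, dict):
            return default
        data = data.get(key)
    return default if data is None else data


def rule(event):
    # Skip non-S3 logs and anything that is not an API call
    if event.get("eventSource") != "s3.amazonaws.com":
        return False
    if event.get("eventType") != "AwsApiCall":
        return False
    # Match Get*/Put* calls that target a sensitive bucket
    return (
        event.get("eventName", "").startswith(("Get", "Put"))
        and deep_get(event, "requestParameters", "bucketName") in SECURE_BUCKETS
    )


def make_event(event_name, bucket, source="s3.amazonaws.com"):
    """Hypothetical helper that builds a minimal CloudTrail-shaped event."""
    return {
        "eventSource": source,
        "eventType": "AwsApiCall",
        "eventName": event_name,
        "requestParameters": {"bucketName": bucket},
    }


if __name__ == "__main__":
    print(rule(make_event("GetObject", "sensitivebucket1")))
    print(rule(make_event("PutObject", "some-public-bucket")))
```

Keeping the sample events this small makes it obvious which field drives each branch of the rule; real CloudTrail records carry many more fields, which the rule simply ignores.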
Now, let’s move on to EC2. Downtime for EC2 production instances can impact critical business functions. Security teams can monitor for potential state changes to any EC2 instance. Let’s look at an event log of a recently terminated instance.
{
"Records":[
{
"eventVersion":"1.03",
"userIdentity":{
"type":"Root",
"principalId":"123456789012",
"arn":"arn:aws:iam::123456789012:root",
"accountId":"123456789012",
"accessKeyId":"AKIAIOSFODNN7EXAMPLE",
"userName":"user"
},
"eventTime":"2016-05-20T08:27:45Z",
"eventSource":"ec2.amazonaws.com",
"eventName":"TerminateInstances",
"awsRegion":"us-west-2",
"sourceIPAddress":"198.51.100.1",
"userAgent":"aws-cli/1.10.10 Python/2.7.9 Windows/7botocore/1.4.1",
"requestParameters":{
"instancesSet":{
"items":[{
"instanceId":"i-1a2b3c4d"
}]
}
},
"responseElements":{
"instancesSet":{
"items":[{
"instanceId":"i-1a2b3c4d",
"currentState":{
"code":32,
"name":"shutting-down"
},
"previousState":{
"code":16,
"name":"running"
}
}]
}
},
"requestID":"be112233-1ba5-4ae0-8e2b-1c302EXAMPLE",
"eventID":"6e12345-2a4e-417c-aa78-7594fEXAMPLE",
"eventType":"AwsApiCall",
"recipientAccountId":"123456789012"
}
]
}
Because CloudTrail logs are formatted the same way for every AWS resource they cover, security teams can easily reuse previous code to write similar detections. You’ll see the code below looks very similar to the one we previously wrote.
def rule(event):
    # Skip non-EC2 logs
    if event.get("eventSource") != "ec2.amazonaws.com":
        return False
    # Alert on instance termination (default to "" in case eventName is missing)
    return "Terminate" in event.get("eventName", "")
By simply changing the “eventSource” and “eventName” checks, we can now monitor EC2 for instance terminations. Teams can add further checks by keeping a library of all crucial EC2 instances or by excluding AWS users who are permitted to perform such tasks.
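Those additional checks could look something like the sketch below. It is illustrative only: `CRITICAL_INSTANCES` and `APPROVED_ARNS` are hypothetical lists (the ARN is made up for this example), the helper `terminated_instance_ids` is not part of any library, and plain dictionaries stand in for Panther's event object.

```python
# Hypothetical values for illustration only
CRITICAL_INSTANCES = {"i-1a2b3c4d"}
APPROVED_ARNS = {"arn:aws:iam::123456789012:user/ops-admin"}


def terminated_instance_ids(event):
    """Collect the instance IDs named in a TerminateInstances request."""
    items = (
        (event.get("requestParameters") or {})
        .get("instancesSet", {})
        .get("items", [])
    )
    return {item.get("instanceId") for item in items}


def rule(event):
    # Skip non-EC2 logs
    if event.get("eventSource") != "ec2.amazonaws.com":
        return False
    # Only consider instance terminations
    if event.get("eventName") != "TerminateInstances":
        return False
    # Skip users who are allowed to terminate instances
    if (event.get("userIdentity") or {}).get("arn") in APPROVED_ARNS:
        return False
    # Alert only when a critical instance is affected
    return bool(terminated_instance_ids(event) & CRITICAL_INSTANCES)
```

Using sets for both lists keeps the final check a simple intersection, which also handles requests that terminate several instances at once.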
Next, we’ll monitor an Identity and Access Management (IAM) policy change with CloudTrail. Security teams typically create strict IAM policies that give users access only to the AWS services they need for their jobs. These policies ensure that very few users hold a “God-Mode” level of permissions. The sample log below shows a user being created and given root-level permissions to all AWS services within the organization.
{
"eventVersion": "1.05",
"userIdentity": {
"type": "IAMUser",
"principalId": "AIDACKCEVSQ6C2EXAMPLE",
"arn": "arn:aws:iam::444455556666:user/JaneDoe",
"accountId": "444455556666",
"accessKeyId": "AKIAI44QH8DHBEXAMPLE",
"userName": "JaneDoe",
"sessionContext": {
"attributes": {
"mfaAuthenticated": "false",
"creationDate": "2014-07-15T21:39:40Z"
}
},
"invokedBy": "signin.amazonaws.com"
},
"eventTime": "2014-07-15T21:40:14Z",
"eventSource": "iam.amazonaws.com",
"eventName": "CreateUser",
"awsRegion": "us-east-2",
"sourceIPAddress": "signin.amazonaws.com",
"userAgent": "signin.amazonaws.com",
"requestParameters": {
"userName": "JaneDoe",
"policyName": "GodGroup1"
},
"responseElements": null,
"requestID": "9EXAMPLE-0c68-11e4-a24e-d5e16EXAMPLE",
"eventID": "cEXAMPLE-127e-4632-980d-505a4EXAMPLE"
}
Whether this is intentional or not, it’s essential for security teams to monitor this activity and store it for historical purposes. New permissions can also indicate compromised accounts trying to gain higher permissions.
from panther_base_helpers import deep_get

# Sensitive IAM policies (this list could also live in a shared library)
ROOT_PERMISSION_GROUPS = ["GodGroup1", "GodGroup2", "TopGroup"]


def rule(event):
    # Skip non-IAM logs
    if event.get("eventSource") != "iam.amazonaws.com":
        return False
    # Skip changes made by the root user
    if deep_get(event, "userIdentity", "type") == "Root":
        return False
    # Alert if a user was given root-level access
    return deep_get(event, "requestParameters", "policyName") in ROOT_PERMISSION_GROUPS
As in the last two examples, the same code structure lets us check whether the event log is related to IAM, whether a root user made the change, and whether one of the root-level policies was applied to the newly added user. Skipping changes made by the root user keeps the alert less noisy. Once again, CloudTrail’s consistent log formatting and the ease of reusing Python made it possible to build a new detection for IAM from the same code block.
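Since every detection so far starts with the same “eventSource” and “eventName” filtering, that step can be pulled into a shared helper. The sketch below is one possible shape for it, not part of Panther's helper library: `from_service` and the two rule functions are hypothetical names, and plain dictionaries stand in for Panther's event object.

```python
def from_service(event, source, event_names=None):
    """Return True if the event came from `source` and, when `event_names`
    is given, its eventName is one of them."""
    if event.get("eventSource") != source:
        return False
    if event_names is not None and event.get("eventName") not in event_names:
        return False
    return True


def ec2_terminate_rule(event):
    # Same filter as the EC2 detection, expressed through the helper
    return from_service(event, "ec2.amazonaws.com", {"TerminateInstances"})


def iam_create_user_rule(event):
    # Same filter for IAM user creation
    return from_service(event, "iam.amazonaws.com", {"CreateUser"})
```

With the common filter in one place, each new detection only needs to state its service, its event names, and whatever service-specific check comes after.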
Infrastructure teams use CloudFormation to spin up and down critical company resources. Typically, organizations only deploy critical stacks in AWS regions where they choose to operate. For this example, let’s take a log from a team that only deploys new CloudFormation templates in ‘us-west-1’ and ‘us-west-2’.
{
"eventVersion": "1.01",
"userIdentity": {
"type": "IAMUser",
"principalId": "AIDAABCDEFGHIJKLNMOPQ",
"arn": "arn:aws:iam::012345678910:user/Alice",
"accountId": "012345678910",
"accessKeyId": "AKIDEXAMPLE",
"userName": "Alice"
},
"eventTime": "2014-03-24T21:02:43Z",
"eventSource": "cloudformation.amazonaws.com",
"eventName": "CreateStack",
"awsRegion": "us-east-1",
"sourceIPAddress": "127.0.0.1",
"userAgent": "aws-cli/1.2.11 Python/2.7.4 Linux/2.6.18-164.el5",
"requestParameters": {
"templateURL": "DOC-EXAMPLE-BUCKET1",
"tags": [
{
"key": "test",
"value": "tag"
}
],
"stackName": "my-test-stack",
"disableRollback": true,
"parameters": [
{
"parameterKey": "password"
},
{
"parameterKey": "securitygroup"
}
]
},
"responseElements": {
"stackId": "arn:aws:cloudformation:us-east-1:012345678910:stack/my-test-stack/a38e6a60-b397-11e3-b0fc-08002755629e"
},
"requestID": "9f960720-b397-11e3-bb75-a5b75389b02d",
"eventID": "9bf6cfb8-83e1-4589-9a70-b971e727099b"
}
Now, we can create a detection that alerts when a user creates or updates a CloudFormation stack in an AWS region where the organization doesn’t operate.
# AWS regions the organization operates in
REGIONS = ["us-west-1", "us-west-2"]


def rule(event):
    # Skip events that do not come from CloudFormation
    if event.get("eventSource") != "cloudformation.amazonaws.com":
        return False
    # Only consider stack creation and updates
    if event.get("eventName") not in ("CreateStack", "UpdateStack"):
        return False
    # Alert when the stack is deployed outside the permitted regions
    return event.get("awsRegion") not in REGIONS
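With a severity function, we can alert dynamically on any CloudFormation stack, whether it is created in a permitted AWS region or not. For this to work, the rule itself must match stack changes in every region and leave the region check to the severity function. A stack in a permitted region then produces an “INFO” severity alert, while a stack in a non-permitted region produces a “HIGH” severity alert.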
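A minimal sketch of that arrangement follows. It assumes the permitted regions are the two from the example (‘us-west-1’ and ‘us-west-2’), uses plain dictionaries in place of Panther's event object, and moves the region check out of `rule` so that `severity` can grade every stack change.

```python
# Regions the example team deploys to (taken from the scenario above)
PERMITTED_REGIONS = {"us-west-1", "us-west-2"}


def rule(event):
    # Match every CloudFormation stack creation or update, in any region
    if event.get("eventSource") != "cloudformation.amazonaws.com":
        return False
    return event.get("eventName") in ("CreateStack", "UpdateStack")


def severity(event):
    # Permitted regions are informational; anything else is high severity
    if event.get("awsRegion") in PERMITTED_REGIONS:
        return "INFO"
    return "HIGH"
```

For the sample log, which records a CreateStack call in us-east-1, this rule matches and the severity function returns "HIGH".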
This blog focuses primarily on using CloudTrail as a standardized format for writing Python detections. We can do the same with GuardDuty; in addition, Panther can deepen the analysis by using threat intelligence from its partner GreyNoise directly in a detection. First, let’s take a look at a GuardDuty finding sent to a CloudTrail log.
{
"eventVersion": "1.05",
"userIdentity": {
"type": "AssumedRole",
"principalId": "AIDACKCEVSQ6C2EXAMPLE",
"arn": "arn:aws:iam::444455556666:user/Alice",
"accountId": "444455556666",
"accessKeyId": "AKIAI44QH8DHBEXAMPLE",
"sessionContext": {
"attributes": {
"mfaAuthenticated": "false",
"creationDate": "2018-06-14T22:54:20Z"
},
"sessionIssuer": {
"type": "Role",
"principalId": "AIDACKCEVSQ6C2EXAMPLE",
"arn": "arn:aws:iam::444455556666:user/Alice",
"accountId": "444455556666",
"userName": "Alice"
}
}
},
"eventTime": "2018-06-14T22:57:56Z",
"eventSource": "guardduty.amazonaws.com",
"eventName": "RDPBruteForce",
"severity": 8,
"awsRegion": "us-west-2",
"sourceIPAddress": "54.240.230.177",
"userAgent": "console.amazonaws.com",
"requestParameters": {
"detectorId": "5ab04b1110c865eecf516eee2435ede7",
"name": "Example",
"format": "TXT",
"activate": false,
"location": "https://s3.amazonaws.com/bucket.name/file.txt"
},
"responseElements": {
"threatIntelSetId": "1ab200428351c99d859bf61992460d24"
},
"requestID": "5f6bf981-7026-11e8-a9fc-5b37d2684c5c",
"eventID": "81337b11-e5c8-4f91-b141-deb405625bc9",
"readOnly": false,
"eventType": "AwsApiCall",
"recipientAccountId": "444455556666"
}
By first filtering on the event source and the severity of the finding, we can call in GreyNoise to look up the source IP address responsible for the alert in its database. GreyNoise classifies IPs as malicious, benign, or unknown. Based on this information, we can use a severity function, as in our previous detection, to categorize alerts as “INFO” if the source is benign, “CRITICAL” if it’s malicious, and “MEDIUM” otherwise.
from panther_base_helpers import deep_get
from panther_greynoise_helpers import GetGreyNoiseObject

# Populated by rule() so that severity() can reuse the GreyNoise lookup
noise = None


def rule(event):
    # Make the GreyNoise lookup available to severity()
    global noise
    noise = GetGreyNoiseObject(event)
    # Skip anything that is not a GuardDuty event
    if event.get("eventSource") != "guardduty.amazonaws.com":
        return False
    # Skip internal AWS activity
    if deep_get(event, "userIdentity", "type") in ("AWSAccount", "AWSService"):
        return False
    # Alert on findings with a severity of 5 or higher (treat a missing value as 0)
    return (event.get("severity") or 0) >= 5


# Use the severity function to weed out benign IPs
def severity(event):
    if noise.classification("sourceIPAddress") == "malicious":
        return "CRITICAL"
    if noise.classification("sourceIPAddress") == "benign":
        return "INFO"
    return "MEDIUM"
This enrichment happens during ingestion, when Panther parses and normalizes the logs. The detection engine then runs detections in real time with the information from GreyNoise, so low-severity GuardDuty alerts can be triaged immediately.
With detection-as-code principles applied to CloudTrail, security teams can create, test, deploy, and iterate detections quickly and efficiently to secure their AWS cloud stack.
We hope these sample detections give you a starting point for protecting your AWS environment at a deeper level. If you’d like to learn more about how to apply Python detections to a SIEM tool, read this blog.