This module provides a comprehensive SQL refresher, focusing on concepts and commands critical for threat hunting. We will cover basic and intermediate SQL techniques, with a focus on querying AWS logs (like CloudTrail and VPC Flow Logs).
By mastering these fundamentals, you’ll be prepared to create efficient and meaningful SQL queries for detecting suspicious activities.
Core SQL Concepts Overview
What is SQL?
SQL (Structured Query Language) is used to retrieve, filter, aggregate, and analyze data. Threat hunters use SQL to extract useful information from large logs to detect anomalies.
How SQL Works in AWS
In AWS, SQL can be executed directly in:
Amazon Athena: Query large datasets in S3.
Redshift: Query data warehouses for aggregated security data.
RDS (Relational Databases): Query specific log databases.
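CloudWatch Logs Insights: Query log groups interactively for fast detection. Its native query language is pipe-based rather than SQL, but the same ideas (filtering, aggregating, sorting, limiting) apply.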
SQL Query Structure
Every SQL query follows a similar structure:
SELECT columns
FROM table_name
WHERE conditions
GROUP BY column_name
HAVING aggregate_condition
ORDER BY column_name
LIMIT n;
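To see every clause of that structure working together, here is a minimal sketch that runs locally with Python's built-in sqlite3 module. SQLite is only a stand-in for Athena here, and the `events` table, its column names, and its rows are invented for illustration.

```python
import sqlite3

# Toy stand-in for a CloudTrail table; the schema and rows are made up.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (user_arn TEXT, event_name TEXT, event_time TEXT)")
conn.executemany(
    "INSERT INTO events VALUES (?, ?, ?)",
    [
        ("arn:aws:iam::111122223333:user/alice", "ConsoleLogin", "2024-05-01T10:00:00Z"),
        ("arn:aws:iam::111122223333:user/alice", "ConsoleLogin", "2024-05-01T10:05:00Z"),
        ("arn:aws:iam::111122223333:user/bob", "ConsoleLogin", "2024-05-01T11:00:00Z"),
        ("arn:aws:iam::111122223333:user/bob", "ListBuckets", "2024-05-01T11:30:00Z"),
    ],
)

query = """
SELECT user_arn, COUNT(*) AS logins      -- columns to return
FROM events                              -- source table
WHERE event_name = 'ConsoleLogin'        -- row filter, applied before grouping
GROUP BY user_arn                        -- one output row per user
HAVING COUNT(*) >= 1                     -- filter applied to each group
ORDER BY logins DESC                     -- sort the grouped rows
LIMIT 10                                 -- cap the number of rows returned
"""
rows = conn.execute(query).fetchall()
print(rows)  # alice has 2 logins, bob has 1
```

The clauses are written in this fixed order even though the engine evaluates WHERE before GROUP BY and HAVING after it.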
Basic SQL Commands
SELECT: Retrieving Data
Use SELECT to retrieve specific fields from a table.
Example: List all CloudTrail events.
WHERE: Filtering Data
Use WHERE to filter records based on conditions.
Example: Retrieve only failed console login attempts.
Aggregation and Grouping
GROUP BY: Aggregating Data by Fields
GROUP BY helps aggregate data based on a field.
Example: Count the number of times each user logged in.
HAVING: Filtering Aggregated Data
Use HAVING to filter aggregated data after using GROUP BY.
Example: Identify users with more than 5 failed login attempts.
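The difference between WHERE and HAVING is easiest to see on a small dataset. The sketch below uses Python's sqlite3 module as a local stand-in for Athena; the `logins` table, the `outcome` column, and the helper `users_over_threshold` are hypothetical names chosen for this illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE logins (user_arn TEXT, outcome TEXT)")
sample = (
    [("alice", "Failure")] * 6     # 6 failed logins
    + [("bob", "Failure")] * 2     # 2 failed logins
    + [("carol", "Success")] * 7   # 7 successful logins, no failures
)
conn.executemany("INSERT INTO logins VALUES (?, ?)", sample)


def users_over_threshold(conn, threshold):
    """Return (user, failed_count) for users with more than `threshold` failures."""
    sql = """
    SELECT user_arn, COUNT(*) AS failed_logins
    FROM logins
    WHERE outcome = 'Failure'   -- drops carol's rows before grouping
    GROUP BY user_arn
    HAVING COUNT(*) > ?         -- drops groups that are too small
    ORDER BY user_arn
    """
    return conn.execute(sql, (threshold,)).fetchall()


noisy_users = users_over_threshold(conn, 5)
print(noisy_users)  # only alice exceeds 5 failures
```

Note that carol has the most rows overall but never appears, because WHERE removes her successful logins before any counting happens.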
Joining Data from Multiple Sources
In AWS, joining VPC Flow Logs with CloudTrail data helps correlate API actions with network behavior.
INNER JOIN Example
Track a user’s API activity and corresponding network traffic.
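As a rough illustration of what an INNER JOIN keeps and discards, the following sketch joins two toy tables on source IP using Python's sqlite3 module. The tables `api_calls` and `flows`, their columns, and the IP addresses are all invented; real CloudTrail and VPC Flow Log tables have many more fields.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE api_calls (user_arn TEXT, event_name TEXT, source_ip TEXT);
CREATE TABLE flows (srcaddr TEXT, dstaddr TEXT, action TEXT);

INSERT INTO api_calls VALUES
  ('alice', 'DescribeInstances', '10.0.0.5'),
  ('bob',   'DescribeInstances', '10.0.0.9'),
  ('alice', 'ListBuckets',       '10.0.0.5');

INSERT INTO flows VALUES
  ('10.0.0.5', '10.0.1.20', 'ACCEPT'),
  ('10.0.0.5', '10.0.1.21', 'REJECT'),
  ('10.0.0.7', '10.0.1.20', 'ACCEPT');
""")

joined = conn.execute("""
SELECT a.user_arn, f.srcaddr, f.dstaddr, f.action
FROM api_calls AS a
INNER JOIN flows AS f
  ON a.source_ip = f.srcaddr           -- keep only rows with a match on both sides
WHERE a.event_name = 'DescribeInstances'
ORDER BY f.dstaddr
""").fetchall()

for row in joined:
    print(row)
```

Bob's call has no flow record from 10.0.0.9 and the flow from 10.0.0.7 has no API call, so both drop out. Alice's single call matches two flows and therefore produces two rows, which is why joins on IP alone can inflate result counts.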
Sorting and Limiting Results
ORDER BY: Sorting Query Results
Sort results by a specific column (ascending or descending).
Example: List the latest API events.
LIMIT: Limiting the Number of Results
Restrict the number of rows returned.
Example: Retrieve the first 10 API calls.
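LIMIT on its own returns an arbitrary subset, so "first" or "latest" only has meaning when it is paired with ORDER BY. A small sketch with Python's sqlite3 module and an invented `events` table shows the combination; it relies on the timestamps sharing one fixed-width UTC format, which makes string order match chronological order.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (event_name TEXT, event_time TEXT)")
conn.executemany(
    "INSERT INTO events VALUES (?, ?)",
    [
        ("ListBuckets", "2024-05-01T08:00:00Z"),
        ("AssumeRole", "2024-05-01T09:15:00Z"),
        ("ConsoleLogin", "2024-05-01T07:45:00Z"),
        ("GetObject", "2024-05-01T09:50:00Z"),
    ],
)

# Newest events first, then keep two of them.
latest_two = conn.execute(
    "SELECT event_name FROM events ORDER BY event_time DESC LIMIT 2"
).fetchall()

# Oldest events first, then keep two of them.
earliest_two = conn.execute(
    "SELECT event_name FROM events ORDER BY event_time ASC LIMIT 2"
).fetchall()

print(latest_two)    # GetObject, then AssumeRole
print(earliest_two)  # ConsoleLogin, then ListBuckets
```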
Time-Based Queries for Hunting
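AWS logs often store timestamps in ISO 8601 format. CloudTrail, for example, records eventTime as a UTC string such as 2024-05-01T10:00:00Z, so convert it to a timestamp before comparing it with a time boundary. Time-based queries help detect activity within specific periods.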
Example: Retrieve API events in the last 24 hours.
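The logic of a "last 24 hours" filter can be sketched in plain Python: parse the ISO 8601 string, compute a cutoff, and keep events at or after it. The helper names `parse_event_time` and `within_last`, the sample events, and the fixed `now` value are assumptions made so the example is reproducible.

```python
from datetime import datetime, timedelta, timezone


def parse_event_time(value):
    """Parse a UTC timestamp of the form 2024-05-02T09:30:00Z."""
    parsed = datetime.strptime(value, "%Y-%m-%dT%H:%M:%SZ")
    return parsed.replace(tzinfo=timezone.utc)


def within_last(events, window, now):
    """Keep events whose eventTime falls inside [now - window, now]."""
    cutoff = now - window
    return [e for e in events if cutoff <= parse_event_time(e["eventTime"]) <= now]


# A fixed "now" keeps the example deterministic; a real hunt would use
# datetime.now(timezone.utc) instead.
now = datetime(2024, 5, 2, 12, 0, 0, tzinfo=timezone.utc)
events = [
    {"eventName": "ConsoleLogin", "eventTime": "2024-05-02T09:30:00Z"},
    {"eventName": "AssumeRole", "eventTime": "2024-05-01T12:00:00Z"},
    {"eventName": "ListBuckets", "eventTime": "2024-04-30T23:59:59Z"},
]

recent = within_last(events, timedelta(hours=24), now)
print([e["eventName"] for e in recent])  # ConsoleLogin and AssumeRole
```

Comparing parsed timestamps rather than raw strings avoids surprises when a log source uses a different offset or precision.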
Practical Exercises
Exercise 1: Retrieve all VPC flow logs where traffic was rejected.
Exercise 2: Find all console login failures within the last 7 days.
Exercise 3: Identify users who performed more than 3 AssumeRole actions.
Common Pitfalls in SQL Queries
Incorrect JOINs
Always ensure the field types between tables are compatible when performing joins.
Case Sensitivity
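SQL keywords are case-insensitive, and Athena treats column names the same way, but string comparisons are case-sensitive. Match log values exactly (eventName = 'ConsoleLogin', not 'consolelogin'), or normalize both sides with lower().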
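A quick local check makes the distinction concrete. This sketch uses Python's sqlite3 module, whose default text comparison is case-sensitive like Athena's; the `events` table and its rows are invented.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (event_name TEXT)")
conn.executemany(
    "INSERT INTO events VALUES (?)",
    [("ConsoleLogin",), ("ConsoleLogin",), ("AssumeRole",)],
)

# Exact match on the stored value.
exact = conn.execute(
    "SELECT COUNT(*) FROM events WHERE event_name = 'ConsoleLogin'"
).fetchone()[0]

# Same query with the value in the wrong case: silently matches nothing.
wrong_case = conn.execute(
    "SELECT COUNT(*) FROM events WHERE event_name = 'consolelogin'"
).fetchone()[0]

# Normalizing the column makes the comparison case-insensitive.
normalized = conn.execute(
    "SELECT COUNT(*) FROM events WHERE lower(event_name) = 'consolelogin'"
).fetchone()[0]

# Keyword case does not matter; only the string literal does.
keyword_case = conn.execute(
    "select count(*) from events where event_name = 'ConsoleLogin'"
).fetchone()[0]

print(exact, wrong_case, normalized, keyword_case)  # 2 0 2 2
```

The wrong-case query does not raise an error, which is what makes this pitfall easy to miss during a hunt.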
Handling NULL Values
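Use IS NULL or IS NOT NULL to test for missing data; a comparison written as = NULL never matches. In CloudTrail, for example, errorCode is normally absent for successful calls, so errorCode IS NOT NULL isolates the calls that failed.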
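The behaviour of NULL in comparisons and aggregates can be verified with a few rows. This is a sketch using Python's sqlite3 module; the `events` table and its `error_code` column are simplified, hypothetical stand-ins for CloudTrail fields.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (event_name TEXT, error_code TEXT)")
conn.executemany(
    "INSERT INTO events VALUES (?, ?)",
    [
        ("GetObject", None),            # succeeded: no error code recorded
        ("GetObject", "AccessDenied"),  # failed
        ("PutObject", None),            # succeeded
    ],
)


def scalar(sql):
    """Run a query that returns a single value."""
    return conn.execute(sql).fetchone()[0]


# "= NULL" evaluates to NULL for every row, so nothing matches.
equals_null = scalar("SELECT COUNT(*) FROM events WHERE error_code = NULL")

# IS NULL / IS NOT NULL are the correct tests.
succeeded = scalar("SELECT COUNT(*) FROM events WHERE error_code IS NULL")
failed = scalar("SELECT COUNT(*) FROM events WHERE error_code IS NOT NULL")

# COUNT(*) counts rows; COUNT(column) skips NULLs in that column.
count_rows, count_codes = conn.execute(
    "SELECT COUNT(*), COUNT(error_code) FROM events"
).fetchone()

print(equals_null, succeeded, failed, count_rows, count_codes)  # 0 2 1 3 1
```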
Optimizing Queries
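Athena bills by the amount of data scanned, so LIMIT alone does not necessarily reduce cost. Select only the columns you need, filter on partition columns (such as region or date) when the table is partitioned, and use LIMIT to keep result sets manageable while exploring.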
-- Example (SELECT): list all CloudTrail events.
SELECT eventName, eventTime, userIdentity.arn
FROM cloudtrail_logs;
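-- Example (WHERE): retrieve only failed console login attempts.
-- A failed ConsoleLogin records "ConsoleLogin": "Failure" in responseElements
-- rather than leaving the field NULL. This assumes responseElements is stored
-- as a JSON string, as in the standard Athena CloudTrail table definition.
SELECT eventName, eventTime, userIdentity.arn
FROM cloudtrail_logs
WHERE eventName = 'ConsoleLogin'
AND json_extract_scalar(responseElements, '$.ConsoleLogin') = 'Failure';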
-- Example (GROUP BY): count the number of console logins per user.
SELECT userIdentity.arn, COUNT(*) AS login_count
FROM cloudtrail_logs
WHERE eventName = 'ConsoleLogin'
GROUP BY userIdentity.arn;
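-- Example (HAVING): users with more than 5 failed login attempts.
-- Athena does not accept a SELECT alias in HAVING, so the aggregate is repeated.
-- Assumes responseElements is stored as a JSON string.
SELECT userIdentity.arn, COUNT(*) AS failed_logins
FROM cloudtrail_logs
WHERE eventName = 'ConsoleLogin'
AND json_extract_scalar(responseElements, '$.ConsoleLogin') = 'Failure'
GROUP BY userIdentity.arn
HAVING COUNT(*) > 5;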
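-- Example (INNER JOIN): pair DescribeInstances calls with flow log records from the same source IP.
-- An IP match alone does not prove the traffic belongs to the API call; in practice, also bound both sides by time.
SELECT ct.userIdentity.arn, vpc.srcAddr, vpc.dstAddr
FROM cloudtrail_logs ct
JOIN vpc_flow_logs vpc
ON ct.sourceIPAddress = vpc.srcAddr
WHERE ct.eventName = 'DescribeInstances';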
-- Example (ORDER BY): list the latest API events first.
SELECT eventName, eventTime, userIdentity.arn
FROM cloudtrail_logs
ORDER BY eventTime DESC;
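-- Example (LIMIT): retrieve the first 10 API calls.
-- Without ORDER BY, LIMIT returns an arbitrary 10 rows, so sort by time first.
SELECT eventName, userIdentity.arn
FROM cloudtrail_logs
ORDER BY eventTime
LIMIT 10;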
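-- Example (time-based): API events in the last 24 hours.
-- eventTime is an ISO 8601 string, so parse it before comparing (Athena syntax).
SELECT eventName, eventTime, userIdentity.arn
FROM cloudtrail_logs
WHERE from_iso8601_timestamp(eventTime) >= current_timestamp - interval '1' day;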
-- Exercise 1: VPC flow log records where traffic was rejected.
SELECT srcAddr, dstAddr, action, bytes
FROM vpc_flow_logs
WHERE action = 'REJECT';
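-- Exercise 2: console login failures within the last 7 days.
-- Assumes responseElements is a JSON string and eventTime an ISO 8601 string (Athena syntax).
SELECT eventTime, userIdentity.arn, sourceIPAddress
FROM cloudtrail_logs
WHERE eventName = 'ConsoleLogin'
AND json_extract_scalar(responseElements, '$.ConsoleLogin') = 'Failure'
AND from_iso8601_timestamp(eventTime) >= current_timestamp - interval '7' day;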
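-- Exercise 3: users who performed more than 3 AssumeRole actions.
-- The aggregate is repeated in HAVING because Athena does not accept the SELECT alias there.
SELECT userIdentity.arn, COUNT(*) AS assume_role_count
FROM cloudtrail_logs
WHERE eventName = 'AssumeRole'
GROUP BY userIdentity.arn
HAVING COUNT(*) > 3;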