SQL Refresher for Threat Hunting

SQL Fundamentals Refresher for Threat Hunting

This module provides a comprehensive SQL refresher, focusing on concepts and commands critical for threat hunting. We will cover basic and intermediate SQL techniques, with a focus on querying AWS logs (like CloudTrail and VPC Flow Logs).

By mastering these fundamentals, you’ll be prepared to create efficient and meaningful SQL queries for detecting suspicious activities.

Core SQL Concepts Overview

  1. What is SQL? SQL (Structured Query Language) is used to retrieve, filter, aggregate, and analyze data. Threat hunters use SQL to extract useful information from large logs to detect anomalies.

  2. How SQL Works in AWS In AWS, SQL can be executed directly in:

    • Amazon Athena: Query large datasets in S3.

    • Redshift: Query data warehouses for aggregated security data.

    • RDS (Relational Databases): Query specific log databases.

    • CloudWatch Insights: Query metrics and events for fast detection.

SQL Query Structure

Every SQL query follows a similar structure:

SELECT columns
FROM table_name
WHERE conditions
GROUP BY column_name
HAVING aggregate_condition
ORDER BY column_name
LIMIT n;

Basic SQL Commands

SELECT: Retrieving Data

Use SELECT to retrieve specific fields from a table. Example: List all CloudTrail events.

SELECT eventName, eventTime, userIdentity.arn 
FROM cloudtrail_logs;

WHERE: Filtering Data

Use WHERE to filter records based on conditions. Example: Retrieve only failed console login attempts.

SELECT eventName, eventTime, userIdentity.arn 
FROM cloudtrail_logs 
WHERE eventName = 'ConsoleLogin' 
AND responseElements IS NULL;

Aggregation and Grouping

GROUP BY: Aggregating Data by Fields

GROUP BY helps aggregate data based on a field. Example: Count the number of times each user logged in.

SELECT userIdentity.arn, COUNT(eventName) AS login_count
FROM cloudtrail_logs
WHERE eventName = 'ConsoleLogin'
GROUP BY userIdentity.arn;

HAVING: Filtering Aggregated Data

Use HAVING to filter aggregated data after using GROUP BY. Example: Identify users with more than 5 failed login attempts.

SELECT userIdentity.arn, COUNT(eventName) AS failed_logins
FROM cloudtrail_logs
WHERE eventName = 'ConsoleLogin' 
AND responseElements IS NULL
GROUP BY userIdentity.arn
HAVING failed_logins > 5;

Joining Data from Multiple Sources

In AWS, joining VPC Flow Logs with CloudTrail data helps correlate API actions with network behavior.

INNER JOIN Example

Track a user’s API activity and corresponding network traffic.

SELECT ct.userIdentity.arn, vpc.srcAddr, vpc.dstAddr
FROM cloudtrail_logs ct
JOIN vpc_flow_logs vpc
ON ct.sourceIPAddress = vpc.srcAddr
WHERE ct.eventName = 'DescribeInstances';

Sorting and Limiting Results

ORDER BY: Sorting Query Results

Sort results by a specific column (ascending or descending). Example: List the latest API events.

SELECT eventName, eventTime, userIdentity.arn
FROM cloudtrail_logs
ORDER BY eventTime DESC;

LIMIT: Limiting the Number of Results

Restrict the number of rows returned. Example: Retrieve the first 10 API calls.

SELECT eventName, userIdentity.arn 
FROM cloudtrail_logs 
LIMIT 10;

Time-Based Queries for Hunting

AWS logs often store timestamps in ISO 8601 format. Time-based queries help detect activity within specific periods. Example: Retrieve API events in the last 24 hours.

SELECT eventName, eventTime, userIdentity.arn
FROM cloudtrail_logs
WHERE eventTime >= date_sub('day', 1, current_date);

Practical Exercises

  1. Exercise 1: Retrieve all VPC flow logs where traffic was rejected.

    SELECT srcAddr, dstAddr, action, bytes 
    FROM vpc_flow_logs 
    WHERE action = 'REJECT';
  2. Exercise 2: Find all console login failures within the last 7 days.

    SELECT eventTime, userIdentity.arn, sourceIPAddress 
    FROM cloudtrail_logs 
    WHERE eventName = 'ConsoleLogin' 
    AND responseElements IS NULL 
    AND eventTime >= date_sub('day', 7, current_date);
  3. Exercise 3: Identify users who performed more than 3 AssumeRole actions.

    SELECT userIdentity.arn, COUNT(eventName) AS assume_role_count
    FROM cloudtrail_logs
    WHERE eventName = 'AssumeRole'
    GROUP BY userIdentity.arn
    HAVING assume_role_count > 3;

Common Pitfalls in SQL Queries

  1. Incorrect JOINs

    • Always ensure the field types between tables are compatible when performing joins.

  2. Case Sensitivity

    • SQL is case-insensitive, but AWS log values might not be. Ensure you match exact values.

  3. Handling NULL Values

    • Use IS NULL or IS NOT NULL to detect missing data (like failed responses).

  4. Optimizing Queries

    • Use LIMIT for large datasets to avoid excessive costs in Athena.

Last updated