Validating File Content Types to Avoid Malicious File Hosting Using an ML Model

This post discusses the issue of malicious file uploads via content spoofing and how we can mitigate such vulnerabilities using AI/ML models.

Introduction

Most software, web apps, and Android applications require a file upload feature that allows users to upload several file types, most commonly PDFs, Word documents, and images.

An attacker can exploit such a feature to upload malicious files by spoofing content types, which can lead to XSS or to malicious payloads/malware being distributed from your servers/buckets.

How Can AI/ML Models Help To Fix This Issue?

There are several ways to solve this problem, for example checking magic numbers, or using libmagic or the file utility.
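For instance, a traditional check based on magic bytes could use the python-magic library (a libmagic binding); a minimal sketch, with the allowed list purely illustrative:

import magic

ALLOWED_MIME_TYPES = {"application/pdf", "image/png", "image/jpeg"}  # illustrative list

def is_allowed(file_bytes: bytes) -> bool:
    # detect the MIME type from the file's magic bytes, not its name
    mime_type = magic.from_buffer(file_bytes, mime=True)
    return mime_type in ALLOWED_MIME_TYPES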

Recently I stumbled upon a GitHub repo from a Google team that uses an AI/ML model to predict a file's content type: Magika. I wanted to give it a shot, which led to this weekend project. Its F1 score is far better than that of other tools and libraries available. It uses a model stored in ONNX format, packaged inside the magika Python library (a package is also available for JavaScript, but enforcing the file validation check on the frontend makes no sense as it can be easily bypassed).

So how can it help to secure file uploads?

Attackers usually bypass weak validation checks by uploading malicious HTML/XSS payloads with file names like your-malicious-file-name.pdf, your-malicious-file-name.png, or your-malicious-file-name.jpg.

Magika can help us validate the file's actual content type instead of relying only on the name.
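A minimal sketch of the Magika API (the same calls the Lambda code below relies on):

from magika import Magika

m = Magika()
res = m.identify_bytes(b'{"key": "value"}')
# prints the detected MIME type (application/json here),
# no matter what file name the uploader chose
print(res.output.mime_type)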

Approach To Solve This Vulnerability Using S3 Event Triggers and a Lambda Function for AWS Environments

The below approach is one way to solve this vulnerability. Most companies generate pre-signed POST requests to give users a secure way to upload files to an S3 bucket, since this reduces the load on the backend server (a sketch of this part is shown after the list below). So let's discuss a solution for validating the content type of an object uploaded to the bucket.

  • The frontend uses a pre-signed URL to upload the object to the bucket

  • Once the file has been uploaded, an S3 event notification is generated for AWS Lambda

  • The Lambda function is triggered, fetches the object from the bucket, and validates its content type against the allowed content types in the bucket policy. If the content type is invalid, the file is deleted from the bucket.
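For reference, the backend side that hands out the pre-signed POST could look like the sketch below (bucket name, size limit, and expiry are assumptions, not part of the repo):

# sketch: backend helper returning a pre-signed POST for direct-to-S3 uploads
from boto3 import client

s3 = client('s3')

def create_presigned_post(object_key: str) -> dict:
    # the frontend POSTs the file to response['url'] together with response['fields']
    return s3.generate_presigned_post(
        Bucket='my-aws-buckkett',  # same bucket the Lambda below validates
        Key=object_key,
        Conditions=[['content-length-range', 1, 10 * 1024 * 1024]],  # assumed 10 MB cap
        ExpiresIn=300,  # assumed 5 minute expiry
    )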

Code block for achieving the above flow:

Github Repo Link: https://github.com/dmdhrumilmistry/file-validation

"""
Author: dmdhrumilmistry (https://github.com/dmdhrumilmistry)
Description: Lambda function for validating file content type 
against configured bucket policy.
"""
from urllib.parse import unquote_plus
from html import escape
from magika import Magika # add deps in requirements.txt file
from boto3 import client # boto3 is pre-installed on aws container/ec2 images


m = Magika()

# define which content type is allowed in which bucket
# mime_types from:
# https://github.com/google/magika/blob/main/python/magika/config/content_types_config.json

# upload bucket policy according to your needs
bucket_policy = {
    "my-aws-buckkett": {
        "allowed_content_types": [
            # json
            "application/json",

            # documents
            "application/pdf",
            "application/msword",
            "application/vnd.openxmlformats-officedocument.wordprocessingml.document",

            # images
            "image/jpeg",
            "image/png"
        ]
    },
}


def lambda_handler(event, context):
    """
    Lambda handler validates file content type using magika.
    if file content type is not valid then removes file from 
    the bucket.
    """
    # Get the S3 bucket and object key from the event
    bucket = event['Records'][0]['s3']['bucket']['name']
    obj_key = unquote_plus(event['Records'][0]['s3']['object']['key'])

    print(f'Incoming event for bucket {bucket} object {obj_key}')
    # Initialize S3 client
    s3 = client('s3')

    # validate bucket name so this same
    # function can be used with multiple buckets
    if bucket not in bucket_policy:
        return {
            "error": True,
            "message": "invalid bucket"
        }
    current_bucket_policy = bucket_policy[bucket]

    # Retrieve the content of the file
    response = s3.get_object(Bucket=bucket, Key=obj_key)
    file_content = response['Body'].read()

    # Perform validation logic here
    is_valid, content_type = validate_file_content(file_content, current_bucket_policy)

    file_name = escape(obj_key.split("/")[-1])
    if is_valid:
        msg = f"File {file_name} has been uploaded with content_type {content_type} to bucket {bucket}"

    else:
        # Reject the upload by deleting object from the bucket
        msg = f"File {file_name} has been rejected with content_type {content_type} to bucket {bucket}"
        s3.delete_object(Bucket=bucket, Key=obj_key)

    print(msg)
    return {
        "error": False,
        "message": msg,
    }


def validate_file_content(content: bytes, s3_bucket_policy: dict):
    """
    validates file content type according to bucket policy.
    returns True if valid else returns False along with 
    file content type
    """
    allowed_content_types = s3_bucket_policy.get("allowed_content_types", None)

    if not allowed_content_types:
        return False, None

    res = m.identify_bytes(content)
    content_type = res.output.mime_type
    if content_type in allowed_content_types:
        return True, content_type

    return False, content_type

Deploying Lambda Function

I love using containerized applications, so I'll deploy the above Lambda function as a container image. You're free to use your own deployment strategy.

Let's write a Dockerfile for the lambda function

FROM public.ecr.aws/lambda/python:3.11

# Set a working directory in the container
WORKDIR /var/task

# Copy requirements.txt
COPY requirements.txt .

# Install the specified packages
RUN pip install -r requirements.txt

# Copy function code
COPY file_validation.py .

# Set the CMD to your handler (could also be done as a parameter override outside of the Dockerfile)
CMD [ "file_validation.lambda_handler" ]

We'll be using an AWS ECR private repo to store our Docker image, so let's create a private repository in AWS ECR.
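One way to create the repository from the AWS CLI (repo name, region, and profile are the same placeholders used in the commands below):

aws ecr create-repository --repository-name file-validation --region us-east-1 --profile profile-name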

Once we have the private ECR repo configured, build and push the container image to it.

First, log in to Docker:

aws ecr get-login-password --region us-east-1 --profile profile-name | docker login --username AWS --password-stdin aws-acc-number.dkr.ecr.us-east-1.amazonaws.com

Build an amd64 image and push it to the ECR repo:

docker buildx build --platform linux/amd64 -t aws-acc-number.dkr.ecr.us-east-1.amazonaws.com/file-validation:latest .
docker push aws-acc-number.dkr.ecr.us-east-1.amazonaws.com/file-validation:latest

Note: Initially I tried to use an arm64 Lambda function, but I faced several issues running ONNX on it. After some research it seems to be a PyTorch compatibility issue caused by missing system directory files, so I chose an amd64-based container image for the Lambda function.

Now, let's create an IAM role and policy for our Lambda function so it can get and delete S3 bucket objects and create CloudWatch logs.

Create an IAM policy using the JSON policy document below:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "GetAndDeleteBucketObject",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:DeleteObject"
            ],
            "Resource": [
                "arn:aws:s3:::my-aws-buckkett/*",
                "arn:aws:s3:::my-aws-buckkett/",
                "arn:aws:s3:::my-aws-buckkett"
            ]
        },
        {
            "Sid":"CreateLogGroupActionForLambda",
            "Effect": "Allow",
            "Action": "logs:CreateLogGroup",
            "Resource": "arn:aws:logs:us-east-1:aws-account-number:*"
        },
        {
            "Sid":"CreateAndPushLogsFromLambda",
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": [
                "arn:aws:logs:us-east-1:aws-account-number:log-group:/aws/lambda/aws-s3-file-upload-validation:*"
            ]
        }
    ]
}

Let's create a lambda function for validating file content types.

Configure the ECR image, IAM role, memory, and timeout according to your needs (one way to do this from the CLI is sketched below).
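If you prefer the CLI, creating the function from the container image could look like this sketch (role name, memory, and timeout values are assumptions):

aws lambda create-function \
  --function-name aws-s3-file-upload-validation \
  --package-type Image \
  --code ImageUri=aws-acc-number.dkr.ecr.us-east-1.amazonaws.com/file-validation:latest \
  --role arn:aws:iam::aws-account-number:role/file-validation-lambda-role \
  --architectures x86_64 \
  --timeout 60 \
  --memory-size 1024 \
  --region us-east-1 --profile profile-name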

Finally, let's create an event notification in our target S3 bucket (from the bucket's Properties tab), which will be used to trigger the Lambda function.
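The same can be done from the CLI; note that when configured this way, S3 also needs permission to invoke the function (the console sets this up automatically). A sketch, with the account number and region as placeholders:

# allow S3 to invoke the Lambda function
aws lambda add-permission \
  --function-name aws-s3-file-upload-validation \
  --statement-id s3-invoke \
  --action lambda:InvokeFunction \
  --principal s3.amazonaws.com \
  --source-arn arn:aws:s3:::my-aws-buckkett \
  --region us-east-1 --profile profile-name

# trigger the function on every object creation in the bucket
aws s3api put-bucket-notification-configuration \
  --bucket my-aws-buckkett \
  --notification-configuration '{
    "LambdaFunctionConfigurations": [
      {
        "LambdaFunctionArn": "arn:aws:lambda:us-east-1:aws-account-number:function:aws-s3-file-upload-validation",
        "Events": ["s3:ObjectCreated:*"]
      }
    ]
  }' \
  --profile profile-name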

Testing Lambda Function

Test the Lambda function by uploading files to the S3 bucket. After uploading, you should only be able to see files with valid content types, as configured in the bucket policy inside the file_validation.py file.
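For example, from the CLI (bucket name and profile are the same placeholders as before):

aws s3 cp test.png s3://my-aws-buckkett/ --profile profile-name
aws s3 cp test.csv s3://my-aws-buckkett/ --profile profile-name

# give the Lambda a few seconds to run, then list the bucket;
# the rejected test.csv should already be gone
aws s3 ls s3://my-aws-buckkett/ --profile profile-name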

Here, test.png and sample.jpg are valid files, while the test.csv file's content type is not allowed.

Conclusion

Validating file content types is a crucial part of building secure software for any engineering team. AI/ML models can help teams validate file content types and avoid hosting malicious uploads.

