Managing RDS (Relational Database Service) instances efficiently is crucial for maintaining database performance and cost-effectiveness. Automating the process of scaling up and down RDS instances based on your needs can save time and resources. In this blog, I’ll guide you through setting up an automated system using AWS Lambda and Step Functions to handle RDS instance scaling and failover seamlessly.
Overview
The solution consists of:
- Lambda Functions: A trigger function decides whether to scale up or down based on scheduled times, and a handler function carries out the resulting RDS changes.
- Step Functions: Orchestrates the sequence of tasks, including scaling and failover, and ensures that the actions are executed in the correct order.
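The overview mentions scheduled times but not what invokes the trigger function on schedule. One option, assumed here and not stated in this post, is an Amazon EventBridge schedule rule. The hypothetical helper below is a minimal sketch that turns an "HH:MM" value (the format used by the cluster tags later in this guide) into an EventBridge cron expression, which is evaluated in UTC.

```python
def cron_for_tag_time(hhmm: str) -> str:
    """Turn an 'HH:MM' string into an EventBridge cron expression (UTC).

    Hypothetical helper for illustration; not part of the original functions.
    """
    hour_text, minute_text = hhmm.split(":")
    hour, minute = int(hour_text), int(minute_text)
    if not (0 <= hour <= 23 and 0 <= minute <= 59):
        raise ValueError(f"Invalid time: {hhmm!r}")
    # EventBridge cron fields: minutes hours day-of-month month day-of-week year
    return f"cron({minute} {hour} * * ? *)"


print(cron_for_tag_time("22:30"))  # cron(30 22 * * ? *)
```

The resulting string could be passed as the ScheduleExpression of a rule that targets the trigger function.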
Prerequisites
To follow this guide, you will need:
- An AWS account with access to RDS, Lambda, and Step Functions
- Basic knowledge of AWS services and Python
Step 1: Creating the Lambda Functions
Trigger Lambda Function
The first Lambda function determines the action to take (downgrade or upgrade) based on the time and initiates a Step Function execution.
import json
from datetime import datetime

import boto3

step_functions_client = boto3.client('stepfunctions')
rds_client = boto3.client('rds')


def get_clusters_with_reader_scale():
    """Return the first cluster tagged reader-scale=yes, or None."""
    try:
        response = rds_client.describe_db_clusters()
        clusters = response['DBClusters']
        for cluster in clusters:
            tags_response = rds_client.list_tags_for_resource(
                ResourceName=cluster['DBClusterArn']
            )
            tags = {tag['Key']: tag['Value'] for tag in tags_response['TagList']}
            if tags.get('reader-scale') == 'yes':
                return cluster
        return None
    except Exception as e:
        raise Exception(f"Failed to describe DB clusters: {e}")


def lambda_handler(event, context):
    state_machine_arn = "<state machine arn>"  # replace with your state machine ARN
    try:
        # Get cluster details
        cluster_details = get_clusters_with_reader_scale()
        if not cluster_details:
            return {'error': "No cluster found with reader-scale = 'yes'"}
        cluster_identifier = cluster_details['DBClusterIdentifier']

        # Find current writer and reader instance IDs
        current_writer_instance_id = None
        current_reader_instance_id = []
        for member in cluster_details['DBClusterMembers']:
            if member['IsClusterWriter']:
                current_writer_instance_id = member['DBInstanceIdentifier']
            else:
                current_reader_instance_id.append(member['DBInstanceIdentifier'])
        if not current_writer_instance_id or not current_reader_instance_id:
            return {'error': "Could not determine writer or reader instance IDs."}

        # Fetch tags for the cluster
        cluster_tags = {tag['Key']: tag['Value'] for tag in cluster_details['TagList']}

        # Get the downgrade and upgrade instance classes from tags
        downgrade_class = cluster_tags.get('downgrade')
        upgrade_class = cluster_tags.get('upgrade')
        if not downgrade_class or not upgrade_class:
            return {'error': "Downgrade or upgrade class not specified in tags."}

        # Get the current time as HH:MM (Lambda runs in UTC by default,
        # so the time tags are compared against UTC)
        current_time = datetime.now().strftime("%H:%M")

        # Determine action based on time
        downgrade_time = cluster_tags.get('downgrade-time')
        upgrade_time = cluster_tags.get('upgrade-time')
        if current_time == downgrade_time:
            action = 'downgrade'
            instance_id = current_writer_instance_id    # single writer ID
            target_instance_id = current_reader_instance_id  # list of reader IDs
        elif current_time == upgrade_time:
            action = 'upgrade'
            instance_id = current_reader_instance_id    # list of reader IDs
            target_instance_id = current_writer_instance_id  # single writer ID
        else:
            return {'error': "Current time does not match downgrade or upgrade time."}

        # Define parameters for Step Function
        input_params = {
            'cluster_identifier': cluster_identifier,
            'instance_id': instance_id,
            'downgrade_class': downgrade_class,
            'upgrade_class': upgrade_class,
            'target_instance_id': target_instance_id,
            'action': action
        }

        # Start the Step Function execution
        response = step_functions_client.start_execution(
            stateMachineArn=state_machine_arn,
            input=json.dumps(input_params)
        )

        # Convert datetime object to string in response
        response['startDate'] = response['startDate'].isoformat()
        return response
    except Exception as e:
        return {'error': str(e)}
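The trigger function reads all of its configuration from tags on the cluster: reader-scale, downgrade, upgrade, downgrade-time, and upgrade-time. As a minimal sketch of preparing those tags, the hypothetical helper below builds the tag list in the shape the RDS API expects. The instance classes and times in the example call are placeholder values, not recommendations.

```python
def build_scaling_tags(downgrade_class, upgrade_class, downgrade_time, upgrade_time):
    """Build the tag list that the trigger function looks for on the cluster.

    Hypothetical helper for illustration; times are 'HH:MM' strings.
    """
    values = {
        "reader-scale": "yes",
        "downgrade": downgrade_class,
        "upgrade": upgrade_class,
        "downgrade-time": downgrade_time,
        "upgrade-time": upgrade_time,
    }
    return [{"Key": key, "Value": value} for key, value in values.items()]


# Placeholder values; substitute your own instance classes and times.
tags = build_scaling_tags("db.r6g.large", "db.r6g.2xlarge", "20:00", "06:00")
for tag in tags:
    print(tag)

# To apply the tags (requires AWS credentials and the cluster ARN):
# import boto3
# boto3.client("rds").add_tags_to_resource(ResourceName="<cluster arn>", Tags=tags)
```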
Handler Lambda Function
The handler function performs the actual scaling and failover actions based on the input received from Step Functions.
import boto3
from botocore.exceptions import ClientError

rds_client = boto3.client('rds')


def lambda_handler(event, context):
    print("Received event:", event)
    action = event['action']
    cluster_identifier = event.get('cluster_identifier')
    instance_id = event.get('instance_id')
    downgrade_class = event.get('downgrade_class')
    upgrade_class = event.get('upgrade_class')
    target_instance = event.get('target_instance_id')
    resource_type = event.get('resource_type')
    resource_id = event.get('resource_id')
    target_status = event.get('target_status')
    print("resource_id:", resource_id)
    print("target_instance", target_instance)
    print("instance_id", instance_id)

    if 'downgrade' in action:
        # On downgrade, target_instance is the list of reader IDs:
        # the first reader is removed and the second is kept.
        if len(target_instance) > 1:
            first_value = target_instance[0]
            print("first_value", first_value)
            target_instance_id = target_instance[1]
            print("second_value", target_instance_id)
        else:
            # Fail early instead of hitting an undefined variable later
            raise ValueError("Downgrade requires at least two reader instances.")
    elif 'upgrade' in action:
        # On upgrade, instance_id is the list of reader IDs; use the first one
        target_instance_id = target_instance
        instance_id = instance_id[0]
        print("instance_id", instance_id)

    if 'modify' in action:
        if 'upgrade' in action:
            try:
                # Retrieve the details of the existing instance
                instance_details = rds_client.describe_db_instances(DBInstanceIdentifier=instance_id)
                availability_zone = instance_details['DBInstances'][0]['AvailabilityZone']
                engine = instance_details['DBInstances'][0]['Engine']
                print(f"Availability zone of {instance_id}: {availability_zone}")
                print(f"Engine of {instance_id}: {engine}")
                print(f"Modifying instance: {instance_id} to class: {upgrade_class}")
                response = rds_client.modify_db_instance(
                    DBInstanceIdentifier=instance_id,
                    DBInstanceClass=upgrade_class,
                    ApplyImmediately=True
                )
                # Adding a new reader instance
                new_reader_instance_id = f"{cluster_identifier}-instance-3"
                print(f"Adding reader instance: {new_reader_instance_id} with class: {downgrade_class} in availability zone: {availability_zone}")
                response = rds_client.create_db_instance(
                    DBInstanceIdentifier=new_reader_instance_id,
                    DBInstanceClass=downgrade_class,
                    Engine=engine,
                    DBClusterIdentifier=cluster_identifier,
                    AvailabilityZone=availability_zone
                )
                return {
                    'status': 'modification_initiated',
                    'instance_id': instance_id,
                    'instance_class': upgrade_class,
                    'cluster_identifier': cluster_identifier,
                    'target_instance_id': target_instance_id
                }
            except ClientError as e:
                print(f"Modify instance error: {e}")
                return {'error': str(e)}
        elif 'downgrade' in action:
            try:
                print(f"Modifying instance: {instance_id} to class: {downgrade_class}")
                response = rds_client.modify_db_instance(
                    DBInstanceIdentifier=instance_id,
                    DBInstanceClass=downgrade_class,
                    ApplyImmediately=True
                )
                # Removing a reader instance
                print(f"Removing reader instance: {first_value}")
                response = rds_client.delete_db_instance(
                    DBInstanceIdentifier=first_value,
                    SkipFinalSnapshot=True
                )
                return {
                    'status': 'modification_initiated',
                    'instance_id': instance_id,
                    'instance_class': downgrade_class,
                    'cluster_identifier': cluster_identifier,
                    'target_instance_id': target_instance_id
                }
            except ClientError as e:
                print(f"Modify instance error: {e}")
                return {'error': str(e)}
    elif action == 'failover':
        try:
            print(f"Initiating failover for cluster: {cluster_identifier} to instance: {instance_id}")
            response = rds_client.failover_db_cluster(
                DBClusterIdentifier=cluster_identifier,
                TargetDBInstanceIdentifier=instance_id
            )
            print(f"Failover response: {response}")
            return {'status': 'failover_initiated', 'cluster_identifier': cluster_identifier}
        except ClientError as e:
            print(f"Failover error: {e}")
            return {'error': str(e)}
    elif action == 'check_status':
        try:
            if resource_type == 'instance':
                print(f"Checking instance status: {resource_id} in cluster: {cluster_identifier}")
                response = rds_client.describe_db_instances(DBInstanceIdentifier=resource_id)
                status = response['DBInstances'][0]['DBInstanceStatus']
                if status == 'modifying' or status == 'configuring-enhanced-monitoring':
                    print(f"Instance {resource_id} in progress: current status={status}")
                    return {'status': 'in_progress', 'instance_id': resource_id, 'current_status': status, 'cluster_identifier': cluster_identifier}
            elif resource_type == 'cluster':
                print(f"Checking cluster status: {resource_id}")
                response = rds_client.describe_db_clusters(DBClusterIdentifier=resource_id)
                status = response['DBClusters'][0]['Status']
                if status == 'modifying' or status == 'configuring-enhanced-monitoring':
                    print(f"Cluster {resource_id} in progress: current status={status}")
                    return {'status': 'in_progress', 'instance_id': resource_id, 'current_status': status, 'cluster_identifier': cluster_identifier}
            else:
                return {'error': 'Invalid resource type'}

            # Any status other than the target is treated as still in progress
            if status == target_status:
                print(f"Resource {resource_id} is now available")
                return {'status': 'success', 'instance_id': resource_id, 'cluster_identifier': cluster_identifier}
            else:
                print(f"Resource {resource_id} still in progress: current status={status}")
                return {'status': 'in_progress', 'instance_id': resource_id, 'current_status': status, 'cluster_identifier': cluster_identifier}
        except ClientError as e:
            print(f"Check status error: {e}")
            return {'error': str(e)}

    return {'status': 'no_action'}
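The check_status branch treats every status other than the target as still in progress, so an instance that lands in a terminal failure state would be polled indefinitely. The sketch below shows one possible variation, not the behavior of the handler above: a hypothetical classify_status helper that also reports a "failed" outcome. The set of failure statuses is an illustrative, non-exhaustive selection of RDS instance statuses.

```python
# Illustrative, non-exhaustive set of RDS statuses that will not
# recover on their own (an assumption made for this sketch).
TERMINAL_FAILURE_STATUSES = {"failed", "incompatible-parameters", "storage-full"}


def classify_status(current_status: str, target_status: str) -> str:
    """Map an RDS status onto 'success', 'failed', or 'in_progress'.

    Hypothetical helper; the handler above only distinguishes
    'success' and 'in_progress'.
    """
    if current_status == target_status:
        return "success"
    if current_status in TERMINAL_FAILURE_STATUSES:
        return "failed"
    return "in_progress"


for status in ("modifying", "available", "failed"):
    print(status, "->", classify_status(status, "available"))
```

To make use of the extra outcome, the state machine would need a matching Choice rule that routes "failed" to a Fail state.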
Step 2: Creating the Step Functions
Here is the Step Functions definition to orchestrate the scaling and failover process.
{
  "Comment": "RDS Downgrade/Upgrade State Machine",
  "StartAt": "DetermineAction",
  "States": {
    "DetermineAction": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.action",
          "StringEquals": "downgrade",
          "Next": "ModifyInstanceForDowngrade"
        },
        {
          "Variable": "$.action",
          "StringEquals": "upgrade",
          "Next": "ModifyInstanceForUpgrade"
        }
      ],
      "Default": "NoActionRequired"
    },
    "ModifyInstanceForDowngrade": {
      "Type": "Task",
      "Resource": "<Handler lambda arn>",
      "Parameters": {
        "action": "modify/downgrade",
        "instance_id.$": "$.instance_id",
        "downgrade_class.$": "$.downgrade_class",
        "upgrade_class.$": "$.upgrade_class",
        "cluster_identifier.$": "$.cluster_identifier",
        "target_instance_id.$": "$.target_instance_id"
      },
      "Next": "EndState"
    },
    "ModifyInstanceForUpgrade": {
      "Type": "Task",
      "Resource": "<Handler lambda arn>",
      "Parameters": {
        "action": "modify/upgrade",
        "instance_id.$": "$.instance_id",
        "downgrade_class.$": "$.downgrade_class",
        "upgrade_class.$": "$.upgrade_class",
        "cluster_identifier.$": "$.cluster_identifier",
        "target_instance_id.$": "$.target_instance_id"
      },
      "Next": "WaitForModification"
    },
    "WaitForModification": {
      "Type": "Wait",
      "Seconds": 300,
      "Next": "CheckInstanceStatus"
    },
    "CheckInstanceStatus": {
      "Type": "Task",
      "Resource": "<Handler lambda arn>",
      "Parameters": {
        "action": "check_status",
        "resource_type": "instance",
        "resource_id.$": "$.instance_id",
        "cluster_identifier.$": "$.cluster_identifier",
        "target_status": "available"
      },
      "Next": "IsInstanceAvailable"
    },
    "IsInstanceAvailable": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.status",
          "StringEquals": "success",
          "Next": "InitiateFailover"
        },
        {
          "Variable": "$.status",
          "StringEquals": "in_progress",
          "Next": "WaitForModification"
        },
        {
          "Variable": "$.current_status",
          "StringEquals": "configuring-enhanced-monitoring",
          "Next": "WaitForModification"
        }
      ],
      "Default": "WaitForModification"
    },
    "InitiateFailover": {
      "Type": "Task",
      "Resource": "<Handler lambda arn>",
      "Parameters": {
        "action": "failover",
        "cluster_identifier.$": "$.cluster_identifier",
        "instance_id.$": "$.instance_id"
      },
      "Next": "EndState"
    },
    "EndState": {
      "Type": "Succeed"
    },
    "NoActionRequired": {
      "Type": "Succeed"
    }
  }
}
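A definition of this size is easy to break with a mistyped state name. As a rough pre-deployment check, the sketch below walks a definition loaded as a Python dict and reports every transition that points at a state that is not defined. The function name is hypothetical, and the check only covers the StartAt, Next, Default, and Choices fields used in the definition above; the sample dict is deliberately broken to show what gets reported.

```python
def find_dangling_transitions(definition: dict) -> list:
    """Return (state_name, missing_target) pairs for undefined transitions."""
    states = definition["States"]
    problems = []
    if definition["StartAt"] not in states:
        problems.append(("StartAt", definition["StartAt"]))
    for name, state in states.items():
        # Collect every state this one can transition to
        targets = [state[key] for key in ("Next", "Default") if key in state]
        targets += [rule["Next"] for rule in state.get("Choices", []) if "Next" in rule]
        for target in targets:
            if target not in states:
                problems.append((name, target))
    return problems


# Deliberately broken sample: "EndState" is referenced but never defined.
sample = {
    "StartAt": "DetermineAction",
    "States": {
        "DetermineAction": {
            "Type": "Choice",
            "Choices": [
                {"Variable": "$.action", "StringEquals": "upgrade",
                 "Next": "ModifyInstanceForUpgrade"}
            ],
            "Default": "NoActionRequired",
        },
        "ModifyInstanceForUpgrade": {"Type": "Task", "Next": "EndState"},
        "NoActionRequired": {"Type": "Succeed"},
    },
}
print(find_dangling_transitions(sample))  # [('ModifyInstanceForUpgrade', 'EndState')]
```

The same function can be run on the full definition after loading it with json.load; an empty list means every referenced state exists.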
How It Works
- Trigger Lambda Function: Determines the current action (upgrade or downgrade) based on the time and cluster tags.
- Step Functions: Manages the orchestration of scaling actions and failover.
- Handler Lambda Function: Executes the RDS instance modifications and failover operations.
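The trigger function picks its action by comparing the current "HH:MM" string with the time tags for an exact match, so an invocation that starts even one minute late matches nothing. The sketch below shows a more forgiving comparison under stated assumptions: decide_action and its tolerance_minutes parameter are hypothetical names, and the two-minute default is an arbitrary choice.

```python
from datetime import datetime


def minutes_of_day(hhmm: str) -> int:
    """Convert an 'HH:MM' string to minutes since midnight."""
    hours, minutes = hhmm.split(":")
    return int(hours) * 60 + int(minutes)


def decide_action(now: datetime, downgrade_time: str, upgrade_time: str,
                  tolerance_minutes: int = 2):
    """Return 'downgrade', 'upgrade', or None if neither time is close enough."""
    now_minutes = now.hour * 60 + now.minute

    def is_close(tag_time: str) -> bool:
        diff = abs(now_minutes - minutes_of_day(tag_time))
        # Measure the distance around the clock so 23:59 and 00:00 are 1 minute apart
        return min(diff, 24 * 60 - diff) <= tolerance_minutes

    if downgrade_time and is_close(downgrade_time):
        return "downgrade"
    if upgrade_time and is_close(upgrade_time):
        return "upgrade"
    return None


print(decide_action(datetime(2024, 1, 1, 20, 1), "20:00", "06:00"))  # downgrade
print(decide_action(datetime(2024, 1, 1, 12, 0), "20:00", "06:00"))  # None
```

With a tolerance window, the schedule should fire only once per window, otherwise the same action could be started twice.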
Key Steps in the State Machine
- DetermineAction: Decides whether to perform a downgrade or upgrade based on the input.
- ModifyInstanceForDowngrade/Upgrade: Initiates the modification of the RDS instance.
- WaitForModification: Pauses for 300 seconds before each status check, giving the modification time to complete.
- CheckInstanceStatus: Verifies the status of the instance.
- InitiateFailover: Fails the cluster over to the upgraded instance once that instance reports the available status.
Conclusion
By leveraging AWS Lambda and Step Functions, you can automate the scaling and failover of RDS instances, ensuring efficient resource utilization and minimizing manual intervention. This approach not only saves time but also enhances the reliability and availability of your database services.
Start automating your RDS instance scaling and failover with AWS Lambda and Step Functions today! Streamline operations, reduce costs, and improve database reliability.