A secure, scalable cloud environment is necessary for modern data science. In this blog, we will leverage Amazon Web Services (AWS) for storage and compute in the cloud. The actual data engineering and machine learning will be performed in Python.
Before working through this tutorial, please keep the following important points in mind.
The author of this tutorial assumes no responsibility for any financial costs incurred. Likewise, the author assumes no legal responsibility for any errors or bugs in the tutorial text or code.
You can create an AWS account at this link. Create an extra-secure password. I recommend stringing together three non-trivial words at random, throwing in at least 2-3 special characters, and changing at least 1-2 characters to uppercase. The password should be at least fourteen characters, which shouldn't be an issue if you follow the previous criteria. Likewise, craft a unique password, one you haven't used anywhere else.
Since this tutorial uses capabilities that are not free-tier eligible, you will need to add a credit card. To underscore, this tutorial will incur charges.
The account you've created is your root account, which is extremely powerful: it has full access and control. We will almost never use the root account. Rather, we will create more specific accounts later in this tutorial. However, we will use the root account to set up billing alarms in the next section.
Since we will be incurring charges in this tutorial, we want to be alerted when our charges reach certain levels. This will help keep us abreast if we are running unexpectedly hot or cold on cloud spending. If spending seems high, it might indicate we have misconfigured services. Conversely, if spending seems low, we perhaps over-budgeted and could adjust our expectations.
You can follow this tutorial to create billing alarms. Create multiple billing alarms for increasing levels of spend if you'd like.
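If you prefer to script the alarms rather than click through the console, a minimal boto3 sketch might look like the following. It assumes billing alerts are enabled for the account and that you already have an SNS topic for notifications; the topic ARN and dollar thresholds are placeholders.

import boto3

# Billing metrics live in us-east-1 regardless of where your workloads run.
cloudwatch = boto3.client("cloudwatch", region_name="us-east-1")

# Hypothetical SNS topic that emails you when an alarm fires.
SNS_TOPIC_ARN = "arn:aws:sns:us-east-1:123456789012:billing-alerts"

for threshold in [25, 50, 100]:
    cloudwatch.put_metric_alarm(
        AlarmName=f"billing-alarm-{threshold}-usd",
        Namespace="AWS/Billing",
        MetricName="EstimatedCharges",
        Dimensions=[{"Name": "Currency", "Value": "USD"}],
        Statistic="Maximum",
        Period=21600,  # billing metrics only update every few hours
        EvaluationPeriods=1,
        Threshold=threshold,
        ComparisonOperator="GreaterThanThreshold",
        AlarmActions=[SNS_TOPIC_ARN],
    )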
Cloud security is important. For obvious reasons, we don't want our environment to be hacked or exposed. We can adopt the following security protocols to give us a strong security posture.
As discussed previously, we need to set up an administrator account. We can do so by following this tutorial. To note, you do not want your admin account to have programmatic access. It's quite powerful, and we don't want to risk the API keys getting compromised, which can cause a lot of damage in a short period.
In the last section, you learned how to create a user group by following the prescribed tutorial. Using your administrator account, create the following IAM groups:
Do not assign any permissions to these groups just yet; they will be shells for now.
Two sections ago, you learned how to create AWS users. We shall now create a few more, which generally correspond to the groups we generated in the last section:
For the nameless_data_scientist user, assign the following permission: arn:aws:iam::aws:policy/AWSCodeCommitFullAccess. This is a data science user, and we want them to have access to git repos. In later sections, we will give this user access to specific Secrets Manager secrets. No permissions will be assigned to other users at this time. Pretty obviously, the four Pulumi users will execute different types of infrastructure tasks.
One of our scripts will use yagmail to send an email. You can get going with yagmail by following this tutorial.
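Once yagmail is configured, sending a message is only a few lines. A rough sketch (the addresses and password are placeholders):

import yagmail

# Credentials registered with yagmail (later we will pull these from Secrets Manager).
yag = yagmail.SMTP("sender@example.com", "app-password")
yag.send(
    to="recipient@example.com",
    subject="World Series Projections",
    contents="The latest predictions are below.",
)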
We have two applications we will use in this tutorial, each supporting a different delivery mechanism. We can simply clone them both from GitHub.
$ git clone git@github.com:micahmelling/world-series-projections-ecs.git
$ git clone git@github.com:micahmelling/world-series-projections-batch.git
Our project involves training machine learning models to predict the winner of the World Series given data from only previous years. For instance, when predicting the probabilities for the 2020 World Series, we only use statistics from before 2020. A little bit of leakage occurs because our data source does not have Opening Day rosters, but this is a weakness we will accept.
The above repos have almost everything you need. The world-series-projections-ecs project expects a MySQL table to manage users' log-in information; we'll create that in a later section. Likewise, you will need to update the requirements.txt files with your private package information, which we will also cover in a later section. Further, you'll need to update the infrastructure scripts with your appropriate AWS resources. I left AWS resource names in the Python scripts because those are fairly benign (and now deleted). However, infrastructure scripts can house more sensitive information (such as account numbers).
The world-series-projections-ecs repo is meant to be run on AWS Elastic Container Service (ECS). It's a simple REST service defined in app.py (a Flask application). The user can POST a small JSON object, such as {"year": 2015}, via Postman or curl to get World Series predictions for 2015. You can also log into a web UI and interact with the model that way. You can build and test the app locally using Docker.
$ docker build -t ws-ecs .
$ docker run -p 8000:8000 --env AWS_SECRET_ACCESS_KEY=YOUR_KEY --env AWS_ACCESS_KEY_ID=YOUR_KEY --rm ws-ecs
From a second terminal, send a post request.
$ curl -k --data '{"year": 2015}' --request POST --header "Content-Type: application/json" https://localhost:8000/predict
I also included cert files in the repo so that we can use HTTPS through all layers of our application. These are simply self-signed certs, and this is only a sample application that has already been axed, so I don't care if they are in the open. In production, a user will make a POST request to a domain name, which will then forward the traffic to a load balancer. This traffic will be covered by a formal SSL certificate. The load balancer will then communicate with copies of our application, which is where the self-signed certificate kicks in. This layer of traffic only travels over AWS's internal network, so HTTPS is less important there. That said, it's easy to use HTTPS at this layer as well for added protection against internal bad actors. Anyway, to generate your own self-signed certs, you can issue the following command:
$ openssl req -x509 -newkey rsa:4096 -nodes -out cert.pem -keyout key.pem -days 10000
The world-series-projections-batch repo is for a job that will run on a cron schedule. At a scheduled time, it will select a random year, generate the relevant predictions, and then email them. This is a bit of a contrived example, but it's still illustrative.
We can also clone the repository we need for our private Python library, which we will use to standardize how our programmatic accounts connect to AWS services. It also includes some utilities for interacting with databases.
$ git clone git@github.com:micahmelling/data_science_helpers.git
The code can easily be expanded as desired. For instance, you could create some nested logic for connecting to different types of databases.
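For example, a hedged sketch of what that nesting could look like; connect_to_mysql and connect_to_postgres are stand-ins for whatever helpers your library actually exposes:

def connect_to_database(secret_dict, db_type="mysql"):
    """Dispatch to the right connector based on db_type."""
    if db_type == "mysql":
        return connect_to_mysql(secret_dict)
    elif db_type == "postgres":
        return connect_to_postgres(secret_dict)
    else:
        raise ValueError(f"unsupported db_type: {db_type}")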
At this stage, let's create a secret called yagmail-credentials with the following keys and appropriate values: username, password.
These are general credentials that will be valid across projects; they're not tied to any particular effort.
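You can create the secret in the console or script it. A minimal boto3 sketch, assuming your admin credentials are configured locally and with placeholder values:

import json

import boto3

client = boto3.client("secretsmanager")
client.create_secret(
    Name="yagmail-credentials",
    SecretString=json.dumps({
        "username": "sender@example.com",  # placeholder
        "password": "app-password",        # placeholder
    }),
)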
We want to rotate all of our programmatic access keys. Recall that our password rotation for console access is managed by AWS, per our account settings. Fortunately, we can use automation to rotate our access keys in Secrets Manager. In fact, we could do this for many types of secrets. At any given time, a data scientist should, ideally, have one set of AWS keys on their machine that are tied to a specific project (ideally, in a temporary terminal session). We could have the following script run early every morning to rotate all the project-specific access keys. Having data scientists update their keys daily isn't too onerous.
import boto3
import json
import pandas as pd
from ds_helpers import aws, db


def get_accounts(db_conn):
    df = pd.read_sql(f'''
        select username, secret_name
        from aws_management.service_access_keys;
        ''', db_conn)
    return df['username'].tolist(), df['secret_name'].tolist()


def get_current_access_key(client, username):
    paginator = client.get_paginator('list_access_keys')
    for response in paginator.paginate(UserName=username):
        return response.get('AccessKeyMetadata')[0].get('AccessKeyId')


def create_new_access_keys(client, username):
    response = client.create_access_key(
        UserName=username
    )
    key_id = response['AccessKey'].get('AccessKeyId')
    secret_key = response['AccessKey'].get('SecretAccessKey')
    access_keys = (key_id, secret_key)
    return access_keys


def update_access_keys_in_secrets_manager(client, secret_name, access_keys):
    response = client.update_secret(
        SecretId=secret_name,
        SecretString=json.dumps({
            "access_key": f"{access_keys[0]}",
            "secret_key": f"{access_keys[1]}",
        }))


def delete_current_access_keys(client, access_key, username):
    client.delete_access_key(
        AccessKeyId=access_key,
        UserName=username
    )


def main():
    iam_client = boto3.client('iam')
    secrets_manager_client = boto3.client("secretsmanager")
    db_conn = db.connect_to_mysql(aws.get_secrets_manager_secret('aws-key-management-db'))
    usernames, secrets = get_accounts(db_conn)
    for username, secret in zip(usernames, secrets):
        current_access_key = get_current_access_key(iam_client, username)
        new_access_key_pair = create_new_access_keys(iam_client, username)
        update_access_keys_in_secrets_manager(secrets_manager_client, secret, new_access_key_pair)
        delete_current_access_keys(iam_client, current_access_key, username)


if __name__ == "__main__":
    main()
We could also take a different approach where we notify a user that keys have been changed.
The following script can accomplish our aim.
import boto3
import yagmail
import json
import pandas as pd
from datetime import datetime, timedelta
from ds_helpers import aws, db


def get_accounts_needing_new_keys(today, db_conn):
    df = pd.read_sql(f'''
        select ak.username, em.email, ak.deletion_date
        from aws_management.access_keys ak
        inner join aws_management.emails em on ak.username = em.username
        inner join (
            select count(*), username
            from aws_management.access_keys
            group by username
            having count(*) = 1
        ) uc on ak.username = uc.username
        where '{today}' >= ak.rotation_date;
        ''', db_conn)
    return df['username'].tolist(), df['email'].tolist(), df['deletion_date'].tolist()


def create_new_access_keys(client, username):
    response = client.create_access_key(
        UserName=username
    )
    key_id = response['AccessKey'].get('AccessKeyId')
    secret_key = response['AccessKey'].get('SecretAccessKey')
    access_keys = (key_id, secret_key)
    return access_keys


def add_new_access_keys_to_secrets_manager(client, today, username, access_keys):
    response = client.create_secret(
        Name=f'access_keys_{username}_{today}',
        SecretString=json.dumps({
            "access_key": f"{access_keys[0]}",
            "secret_key": f"{access_keys[1]}",
        }))
    secret_arn = response.get('ARN')
    secret_name = response.get('Name')
    return secret_arn, secret_name


def grant_access_to_new_secret(client, resource, today, secret_arn, username):
    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "secretsmanager:GetResourcePolicy",
                    "secretsmanager:GetSecretValue",
                    "secretsmanager:DescribeSecret",
                    "secretsmanager:ListSecretVersionIds",
                    "secretsmanager:ListSecrets"
                ],
                "Resource": [
                    secret_arn
                ]
            }
        ]
    }
    response = client.create_policy(
        PolicyName=f'access_keys_{username}_{today}',
        PolicyDocument=json.dumps(policy)
    )
    policy_arn = response.get('Policy').get('Arn')
    resource.Group(f'{username}-console').attach_policy(PolicyArn=policy_arn)
    return policy_arn


def save_new_key_entry_to_database(db_conn, username, access_key, secret_name, policy_arn, today_datetime,
                                   days_until_rotation, days_until_deletion):
    deletion_date = (today_datetime + timedelta(days=days_until_deletion)).strftime('%Y/%m/%d')
    rotation_date = (today_datetime + timedelta(days=days_until_rotation)).strftime('%Y/%m/%d')
    db_conn.execute(
        f'''
        INSERT INTO aws_management.access_keys (username, access_key, deletion_date, rotation_date, secret_name,
        policy_arn)
        VALUES ('{username}', '{access_key}', '{deletion_date}', '{rotation_date}', '{secret_name}', '{policy_arn}');
        '''
    )


def send_notification_of_key_switch(yag, username, email, today, expiration_date):
    subject = 'Notification of New AWS Access Keys'
    contents = f'New AWS access keys have been generated for your account. Log into your password manager account ' \
               f'and check Secrets Manager. The new keys are stored in the secret titled ' \
               f'access_keys_{username}_{today}. Your current keys will be deleted on {expiration_date}. Switch ' \
               f'to these new keys before then.'
    yag.send(to=email, subject=subject, contents=contents)


def get_keys_needing_deactivation(today, db_conn):
    df = pd.read_sql(f'''
        select ak.username, ak.access_key, em.email, ak.secret_name, ak.policy_arn
        from aws_management.access_keys ak
        inner join aws_management.emails em on ak.username = em.username
        where '{today}' >= ak.deletion_date;
        ''', db_conn)
    return df['username'].tolist(), df['access_key'].tolist(), df['email'].tolist(), df['secret_name'].tolist(), \
        df['policy_arn'].tolist()


def delete_old_access_keys(client, access_key, username):
    client.delete_access_key(
        AccessKeyId=access_key,
        UserName=username
    )


def delete_old_secret(client, secret_name):
    response = client.delete_secret(
        SecretId=secret_name
    )


def detach_and_delete_old_secret_policy(resource, policy_arn, username):
    response = resource.Group(f'{username}-console').detach_policy(
        PolicyArn=policy_arn
    )
    policy = resource.Policy(policy_arn)
    response = policy.delete()


def remove_old_key_entry_from_database(db_conn, access_key):
    db_conn.execute(
        f'''
        DELETE FROM aws_management.access_keys WHERE access_key = '{access_key}';
        '''
    )


def send_notification_of_deactivated_keys(yag, email):
    subject = 'Notification of Deleted AWS Access Keys'
    contents = 'AWS Keys under your account have expired. New keys are in Secrets Manager. Please update to these ' \
               'new keys if you have not already.'
    yag.send(to=email, subject=subject, contents=contents)


def initialize_yagmail_client(email_secret):
    email_dict = aws.get_secrets_manager_secret(email_secret)
    username = email_dict.get('username')
    password = email_dict.get('password')
    yag = yagmail.SMTP(username, password)
    return yag


def main():
    iam_client = boto3.client('iam')
    iam_resource = boto3.resource('iam')
    secrets_manager_client = boto3.client("secretsmanager")
    today_datetime = datetime.today().date()
    today_str = today_datetime.strftime('%Y-%m-%d')
    yag = initialize_yagmail_client('yagmail-credentials')
    db_conn = db.connect_to_mysql(aws.get_secrets_manager_secret('aws-key-management-db'))
    usernames, emails, deletion_dates = get_accounts_needing_new_keys(today_str, db_conn)
    for username, email, deletion_date in zip(usernames, emails, deletion_dates):
        access_keys = create_new_access_keys(iam_client, username)
        secret_arn, secret_name = add_new_access_keys_to_secrets_manager(secrets_manager_client, today_str, username,
                                                                         access_keys)
        policy_arn = grant_access_to_new_secret(iam_client, iam_resource, today_str, secret_arn, username)
        save_new_key_entry_to_database(db_conn, username, access_keys[0], secret_name, policy_arn, today_datetime,
                                       45, 55)
        send_notification_of_key_switch(yag, username, email, today_str, deletion_date)
    usernames, access_keys, emails, secret_names, policy_arns = get_keys_needing_deactivation(today_str, db_conn)
    for username, access_key, email, secret_name, policy_arn in zip(usernames, access_keys, emails, secret_names,
                                                                    policy_arns):
        delete_old_access_keys(iam_client, access_key, username)
        delete_old_secret(secrets_manager_client, secret_name)
        detach_and_delete_old_secret_policy(iam_resource, policy_arn, username)
        remove_old_key_entry_from_database(db_conn, access_key)
        send_notification_of_deactivated_keys(yag, email)


if __name__ == "__main__":
    main()
The previous scripts require some setup in MySQL, which can be done with the following commands.
create schema if not exists aws_management;
create user 'aws_management'@'%' identified by 'insert_password';
grant insert, select, update, delete, create, drop on aws_management.* to 'aws_management'@'%';

create table if not exists aws_management.service_access_keys (
    id int not null primary key auto_increment,
    meta__inserted_at timestamp default current_timestamp,
    username text,
    secret_name text
);

create table if not exists aws_management.access_keys (
    id int not null primary key auto_increment,
    meta__inserted_at timestamp default current_timestamp,
    username text,
    access_key text,
    deletion_date date,
    rotation_date date,
    secret_name text,
    policy_arn text
);

create table if not exists aws_management.emails (
    id int not null primary key auto_increment,
    meta__inserted_at timestamp default current_timestamp,
    username text,
    email text
);
We will use AWS CodeCommit as our remote git repository. When running a CI/CD pipeline in AWS, using CodeCommit is a little easier compared to an external service like GitHub. Likewise, using CodeCommit comes with many of the security benefits on which we have been working.
You'll need to configure your machine to push and pull code from CodeCommit. Use SSH authentication as it's more secure compared to HTTPS authentication. The latter can be breached if credentials are stolen. The former can only be breached if your private key is stolen.
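The rough flow looks like the following; the SSH key ID (shown here as APKAEXAMPLE) comes from the IAM console after you upload your public key and will differ for you.

$ ssh-keygen -t rsa -b 4096 -f ~/.ssh/codecommit_rsa
# Upload ~/.ssh/codecommit_rsa.pub under your IAM user's "SSH keys for AWS CodeCommit",
# then reference the generated SSH key ID in ~/.ssh/config:
$ cat >> ~/.ssh/config <<'EOF'
Host git-codecommit.*.amazonaws.com
  User APKAEXAMPLE
  IdentityFile ~/.ssh/codecommit_rsa
EOF
$ ssh git-codecommit.us-east-1.amazonaws.com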
Using your admin account, register a domain in Route 53. You'll also want to get a corresponding SSL certificate. Whenever you create an API endpoint, it will be a subdomain for our registered domain, protected by the SSL certificate.
To get going with our infrastructure as code, we will need to give our Pulumi accounts permissions. Additionally, there are a couple of policies we want to create with our admin account via the console before we start using Pulumi.
First, let's create those custom IAM policies we will need.
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": "firehose:*",
            "Resource": "*"
        }
    ]
}
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": "waf-regional:*",
            "Resource": "*"
        }
    ]
}
Now, let's attach the following policies to each of our Pulumi groups and then assign the corresponding users to those groups.
I've found creating custom policies for Pulumi users a bit challenging. In several cases, the desired resource will be created but an error will still be thrown. For example, the following policy will create an S3 bucket, but Pulumi will still raise an error on the command line. This creates some confusion, especially when it comes to debugging. Overall, Pulumi is an outstanding yet imperfect tool.
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:CreateBucket",
                "s3:DeleteBucket"
            ],
            "Resource": "*"
        }
    ]
}
We now need to configure our machine to use Pulumi. You can follow this tutorial to get the ball rolling. It's quite simple.
I also recommend setting up four CodeCommit repos for your Pulumi work, one for each account, which corresponds to a segment of work.
For each subtopic, such as creating a VPC, make a subdirectory in your git repo directory. Each subdirectory then becomes its own Pulumi project that we can use to create and delete resources. In each subdirectory, we can issue the following command to initiate a Pulumi project:
$ pulumi new aws-python
We could switch this design should we desire. For instance, in our project git repo, we might have a subdirectory called infrastructure, which would house our infrastructure scripts. We have to determine if we want to keep everything relevant to our project together or keep all of our infrastructure code together. This tutorial opts for the latter as the infrastructure code is generally not too relevant for the daily users of project git repos.
Further, we will need to set the appropriate AWS key environment variables when working with Pulumi. I will store these as Secrets Manager secrets in an appropriate account. When you are ready to work with a certain AWS Pulumi user, retrieve the relevant keys, export them as environment variables in your terminal, and then kill your terminal session when you are finished.
$ export AWS_ACCESS_KEY_ID=<AWS_ACCESS_KEY_ID>
$ export AWS_SECRET_ACCESS_KEY=<AWS_SECRET_ACCESS_KEY>
When you're ready to deploy your infrastructure, you simply need to issue the following from the command line:
$ pulumi up
If you want to destroy infrastructure, issue the following:
$ pulumi destroy
In the following sections, we will use our pulumi_admin account. We will create three VPCs: production, staging, and development. Production is self-explanatory. Staging is a testing environment that should be well-maintained and mimic production as closely as possible. Development is for experimentation. We can use the following function to create our three VPCs.
import pulumi_aws as aws


def main(vpc_name, cidr_block, subnet_list):
    """
    Creates a VPC and associated subnets. NAT Gateways are configured for private subnets, and Internet Gateways are
    configured for public subnets. Route tables are configured as well.

    :param vpc_name: name of the VPC
    :type vpc_name: str
    :param cidr_block: CIDR block of the VPC
    :type cidr_block: str
    :param subnet_list: list of subnets, with each element being a list of the form: CIDR block, availability zone,
    and 'public' or 'private'. If a private subnet is desired, it must be specified directly after a public subnet,
    in which case the two will be attached. Likewise, private and public subnets should be in the same availability
    zone.
    :type subnet_list: list of lists
    """
    vpc = aws.ec2.Vpc(
        vpc_name,
        cidr_block=cidr_block,
        enable_dns_hostnames=True
    )
    public_subnet_ids = []
    private_subnet_ids = []
    for subnet_set in subnet_list:
        if subnet_set[2] == 'public':
            subnet = aws.ec2.Subnet(
                f'{vpc_name}-{subnet_set[2]}-{subnet_set[1]}',
                availability_zone=subnet_set[1],
                vpc_id=vpc.id,
                cidr_block=subnet_set[0],
                map_public_ip_on_launch=True,
            )
            public_subnet_ids.append(subnet.id)
        if subnet_set[2] == 'private':
            subnet = aws.ec2.Subnet(
                f'{vpc_name}-{subnet_set[2]}-{subnet_set[1]}',
                availability_zone=subnet_set[1],
                vpc_id=vpc.id,
                cidr_block=subnet_set[0],
                map_public_ip_on_launch=False,
            )
            private_subnet_ids.append(subnet.id)
    for index, subnet_id in enumerate(private_subnet_ids):
        elastic_ip = aws.ec2.Eip(
            resource_name=f'{vpc_name}-elastic-ip-{index}',
            vpc=True)
        nat_gateway = aws.ec2.NatGateway(
            resource_name=f'{vpc_name}-nat-gateway-{index}',
            allocation_id=elastic_ip.id,
            subnet_id=public_subnet_ids[index])
        routetable_nat = aws.ec2.RouteTable(
            resource_name=f'{vpc_name}-nat-routetable-{index}',
            vpc_id=vpc.id,
            routes=[
                {
                    "cidrBlock": "0.0.0.0/0",
                    # route private-subnet traffic out through the NAT Gateway
                    "natGatewayId": nat_gateway.id
                }])
        routetable_association_nat = aws.ec2.RouteTableAssociation(
            resource_name=f'{vpc_name}-nat-association-{index}',
            subnet_id=subnet_id,
            route_table_id=routetable_nat.id)
    internet_gateway = aws.ec2.InternetGateway(
        resource_name=f'{vpc_name}-internet-gateway',
        vpc_id=vpc.id)
    routetable_igw = aws.ec2.RouteTable(
        resource_name=f'{vpc_name}-igw-routetable',
        vpc_id=vpc.id,
        routes=[
            {
                "cidrBlock": "0.0.0.0/0",
                "gatewayId": internet_gateway.id
            }])
    for index, subnet_id in enumerate(public_subnet_ids):
        routetable_association_igw = aws.ec2.RouteTableAssociation(
            resource_name=f'{vpc_name}-igw-association-{index}',
            subnet_id=subnet_id,
            route_table_id=routetable_igw.id)
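As a hypothetical invocation (the CIDR blocks and availability zones are illustrative, not recommendations), calling the function from the Pulumi project's __main__.py might look like the following; you would then run pulumi up once per environment with different names and CIDR ranges.

main(
    vpc_name='prod-vpc',
    cidr_block='10.0.0.0/16',
    subnet_list=[
        ['10.0.1.0/24', 'us-east-1a', 'public'],
        ['10.0.2.0/24', 'us-east-1a', 'private'],
        ['10.0.3.0/24', 'us-east-1b', 'public'],
        ['10.0.4.0/24', 'us-east-1b', 'private'],
    ]
)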
RDS is AWS's service for managing relational databases, like MySQL. We will create three databases: prod, stage, and dev. These match our VPC environments above. Only prod applications will be able to access the prod database, per security group configurations. Right now, we are employing a shared database model, where we create databases that can be shared across projects. However, we could easily take this script and create a database for every individual application (and fold it into the project initiation script below), which often happens in a microservices environment.
import pulumi_aws as aws


def main(database_name, allocated_gb_storage, db_engine, db_engine_version, instance_class, root_password,
         root_username, subnet_group_name, subnet_id_list, security_group_name, vpc_id, allowed_ip_list,
         security_group_port):
    """
    Creates an RDS instance and associated security group.

    :param database_name: name of the database
    :param allocated_gb_storage: number of gigabytes of storage
    :param db_engine: database engine
    :param db_engine_version: database engine version
    :param instance_class: compute instance type
    :param root_password: password for the root user
    :param root_username: username for the root user
    :param subnet_group_name: name of the db subnet group to create
    :param subnet_id_list: list of subnets to launch the instance in
    :param security_group_name: name of the security group to create
    :param vpc_id: id of the VPC to create the security group in
    :param allowed_ip_list: list of ip addresses to allow access to the RDS instance
    :param security_group_port: the port over which traffic should be allowed
    """
    subnet_group = aws.rds.SubnetGroup(
        subnet_group_name,
        name=subnet_group_name,
        subnet_ids=subnet_id_list,
    )
    security_group = aws.ec2.SecurityGroup(
        security_group_name,
        name=security_group_name,
        vpc_id=vpc_id,
        ingress=[aws.ec2.SecurityGroupIngressArgs(
            protocol='tcp',
            from_port=security_group_port,
            to_port=security_group_port,
            cidr_blocks=allowed_ip_list
        )],
        egress=[aws.ec2.SecurityGroupEgressArgs(
            from_port=0,
            to_port=0,
            protocol="-1",
            cidr_blocks=["0.0.0.0/0"],
        )])
    rds = aws.rds.Instance(
        database_name,
        allocated_storage=allocated_gb_storage,
        engine=db_engine,
        engine_version=db_engine_version,
        instance_class=instance_class,
        name=database_name,
        password=root_password,
        username=root_username,
        publicly_accessible=True,
        skip_final_snapshot=True,
        db_subnet_group_name=subnet_group_name,
        vpc_security_group_ids=[security_group.id]
    )
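For reference, a hypothetical call for the development database might look like the following; every value here is illustrative, and the root password should come from a secret rather than being hard-coded.

main(
    database_name='dev',
    allocated_gb_storage=20,
    db_engine='mysql',
    db_engine_version='8.0',
    instance_class='db.t3.micro',
    root_password='change-me',                    # pull from Secrets Manager in practice
    root_username='admin',
    subnet_group_name='dev-db-subnets',
    subnet_id_list=['subnet-aaa', 'subnet-bbb'],  # placeholders
    security_group_name='dev-db-sg',
    vpc_id='vpc-123',                             # placeholder
    allowed_ip_list=['203.0.113.10/32'],
    security_group_port=3306
)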
For our ECS application, we need a MySQL user and table to manage user log-ins. Now that we have our databases, we can create the resources we need in both staging and prod.

create schema world_series;

-- user for Flask app
create user 'world_series_service'@'%' identified by 'insert_password';
grant select on world_series.* to 'world_series_service'@'%';

-- user to perform most management operations on the schema
create user 'world_series_management'@'%' identified by 'insert_password';
grant insert, select, update, delete, create, drop on world_series.* to 'world_series_management'@'%';
flush privileges;

-- use the world_series_management user from here on out rather than root
create table if not exists world_series.app_credentials (
    id int not null primary key auto_increment,
    meta__inserted_at timestamp default current_timestamp,
    username text,
    password text
);

INSERT INTO world_series.app_credentials (username, password)
VALUES ('username', 'hashed_password');
To create a hashed password, you can issue the following commands.
$ python3
>>> import hashlib
>>> password = "password"
>>> hashed_password = hashlib.sha256(password.encode()).hexdigest()
>>> print(hashed_password)
We also need to create some general S3 buckets that will be used across projects. Specifically, we will need buckets to house CI/CD artifacts for CodeBuild and CodePipeline. We can leverage the following function.
from pulumi_aws import s3


def main(bucket_name, add_random_suffix=False):
    """
    Creates a private S3 bucket.

    :param bucket_name: name of the bucket
    :type bucket_name: str
    :param add_random_suffix: whether to let Pulumi append a random suffix to bucket_name
    :type add_random_suffix: Boolean
    """
    if add_random_suffix:
        bucket = s3.Bucket(bucket_name, acl='private')
    else:
        bucket = s3.Bucket(bucket_name, bucket=bucket_name, acl='private')
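For example, the artifact buckets mentioned above could be created like this; the names are placeholders, and bucket names must be globally unique, which is why letting Pulumi append a suffix is handy.

main('my-org-codebuild-artifacts', add_random_suffix=True)
main('my-org-codepipeline-artifacts', add_random_suffix=True)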
Likewise, we have some general execution roles we need to create.
import pulumi_aws as aws


def main():
    """
    Creates generic service roles that we might want to reuse. These include:
    - ecsInstanceRole: for running ECS services
    - ecsTaskExecution: for running ECS tasks
    - awsBatchServiceRole: for running Batch environments
    """
    ecs_instance_role = aws.iam.Role(
        "ecsInstanceRole",
        name="ecsInstanceRole",
        assume_role_policy="""{
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Action": "sts:AssumeRole",
                    "Effect": "Allow",
                    "Principal": {
                        "Service": "ec2.amazonaws.com"
                    }
                }
            ]
        }
        """)
    ecs_instance_role_policy_attachment = aws.iam.RolePolicyAttachment(
        "ecsInstanceRolePolicyAttachment",
        role=ecs_instance_role.name,
        policy_arn="arn:aws:iam::aws:policy/service-role/AmazonEC2ContainerServiceforEC2Role"
    )
    ecs_task_execution_role = aws.iam.Role(
        "ecsTaskExecution",
        name="ecsTaskExecution",
        assume_role_policy="""{
            "Version": "2008-10-17",
            "Statement": [
                {
                    "Sid": "",
                    "Effect": "Allow",
                    "Principal": {
                        "Service": "ecs-tasks.amazonaws.com"
                    },
                    "Action": "sts:AssumeRole"
                }
            ]
        }
        """)
    ecs_task_role_policy_attachment = aws.iam.RolePolicyAttachment(
        "ecsTaskExecution",
        role=ecs_task_execution_role.name,
        policy_arn="arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy"
    )
    aws_batch_service_role = aws.iam.Role(
        "awsBatchServiceRole",
        name="awsBatchServiceRole",
        assume_role_policy="""{
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Action": "sts:AssumeRole",
                    "Effect": "Allow",
                    "Principal": {
                        "Service": "batch.amazonaws.com"
                    }
                }
            ]
        }
        """)
    aws_batch_service_role_policy_attachment = aws.iam.RolePolicyAttachment(
        "awsBatchServiceRolePolicyAttachment",
        role=aws_batch_service_role.name,
        policy_arn="arn:aws:iam::aws:policy/service-role/AWSBatchServiceRole")
Another generic item we want to create is a WAF IPSet. A WAF is a web application firewall; it sits on top of a load balancer and filters traffic based on criteria such as IP address. The IP set is a collection of IP addresses we want to allow to access something like a load balancer. This is an item we are likely to reuse, so we create it under the admin umbrella. To note, we use the wafregional module rather than the waf or wafv2 modules in Pulumi. Long story short, I've had issues getting the latter two to work properly. When using wafregional, you'll have to access those items in the console using the "WAF Classic" view in AWS Firewall Manager. The functionality with wafregional does what we want via Pulumi, but we don't get as many automated charts in the console when using the "classic" view. That said, I have also found wafregional to be a bit finicky. It works, unlike some of the other WAF modules. However, sometimes you just have to run "pulumi up" multiple times for it to work. I don't find the behavior to be predictable.
import pulumi_aws as aws


def main(ipset_name, ip_address):
    """
    Creates a WAF IP Set.

    :param ipset_name: name of the IP set
    :type ipset_name: str
    :param ip_address: IP address to allow
    :type ip_address: str
    """
    ipset = aws.wafregional.IpSet(
        ipset_name,
        ip_set_descriptors=[
            aws.wafregional.IpSetIpSetDescriptorArgs(
                type="IPV4",
                value=ip_address,
            )
        ])
We'll also create a generic security group that only allows egress traffic.
import pulumi_aws as aws


def main(vpc_id, security_group_name):
    """
    Creates an egress-only security group.

    :param vpc_id: VPC ID
    :type vpc_id: str
    :param security_group_name: name of the security group
    :type security_group_name: str
    """
    security_group = aws.ec2.SecurityGroup(
        security_group_name,
        name=security_group_name,
        vpc_id=vpc_id,
        egress=[aws.ec2.SecurityGroupEgressArgs(
            from_port=0,
            to_port=0,
            protocol="-1",
            cidr_blocks=["0.0.0.0/0"],
        )])
The final "general" items we need to create are a Batch compute environment and job queue, which will give us the infrastructure we need to run jobs on a cron schedule. We will switch to the pulumi_batch_job account to perform these actions. A compute environment is the actual compute resources. A job queue is a logical separation. For example, we might have a small queue for quick ETL jobs and a large queue for re-training and running large models, each mapped to an appropriately-sized compute cluster.
import pulumi_aws as aws


def main(compute_environment_name, instance_role_arn, max_vcpus, min_vcpus, desired_vcpus,
         security_group_id, subnet_ids, service_role_arn):
    """
    Creates an AWS Batch compute environment.

    Spins up an AWS Batch compute environment with the following baked-in assumptions:
    - no more than 30 VCPUs allowed
    - instance_type is optimal
    - provisioning type is EC2
    - provisioning is managed by AWS

    The following customizations can be passed in as parameters.

    :param compute_environment_name: name of the compute environment
    :type compute_environment_name: str
    :param instance_role_arn: arn of the instance role
    :type instance_role_arn: str
    :param max_vcpus: maximum number of vcpus
    :type max_vcpus: int
    :param min_vcpus: minimum number of vcpus
    :type min_vcpus: int
    :param desired_vcpus: desired number of vcpus
    :type desired_vcpus: int
    :param security_group_id: id of the security group the compute environment should be placed in
    :type security_group_id: str
    :param subnet_ids: series of subnet ids to identify which subnets the EC2 instance can go in
    :type subnet_ids: list
    :param service_role_arn: arn of the service role
    :type service_role_arn: str
    """
    if max_vcpus > 30:
        raise Exception(f'max_vcpus of {max_vcpus} must be less than or equal to 30')
    compute_environment = aws.batch.ComputeEnvironment(
        compute_environment_name,
        compute_environment_name=compute_environment_name,
        compute_resources={
            "instanceRole": instance_role_arn,
            "instance_type": ["optimal"],
            "maxVcpus": max_vcpus,
            "minVcpus": min_vcpus,
            "desired_vcpus": desired_vcpus,
            "security_group_ids": [security_group_id],
            "subnets": subnet_ids,
            "type": "EC2",
        },
        service_role=service_role_arn,
        type="MANAGED")
import pulumi_aws as aws


def main(compute_environment_arns_list, queue_name):
    """
    Creates an AWS Batch job queue and associates it with an arbitrary number of compute environments.

    :param compute_environment_arns_list: list of compute environment arns to associate with the job queue
    :type compute_environment_arns_list: list
    :param queue_name: name of the job queue
    :type queue_name: str
    """
    job_queue = aws.batch.JobQueue(
        queue_name,
        compute_environments=compute_environment_arns_list,
        priority=1,
        state="ENABLED")
For every data science project we create, we will need a series of AWS resources. We can use a script to create many of these in one fell swoop. Unfortunately, we aren't able to 100% automate this process, but we can get close. First, we don't create any project-specific users with console-only access, though this might be useful. The reason is that we cannot do the necessary MFA coordination via a script. That said, we can do most anything from the command line or via Python scripts that we could do from the console. A console account may not be necessary, but it might be nice to follow the execution of a CodePipeline via the console. We do, however, create a project-specific account with programmatic access. We then give specific users access to a Secrets Manager secret that includes the necessary API keys to use the programmatic account. However, Pulumi has a limitation that plain-old boto3 doesn't: we can't get the actual strings of the key and secret to automatically upload them. We can export those items, retrieve them via the command line, and then manually update the secret. Using the exported names in the script below, we can accomplish that with the following commands:
$ pulumi stack output secret_key --show-secrets
$ pulumi stack output access_key
This is a bit of a pain. Again, Pulumi is a great tool, but it's not perfect.
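With the two values retrieved above, one way to finish the manual step is the AWS CLI; the secret name below is a placeholder matching the naming pattern the script uses.

$ aws secretsmanager put-secret-value \
    --secret-id <user_name>_aws_keys \
    --secret-string '{"access_key": "<access_key>", "secret_key": "<secret_key>"}'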
import pulumi_aws as aws
import pulumi
import json


def main(codecommit_repo_name, ecr_repo_name, s3_bucket_names, secret_names, group_name, user_name,
         iam_users_with_access):
    """
    Creates the following resources to initiate a project:
    - CodeCommit repo
    - ECR repo
    - S3 buckets
    - Secrets Manager secrets
    - Programmatic account with keys given access to specified IAM users via Secrets Manager (manual update
      required using exported values)

    :param codecommit_repo_name: name of the CodeCommit repo
    :param ecr_repo_name: name of the ECR repo
    :param s3_bucket_names: list of S3 bucket names to create
    :param secret_names: list of Secrets Manager secrets to create
    :param group_name: name of the group to attach the policies to
    :param user_name: name of the user to create
    :param iam_users_with_access: list of IAM usernames we want to be able to access the secret containing the AWS
    keys for the programmatic user
    """
    policy_list = []
    codecommit = aws.codecommit.Repository(
        codecommit_repo_name,
        repository_name=codecommit_repo_name
    )
    policy_list.append((f'codecommit_{codecommit_repo_name}', codecommit.arn))
    ecr = aws.ecr.Repository(
        ecr_repo_name,
        name=ecr_repo_name
    )
    policy_list.append((f'ecr_{ecr_repo_name}', ecr.arn))
    for bucket_name in s3_bucket_names:
        bucket = aws.s3.Bucket(
            bucket_name,
            bucket=bucket_name,
            acl='private'
        )
        policy_list.append((f's3_{bucket_name}', bucket.arn))
    for secret_name in secret_names:
        secret = aws.secretsmanager.Secret(
            secret_name,
            name=secret_name
        )
        policy_list.append((f'secrets_manager_{secret_name}', secret.arn))
    group = aws.iam.Group(group_name, name=group_name, path="/users/")
    for policy_item in policy_list:
        arn = aws.get_arn(policy_item[1]).arn
        policy = aws.iam.Policy(
            policy_item[0],
            name=policy_item[0],
            path="/",
            policy=f"""{{
                "Version": "2012-10-17",
                "Statement": [
                    {{
                        "Effect": "Allow",
                        "Action": "*",
                        "Resource": "{arn}"
                    }}
                ]
            }}"""
        )
        group_attach = aws.iam.GroupPolicyAttachment(
            f'{policy_item[0]}-group-attach',
            group=group.name,
            policy_arn=policy.arn)
    user = aws.iam.User(
        user_name,
        name=user_name,
        path="/system/"
    )
    access_key = aws.iam.AccessKey(f'{user_name}_access_keys', user=user.name)
    group_membership = aws.iam.GroupMembership(
        f'{user_name}_group_assignment',
        users=[
            user.name,
        ],
        group=group.name)
    key_secret = aws.secretsmanager.Secret(
        f'{user_name}_aws_keys',
        name=f'{user_name}_aws_keys'
    )
    pulumi.export('access_key', access_key.id)
    pulumi.export('secret_key', access_key.secret)
    secret_values = {
        "access_key": '',
        "secret_key": '',
    }
    secret_version = aws.secretsmanager.SecretVersion(
        f'{user_name}_secret_value',
        secret_id=key_secret.arn,
        secret_string=json.dumps(secret_values)
    )
    secret_arn = aws.get_arn(secret_version).arn
    secret_policy = aws.iam.Policy(
        f'{user_name}_secret_policy',
        name=f'{user_name}_secret_policy',
        path="/",
        policy=f"""{{
            "Version": "2012-10-17",
            "Statement": [
                {{
                    "Effect": "Allow",
                    "Action": "*",
                    "Resource": "{secret_arn}"
                }}
            ]
        }}"""
    )
    for iam_username in iam_users_with_access:
        policy_attach = aws.iam.UserPolicyAttachment(
            # unique resource name per user so the attachments don't collide
            f'{iam_username}_secret_policy_attach',
            user=iam_username,
            policy_arn=secret_policy.arn
        )
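A hypothetical invocation for the World Series project might look like the following; the bucket and secret names are examples, and nameless_data_scientist is the IAM user we created earlier.

main(
    codecommit_repo_name='world-series-projections-ecs',
    ecr_repo_name='world-series-projections-ecs',
    s3_bucket_names=['world-series-ecs-models', 'world-series-ecs-logs'],
    secret_names=['world-series-ecs-db-credentials'],
    group_name='world-series-ecs-group',
    user_name='world_series_ecs_service',
    iam_users_with_access=['nameless_data_scientist']
)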
We will seed each ECR repo with an initial image. These images will be updated via our CodePipeline in later sections. I generally like to do this so that I can test applications before setting up a CodePipeline. Likewise, knowing how to interact with ECR is generally beneficial. For the below process to work, you will need the Docker daemon running on your machine.
Build Docker image locally.
$ docker build -t world-series-ecs-app .
Create connection with ECR.
$ aws ecr get-login-password --region <region> | docker login --username AWS --password-stdin <aws_account_id>.dkr.ecr.<region>.amazonaws.com
Push image to ECR.
$ docker tag <image id> <aws_account_id>.dkr.ecr.<region>.amazonaws.com/<ecr repo name>
$ docker push <aws_account_id>.dkr.ecr.<region>.amazonaws.com/<ecr repo name>
Alright, now it's time to actually deploy our REST services. The below script will create full-fledged endpoints for our application.
You'll need to deploy the private Python package first so that future processes can install it. You can then build the ECS app from world-series-projections-ecs. To note, when installing a custom, private package, you need to add some flags so that pip knows where to look (it defaults to pypi.org). For example, I deployed my package at data-science-helpers.allstardatascience.com, so I need the following statement to install the package.
pip3 install --trusted-host data-science-helpers.allstardatascience.com --find-links https://data-science-helpers.allstardatascience.com/ micah_melling_ds_package==0.1.0
In a requirements.txt file, you can add those two flags atop the file and then list the package name and pinned version like any other package. One tip: give your package a unique name, one that doesn't already exist on PyPI.
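For instance, the top of a requirements.txt might look like this; the domain reflects my deployment, so substitute your own.

--trusted-host data-science-helpers.allstardatascience.com
--find-links https://data-science-helpers.allstardatascience.com/
micah_melling_ds_package==0.1.0
# ...the rest of your dependencies as usual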
How exactly is our package "private"? Through IP rules. Only IP addresses listed in the argument ingress_cidr_blocks will be able to install the package. We will, obviously, want to include the IPs of the data scientists who need to use the package. Likewise, we need to include the IP address(es) that will allow our CI/CD pipelines to install the package for our Docker images. In subsequent sections, we will see that we run our CI/CD pipeline, specifically the "build" step, in a private subnet. This allows us to run the process through a NAT Gateway with an Elastic IP (EIP) attached. The EIP is known, so we can easily add it to ingress_cidr_blocks. To note, we may have to create a new WAF IPSet to accommodate the EIP.
OK, back to the ECS application. Our code is all inclusive: it includes multiple layers of security and multiple layers of logging. We might opt to break the script into smaller chunks. However, we need all of these components for a full-fledged app with all the desired bells and whistles, so, in many ways, it makes sense to create everything in one script. Likewise, as you might have noticed, I only have one mega function in each script. We could write smaller, more atomic functions. However, each line is basically synonymous with creating a new resource, so the structure is inherently "modular" and readable: one line performs one action and tells you what it is.
For each project, we want to use the following script to create two versions of our app: a staging one and a production one. This delineation will be more clear in our CI/CD pipeline. One more reminder: the WAF functionality can be funky. I oftentimes have to run "pulumi up" a few times to get the resources to create correctly. Alternatively, you could create the WAF items separately.
import pulumi_aws as aws | |
import json | |
from pulumi import ResourceOptions | |
def main(cluster_name, vpc_id, alb_security_group_name, ingress_cidr_blocks, app_port, load_balancer_name, | |
target_group_name, listener_name, role_arn, task_definition_name, cpu, memory, container_port, container_name, | |
docker_image, service_name, task_count, health_check_path, alb_subnet_ids, ecs_security_group_name, | |
web_acl_association_name, domain_name, hosted_zone_id, certificate_arn, alb_logs_s3_bucket, | |
ecs_subnet_ids, aws_region, ecs_log_group_name, environment, firehose_aws_bucket_arn, firehose_stream_name, | |
firehose_stream_arn, account_id, ipset_id, waf_rule_name, waf_acl_name, ecs_role_name, ecs_policies_to_create, | |
ecs_policies_to_attach, rds_security_group_id): | |
""" | |
Creates an ECS service that runs a specified number of tasks. An ALB sits ontop of of the ECS service. A Route53 | |
record is created and forwards traffic to the ALB; this component also enables HTTPS. A WAF protects the ALB. | |
Security groups are configured as well to ensure proper security throughout the layers of the application. | |
The ECS application runs a Docker container and is exposed over the Internet. The compute is managed by Fargate, | |
so we don't have to worry about provisioning and managing servers. | |
:param cluster_name: name for the ECS cluster to create | |
:type cluster_name: str | |
:param vpc_id: VPC id | |
:type vpc_id: str | |
:param alb_security_group_name: name of the security group to create | |
:type alb_security_group_name: str | |
:param ingress_cidr_blocks: list of IPs that can access the service | |
:type ingress_cidr_blocks: list | |
:param app_port: port the app will run on | |
:type app_port: int | |
:param load_balancer_name: name of the load balancer to create | |
:type load_balancer_name: str | |
:param target_group_name: name of the target group to create | |
:type target_group_name: str | |
:param listener_name: name of the listener on the load balancer | |
:type listener_name: str | |
:param role_arn: arn of the ECS task execution role | |
:type role_arn: str | |
:param task_definition_name: name of the ECS task definition to create | |
:type task_definition_name: str | |
:param cpu: CPU units | |
:type cpu: str | |
:param memory: Memory units | |
:type memory: str | |
:param container_port: port the container runs on | |
:type container_port: int | |
:param container_name: name of the Docker container | |
:type container_name: str | |
:param docker_image: name of or link to the Docker image | |
:type docker_image: str | |
:param service_name: name of the service to create | |
:type service_name: str | |
:param task_count: number of tasks to run | |
:type task_count: int | |
:param health_check_path: path to the health check endpoint | |
:type health_check_path: str | |
:param alb_subnet_ids: list of subnet IDs in which to put the ECS service | |
:type alb_subnet_ids: list | |
:param ecs_security_group_name: name of the security group for the ECS service | |
:type ecs_security_group_name: str | |
:param web_acl_association_name: name to give the Web ACL association | |
:type web_acl_association_name: str | |
:param domain_name: the name of the domain to associate the application to | |
:type domain_name: str | |
:param hosted_zone_id: ID of the hosted zone to place the Route53 record in | |
:type hosted_zone_id: str | |
:param certificate_arn: ARN of the SSL certificate to enable HTTPS on the domain_name | |
:type certificate_arn: str | |
:param alb_logs_s3_bucket: name of the S3 bucket for ALB logs | |
:type alb_logs_s3_bucket: str | |
:param ecs_subnet_ids: subnets for the ECS tasks | |
:type ecs_subnet_ids: list | |
:param aws_region: name of the aws region | |
:type aws_region: str | |
:param ecs_log_group_name: name of the log group | |
:type ecs_log_group_name: str | |
:param environment: environment variable to set in ECS task | |
:type environment: str | |
:param firehose_aws_bucket_arn: ARN of the AWS bucket for the Kinesis Firehose stream | |
:type firehose_aws_bucket_arn: str | |
:param firehose_stream_name: name of the Kinesis Firehose stream | |
:type firehose_stream_name: str | |
:param firehose_stream_arn: anticipated ARN of the Kinesis Firehose stream | |
:type firehose_stream_arn: str | |
:param account_id: owning account ID | |
:type account_id: str | |
:param ipset_id: ID of the IP set to use in the WAF | |
:type ipset_id: str | |
:param waf_rule_name: name of the WAF | |
:type waf_rule_name: str | |
:param waf_acl_name: name of the WAF ACL | |
:type waf_acl_name: str | |
:param ecs_role_name: name of the container role to create | |
:type ecs_role_name: str | |
:param ecs_policies_to_create: policies to create for the ECS role | |
:type ecs_policies_to_create: list of tuples, with the first item in the tuple being the policy name and the second | |
being the policy string | |
:param ecs_policies_to_attach: ARNs of policies to attach | |
:type ecs_policies_to_attach: list | |
:param rds_security_group_id: ID the RDS security group | |
:type rds_security_group_id: str | |
""" | |
firehose_role = aws.iam.Role(f"firehoseRole_{environment}", assume_role_policy="""{ | |
"Version": "2012-10-17", | |
"Statement": [ | |
{ | |
"Action": "sts:AssumeRole", | |
"Principal": { | |
"Service": "firehose.amazonaws.com" | |
}, | |
"Effect": "Allow", | |
"Sid": "" | |
} | |
] | |
} | |
""") | |
firehose_role_policy = aws.iam.RolePolicy(f"{firehose_stream_name}_role_policy", | |
role=firehose_role.name, | |
policy=f"""{{ | |
"Version": "2012-10-17", | |
"Statement": [ | |
{{ | |
"Sid": "", | |
"Effect": "Allow", | |
"Action": [ | |
"glue:GetTable", | |
"glue:GetTableVersion", | |
"glue:GetTableVersions" | |
], | |
"Resource": [ | |
"arn:aws:glue:us-west-2:{account_id}:catalog", | |
"arn:aws:glue:us-west-2:{account_id}:database/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%", | |
"arn:aws:glue:us-west-2:{account_id}:table/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%" | |
] | |
}}, | |
{{ | |
"Sid": "", | |
"Effect": "Allow", | |
"Action": [ | |
"s3:AbortMultipartUpload", | |
"s3:GetBucketLocation", | |
"s3:GetObject", | |
"s3:ListBucket", | |
"s3:ListBucketMultipartUploads", | |
"s3:PutObject" | |
], | |
"Resource": [ | |
"{firehose_aws_bucket_arn}", | |
"{firehose_aws_bucket_arn}/*" | |
] | |
}}, | |
{{ | |
"Sid": "", | |
"Effect": "Allow", | |
"Action": [ | |
"lambda:InvokeFunction", | |
"lambda:GetFunctionConfiguration" | |
], | |
"Resource": "arn:aws:lambda:us-west-2:{account_id}:function:%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%" | |
}}, | |
{{ | |
"Effect": "Allow", | |
"Action": [ | |
"kms:GenerateDataKey", | |
"kms:Decrypt" | |
], | |
"Resource": [ | |
"arn:aws:kms:us-west-2:{account_id}:key/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%" | |
], | |
"Condition": {{ | |
"StringEquals": {{ | |
"kms:ViaService": "s3.us-west-2.amazonaws.com" | |
}}, | |
"StringLike": {{ | |
"kms:EncryptionContext:aws:s3:arn": [ | |
"arn:aws:s3:::%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%/*" | |
] | |
}} | |
}} | |
}}, | |
{{ | |
"Sid": "", | |
"Effect": "Allow", | |
"Action": [ | |
"logs:PutLogEvents" | |
], | |
"Resource": [ | |
"arn:aws:logs:us-west-2:{account_id}:log-group:/aws/kinesisfirehose/{firehose_stream_name}:log-stream:*" | |
] | |
}}, | |
{{ | |
"Sid": "", | |
"Effect": "Allow", | |
"Action": [ | |
"kinesis:DescribeStream", | |
"kinesis:GetShardIterator", | |
"kinesis:GetRecords", | |
"kinesis:ListShards" | |
], | |
"Resource": "arn:aws:kinesis:us-west-2:{account_id}:stream/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%" | |
}}, | |
{{ | |
"Effect": "Allow", | |
"Action": [ | |
"kms:Decrypt" | |
], | |
"Resource": [ | |
"arn:aws:kms:us-west-2:{account_id}:key/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%" | |
], | |
"Condition": {{ | |
"StringEquals": {{ | |
"kms:ViaService": "kinesis.us-west-2.amazonaws.com" | |
}}, | |
"StringLike": {{ | |
"kms:EncryptionContext:aws:kinesis:arn": "arn:aws:kinesis:us-west-2:{account_id}:stream/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%" | |
}} | |
}} | |
}} | |
] | |
}}""" | |
) | |
firehose_stream = aws.kinesis.FirehoseDeliveryStream(firehose_stream_name, | |
destination="s3", | |
s3_configuration= | |
aws.kinesis.FirehoseDeliveryStreamS3ConfigurationArgs( | |
role_arn=firehose_role.arn, | |
bucket_arn=firehose_aws_bucket_arn, | |
buffer_size=1, | |
buffer_interval=60, | |
)) | |
wafrule = aws.wafregional.Rule(
    waf_rule_name,
    name=waf_rule_name,
    metric_name=waf_rule_name,
    predicates=[aws.wafregional.RulePredicateArgs(
        data_id=ipset_id,
        negated=False,
        type="IPMatch",
    )])
webacl = aws.wafregional.WebAcl(
    waf_acl_name,
    name=waf_acl_name,
    metric_name=waf_acl_name,
    default_action=aws.wafregional.WebAclDefaultActionArgs(
        type="BLOCK",
    ),
    rules=[aws.wafregional.WebAclRuleArgs(
        action=aws.wafregional.WebAclRuleActionArgs(
            type="ALLOW",
        ),
        priority=1,
        rule_id=wafrule.id,
        type="REGULAR",
    )],
    logging_configuration=aws.wafregional.WebAclLoggingConfigurationArgs(
        log_destination=aws.get_arn(arn=firehose_stream_arn).arn
    ),
    opts=ResourceOptions(depends_on=[firehose_stream, wafrule])
)
ecs_role = aws.iam.Role(
    ecs_role_name,
    name=ecs_role_name,
    assume_role_policy="""{
        "Version": "2012-10-17",
        "Statement": [
            {
                "Action": "sts:AssumeRole",
                "Principal": {
                    "Service": "ecs-tasks.amazonaws.com"
                },
                "Effect": "Allow",
                "Sid": ""
            }
        ]
    }
    """
)
policy_arns = []
for policy in ecs_policies_to_create:
    iam_policy = aws.iam.Policy(
        policy[0],
        name=policy[0],
        policy=policy[1]
    )
    policy_arns.append(iam_policy.arn)
for index, policy_arn in enumerate(policy_arns + ecs_policies_to_attach):
    attach = aws.iam.RolePolicyAttachment(
        f'attached_{index}_{environment}',
        role=ecs_role.name,
        policy_arn=policy_arn
    )
ecs_cluster = aws.ecs.Cluster(cluster_name, name=cluster_name)
alb_security_group = aws.ec2.SecurityGroup(
    alb_security_group_name,
    name=alb_security_group_name,
    vpc_id=vpc_id,
    description='Enable HTTPS access',
    ingress=[aws.ec2.SecurityGroupIngressArgs(
        protocol='tcp',
        from_port=app_port,
        to_port=app_port,
        cidr_blocks=ingress_cidr_blocks
    )],
    egress=[aws.ec2.SecurityGroupEgressArgs(
        protocol='-1',
        from_port=0,
        to_port=0,
        cidr_blocks=['0.0.0.0/0'],
    )],
)
ecs_security_group = aws.ec2.SecurityGroup(
    ecs_security_group_name,
    name=ecs_security_group_name,
    vpc_id=vpc_id,
    description='Enable ALB Access',
    ingress=[
        aws.ec2.SecurityGroupIngressArgs(
            protocol='tcp',
            from_port=container_port,
            to_port=container_port,
            security_groups=[alb_security_group.id]
        )
    ],
    egress=[aws.ec2.SecurityGroupEgressArgs(
        protocol='-1',
        from_port=0,
        to_port=0,
        cidr_blocks=['0.0.0.0/0'],
    )],
)
rds_ingress_rule = aws.ec2.SecurityGroupRule(
    f"{rds_security_group_id}_attach",
    type="ingress",
    from_port=3306,
    to_port=3306,
    protocol="tcp",
    source_security_group_id=ecs_security_group.id,
    security_group_id=rds_security_group_id)
load_balancer = aws.lb.LoadBalancer(
    load_balancer_name,
    name=load_balancer_name,
    security_groups=[alb_security_group.id],
    subnets=alb_subnet_ids,
    access_logs=aws.lb.LoadBalancerAccessLogsArgs(
        bucket=alb_logs_s3_bucket,
        prefix='alb-logs',
        enabled=True,
    ),
)
target_group = aws.lb.TargetGroup(
    target_group_name,
    name=target_group_name,
    port=container_port,
    protocol='HTTPS',
    target_type='ip',
    vpc_id=vpc_id,
    health_check={
        "path": f"{health_check_path}",
        "port": f"{container_port}",
        "protocol": "HTTPS"
    }
)
listener = aws.lb.Listener(
    listener_name,
    load_balancer_arn=load_balancer.arn,
    port=app_port,
    protocol='HTTPS',
    certificate_arn=certificate_arn,
    default_actions=[aws.lb.ListenerDefaultActionArgs(
        type='forward',
        target_group_arn=target_group.arn,
    )],
)
web_acl_association = aws.wafregional.WebAclAssociation(
    web_acl_association_name,
    resource_arn=load_balancer.arn,
    web_acl_id=webacl.id)
route_53_record = aws.route53.Record(
    domain_name,
    zone_id=hosted_zone_id,
    name=domain_name,
    type="A",
    aliases=[aws.route53.RecordAliasArgs(
        name=load_balancer.dns_name,
        zone_id=load_balancer.zone_id,
        evaluate_target_health=False,
    )]
)
ecs_log_group = aws.cloudwatch.LogGroup(f'ecs/{ecs_log_group_name}', name=f'ecs/{ecs_log_group_name}')
task_definition = aws.ecs.TaskDefinition(
    task_definition_name,
    family=task_definition_name,
    cpu=cpu,
    memory=memory,
    network_mode='awsvpc',
    requires_compatibilities=['FARGATE'],
    execution_role_arn=role_arn,
    task_role_arn=ecs_role.arn,
    container_definitions=json.dumps([{
        'name': container_name,
        'image': docker_image,
        'portMappings': [{
            'containerPort': container_port,
            'protocol': 'tcp'
        }],
        "environment": [
            {
                "name": "ENVIRONMENT",
                "value": f"{environment}"
            }
        ],
        "logConfiguration": {
            "logDriver": "awslogs",
            "options": {
                "awslogs-group": f'ecs/{ecs_log_group_name}',
                "awslogs-region": aws_region,
                "awslogs-stream-prefix": "container-"
            }
        }
    }])
)
ecs_service = aws.ecs.Service(
    service_name,
    cluster=ecs_cluster.arn,
    desired_count=task_count,
    launch_type='FARGATE',
    task_definition=task_definition.arn,
    network_configuration=aws.ecs.ServiceNetworkConfigurationArgs(
        assign_public_ip=False,
        subnets=ecs_subnet_ids,
        security_groups=[ecs_security_group.id],
    ),
    load_balancers=[aws.ecs.ServiceLoadBalancerArgs(
        target_group_arn=target_group.arn,
        container_name=container_name,
        container_port=container_port,
    )],
    opts=ResourceOptions(depends_on=[listener]),
)
If we want to run our process as a batch job on a cron schedule, we need to create an AWS Batch job definition. We can then kick off a job based on that job definition, which we will do in a later section.
The command argument depends on your Dockerfile. In world-series-projections-batch, our ENTRYPOINT is "python3", so our command is simply the name of our script. In world-series-projections-ecs, we use CMD rather than ENTRYPOINT. Our default command is ./run.sh, which fires up a Gunicorn web server that can communicate with our Flask app. However, we can override the default CMD in our job definition. We do this so we can run a job to retrain our model on a cron schedule. In this case, our job definition command becomes something like "python3 retrain.py", which overrides the default command in our Docker container. We perform the model serving and model training in the same repository to adhere to the Twelve-Factor App, which states that "admin processes should be run in an identical environment as the regular long-running processes of the app. They run against a release, using the same codebase and config as any process run against that release. Admin code must ship with application code to avoid synchronization issues." This, in my mind, applies to model training. If you aren't familiar with the Twelve-Factor App, I recommend reading through the entire site. It's quite, quite good.
import json

import pulumi_aws as aws


def main(definition_name, command, image, memory, vcpus, policies_to_create, policies_to_attach, role_name,
         account_number):
    """
    Creates an AWS Batch job definition that could be used to run a cron job.
    :param definition_name: name of the job definition
    :type definition_name: str
    :param command: command to run in the form of a list, with each argument being a list element
    :type command: list
    :param image: URI of the image
    :type image: str
    :param memory: memory in MB
    :type memory: int
    :param vcpus: number of vcpus
    :type vcpus: int
    :param policies_to_create: policies to create, with each policy expressed as a tuple of the policy name and the
    policy statement
    :type policies_to_create: list of tuples
    :param policies_to_attach: ARNs of existing policies to attach to the role
    :type policies_to_attach: list
    :param role_name: name of the role to create
    :type role_name: str
    :param account_number: ownership account number
    :type account_number: str
    """
    role = aws.iam.Role(
        role_name,
        name=role_name,
        assume_role_policy="""{
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Action": "sts:AssumeRole",
                    "Principal": {
                        "Service": "ecs-tasks.amazonaws.com"
                    },
                    "Effect": "Allow",
                    "Sid": ""
                }
            ]
        }
        """
    )
    policy_arns = []
    for policy in policies_to_create:
        iam_policy = aws.iam.Policy(
            policy[0],
            name=policy[0],
            policy=policy[1]
        )
        policy_arns.append(iam_policy.arn)
    for index, policy_arn in enumerate(policy_arns + policies_to_attach):
        attach = aws.iam.RolePolicyAttachment(
            f'attached_{index}',
            role=role.name,
            policy_arn=policy_arn
        )
    job_definition = aws.batch.JobDefinition(
        definition_name,
        name=definition_name,
        # json.dumps renders the command list as valid JSON (double quotes) inside container_properties
        container_properties=f"""{{
            "command": {json.dumps(command)},
            "image": "{image}",
            "memory": {memory},
            "vcpus": {vcpus},
            "jobRoleArn": "arn:aws:iam::{account_number}:role/{role_name}"
        }}""",
        type="container"
    )
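To make the CMD override concrete, here is a hedged sketch of how this function might be invoked for the retraining job. Every value below - the image URI, account number, role name, and attached policy - is a hypothetical placeholder, not the project's actual configuration.
# Hypothetical invocation inside a Pulumi program; all values are placeholders.
main(
    definition_name="world-series-projections-retrain",
    command=["python3", "retrain.py"],  # overrides the image's default CMD (./run.sh)
    image="123456789012.dkr.ecr.us-east-1.amazonaws.com/world-series-projections-ecs:latest",
    memory=4096,
    vcpus=2,
    policies_to_create=[],
    policies_to_attach=["arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"],
    role_name="world-series-projections-batch-role",
    account_number="123456789012"
)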
We will clearly want to be able to update our application, so we need to create a CI/CD pipeline. When we push a code update to our main git branch, the change will go through the pipeline and be pushed into production, as long as the code passes our tests. Likewise, when a file is uploaded to a designated S3 bucket, the pipeline will also kick off. We do this so that we can retrain our model as a batch process and release the fresh model into production on a schedule.
Let's talk a bit more about the retraining process. In general, you can take a few different stances on when and how to retrain a machine learning model.
I also want to differentiate between retraining a model and releasing a wholly new model. Retraining a model simply involves taking the original model's parameters and fitting them to new data. Let's say the original model is a Random Forest with max_depth of 12 and max_features set to "sqrt". None of that changes; only the data fed into the model changes. Releasing a wholly new model might involve releasing a Random Forest with a different max_depth or switching to an XGBoost model. I would submit such a process should generally not be done automatically. We usually want to get to know the behavior of our models - their strengths, weaknesses, and idiosyncrasies. As an example, we might get a noticeably different probability calibration between an ExtraTrees model and a LightGBM model. We want to be prepared for such a change. In our design, unless changing the model type requires new preprocessing code, we can train an entirely new model on our data science workstation and manually upload it to the desired S3 location, which will release the new model into production. Subsequently, retrain.py should pick up the new model's parameters for its runs. However, if our new model requires a preprocessing change, we might need to push the model change to S3, hit "reject" in the manual approval step, and then push the code change to CodeCommit. Some level of coordination cannot be avoided.
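To make that retraining idea concrete, here is a minimal, hedged sketch of what a retrain.py in this style could look like. The bucket name, object key, and load_training_data helper are hypothetical placeholders; the actual script in the repository may differ.
# Hypothetical retraining sketch: reuse the current model's hyperparameters, refit on fresh data,
# and upload the new artifact to the watched S3 location that releases it into production.
import io

import boto3
import joblib
from sklearn.base import clone

S3_BUCKET = "my-model-source-bucket"   # placeholder
MODEL_KEY = "model.joblib"             # placeholder


def load_training_data():
    """Placeholder for the project's actual feature and label query."""
    raise NotImplementedError


def main():
    s3 = boto3.client("s3")
    # pull the current production model so we inherit its hyperparameters
    body = s3.get_object(Bucket=S3_BUCKET, Key=MODEL_KEY)["Body"].read()
    current_model = joblib.load(io.BytesIO(body))
    # clone() copies the estimator's parameters (e.g. max_depth, max_features) without its fitted state
    fresh_model = clone(current_model)
    x, y = load_training_data()
    fresh_model.fit(x, y)
    # uploading to the watched S3 location is what kicks off the release of the retrained model
    buffer = io.BytesIO()
    joblib.dump(fresh_model, buffer)
    s3.put_object(Bucket=S3_BUCKET, Key=MODEL_KEY, Body=buffer.getvalue())


if __name__ == "__main__":
    main()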
As mentioned, the design we have implemented is pretty straightforward: our model will retrain on whatever schedule we set. We could retrain multiple times a day if we wanted. As you might notice in the pipeline below, while we automatically deploy to staging, we require a manual approval for a production deployment. Therefore, our fresh models will not be automatically released into production. We could arrange such a situation if we wanted. For one, we could remove the manual approval step, but keep in mind this would also apply to any code changes. This is palatable if we have a robust and well-verified test suite. A second option is to have two separate release pipelines, one for code and another for models; in each pipeline, we would still pull from both S3 and CodeCommit to ensure each release captures the most recent code and model. We could then keep a manual approval step in the code pipeline but not in the model pipeline. Either way, we are running essentially the same pipeline with a slight change in execution logic based on what kicked it off. In my opinion, the single pipeline with a deep test suite is preferred due to its simplicity. We can automatically test the exact same items we could manually. Likewise, by deploying to staging first, we confirm our updates can actually be deployed successfully onto ECS; this provides another confidence boost.
import pulumi_aws as aws


def main(codecommit_repository_name, codecommit_branch_name, codecommit_repo_arn, ecr_arn, build_role_name,
         build_project_name, network_interface_region, network_interface_owner_id, private_subnet_arn,
         build_s3_bucket_arn, build_vpc_id, private_subnet_id, build_security_group_id, pipeline_role_name,
         pipeline_name, pipeline_s3_bucket_name, pipeline_policy_name, pipeline_s3_bucket_arn, stage_ecs_cluster_name,
         stage_ecs_service_name, prod_ecs_cluster_name, prod_ecs_service_name, source_s3_bucket_arn,
         source_s3_bucket_name, s3_object_key):
    """
    Creates a CodePipeline to deploy an ECS application. The following structure is employed:
    - CodeCommit source
    - S3 source
    - CodeBuild build
    - Stage Deploy
    - Manual Approval
    - Prod Deploy
    Appropriate roles and policies are created and used.
    All argument types are strings.
    :param codecommit_repository_name: name of the CodeCommit repo for the Source
    :param codecommit_branch_name: name of the CodeCommit branch for the Source
    :param codecommit_repo_arn: ARN of the CodeCommit repo
    :param ecr_arn: ARN of the ECR repo
    :param build_role_name: name of the IAM execution role for CodeBuild
    :param build_project_name: name of the CodeBuild project
    :param network_interface_region: region of the network interface
    :param network_interface_owner_id: owner ID of the network interface
    :param private_subnet_arn: ARN of the private subnet
    :param build_s3_bucket_arn: ARN of the CodeBuild S3 bucket
    :param build_vpc_id: VPC for the CodeBuild project
    :param private_subnet_id: ID of the private subnet
    :param build_security_group_id: security group for the CodeBuild project
    :param pipeline_role_name: name for the execution role to run the pipeline
    :param pipeline_name: name of the pipeline
    :param pipeline_s3_bucket_name: name of the S3 bucket for CodePipeline artifacts
    :param pipeline_policy_name: name for the pipeline policy
    :param pipeline_s3_bucket_arn: ARN of the S3 bucket for CodePipeline artifacts
    :param stage_ecs_cluster_name: name of the staging ECS cluster
    :param stage_ecs_service_name: name of the staging ECS service
    :param prod_ecs_cluster_name: name of the production ECS cluster
    :param prod_ecs_service_name: name of the production ECS service
    :param source_s3_bucket_arn: ARN of the S3 bucket used in the source step
    :param source_s3_bucket_name: name of the S3 bucket used in the source step
    :param s3_object_key: name of the S3 file to trigger the pipeline
    """
    codebuild_role = aws.iam.Role(build_role_name, assume_role_policy="""{
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "Service": "codebuild.amazonaws.com"
                },
                "Action": "sts:AssumeRole"
            }
        ]
    }
    """)
    role_policy = aws.iam.RolePolicy(
        f"{build_role_name}_role_policy",
        role=codebuild_role.name,
        policy=f"""{{
            "Version": "2012-10-17",
            "Statement": [
                {{
                    "Effect": "Allow",
                    "Resource": [
                        "*"
                    ],
                    "Action": [
                        "logs:CreateLogGroup",
                        "logs:CreateLogStream",
                        "logs:PutLogEvents"
                    ]
                }},
                {{
                    "Effect": "Allow",
                    "Action": [
                        "ec2:CreateNetworkInterface",
                        "ec2:DescribeDhcpOptions",
                        "ec2:DescribeNetworkInterfaces",
                        "ec2:DeleteNetworkInterface",
                        "ec2:DescribeSubnets",
                        "ec2:DescribeSecurityGroups",
                        "ec2:DescribeVpcs"
                    ],
                    "Resource": "*"
                }},
                {{
                    "Effect": "Allow",
                    "Action": [
                        "ec2:CreateNetworkInterfacePermission"
                    ],
                    "Resource": [
                        "arn:aws:ec2:{network_interface_region}:{network_interface_owner_id}:network-interface/*"
                    ],
                    "Condition": {{
                        "StringEquals": {{
                            "ec2:Subnet": [
                                "{private_subnet_arn}"
                            ],
                            "ec2:AuthorizedService": "codebuild.amazonaws.com"
                        }}
                    }}
                }},
                {{
                    "Effect": "Allow",
                    "Action": [
                        "*"
                    ],
                    "Resource": [
                        "{build_s3_bucket_arn}",
                        "{build_s3_bucket_arn}/*",
                        "{pipeline_s3_bucket_arn}",
                        "{pipeline_s3_bucket_arn}/*",
                        "{source_s3_bucket_arn}",
                        "{source_s3_bucket_arn}/*"
                    ]
                }},
                {{
                    "Effect": "Allow",
                    "Action": [
                        "ecr:GetRegistryPolicy",
                        "ecr:DescribeRegistry",
                        "ecr:GetAuthorizationToken",
                        "ecr:DeleteRegistryPolicy",
                        "ecr:PutRegistryPolicy",
                        "ecr:PutReplicationConfiguration"
                    ],
                    "Resource": [
                        "*"
                    ]
                }},
                {{
                    "Effect": "Allow",
                    "Action": [
                        "ecr:*"
                    ],
                    "Resource": [
                        "{ecr_arn}"
                    ]
                }}
            ]
        }}
        """)
    codebuild_project = aws.codebuild.Project(
        build_project_name,
        name=build_project_name,
        description=f"{build_project_name}_codebuild_project",
        build_timeout=15,
        queued_timeout=15,
        service_role=codebuild_role.arn,
        artifacts=aws.codebuild.ProjectArtifactsArgs(
            type="CODEPIPELINE",
        ),
        environment=aws.codebuild.ProjectEnvironmentArgs(
            compute_type="BUILD_GENERAL1_SMALL",
            image="aws/codebuild/standard:3.0",
            type="LINUX_CONTAINER",
            image_pull_credentials_type="CODEBUILD",
            privileged_mode=True
        ),
        logs_config=aws.codebuild.ProjectLogsConfigArgs(
            cloudwatch_logs=aws.codebuild.ProjectLogsConfigCloudwatchLogsArgs(
                group_name="log-group",
                stream_name="log-stream",
            ),
            s3_logs=aws.codebuild.ProjectLogsConfigS3LogsArgs(
                status="ENABLED",
                location=build_s3_bucket_arn,
            ),
        ),
        source=aws.codebuild.ProjectSourceArgs(
            type="CODEPIPELINE",
        ),
        vpc_config=aws.codebuild.ProjectVpcConfigArgs(
            vpc_id=build_vpc_id,
            subnets=[
                private_subnet_id,
            ],
            security_group_ids=[
                build_security_group_id,
            ],
        ),
    )
    codepipeline_role = aws.iam.Role(pipeline_role_name, assume_role_policy="""{
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "Service": "codepipeline.amazonaws.com"
                },
                "Action": "sts:AssumeRole"
            }
        ]
    }
    """)
    codepipeline = aws.codepipeline.Pipeline(
        pipeline_name,
        name=pipeline_name,
        role_arn=codepipeline_role.arn,
        artifact_store=aws.codepipeline.PipelineArtifactStoreArgs(
            location=pipeline_s3_bucket_name,
            type="S3",
        ),
        stages=[
            aws.codepipeline.PipelineStageArgs(
                name="Source",
                actions=[
                    aws.codepipeline.PipelineStageActionArgs(
                        name="CodeSource",
                        category="Source",
                        owner="AWS",
                        provider="CodeCommit",
                        version="1",
                        output_artifacts=["SourceArtifact"],
                        namespace='SourceVariables',
                        run_order=1,
                        configuration={
                            "RepositoryName": codecommit_repository_name,
                            "BranchName": codecommit_branch_name,
                        },
                    ),
                    aws.codepipeline.PipelineStageActionArgs(
                        name="FileSource",
                        category="Source",
                        owner="AWS",
                        provider="S3",
                        version="1",
                        output_artifacts=["file"],
                        run_order=1,
                        configuration={
                            "S3Bucket": source_s3_bucket_name,
                            "S3ObjectKey": s3_object_key,
                            "PollForSourceChanges": "false",
                        },
                    ),
                ],
            ),
            aws.codepipeline.PipelineStageArgs(
                name="Build",
                actions=[aws.codepipeline.PipelineStageActionArgs(
                    name="Build",
                    category="Build",
                    owner="AWS",
                    provider="CodeBuild",
                    input_artifacts=["SourceArtifact"],
                    output_artifacts=["BuildArtifact"],
                    namespace='BuildVariables',
                    version="1",
                    run_order=1,
                    configuration={
                        "ProjectName": build_project_name,
                    },
                )],
            ),
            aws.codepipeline.PipelineStageArgs(
                name="StageDeploy",
                actions=[aws.codepipeline.PipelineStageActionArgs(
                    name="StageDeploy",
                    category="Deploy",
                    owner="AWS",
                    provider="ECS",
                    input_artifacts=["BuildArtifact"],
                    version="1",
                    configuration={
                        "ClusterName": stage_ecs_cluster_name,
                        "ServiceName": stage_ecs_service_name
                    },
                )],
            ),
            aws.codepipeline.PipelineStageArgs(
                name="ManualApproval",
                actions=[aws.codepipeline.PipelineStageActionArgs(
                    name="ManualApproval",
                    category="Approval",
                    owner="AWS",
                    provider="Manual",
                    version="1",
                )],
            ),
            aws.codepipeline.PipelineStageArgs(
                name="ProdDeploy",
                actions=[aws.codepipeline.PipelineStageActionArgs(
                    name="ProdDeploy",
                    category="Deploy",
                    owner="AWS",
                    provider="ECS",
                    input_artifacts=["BuildArtifact"],
                    version="1",
                    configuration={
                        "ClusterName": prod_ecs_cluster_name,
                        "ServiceName": prod_ecs_service_name,
                    },
                )],
            ),
        ])
    codepipeline_policy = aws.iam.RolePolicy(
        pipeline_policy_name,
        role=codepipeline_role.id,
        policy=f"""{{
            "Version": "2012-10-17",
            "Statement": [
                {{
                    "Effect": "Allow",
                    "Action": "*",
                    "Resource": [
                        "{pipeline_s3_bucket_arn}",
                        "{pipeline_s3_bucket_arn}/*",
                        "{source_s3_bucket_arn}",
                        "{source_s3_bucket_arn}/*"
                    ]
                }},
                {{
                    "Effect": "Allow",
                    "Action": "*",
                    "Resource": "{codecommit_repo_arn}"
                }},
                {{
                    "Effect": "Allow",
                    "Action": [
                        "codebuild:BatchGetBuilds",
                        "codebuild:StartBuild"
                    ],
                    "Resource": "*"
                }},
                {{
                    "Effect": "Allow",
                    "Action": [
                        "ecs:*"
                    ],
                    "Resource": "*"
                }},
                {{
                    "Action": [
                        "iam:PassRole"
                    ],
                    "Effect": "Allow",
                    "Resource": "*",
                    "Condition": {{
                        "StringLike": {{
                            "iam:PassedToService": [
                                "ecs-tasks.amazonaws.com"
                            ]
                        }}
                    }}
                }}
            ]
        }}"""
    )
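One detail worth flagging: the S3 source action above sets PollForSourceChanges to false, so unless an event rule is configured elsewhere in the stack, uploading the model file will not start the pipeline on its own. AWS expects a CloudWatch Events rule, backed by CloudTrail data events for the bucket, to start the pipeline when the object lands. Below is a minimal, hedged sketch of such a rule; the bucket, object key, and pipeline ARN are placeholders, and in practice you would fold this into the same Pulumi function.
# A minimal sketch, assuming CloudTrail is recording S3 data events for the source bucket.
import json

import pulumi_aws as aws

trigger_role = aws.iam.Role(
    "model-upload-trigger-role",
    assume_role_policy=json.dumps({
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": "events.amazonaws.com"},
            "Action": "sts:AssumeRole"
        }]
    }))
trigger_policy = aws.iam.RolePolicy(
    "model-upload-trigger-policy",
    role=trigger_role.id,
    policy=json.dumps({
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Action": "codepipeline:StartPipelineExecution",
            "Resource": "*"  # scope this to the pipeline ARN in practice
        }]
    }))
upload_rule = aws.cloudwatch.EventRule(
    "model-upload-rule",
    event_pattern=json.dumps({
        "source": ["aws.s3"],
        "detail-type": ["AWS API Call via CloudTrail"],
        "detail": {
            "eventSource": ["s3.amazonaws.com"],
            "eventName": ["PutObject", "CompleteMultipartUpload", "CopyObject"],
            "requestParameters": {
                "bucketName": ["my-model-source-bucket"],  # hypothetical bucket
                "key": ["model.zip"]  # hypothetical object key
            }
        }
    }))
upload_target = aws.cloudwatch.EventTarget(
    "model-upload-target",
    rule=upload_rule.name,
    arn="arn:aws:codepipeline:us-east-1:123456789012:world-series-projections-pipeline",  # placeholder ARN
    role_arn=trigger_role.arn)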
As a last note, viewing the execution of a CodePipeline is one of the main motivations for having project-specific console accounts or granting data science users view access to such resources. Data scientists may not create the CI/CD pipeline, but they will need to know whether and when the runs succeed.
Our CI/CD pipeline for a batch job is a bit different. This pipeline simply needs to update our Docker image upon a code change.
import pulumi_aws as aws


def main(pipeline_role_name, pipeline_name, pipeline_s3_bucket_name, pipeline_policy_name, pipeline_s3_bucket_arn,
         codecommit_repository_name, codecommit_branch_name, codecommit_repo_arn, build_project_name, build_role_name,
         network_interface_region, network_interface_owner_id, private_subnet_arn, private_subnet_id, ecr_arn,
         build_s3_bucket_arn, build_vpc_id, build_security_group_id):
    """
    Creates a simple CodePipeline with two steps: Source and Build. The Source step is a CodeCommit repo. The Build
    step is a CodeBuild project, which is also created in this function.
    All argument types are strings.
    :param pipeline_role_name: name of the IAM execution role for the CodePipeline
    :param pipeline_name: name of the CodePipeline
    :param pipeline_s3_bucket_name: name of the S3 bucket for CodePipeline artifacts
    :param pipeline_policy_name: name of the CodePipeline IAM policy
    :param pipeline_s3_bucket_arn: ARN of the S3 bucket for CodePipeline artifacts
    :param codecommit_repository_name: name of the CodeCommit repo for the Source
    :param codecommit_branch_name: name of the CodeCommit repo branch for the Source
    :param codecommit_repo_arn: ARN of the CodeCommit repo
    :param build_project_name: name of the CodeBuild project
    :param build_role_name: name of the IAM execution role for CodeBuild
    :param network_interface_region: region of the network interface
    :param network_interface_owner_id: owner ID of the network interface
    :param private_subnet_arn: ARN of the private subnet
    :param private_subnet_id: ID of the private subnet
    :param ecr_arn: ARN of the ECR repo
    :param build_s3_bucket_arn: ARN of the CodeBuild S3 bucket
    :param build_vpc_id: VPC for the CodeBuild project
    :param build_security_group_id: security group for the CodeBuild project
    """
    codebuild_role = aws.iam.Role(build_role_name, assume_role_policy="""{
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "Service": "codebuild.amazonaws.com"
                },
                "Action": "sts:AssumeRole"
            }
        ]
    }
    """)
    role_policy = aws.iam.RolePolicy(
        f"{build_role_name}_role_policy",
        role=codebuild_role.name,
        policy=f"""{{
            "Version": "2012-10-17",
            "Statement": [
                {{
                    "Effect": "Allow",
                    "Resource": [
                        "*"
                    ],
                    "Action": [
                        "logs:CreateLogGroup",
                        "logs:CreateLogStream",
                        "logs:PutLogEvents"
                    ]
                }},
                {{
                    "Effect": "Allow",
                    "Action": [
                        "ec2:CreateNetworkInterface",
                        "ec2:DescribeDhcpOptions",
                        "ec2:DescribeNetworkInterfaces",
                        "ec2:DeleteNetworkInterface",
                        "ec2:DescribeSubnets",
                        "ec2:DescribeSecurityGroups",
                        "ec2:DescribeVpcs"
                    ],
                    "Resource": "*"
                }},
                {{
                    "Effect": "Allow",
                    "Action": [
                        "ec2:CreateNetworkInterfacePermission"
                    ],
                    "Resource": [
                        "arn:aws:ec2:{network_interface_region}:{network_interface_owner_id}:network-interface/*"
                    ],
                    "Condition": {{
                        "StringEquals": {{
                            "ec2:Subnet": [
                                "{private_subnet_arn}"
                            ],
                            "ec2:AuthorizedService": "codebuild.amazonaws.com"
                        }}
                    }}
                }},
                {{
                    "Effect": "Allow",
                    "Action": [
                        "*"
                    ],
                    "Resource": [
                        "{build_s3_bucket_arn}",
                        "{build_s3_bucket_arn}/*",
                        "{pipeline_s3_bucket_arn}",
                        "{pipeline_s3_bucket_arn}/*"
                    ]
                }},
                {{
                    "Effect": "Allow",
                    "Action": [
                        "ecr:GetRegistryPolicy",
                        "ecr:DescribeRegistry",
                        "ecr:GetAuthorizationToken",
                        "ecr:DeleteRegistryPolicy",
                        "ecr:PutRegistryPolicy",
                        "ecr:PutReplicationConfiguration"
                    ],
                    "Resource": [
                        "*"
                    ]
                }},
                {{
                    "Effect": "Allow",
                    "Action": [
                        "ecr:*"
                    ],
                    "Resource": [
                        "{ecr_arn}"
                    ]
                }}
            ]
        }}
        """)
    codebuild_project = aws.codebuild.Project(
        build_project_name,
        name=build_project_name,
        description=f"{build_project_name}_codebuild_project",
        build_timeout=15,
        queued_timeout=15,
        service_role=codebuild_role.arn,
        artifacts=aws.codebuild.ProjectArtifactsArgs(
            type="CODEPIPELINE",
        ),
        environment=aws.codebuild.ProjectEnvironmentArgs(
            compute_type="BUILD_GENERAL1_SMALL",
            image="aws/codebuild/standard:3.0",
            type="LINUX_CONTAINER",
            image_pull_credentials_type="CODEBUILD",
            privileged_mode=True
        ),
        logs_config=aws.codebuild.ProjectLogsConfigArgs(
            cloudwatch_logs=aws.codebuild.ProjectLogsConfigCloudwatchLogsArgs(
                group_name="log-group",
                stream_name="log-stream",
            ),
            s3_logs=aws.codebuild.ProjectLogsConfigS3LogsArgs(
                status="ENABLED",
                location=build_s3_bucket_arn,
            ),
        ),
        source=aws.codebuild.ProjectSourceArgs(
            type="CODEPIPELINE",
        ),
        vpc_config=aws.codebuild.ProjectVpcConfigArgs(
            vpc_id=build_vpc_id,
            subnets=[
                private_subnet_id,
            ],
            security_group_ids=[
                build_security_group_id,
            ],
        ),
    )
    codepipeline_role = aws.iam.Role(pipeline_role_name, assume_role_policy="""{
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "Service": "codepipeline.amazonaws.com"
                },
                "Action": "sts:AssumeRole"
            }
        ]
    }
    """)
    codepipeline = aws.codepipeline.Pipeline(
        pipeline_name,
        name=pipeline_name,
        role_arn=codepipeline_role.arn,
        artifact_store=aws.codepipeline.PipelineArtifactStoreArgs(
            location=pipeline_s3_bucket_name,
            type="S3",
        ),
        stages=[
            aws.codepipeline.PipelineStageArgs(
                name="Source",
                actions=[aws.codepipeline.PipelineStageActionArgs(
                    name="Source",
                    category="Source",
                    owner="AWS",
                    provider="CodeCommit",
                    version="1",
                    output_artifacts=["source_output"],
                    configuration={
                        "RepositoryName": codecommit_repository_name,
                        "BranchName": codecommit_branch_name,
                    },
                )],
            ),
            aws.codepipeline.PipelineStageArgs(
                name="Build",
                actions=[aws.codepipeline.PipelineStageActionArgs(
                    name="Build",
                    category="Build",
                    owner="AWS",
                    provider="CodeBuild",
                    input_artifacts=["source_output"],
                    output_artifacts=["build_output"],
                    version="1",
                    configuration={
                        "ProjectName": build_project_name,
                    },
                )],
            ),
        ])
    codepipeline_policy = aws.iam.RolePolicy(
        pipeline_policy_name,
        role=codepipeline_role.id,
        policy=f"""{{
            "Version": "2012-10-17",
            "Statement": [
                {{
                    "Effect": "Allow",
                    "Action": "*",
                    "Resource": [
                        "{pipeline_s3_bucket_arn}",
                        "{pipeline_s3_bucket_arn}/*"
                    ]
                }},
                {{
                    "Effect": "Allow",
                    "Action": [
                        "codebuild:BatchGetBuilds",
                        "codebuild:StartBuild"
                    ],
                    "Resource": "*"
                }},
                {{
                    "Effect": "Allow",
                    "Action": "*",
                    "Resource": "{codecommit_repo_arn}"
                }}
            ]
        }}"""
    )
We can use Postman to send a sample POST request to our service to validate it works. Our payload simply needs "year" as the key and any year between 1905 and 2020 as an integer value.
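If you prefer a script over Postman, a request along these lines should behave the same way; the URL below is a placeholder for your own endpoint, and you would add credentials if your endpoint requires them.
# Hypothetical smoke test; swap in your own domain (and auth, if applicable).
import requests

response = requests.post(
    "https://model.example.com/predict",  # placeholder endpoint
    json={"year": 1975},                  # any year between 1905 and 2020
    timeout=30
)
print(response.status_code)
print(response.json())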
Our REST API also includes a simple HTML user interface. We can go to the interface endpoint, enter our credentials, and verify we can interact with the application.
We want to run our batch job on a schedule, for which we can use CloudWatch Events. You can follow this tutorial to learn how to do so.
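If you would rather define the schedule in Pulumi than in the console, a sketch could look like the following. This assumes a Batch job queue and job definition already exist; the ARNs, names, and cron expression are all placeholders.
# A minimal, hypothetical sketch of scheduling the Batch job with CloudWatch Events.
import json

import pulumi_aws as aws

events_role = aws.iam.Role(
    "batch-schedule-events-role",
    assume_role_policy=json.dumps({
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": "events.amazonaws.com"},
            "Action": "sts:AssumeRole"
        }]
    }))
events_role_attachment = aws.iam.RolePolicyAttachment(
    "batch-schedule-events-role-attachment",
    role=events_role.name,
    policy_arn="arn:aws:iam::aws:policy/service-role/AWSBatchServiceEventTargetRole")
retrain_schedule = aws.cloudwatch.EventRule(
    "retrain-schedule",
    schedule_expression="cron(0 6 ? * MON *)")  # every Monday at 06:00 UTC, for example
retrain_target = aws.cloudwatch.EventTarget(
    "retrain-target",
    rule=retrain_schedule.name,
    arn="arn:aws:batch:us-east-1:123456789012:job-queue/world-series-projections-queue",  # placeholder
    role_arn=events_role.arn,
    batch_target=aws.cloudwatch.EventTargetBatchTargetArgs(
        job_definition="arn:aws:batch:us-east-1:123456789012:job-definition/world-series-projections-retrain",  # placeholder
        job_name="world-series-projections-retrain"
    ))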
In conclusion, we can see AWS is incredibly powerful. Putting together a platform for data science work involves many components. Thankfully, using Python and infrastructure as code, we can automate much of it.
Finally, I used Pulumi to create this website! The script is below.
import mimetypes

import pulumi
import pulumi_aws as aws
from pulumi import FileAsset
from pulumi_aws import s3


def main(bucket_name, index_html_path, aliases, certificate_arn, domain_name, hosted_zone_id, ip_address):
    """
    Creates a static, single-page website accessible via a Route 53 DNS and protected with SSL.
    :param bucket_name: name of the S3 bucket to host the html document
    :param index_html_path: path to index.html
    :param aliases: CloudFront aliases, which must include domain_name
    :param certificate_arn: ARN of the SSL cert, which must be in us-east-1
    :param domain_name: domain name
    :param hosted_zone_id: Route53 hosted zone ID
    :param ip_address: IP address we want to have access
    """
    web_bucket = s3.Bucket(
        bucket_name,
        bucket=bucket_name,
        website=s3.BucketWebsiteArgs(
            index_document="index.html"
        ))
    mime_type, _ = mimetypes.guess_type(index_html_path)
    obj = s3.BucketObject(
        index_html_path,
        bucket=web_bucket.id,
        source=FileAsset(index_html_path),
        content_type=mime_type
    )
    # can easily modify the ipset to accommodate multiple IP addresses
    ipset = aws.waf.IpSet(
        "ipset",
        ip_set_descriptors=[aws.waf.IpSetIpSetDescriptorArgs(
            type="IPV4",
            value=ip_address,
        )])
    wafrule = aws.waf.Rule(
        "wafrule",
        metric_name="WAFRule",
        predicates=[aws.waf.RulePredicateArgs(
            data_id=ipset.id,
            negated=False,
            type="IPMatch",
        )],
        opts=pulumi.ResourceOptions(depends_on=[ipset]))
    waf_acl = aws.waf.WebAcl(
        "wafAcl",
        metric_name="WebACL",
        default_action=aws.waf.WebAclDefaultActionArgs(
            type="BLOCK",
        ),
        rules=[aws.waf.WebAclRuleArgs(
            action=aws.waf.WebAclRuleActionArgs(
                type="ALLOW",
            ),
            priority=1,
            rule_id=wafrule.id,
            type="REGULAR",
        )],
        opts=pulumi.ResourceOptions(depends_on=[
            ipset,
            wafrule,
        ]))
    oai = aws.cloudfront.OriginAccessIdentity(f"{bucket_name}_oai")
    s3_distribution = aws.cloudfront.Distribution(
        f'{bucket_name}_distribution',
        origins=[aws.cloudfront.DistributionOriginArgs(
            domain_name=web_bucket.bucket_regional_domain_name,
            origin_id=f's3{bucket_name}_origin',
            s3_origin_config=aws.cloudfront.DistributionOriginS3OriginConfigArgs(
                origin_access_identity=oai.cloudfront_access_identity_path,
            ),
        )],
        enabled=True,
        is_ipv6_enabled=False,
        default_root_object="index.html",
        aliases=aliases,
        web_acl_id=waf_acl.id,
        default_cache_behavior=aws.cloudfront.DistributionDefaultCacheBehaviorArgs(
            allowed_methods=[
                "DELETE",
                "GET",
                "HEAD",
                "OPTIONS",
                "PATCH",
                "POST",
                "PUT",
            ],
            cached_methods=[
                "GET",
                "HEAD",
            ],
            target_origin_id=f's3{bucket_name}_origin',
            # use the DefaultCacheBehavior variants of the forwarded-values args since this is the default behavior
            forwarded_values=aws.cloudfront.DistributionDefaultCacheBehaviorForwardedValuesArgs(
                query_string=False,
                headers=["Origin"],
                cookies=aws.cloudfront.DistributionDefaultCacheBehaviorForwardedValuesCookiesArgs(
                    forward="none",
                ),
            ),
            viewer_protocol_policy="redirect-to-https",
        ),
        restrictions=aws.cloudfront.DistributionRestrictionsArgs(
            geo_restriction=aws.cloudfront.DistributionRestrictionsGeoRestrictionArgs(
                restriction_type="none",
            ),
        ),
        viewer_certificate=aws.cloudfront.DistributionViewerCertificateArgs(
            acm_certificate_arn=certificate_arn,
            ssl_support_method='sni-only'
        ))
    # you might have to wrap oai.iam_arn in an f-string on the first run and then re-run with the original way.
    # pulumi is a bit finicky
    source = aws.iam.get_policy_document(statements=[
        aws.iam.GetPolicyDocumentStatementArgs(
            actions=["s3:GetObject"],
            resources=[f"arn:aws:s3:::{bucket_name}/*"],
            principals=[
                aws.iam.GetPolicyDocumentStatementPrincipalArgs(
                    type='AWS',
                    identifiers=[oai.iam_arn]
                ),
            ]
        ),
        aws.iam.GetPolicyDocumentStatementArgs(
            actions=["s3:GetObject"],
            resources=[f"arn:aws:s3:::{bucket_name}/*"],
            principals=[
                aws.iam.GetPolicyDocumentStatementPrincipalArgs(
                    type='*',
                    identifiers=['*']
                ),
            ],
            conditions=[
                aws.iam.GetPolicyDocumentStatementConditionArgs(
                    test='IpAddress',
                    variable="aws:SourceIp",
                    values=[f'{ip_address}']
                )
            ]
        )
    ])
    web_bucket_name = web_bucket.id
    bucket_policy = s3.BucketPolicy(
        f"{bucket_name}_bucket-policy",
        bucket=web_bucket_name,
        policy=source.json)
    route_53_record = aws.route53.Record(
        domain_name,
        zone_id=hosted_zone_id,
        name=domain_name,
        type="A",
        aliases=[aws.route53.RecordAliasArgs(
            name=s3_distribution.domain_name,
            zone_id=s3_distribution.hosted_zone_id,
            evaluate_target_health=False,
        )]
    )
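For completeness, invoking the function might look something like the following. Every value here is a placeholder for your own bucket, domain, certificate, hosted zone, and IP address; note that the WAF IP set expects CIDR notation.
# Hypothetical invocation inside a Pulumi program (e.g., __main__.py); all values are placeholders.
main(
    bucket_name="my-static-site-bucket",
    index_html_path="index.html",
    aliases=["example.com"],
    certificate_arn="arn:aws:acm:us-east-1:123456789012:certificate/abc123",  # must live in us-east-1
    domain_name="example.com",
    hosted_zone_id="Z0123456789ABCDEFGHIJ",
    ip_address="203.0.113.10/32"
)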