本节我们将使用python代码来获取IAM Roles Anywhere的Role,从而实现对AWS服务的访问
之前我们使用IAM Roles Anywhere credential helper
成功访问到S3, 并将它设置到一个profile里。在代码中也可以直接拿这个profile来用:
import boto3
boto3.setup_default_session(profile_name='workload-a')
s3 = boto3.client('s3')
response = s3.list_buckets()['Buckets']
for bucket in response:
print('Bucket name: {}, Created on: {}'.format(bucket['Name'], bucket['CreationDate']))
测试:
将下面代码保存为service.py
:
import boto3
import json
import sys, os, base64, datetime, hashlib
import requests
from cryptography import x509
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
def iam_ra_client():
duration_seconds = 3600 # From 900 to 3600
# Roles Anywhere Profile ARN
profile_arn = "<IAMRA_PROFILE_ARN>"
# IAM Role ARN
role_arn = "<IAM_ROLE_ARN>"
# Roles Anywhere Trust Anchor ARN
trust_anchor_arn = "<IAMRA_TRUST_ANCHOR_ARN>"
# Certificate file location
cert_pem_file = './' + "<CERTIFICATE_FILENAME>"
private_key_file = './' + "<PRIVATE_KEY_FILENAME>"
# Certificate data from files.
cert_pem_data = open(cert_pem_file, 'r').read()
private_key_data = open(private_key_file, 'r').read()
pass_phrase = ""
session_name = 'assume_role_session'
# AWS Region
region = profile_arn.split(":")[3]
# ************* REQUEST VALUES *************
method = 'POST'
service = 'rolesanywhere'
host = '{}.{}.amazonaws.com'.format(service, region)
endpoint = 'https://{}'.format(host)
# POST requests use a content type header.
content_type = 'application/json'
# Create a date for headers and the credential string
today = datetime.datetime.utcnow()
amz_date = today.strftime('%Y%m%dT%H%M%SZ')
# Date w/o time, used in credential scope
date_stamp = today.strftime('%Y%m%d')
# Gererate payload.
payload = json.dumps({
"durationSeconds": duration_seconds,
"profileArn": profile_arn,
"roleArn": role_arn,
"sessionName": session_name,
"trustAnchorArn": trust_anchor_arn
})
# Load public certificate
cert = x509.load_pem_x509_certificate(cert_pem_data.strip().encode("utf-8"))
# Load private key
try:
private_key = serialization.load_pem_private_key(private_key_data.strip().encode("utf-8"), None)
except:
print('encrypted')
try:
private_key = serialization.load_pem_private_key(private_key_data.strip().encode("utf-8"), \
password=str.encode(pass_phrase))
except:
print('wrong passphrase')
# X509 bash64 encoded DER data
amz_x509 = str(base64.b64encode(cert.public_bytes(encoding=serialization.Encoding.DER)), 'utf-8')
# Public certificate serial number
ca_serial_number = str(cert.serial_number)
# ************* TASK 1: CREATE A CANONICAL REQUEST *************
# http://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html
# Step 1 is to define the verb (GET, POST, etc.)--already done.
# Step 2: Create canonical URI--the part of the URI from domain to query
# string (use '/' if no path)
canonical_uri = '/sessions'
## Step 3: Create the canonical query string. In this example, request
# parameters are passed in the body of the request and the query string
# is blank.
canonical_querystring = ''
# Step 4: Create the canonical headers. Header names must be trimmed
# and lowercase, and sorted in code point order from low to high.
# Note that there is a trailing \n.
canonical_headers = 'content-type:' + content_type + '\n' + 'host:' + host + '\n' + 'x-amz-date:' \
+ amz_date + '\n' + 'x-amz-x509:' + amz_x509 + '\n'
# Step 5: Create the list of signed headers. This lists the headers
# in the canonical_headers list, delimited with ";" and in alpha order.
# Note: The request can include any headers; canonical_headers and
# signed_headers include those that you want to be included in the
# hash of the request. "Host" and "x-amz-date" are always required.
# For Roles Anywhere, content-type and x-amz-x509 are also required.
signed_headers = 'content-type;host;x-amz-date;x-amz-x509'
# Step 6: Create payload hash. In this example, the payload (body of
# the request) contains the request parameters.
payload_hash = hashlib.sha256(payload.encode('utf-8')).hexdigest()
# Step 7: Combine elements to create canonical request
canonical_request = method + '\n' + canonical_uri + '\n' + canonical_querystring + '\n' + \
canonical_headers + '\n' + signed_headers + '\n' + payload_hash
# ************* TASK 2: CREATE THE STRING TO SIGN*************
# Match the algorithm to the hashing algorithm you use, SHA-256
algorithm = 'AWS4-X509-RSA-SHA256'
credential_scope = date_stamp + '/' + region + '/' + service + '/' + 'aws4_request'
string_to_sign = algorithm + '\n' + amz_date + '\n' + credential_scope + '\n' + \
hashlib.sha256(canonical_request.encode('utf-8')).hexdigest()
# ************* TASK 3: CALCULATE THE SIGNATURE *************
# Sign the string_to_sign using the private_key and hex encode
signature = private_key.sign(
data=string_to_sign.encode('utf-8'),
padding=padding.PKCS1v15(),
algorithm=hashes.SHA256()
)
signature_hex = signature.hex()
# ************* TASK 4: ADD SIGNING INFORMATION TO THE REQUEST *************
# Put the signature information in a header named Authorization.
authorization_header = algorithm + ' ' + 'Credential=' + ca_serial_number + '/' + credential_scope + ', ' \
+ 'SignedHeaders=' + signed_headers + ', ' + 'Signature=' + signature_hex
# For Roles Anywhere, the request MUST include "host", "x-amz-date",
# "x-amz-x509", "content-type", and "Authorization". Except for the authorization
# header, the headers must be included in the canonical_headers and signed_headers values, as
# noted earlier. Order here is not significant.
# # Python note: The 'host' header is added automatically by the Python 'requests' library.
headers = {'Content-Type': content_type,
'X-Amz-Date': amz_date,
'X-Amz-X509': amz_x509,
'Authorization': authorization_header}
# ************* SEND THE REQUEST *************
print('\nBEGIN REQUEST++++++++++++++++++++++++++++++++++++')
print('Request URL = ' + endpoint)
r = requests.post(endpoint + canonical_uri, data=payload, headers=headers)
response_text_json = json.loads(r.text)
print('\nRESPONSE++++++++++++++++++++++++++++++++++++')
print('Response code: %d\n' % r.status_code)
ACCESS_KEY = response_text_json['credentialSet'][0]['credentials']['accessKeyId']
SECRET_KEY = response_text_json['credentialSet'][0]['credentials']['secretAccessKey']
SESSION_TOKEN = response_text_json['credentialSet'][0]['credentials']['sessionToken']
print(ACCESS_KEY)
print(SECRET_KEY)
client = boto3.client('s3',
aws_access_key_id=ACCESS_KEY,
aws_secret_access_key=SECRET_KEY,
aws_session_token=SESSION_TOKEN)
return client
s3 = iam_ra_client()
response = s3.list_buckets()['Buckets']
for bucket in response:
print('Bucket name: {}, Created on: {}'.format(bucket['Name'], bucket['CreationDate']))
将第11行到22行的五个变量进行替换,例如:
def iam_ra_client():
duration_seconds = 3600 # From 900 to 3600
# Roles Anywhere Profile ARN
profile_arn = "arn:aws:rolesanywhere:us-east-1:145197526627:profile/a8998516-f3a5-41bc-8a5e-592b3ced7583"
# IAM Role ARN
role_arn = "arn:aws:iam::145197526627:role/ra-access-s3"
# Roles Anywhere Trust Anchor ARN
trust_anchor_arn = "arn:aws:rolesanywhere:us-east-1:145197526627:trust-anchor/4c079004-3191-4ed4-9cef-a62d3dd3a6d2"
# Certificate file location
cert_pem_file = './' + "workload-a.iamra.test_cert.pem"
private_key_file = './' + "workload-a.iamra.test_private_u.pem"
安装依赖:
pip3 install cryptography boto3
运行代码: