接下来我们将:
DynamoDB表为Lambda函数提供AWS账户信息。我们先导航到 DynamoDB选择"创建表”:
对于表名,设置为"accounts”,对于分区键,设置为"accountId”
sort key为空, 保留默认的表设置, 然后单击"创建表”
等待直到表创建完成。表创建完成后, 单击表名进去添加一条记录
要添加的内容如下:
我们现在已经为我们的账户创建了一个DynamoDB item, 以便Lambda可以获取账户数据并知道在哪些账户中运行
Lambda函数封装了KMS密钥策略的分析。它读取它有权访问的所有KMS密钥策略, 对它们进行分析, 并将输出提供给S3。
导航到Lambda,选择名为"KMSRead"的函数。检查函数并识别需要更新的一个区域, 方法是查找TODO
。我们可以在下面看到包含这个的部分代码:
if account_ids: # Did we find some accounts in the DynamoDB table?
for account in account_ids: # For each account we found do the following
account_number = account["accountId"]
logger.info(f"Processing Account Number: {account_number}")
# Assume role into the target account
session = getAssumedRoleSession(account_number, role_name="XA-KMSRead-Role")
# If we successfully Assumed a role in the target account now loop round each region and get the KMS key data
if session:
for region in regions:
# TODO: CODE or PASTE the CODE IN HERE:
# 1. Create KMS client
# 2. Using the KMS client, query the KMS keys, aliases and policies and build into a single JSON object
# 3. Define a unique filename for the account number and region
# 4. Convert the JSON object to a csv. and store in the local Lambda filesystem
# 5. Push the file to S3
print(f"REPLACE THIS LINE WITH CODE FROM INSTRUCTIONS!")
else:
logger.error(f"Unable to create session for {account_number}")
else:
logger.error(f"No Account IDs found: account_ids={account_ids}")
添加以下代码:
# 1. Create KMS client
KMS = session.client("kms", region_name=region)
# 2. Using the KMS client, query the KMS keys, aliases and policies and build into a single JSON object
keyMap = getEverythingJson(KMS)
# 3. Define a unique filename for the account number and region
filename = account_number + "-" + region + "-KMS-details.csv"
# 4. Convert the JSON object to a csv. and store in the local Lambda filesystem
getEverythingToCSV(account_number, filename, keyMap, region)
# 5. Push the file to S3
pushToS3(filename, destination_s3_bucket)
删除语句print(f"REPLACE THIS LINE WITH CODE FROM INSTRUCTIONS!")
部署并测试:
如果需要: 设置一个事件名称,例如Testevent#XX
, 通过单击"测试"启动测试
检查Lambda是否运行成功, 导航到S3 ,检查是否有最近几分钟的csv.文件作为Lambda的输出:
完成这一步后, 我们应该在S3存储桶中有一个包含密钥策略详细信息的csv.文件。csv内容如下:
def getEverythingJson(kms):
kms_keys = getKeys(kms)
logger.debug(f"Keys: {kms_keys}")
keyMap = {"kms_keys": []}
for kms_key in kms_keys:
logger.debug(f"Key: {kms_key}")
# Instantiate kms_key_object
kms_key_object = {}
kms_key_object["KeyId"] = kms_key["KeyId"]
kms_key_object["Aliases"] = []
list_of_aliases = []
list_of_policies = []
logger.debug(f"KMS KEY OBJECT {kms_key_object}")
keyid = kms_key["KeyId"]
logger.debug(f"KeyId: {keyid}")
kms_key_policies = getKeyPolicies(kms, keyid)
logger.debug(f"Keys Policies: {kms_key_policies}")
# kms_key_tags = getKeyPoliciesTags(kms,keyid)
# logger.debug(f"Keys Tags: {kms_key_tags}")
kms_aliases = getAliases(kms, keyid)
logger.debug(f"Aliases: {kms_aliases}")
if kms_aliases:
logger.debug(f"KMS KeyId: {keyid} - Aliases: {kms_aliases}")
for ind in range(len(kms_aliases)):
logger.debug(kms_aliases[ind]["AliasName"])
list_of_aliases.append(kms_aliases[ind]["AliasName"])
if kms_key_policies:
key_tag = getTag(kms, keyid)
creation_date = getCreationDate(kms, keyid)
for policy in kms_key_policies:
key_policy = getPolicy(kms, keyid, policy)
list_of_policies.append(key_policy)
logger.debug(f"KMS KeyId: {keyid} - [{policy}] - policy: {key_policy}")
logger.debug(f"Aliases: {list_of_aliases}")
logger.debug(f"Policies: {list_of_policies}")
kms_key_object["Aliases"] = list_of_aliases
kms_key_object["Policies"] = list_of_policies
kms_key_object["Tags"] = key_tag
kms_key_object["CreationDate"] = creation_date
keyMap["kms_keys"].append(kms_key_object)
logger.debug(json.dumps(keyMap, indent=3))
return keyMap