构建方案 I

接下来我们将:

  • 构建并运行一个Lambda来收集有关KMS密钥策略信息,这些数据来自一个AWS帐户
  • 将数据输出到一个S3存储桶(已创建)中的csv文件
  • 在Athena做ETL
  • 将数据加载到Quicksight
  • 构建一个QuicksightDashboard来可视化密钥策略和关注点

Dynamo DB

DynamoDB表为Lambda函数提供AWS账户信息。我们先导航到 DynamoDB选择"创建表”:

image-20240622113339112

对于表名,设置为"accounts”,对于分区键,设置为"accountId”

sort key为空, 保留默认的表设置, 然后单击"创建表”

image-20240622113510512

等待直到表创建完成。表创建完成后, 单击表名进去添加一条记录

image-20240622113617194

要添加的内容如下:

image-20240622113804509

我们现在已经为我们的账户创建了一个DynamoDB item, 以便Lambda可以获取账户数据并知道在哪些账户中运行

更新Lambda

Lambda函数封装了KMS密钥策略的分析。它读取它有权访问的所有KMS密钥策略, 对它们进行分析, 并将输出提供给S3。

导航到Lambda,选择名为"KMSRead"的函数。检查函数并识别需要更新的一个区域, 方法是查找TODO。我们可以在下面看到包含这个的部分代码:

    if account_ids:  # Did we find some accounts in the DynamoDB table?
        for account in account_ids:  # For each account we found do the following
            account_number = account["accountId"]
            logger.info(f"Processing Account Number: {account_number}")
            # Assume role into the target account
            session = getAssumedRoleSession(account_number, role_name="XA-KMSRead-Role")

            # If we successfully Assumed a role in the target account now loop round each region and get the KMS key data
            if session:
                for region in regions:
                    # TODO: CODE or PASTE the CODE IN HERE:
                    # 1. Create KMS client
                    # 2. Using the KMS client, query the KMS keys, aliases and policies and build into a single JSON object
                    # 3. Define a unique filename for the account number and region
                    # 4. Convert the JSON object to a csv. and store in the local Lambda filesystem
                    # 5. Push the file to S3
                    print(f"REPLACE THIS LINE WITH CODE FROM INSTRUCTIONS!")
            else:
                logger.error(f"Unable to create session for {account_number}")
    else:
        logger.error(f"No Account IDs found: account_ids={account_ids}")

添加以下代码:

                # 1. Create KMS client
                KMS = session.client("kms", region_name=region)
                # 2. Using the KMS client, query the KMS keys, aliases and policies and build into a single JSON object
                keyMap = getEverythingJson(KMS)
                # 3. Define a unique filename for the account number and region
                filename = account_number + "-" + region + "-KMS-details.csv"
                # 4. Convert the JSON object to a csv. and store in the local Lambda filesystem
                getEverythingToCSV(account_number, filename, keyMap, region)
                # 5. Push the file to S3
                pushToS3(filename, destination_s3_bucket)

删除语句print(f"REPLACE THIS LINE WITH CODE FROM INSTRUCTIONS!")

部署并测试:

image-20240622114434067

如果需要: 设置一个事件名称,例如Testevent#XX, 通过单击"测试"启动测试

检查Lambda是否运行成功, 导航到S3 ,检查是否有最近几分钟的csv.文件作为Lambda的输出:

image-20240622114946002

完成这一步后, 我们应该在S3存储桶中有一个包含密钥策略详细信息的csv.文件。csv内容如下:

image-20240622115058649

附:getEverythingJson代码

def getEverythingJson(kms):
    kms_keys = getKeys(kms)
    logger.debug(f"Keys: {kms_keys}")

    keyMap = {"kms_keys": []}

    for kms_key in kms_keys:
        logger.debug(f"Key: {kms_key}")

        # Instantiate kms_key_object
        kms_key_object = {}
        kms_key_object["KeyId"] = kms_key["KeyId"]
        kms_key_object["Aliases"] = []
        list_of_aliases = []
        list_of_policies = []
        logger.debug(f"KMS KEY OBJECT {kms_key_object}")

        keyid = kms_key["KeyId"]
        logger.debug(f"KeyId: {keyid}")

        kms_key_policies = getKeyPolicies(kms, keyid)
        logger.debug(f"Keys Policies: {kms_key_policies}")

        # kms_key_tags = getKeyPoliciesTags(kms,keyid)
        # logger.debug(f"Keys Tags: {kms_key_tags}")

        kms_aliases = getAliases(kms, keyid)
        logger.debug(f"Aliases: {kms_aliases}")

        if kms_aliases:
            logger.debug(f"KMS KeyId: {keyid} - Aliases: {kms_aliases}")
            for ind in range(len(kms_aliases)):
                logger.debug(kms_aliases[ind]["AliasName"])
                list_of_aliases.append(kms_aliases[ind]["AliasName"])

        if kms_key_policies:
            key_tag = getTag(kms, keyid)
            creation_date = getCreationDate(kms, keyid)
            for policy in kms_key_policies:
                key_policy = getPolicy(kms, keyid, policy)
                list_of_policies.append(key_policy)
                logger.debug(f"KMS KeyId: {keyid} - [{policy}] - policy: {key_policy}")

        logger.debug(f"Aliases: {list_of_aliases}")
        logger.debug(f"Policies: {list_of_policies}")

        kms_key_object["Aliases"] = list_of_aliases
        kms_key_object["Policies"] = list_of_policies
        kms_key_object["Tags"] = key_tag
        kms_key_object["CreationDate"] = creation_date

        keyMap["kms_keys"].append(kms_key_object)

    logger.debug(json.dumps(keyMap, indent=3))
    return keyMap