目的

boto3 ラブライブを利用して、AWS Coginit での操作 Utitl 作成方法を共有します。

実装

# -*- coding: utf-8 -*-
import secrets
import string
import boto3

USERPOOL_ID = 'USERPOOL_ID'
client = boto3.client('cognito-idp')

def show_user_list():
    """Cognitoおよびエンティティからユーザリストを取得する(最大60名a)
    Returns:
        ユーザ情報(ユーザーID、ユーザー名、状態、グループ権限)
    """
    def _user_list(pagination_token=None):
        """ Cognitoユーザーリストの再帰取得
        Args:
            pagination_token: 前回呼び出しトークン

        Returns:
            ユーザ情報(ユーザーID、ユーザー名、状態、グループ権限)
        """
        if pagination_token is None:
            response = client.list_users(
                UserPoolId=USERPOOL_ID,
            )
        else:
            response = client.list_users(
                UserPoolId=USERPOOL_ID,
                PaginationToken=pagination_token,
            )
        for user in response['Users']:
            record = {}
            attrs = [attr['Value'] for attr in user['Attributes'] if attr['Name'] == 'name']
            record.update(
                user_id=user['Username'],
                user_name=attrs[0],
                status=user['Enabled']
            )
            groups = client.admin_list_groups_for_user(
                UserPoolId=USERPOOL_ID,
                Username=user['Username']
            )
            record.update(
                authority=groups['Groups'][0]['GroupName'],
            )
            yield record

        if 'PaginationToken' in response:
            yield from _user_list(response.get('PaginationToken'))

    try:
        yield from _user_list()
    except Exception:
        raise


def add_user(username, userid, user_auth, status, password):
    """Cognitoおよびエンティティにユーザ情報を登録する。
    Args:
        username: Cognitoの標準属性name
        userid: ユーザID
        user_auth: ユーザグループ(admin,referrer,updaterより選択)
        status: ユーザの有効(True)、無効(False)を指定
        password: ユーザパスワード
	Returns:
        処理結果(True/False)
    """
    try:
        # ユーザ作成
        client.admin_create_user(
            UserPoolId=USERPOOL_ID,
            Username=userid,
            UserAttributes=[
                {
                    'Name': 'name',
                    'Value': username,
                },
            ],
            TemporaryPassword=password,
            MessageAction='SUPPRESS',
        )

        # パスワード適用(アカウントステータスをCONFIRMEDに変更)
        client.admin_set_user_password(
            UserPoolId=USERPOOL_ID,
            Username=userid,
            Password=password,
            Permanent=True,
        )

        # グループ追加
        client.admin_add_user_to_group(
            UserPoolId=USERPOOL_ID,
            Username=userid,
            GroupName=user_auth,
        )

        # ユーザステータス変更
        if not status:
            client.admin_disable_user(
                UserPoolId=USERPOOL_ID,
                Username=userid,
            )
    except Exception:
        return False

    return True


def modify_user(event, username, userid, user_auth, status):
    """Cognitoおよびエンティティのユーザ情報を更新する。
    Args:
        event: イベントオブジェクト
        username: Cognitoの標準属性name
        userid: ユーザID
        user_auth: ユーザグループ(admin,referrer,updaterより選択)
        status: ユーザの有効(True)、無効(False)を指定
    Returns:
        処理結果(True/False)
    """
    try:
        client.admin_update_user_attributes(
            UserPoolId=USERPOOL_ID,
            Username=userid,
            UserAttributes=[
                {
                    'Name': 'name',
                    'Value': username,
                },
            ]
        )

        # グループ追加
        client.admin_add_user_to_group(
            UserPoolId=USERPOOL_ID,
            Username=userid,
            GroupName=user_auth,
        )

        # ユーザステータス変更
        if not status:
            client.admin_disable_user(
                UserPoolId=USERPOOL_ID,
                Username=userid,
            )
        else:
            client.admin_enable_user(
                UserPoolId=USERPOOL_ID,
                Username=userid,
            )
    except Exception:
        return False

    return True


def create_random_password(number_digits):
    """ユーザ登録用のランダムパスワードを生成する。
    Args:
        number_digits:パスワード桁数
    Returns:
        生成されたランダムパスワード(大文字英字、小文字英字、数字を必ず含む)
    """
    while True:
        punctuation = '^$*.[]{}()?-"!@#%&/\\,><\':;|_~`'  # 特殊文字の要件に'+='が含まれていないためstring.punctuationは未使用
        words = string.ascii_letters + string.digits + punctuation
        password = ''.join(secrets.choice(words) for i in range(number_digits))
        if (any(c.isdigit() for c in password)
                and any(c.isupper() for c in password)
                and any(c.islower() for c in password)
                and any((c in punctuation) for c in password)):
            break
    return password


def delete_disable_user():
    """
    Cognitoで無効(Disable)状態のユーザを削除する。
    Returns:
        削除したユーザのリスト
    """
    def _del_user_name_list(pagination_token=None):
        """ Cognitoの無効ユーザーリストの再帰取得
        Args:
            pagination_token: 前回呼び出しトークン

        Returns:client
            ユーザ情報(ユーザーID)
        """
        if pagination_token is None:
            response = client.list_users(
                UserPoolId=USERPOOL_ID,
            )
        else:
            response = client.list_users(
                UserPoolId=USERPOOL_ID,
                PaginationToken=pagination_token,
            )
        for user in response['Users']:
            if not user['Enabled']:
                yield user['Username']

        if 'PaginationToken' in response:
            yield from _del_user_name_list(response.get('PaginationToken'))

    try:
        user_name_list = _del_user_name_list()
        for user_name in user_name_list:
            client.admin_delete_user(
                UserPoolId=USERPOOL_ID,
                Username=user_name,
            )
    except Exception:
        raise


def set_user_password(user_id, password):
    """Cognitoユーザーパスワードを更新する。
    Args:
        user_id: ユーザID
        password: ユーザパスワード
    Returns:
        処理結果(True/False)
    """

    try:
        # パスワード適用
        client.admin_set_user_password(
            UserPoolId=USERPOOL_ID,
            Username=user_id,
            Password=password,
            Permanent=True,
        )
    except Exception:
        return False

    return True


def change_password(old_password, new_password, access_token):
    """指定されたユーザーIDのパスワードを変更する。
    Args:
        old_password: 旧パスワード
        new_password: 新パスワード
        access_token: アクセストークン
    """
    try:
        client.change_password(
            PreviousPassword=old_password,
            ProposedPassword=new_password,
            AccessToken=access_token,
        )
    except Exception:
        raise


def get_group_list():
    """ユーザープールに登録されているすべてのグループの名前を一覧で取得する。
    Returns:
        グループ名(リスト)
    """
    def _group_list(next_token=None):
        if next_token is None:
            response = client.list_groups(
                UserPoolId=USERPOOL_ID,
            )
        else:
            response = client.list_groups(
                UserPoolId=USERPOOL_ID,
                NextToken=next_token,
            )
        for group in response['Groups']:
            yield group['GroupName']

        if response.get('NextToken'):
            yield from _group_list(response.get('NextToken'))

    try:
        yield from _group_list()
    except Exception:
        raise

参考