目的
boto3 ラブライブを利用して、AWS Coginit での操作 Utitl 作成方法を共有します。
実装
# -*- coding: utf-8 -*-
import secrets
import string
import boto3
USERPOOL_ID = 'USERPOOL_ID'
client = boto3.client('cognito-idp')
def show_user_list():
"""Cognitoおよびエンティティからユーザリストを取得する(最大60名a)
Returns:
ユーザ情報(ユーザーID、ユーザー名、状態、グループ権限)
"""
def _user_list(pagination_token=None):
""" Cognitoユーザーリストの再帰取得
Args:
pagination_token: 前回呼び出しトークン
Returns:
ユーザ情報(ユーザーID、ユーザー名、状態、グループ権限)
"""
if pagination_token is None:
response = client.list_users(
UserPoolId=USERPOOL_ID,
)
else:
response = client.list_users(
UserPoolId=USERPOOL_ID,
PaginationToken=pagination_token,
)
for user in response['Users']:
record = {}
attrs = [attr['Value'] for attr in user['Attributes'] if attr['Name'] == 'name']
record.update(
user_id=user['Username'],
user_name=attrs[0],
status=user['Enabled']
)
groups = client.admin_list_groups_for_user(
UserPoolId=USERPOOL_ID,
Username=user['Username']
)
record.update(
authority=groups['Groups'][0]['GroupName'],
)
yield record
if 'PaginationToken' in response:
yield from _user_list(response.get('PaginationToken'))
try:
yield from _user_list()
except Exception:
raise
def add_user(username, userid, user_auth, status, password):
"""Cognitoおよびエンティティにユーザ情報を登録する。
Args:
username: Cognitoの標準属性name
userid: ユーザID
user_auth: ユーザグループ(admin,referrer,updaterより選択)
status: ユーザの有効(True)、無効(False)を指定
password: ユーザパスワード
Returns:
処理結果(True/False)
"""
try:
# ユーザ作成
client.admin_create_user(
UserPoolId=USERPOOL_ID,
Username=userid,
UserAttributes=[
{
'Name': 'name',
'Value': username,
},
],
TemporaryPassword=password,
MessageAction='SUPPRESS',
)
# パスワード適用(アカウントステータスをCONFIRMEDに変更)
client.admin_set_user_password(
UserPoolId=USERPOOL_ID,
Username=userid,
Password=password,
Permanent=True,
)
# グループ追加
client.admin_add_user_to_group(
UserPoolId=USERPOOL_ID,
Username=userid,
GroupName=user_auth,
)
# ユーザステータス変更
if not status:
client.admin_disable_user(
UserPoolId=USERPOOL_ID,
Username=userid,
)
except Exception:
return False
return True
def modify_user(event, username, userid, user_auth, status):
"""Cognitoおよびエンティティのユーザ情報を更新する。
Args:
event: イベントオブジェクト
username: Cognitoの標準属性name
userid: ユーザID
user_auth: ユーザグループ(admin,referrer,updaterより選択)
status: ユーザの有効(True)、無効(False)を指定
Returns:
処理結果(True/False)
"""
try:
client.admin_update_user_attributes(
UserPoolId=USERPOOL_ID,
Username=userid,
UserAttributes=[
{
'Name': 'name',
'Value': username,
},
]
)
# グループ追加
client.admin_add_user_to_group(
UserPoolId=USERPOOL_ID,
Username=userid,
GroupName=user_auth,
)
# ユーザステータス変更
if not status:
client.admin_disable_user(
UserPoolId=USERPOOL_ID,
Username=userid,
)
else:
client.admin_enable_user(
UserPoolId=USERPOOL_ID,
Username=userid,
)
except Exception:
return False
return True
def create_random_password(number_digits):
"""ユーザ登録用のランダムパスワードを生成する。
Args:
number_digits:パスワード桁数
Returns:
生成されたランダムパスワード(大文字英字、小文字英字、数字を必ず含む)
"""
while True:
punctuation = '^$*.[]{}()?-"!@#%&/\\,><\':;|_~`' # 特殊文字の要件に'+='が含まれていないためstring.punctuationは未使用
words = string.ascii_letters + string.digits + punctuation
password = ''.join(secrets.choice(words) for i in range(number_digits))
if (any(c.isdigit() for c in password)
and any(c.isupper() for c in password)
and any(c.islower() for c in password)
and any((c in punctuation) for c in password)):
break
return password
def delete_disable_user():
"""
Cognitoで無効(Disable)状態のユーザを削除する。
Returns:
削除したユーザのリスト
"""
def _del_user_name_list(pagination_token=None):
""" Cognitoの無効ユーザーリストの再帰取得
Args:
pagination_token: 前回呼び出しトークン
Returns:client
ユーザ情報(ユーザーID)
"""
if pagination_token is None:
response = client.list_users(
UserPoolId=USERPOOL_ID,
)
else:
response = client.list_users(
UserPoolId=USERPOOL_ID,
PaginationToken=pagination_token,
)
for user in response['Users']:
if not user['Enabled']:
yield user['Username']
if 'PaginationToken' in response:
yield from _del_user_name_list(response.get('PaginationToken'))
try:
user_name_list = _del_user_name_list()
for user_name in user_name_list:
client.admin_delete_user(
UserPoolId=USERPOOL_ID,
Username=user_name,
)
except Exception:
raise
def set_user_password(user_id, password):
"""Cognitoユーザーパスワードを更新する。
Args:
user_id: ユーザID
password: ユーザパスワード
Returns:
処理結果(True/False)
"""
try:
# パスワード適用
client.admin_set_user_password(
UserPoolId=USERPOOL_ID,
Username=user_id,
Password=password,
Permanent=True,
)
except Exception:
return False
return True
def change_password(old_password, new_password, access_token):
"""指定されたユーザーIDのパスワードを変更する。
Args:
old_password: 旧パスワード
new_password: 新パスワード
access_token: アクセストークン
"""
try:
client.change_password(
PreviousPassword=old_password,
ProposedPassword=new_password,
AccessToken=access_token,
)
except Exception:
raise
def get_group_list():
"""ユーザープールに登録されているすべてのグループの名前を一覧で取得する。
Returns:
グループ名(リスト)
"""
def _group_list(next_token=None):
if next_token is None:
response = client.list_groups(
UserPoolId=USERPOOL_ID,
)
else:
response = client.list_groups(
UserPoolId=USERPOOL_ID,
NextToken=next_token,
)
for group in response['Groups']:
yield group['GroupName']
if response.get('NextToken'):
yield from _group_list(response.get('NextToken'))
try:
yield from _group_list()
except Exception:
raise
参考
CognitoIdentityProvider - Boto3 1.35.44 documentation
boto3.amazonaws.com