mirror of
https://github.com/GSA/notifications-api.git
synced 2025-12-21 16:01:15 -05:00
Add data endpoint for finding users by full or partial email
This commit is contained in:
@@ -97,6 +97,10 @@ def get_user_by_email(email):
|
||||
return User.query.filter(func.lower(User.email_address) == func.lower(email)).one()
|
||||
|
||||
|
||||
def get_users_by_partial_email(email):
|
||||
return User.query.filter(User.email_address.ilike("\%{}\%".format(email))).all()
|
||||
|
||||
|
||||
def increment_failed_login_count(user):
|
||||
user.failed_login_count += 1
|
||||
db.session.add(user)
|
||||
|
||||
@@ -564,8 +564,14 @@ class EmailDataSchema(ma.Schema):
|
||||
|
||||
email = fields.Str(required=True)
|
||||
|
||||
def __init__(self, partial_email=False):
|
||||
super().__init__()
|
||||
self.partial_email = partial_email
|
||||
|
||||
@validates('email')
|
||||
def validate_email(self, value):
|
||||
if self.partial_email:
|
||||
return
|
||||
try:
|
||||
validate_email_address(value)
|
||||
except InvalidEmailError as e:
|
||||
@@ -686,6 +692,7 @@ notification_with_personalisation_schema = NotificationWithPersonalisationSchema
|
||||
invited_user_schema = InvitedUserSchema()
|
||||
permission_schema = PermissionSchema()
|
||||
email_data_request_schema = EmailDataSchema()
|
||||
partial_email_data_request_schema = EmailDataSchema(partial_email=True)
|
||||
notifications_filter_schema = NotificationsFilterSchema()
|
||||
service_history_schema = ServiceHistorySchema()
|
||||
api_key_history_schema = ApiKeyHistorySchema()
|
||||
|
||||
@@ -16,6 +16,7 @@ from app.dao.users_dao import (
|
||||
increment_failed_login_count,
|
||||
reset_failed_login_count,
|
||||
get_user_by_email,
|
||||
get_users_by_partial_email,
|
||||
create_secret_code,
|
||||
save_user_attribute,
|
||||
update_user_password,
|
||||
@@ -32,6 +33,7 @@ from app.notifications.process_notifications import (
|
||||
)
|
||||
from app.schemas import (
|
||||
email_data_request_schema,
|
||||
partial_email_data_request_schema,
|
||||
create_user_schema,
|
||||
permission_schema,
|
||||
user_update_schema_load_json,
|
||||
@@ -356,6 +358,14 @@ def get_by_email():
|
||||
return jsonify(data=result)
|
||||
|
||||
|
||||
@user_blueprint.route('/find-users-by-email', methods=['POST'])
|
||||
def find_users_by_email():
|
||||
email, errors = partial_email_data_request_schema.load(request.get_json())
|
||||
fetched_users = get_users_by_partial_email(email['email'])
|
||||
result = [user.serialize() for user in fetched_users]
|
||||
return jsonify(data=result), 200
|
||||
|
||||
|
||||
@user_blueprint.route('/reset-password', methods=['POST'])
|
||||
def send_user_reset_password():
|
||||
email, errors = email_data_request_schema.load(request.get_json())
|
||||
|
||||
@@ -731,3 +731,71 @@ def test_get_orgs_and_services_only_shows_users_orgs_and_services(admin_request,
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
|
||||
def test_find_users_by_email_finds_user_by_partial_email(notify_db, client):
|
||||
create_user(email='findel.mestro@foo.com')
|
||||
create_user(email='me.ignorra@foo.com')
|
||||
data = json.dumps({"email": "findel"})
|
||||
auth_header = create_authorization_header()
|
||||
|
||||
response = client.post(
|
||||
url_for("user.find_users_by_email"),
|
||||
data=data,
|
||||
headers=[('Content-Type', 'application/json'), auth_header]
|
||||
)
|
||||
users = json.loads(response.get_data(as_text=True))
|
||||
|
||||
assert response.status_code == 200
|
||||
assert len(users['data']) == 1
|
||||
assert users['data'][0]['email_address'] == 'findel.mestro@foo.com'
|
||||
|
||||
|
||||
def test_find_users_by_email_finds_user_by_full_email(notify_db, client):
|
||||
create_user(email='findel.mestro@foo.com')
|
||||
create_user(email='me.ignorra@foo.com')
|
||||
data = json.dumps({"email": "findel.mestro@foo.com"})
|
||||
auth_header = create_authorization_header()
|
||||
|
||||
response = client.post(
|
||||
url_for("user.find_users_by_email"),
|
||||
data=data,
|
||||
headers=[('Content-Type', 'application/json'), auth_header]
|
||||
)
|
||||
users = json.loads(response.get_data(as_text=True))
|
||||
|
||||
assert response.status_code == 200
|
||||
assert len(users['data']) == 1
|
||||
assert users['data'][0]['email_address'] == 'findel.mestro@foo.com'
|
||||
|
||||
|
||||
def test_find_users_by_email_handles_no_results(notify_db, client):
|
||||
create_user(email='findel.mestro@foo.com')
|
||||
create_user(email='me.ignorra@foo.com')
|
||||
data = json.dumps({"email": "rogue"})
|
||||
auth_header = create_authorization_header()
|
||||
|
||||
response = client.post(
|
||||
url_for("user.find_users_by_email"),
|
||||
data=data,
|
||||
headers=[('Content-Type', 'application/json'), auth_header]
|
||||
)
|
||||
users = json.loads(response.get_data(as_text=True))
|
||||
|
||||
assert response.status_code == 200
|
||||
assert users['data'] == []
|
||||
|
||||
|
||||
def test_search_for_users_by_email_handles_incorrect_data_format(notify_db, client):
|
||||
create_user(email='findel.mestro@foo.com')
|
||||
data = json.dumps({"email": 1})
|
||||
auth_header = create_authorization_header()
|
||||
|
||||
response = client.post(
|
||||
url_for("user.find_users_by_email"),
|
||||
data=data,
|
||||
headers=[('Content-Type', 'application/json'), auth_header]
|
||||
)
|
||||
|
||||
assert response.status_code == 400
|
||||
assert json.loads(response.get_data(as_text=True))['message'] == {'email': ['Not a valid string.']}
|
||||
|
||||
Reference in New Issue
Block a user