diff --git a/app/dao/notifications_dao.py b/app/dao/notifications_dao.py index 699695ef1..8eac58fc4 100644 --- a/app/dao/notifications_dao.py +++ b/app/dao/notifications_dao.py @@ -22,7 +22,7 @@ from sqlalchemy.sql import functions from notifications_utils.international_billing_rates import INTERNATIONAL_BILLING_RATES from app import db, create_uuid -from app.utils import midnight_n_days_ago +from app.utils import midnight_n_days_ago, escape_special_characters from app.errors import InvalidRequest from app.models import ( Notification, @@ -452,11 +452,7 @@ def dao_get_notifications_by_to_field(service_id, search_term, notification_type else: raise InvalidRequest("Only email and SMS can use search by recipient", 400) - for special_character in ('\\', '_', '%', '/'): - normalised = normalised.replace( - special_character, - '\{}'.format(special_character) - ) + normalised = escape_special_characters(normalised) filters = [ Notification.service_id == service_id, diff --git a/app/dao/users_dao.py b/app/dao/users_dao.py index 50113bbf6..f2965d3a2 100644 --- a/app/dao/users_dao.py +++ b/app/dao/users_dao.py @@ -6,6 +6,7 @@ from sqlalchemy.orm import joinedload from app import db from app.models import (User, VerifyCode) +from app.utils import escape_special_characters def _remove_values_for_keys_if_present(dict, keys): @@ -97,6 +98,11 @@ def get_user_by_email(email): return User.query.filter(func.lower(User.email_address) == func.lower(email)).one() +def get_users_by_partial_email(email): + email = escape_special_characters(email) + return User.query.filter(User.email_address.ilike("%{}%".format(email))).all() + + def increment_failed_login_count(user): user.failed_login_count += 1 db.session.add(user) diff --git a/app/schemas.py b/app/schemas.py index 31b28c32a..de226f215 100644 --- a/app/schemas.py +++ b/app/schemas.py @@ -564,8 +564,14 @@ class EmailDataSchema(ma.Schema): email = fields.Str(required=True) + def __init__(self, partial_email=False): + super().__init__() + self.partial_email = partial_email + @validates('email') def validate_email(self, value): + if self.partial_email: + return try: validate_email_address(value) except InvalidEmailError as e: @@ -686,6 +692,7 @@ notification_with_personalisation_schema = NotificationWithPersonalisationSchema invited_user_schema = InvitedUserSchema() permission_schema = PermissionSchema() email_data_request_schema = EmailDataSchema() +partial_email_data_request_schema = EmailDataSchema(partial_email=True) notifications_filter_schema = NotificationsFilterSchema() service_history_schema = ServiceHistorySchema() api_key_history_schema = ApiKeyHistorySchema() diff --git a/app/user/rest.py b/app/user/rest.py index 646c21239..cfce893a6 100644 --- a/app/user/rest.py +++ b/app/user/rest.py @@ -16,6 +16,7 @@ from app.dao.users_dao import ( increment_failed_login_count, reset_failed_login_count, get_user_by_email, + get_users_by_partial_email, create_secret_code, save_user_attribute, update_user_password, @@ -32,6 +33,7 @@ from app.notifications.process_notifications import ( ) from app.schemas import ( email_data_request_schema, + partial_email_data_request_schema, create_user_schema, permission_schema, user_update_schema_load_json, @@ -356,6 +358,14 @@ def get_by_email(): return jsonify(data=result) +@user_blueprint.route('/find-users-by-email', methods=['POST']) +def find_users_by_email(): + email, errors = partial_email_data_request_schema.load(request.get_json()) + fetched_users = get_users_by_partial_email(email['email']) + result = [user.serialize() for user in fetched_users] + return jsonify(data=result), 200 + + @user_blueprint.route('/reset-password', methods=['POST']) def send_user_reset_password(): email, errors = email_data_request_schema.load(request.get_json()) diff --git a/app/utils.py b/app/utils.py index 71468e135..3ceeed2e6 100644 --- a/app/utils.py +++ b/app/utils.py @@ -115,3 +115,12 @@ def last_n_days(limit_days): # reverse the countdown, -1 from first two args to ensure it stays 0-indexed for x in range(limit_days - 1, -1, -1) ] + + +def escape_special_characters(string): + for special_character in ('\\', '_', '%', '/'): + string = string.replace( + special_character, + '\{}'.format(special_character) + ) + return string diff --git a/tests/app/user/test_rest.py b/tests/app/user/test_rest.py index fc4d9632d..1db989706 100644 --- a/tests/app/user/test_rest.py +++ b/tests/app/user/test_rest.py @@ -731,3 +731,71 @@ def test_get_orgs_and_services_only_shows_users_orgs_and_services(admin_request, } ] } + + +def test_find_users_by_email_finds_user_by_partial_email(notify_db, client): + create_user(email='findel.mestro@foo.com') + create_user(email='me.ignorra@foo.com') + data = json.dumps({"email": "findel"}) + auth_header = create_authorization_header() + + response = client.post( + url_for("user.find_users_by_email"), + data=data, + headers=[('Content-Type', 'application/json'), auth_header] + ) + users = json.loads(response.get_data(as_text=True)) + + assert response.status_code == 200 + assert len(users['data']) == 1 + assert users['data'][0]['email_address'] == 'findel.mestro@foo.com' + + +def test_find_users_by_email_finds_user_by_full_email(notify_db, client): + create_user(email='findel.mestro@foo.com') + create_user(email='me.ignorra@foo.com') + data = json.dumps({"email": "findel.mestro@foo.com"}) + auth_header = create_authorization_header() + + response = client.post( + url_for("user.find_users_by_email"), + data=data, + headers=[('Content-Type', 'application/json'), auth_header] + ) + users = json.loads(response.get_data(as_text=True)) + + assert response.status_code == 200 + assert len(users['data']) == 1 + assert users['data'][0]['email_address'] == 'findel.mestro@foo.com' + + +def test_find_users_by_email_handles_no_results(notify_db, client): + create_user(email='findel.mestro@foo.com') + create_user(email='me.ignorra@foo.com') + data = json.dumps({"email": "rogue"}) + auth_header = create_authorization_header() + + response = client.post( + url_for("user.find_users_by_email"), + data=data, + headers=[('Content-Type', 'application/json'), auth_header] + ) + users = json.loads(response.get_data(as_text=True)) + + assert response.status_code == 200 + assert users['data'] == [] + + +def test_search_for_users_by_email_handles_incorrect_data_format(notify_db, client): + create_user(email='findel.mestro@foo.com') + data = json.dumps({"email": 1}) + auth_header = create_authorization_header() + + response = client.post( + url_for("user.find_users_by_email"), + data=data, + headers=[('Content-Type', 'application/json'), auth_header] + ) + + assert response.status_code == 400 + assert json.loads(response.get_data(as_text=True))['message'] == {'email': ['Not a valid string.']}