mirror of
https://github.com/GSA/notifications-api.git
synced 2026-02-04 10:21:14 -05:00
Merge pull request #1943 from alphagov/admin_platform_users_search
Add data endpoint for finding users by full or partial email
This commit is contained in:
@@ -22,7 +22,7 @@ from sqlalchemy.sql import functions
|
|||||||
from notifications_utils.international_billing_rates import INTERNATIONAL_BILLING_RATES
|
from notifications_utils.international_billing_rates import INTERNATIONAL_BILLING_RATES
|
||||||
|
|
||||||
from app import db, create_uuid
|
from app import db, create_uuid
|
||||||
from app.utils import midnight_n_days_ago
|
from app.utils import midnight_n_days_ago, escape_special_characters
|
||||||
from app.errors import InvalidRequest
|
from app.errors import InvalidRequest
|
||||||
from app.models import (
|
from app.models import (
|
||||||
Notification,
|
Notification,
|
||||||
@@ -452,11 +452,7 @@ def dao_get_notifications_by_to_field(service_id, search_term, notification_type
|
|||||||
else:
|
else:
|
||||||
raise InvalidRequest("Only email and SMS can use search by recipient", 400)
|
raise InvalidRequest("Only email and SMS can use search by recipient", 400)
|
||||||
|
|
||||||
for special_character in ('\\', '_', '%', '/'):
|
normalised = escape_special_characters(normalised)
|
||||||
normalised = normalised.replace(
|
|
||||||
special_character,
|
|
||||||
'\{}'.format(special_character)
|
|
||||||
)
|
|
||||||
|
|
||||||
filters = [
|
filters = [
|
||||||
Notification.service_id == service_id,
|
Notification.service_id == service_id,
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ from sqlalchemy.orm import joinedload
|
|||||||
|
|
||||||
from app import db
|
from app import db
|
||||||
from app.models import (User, VerifyCode)
|
from app.models import (User, VerifyCode)
|
||||||
|
from app.utils import escape_special_characters
|
||||||
|
|
||||||
|
|
||||||
def _remove_values_for_keys_if_present(dict, keys):
|
def _remove_values_for_keys_if_present(dict, keys):
|
||||||
@@ -97,6 +98,11 @@ def get_user_by_email(email):
|
|||||||
return User.query.filter(func.lower(User.email_address) == func.lower(email)).one()
|
return User.query.filter(func.lower(User.email_address) == func.lower(email)).one()
|
||||||
|
|
||||||
|
|
||||||
|
def get_users_by_partial_email(email):
|
||||||
|
email = escape_special_characters(email)
|
||||||
|
return User.query.filter(User.email_address.ilike("%{}%".format(email))).all()
|
||||||
|
|
||||||
|
|
||||||
def increment_failed_login_count(user):
|
def increment_failed_login_count(user):
|
||||||
user.failed_login_count += 1
|
user.failed_login_count += 1
|
||||||
db.session.add(user)
|
db.session.add(user)
|
||||||
|
|||||||
@@ -564,8 +564,14 @@ class EmailDataSchema(ma.Schema):
|
|||||||
|
|
||||||
email = fields.Str(required=True)
|
email = fields.Str(required=True)
|
||||||
|
|
||||||
|
def __init__(self, partial_email=False):
|
||||||
|
super().__init__()
|
||||||
|
self.partial_email = partial_email
|
||||||
|
|
||||||
@validates('email')
|
@validates('email')
|
||||||
def validate_email(self, value):
|
def validate_email(self, value):
|
||||||
|
if self.partial_email:
|
||||||
|
return
|
||||||
try:
|
try:
|
||||||
validate_email_address(value)
|
validate_email_address(value)
|
||||||
except InvalidEmailError as e:
|
except InvalidEmailError as e:
|
||||||
@@ -686,6 +692,7 @@ notification_with_personalisation_schema = NotificationWithPersonalisationSchema
|
|||||||
invited_user_schema = InvitedUserSchema()
|
invited_user_schema = InvitedUserSchema()
|
||||||
permission_schema = PermissionSchema()
|
permission_schema = PermissionSchema()
|
||||||
email_data_request_schema = EmailDataSchema()
|
email_data_request_schema = EmailDataSchema()
|
||||||
|
partial_email_data_request_schema = EmailDataSchema(partial_email=True)
|
||||||
notifications_filter_schema = NotificationsFilterSchema()
|
notifications_filter_schema = NotificationsFilterSchema()
|
||||||
service_history_schema = ServiceHistorySchema()
|
service_history_schema = ServiceHistorySchema()
|
||||||
api_key_history_schema = ApiKeyHistorySchema()
|
api_key_history_schema = ApiKeyHistorySchema()
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ from app.dao.users_dao import (
|
|||||||
increment_failed_login_count,
|
increment_failed_login_count,
|
||||||
reset_failed_login_count,
|
reset_failed_login_count,
|
||||||
get_user_by_email,
|
get_user_by_email,
|
||||||
|
get_users_by_partial_email,
|
||||||
create_secret_code,
|
create_secret_code,
|
||||||
save_user_attribute,
|
save_user_attribute,
|
||||||
update_user_password,
|
update_user_password,
|
||||||
@@ -32,6 +33,7 @@ from app.notifications.process_notifications import (
|
|||||||
)
|
)
|
||||||
from app.schemas import (
|
from app.schemas import (
|
||||||
email_data_request_schema,
|
email_data_request_schema,
|
||||||
|
partial_email_data_request_schema,
|
||||||
create_user_schema,
|
create_user_schema,
|
||||||
permission_schema,
|
permission_schema,
|
||||||
user_update_schema_load_json,
|
user_update_schema_load_json,
|
||||||
@@ -356,6 +358,14 @@ def get_by_email():
|
|||||||
return jsonify(data=result)
|
return jsonify(data=result)
|
||||||
|
|
||||||
|
|
||||||
|
@user_blueprint.route('/find-users-by-email', methods=['POST'])
|
||||||
|
def find_users_by_email():
|
||||||
|
email, errors = partial_email_data_request_schema.load(request.get_json())
|
||||||
|
fetched_users = get_users_by_partial_email(email['email'])
|
||||||
|
result = [user.serialize() for user in fetched_users]
|
||||||
|
return jsonify(data=result), 200
|
||||||
|
|
||||||
|
|
||||||
@user_blueprint.route('/reset-password', methods=['POST'])
|
@user_blueprint.route('/reset-password', methods=['POST'])
|
||||||
def send_user_reset_password():
|
def send_user_reset_password():
|
||||||
email, errors = email_data_request_schema.load(request.get_json())
|
email, errors = email_data_request_schema.load(request.get_json())
|
||||||
|
|||||||
@@ -115,3 +115,12 @@ def last_n_days(limit_days):
|
|||||||
# reverse the countdown, -1 from first two args to ensure it stays 0-indexed
|
# reverse the countdown, -1 from first two args to ensure it stays 0-indexed
|
||||||
for x in range(limit_days - 1, -1, -1)
|
for x in range(limit_days - 1, -1, -1)
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def escape_special_characters(string):
|
||||||
|
for special_character in ('\\', '_', '%', '/'):
|
||||||
|
string = string.replace(
|
||||||
|
special_character,
|
||||||
|
'\{}'.format(special_character)
|
||||||
|
)
|
||||||
|
return string
|
||||||
|
|||||||
@@ -731,3 +731,71 @@ def test_get_orgs_and_services_only_shows_users_orgs_and_services(admin_request,
|
|||||||
}
|
}
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def test_find_users_by_email_finds_user_by_partial_email(notify_db, client):
|
||||||
|
create_user(email='findel.mestro@foo.com')
|
||||||
|
create_user(email='me.ignorra@foo.com')
|
||||||
|
data = json.dumps({"email": "findel"})
|
||||||
|
auth_header = create_authorization_header()
|
||||||
|
|
||||||
|
response = client.post(
|
||||||
|
url_for("user.find_users_by_email"),
|
||||||
|
data=data,
|
||||||
|
headers=[('Content-Type', 'application/json'), auth_header]
|
||||||
|
)
|
||||||
|
users = json.loads(response.get_data(as_text=True))
|
||||||
|
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert len(users['data']) == 1
|
||||||
|
assert users['data'][0]['email_address'] == 'findel.mestro@foo.com'
|
||||||
|
|
||||||
|
|
||||||
|
def test_find_users_by_email_finds_user_by_full_email(notify_db, client):
|
||||||
|
create_user(email='findel.mestro@foo.com')
|
||||||
|
create_user(email='me.ignorra@foo.com')
|
||||||
|
data = json.dumps({"email": "findel.mestro@foo.com"})
|
||||||
|
auth_header = create_authorization_header()
|
||||||
|
|
||||||
|
response = client.post(
|
||||||
|
url_for("user.find_users_by_email"),
|
||||||
|
data=data,
|
||||||
|
headers=[('Content-Type', 'application/json'), auth_header]
|
||||||
|
)
|
||||||
|
users = json.loads(response.get_data(as_text=True))
|
||||||
|
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert len(users['data']) == 1
|
||||||
|
assert users['data'][0]['email_address'] == 'findel.mestro@foo.com'
|
||||||
|
|
||||||
|
|
||||||
|
def test_find_users_by_email_handles_no_results(notify_db, client):
|
||||||
|
create_user(email='findel.mestro@foo.com')
|
||||||
|
create_user(email='me.ignorra@foo.com')
|
||||||
|
data = json.dumps({"email": "rogue"})
|
||||||
|
auth_header = create_authorization_header()
|
||||||
|
|
||||||
|
response = client.post(
|
||||||
|
url_for("user.find_users_by_email"),
|
||||||
|
data=data,
|
||||||
|
headers=[('Content-Type', 'application/json'), auth_header]
|
||||||
|
)
|
||||||
|
users = json.loads(response.get_data(as_text=True))
|
||||||
|
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert users['data'] == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_search_for_users_by_email_handles_incorrect_data_format(notify_db, client):
|
||||||
|
create_user(email='findel.mestro@foo.com')
|
||||||
|
data = json.dumps({"email": 1})
|
||||||
|
auth_header = create_authorization_header()
|
||||||
|
|
||||||
|
response = client.post(
|
||||||
|
url_for("user.find_users_by_email"),
|
||||||
|
data=data,
|
||||||
|
headers=[('Content-Type', 'application/json'), auth_header]
|
||||||
|
)
|
||||||
|
|
||||||
|
assert response.status_code == 400
|
||||||
|
assert json.loads(response.get_data(as_text=True))['message'] == {'email': ['Not a valid string.']}
|
||||||
|
|||||||
Reference in New Issue
Block a user