Send email address in the data rather than the user_id as a path param.

Remove unused OldRequestVerifyCodeSchema.
This commit is contained in:
Rebecca Law
2016-03-07 15:21:05 +00:00
parent b15d3434c3
commit 10296f0cc2
3 changed files with 61 additions and 37 deletions

View File

@@ -1,5 +1,4 @@
import re
from flask import current_app
from flask_marshmallow.fields import fields
from . import ma
from . import models
@@ -102,18 +101,6 @@ class JobSchema(BaseSchema):
model = models.Job
# TODO: Remove this schema once the admin app has stopped using the /user/<user_id>code endpoint
class OldRequestVerifyCodeSchema(ma.Schema):
code_type = fields.Str(required=True)
to = fields.Str(required=False)
@validates('code_type')
def validate_code_type(self, code):
if code not in models.VERIFY_CODE_TYPES:
raise ValidationError('Invalid code type')
class RequestVerifyCodeSchema(ma.Schema):
to = fields.Str(required=False)
@@ -195,6 +182,14 @@ class PermissionSchema(BaseSchema):
exclude = ("created_at",)
class EmailDataSchema(ma.Schema):
email = fields.Str(required=False)
@validates('email')
def validate_to(self, value):
if not email_regex.match(value):
raise ValidationError('Invalid email')
user_schema = UserSchema()
user_schema_load_json = UserSchema(load_json=True)
service_schema = ServiceSchema()
@@ -205,7 +200,6 @@ api_key_schema = ApiKeySchema()
api_key_schema_load_json = ApiKeySchema(load_json=True)
job_schema = JobSchema()
job_schema_load_json = JobSchema(load_json=True)
old_request_verify_code_schema = OldRequestVerifyCodeSchema()
request_verify_code_schema = RequestVerifyCodeSchema()
sms_admin_notification_schema = SmsAdminNotificationSchema()
sms_template_notification_schema = SmsTemplateNotificationSchema()
@@ -216,3 +210,4 @@ notification_status_schema = NotificationStatusSchema()
notification_status_schema_load_json = NotificationStatusSchema(load_json=True)
invited_user_schema = InvitedUserSchema()
permission_schema = PermissionSchema()
email_data_request_schema = EmailDataSchema()

View File

@@ -17,7 +17,7 @@ from app.dao.permissions_dao import permission_dao
from app.dao.services_dao import dao_fetch_service_by_id
from app.schemas import (
old_request_verify_code_schema,
email_data_request_schema,
user_schema,
request_verify_code_schema,
user_schema_load_json,
@@ -191,19 +191,23 @@ def get_by_email():
email = request.args.get('email')
if not email:
return jsonify(result="error", message="invalid request"), 400
user = get_user_by_email(email)
if not user:
return jsonify(result="error", message="not found"), 404
result = user_schema.dump(user)
fetched_user = get_user_by_email(email)
if not fetched_user:
return _user_not_found_for_email(email)
result = user_schema.dump(fetched_user)
return jsonify(data=result.data)
@user.route('/<int:user_id>/reset-password', methods=['POST'])
def send_reset_password(user_id):
user_to_send_to = get_model_users(user_id=user_id)
@user.route('/reset-password', methods=['POST'])
def send_reset_password():
email, errors = email_data_request_schema.load(request.get_json())
if errors:
return jsonify(result="error", message=errors), 400
user_to_send_to = get_user_by_email(email['email'])
if not user_to_send_to:
return _user_not_found(user_id)
return _user_not_found_for_email(email['email'])
reset_password_message = {'to': user_to_send_to.email_address,
'reset_password_url': _create_reset_password_url(user_to_send_to.email_address)}
@@ -216,6 +220,10 @@ def _user_not_found(user_id):
return abort(404, 'User not found for id: {}'.format(user_id))
def _user_not_found_for_email(email):
return abort(404, 'User not found for email address: {}'.format(email))
def _create_reset_password_url(email):
from utils.url_safe_token import generate_token
token = generate_token(email, current_app.config['SECRET_KEY'], current_app.config['DANGEROUS_SALT'])

View File

@@ -286,7 +286,7 @@ def test_get_user_by_email(notify_api, notify_db, notify_db_session, sample_serv
assert sorted(expected_permissions) == sorted(fetched['permissions'][str(sample_service.id)])
def test_get_user_by_email_not_found_returns_400(notify_api,
def test_get_user_by_email_not_found_returns_404(notify_api,
notify_db,
notify_db_session,
sample_user):
@@ -299,7 +299,7 @@ def test_get_user_by_email_not_found_returns_400(notify_api,
assert resp.status_code == 404
json_resp = json.loads(resp.get_data(as_text=True))
assert json_resp['result'] == 'error'
assert json_resp['message'] == 'not found'
assert json_resp['message'] == 'User not found for email address: {}'.format('no_user@digital.gov.uk')
def test_get_user_by_email_bad_url_returns_404(notify_api,
@@ -436,13 +436,14 @@ def test_send_reset_password_should_send_reset_password_link(notify_api,
with notify_api.test_request_context():
with notify_api.test_client() as client:
mocker.patch('app.celery.tasks.email_reset_password.apply_async')
data = json.dumps({'email': sample_user.email_address})
auth_header = create_authorization_header(
path=url_for('user.send_reset_password', user_id=sample_user.id),
path=url_for('user.send_reset_password'),
method='POST',
request_body={})
request_body=data)
resp = client.post(
url_for('user.send_reset_password', user_id=sample_user.id),
data={},
url_for('user.send_reset_password'),
data=data,
headers=[('Content-Type', 'application/json'), auth_header])
assert resp.status_code == 204
@@ -453,21 +454,41 @@ def test_send_reset_password_should_send_reset_password_link(notify_api,
queue='send-reset-password')
def test_send_reset_password_should_return_404_when_user_doesnot_exist(notify_api,
sample_user,
def test_send_reset_password_should_return_400_when_user_doesnot_exist(notify_api,
mocker):
with notify_api.test_request_context():
with notify_api.test_client() as client:
user_id = 99999
bad_email_address = 'bad@email.gov.uk'
data = json.dumps({'email': bad_email_address})
auth_header = create_authorization_header(
path=url_for('user.send_reset_password', user_id=user_id),
path=url_for('user.send_reset_password'),
method='POST',
request_body={})
request_body=data)
resp = client.post(
url_for('user.send_reset_password', user_id=user_id),
data={},
url_for('user.send_reset_password'),
data=data,
headers=[('Content-Type', 'application/json'), auth_header])
assert resp.status_code == 404
assert json.loads(resp.get_data(as_text=True))['message'] == 'User not found for id: {}'.format(user_id)
assert json.loads(resp.get_data(as_text=True))['message'] == 'User not found for email address: {}'.format(
bad_email_address)
def test_send_reset_password_should_return_400_when_data_is_not_email_address(notify_api, mocker):
with notify_api.test_request_context():
with notify_api.test_client() as client:
bad_email_address = 'bad.email.gov.uk'
data = json.dumps({'email': bad_email_address})
auth_header = create_authorization_header(
path=url_for('user.send_reset_password'),
method='POST',
request_body=data)
resp = client.post(
url_for('user.send_reset_password'),
data=data,
headers=[('Content-Type', 'application/json'), auth_header])
assert resp.status_code == 400
assert json.loads(resp.get_data(as_text=True))['message'] == {'email': ['Invalid email']}