mirror of
https://github.com/GSA/notifications-api.git
synced 2026-02-04 18:31:13 -05:00
Merge pull request #725 from alphagov/update-user-profile-endpoints
Refactor saving user profile
This commit is contained in:
@@ -7,18 +7,26 @@ from app import db
|
|||||||
from app.models import (User, VerifyCode)
|
from app.models import (User, VerifyCode)
|
||||||
|
|
||||||
|
|
||||||
|
def _remove_values_for_keys_if_present(dict, keys):
|
||||||
|
for key in keys:
|
||||||
|
dict.pop(key, None)
|
||||||
|
|
||||||
|
|
||||||
def create_secret_code():
|
def create_secret_code():
|
||||||
return ''.join(map(str, random.sample(range(9), 5)))
|
return ''.join(map(str, random.sample(range(9), 5)))
|
||||||
|
|
||||||
|
|
||||||
|
def save_user_attribute(usr, update_dict={}):
|
||||||
|
db.session.query(User).filter_by(id=usr.id).update(update_dict)
|
||||||
|
db.session.commit()
|
||||||
|
|
||||||
|
|
||||||
def save_model_user(usr, update_dict={}, pwd=None):
|
def save_model_user(usr, update_dict={}, pwd=None):
|
||||||
if pwd:
|
if pwd:
|
||||||
usr.password = pwd
|
usr.password = pwd
|
||||||
usr.password_changed_at = datetime.utcnow()
|
usr.password_changed_at = datetime.utcnow()
|
||||||
if update_dict:
|
if update_dict:
|
||||||
if update_dict.get('id'):
|
_remove_values_for_keys_if_present(update_dict, ['id', 'password_changed_at'])
|
||||||
del update_dict['id']
|
|
||||||
update_dict.pop('password_changed_at')
|
|
||||||
db.session.query(User).filter_by(id=usr.id).update(update_dict)
|
db.session.query(User).filter_by(id=usr.id).update(update_dict)
|
||||||
else:
|
else:
|
||||||
db.session.add(usr)
|
db.session.add(usr)
|
||||||
@@ -74,7 +82,7 @@ def delete_user_verify_codes(user):
|
|||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
|
||||||
|
|
||||||
def get_model_users(user_id=None):
|
def get_user_by_id(user_id=None):
|
||||||
if user_id:
|
if user_id:
|
||||||
return User.query.filter_by(id=user_id).one()
|
return User.query.filter_by(id=user_id).one()
|
||||||
return User.query.filter_by().all()
|
return User.query.filter_by().all()
|
||||||
|
|||||||
@@ -105,6 +105,42 @@ class UserSchema(BaseSchema):
|
|||||||
strict = True
|
strict = True
|
||||||
|
|
||||||
|
|
||||||
|
class UserUpdateAttributeSchema(BaseSchema):
|
||||||
|
|
||||||
|
class Meta:
|
||||||
|
model = models.User
|
||||||
|
exclude = (
|
||||||
|
'id', 'updated_at', 'created_at', 'user_to_service',
|
||||||
|
'_password', 'verify_codes', 'logged_in_at', 'password_changed_at',
|
||||||
|
'failed_login_count', 'state', 'platform_admin')
|
||||||
|
strict = True
|
||||||
|
|
||||||
|
@validates('name')
|
||||||
|
def validate_name(self, value):
|
||||||
|
if not value:
|
||||||
|
raise ValidationError('Invalid name')
|
||||||
|
|
||||||
|
@validates('email_address')
|
||||||
|
def validate_email_address(self, value):
|
||||||
|
try:
|
||||||
|
validate_email_address(value)
|
||||||
|
except InvalidEmailError as e:
|
||||||
|
raise ValidationError(e.message)
|
||||||
|
|
||||||
|
@validates('mobile_number')
|
||||||
|
def validate_mobile_number(self, value):
|
||||||
|
try:
|
||||||
|
validate_phone_number(value)
|
||||||
|
except InvalidPhoneError as error:
|
||||||
|
raise ValidationError('Invalid phone number: {}'.format(error))
|
||||||
|
|
||||||
|
@validates_schema(pass_original=True)
|
||||||
|
def check_unknown_fields(self, data, original_data):
|
||||||
|
for key in original_data:
|
||||||
|
if key not in self.fields:
|
||||||
|
raise ValidationError('Unknown field name {}'.format(key))
|
||||||
|
|
||||||
|
|
||||||
class ProviderDetailsSchema(BaseSchema):
|
class ProviderDetailsSchema(BaseSchema):
|
||||||
class Meta:
|
class Meta:
|
||||||
model = models.ProviderDetails
|
model = models.ProviderDetails
|
||||||
@@ -529,6 +565,7 @@ class UnarchivedTemplateSchema(BaseSchema):
|
|||||||
|
|
||||||
user_schema = UserSchema()
|
user_schema = UserSchema()
|
||||||
user_schema_load_json = UserSchema(load_json=True)
|
user_schema_load_json = UserSchema(load_json=True)
|
||||||
|
user_update_schema_load_json = UserUpdateAttributeSchema(load_json=True, partial=True)
|
||||||
service_schema = ServiceSchema()
|
service_schema = ServiceSchema()
|
||||||
service_schema_load_json = ServiceSchema(load_json=True)
|
service_schema_load_json = ServiceSchema(load_json=True)
|
||||||
detailed_service_schema = DetailedServiceSchema()
|
detailed_service_schema = DetailedServiceSchema()
|
||||||
|
|||||||
@@ -34,7 +34,7 @@ from app.dao.service_whitelist_dao import (
|
|||||||
)
|
)
|
||||||
from app.dao import notifications_dao
|
from app.dao import notifications_dao
|
||||||
from app.dao.provider_statistics_dao import get_fragment_count
|
from app.dao.provider_statistics_dao import get_fragment_count
|
||||||
from app.dao.users_dao import get_model_users
|
from app.dao.users_dao import get_user_by_id
|
||||||
from app.errors import (
|
from app.errors import (
|
||||||
register_errors,
|
register_errors,
|
||||||
InvalidRequest
|
InvalidRequest
|
||||||
@@ -88,7 +88,7 @@ def create_service():
|
|||||||
errors = {'user_id': ['Missing data for required field.']}
|
errors = {'user_id': ['Missing data for required field.']}
|
||||||
raise InvalidRequest(errors, status_code=400)
|
raise InvalidRequest(errors, status_code=400)
|
||||||
|
|
||||||
user = get_model_users(data['user_id'])
|
user = get_user_by_id(data['user_id'])
|
||||||
data.pop('user_id', None)
|
data.pop('user_id', None)
|
||||||
valid_service = service_schema.load(request.get_json()).data
|
valid_service = service_schema.load(request.get_json()).data
|
||||||
dao_create_service(valid_service, user)
|
dao_create_service(valid_service, user)
|
||||||
@@ -148,7 +148,7 @@ def get_users_for_service(service_id):
|
|||||||
@service_blueprint.route('/<uuid:service_id>/users/<user_id>', methods=['POST'])
|
@service_blueprint.route('/<uuid:service_id>/users/<user_id>', methods=['POST'])
|
||||||
def add_user_to_service(service_id, user_id):
|
def add_user_to_service(service_id, user_id):
|
||||||
service = dao_fetch_service_by_id(service_id)
|
service = dao_fetch_service_by_id(service_id)
|
||||||
user = get_model_users(user_id=user_id)
|
user = get_user_by_id(user_id=user_id)
|
||||||
|
|
||||||
if user in service.users:
|
if user in service.users:
|
||||||
error = 'User id: {} already part of service id: {}'.format(user_id, service_id)
|
error = 'User id: {} already part of service id: {}'.format(user_id, service_id)
|
||||||
@@ -163,7 +163,7 @@ def add_user_to_service(service_id, user_id):
|
|||||||
@service_blueprint.route('/<uuid:service_id>/users/<user_id>', methods=['DELETE'])
|
@service_blueprint.route('/<uuid:service_id>/users/<user_id>', methods=['DELETE'])
|
||||||
def remove_user_from_service(service_id, user_id):
|
def remove_user_from_service(service_id, user_id):
|
||||||
service = dao_fetch_service_by_id(service_id)
|
service = dao_fetch_service_by_id(service_id)
|
||||||
user = get_model_users(user_id=user_id)
|
user = get_user_by_id(user_id=user_id)
|
||||||
if user not in service.users:
|
if user not in service.users:
|
||||||
error = 'User not found'
|
error = 'User not found'
|
||||||
raise InvalidRequest(error, status_code=404)
|
raise InvalidRequest(error, status_code=404)
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ from datetime import datetime
|
|||||||
from flask import (jsonify, request, Blueprint, current_app)
|
from flask import (jsonify, request, Blueprint, current_app)
|
||||||
from app import encryption, DATETIME_FORMAT
|
from app import encryption, DATETIME_FORMAT
|
||||||
from app.dao.users_dao import (
|
from app.dao.users_dao import (
|
||||||
get_model_users,
|
get_user_by_id,
|
||||||
save_model_user,
|
save_model_user,
|
||||||
create_user_code,
|
create_user_code,
|
||||||
get_user_code,
|
get_user_code,
|
||||||
@@ -12,7 +12,8 @@ from app.dao.users_dao import (
|
|||||||
increment_failed_login_count,
|
increment_failed_login_count,
|
||||||
reset_failed_login_count,
|
reset_failed_login_count,
|
||||||
get_user_by_email,
|
get_user_by_email,
|
||||||
create_secret_code
|
create_secret_code,
|
||||||
|
save_user_attribute
|
||||||
)
|
)
|
||||||
from app.dao.permissions_dao import permission_dao
|
from app.dao.permissions_dao import permission_dao
|
||||||
from app.dao.services_dao import dao_fetch_service_by_id
|
from app.dao.services_dao import dao_fetch_service_by_id
|
||||||
@@ -22,8 +23,10 @@ from app.schemas import (
|
|||||||
email_data_request_schema,
|
email_data_request_schema,
|
||||||
user_schema,
|
user_schema,
|
||||||
request_verify_code_schema,
|
request_verify_code_schema,
|
||||||
|
permission_schema,
|
||||||
user_schema_load_json,
|
user_schema_load_json,
|
||||||
permission_schema)
|
user_update_schema_load_json
|
||||||
|
)
|
||||||
|
|
||||||
from app.celery.tasks import (
|
from app.celery.tasks import (
|
||||||
send_sms,
|
send_sms,
|
||||||
@@ -53,7 +56,7 @@ def create_user():
|
|||||||
|
|
||||||
@user.route('/<uuid:user_id>', methods=['PUT'])
|
@user.route('/<uuid:user_id>', methods=['PUT'])
|
||||||
def update_user(user_id):
|
def update_user(user_id):
|
||||||
user_to_update = get_model_users(user_id=user_id)
|
user_to_update = get_user_by_id(user_id=user_id)
|
||||||
req_json = request.get_json()
|
req_json = request.get_json()
|
||||||
update_dct, errors = user_schema_load_json.load(req_json)
|
update_dct, errors = user_schema_load_json.load(req_json)
|
||||||
pwd = req_json.get('password', None)
|
pwd = req_json.get('password', None)
|
||||||
@@ -66,9 +69,20 @@ def update_user(user_id):
|
|||||||
return jsonify(data=user_schema.dump(user_to_update).data), 200
|
return jsonify(data=user_schema.dump(user_to_update).data), 200
|
||||||
|
|
||||||
|
|
||||||
|
@user.route('/<uuid:user_id>', methods=['POST'])
|
||||||
|
def update_user_attribute(user_id):
|
||||||
|
user_to_update = get_user_by_id(user_id=user_id)
|
||||||
|
req_json = request.get_json()
|
||||||
|
update_dct, errors = user_update_schema_load_json.load(req_json)
|
||||||
|
if errors:
|
||||||
|
raise InvalidRequest(errors, status_code=400)
|
||||||
|
save_user_attribute(user_to_update, update_dict=update_dct)
|
||||||
|
return jsonify(data=user_schema.dump(user_to_update).data), 200
|
||||||
|
|
||||||
|
|
||||||
@user.route('/<uuid:user_id>/verify/password', methods=['POST'])
|
@user.route('/<uuid:user_id>/verify/password', methods=['POST'])
|
||||||
def verify_user_password(user_id):
|
def verify_user_password(user_id):
|
||||||
user_to_verify = get_model_users(user_id=user_id)
|
user_to_verify = get_user_by_id(user_id=user_id)
|
||||||
|
|
||||||
txt_pwd = None
|
txt_pwd = None
|
||||||
try:
|
try:
|
||||||
@@ -92,7 +106,7 @@ def verify_user_password(user_id):
|
|||||||
|
|
||||||
@user.route('/<uuid:user_id>/verify/code', methods=['POST'])
|
@user.route('/<uuid:user_id>/verify/code', methods=['POST'])
|
||||||
def verify_user_code(user_id):
|
def verify_user_code(user_id):
|
||||||
user_to_verify = get_model_users(user_id=user_id)
|
user_to_verify = get_user_by_id(user_id=user_id)
|
||||||
|
|
||||||
txt_code = None
|
txt_code = None
|
||||||
resp_json = request.get_json()
|
resp_json = request.get_json()
|
||||||
@@ -120,7 +134,7 @@ def verify_user_code(user_id):
|
|||||||
|
|
||||||
@user.route('/<uuid:user_id>/sms-code', methods=['POST'])
|
@user.route('/<uuid:user_id>/sms-code', methods=['POST'])
|
||||||
def send_user_sms_code(user_id):
|
def send_user_sms_code(user_id):
|
||||||
user_to_send_to = get_model_users(user_id=user_id)
|
user_to_send_to = get_user_by_id(user_id=user_id)
|
||||||
verify_code, errors = request_verify_code_schema.load(request.get_json())
|
verify_code, errors = request_verify_code_schema.load(request.get_json())
|
||||||
|
|
||||||
secret_code = create_secret_code()
|
secret_code = create_secret_code()
|
||||||
@@ -149,7 +163,7 @@ def send_user_sms_code(user_id):
|
|||||||
|
|
||||||
@user.route('/<uuid:user_id>/change-email-verification', methods=['POST'])
|
@user.route('/<uuid:user_id>/change-email-verification', methods=['POST'])
|
||||||
def send_user_confirm_new_email(user_id):
|
def send_user_confirm_new_email(user_id):
|
||||||
user_to_send_to = get_model_users(user_id=user_id)
|
user_to_send_to = get_user_by_id(user_id=user_id)
|
||||||
email, errors = email_data_request_schema.load(request.get_json())
|
email, errors = email_data_request_schema.load(request.get_json())
|
||||||
if errors:
|
if errors:
|
||||||
raise InvalidRequest(message=errors, status_code=400)
|
raise InvalidRequest(message=errors, status_code=400)
|
||||||
@@ -178,7 +192,7 @@ def send_user_confirm_new_email(user_id):
|
|||||||
|
|
||||||
@user.route('/<uuid:user_id>/email-verification', methods=['POST'])
|
@user.route('/<uuid:user_id>/email-verification', methods=['POST'])
|
||||||
def send_user_email_verification(user_id):
|
def send_user_email_verification(user_id):
|
||||||
user_to_send_to = get_model_users(user_id=user_id)
|
user_to_send_to = get_user_by_id(user_id=user_id)
|
||||||
secret_code = create_secret_code()
|
secret_code = create_secret_code()
|
||||||
create_user_code(user_to_send_to, secret_code, 'email')
|
create_user_code(user_to_send_to, secret_code, 'email')
|
||||||
|
|
||||||
@@ -230,7 +244,7 @@ def send_already_registered_email(user_id):
|
|||||||
@user.route('/<uuid:user_id>', methods=['GET'])
|
@user.route('/<uuid:user_id>', methods=['GET'])
|
||||||
@user.route('', methods=['GET'])
|
@user.route('', methods=['GET'])
|
||||||
def get_user(user_id=None):
|
def get_user(user_id=None):
|
||||||
users = get_model_users(user_id=user_id)
|
users = get_user_by_id(user_id=user_id)
|
||||||
result = user_schema.dump(users, many=True) if isinstance(users, list) else user_schema.dump(users)
|
result = user_schema.dump(users, many=True) if isinstance(users, list) else user_schema.dump(users)
|
||||||
return jsonify(data=result.data)
|
return jsonify(data=result.data)
|
||||||
|
|
||||||
@@ -239,7 +253,7 @@ def get_user(user_id=None):
|
|||||||
def set_permissions(user_id, service_id):
|
def set_permissions(user_id, service_id):
|
||||||
# TODO fix security hole, how do we verify that the user
|
# TODO fix security hole, how do we verify that the user
|
||||||
# who is making this request has permission to make the request.
|
# who is making this request has permission to make the request.
|
||||||
user = get_model_users(user_id=user_id)
|
user = get_user_by_id(user_id=user_id)
|
||||||
service = dao_fetch_service_by_id(service_id=service_id)
|
service = dao_fetch_service_by_id(service_id=service_id)
|
||||||
permissions, errors = permission_schema.load(request.get_json(), many=True)
|
permissions, errors = permission_schema.load(request.get_json(), many=True)
|
||||||
|
|
||||||
|
|||||||
@@ -7,12 +7,13 @@ import pytest
|
|||||||
|
|
||||||
from app.dao.users_dao import (
|
from app.dao.users_dao import (
|
||||||
save_model_user,
|
save_model_user,
|
||||||
get_model_users,
|
save_user_attribute,
|
||||||
|
get_user_by_id,
|
||||||
delete_model_user,
|
delete_model_user,
|
||||||
increment_failed_login_count,
|
increment_failed_login_count,
|
||||||
reset_failed_login_count,
|
reset_failed_login_count,
|
||||||
get_user_by_email,
|
get_user_by_email,
|
||||||
delete_codes_older_created_more_than_a_day_ago
|
delete_codes_older_created_more_than_a_day_ago,
|
||||||
)
|
)
|
||||||
|
|
||||||
from tests.app.conftest import sample_user as create_sample_user
|
from tests.app.conftest import sample_user as create_sample_user
|
||||||
@@ -37,13 +38,13 @@ def test_create_user(notify_api, notify_db, notify_db_session):
|
|||||||
|
|
||||||
def test_get_all_users(notify_api, notify_db, notify_db_session, sample_user):
|
def test_get_all_users(notify_api, notify_db, notify_db_session, sample_user):
|
||||||
assert User.query.count() == 1
|
assert User.query.count() == 1
|
||||||
assert len(get_model_users()) == 1
|
assert len(get_user_by_id()) == 1
|
||||||
email = "another.notify@digital.cabinet-office.gov.uk"
|
email = "another.notify@digital.cabinet-office.gov.uk"
|
||||||
another_user = create_sample_user(notify_db,
|
another_user = create_sample_user(notify_db,
|
||||||
notify_db_session,
|
notify_db_session,
|
||||||
email=email)
|
email=email)
|
||||||
assert User.query.count() == 2
|
assert User.query.count() == 2
|
||||||
assert len(get_model_users()) == 2
|
assert len(get_user_by_id()) == 2
|
||||||
|
|
||||||
|
|
||||||
def test_get_user(notify_api, notify_db, notify_db_session):
|
def test_get_user(notify_api, notify_db, notify_db_session):
|
||||||
@@ -51,23 +52,20 @@ def test_get_user(notify_api, notify_db, notify_db_session):
|
|||||||
another_user = create_sample_user(notify_db,
|
another_user = create_sample_user(notify_db,
|
||||||
notify_db_session,
|
notify_db_session,
|
||||||
email=email)
|
email=email)
|
||||||
assert get_model_users(user_id=another_user.id).email_address == email
|
assert get_user_by_id(user_id=another_user.id).email_address == email
|
||||||
|
|
||||||
|
|
||||||
def test_get_user_not_exists(notify_api, notify_db, notify_db_session, fake_uuid):
|
def test_get_user_not_exists(notify_api, notify_db, notify_db_session, fake_uuid):
|
||||||
try:
|
try:
|
||||||
get_model_users(user_id=fake_uuid)
|
get_user_by_id(user_id=fake_uuid)
|
||||||
pytest.fail("NoResultFound exception not thrown.")
|
pytest.fail("NoResultFound exception not thrown.")
|
||||||
except NoResultFound as e:
|
except NoResultFound as e:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
def test_get_user_invalid_id(notify_api, notify_db, notify_db_session):
|
def test_get_user_invalid_id(notify_api, notify_db, notify_db_session):
|
||||||
try:
|
with pytest.raises(DataError):
|
||||||
get_model_users(user_id="blah")
|
get_user_by_id(user_id="blah")
|
||||||
pytest.fail("DataError exception not thrown.")
|
|
||||||
except DataError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
def test_delete_users(notify_api, notify_db, notify_db_session, sample_user):
|
def test_delete_users(notify_api, notify_db, notify_db_session, sample_user):
|
||||||
@@ -131,3 +129,17 @@ def make_verify_code(user, age=timedelta(hours=0), code="12335"):
|
|||||||
)
|
)
|
||||||
db.session.add(verify_code)
|
db.session.add(verify_code)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('user_attribute, user_value', [
|
||||||
|
('name', 'New User'),
|
||||||
|
('email_address', 'newuser@mail.com'),
|
||||||
|
('mobile_number', '+4407700900460')
|
||||||
|
])
|
||||||
|
def test_update_user_attribute(client, sample_user, user_attribute, user_value):
|
||||||
|
assert getattr(sample_user, user_attribute) != user_value
|
||||||
|
update_dict = {
|
||||||
|
user_attribute: user_value
|
||||||
|
}
|
||||||
|
save_user_attribute(sample_user, update_dict)
|
||||||
|
assert getattr(sample_user, user_attribute) == user_value
|
||||||
|
|||||||
@@ -1,3 +1,8 @@
|
|||||||
|
import pytest
|
||||||
|
|
||||||
|
from marshmallow import ValidationError
|
||||||
|
|
||||||
|
|
||||||
def test_job_schema_doesnt_return_notifications(sample_notification_with_job):
|
def test_job_schema_doesnt_return_notifications(sample_notification_with_job):
|
||||||
from app.schemas import job_schema
|
from app.schemas import job_schema
|
||||||
|
|
||||||
@@ -22,3 +27,51 @@ def test_notification_schema_adds_api_key_name(sample_notification_with_api_key)
|
|||||||
|
|
||||||
data = notification_with_template_schema.dump(sample_notification_with_api_key).data
|
data = notification_with_template_schema.dump(sample_notification_with_api_key).data
|
||||||
assert data['key_name'] == 'Test key'
|
assert data['key_name'] == 'Test key'
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('user_attribute, user_value', [
|
||||||
|
('name', 'New User'),
|
||||||
|
('email_address', 'newuser@mail.com'),
|
||||||
|
('mobile_number', '+4407700900460')
|
||||||
|
])
|
||||||
|
def test_user_update_schema_accepts_valid_attribute_pairs(user_attribute, user_value):
|
||||||
|
update_dict = {
|
||||||
|
user_attribute: user_value
|
||||||
|
}
|
||||||
|
from app.schemas import user_update_schema_load_json
|
||||||
|
|
||||||
|
data, errors = user_update_schema_load_json.load(update_dict)
|
||||||
|
assert not errors
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('user_attribute, user_value', [
|
||||||
|
('name', None),
|
||||||
|
('name', ''),
|
||||||
|
('email_address', 'bademail@...com'),
|
||||||
|
('mobile_number', '+44077009')
|
||||||
|
])
|
||||||
|
def test_user_update_schema_rejects_invalid_attribute_pairs(user_attribute, user_value):
|
||||||
|
from app.schemas import user_update_schema_load_json
|
||||||
|
update_dict = {
|
||||||
|
user_attribute: user_value
|
||||||
|
}
|
||||||
|
|
||||||
|
with pytest.raises(ValidationError):
|
||||||
|
data, errors = user_update_schema_load_json.load(update_dict)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('user_attribute', [
|
||||||
|
'id', 'updated_at', 'created_at', 'user_to_service',
|
||||||
|
'_password', 'verify_codes', 'logged_in_at', 'password_changed_at',
|
||||||
|
'failed_login_count', 'state', 'platform_admin'
|
||||||
|
])
|
||||||
|
def test_user_update_schema_rejects_disallowed_attribute_keys(user_attribute):
|
||||||
|
update_dict = {
|
||||||
|
user_attribute: 'not important'
|
||||||
|
}
|
||||||
|
from app.schemas import user_update_schema_load_json
|
||||||
|
|
||||||
|
with pytest.raises(ValidationError) as excinfo:
|
||||||
|
data, errors = user_update_schema_load_json.load(update_dict)
|
||||||
|
|
||||||
|
assert excinfo.value.messages['_schema'][0] == 'Unknown field name {}'.format(user_attribute)
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
import json
|
import json
|
||||||
|
import pytest
|
||||||
|
|
||||||
from flask import url_for, current_app
|
from flask import url_for, current_app
|
||||||
from freezegun import freeze_time
|
from freezegun import freeze_time
|
||||||
@@ -180,6 +181,29 @@ def test_put_user(notify_api, notify_db, notify_db_session, sample_service):
|
|||||||
assert sorted(expected_permissions) == sorted(fetched['permissions'][str(sample_service.id)])
|
assert sorted(expected_permissions) == sorted(fetched['permissions'][str(sample_service.id)])
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('user_attribute, user_value', [
|
||||||
|
('name', 'New User'),
|
||||||
|
('email_address', 'newuser@mail.com'),
|
||||||
|
('mobile_number', '+4407700900460')
|
||||||
|
])
|
||||||
|
def test_post_user_attribute(client, sample_user, user_attribute, user_value):
|
||||||
|
assert getattr(sample_user, user_attribute) != user_value
|
||||||
|
update_dict = {
|
||||||
|
user_attribute: user_value
|
||||||
|
}
|
||||||
|
auth_header = create_authorization_header()
|
||||||
|
headers = [('Content-Type', 'application/json'), auth_header]
|
||||||
|
|
||||||
|
resp = client.post(
|
||||||
|
url_for('user.update_user_attribute', user_id=sample_user.id),
|
||||||
|
data=json.dumps(update_dict),
|
||||||
|
headers=headers)
|
||||||
|
|
||||||
|
assert resp.status_code == 200
|
||||||
|
json_resp = json.loads(resp.get_data(as_text=True))
|
||||||
|
assert json_resp['data'][user_attribute] == user_value
|
||||||
|
|
||||||
|
|
||||||
def test_put_user_update_password(notify_api,
|
def test_put_user_update_password(notify_api,
|
||||||
notify_db,
|
notify_db,
|
||||||
notify_db_session,
|
notify_db_session,
|
||||||
|
|||||||
Reference in New Issue
Block a user