mirror of
https://github.com/GSA/notifications-api.git
synced 2026-02-03 09:51:11 -05:00
Merge pull request #85 from alphagov/add_permissions_to_models
Permissions added.
This commit is contained in:
@@ -3,7 +3,7 @@ import uuid
|
|||||||
from sqlalchemy import UniqueConstraint, Sequence
|
from sqlalchemy import UniqueConstraint, Sequence
|
||||||
from . import db
|
from . import db
|
||||||
import datetime
|
import datetime
|
||||||
from sqlalchemy.dialects.postgresql import UUID
|
from sqlalchemy.dialects.postgresql import (UUID, ARRAY)
|
||||||
from app.encryption import (
|
from app.encryption import (
|
||||||
hashpw,
|
hashpw,
|
||||||
check_hash
|
check_hash
|
||||||
@@ -40,6 +40,7 @@ class User(db.Model):
|
|||||||
logged_in_at = db.Column(db.DateTime, nullable=True)
|
logged_in_at = db.Column(db.DateTime, nullable=True)
|
||||||
failed_login_count = db.Column(db.Integer, nullable=False, default=0)
|
failed_login_count = db.Column(db.Integer, nullable=False, default=0)
|
||||||
state = db.Column(db.String, nullable=False, default='pending')
|
state = db.Column(db.String, nullable=False, default='pending')
|
||||||
|
permissions = db.Column("permissions", ARRAY(db.String))
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def password(self):
|
def password(self):
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
#!/bin/bash
|
||||||
export NOTIFY_API_ENVIRONMENT='config.Test'
|
export NOTIFY_API_ENVIRONMENT='config.Test'
|
||||||
export ADMIN_CLIENT_USER_NAME='dev-notify-admin'
|
export ADMIN_CLIENT_USER_NAME='dev-notify-admin'
|
||||||
export ADMIN_CLIENT_SECRET='dev-notify-secret-key'
|
export ADMIN_CLIENT_SECRET='dev-notify-secret-key'
|
||||||
|
|||||||
26
migrations/versions/0015_add_permissions.py
Normal file
26
migrations/versions/0015_add_permissions.py
Normal file
@@ -0,0 +1,26 @@
|
|||||||
|
"""empty message
|
||||||
|
|
||||||
|
Revision ID: 0015_add_permissions
|
||||||
|
Revises: 0014_job_id_nullable
|
||||||
|
Create Date: 2016-02-19 14:16:59.359493
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
# revision identifiers, used by Alembic.
|
||||||
|
revision = '0015_add_permissions'
|
||||||
|
down_revision = '0014_job_id_nullable'
|
||||||
|
|
||||||
|
from alembic import op
|
||||||
|
import sqlalchemy as sa
|
||||||
|
from sqlalchemy.dialects import postgresql
|
||||||
|
|
||||||
|
def upgrade():
|
||||||
|
### commands auto generated by Alembic - please adjust! ###
|
||||||
|
op.add_column('users', sa.Column('permissions', postgresql.ARRAY(sa.String()), nullable=True))
|
||||||
|
### end Alembic commands ###
|
||||||
|
|
||||||
|
|
||||||
|
def downgrade():
|
||||||
|
### commands auto generated by Alembic - please adjust! ###
|
||||||
|
op.drop_column('users', 'permissions')
|
||||||
|
### end Alembic commands ###
|
||||||
@@ -31,7 +31,8 @@ def sample_user(notify_db,
|
|||||||
'email_address': email,
|
'email_address': email,
|
||||||
'password': 'password',
|
'password': 'password',
|
||||||
'mobile_number': '+447700900986',
|
'mobile_number': '+447700900986',
|
||||||
'state': 'active'
|
'state': 'active',
|
||||||
|
'permissions': []
|
||||||
}
|
}
|
||||||
usr = User.query.filter_by(email_address=email).first()
|
usr = User.query.filter_by(email_address=email).first()
|
||||||
if not usr:
|
if not usr:
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
import json
|
import json
|
||||||
from flask import url_for
|
from flask import url_for
|
||||||
from app.models import (User, Service)
|
from app.models import (User, Service)
|
||||||
|
from app.dao.users_dao import save_model_user
|
||||||
from tests import create_authorization_header
|
from tests import create_authorization_header
|
||||||
from tests.app.conftest import sample_service as create_sample_service
|
from tests.app.conftest import sample_service as create_sample_service
|
||||||
|
|
||||||
@@ -27,7 +28,8 @@ def test_get_user_list(notify_api, notify_db, notify_db_session, sample_user, sa
|
|||||||
"password_changed_at": None,
|
"password_changed_at": None,
|
||||||
"logged_in_at": None,
|
"logged_in_at": None,
|
||||||
"state": "active",
|
"state": "active",
|
||||||
"failed_login_count": 0
|
"failed_login_count": 0,
|
||||||
|
"permissions": []
|
||||||
}
|
}
|
||||||
assert expected in json_resp['data']
|
assert expected in json_resp['data']
|
||||||
|
|
||||||
@@ -54,7 +56,8 @@ def test_get_user(notify_api, notify_db, notify_db_session, sample_user, sample_
|
|||||||
"password_changed_at": None,
|
"password_changed_at": None,
|
||||||
"logged_in_at": None,
|
"logged_in_at": None,
|
||||||
"state": "active",
|
"state": "active",
|
||||||
"failed_login_count": 0
|
"failed_login_count": 0,
|
||||||
|
"permissions": []
|
||||||
}
|
}
|
||||||
assert json_resp['data'] == expected
|
assert json_resp['data'] == expected
|
||||||
|
|
||||||
@@ -74,7 +77,8 @@ def test_post_user(notify_api, notify_db, notify_db_session, sample_admin_servic
|
|||||||
"password_changed_at": None,
|
"password_changed_at": None,
|
||||||
"logged_in_at": None,
|
"logged_in_at": None,
|
||||||
"state": "active",
|
"state": "active",
|
||||||
"failed_login_count": 0
|
"failed_login_count": 0,
|
||||||
|
"permissions": []
|
||||||
}
|
}
|
||||||
auth_header = create_authorization_header(service_id=sample_admin_service_id,
|
auth_header = create_authorization_header(service_id=sample_admin_service_id,
|
||||||
path=url_for('user.create_user'),
|
path=url_for('user.create_user'),
|
||||||
@@ -107,7 +111,8 @@ def test_post_user_missing_attribute_email(notify_api, notify_db, notify_db_sess
|
|||||||
"password_changed_at": None,
|
"password_changed_at": None,
|
||||||
"logged_in_at": None,
|
"logged_in_at": None,
|
||||||
"state": "active",
|
"state": "active",
|
||||||
"failed_login_count": 0
|
"failed_login_count": 0,
|
||||||
|
"permissions": []
|
||||||
}
|
}
|
||||||
auth_header = create_authorization_header(service_id=sample_admin_service_id,
|
auth_header = create_authorization_header(service_id=sample_admin_service_id,
|
||||||
path=url_for('user.create_user'),
|
path=url_for('user.create_user'),
|
||||||
@@ -138,7 +143,8 @@ def test_post_user_missing_attribute_password(notify_api, notify_db, notify_db_s
|
|||||||
"password_changed_at": None,
|
"password_changed_at": None,
|
||||||
"logged_in_at": None,
|
"logged_in_at": None,
|
||||||
"state": "active",
|
"state": "active",
|
||||||
"failed_login_count": 0
|
"failed_login_count": 0,
|
||||||
|
"permissions": []
|
||||||
}
|
}
|
||||||
auth_header = create_authorization_header(service_id=sample_admin_service_id,
|
auth_header = create_authorization_header(service_id=sample_admin_service_id,
|
||||||
path=url_for('user.create_user'),
|
path=url_for('user.create_user'),
|
||||||
@@ -166,7 +172,8 @@ def test_put_user(notify_api, notify_db, notify_db_session, sample_user, sample_
|
|||||||
data = {
|
data = {
|
||||||
'name': sample_user.name,
|
'name': sample_user.name,
|
||||||
'email_address': new_email,
|
'email_address': new_email,
|
||||||
'mobile_number': sample_user.mobile_number
|
'mobile_number': sample_user.mobile_number,
|
||||||
|
'permissions': []
|
||||||
}
|
}
|
||||||
auth_header = create_authorization_header(service_id=sample_admin_service_id,
|
auth_header = create_authorization_header(service_id=sample_admin_service_id,
|
||||||
path=url_for('user.update_user', user_id=sample_user.id),
|
path=url_for('user.update_user', user_id=sample_user.id),
|
||||||
@@ -189,7 +196,8 @@ def test_put_user(notify_api, notify_db, notify_db_session, sample_user, sample_
|
|||||||
"id": user.id,
|
"id": user.id,
|
||||||
"logged_in_at": None,
|
"logged_in_at": None,
|
||||||
"state": "active",
|
"state": "active",
|
||||||
"failed_login_count": 0
|
"failed_login_count": 0,
|
||||||
|
"permissions": []
|
||||||
}
|
}
|
||||||
assert json_resp['data'] == expected
|
assert json_resp['data'] == expected
|
||||||
assert json_resp['data']['email_address'] == new_email
|
assert json_resp['data']['email_address'] == new_email
|
||||||
@@ -211,7 +219,8 @@ def test_put_user_update_password(notify_api,
|
|||||||
'name': sample_user.name,
|
'name': sample_user.name,
|
||||||
'email_address': sample_user.email_address,
|
'email_address': sample_user.email_address,
|
||||||
'mobile_number': sample_user.mobile_number,
|
'mobile_number': sample_user.mobile_number,
|
||||||
'password': new_password
|
'password': new_password,
|
||||||
|
'permissions': []
|
||||||
}
|
}
|
||||||
auth_header = create_authorization_header(service_id=sample_admin_service_id,
|
auth_header = create_authorization_header(service_id=sample_admin_service_id,
|
||||||
path=url_for('user.update_user', user_id=sample_user.id),
|
path=url_for('user.update_user', user_id=sample_user.id),
|
||||||
@@ -382,7 +391,8 @@ def test_delete_user(notify_api, sample_user, sample_admin_service_id):
|
|||||||
"id": sample_user.id,
|
"id": sample_user.id,
|
||||||
"logged_in_at": None,
|
"logged_in_at": None,
|
||||||
"state": "active",
|
"state": "active",
|
||||||
"failed_login_count": 0
|
"failed_login_count": 0,
|
||||||
|
"permissions": []
|
||||||
|
|
||||||
}
|
}
|
||||||
assert json_resp['data'] == expected
|
assert json_resp['data'] == expected
|
||||||
@@ -405,3 +415,122 @@ def test_delete_user_not_exists(notify_api, notify_db, notify_db_session, sample
|
|||||||
assert User.query.count() == 2
|
assert User.query.count() == 2
|
||||||
json_resp = json.loads(resp.get_data(as_text=True))
|
json_resp = json.loads(resp.get_data(as_text=True))
|
||||||
assert json_resp == {'error': 'No result found'}
|
assert json_resp == {'error': 'No result found'}
|
||||||
|
|
||||||
|
|
||||||
|
def test_post_with_permissions(notify_api, notify_db, notify_db_session, sample_admin_service_id):
|
||||||
|
"""
|
||||||
|
Tests POST endpoint '/' to create a user with permissions.
|
||||||
|
"""
|
||||||
|
with notify_api.test_request_context():
|
||||||
|
with notify_api.test_client() as client:
|
||||||
|
assert User.query.count() == 1
|
||||||
|
permissions = ['new permission']
|
||||||
|
data = {
|
||||||
|
"name": "Test User",
|
||||||
|
"email_address": "user@digital.cabinet-office.gov.uk",
|
||||||
|
"password": "password",
|
||||||
|
"mobile_number": "+447700900986",
|
||||||
|
"password_changed_at": None,
|
||||||
|
"logged_in_at": None,
|
||||||
|
"state": "active",
|
||||||
|
"failed_login_count": 0,
|
||||||
|
"permissions": permissions
|
||||||
|
}
|
||||||
|
auth_header = create_authorization_header(service_id=sample_admin_service_id,
|
||||||
|
path=url_for('user.create_user'),
|
||||||
|
method='POST',
|
||||||
|
request_body=json.dumps(data))
|
||||||
|
headers = [('Content-Type', 'application/json'), auth_header]
|
||||||
|
resp = client.post(
|
||||||
|
url_for('user.create_user'),
|
||||||
|
data=json.dumps(data),
|
||||||
|
headers=headers)
|
||||||
|
assert resp.status_code == 201
|
||||||
|
user = User.query.filter_by(email_address='user@digital.cabinet-office.gov.uk').first()
|
||||||
|
json_resp = json.loads(resp.get_data(as_text=True))
|
||||||
|
json_resp['data'] == {"email_address": user.email_address, "id": user.id}
|
||||||
|
assert json_resp['data']['email_address'] == user.email_address
|
||||||
|
assert json_resp['data']['id'] == user.id
|
||||||
|
assert json_resp['data']['permissions'] == permissions
|
||||||
|
|
||||||
|
|
||||||
|
def test_put_add_permissions(notify_api, notify_db, notify_db_session, sample_user, sample_admin_service_id):
|
||||||
|
"""
|
||||||
|
Tests PUT endpoint '/' to update user permissions.
|
||||||
|
"""
|
||||||
|
with notify_api.test_request_context():
|
||||||
|
with notify_api.test_client() as client:
|
||||||
|
permissions = ['one permission', 'another permission']
|
||||||
|
data = {
|
||||||
|
'name': sample_user.name,
|
||||||
|
'email_address': sample_user.email_address,
|
||||||
|
'mobile_number': sample_user.mobile_number,
|
||||||
|
'permissions': permissions
|
||||||
|
}
|
||||||
|
auth_header = create_authorization_header(service_id=sample_admin_service_id,
|
||||||
|
path=url_for('user.update_user', user_id=sample_user.id),
|
||||||
|
method='PUT',
|
||||||
|
request_body=json.dumps(data))
|
||||||
|
headers = [('Content-Type', 'application/json'), auth_header]
|
||||||
|
resp = client.put(
|
||||||
|
url_for('user.update_user', user_id=sample_user.id),
|
||||||
|
data=json.dumps(data),
|
||||||
|
headers=headers)
|
||||||
|
assert resp.status_code == 200
|
||||||
|
assert User.query.count() == 2
|
||||||
|
user = User.query.filter_by(email_address=sample_user.email_address).first()
|
||||||
|
json_resp = json.loads(resp.get_data(as_text=True))
|
||||||
|
expected = {
|
||||||
|
"name": user.name,
|
||||||
|
"email_address": user.email_address,
|
||||||
|
"mobile_number": user.mobile_number,
|
||||||
|
"password_changed_at": None,
|
||||||
|
"id": user.id,
|
||||||
|
"logged_in_at": None,
|
||||||
|
"state": user.state,
|
||||||
|
"failed_login_count": 0,
|
||||||
|
"permissions": permissions
|
||||||
|
}
|
||||||
|
assert json_resp['data'] == expected
|
||||||
|
|
||||||
|
|
||||||
|
def test_put_remove_permissions(notify_api, notify_db, notify_db_session, sample_user, sample_admin_service_id):
|
||||||
|
"""
|
||||||
|
Tests PUT endpoint '/' to update user permissions.
|
||||||
|
"""
|
||||||
|
with notify_api.test_request_context():
|
||||||
|
with notify_api.test_client() as client:
|
||||||
|
old_permissions = ['one permission', 'another permission']
|
||||||
|
save_model_user(sample_user, {'permissions': old_permissions})
|
||||||
|
permissions = ['new permissions']
|
||||||
|
data = {
|
||||||
|
'name': sample_user.name,
|
||||||
|
'email_address': sample_user.email_address,
|
||||||
|
'mobile_number': sample_user.mobile_number,
|
||||||
|
'permissions': permissions
|
||||||
|
}
|
||||||
|
auth_header = create_authorization_header(service_id=sample_admin_service_id,
|
||||||
|
path=url_for('user.update_user', user_id=sample_user.id),
|
||||||
|
method='PUT',
|
||||||
|
request_body=json.dumps(data))
|
||||||
|
headers = [('Content-Type', 'application/json'), auth_header]
|
||||||
|
resp = client.put(
|
||||||
|
url_for('user.update_user', user_id=sample_user.id),
|
||||||
|
data=json.dumps(data),
|
||||||
|
headers=headers)
|
||||||
|
assert resp.status_code == 200
|
||||||
|
assert User.query.count() == 2
|
||||||
|
user = User.query.filter_by(email_address=sample_user.email_address).first()
|
||||||
|
json_resp = json.loads(resp.get_data(as_text=True))
|
||||||
|
expected = {
|
||||||
|
"name": user.name,
|
||||||
|
"email_address": user.email_address,
|
||||||
|
"mobile_number": user.mobile_number,
|
||||||
|
"password_changed_at": None,
|
||||||
|
"id": user.id,
|
||||||
|
"logged_in_at": None,
|
||||||
|
"state": user.state,
|
||||||
|
"failed_login_count": 0,
|
||||||
|
"permissions": permissions
|
||||||
|
}
|
||||||
|
assert json_resp['data'] == expected
|
||||||
|
|||||||
Reference in New Issue
Block a user