use joinedload to only hit the database once per request

also:

* only include active orgs
* write lots of tests
This commit is contained in:
Leo Hemsted
2018-03-13 13:07:02 +00:00
parent 91fa475645
commit 5871dee606
4 changed files with 150 additions and 8 deletions

View File

@@ -1,6 +1,9 @@
from random import (SystemRandom)
from datetime import (datetime, timedelta)
from sqlalchemy import func
from sqlalchemy.orm import joinedload
from app import db
from app.models import (User, VerifyCode)
@@ -113,3 +116,16 @@ def update_user_password(user, password):
user.password_changed_at = datetime.utcnow()
db.session.add(user)
db.session.commit()
def get_user_and_accounts(user_id):
return User.query.filter(
User.id == user_id
).options(
# eagerly load the user's services and organisations, and also the service's org and vice versa
# (so we can see if the user knows about it)
joinedload('services'),
joinedload('organisations'),
joinedload('organisations.services'),
joinedload('services.organisation'),
).one()

View File

@@ -116,11 +116,11 @@ class User(db.Model):
services = db.relationship(
'Service',
secondary='user_to_service',
backref=db.backref('user_to_service', lazy='dynamic'))
backref='user_to_service')
organisations = db.relationship(
'Organisation',
secondary='user_to_organisation',
backref=db.backref('users', lazy='dynamic'))
backref='users')
@property
def password(self):

View File

@@ -19,7 +19,8 @@ from app.dao.users_dao import (
create_secret_code,
save_user_attribute,
update_user_password,
count_user_verify_codes
count_user_verify_codes,
get_user_and_accounts
)
from app.dao.permissions_dao import permission_dao
from app.dao.services_dao import dao_fetch_service_by_id
@@ -397,8 +398,7 @@ def update_password(user_id):
@user_blueprint.route('/<uuid:user_id>/organisations-and-services', methods=['GET'])
def get_organisations_and_services_for_user(user_id):
user = get_user_by_id(user_id=user_id)
user = get_user_and_accounts(user_id)
data = {
'organisations': [
{
@@ -410,10 +410,10 @@ def get_organisations_and_services_for_user(user_id):
'name': service.name
}
for service in org.services
if service.active and user in service.users
if service.active and service in user.services
]
}
for org in user.organisations
for org in user.organisations if org.active
],
'services_without_organisations': [
{
@@ -426,7 +426,7 @@ def get_organisations_and_services_for_user(user_id):
# but not one that the user can see.
(
not service.organisation or
user not in service.organisation.users
service.organisation not in user.organisations
)
)
]

View File

@@ -15,6 +15,7 @@ from app.models import (
)
from app.dao.permissions_dao import default_service_permissions
from tests import create_authorization_header
from tests.app.db import create_service, create_organisation, create_user
def test_get_user_list(admin_request, sample_service):
@@ -605,3 +606,128 @@ def test_cannot_update_user_password_using_attributes_method(admin_request, samp
_expected_status=400
)
assert resp['message']['_schema'] == ['Unknown field name password']
def test_get_orgs_and_services_nests_services(admin_request, sample_user):
org1 = create_organisation(name='org1')
org2 = create_organisation(name='org2')
service1 = create_service(service_name='service1')
service2 = create_service(service_name='service2')
service3 = create_service(service_name='service3')
org1.services = [service1, service2]
org2.services = []
sample_user.organisations = [org1, org2]
sample_user.services = [service1, service2, service3]
resp = admin_request.get('user.get_organisations_and_services_for_user', user_id=sample_user.id)
assert resp == {
'organisations': [
{
'name': org1.name,
'id': str(org1.id),
'services': [
{
'name': service1.name,
'id': str(service1.id)
},
{
'name': service2.name,
'id': str(service2.id)
}
]
},
{
'name': org2.name,
'id': str(org2.id),
'services': []
}
],
'services_without_organisations': [
{
'name': service3.name,
'id': str(service3.id)
}
]
}
def test_get_orgs_and_services_only_returns_active(admin_request, sample_user):
org1 = create_organisation(name='org1', active=True)
org2 = create_organisation(name='org2', active=False)
# in an active org
service1 = create_service(service_name='service1', active=True)
service2 = create_service(service_name='service2', active=False)
# active but in an inactive org
service3 = create_service(service_name='service3', active=True)
# not in an org
service4 = create_service(service_name='service4', active=True)
service5 = create_service(service_name='service5', active=False)
org1.services = [service1, service2]
org2.services = [service3]
sample_user.organisations = [org1, org2]
sample_user.services = [service1, service2, service3, service4, service5]
resp = admin_request.get('user.get_organisations_and_services_for_user', user_id=sample_user.id)
assert resp == {
'organisations': [
{
'name': org1.name,
'id': str(org1.id),
'services': [
{
'name': service1.name,
'id': str(service1.id)
}
]
}
],
'services_without_organisations': [
{
'name': service4.name,
'id': str(service4.id)
}
]
}
def test_get_orgs_and_services_only_shows_users_orgs_and_services(admin_request, sample_user):
other_user = create_user(email='other@user.com')
org1 = create_organisation(name='org1')
org2 = create_organisation(name='org2')
service1 = create_service(service_name='service1')
service2 = create_service(service_name='service2')
org1.services = [service1]
sample_user.organisations = [org2]
sample_user.services = [service1]
other_user.organisations = [org1, org2]
other_user.services = [service1, service2]
resp = admin_request.get('user.get_organisations_and_services_for_user', user_id=sample_user.id)
assert resp == {
'organisations': [
{
'name': org2.name,
'id': str(org2.id),
'services': []
}
],
# service1 belongs to org1, but the user doesn't know about org1
'services_without_organisations': [
{
'name': service1.name,
'id': str(service1.id)
}
]
}