Fix for git commit review.

This commit is contained in:
Nicholas Staples
2016-02-29 11:50:43 +00:00
parent 9b73b0d9f8
commit 2cf2b51cd9
6 changed files with 39 additions and 42 deletions

View File

@@ -1,6 +1,4 @@
from sqlalchemy.exc import SQLAlchemyError
from werkzeug.datastructures import MultiDict
from sqlalchemy.orm.relationships import RelationshipProperty
from app import db
@@ -19,37 +17,14 @@ class DAOClass(object):
if _commit:
db.session.commit()
def update_instance(self, inst, update_dict):
def update_instance(self, inst, update_dict, _commit=True):
# Make sure the id is not included in the update_dict
update_dict.pop('id')
self.Meta.model.query.filter_by(id=inst.id).update(update_dict)
db.session.commit()
if _commit:
db.session.commit()
def get_query(self, filter_by_dict={}):
if isinstance(filter_by_dict, dict):
filter_by_dict = MultiDict(filter_by_dict)
query = self.Meta.model.query
for k in filter_by_dict.keys():
query = self._build_query(query, k, filter_by_dict.getlist(k))
return query
def delete_instance(self, inst):
def delete_instance(self, inst, _commit=True):
db.session.delete(inst)
db.session.commit()
def _build_query(self, query, key, values):
# TODO Lots to do here to work with all types of filters.
field = getattr(self.Meta.model, key, None)
filters = getattr(self.Meta, 'filter', [key])
if field and key in filters:
if isinstance(field.property, RelationshipProperty):
if len(values) == 1:
query = query.filter_by(**{key: field.property.mapper.class_.query.get(values[0])})
elif len(values) > 1:
query = query.filter(field.in_(field.property.mapper.class_.query.any(values[0])))
else:
if len(values) == 1:
query = query.filter_by(**{key: values[0]})
elif len(values) > 1:
query = query.filter(field.in_(values))
return query
if _commit:
db.session.commit()

View File

@@ -1,5 +1,6 @@
from app.dao import DAOClass
from app.models import Permission
from app.models import (Permission, Service, User)
from werkzeug.datastructures import MultiDict
# Service Permissions
@@ -15,6 +16,26 @@ class PermissionDAO(DAOClass):
class Meta:
model = Permission
def get_query(self, filter_by_dict={}):
if isinstance(filter_by_dict, dict):
filter_by_dict = MultiDict(filter_by_dict)
query = self.Meta.model.query
if 'id' in filter_by_dict:
query = query.filter(Permission.id.in_(filter_by_dict.getlist('id')))
if 'service' in filter_by_dict:
service_ids = filter_by_dict.getlist('service')
if len(service_ids) == 1:
query.filter_by(service=Service.query.get(service_ids[0]))
# TODO the join method for multiple services
if 'user' in filter_by_dict:
user_ids = filter_by_dict.getlist('service')
if len(user_ids) == 1:
query = query.filter_by(user=User.query.get(user_ids[0]))
# TODO the join method for multiple users
if 'permission' in filter_by_dict:
query = query.filter(Permission.permission.in_(filter_by_dict.getlist('permission')))
return query
def add_default_service_permissions_for_user(self, user, service):
for name in default_service_permissions:
permission = Permission(permission=name, user=user, service=service)

View File

@@ -24,12 +24,13 @@ def dao_create_service(service, user):
from app.dao.permissions_dao import permission_dao
service.users.append(user)
permission_dao.add_default_service_permissions_for_user(user, service)
db.session.add(service)
except Exception as e:
# Proper clean up
db.session.rollback()
raise e
db.session.add(service)
db.session.commit()
else:
db.session.commit()
def dao_update_service(service):

View File

@@ -50,7 +50,6 @@ def test_get_job_with_invalid_job_id_returns404(notify_api, sample_template):
response = client.get(path, headers=[auth_header])
assert response.status_code == 404
resp_json = json.loads(response.get_data(as_text=True))
print(resp_json)
assert resp_json['result'] == 'error'
assert resp_json['message'] == 'No result found'
@@ -218,7 +217,6 @@ def test_get_update_job(notify_api, sample_job):
def _setup_jobs(notify_db, notify_db_session, template, number_of_jobs=5):
for i in range(number_of_jobs):
print(i)
create_job(
notify_db,
notify_db_session,

View File

@@ -29,7 +29,8 @@ def test_get_user_list(notify_api, notify_db, notify_db_session, sample_user, sa
"logged_in_at": None,
"state": "active",
"failed_login_count": 0,
"permissions": {}
"permissions": {
str(sample_admin_service_id): ['manage_service', 'send_messages', 'manage_api_keys']}
}
print(json_resp['data'])
assert expected in json_resp['data']
@@ -58,7 +59,8 @@ def test_get_user(notify_api, notify_db, notify_db_session, sample_user, sample_
"logged_in_at": None,
"state": "active",
"failed_login_count": 0,
"permissions": {}
"permissions": {
str(sample_admin_service_id): ['manage_service', 'send_messages', 'manage_api_keys']}
}
assert json_resp['data'] == expected
@@ -197,7 +199,8 @@ def test_put_user(notify_api, notify_db, notify_db_session, sample_user, sample_
"logged_in_at": None,
"state": "active",
"failed_login_count": 0,
"permissions": {}
"permissions": {
str(sample_admin_service_id): ['manage_service', 'send_messages', 'manage_api_keys']}
}
assert json_resp['data'] == expected
assert json_resp['data']['email_address'] == new_email
@@ -295,7 +298,8 @@ def test_get_user_by_email(notify_api, notify_db, notify_db_session, sample_user
"logged_in_at": None,
"state": "active",
"failed_login_count": 0,
"permissions": {}
"permissions": {
str(sample_admin_service_id): ['manage_service', 'send_messages', 'manage_api_keys']}
}
assert json_resp['data'] == expected

View File

@@ -358,7 +358,6 @@ def test_send_user_sms_code(notify_api,
url_for('user.send_user_sms_code', user_id=sample_sms_code.user.id),
data=data,
headers=[('Content-Type', 'application/json'), auth_header])
print(resp.get_data(as_text=True))
assert resp.status_code == 204
app.celery.tasks.send_sms_code.apply_async.assert_called_once_with(['something_encrypted'],
queue='sms-code')
@@ -427,7 +426,6 @@ def test_send_user_email_code(notify_api,
url_for('user.send_user_email_code', user_id=sample_email_code.user.id),
data=data,
headers=[('Content-Type', 'application/json'), auth_header])
print(resp.get_data(as_text=True))
assert resp.status_code == 204
app.celery.tasks.send_email_code.apply_async.assert_called_once_with(['something_encrypted'],
queue='email-code')