mirror of
https://github.com/GSA/notifications-admin.git
synced 2026-02-05 02:42:26 -05:00
110067722: Create the model and dao for services.
This commit creates the data model for services and user_to_service. The dao is also created to insert, get, activate, and unrestrict the service.
This commit is contained in:
38
app/main/dao/services_dao.py
Normal file
38
app/main/dao/services_dao.py
Normal file
@@ -0,0 +1,38 @@
|
||||
from datetime import datetime
|
||||
|
||||
from app import db
|
||||
from app.models import Service
|
||||
|
||||
|
||||
def insert_new_service(service_name, user):
|
||||
service = Service(name=service_name,
|
||||
created_at=datetime.now(),
|
||||
limit=1000,
|
||||
active=False,
|
||||
restricted=True)
|
||||
|
||||
add_service(service)
|
||||
service.users.append(user)
|
||||
db.session.commit()
|
||||
return service.id
|
||||
|
||||
|
||||
def get_service_by_id(id):
|
||||
return Service.query.get(id)
|
||||
|
||||
|
||||
def unrestrict_service(service_id):
|
||||
service = get_service_by_id(service_id)
|
||||
service.restricted = False
|
||||
add_service(service)
|
||||
|
||||
|
||||
def activate_service(service_id):
|
||||
service = get_service_by_id(service_id)
|
||||
service.active = True
|
||||
add_service(service)
|
||||
|
||||
|
||||
def add_service(service):
|
||||
db.session.add(service)
|
||||
db.session.commit()
|
||||
@@ -78,6 +78,41 @@ class User(db.Model):
|
||||
return True
|
||||
|
||||
|
||||
user_to_service = db.Table(
|
||||
'user_to_service',
|
||||
db.Model.metadata,
|
||||
db.Column('user_id', db.Integer, db.ForeignKey('users.id')),
|
||||
db.Column('service_id', db.Integer, db.ForeignKey('services.id'))
|
||||
)
|
||||
|
||||
|
||||
class Service(db.Model):
|
||||
__tablename__ = 'services'
|
||||
|
||||
id = db.Column(db.Integer, primary_key=True)
|
||||
name = db.Column(db.String(255), nullable=False)
|
||||
|
||||
token_id = db.Column(db.BigInteger, index=True, unique=True)
|
||||
created_at = db.Column(db.DateTime, index=False, unique=False, nullable=False)
|
||||
active = db.Column(db.Boolean, index=False, unique=False, nullable=False)
|
||||
limit = db.Column(db.BigInteger, index=False, unique=False, nullable=False)
|
||||
users = db.relationship('User', secondary=user_to_service, backref=db.backref('user_to_service', lazy='dynamic'))
|
||||
restricted = db.Column(db.Boolean, index=False, unique=False, nullable=False)
|
||||
|
||||
def serialize(self):
|
||||
serialized = {
|
||||
'id': self.id,
|
||||
'name': self.name,
|
||||
'createdAt': self.created_at.strftime(DATETIME_FORMAT),
|
||||
'active': self.active,
|
||||
'restricted': self.restricted,
|
||||
'limit': self.limit,
|
||||
'user': self.users.serialize()
|
||||
}
|
||||
|
||||
return filter_null_value_fields(serialized)
|
||||
|
||||
|
||||
def filter_null_value_fields(obj):
|
||||
return dict(
|
||||
filter(lambda x: x[1] is not None, obj.items())
|
||||
|
||||
44
migrations/versions/60_add_service.py
Normal file
44
migrations/versions/60_add_service.py
Normal file
@@ -0,0 +1,44 @@
|
||||
"""empty message
|
||||
|
||||
Revision ID: 60_add_service
|
||||
Revises: 50_alter_verify_code_type
|
||||
Create Date: 2015-12-14 15:22:55.938819
|
||||
|
||||
"""
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '60_add_service'
|
||||
down_revision = '50_alter_verify_code_type'
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
def upgrade():
|
||||
### commands auto generated by Alembic - please adjust! ###
|
||||
op.create_table('services',
|
||||
sa.Column('id', sa.Integer(), nullable=False),
|
||||
sa.Column('name', sa.String(length=255), nullable=False),
|
||||
sa.Column('token_id', sa.BigInteger(), nullable=True),
|
||||
sa.Column('created_at', sa.DateTime(), nullable=False),
|
||||
sa.Column('active', sa.Boolean(), nullable=False),
|
||||
sa.Column('limit', sa.BigInteger(), nullable=False),
|
||||
sa.Column('restricted', sa.Boolean(), nullable=False),
|
||||
sa.PrimaryKeyConstraint('id')
|
||||
)
|
||||
op.create_index(op.f('ix_services_token_id'), 'services', ['token_id'], unique=True)
|
||||
op.create_table('user_to_service',
|
||||
sa.Column('user_id', sa.Integer(), nullable=True),
|
||||
sa.Column('service_id', sa.Integer(), nullable=True),
|
||||
sa.ForeignKeyConstraint(['service_id'], ['services.id'], ),
|
||||
sa.ForeignKeyConstraint(['user_id'], ['users.id'], )
|
||||
)
|
||||
### end Alembic commands ###
|
||||
|
||||
|
||||
def downgrade():
|
||||
### commands auto generated by Alembic - please adjust! ###
|
||||
op.drop_table('user_to_service')
|
||||
op.drop_index(op.f('ix_services_token_id'), table_name='services')
|
||||
op.drop_table('services')
|
||||
### end Alembic commands ###
|
||||
39
tests/app/main/dao/test_service_dao.py
Normal file
39
tests/app/main/dao/test_service_dao.py
Normal file
@@ -0,0 +1,39 @@
|
||||
from app.main.dao import services_dao
|
||||
from tests.app.main import create_test_user
|
||||
|
||||
|
||||
def test_can_insert_and_retrieve_new_service(notifications_admin, notifications_admin_db, notify_db_session):
|
||||
user = create_test_user()
|
||||
|
||||
id = services_dao.insert_new_service('testing service', user)
|
||||
saved_service = services_dao.get_service_by_id(id)
|
||||
assert id == saved_service.id
|
||||
assert saved_service.users == [user]
|
||||
assert saved_service.name == 'testing service'
|
||||
|
||||
|
||||
def test_unrestrict_service_updates_the_service(notifications_admin, notifications_admin_db, notify_db_session):
|
||||
user = create_test_user()
|
||||
id = services_dao.insert_new_service('unrestricted service', user)
|
||||
saved_service = services_dao.get_service_by_id(id)
|
||||
assert saved_service.restricted is True
|
||||
services_dao.unrestrict_service(id)
|
||||
unrestricted_service = services_dao.get_service_by_id(id)
|
||||
assert unrestricted_service.restricted is False
|
||||
|
||||
|
||||
def test_activate_service_update_service(notifications_admin, notifications_admin_db, notify_db_session):
|
||||
user = create_test_user()
|
||||
id = services_dao.insert_new_service('activated service', user)
|
||||
service = services_dao.get_service_by_id(id)
|
||||
assert service.active is False
|
||||
services_dao.activate_service(id)
|
||||
activated_service = services_dao.get_service_by_id(id)
|
||||
assert activated_service.active is True
|
||||
|
||||
|
||||
def test_get_service_returns_none_if_service_does_not_exist(notifications_admin,
|
||||
notifications_admin_db,
|
||||
notify_db_session):
|
||||
service = services_dao.get_service_by_id(1)
|
||||
assert service is None
|
||||
Reference in New Issue
Block a user