mirror of
https://github.com/GSA/notifications-api.git
synced 2026-02-05 02:41:14 -05:00
Moved the _generate_token methods to the tokens_dao.
This commit is contained in:
@@ -1,17 +1,25 @@
|
||||
from flask import current_app
|
||||
from itsdangerous import URLSafeSerializer
|
||||
|
||||
from app import db
|
||||
from app.models import Token
|
||||
|
||||
|
||||
def save_token_model(token, update_dict={}):
|
||||
def save_model_token(token, update_dict={}):
|
||||
if update_dict:
|
||||
del update_dict['id']
|
||||
db.session.query(Token).filter_by(id=token.id).update(update_dict)
|
||||
else:
|
||||
token.token = _generate_token()
|
||||
db.session.add(token)
|
||||
db.session.commit()
|
||||
|
||||
|
||||
def get_model_tokens(service_id=None, raise_=True):
|
||||
"""
|
||||
:param raise_: when True query tokens using one() which will raise NoResultFound exception
|
||||
when False query tokens usong first() which will return None and not raise an exception.
|
||||
"""
|
||||
if service_id:
|
||||
# If expiry date is None the token is active
|
||||
if raise_:
|
||||
@@ -19,3 +27,25 @@ def get_model_tokens(service_id=None, raise_=True):
|
||||
else:
|
||||
return Token.query.filter_by(service_id=service_id, expiry_date=None).first()
|
||||
return Token.query.filter_by().all()
|
||||
|
||||
|
||||
def get_unsigned_token(service_id):
|
||||
"""
|
||||
There should only be one valid token for each service.
|
||||
This method can only be exposed to the Authentication of the api calls.
|
||||
"""
|
||||
token = Token.query.filter_by(service_id=service_id, expiry_date=None).one()
|
||||
return _get_token(token.token)
|
||||
|
||||
|
||||
def _generate_token(token=None):
|
||||
import uuid
|
||||
if not token:
|
||||
token = uuid.uuid4()
|
||||
serializer = URLSafeSerializer(current_app.config.get('SECRET_KEY'))
|
||||
return serializer.dumps(str(token), current_app.config.get('DANGEROUS_SALT'))
|
||||
|
||||
|
||||
def _get_token(token):
|
||||
serializer = URLSafeSerializer(current_app.config.get('SECRET_KEY'))
|
||||
return serializer.loads(token, salt=current_app.config.get('DANGEROUS_SALT'))
|
||||
|
||||
@@ -31,6 +31,8 @@ class TemplateSchema(ma.ModelSchema):
|
||||
class TokenSchema(ma.ModelSchema):
|
||||
class Meta:
|
||||
model = models.Token
|
||||
exclude = ["service"]
|
||||
|
||||
|
||||
user_schema = UserSchema()
|
||||
users_schema = UserSchema(many=True)
|
||||
|
||||
@@ -1,22 +1,20 @@
|
||||
import uuid
|
||||
from datetime import datetime
|
||||
from flask import (jsonify, request, current_app)
|
||||
|
||||
from flask import (jsonify, request)
|
||||
from sqlalchemy.exc import DataError
|
||||
from sqlalchemy.orm.exc import NoResultFound
|
||||
|
||||
from app import db
|
||||
from app.dao import DAOException
|
||||
from app.dao.services_dao import (
|
||||
save_model_service, get_model_services, delete_model_service)
|
||||
from app.dao.tokens_dao import (save_token_model, get_model_tokens)
|
||||
from app.dao.users_dao import get_model_users
|
||||
from app.dao.templates_dao import (
|
||||
save_model_template, get_model_templates)
|
||||
from app.dao import DAOException
|
||||
from .. import service
|
||||
from app import db
|
||||
from app.schemas import (services_schema, service_schema, token_schema)
|
||||
save_model_template, get_model_templates, delete_model_template)
|
||||
from app.dao.tokens_dao import (save_model_token, get_model_tokens, get_unsigned_token)
|
||||
from app.models import Token
|
||||
from itsdangerous import URLSafeSerializer
|
||||
from app.schemas import (
|
||||
services_schema, service_schema, template_schema, templates_schema)
|
||||
services_schema, service_schema, template_schema)
|
||||
from .. import service
|
||||
|
||||
|
||||
# TODO auth to be added.
|
||||
@@ -30,11 +28,10 @@ def create_service():
|
||||
# db.session.commit
|
||||
try:
|
||||
save_model_service(service)
|
||||
token = str(uuid.uuid4())
|
||||
save_token_model(Token(service_id=service.id, token=_generate_token(token)))
|
||||
save_model_token(Token(service_id=service.id))
|
||||
except DAOException as e:
|
||||
return jsonify(result="error", message=str(e)), 400
|
||||
return jsonify(data=service_schema.dump(service).data, token=token), 201
|
||||
return jsonify(data=service_schema.dump(service).data, token=get_unsigned_token(service.id)), 201
|
||||
|
||||
|
||||
# TODO auth to be added
|
||||
@@ -84,30 +81,29 @@ def get_service(service_id=None):
|
||||
@service.route('/<int:service_id>/token/renew', methods=['POST'])
|
||||
def renew_token(service_id=None):
|
||||
try:
|
||||
service = get_model_services(service_id=service_id)
|
||||
get_model_services(service_id=service_id)
|
||||
except DataError:
|
||||
return jsonify(result="error", message="Invalid service id"), 400
|
||||
except NoResultFound:
|
||||
return jsonify(result="error", message="Service not found"), 404
|
||||
|
||||
token = _generate_token()
|
||||
try:
|
||||
service_token = get_model_tokens(service_id=service_id, raise_=False)
|
||||
if service_token:
|
||||
# expire existing token
|
||||
save_token_model(service_token, update_dict={'id': service_token.id, 'expiry_date': datetime.now()})
|
||||
save_model_token(service_token, update_dict={'id': service_token.id, 'expiry_date': datetime.now()})
|
||||
# create a new one
|
||||
save_token_model(Token(service_id=service_id, token=token))
|
||||
save_model_token(Token(service_id=service_id))
|
||||
except DAOException as e:
|
||||
return jsonify(result='error', message=str(e)), 400
|
||||
unsigned_token = str(_get_token(token))
|
||||
unsigned_token = get_unsigned_token(service_id)
|
||||
return jsonify(data=unsigned_token), 201
|
||||
|
||||
|
||||
@service.route('/<int:service_id>/token/revoke', methods=['POST'])
|
||||
def revoke_token(service_id):
|
||||
try:
|
||||
service = get_model_services(service_id=service_id)
|
||||
get_model_services(service_id=service_id)
|
||||
except DataError:
|
||||
return jsonify(result="error", message="Invalid service id"), 400
|
||||
except NoResultFound:
|
||||
@@ -115,20 +111,8 @@ def revoke_token(service_id):
|
||||
|
||||
service_token = get_model_tokens(service_id=service_id, raise_=False)
|
||||
if service_token:
|
||||
save_token_model(service_token, update_dict={'id': service_token.id, 'expiry_date': datetime.now()})
|
||||
return jsonify(data=token_schema.dump(service_token)), 202
|
||||
|
||||
|
||||
def _generate_token(token=None):
|
||||
if not token:
|
||||
token = uuid.uuid4()
|
||||
serializer = URLSafeSerializer(current_app.config.get('SECRET_KEY'))
|
||||
return serializer.dumps(str(token), current_app.config.get('DANGEROUS_SALT'))
|
||||
|
||||
|
||||
def _get_token(token):
|
||||
serializer = URLSafeSerializer(current_app.config.get('SECRET_KEY'))
|
||||
return serializer.loads(token, salt=current_app.config.get('DANGEROUS_SALT'))
|
||||
save_model_token(service_token, update_dict={'id': service_token.id, 'expiry_date': datetime.now()})
|
||||
return jsonify(), 202
|
||||
|
||||
|
||||
# TODO auth to be added.
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
import pytest
|
||||
from app.models import (User, Service, Template)
|
||||
from app.dao.users_dao import (save_model_user, get_model_users)
|
||||
from app.models import (User, Service, Template, Token)
|
||||
from app.dao.users_dao import (save_model_user)
|
||||
from app.dao.services_dao import save_model_service
|
||||
from app.dao.templates_dao import save_model_template
|
||||
from app.dao.tokens_dao import save_model_token
|
||||
|
||||
|
||||
@pytest.fixture(scope='function')
|
||||
@@ -50,3 +51,16 @@ def sample_template(notify_db,
|
||||
template = Template(**data)
|
||||
save_model_template(template)
|
||||
return template
|
||||
|
||||
|
||||
@pytest.fixture(scope='function')
|
||||
def sample_token(notify_db,
|
||||
notify_db_session,
|
||||
service=None):
|
||||
import uuid
|
||||
if service is None:
|
||||
service = sample_service(notify_db, notify_db_session)
|
||||
data = {'service_id': service.id}
|
||||
token = Token(**data)
|
||||
save_model_token(token)
|
||||
return token
|
||||
|
||||
@@ -1,52 +1,62 @@
|
||||
import uuid
|
||||
from app.dao import tokens_dao
|
||||
from app.dao.tokens_dao import (save_model_token, get_model_tokens, get_unsigned_token, _generate_token, _get_token)
|
||||
from datetime import datetime
|
||||
|
||||
from app.models import Token
|
||||
from pytest import fail
|
||||
from sqlalchemy.orm.exc import NoResultFound
|
||||
|
||||
|
||||
def test_token_is_signed_and_can_be_read_again(notify_api):
|
||||
import uuid
|
||||
with notify_api.test_request_context():
|
||||
token = str(uuid.uuid4())
|
||||
signed_token = _generate_token(token=token)
|
||||
assert token == _get_token(signed_token)
|
||||
assert signed_token != token
|
||||
|
||||
|
||||
def test_save_token_should_create_new_token(notify_api, notify_db, notify_db_session, sample_service):
|
||||
token = uuid.uuid4()
|
||||
api_token = Token(**{'token': token, 'service_id': sample_service.id})
|
||||
api_token = Token(**{'service_id': sample_service.id})
|
||||
save_model_token(api_token)
|
||||
|
||||
tokens_dao.save_token_model(api_token)
|
||||
|
||||
all_tokens = tokens_dao.get_model_tokens()
|
||||
all_tokens = get_model_tokens()
|
||||
assert len(all_tokens) == 1
|
||||
assert all_tokens[0].token == str(token)
|
||||
assert all_tokens[0] == api_token
|
||||
|
||||
|
||||
def test_save_token_should_update_the_token(notify_api, notify_db, notify_db_session, sample_service):
|
||||
api_token = Token(**{'token': uuid.uuid4(), 'service_id': sample_service.id})
|
||||
tokens_dao.save_token_model(api_token)
|
||||
def test_save_token_should_update_the_token(notify_api, notify_db, notify_db_session, sample_token):
|
||||
now = datetime.utcnow()
|
||||
saved_token = tokens_dao.get_model_tokens(sample_service.id)
|
||||
tokens_dao.save_token_model(saved_token, update_dict={'id': saved_token.id, 'expiry_date': now})
|
||||
all_tokens = tokens_dao.get_model_tokens()
|
||||
saved_token = get_model_tokens(sample_token.service_id)
|
||||
save_model_token(saved_token, update_dict={'id': saved_token.id, 'expiry_date': now})
|
||||
all_tokens = get_model_tokens()
|
||||
assert len(all_tokens) == 1
|
||||
assert all_tokens[0].expiry_date == now
|
||||
assert all_tokens[0].token == saved_token.token
|
||||
assert all_tokens[0].id == saved_token.id
|
||||
assert all_tokens[0].service_id == saved_token.service_id
|
||||
|
||||
|
||||
def test_get_token_should_raise_exception_when_service_does_not_exist(notify_api, notify_db, notify_db_session,
|
||||
sample_service):
|
||||
try:
|
||||
tokens_dao.get_model_tokens(sample_service.id)
|
||||
fail()
|
||||
get_model_tokens(sample_service.id)
|
||||
fail("Should have thrown a NoResultFound exception")
|
||||
except NoResultFound:
|
||||
pass
|
||||
|
||||
|
||||
def test_get_token_should_return_none_when_service_does_not_exist(notify_api, notify_db, notify_db_session,
|
||||
sample_service):
|
||||
assert tokens_dao.get_model_tokens(service_id=sample_service.id, raise_=False) is None
|
||||
assert get_model_tokens(service_id=sample_service.id, raise_=False) is None
|
||||
|
||||
|
||||
def test_should_return_token_for_service(notify_api, notify_db, notify_db_session, sample_service):
|
||||
the_token = str(uuid.uuid4())
|
||||
api_token = Token(**{'token': the_token, 'service_id': sample_service.id})
|
||||
tokens_dao.save_token_model(api_token)
|
||||
token = tokens_dao.get_model_tokens(sample_service.id)
|
||||
assert token.service_id == sample_service.id
|
||||
assert token.token == str(the_token)
|
||||
def test_should_return_token_for_service(notify_api, notify_db, notify_db_session, sample_token):
|
||||
token = get_model_tokens(sample_token.service_id)
|
||||
assert token == sample_token
|
||||
|
||||
|
||||
def test_should_return_unsigned_token_for_service_id(notify_api, notify_db, notify_db_session,
|
||||
sample_token):
|
||||
unsigned_token = get_unsigned_token(sample_token.service_id)
|
||||
assert sample_token.token != unsigned_token
|
||||
assert unsigned_token == _get_token(sample_token.token)
|
||||
|
||||
@@ -1,9 +1,11 @@
|
||||
import json
|
||||
from app.models import (Service, User, Token, Template)
|
||||
from app.dao.services_dao import save_model_service
|
||||
from tests.app.conftest import sample_user as create_sample_user
|
||||
|
||||
from flask import url_for
|
||||
|
||||
from app.dao.services_dao import save_model_service
|
||||
from app.models import (Service, User, Token, Template)
|
||||
from tests.app.conftest import sample_user as create_sample_user
|
||||
|
||||
|
||||
def test_get_service_list(notify_api, notify_db, notify_db_session, sample_service):
|
||||
"""
|
||||
@@ -308,13 +310,11 @@ def test_create_token_should_return_error_when_service_does_not_exist(notify_api
|
||||
assert response.status_code == 404
|
||||
|
||||
|
||||
def test_revoke_token_should_expire_token_for_service(notify_api, notify_db, notify_db_session, sample_service):
|
||||
def test_revoke_token_should_expire_token_for_service(notify_api, notify_db, notify_db_session, sample_token):
|
||||
with notify_api.test_request_context():
|
||||
with notify_api.test_client() as client:
|
||||
client.post(url_for('service.renew_token', service_id=sample_service.id),
|
||||
headers=[('Content-Type', 'application/json')])
|
||||
assert len(Token.query.all()) == 1
|
||||
response = client.post(url_for('service.revoke_token', service_id=sample_service.id))
|
||||
response = client.post(url_for('service.revoke_token', service_id=sample_token.service_id))
|
||||
assert response.status_code == 202
|
||||
all_tokens = Token.query.all()
|
||||
assert len(all_tokens) == 1
|
||||
@@ -339,15 +339,6 @@ def test_create_service_should_create_new_token_for_service(notify_api, notify_d
|
||||
assert len(Token.query.all()) == 1
|
||||
|
||||
|
||||
def test_token_generated_can_be_read_again(notify_api):
|
||||
from app.service.views.rest import (_generate_token, _get_token)
|
||||
import uuid
|
||||
with notify_api.test_request_context():
|
||||
token = str(uuid.uuid4())
|
||||
signed_token = _generate_token(token=token)
|
||||
assert token == _get_token(signed_token)
|
||||
|
||||
|
||||
def test_create_template(notify_api, notify_db, notify_db_session, sample_service):
|
||||
"""
|
||||
Tests POST endpoint '/<service_id>/template' a template can be created
|
||||
|
||||
Reference in New Issue
Block a user