Add current_session_id to the user model, update on login

when we change the last logged in time, set the current session id to
a random uuid

this way, we can compare it to the cookie a user has, and if they
differ then we can log them out

also update user.logged_in_at at 2FA rather than password check, since
that feels more accurate
This commit is contained in:
Leo Hemsted
2017-02-17 14:06:16 +00:00
parent 978d99f708
commit a47672f7e3
4 changed files with 44 additions and 12 deletions

View File

@@ -73,6 +73,7 @@ class User(db.Model):
failed_login_count = db.Column(db.Integer, nullable=False, default=0)
state = db.Column(db.String, nullable=False, default='pending')
platform_admin = db.Column(db.Boolean, nullable=False, default=False)
current_session_id = db.Column(UUID(as_uuid=True), nullable=True)
@property
def password(self):

View File

@@ -1,6 +1,9 @@
import json
import uuid
from datetime import datetime
from flask import (jsonify, request, Blueprint, current_app)
from app.dao.users_dao import (
get_user_by_id,
save_model_user,
@@ -32,7 +35,6 @@ from app.schemas import (
user_update_schema_load_json,
user_update_password_schema_load_json
)
from app.errors import (
register_errors,
InvalidRequest
@@ -94,8 +96,6 @@ def verify_user_password(user_id):
raise InvalidRequest(errors, status_code=400)
if user_to_verify.check_password(txt_pwd):
user_to_verify.logged_in_at = datetime.utcnow()
save_model_user(user_to_verify)
reset_failed_login_count(user_to_verify)
return jsonify({}), 204
else:
@@ -109,16 +109,16 @@ def verify_user_password(user_id):
def verify_user_code(user_id):
user_to_verify = get_user_by_id(user_id=user_id)
req_json = request.get_json()
txt_code = None
resp_json = request.get_json()
txt_type = None
errors = {}
try:
txt_code = resp_json['code']
txt_code = req_json['code']
except KeyError:
errors.update({'code': ['Required field missing data']})
try:
txt_type = resp_json['code_type']
txt_type = req_json['code_type']
except KeyError:
errors.update({'code_type': ['Required field missing data']})
if errors:
@@ -131,6 +131,11 @@ def verify_user_code(user_id):
if datetime.utcnow() > code.expiry_datetime or code.code_used:
increment_failed_login_count(user_to_verify)
raise InvalidRequest("Code has expired", status_code=400)
user_to_verify.current_session_id = str(uuid.uuid4())
user_to_verify.logged_in_at = datetime.utcnow()
save_model_user(user_to_verify)
use_user_code(code.id)
reset_failed_login_count(user_to_verify)
return jsonify({}), 204

View File

@@ -0,0 +1,22 @@
"""empty message
Revision ID: 0065_users_current_session_id
Revises: 0064_update_template_process
Create Date: 2017-02-17 11:48:40.669235
"""
# revision identifiers, used by Alembic.
revision = '0065_users_current_session_id'
down_revision = '0064_update_template_process'
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
def upgrade():
op.add_column('users', sa.Column('current_session_id', postgresql.UUID(as_uuid=True), nullable=True))
def downgrade():
op.drop_column('users', 'current_session_id')

View File

@@ -21,9 +21,11 @@ import app.celery.tasks
from tests import create_authorization_header
def test_user_verify_code(client,
sample_sms_code):
@freeze_time('2016-01-01T12:00:00')
def test_user_verify_code(client, sample_sms_code):
sample_sms_code.user.logged_in_at = datetime.utcnow() - timedelta(days=1)
assert not VerifyCode.query.first().code_used
assert sample_sms_code.user.current_session_id is None
data = json.dumps({
'code_type': sample_sms_code.code_type,
'code': sample_sms_code.txt_code})
@@ -34,6 +36,8 @@ def test_user_verify_code(client,
headers=[('Content-Type', 'application/json'), auth_header])
assert resp.status_code == 204
assert VerifyCode.query.first().code_used
assert sample_sms_code.user.logged_in_at == datetime.utcnow()
assert sample_sms_code.user.current_session_id is not None
def test_user_verify_code_missing_code(client,
@@ -88,9 +92,9 @@ def test_user_verify_code_expired_code_and_increments_failed_login_count(
@freeze_time("2016-01-01 10:00:00.000000")
def test_user_verify_password(client,
notify_db_session,
sample_user):
def test_user_verify_password(client, sample_user):
yesterday = datetime.utcnow() - timedelta(days=1)
sample_user.logged_in_at = yesterday
data = json.dumps({'password': 'password'})
auth_header = create_authorization_header()
resp = client.post(
@@ -98,7 +102,7 @@ def test_user_verify_password(client,
data=data,
headers=[('Content-Type', 'application/json'), auth_header])
assert resp.status_code == 204
assert User.query.get(sample_user.id).logged_in_at == datetime.utcnow()
assert User.query.get(sample_user.id).logged_in_at == yesterday
def test_user_verify_password_invalid_password(client,