mirror of
https://github.com/GSA/notifications-api.git
synced 2025-12-16 02:02:13 -05:00
by hitting POST /<user_id>/email-code, we create an email two factor code to send to the user. That email contains a link with a token that will sign the user in when opened. Also some other things: "email verification" (aka when you first create an account) doesn't hit the API anymore refactor 2fa code verification and sending to use jsonschema, and share code between sms and email Die marshmallow die!
116 lines
3.1 KiB
Python
116 lines
3.1 KiB
Python
from random import (SystemRandom)
|
|
from datetime import (datetime, timedelta)
|
|
from sqlalchemy import func
|
|
from app import db
|
|
from app.models import (User, VerifyCode)
|
|
|
|
|
|
def _remove_values_for_keys_if_present(dict, keys):
|
|
for key in keys:
|
|
dict.pop(key, None)
|
|
|
|
|
|
def create_secret_code():
|
|
return ''.join(map(str, [SystemRandom().randrange(10) for i in range(5)]))
|
|
|
|
|
|
def save_user_attribute(usr, update_dict={}):
|
|
db.session.query(User).filter_by(id=usr.id).update(update_dict)
|
|
db.session.commit()
|
|
|
|
|
|
def save_model_user(usr, update_dict={}, pwd=None):
|
|
if pwd:
|
|
usr.password = pwd
|
|
usr.password_changed_at = datetime.utcnow()
|
|
if update_dict:
|
|
_remove_values_for_keys_if_present(update_dict, ['id', 'password_changed_at'])
|
|
db.session.query(User).filter_by(id=usr.id).update(update_dict)
|
|
else:
|
|
db.session.add(usr)
|
|
db.session.commit()
|
|
|
|
|
|
def create_user_code(user, code, code_type):
|
|
verify_code = VerifyCode(code_type=code_type,
|
|
expiry_datetime=datetime.utcnow() + timedelta(minutes=30),
|
|
user=user)
|
|
verify_code.code = code
|
|
db.session.add(verify_code)
|
|
db.session.commit()
|
|
return verify_code
|
|
|
|
|
|
def get_user_code(user, code, code_type):
|
|
# Get the most recent codes to try and reduce the
|
|
# time searching for the correct code.
|
|
codes = VerifyCode.query.filter_by(
|
|
user=user, code_type=code_type).order_by(
|
|
VerifyCode.created_at.desc())
|
|
return next((x for x in codes if x.check_code(code)), None)
|
|
|
|
|
|
def delete_codes_older_created_more_than_a_day_ago():
|
|
deleted = db.session.query(VerifyCode).filter(
|
|
VerifyCode.created_at < datetime.utcnow() - timedelta(hours=24)
|
|
).delete()
|
|
db.session.commit()
|
|
return deleted
|
|
|
|
|
|
def use_user_code(id):
|
|
verify_code = VerifyCode.query.get(id)
|
|
verify_code.code_used = True
|
|
db.session.add(verify_code)
|
|
db.session.commit()
|
|
|
|
|
|
def delete_model_user(user):
|
|
db.session.delete(user)
|
|
db.session.commit()
|
|
|
|
|
|
def delete_user_verify_codes(user):
|
|
VerifyCode.query.filter_by(user=user).delete()
|
|
db.session.commit()
|
|
|
|
|
|
def count_user_verify_codes(user):
|
|
query = VerifyCode.query.filter(
|
|
VerifyCode.user == user,
|
|
VerifyCode.expiry_datetime > datetime.utcnow(),
|
|
VerifyCode.code_used.is_(False)
|
|
)
|
|
return query.count()
|
|
|
|
|
|
def get_user_by_id(user_id=None):
|
|
if user_id:
|
|
return User.query.filter_by(id=user_id).one()
|
|
return User.query.filter_by().all()
|
|
|
|
|
|
def get_user_by_email(email):
|
|
return User.query.filter(func.lower(User.email_address) == func.lower(email)).one()
|
|
|
|
|
|
def increment_failed_login_count(user):
|
|
user.failed_login_count += 1
|
|
db.session.add(user)
|
|
db.session.commit()
|
|
|
|
|
|
def reset_failed_login_count(user):
|
|
if user.failed_login_count > 0:
|
|
user.failed_login_count = 0
|
|
db.session.add(user)
|
|
db.session.commit()
|
|
|
|
|
|
def update_user_password(user, password):
|
|
# reset failed login count - they've just reset their password so should be fine
|
|
user.password = password
|
|
user.password_changed_at = datetime.utcnow()
|
|
db.session.add(user)
|
|
db.session.commit()
|