Merge pull request #1866 from alphagov/ft-notification-status-migration

Add ft_notification_status table and data migration command
This commit is contained in:
Katie Smith
2018-05-14 09:06:08 +01:00
committed by GitHub
3 changed files with 132 additions and 0 deletions

View File

@@ -603,3 +603,83 @@ def compare_ft_billing_to_monthly_billing(year, service_id=None):
path='/service/{}/billing/ft-monthly-usage?year={}'.format(service_id, year)):
ft_billing_response = get_yearly_usage_by_monthly_from_ft_billing(service_id)
compare_monthly_billing_to_ft_billing(ft_billing_response, monthly_billing_response)
@notify_command(name='migrate-data-to-ft-notification-status')
@click.option('-s', '--start_date', required=True, help="start date inclusive", type=click_dt(format='%Y-%m-%d'))
@click.option('-e', '--end_date', required=True, help="end date inclusive", type=click_dt(format='%Y-%m-%d'))
@statsd(namespace="tasks")
def migrate_data_to_ft_notification_status(start_date, end_date):
print('Notification statuses migration from date {} to {}'.format(start_date, end_date))
process_date = start_date
total_updated = 0
while process_date < end_date:
sql = \
"""
select count(*) from notification_history
where created_at >= (date :start + time '00:00:00') at time zone 'Europe/London' at time zone 'UTC'
and created_at < (date :end + time '00:00:00') at time zone 'Europe/London' at time zone 'UTC'
"""
num_notifications = db.session.execute(sql, {"start": process_date,
"end": process_date + timedelta(days=1)}).fetchall()[0][0]
sql = \
"""
select count(*) from
(select distinct template_id, service_id, job_id, notification_type, key_type, notification_status
from notification_history
where created_at >= (date :start + time '00:00:00') at time zone 'Europe/London' at time zone 'UTC'
and created_at < (date :end + time '00:00:00') at time zone 'Europe/London' at time zone 'UTC'
) as distinct_records
"""
predicted_records = db.session.execute(sql, {"start": process_date,
"end": process_date + timedelta(days=1)}).fetchall()[0][0]
start_time = datetime.now()
print('ft_notification-status: Migrating date: {}, notifications: {}, expecting {} ft_notification_status rows'
.format(process_date.date(), num_notifications, predicted_records))
# migrate data into ft_notification_status and update if record already exists
sql = \
"""
insert into ft_notification_status (bst_date, template_id, service_id, job_id, notification_type, key_type,
notification_status, notification_count)
select bst_date, template_id, service_id, job_id, notification_type, key_type, notification_status,
sum(notification_count) as notification_count
from (
select
da.bst_date,
n.template_id,
n.service_id,
coalesce(n.job_id, '00000000-0000-0000-0000-000000000000') as job_id,
n.notification_type,
n.key_type,
n.notification_status,
1 as notification_count
from public.notification_history n
left join dm_datetime da on n.created_at >= da.utc_daytime_start
and n.created_at < da.utc_daytime_end
where n.created_at >= (date :start + time '00:00:00') at time zone 'Europe/London'
at time zone 'UTC'
and n.created_at < (date :end + time '00:00:00') at time zone 'Europe/London' at time zone 'UTC'
) as individual_record
group by bst_date, template_id, service_id, job_id, notification_type, key_type, notification_status
order by bst_date
on conflict on constraint ft_notification_status_pkey do update set
notification_count = excluded.notification_count
"""
result = db.session.execute(sql, {"start": process_date, "end": process_date + timedelta(days=1)})
db.session.commit()
print('ft_notification_status: --- Completed took {}ms. Migrated {} rows.'.format(datetime.now() - start_time,
result.rowcount))
if predicted_records != result.rowcount:
print(' : ^^^ Result mismatch by {} rows ^^^'
.format(predicted_records - result.rowcount))
process_date += timedelta(days=1)
total_updated += result.rowcount
print('Total inserted/updated records = {}'.format(total_updated))

View File

@@ -1812,3 +1812,16 @@ class DateTimeDimension(db.Model):
Index('ix_dm_datetime_yearmonth', DateTimeDimension.year, DateTimeDimension.month)
class FactNotificationStatus(db.Model):
__tablename__ = "ft_notification_status"
bst_date = db.Column(db.Date, index=True, primary_key=True, nullable=False)
template_id = db.Column(UUID(as_uuid=True), primary_key=True, index=True, nullable=False)
service_id = db.Column(UUID(as_uuid=True), primary_key=True, index=True, nullable=False, )
job_id = db.Column(UUID(as_uuid=True), primary_key=True, index=True, nullable=False)
notification_type = db.Column(db.Text, primary_key=True, nullable=False)
key_type = db.Column(db.Text, primary_key=True, nullable=False)
notification_status = db.Column(db.Text, primary_key=True, nullable=False)
notification_count = db.Column(db.Integer(), nullable=False)

View File

@@ -0,0 +1,39 @@
"""
Revision ID: 0188_add_ft_notification_status
Revises: 0187_another_letter_org
Create Date: 2018-05-03 10:10:41.824981
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
revision = '0188_add_ft_notification_status'
down_revision = '0187_another_letter_org'
def upgrade():
op.create_table('ft_notification_status',
sa.Column('bst_date', sa.Date(), nullable=False),
sa.Column('template_id', postgresql.UUID(as_uuid=True), nullable=False),
sa.Column('service_id', postgresql.UUID(as_uuid=True), nullable=False),
sa.Column('job_id', postgresql.UUID(as_uuid=True), nullable=False),
sa.Column('notification_type', sa.Text(), nullable=False),
sa.Column('key_type', sa.Text(), nullable=False),
sa.Column('notification_status', sa.Text(), nullable=False),
sa.Column('notification_count', sa.Integer(), nullable=False),
sa.PrimaryKeyConstraint('bst_date', 'template_id', 'service_id', 'job_id', 'notification_type', 'key_type', 'notification_status')
)
op.create_index(op.f('ix_ft_notification_status_bst_date'), 'ft_notification_status', ['bst_date'], unique=False)
op.create_index(op.f('ix_ft_notification_status_job_id'), 'ft_notification_status', ['job_id'], unique=False)
op.create_index(op.f('ix_ft_notification_status_service_id'), 'ft_notification_status', ['service_id'], unique=False)
op.create_index(op.f('ix_ft_notification_status_template_id'), 'ft_notification_status', ['template_id'], unique=False)
def downgrade():
op.drop_index(op.f('ix_ft_notification_status_bst_date'), table_name='ft_notification_status')
op.drop_index(op.f('ix_ft_notification_status_template_id'), table_name='ft_notification_status')
op.drop_index(op.f('ix_ft_notification_status_service_id'), table_name='ft_notification_status')
op.drop_index(op.f('ix_ft_notification_status_job_id'), table_name='ft_notification_status')
op.drop_table('ft_notification_status')