From 4e915462b8642d1e507888981fcc31eca430a803 Mon Sep 17 00:00:00 2001 From: Jonathan Gray Date: Mon, 14 Mar 2022 12:00:21 +0000 Subject: [PATCH 01/31] Migrate arbitrary amounts of synth data --- flowdb/testdata/bin/9910_migrate_test_data.sh | 15 + .../testdata/bin/9910_migrate_test_data.sql | 389 ------------------ flowdb/testdata/bin/migrate_synth_data.py | 374 +++++++++++++++++ flowdb_testdata.Dockerfile.dockerignore | 3 +- 4 files changed, 391 insertions(+), 390 deletions(-) create mode 100644 flowdb/testdata/bin/9910_migrate_test_data.sh delete mode 100644 flowdb/testdata/bin/9910_migrate_test_data.sql create mode 100644 flowdb/testdata/bin/migrate_synth_data.py diff --git a/flowdb/testdata/bin/9910_migrate_test_data.sh b/flowdb/testdata/bin/9910_migrate_test_data.sh new file mode 100644 index 0000000000..e8673a1db3 --- /dev/null +++ b/flowdb/testdata/bin/9910_migrate_test_data.sh @@ -0,0 +1,15 @@ +#!/bin/sh +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + + + +set -e +export PGUSER="$POSTGRES_USER" + +# +# Migrate synthetic data. +# + +python3 migrate_synth_data.py \ No newline at end of file diff --git a/flowdb/testdata/bin/9910_migrate_test_data.sql b/flowdb/testdata/bin/9910_migrate_test_data.sql deleted file mode 100644 index 985b88eeb1..0000000000 --- a/flowdb/testdata/bin/9910_migrate_test_data.sql +++ /dev/null @@ -1,389 +0,0 @@ -/* -This Source Code Form is subject to the terms of the Mozilla Public -License, v. 2.0. If a copy of the MPL was not distributed with this -file, You can obtain one at http://mozilla.org/MPL/2.0/. -*/ - -BEGIN; -/* Populate subscribers */ - -INSERT INTO interactions.subscriber (msisdn, imei, imsi, tac) - SELECT msisdn, imei, imsi, tac FROM events.calls group by msisdn, imei, imsi, tac - UNION - SELECT msisdn, imei, imsi, tac FROM events.sms group by msisdn, imei, imsi, tac - UNION - SELECT msisdn, imei, imsi, tac FROM events.mds group by msisdn, imei, imsi, tac - UNION - SELECT msisdn, imei, imsi, tac FROM events.topups group by msisdn, imei, imsi, tac; - -/* Populate locations */ - -INSERT INTO interactions.locations (site_id, cell_id, position) - SELECT sites.site_id as site_id, cells.cell_id AS cell_id, cells.geom_point as position FROM - infrastructure.cells LEFT JOIN - infrastructure.sites ON - cells.site_id=sites.id AND cells.version=sites.version; - -/* Create a view mapping location ids to cell ids */ - -CREATE VIEW cell_id_mapping AS ( - SELECT * FROM - interactions.locations - LEFT JOIN ( - SELECT cell_id, id as mno_cell_id, daterange(date_of_first_service, date_of_last_service, '[]') as valid_period FROM - infrastructure.cells) c - USING (cell_id) -); - -/* Create partitions on the events tables */ - -CREATE TABLE interactions.events_supertable_20160101 PARTITION OF interactions.event_supertable - FOR VALUES FROM (20160101) TO (20160102); - -CREATE TABLE interactions.events_supertable_20160102 PARTITION OF interactions.event_supertable - FOR VALUES FROM (20160102) TO (20160103); - -CREATE TABLE interactions.events_supertable_20160103 PARTITION OF interactions.event_supertable - FOR VALUES FROM (20160103) TO (20160104); - -CREATE TABLE interactions.events_supertable_20160104 PARTITION OF interactions.event_supertable - FOR VALUES FROM (20160104) TO (20160105); - -CREATE TABLE interactions.events_supertable_20160105 PARTITION OF interactions.event_supertable - FOR VALUES FROM (20160105) TO (20160106); - -CREATE TABLE interactions.events_supertable_20160106 PARTITION OF interactions.event_supertable - FOR VALUES FROM (20160106) TO (20160107); - -CREATE TABLE interactions.events_supertable_20160107 PARTITION OF interactions.event_supertable - FOR VALUES FROM (20160107) TO (20160108); - -/* Calls */ - -CREATE TABLE interactions.calls_20160101 PARTITION OF interactions.calls - FOR VALUES FROM (20160101) TO (20160102); - -CREATE TABLE interactions.calls_20160102 PARTITION OF interactions.calls - FOR VALUES FROM (20160102) TO (20160103); - -CREATE TABLE interactions.calls_20160103 PARTITION OF interactions.calls - FOR VALUES FROM (20160103) TO (20160104); - -CREATE TABLE interactions.calls_20160104 PARTITION OF interactions.calls - FOR VALUES FROM (20160104) TO (20160105); - -CREATE TABLE interactions.calls_20160105 PARTITION OF interactions.calls - FOR VALUES FROM (20160105) TO (20160106); - -CREATE TABLE interactions.calls_20160106 PARTITION OF interactions.calls - FOR VALUES FROM (20160106) TO (20160107); - -CREATE TABLE interactions.calls_20160107 PARTITION OF interactions.calls - FOR VALUES FROM (20160107) TO (20160108); - -/* sms */ - -CREATE TABLE interactions.sms_20160101 PARTITION OF interactions.sms - FOR VALUES FROM (20160101) TO (20160102); - -CREATE TABLE interactions.sms_20160102 PARTITION OF interactions.sms - FOR VALUES FROM (20160102) TO (20160103); - -CREATE TABLE interactions.sms_20160103 PARTITION OF interactions.sms - FOR VALUES FROM (20160103) TO (20160104); - -CREATE TABLE interactions.sms_20160104 PARTITION OF interactions.sms - FOR VALUES FROM (20160104) TO (20160105); - -CREATE TABLE interactions.sms_20160105 PARTITION OF interactions.sms - FOR VALUES FROM (20160105) TO (20160106); - -CREATE TABLE interactions.sms_20160106 PARTITION OF interactions.sms - FOR VALUES FROM (20160106) TO (20160107); - -CREATE TABLE interactions.sms_20160107 PARTITION OF interactions.sms - FOR VALUES FROM (20160107) TO (20160108); - -/* mds */ - -CREATE TABLE interactions.mds_20160101 PARTITION OF interactions.mds - FOR VALUES FROM (20160101) TO (20160102); - -CREATE TABLE interactions.mds_20160102 PARTITION OF interactions.mds - FOR VALUES FROM (20160102) TO (20160103); - -CREATE TABLE interactions.mds_20160103 PARTITION OF interactions.mds - FOR VALUES FROM (20160103) TO (20160104); - -CREATE TABLE interactions.mds_20160104 PARTITION OF interactions.mds - FOR VALUES FROM (20160104) TO (20160105); - -CREATE TABLE interactions.mds_20160105 PARTITION OF interactions.mds - FOR VALUES FROM (20160105) TO (20160106); - -CREATE TABLE interactions.mds_20160106 PARTITION OF interactions.mds - FOR VALUES FROM (20160106) TO (20160107); - -CREATE TABLE interactions.mds_20160107 PARTITION OF interactions.mds - FOR VALUES FROM (20160107) TO (20160108); - -/* topup */ - -CREATE TABLE interactions.topup_20160101 PARTITION OF interactions.topup - FOR VALUES FROM (20160101) TO (20160102); - -CREATE TABLE interactions.topup_20160102 PARTITION OF interactions.topup - FOR VALUES FROM (20160102) TO (20160103); - -CREATE TABLE interactions.topup_20160103 PARTITION OF interactions.topup - FOR VALUES FROM (20160103) TO (20160104); - -CREATE TABLE interactions.topup_20160104 PARTITION OF interactions.topup - FOR VALUES FROM (20160104) TO (20160105); - -CREATE TABLE interactions.topup_20160105 PARTITION OF interactions.topup - FOR VALUES FROM (20160105) TO (20160106); - -CREATE TABLE interactions.topup_20160106 PARTITION OF interactions.topup - FOR VALUES FROM (20160106) TO (20160107); - -CREATE TABLE interactions.topup_20160107 PARTITION OF interactions.topup - FOR VALUES FROM (20160107) TO (20160108); - -/* Populate calls */ - -WITH event_data AS (SELECT - caller_ident.subscriber_id, - caller_loc.location_id, - time_dim_id, - date_dim_id, - callee_ident.subscriber_id as called_subscriber_id, - callee_loc.location_id as called_party_location_id, - calling_party_msisdn, - called_party_msisdn, - duration, - event_timestamp - FROM - (SELECT id, duration as duration, datetime as event_timestamp, location_id as caller_location_id, - msisdn as calling_party_msisdn, tac as caller_tac FROM events.calls - WHERE outgoing) callers - LEFT JOIN (SELECT id, location_id as callee_location_id, - msisdn as called_party_msisdn, tac as callee_tac FROM events.calls - WHERE not outgoing) called - USING (id) - LEFT JOIN - interactions.subscriber AS caller_ident - ON caller_ident.msisdn=calling_party_msisdn AND caller_ident.tac=caller_tac - LEFT JOIN - interactions.subscriber AS callee_ident - ON callee_ident.msisdn=called_party_msisdn AND callee_ident.tac=callee_tac - LEFT JOIN - cell_id_mapping AS caller_loc - ON caller_location_id=caller_loc.mno_cell_id AND caller_loc.valid_period @> event_timestamp::date - LEFT JOIN - cell_id_mapping AS callee_loc - ON callee_location_id=callee_loc.mno_cell_id AND callee_loc.valid_period @> event_timestamp::date - LEFT JOIN - d_date ON event_timestamp::date = date_actual - LEFT JOIN - d_time ON - EXTRACT(HOUR from event_timestamp) = hour_of_day), - call_data AS - - (INSERT INTO interactions.event_supertable (subscriber_id, location_id, time_dim_id, date_dim_id, event_timestamp, event_type_id) - SELECT subscriber_id, location_id, time_dim_id, date_dim_id, event_timestamp, (SELECT event_type_id FROM interactions.d_event_type WHERE name='calls') - FROM event_data - RETURNING *) - -INSERT INTO interactions.calls (event_id, date_dim_id, called_subscriber_id, called_party_location_id, calling_party_msisdn, called_party_msisdn, duration) - SELECT event_id, date_dim_id, called_subscriber_id, called_party_location_id, calling_party_msisdn, called_party_msisdn, duration FROM call_data NATURAL JOIN event_data; - -/* Populate sms */ - -WITH event_data AS (SELECT caller_ident.subscriber_id, - caller_loc.location_id, - time_dim_id as time_dim_id, - date_dim_id as date_dim_id, - callee_ident.subscriber_id as called_subscriber_id, - callee_loc.location_id as called_party_location_id, - calling_party_msisdn, - called_party_msisdn, - event_timestamp - FROM - (SELECT id, datetime as event_timestamp, location_id as caller_location_id, - msisdn as calling_party_msisdn, tac as caller_tac FROM events.sms - WHERE outgoing) callers - LEFT JOIN (SELECT id, location_id as callee_location_id, - msisdn as called_party_msisdn, tac as callee_tac FROM events.sms - WHERE not outgoing) called - USING (id) - LEFT JOIN - interactions.subscriber AS caller_ident - ON caller_ident.msisdn=calling_party_msisdn AND caller_ident.tac=caller_tac - LEFT JOIN - interactions.subscriber AS callee_ident - ON callee_ident.msisdn=called_party_msisdn AND callee_ident.tac=callee_tac - LEFT JOIN - cell_id_mapping AS caller_loc - ON caller_location_id=caller_loc.mno_cell_id AND caller_loc.valid_period @> event_timestamp::date - LEFT JOIN - cell_id_mapping AS callee_loc - ON callee_location_id=callee_loc.mno_cell_id AND callee_loc.valid_period @> event_timestamp::date - LEFT JOIN - d_date ON event_timestamp::date = date_actual - LEFT JOIN - d_time ON - EXTRACT(HOUR from event_timestamp) = hour_of_day), - sms_data AS - (INSERT INTO interactions.event_supertable (subscriber_id, location_id, time_dim_id, date_dim_id, event_timestamp, event_type_id) - SELECT subscriber_id, location_id, time_dim_id, date_dim_id, event_timestamp, (SELECT event_type_id FROM interactions.d_event_type WHERE name='sms') - FROM event_data - RETURNING *) - -INSERT INTO interactions.sms (event_id, date_dim_id, called_subscriber_id, called_party_location_id, calling_party_msisdn, called_party_msisdn) - SELECT event_id, date_dim_id, called_subscriber_id, called_party_location_id, calling_party_msisdn, called_party_msisdn FROM sms_data NATURAL JOIN event_data; - -/* Populate topup */ - - -WITH event_data AS (SELECT caller_ident.subscriber_id, - caller_loc.location_id, - time_dim_id, - date_dim_id, - recharge_amount, - airtime_fee, - tax_and_fee, - pre_event_balance, - post_event_balance, - calling_party_msisdn, - caller_tac, - event_timestamp - FROM - (SELECT datetime as event_timestamp, location_id as caller_location_id, - msisdn as calling_party_msisdn, tac as caller_tac, recharge_amount, - airtime_fee, tax_and_fee, pre_event_balance, post_event_balance - FROM events.topups) topup - LEFT JOIN - interactions.subscriber AS caller_ident - ON caller_ident.msisdn=calling_party_msisdn AND caller_ident.tac=caller_tac - LEFT JOIN - cell_id_mapping AS caller_loc - ON caller_location_id=caller_loc.mno_cell_id AND caller_loc.valid_period @> event_timestamp::date - LEFT JOIN - d_date ON event_timestamp::date = date_actual - LEFT JOIN - d_time ON - EXTRACT(HOUR from event_timestamp) = hour_of_day), - topup_data AS - (INSERT INTO interactions.event_supertable (subscriber_id, location_id, time_dim_id, date_dim_id, event_timestamp, event_type_id) - SELECT subscriber_id, location_id, time_dim_id, date_dim_id, event_timestamp, (SELECT event_type_id FROM interactions.d_event_type WHERE name='topup') - FROM event_data - RETURNING *) - -INSERT INTO interactions.topup (event_id, date_dim_id, recharge_amount, airtime_fee, tax_and_fee, pre_event_balance, post_event_balance) - SELECT event_id, date_dim_id, recharge_amount, airtime_fee, tax_and_fee, pre_event_balance, post_event_balance FROM topup_data NATURAL JOIN event_data; - -/* Populate mds */ - - -WITH event_data AS (SELECT caller_ident.subscriber_id, - caller_loc.location_id, - time_dim_id, - date_dim_id, - volume_total as data_volume_total, - volume_upload as data_volume_up, - volume_download as data_volume_down, - duration, - event_timestamp - FROM - (SELECT datetime as event_timestamp, location_id as caller_location_id, - msisdn as calling_party_msisdn, tac as caller_tac, volume_total, volume_upload, volume_download, - duration - FROM events.mds) mds - LEFT JOIN - interactions.subscriber AS caller_ident - ON caller_ident.msisdn=calling_party_msisdn AND caller_ident.tac=caller_tac - LEFT JOIN - cell_id_mapping AS caller_loc - ON caller_location_id=caller_loc.mno_cell_id AND caller_loc.valid_period @> event_timestamp::date - LEFT JOIN - d_date ON event_timestamp::date = date_actual - LEFT JOIN - d_time ON - EXTRACT(HOUR from event_timestamp) = hour_of_day), - mds_data AS - (INSERT INTO interactions.event_supertable (subscriber_id, location_id, time_dim_id, date_dim_id, event_timestamp, event_type_id) - SELECT subscriber_id, location_id, time_dim_id, date_dim_id, event_timestamp, (SELECT event_type_id FROM interactions.d_event_type WHERE name='mds') - FROM event_data - RETURNING *) - -INSERT INTO interactions.mds (event_id, date_dim_id, data_volume_total, data_volume_up, - data_volume_down, - duration) - SELECT event_id, date_dim_id, data_volume_total, data_volume_up, - data_volume_down, - duration FROM mds_data NATURAL JOIN event_data; - -/* Populate geoms from the existing admin units */ - -INSERT INTO geography.geoms (short_name, long_name, geo_kind_id, spatial_resolution, geom) - SELECT admin3pcod as short_name, admin3name as long_name, 1 as geo_kind_id, 3 as spatial_resolution, geom - FROM geography.admin3; - -INSERT INTO geography.geoms (short_name, long_name, geo_kind_id, spatial_resolution, geom) - SELECT admin2pcod as short_name, admin2name as long_name, 1 as geo_kind_id, 2 as spatial_resolution, geom - FROM geography.admin2; - -INSERT INTO geography.geoms (short_name, long_name, geo_kind_id, spatial_resolution, geom) - SELECT admin1pcod as short_name, admin1name as long_name, 1 as geo_kind_id, 1 as spatial_resolution, geom - FROM geography.admin1; - -INSERT INTO geography.geoms (short_name, long_name, geo_kind_id, spatial_resolution, geom) - SELECT admin0pcod as short_name, admin0name as long_name, 1 as geo_kind_id, 0 as spatial_resolution, geom - FROM geography.admin0; - -INSERT INTO geography.geoms (short_name, long_name, geo_kind_id, spatial_resolution, geom) - SELECT district_c as short_name, district_n as long_name, 1 as geo_kind_id, 2 as spatial_resolution, geom - FROM public.gambia_admin2; - -/* Populate the geobridge */ - -INSERT INTO geography.geo_bridge (location_id, gid, valid_from, valid_to, linkage_method_id) - SELECT locations.location_id, geoms.gid, '-Infinity'::date as valid_from, 'Infinity'::date as valid_to, 1 as linkage_method_id from interactions.locations LEFT JOIN geography.geoms ON ST_Intersects(position, geom); - - -/* Populate subscriber sightings */ - -CREATE TABLE interactions.subscriber_sightings_20160101 PARTITION OF interactions.subscriber_sightings - FOR VALUES FROM (20160101) TO (20160102); - -CREATE TABLE interactions.subscriber_sightings_20160102 PARTITION OF interactions.subscriber_sightings - FOR VALUES FROM (20160102) TO (20160103); - -CREATE TABLE interactions.subscriber_sightings_20160103 PARTITION OF interactions.subscriber_sightings - FOR VALUES FROM (20160103) TO (20160104); - -CREATE TABLE interactions.subscriber_sightings_20160104 PARTITION OF interactions.subscriber_sightings - FOR VALUES FROM (20160104) TO (20160105); - -CREATE TABLE interactions.subscriber_sightings_20160105 PARTITION OF interactions.subscriber_sightings - FOR VALUES FROM (20160105) TO (20160106); - -CREATE TABLE interactions.subscriber_sightings_20160106 PARTITION OF interactions.subscriber_sightings - FOR VALUES FROM (20160106) TO (20160107); - -CREATE TABLE interactions.subscriber_sightings_20160107 PARTITION OF interactions.subscriber_sightings - FOR VALUES FROM (20160107) TO (20160108); - -INSERT INTO interactions.subscriber_sightings (event_id, subscriber_id, location_id, time_dim_id, date_dim_id, sighting_timestamp) - SELECT event_id, subscriber_id, location_id, time_dim_id, date_dim_id, event_timestamp FROM interactions.event_supertable; - -INSERT INTO interactions.subscriber_sightings (event_id, subscriber_id, location_id, time_dim_id, date_dim_id, sighting_timestamp) - SELECT event_id, called_subscriber_id as subscriber_id, called_party_location_id as location_id, time_dim_id, date_dim_id, event_timestamp - FROM interactions.event_supertable NATURAL JOIN interactions.calls; - -INSERT INTO interactions.subscriber_sightings (event_id, subscriber_id, location_id, time_dim_id, date_dim_id, sighting_timestamp) - SELECT event_id, called_subscriber_id as subscriber_id, called_party_location_id as location_id, time_dim_id, date_dim_id, event_timestamp - FROM interactions.event_supertable NATURAL JOIN interactions.sms; - -COMMIT; \ No newline at end of file diff --git a/flowdb/testdata/bin/migrate_synth_data.py b/flowdb/testdata/bin/migrate_synth_data.py new file mode 100644 index 0000000000..0a44f5f10f --- /dev/null +++ b/flowdb/testdata/bin/migrate_synth_data.py @@ -0,0 +1,374 @@ +from pathlib import Path + +import os +import argparse +import datetime +from concurrent.futures.thread import ThreadPoolExecutor +from contextlib import contextmanager +from multiprocessing import cpu_count + +import sqlalchemy as sqlalchemy +from sqlalchemy.exc import ResourceClosedError + +import structlog +import json + +structlog.configure( + processors=[ + structlog.stdlib.PositionalArgumentsFormatter(), + structlog.processors.TimeStamper(fmt="iso"), + structlog.processors.StackInfoRenderer(), + structlog.processors.format_exc_info, + structlog.processors.JSONRenderer(serializer=json.dumps), + ] +) +logger = structlog.get_logger(__name__) + + +@contextmanager +def log_duration(job: str, **kwargs): + """ + Small context handler that logs the duration of the with block. + + Parameters + ---------- + job: str + Description of what is being run, will be shown under the "job" key in log + kwargs: dict + Any kwargs will be shown in the log as "key":"value" + """ + start_time = datetime.datetime.now() + logger.info("Started", job=job, **kwargs) + yield + logger.info( + "Finished", job=job, runtime=str(datetime.datetime.now() - start_time), **kwargs + ) + + +parser = argparse.ArgumentParser(description="Flowminder Synthetic CDR Migrator\n") +parser.add_argument( + "--n-days", + type=int, + default=os.getenv("N_DAYS", 7), + help="Number of days of data to migrate.", +) + +if __name__ == "__main__": + args = parser.parse_args() + with log_duration("Migrating synthetic data..", **vars(args)): + num_days = args.n_days + engine = sqlalchemy.create_engine( + f"postgresql://{os.getenv('POSTGRES_USER')}@/{os.getenv('POSTGRES_DB')}", + echo=False, + strategy="threadlocal", + pool_size=min(cpu_count(), int(os.getenv("MAX_CPUS", cpu_count()))), + pool_timeout=None, + ) + logger.info( + "Connected.", + num_connections=min(cpu_count(), int(os.getenv("MAX_CPUS", cpu_count()))), + ) + + def do_exec(args): + msg, sql = args + with log_duration(msg): + with engine.begin() as trans: + res = trans.execute(sql) + try: + logger.info(f"SQL result", job=msg, result=res.fetchall()) + except ResourceClosedError: + pass # Nothing to do here + except Exception as exc: + logger.error("Hit an issue.", exc=exc) + raise exc + + start_time = datetime.datetime.now() + + for date in ( + datetime.date(2016, 1, 1) + datetime.timedelta(days=i) + for i in range(num_days) + ): + with log_duration("Migrating day.", day=date): + partition_period = f"FROM ({date.strftime('%Y%M%d')}) TO ({(date + datetime.timedelta(days=1)).strftime('%Y%M%d')})" + with log_duration("Creating partitions.", day=date): + with engine.begin() as trans: + trans.execute( + f"CREATE TABLE interactions.events_supertable_{date.strftime('%Y%M%d')} PARTITION OF interactions.event_supertable FOR VALUES {partition_period};" + ) + trans.execute( + f"CREATE TABLE interactions.subscriber_sightings_{date.strftime('%Y%M%d')} PARTITION OF interactions.subscriber_sightings FOR VALUES {partition_period};" + ) + for event_type in ("calls", "sms", "mds", "topups"): + trans.execute( + f"CREATE TABLE interactions.calls_{date.strftime('%Y%M%d')} PARTITION OF interactions.{event_type} FOR VALUES {partition_period};" + ) + with log_duration("Migrate subscribers."): + with engine.begin() as trans: + trans.execute( + """ + INSERT INTO interactions.subscriber (msisdn, imei, imsi, tac) + SELECT msisdn, imei, imsi, tac FROM events.calls group by msisdn, imei, imsi, tac + UNION + SELECT msisdn, imei, imsi, tac FROM events.sms group by msisdn, imei, imsi, tac + UNION + SELECT msisdn, imei, imsi, tac FROM events.mds group by msisdn, imei, imsi, tac + UNION + SELECT msisdn, imei, imsi, tac FROM events.topups group by msisdn, imei, imsi, tac; + """ + ) + trans.execute( + """ + INSERT INTO interactions.locations (site_id, cell_id, position) + SELECT sites.site_id as site_id, cells.cell_id AS cell_id, cells.geom_point as position FROM + infrastructure.cells LEFT JOIN + infrastructure.sites ON + cells.site_id=sites.id AND cells.version=sites.version; + """ + ) + trans.execute( + """ + CREATE VIEW cell_id_mapping AS ( + SELECT * FROM + interactions.locations + LEFT JOIN ( + SELECT cell_id, id as mno_cell_id, daterange(date_of_first_service, date_of_last_service, '[]') as valid_period FROM + infrastructure.cells) c + USING (cell_id) + ); + """ + ) + + with log_duration("Migrate geography"): + with engine.begin() as trans: + trans.execute( + """ + INSERT INTO geography.geoms (short_name, long_name, geo_kind_id, spatial_resolution, geom) + SELECT admin3pcod as short_name, admin3name as long_name, 1 as geo_kind_id, 3 as spatial_resolution, geom + FROM geography.admin3; + + INSERT INTO geography.geoms (short_name, long_name, geo_kind_id, spatial_resolution, geom) + SELECT admin2pcod as short_name, admin2name as long_name, 1 as geo_kind_id, 2 as spatial_resolution, geom + FROM geography.admin2; + + INSERT INTO geography.geoms (short_name, long_name, geo_kind_id, spatial_resolution, geom) + SELECT admin1pcod as short_name, admin1name as long_name, 1 as geo_kind_id, 1 as spatial_resolution, geom + FROM geography.admin1; + + INSERT INTO geography.geoms (short_name, long_name, geo_kind_id, spatial_resolution, geom) + SELECT admin0pcod as short_name, admin0name as long_name, 1 as geo_kind_id, 0 as spatial_resolution, geom + FROM geography.admin0; + + INSERT INTO geography.geoms (short_name, long_name, geo_kind_id, spatial_resolution, geom) + SELECT district_c as short_name, district_n as long_name, 1 as geo_kind_id, 2 as spatial_resolution, geom + FROM public.gambia_admin2; + + /* Populate the geobridge */ + + INSERT INTO geography.geo_bridge (location_id, gid, valid_from, valid_to, linkage_method_id) + SELECT locations.location_id, geoms.gid, '-Infinity'::date as valid_from, 'Infinity'::date as valid_to, 1 as linkage_method_id from interactions.locations LEFT JOIN geography.geoms ON ST_Intersects(position, geom); + """ + ) + + events = [ + ( + """ + WITH event_data AS (SELECT + caller_ident.subscriber_id, + caller_loc.location_id, + time_dim_id, + date_dim_id, + callee_ident.subscriber_id as called_subscriber_id, + callee_loc.location_id as called_party_location_id, + calling_party_msisdn, + called_party_msisdn, + duration, + event_timestamp + FROM + (SELECT id, duration as duration, datetime as event_timestamp, location_id as caller_location_id, + msisdn as calling_party_msisdn, tac as caller_tac FROM events.calls + WHERE outgoing) callers + LEFT JOIN (SELECT id, location_id as callee_location_id, + msisdn as called_party_msisdn, tac as callee_tac FROM events.calls + WHERE not outgoing) called + USING (id) + LEFT JOIN + interactions.subscriber AS caller_ident + ON caller_ident.msisdn=calling_party_msisdn AND caller_ident.tac=caller_tac + LEFT JOIN + interactions.subscriber AS callee_ident + ON callee_ident.msisdn=called_party_msisdn AND callee_ident.tac=callee_tac + LEFT JOIN + cell_id_mapping AS caller_loc + ON caller_location_id=caller_loc.mno_cell_id AND caller_loc.valid_period @> event_timestamp::date + LEFT JOIN + cell_id_mapping AS callee_loc + ON callee_location_id=callee_loc.mno_cell_id AND callee_loc.valid_period @> event_timestamp::date + LEFT JOIN + d_date ON event_timestamp::date = date_actual + LEFT JOIN + d_time ON + EXTRACT(HOUR from event_timestamp) = hour_of_day), + call_data AS + + (INSERT INTO interactions.event_supertable (subscriber_id, location_id, time_dim_id, date_dim_id, event_timestamp, event_type_id) + SELECT subscriber_id, location_id, time_dim_id, date_dim_id, event_timestamp, (SELECT event_type_id FROM interactions.d_event_type WHERE name='calls') + FROM event_data + RETURNING *) + + INSERT INTO interactions.calls (event_id, date_dim_id, called_subscriber_id, called_party_location_id, calling_party_msisdn, called_party_msisdn, duration) + SELECT event_id, date_dim_id, called_subscriber_id, called_party_location_id, calling_party_msisdn, called_party_msisdn, duration FROM call_data NATURAL JOIN event_data; + """, + "Call events", + ), + ( + """ + WITH event_data AS (SELECT caller_ident.subscriber_id, + caller_loc.location_id, + time_dim_id as time_dim_id, + date_dim_id as date_dim_id, + callee_ident.subscriber_id as called_subscriber_id, + callee_loc.location_id as called_party_location_id, + calling_party_msisdn, + called_party_msisdn, + event_timestamp + FROM + (SELECT id, datetime as event_timestamp, location_id as caller_location_id, + msisdn as calling_party_msisdn, tac as caller_tac FROM events.sms + WHERE outgoing) callers + LEFT JOIN (SELECT id, location_id as callee_location_id, + msisdn as called_party_msisdn, tac as callee_tac FROM events.sms + WHERE not outgoing) called + USING (id) + LEFT JOIN + interactions.subscriber AS caller_ident + ON caller_ident.msisdn=calling_party_msisdn AND caller_ident.tac=caller_tac + LEFT JOIN + interactions.subscriber AS callee_ident + ON callee_ident.msisdn=called_party_msisdn AND callee_ident.tac=callee_tac + LEFT JOIN + cell_id_mapping AS caller_loc + ON caller_location_id=caller_loc.mno_cell_id AND caller_loc.valid_period @> event_timestamp::date + LEFT JOIN + cell_id_mapping AS callee_loc + ON callee_location_id=callee_loc.mno_cell_id AND callee_loc.valid_period @> event_timestamp::date + LEFT JOIN + d_date ON event_timestamp::date = date_actual + LEFT JOIN + d_time ON + EXTRACT(HOUR from event_timestamp) = hour_of_day), + sms_data AS + (INSERT INTO interactions.event_supertable (subscriber_id, location_id, time_dim_id, date_dim_id, event_timestamp, event_type_id) + SELECT subscriber_id, location_id, time_dim_id, date_dim_id, event_timestamp, (SELECT event_type_id FROM interactions.d_event_type WHERE name='sms') + FROM event_data + RETURNING *) + + INSERT INTO interactions.sms (event_id, date_dim_id, called_subscriber_id, called_party_location_id, calling_party_msisdn, called_party_msisdn) + SELECT event_id, date_dim_id, called_subscriber_id, called_party_location_id, calling_party_msisdn, called_party_msisdn FROM sms_data NATURAL JOIN event_data; + """, + "SMS events", + ), + ( + """ + WITH event_data AS (SELECT caller_ident.subscriber_id, + caller_loc.location_id, + time_dim_id, + date_dim_id, + volume_total as data_volume_total, + volume_upload as data_volume_up, + volume_download as data_volume_down, + duration, + event_timestamp + FROM + (SELECT datetime as event_timestamp, location_id as caller_location_id, + msisdn as calling_party_msisdn, tac as caller_tac, volume_total, volume_upload, volume_download, + duration + FROM events.mds) mds + LEFT JOIN + interactions.subscriber AS caller_ident + ON caller_ident.msisdn=calling_party_msisdn AND caller_ident.tac=caller_tac + LEFT JOIN + cell_id_mapping AS caller_loc + ON caller_location_id=caller_loc.mno_cell_id AND caller_loc.valid_period @> event_timestamp::date + LEFT JOIN + d_date ON event_timestamp::date = date_actual + LEFT JOIN + d_time ON + EXTRACT(HOUR from event_timestamp) = hour_of_day), + mds_data AS + (INSERT INTO interactions.event_supertable (subscriber_id, location_id, time_dim_id, date_dim_id, event_timestamp, event_type_id) + SELECT subscriber_id, location_id, time_dim_id, date_dim_id, event_timestamp, (SELECT event_type_id FROM interactions.d_event_type WHERE name='mds') + FROM event_data + RETURNING *) + + INSERT INTO interactions.mds (event_id, date_dim_id, data_volume_total, data_volume_up, + data_volume_down, + duration) + SELECT event_id, date_dim_id, data_volume_total, data_volume_up, + data_volume_down, + duration FROM mds_data NATURAL JOIN event_data; + """, + "MDS events", + ), + ( + """ + WITH event_data AS (SELECT caller_ident.subscriber_id, + caller_loc.location_id, + time_dim_id, + date_dim_id, + recharge_amount, + airtime_fee, + tax_and_fee, + pre_event_balance, + post_event_balance, + calling_party_msisdn, + caller_tac, + event_timestamp + FROM + (SELECT datetime as event_timestamp, location_id as caller_location_id, + msisdn as calling_party_msisdn, tac as caller_tac, recharge_amount, + airtime_fee, tax_and_fee, pre_event_balance, post_event_balance + FROM events.topups) topup + LEFT JOIN + interactions.subscriber AS caller_ident + ON caller_ident.msisdn=calling_party_msisdn AND caller_ident.tac=caller_tac + LEFT JOIN + cell_id_mapping AS caller_loc + ON caller_location_id=caller_loc.mno_cell_id AND caller_loc.valid_period @> event_timestamp::date + LEFT JOIN + d_date ON event_timestamp::date = date_actual + LEFT JOIN + d_time ON + EXTRACT(HOUR from event_timestamp) = hour_of_day), + topup_data AS + (INSERT INTO interactions.event_supertable (subscriber_id, location_id, time_dim_id, date_dim_id, event_timestamp, event_type_id) + SELECT subscriber_id, location_id, time_dim_id, date_dim_id, event_timestamp, (SELECT event_type_id FROM interactions.d_event_type WHERE name='topup') + FROM event_data + RETURNING *) + + INSERT INTO interactions.topup (event_id, date_dim_id, recharge_amount, airtime_fee, tax_and_fee, pre_event_balance, post_event_balance) + SELECT event_id, date_dim_id, recharge_amount, airtime_fee, tax_and_fee, pre_event_balance, post_event_balance FROM topup_data NATURAL JOIN event_data; + """, + "Topup events", + ), + ] + with log_duration("Migrate events."): + with ThreadPoolExecutor( + min(cpu_count(), int(os.getenv("MAX_CPUS", cpu_count()))) + ) as tp: + list(tp.map(do_exec, events)) + with log_duration("Migrate sightings."): + with engine.begin() as trans: + trans.execute( + """ + INSERT INTO interactions.subscriber_sightings (event_id, subscriber_id, location_id, time_dim_id, date_dim_id, sighting_timestamp) + SELECT event_id, subscriber_id, location_id, time_dim_id, date_dim_id, event_timestamp FROM interactions.event_supertable; + + INSERT INTO interactions.subscriber_sightings (event_id, subscriber_id, location_id, time_dim_id, date_dim_id, sighting_timestamp) + SELECT event_id, called_subscriber_id as subscriber_id, called_party_location_id as location_id, time_dim_id, date_dim_id, event_timestamp + FROM interactions.event_supertable NATURAL JOIN interactions.calls; + + INSERT INTO interactions.subscriber_sightings (event_id, subscriber_id, location_id, time_dim_id, date_dim_id, sighting_timestamp) + SELECT event_id, called_subscriber_id as subscriber_id, called_party_location_id as location_id, time_dim_id, date_dim_id, event_timestamp + FROM interactions.event_supertable NATURAL JOIN interactions.sms; + """ + ) diff --git a/flowdb_testdata.Dockerfile.dockerignore b/flowdb_testdata.Dockerfile.dockerignore index fa6b71a5cc..1463f7ac31 100644 --- a/flowdb_testdata.Dockerfile.dockerignore +++ b/flowdb_testdata.Dockerfile.dockerignore @@ -12,7 +12,8 @@ # !flowdb/testdata/bin/9900_ingest_test_data.sh !flowdb/testdata/bin/9910_run_synthetic_dfs_data_generation_script.sh -!flowdb/testdata/bin/9910_migrate_test_data.sql +!flowdb/testdata/bin/9910_migrate_test_data.sh +!flowdb/testdata/bin/migrate_synth_data.py !flowdb/testdata/test_data/Pipfile !flowdb/testdata/test_data/Pipfile.lock !flowdb/testdata/test_data/sql/ From d82c94a54ff90180078a86f2b6e0b0fac3c209fc Mon Sep 17 00:00:00 2001 From: Jonathan Gray Date: Mon, 14 Mar 2022 12:24:31 +0000 Subject: [PATCH 02/31] Wrong path --- flowdb/testdata/bin/9910_migrate_test_data.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flowdb/testdata/bin/9910_migrate_test_data.sh b/flowdb/testdata/bin/9910_migrate_test_data.sh index e8673a1db3..aeb43dbf81 100644 --- a/flowdb/testdata/bin/9910_migrate_test_data.sh +++ b/flowdb/testdata/bin/9910_migrate_test_data.sh @@ -12,4 +12,4 @@ export PGUSER="$POSTGRES_USER" # Migrate synthetic data. # -python3 migrate_synth_data.py \ No newline at end of file +python3 /docker-entrypoint-initdb.d/migrate_synth_data.py \ No newline at end of file From f68010b3b75b188de4cee739476bb92457e1cb2a Mon Sep 17 00:00:00 2001 From: Jonathan Gray Date: Mon, 14 Mar 2022 12:37:57 +0000 Subject: [PATCH 03/31] No structlog --- flowdb/testdata/bin/migrate_synth_data.py | 49 +++++++++++++++-------- 1 file changed, 33 insertions(+), 16 deletions(-) diff --git a/flowdb/testdata/bin/migrate_synth_data.py b/flowdb/testdata/bin/migrate_synth_data.py index 0a44f5f10f..13da240740 100644 --- a/flowdb/testdata/bin/migrate_synth_data.py +++ b/flowdb/testdata/bin/migrate_synth_data.py @@ -9,20 +9,26 @@ import sqlalchemy as sqlalchemy from sqlalchemy.exc import ResourceClosedError - -import structlog import json -structlog.configure( - processors=[ - structlog.stdlib.PositionalArgumentsFormatter(), - structlog.processors.TimeStamper(fmt="iso"), - structlog.processors.StackInfoRenderer(), - structlog.processors.format_exc_info, - structlog.processors.JSONRenderer(serializer=json.dumps), - ] -) -logger = structlog.get_logger(__name__) +try: + import structlog + + structlog.configure( + processors=[ + structlog.stdlib.PositionalArgumentsFormatter(), + structlog.processors.TimeStamper(fmt="iso"), + structlog.processors.StackInfoRenderer(), + structlog.processors.format_exc_info, + structlog.processors.JSONRenderer(serializer=json.dumps), + ] + ) + logger = structlog.get_logger(__name__) +except ImportError: + import logging + + logger = logging.getLogger(__name__) + logger.setLevel("DEBUG") @contextmanager @@ -38,11 +44,22 @@ def log_duration(job: str, **kwargs): Any kwargs will be shown in the log as "key":"value" """ start_time = datetime.datetime.now() - logger.info("Started", job=job, **kwargs) + try: + logger.info("Started", job=job, **kwargs) + except: + logger.info(f"Started {job}: {kwargs}") yield - logger.info( - "Finished", job=job, runtime=str(datetime.datetime.now() - start_time), **kwargs - ) + try: + logger.info( + "Finished", + job=job, + runtime=str(datetime.datetime.now() - start_time), + **kwargs, + ) + except: + logger.info( + f"Finished {job}. runtime={str(datetime.datetime.now() - start_time)}, {kwargs}" + ) parser = argparse.ArgumentParser(description="Flowminder Synthetic CDR Migrator\n") From 5bf32f0476fcac2d832d14ca5f7acad0a3030b23 Mon Sep 17 00:00:00 2001 From: Jonathan Gray Date: Mon, 14 Mar 2022 12:46:19 +0000 Subject: [PATCH 04/31] No threadlocal --- flowdb/testdata/bin/migrate_synth_data.py | 1 - 1 file changed, 1 deletion(-) diff --git a/flowdb/testdata/bin/migrate_synth_data.py b/flowdb/testdata/bin/migrate_synth_data.py index 13da240740..2cefeb0567 100644 --- a/flowdb/testdata/bin/migrate_synth_data.py +++ b/flowdb/testdata/bin/migrate_synth_data.py @@ -77,7 +77,6 @@ def log_duration(job: str, **kwargs): engine = sqlalchemy.create_engine( f"postgresql://{os.getenv('POSTGRES_USER')}@/{os.getenv('POSTGRES_DB')}", echo=False, - strategy="threadlocal", pool_size=min(cpu_count(), int(os.getenv("MAX_CPUS", cpu_count()))), pool_timeout=None, ) From 11a062964104c4bd40e36605399b1976d5729772 Mon Sep 17 00:00:00 2001 From: Jonathan Gray Date: Mon, 14 Mar 2022 12:57:05 +0000 Subject: [PATCH 05/31] Silly logging --- flowdb/testdata/bin/migrate_synth_data.py | 35 +++++++++++------------ 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/flowdb/testdata/bin/migrate_synth_data.py b/flowdb/testdata/bin/migrate_synth_data.py index 2cefeb0567..b5d243a73a 100644 --- a/flowdb/testdata/bin/migrate_synth_data.py +++ b/flowdb/testdata/bin/migrate_synth_data.py @@ -31,6 +31,13 @@ logger.setLevel("DEBUG") +def log(msg, **kwargs): + try: + logger.info(msg, **kwargs) + except: + logger.info(f"{msg}: {kwargs}") + + @contextmanager def log_duration(job: str, **kwargs): """ @@ -44,22 +51,14 @@ def log_duration(job: str, **kwargs): Any kwargs will be shown in the log as "key":"value" """ start_time = datetime.datetime.now() - try: - logger.info("Started", job=job, **kwargs) - except: - logger.info(f"Started {job}: {kwargs}") + log("Started", job=job, **kwargs) yield - try: - logger.info( - "Finished", - job=job, - runtime=str(datetime.datetime.now() - start_time), - **kwargs, - ) - except: - logger.info( - f"Finished {job}. runtime={str(datetime.datetime.now() - start_time)}, {kwargs}" - ) + log( + "Finished", + job=job, + runtime=str(datetime.datetime.now() - start_time), + **kwargs, + ) parser = argparse.ArgumentParser(description="Flowminder Synthetic CDR Migrator\n") @@ -80,7 +79,7 @@ def log_duration(job: str, **kwargs): pool_size=min(cpu_count(), int(os.getenv("MAX_CPUS", cpu_count()))), pool_timeout=None, ) - logger.info( + log( "Connected.", num_connections=min(cpu_count(), int(os.getenv("MAX_CPUS", cpu_count()))), ) @@ -91,11 +90,11 @@ def do_exec(args): with engine.begin() as trans: res = trans.execute(sql) try: - logger.info(f"SQL result", job=msg, result=res.fetchall()) + log(f"SQL result", job=msg, result=res.fetchall()) except ResourceClosedError: pass # Nothing to do here except Exception as exc: - logger.error("Hit an issue.", exc=exc) + log("Hit an issue.", exc=exc) raise exc start_time = datetime.datetime.now() From 665436dde9c881cce57da3660442aacab19cf22f Mon Sep 17 00:00:00 2001 From: Jonathan Gray Date: Mon, 14 Mar 2022 13:18:18 +0000 Subject: [PATCH 06/31] Update migrate_synth_data.py --- flowdb/testdata/bin/migrate_synth_data.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flowdb/testdata/bin/migrate_synth_data.py b/flowdb/testdata/bin/migrate_synth_data.py index b5d243a73a..9d52b3879f 100644 --- a/flowdb/testdata/bin/migrate_synth_data.py +++ b/flowdb/testdata/bin/migrate_synth_data.py @@ -115,7 +115,7 @@ def do_exec(args): ) for event_type in ("calls", "sms", "mds", "topups"): trans.execute( - f"CREATE TABLE interactions.calls_{date.strftime('%Y%M%d')} PARTITION OF interactions.{event_type} FOR VALUES {partition_period};" + f"CREATE TABLE interactions.{event_type}_{date.strftime('%Y%M%d')} PARTITION OF interactions.{event_type} FOR VALUES {partition_period};" ) with log_duration("Migrate subscribers."): with engine.begin() as trans: From d52fee832dc1d4ab46f21cf12daa9e9a108ddd6f Mon Sep 17 00:00:00 2001 From: Jonathan Gray Date: Mon, 14 Mar 2022 13:22:55 +0000 Subject: [PATCH 07/31] Run for synth data as well --- flowdb_synthetic_data.Dockerfile | 2 ++ flowdb_synthetic_data.Dockerfile.dockerignore | 2 ++ 2 files changed, 4 insertions(+) diff --git a/flowdb_synthetic_data.Dockerfile b/flowdb_synthetic_data.Dockerfile index 4c9521c142..bc2dc74fd1 100644 --- a/flowdb_synthetic_data.Dockerfile +++ b/flowdb_synthetic_data.Dockerfile @@ -42,6 +42,8 @@ RUN mkdir -p /docker-entrypoint-initdb.d/sql/syntheticdata/ && \ COPY --chown=postgres flowdb/testdata/bin/9900_ingest_synthetic_data.sh /docker-entrypoint-initdb.d/ COPY --chown=postgres flowdb/testdata/bin/9800_population_density.sql.gz /docker-entrypoint-initdb.d/ +COPY --chown=postgres flowdb/testdata/bin/9910_migrate_test_data.sh /docker-entrypoint-initdb.d/ +COPY --chown=postgres flowdb/testdata/bin/migrate_synth_data.py /docker-entrypoint-initdb.d/ COPY --chown=postgres flowdb/testdata/bin/9910_run_synthetic_dfs_data_generation_script.sh /docker-entrypoint-initdb.d/ COPY --chown=postgres flowdb/testdata/test_data/py/* /docker-entrypoint-initdb.d/py/testdata/ diff --git a/flowdb_synthetic_data.Dockerfile.dockerignore b/flowdb_synthetic_data.Dockerfile.dockerignore index 0ed8d196c7..c77be87c4d 100644 --- a/flowdb_synthetic_data.Dockerfile.dockerignore +++ b/flowdb_synthetic_data.Dockerfile.dockerignore @@ -12,6 +12,8 @@ !flowdb/testdata/bin/9900_ingest_synthetic_data.sh !flowdb/testdata/bin/9800_population_density.sql.gz !flowdb/testdata/bin/generate_synthetic_data*.py +!flowdb/testdata/bin/9910_migrate_test_data.sh +!flowdb/testdata/bin/migrate_synth_data.py !flowdb/testdata/test_data/sql/admin*.sql !flowdb/testdata/synthetic_data/data/NPL_admbnda_adm3_Districts_simplified.geojson !flowdb/testdata/synthetic_data/Pipfile* From 4d31f101e9d5195079e0d79d2f3780186d7e1d58 Mon Sep 17 00:00:00 2001 From: Jonathan Gray Date: Mon, 14 Mar 2022 13:37:52 +0000 Subject: [PATCH 08/31] Update migrate_synth_data.py --- flowdb/testdata/bin/migrate_synth_data.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flowdb/testdata/bin/migrate_synth_data.py b/flowdb/testdata/bin/migrate_synth_data.py index 9d52b3879f..8edb16f1c0 100644 --- a/flowdb/testdata/bin/migrate_synth_data.py +++ b/flowdb/testdata/bin/migrate_synth_data.py @@ -113,7 +113,7 @@ def do_exec(args): trans.execute( f"CREATE TABLE interactions.subscriber_sightings_{date.strftime('%Y%M%d')} PARTITION OF interactions.subscriber_sightings FOR VALUES {partition_period};" ) - for event_type in ("calls", "sms", "mds", "topups"): + for event_type in ("calls", "sms", "mds", "topup"): trans.execute( f"CREATE TABLE interactions.{event_type}_{date.strftime('%Y%M%d')} PARTITION OF interactions.{event_type} FOR VALUES {partition_period};" ) From 5ddd25e20d1dc4d3b4e74476fdc91f92ed240712 Mon Sep 17 00:00:00 2001 From: Jonathan Gray Date: Mon, 14 Mar 2022 13:48:02 +0000 Subject: [PATCH 09/31] Update migrate_synth_data.py --- flowdb/testdata/bin/migrate_synth_data.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flowdb/testdata/bin/migrate_synth_data.py b/flowdb/testdata/bin/migrate_synth_data.py index 8edb16f1c0..0f21715b46 100644 --- a/flowdb/testdata/bin/migrate_synth_data.py +++ b/flowdb/testdata/bin/migrate_synth_data.py @@ -85,7 +85,7 @@ def log_duration(job: str, **kwargs): ) def do_exec(args): - msg, sql = args + sql, msg = args with log_duration(msg): with engine.begin() as trans: res = trans.execute(sql) From e8cddf941e8dcd975ff5ca3e27740ef89623259c Mon Sep 17 00:00:00 2001 From: Jonathan Gray Date: Mon, 14 Mar 2022 13:59:31 +0000 Subject: [PATCH 10/31] Update migrate_synth_data.py --- flowdb/testdata/bin/migrate_synth_data.py | 1 + 1 file changed, 1 insertion(+) diff --git a/flowdb/testdata/bin/migrate_synth_data.py b/flowdb/testdata/bin/migrate_synth_data.py index 0f21715b46..d0a563ca3c 100644 --- a/flowdb/testdata/bin/migrate_synth_data.py +++ b/flowdb/testdata/bin/migrate_synth_data.py @@ -105,6 +105,7 @@ def do_exec(args): ): with log_duration("Migrating day.", day=date): partition_period = f"FROM ({date.strftime('%Y%M%d')}) TO ({(date + datetime.timedelta(days=1)).strftime('%Y%M%d')})" + print(partition_period) with log_duration("Creating partitions.", day=date): with engine.begin() as trans: trans.execute( From f844b44e9ced2555c3c598118917ac90f53ff79f Mon Sep 17 00:00:00 2001 From: Jonathan Gray Date: Mon, 14 Mar 2022 14:34:01 +0000 Subject: [PATCH 11/31] Fix partitions and logging --- flowdb/testdata/bin/migrate_synth_data.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/flowdb/testdata/bin/migrate_synth_data.py b/flowdb/testdata/bin/migrate_synth_data.py index d0a563ca3c..e59377bff2 100644 --- a/flowdb/testdata/bin/migrate_synth_data.py +++ b/flowdb/testdata/bin/migrate_synth_data.py @@ -1,3 +1,4 @@ +import sys from pathlib import Path import os @@ -27,6 +28,7 @@ except ImportError: import logging + logging.basicConfig(stream=sys.stdout, level=logging.DEBUG) logger = logging.getLogger(__name__) logger.setLevel("DEBUG") @@ -104,19 +106,18 @@ def do_exec(args): for i in range(num_days) ): with log_duration("Migrating day.", day=date): - partition_period = f"FROM ({date.strftime('%Y%M%d')}) TO ({(date + datetime.timedelta(days=1)).strftime('%Y%M%d')})" - print(partition_period) + partition_period = f"FROM ({date.strftime('%Y%m%d')}) TO ({(date + datetime.timedelta(days=1)).strftime('%Y%m%d')})" with log_duration("Creating partitions.", day=date): with engine.begin() as trans: trans.execute( - f"CREATE TABLE interactions.events_supertable_{date.strftime('%Y%M%d')} PARTITION OF interactions.event_supertable FOR VALUES {partition_period};" + f"CREATE TABLE interactions.events_supertable_{date.strftime('%Y%m%d')} PARTITION OF interactions.event_supertable FOR VALUES {partition_period};" ) trans.execute( - f"CREATE TABLE interactions.subscriber_sightings_{date.strftime('%Y%M%d')} PARTITION OF interactions.subscriber_sightings FOR VALUES {partition_period};" + f"CREATE TABLE interactions.subscriber_sightings_{date.strftime('%Y%m%d')} PARTITION OF interactions.subscriber_sightings FOR VALUES {partition_period};" ) for event_type in ("calls", "sms", "mds", "topup"): trans.execute( - f"CREATE TABLE interactions.{event_type}_{date.strftime('%Y%M%d')} PARTITION OF interactions.{event_type} FOR VALUES {partition_period};" + f"CREATE TABLE interactions.{event_type}_{date.strftime('%Y%m%d')} PARTITION OF interactions.{event_type} FOR VALUES {partition_period};" ) with log_duration("Migrate subscribers."): with engine.begin() as trans: From bf4592ced72d8df392319d1d98a7673b64cb74f0 Mon Sep 17 00:00:00 2001 From: Jonathan Gray Date: Mon, 14 Mar 2022 15:06:09 +0000 Subject: [PATCH 12/31] Deal with missing gambia --- flowdb/testdata/bin/migrate_synth_data.py | 25 +++++++++++++++-------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/flowdb/testdata/bin/migrate_synth_data.py b/flowdb/testdata/bin/migrate_synth_data.py index e59377bff2..cc95425039 100644 --- a/flowdb/testdata/bin/migrate_synth_data.py +++ b/flowdb/testdata/bin/migrate_synth_data.py @@ -173,15 +173,22 @@ def do_exec(args): INSERT INTO geography.geoms (short_name, long_name, geo_kind_id, spatial_resolution, geom) SELECT admin0pcod as short_name, admin0name as long_name, 1 as geo_kind_id, 0 as spatial_resolution, geom - FROM geography.admin0; - - INSERT INTO geography.geoms (short_name, long_name, geo_kind_id, spatial_resolution, geom) - SELECT district_c as short_name, district_n as long_name, 1 as geo_kind_id, 2 as spatial_resolution, geom - FROM public.gambia_admin2; - - /* Populate the geobridge */ - - INSERT INTO geography.geo_bridge (location_id, gid, valid_from, valid_to, linkage_method_id) + FROM geography.admin0;""" + ) + with engine.begin() as trans: + try: + trans.execute( + """ + INSERT INTO geography.geoms (short_name, long_name, geo_kind_id, spatial_resolution, geom) + SELECT district_c as short_name, district_n as long_name, 1 as geo_kind_id, 2 as spatial_resolution, geom + FROM public.gambia_admin2;""" + ) + except: + pass # No gambia table + with engine.begin() as trans: + # Populate the geobridge + trans.execute( + """INSERT INTO geography.geo_bridge (location_id, gid, valid_from, valid_to, linkage_method_id) SELECT locations.location_id, geoms.gid, '-Infinity'::date as valid_from, 'Infinity'::date as valid_to, 1 as linkage_method_id from interactions.locations LEFT JOIN geography.geoms ON ST_Intersects(position, geom); """ ) From 3e6e79eb9bbc947f0fb2f54c07383f0810ed7cad Mon Sep 17 00:00:00 2001 From: Jonathan Gray Date: Mon, 14 Mar 2022 15:11:35 +0000 Subject: [PATCH 13/31] Lint --- flowdb/testdata/bin/migrate_synth_data.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flowdb/testdata/bin/migrate_synth_data.py b/flowdb/testdata/bin/migrate_synth_data.py index cc95425039..58ef9dfe77 100644 --- a/flowdb/testdata/bin/migrate_synth_data.py +++ b/flowdb/testdata/bin/migrate_synth_data.py @@ -184,7 +184,7 @@ def do_exec(args): FROM public.gambia_admin2;""" ) except: - pass # No gambia table + pass # No gambia table with engine.begin() as trans: # Populate the geobridge trans.execute( From 9c0f3456b7f122b6ec3be8a5b183e82e5f4ba261 Mon Sep 17 00:00:00 2001 From: Jonathan Gray Date: Mon, 14 Mar 2022 15:56:26 +0000 Subject: [PATCH 14/31] Longer timeout, fix number of days --- .circleci/config.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index aa89c19fd6..584de8c16f 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -38,7 +38,7 @@ defaults: - &wait_for_flowdb name: Wait for flowdb to start command: | - dockerize -wait tcp://localhost:5432 -timeout 10m + dockerize -wait tcp://localhost:5432 -timeout 20m - &run_always_org_context context: org-global filters: @@ -818,6 +818,7 @@ jobs: name: python_with_flowdb flowdb_image: "testdata" python_version: "3.8.5" + num_days: 7 # To avoid overriding fixed number of days working_directory: /home/circleci/project/integration_tests steps: - checkout: From 87368d9b35b7e3c156e84735ddbcb08db9e1eed2 Mon Sep 17 00:00:00 2001 From: Jonathan Gray Date: Mon, 14 Mar 2022 12:00:21 +0000 Subject: [PATCH 15/31] Migrate arbitrary amounts of synth data --- flowdb/testdata/bin/9910_migrate_test_data.sh | 15 + .../testdata/bin/9910_migrate_test_data.sql | 389 ------------------ flowdb/testdata/bin/migrate_synth_data.py | 374 +++++++++++++++++ flowdb_testdata.Dockerfile.dockerignore | 3 +- 4 files changed, 391 insertions(+), 390 deletions(-) create mode 100644 flowdb/testdata/bin/9910_migrate_test_data.sh delete mode 100644 flowdb/testdata/bin/9910_migrate_test_data.sql create mode 100644 flowdb/testdata/bin/migrate_synth_data.py diff --git a/flowdb/testdata/bin/9910_migrate_test_data.sh b/flowdb/testdata/bin/9910_migrate_test_data.sh new file mode 100644 index 0000000000..e8673a1db3 --- /dev/null +++ b/flowdb/testdata/bin/9910_migrate_test_data.sh @@ -0,0 +1,15 @@ +#!/bin/sh +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + + + +set -e +export PGUSER="$POSTGRES_USER" + +# +# Migrate synthetic data. +# + +python3 migrate_synth_data.py \ No newline at end of file diff --git a/flowdb/testdata/bin/9910_migrate_test_data.sql b/flowdb/testdata/bin/9910_migrate_test_data.sql deleted file mode 100644 index 985b88eeb1..0000000000 --- a/flowdb/testdata/bin/9910_migrate_test_data.sql +++ /dev/null @@ -1,389 +0,0 @@ -/* -This Source Code Form is subject to the terms of the Mozilla Public -License, v. 2.0. If a copy of the MPL was not distributed with this -file, You can obtain one at http://mozilla.org/MPL/2.0/. -*/ - -BEGIN; -/* Populate subscribers */ - -INSERT INTO interactions.subscriber (msisdn, imei, imsi, tac) - SELECT msisdn, imei, imsi, tac FROM events.calls group by msisdn, imei, imsi, tac - UNION - SELECT msisdn, imei, imsi, tac FROM events.sms group by msisdn, imei, imsi, tac - UNION - SELECT msisdn, imei, imsi, tac FROM events.mds group by msisdn, imei, imsi, tac - UNION - SELECT msisdn, imei, imsi, tac FROM events.topups group by msisdn, imei, imsi, tac; - -/* Populate locations */ - -INSERT INTO interactions.locations (site_id, cell_id, position) - SELECT sites.site_id as site_id, cells.cell_id AS cell_id, cells.geom_point as position FROM - infrastructure.cells LEFT JOIN - infrastructure.sites ON - cells.site_id=sites.id AND cells.version=sites.version; - -/* Create a view mapping location ids to cell ids */ - -CREATE VIEW cell_id_mapping AS ( - SELECT * FROM - interactions.locations - LEFT JOIN ( - SELECT cell_id, id as mno_cell_id, daterange(date_of_first_service, date_of_last_service, '[]') as valid_period FROM - infrastructure.cells) c - USING (cell_id) -); - -/* Create partitions on the events tables */ - -CREATE TABLE interactions.events_supertable_20160101 PARTITION OF interactions.event_supertable - FOR VALUES FROM (20160101) TO (20160102); - -CREATE TABLE interactions.events_supertable_20160102 PARTITION OF interactions.event_supertable - FOR VALUES FROM (20160102) TO (20160103); - -CREATE TABLE interactions.events_supertable_20160103 PARTITION OF interactions.event_supertable - FOR VALUES FROM (20160103) TO (20160104); - -CREATE TABLE interactions.events_supertable_20160104 PARTITION OF interactions.event_supertable - FOR VALUES FROM (20160104) TO (20160105); - -CREATE TABLE interactions.events_supertable_20160105 PARTITION OF interactions.event_supertable - FOR VALUES FROM (20160105) TO (20160106); - -CREATE TABLE interactions.events_supertable_20160106 PARTITION OF interactions.event_supertable - FOR VALUES FROM (20160106) TO (20160107); - -CREATE TABLE interactions.events_supertable_20160107 PARTITION OF interactions.event_supertable - FOR VALUES FROM (20160107) TO (20160108); - -/* Calls */ - -CREATE TABLE interactions.calls_20160101 PARTITION OF interactions.calls - FOR VALUES FROM (20160101) TO (20160102); - -CREATE TABLE interactions.calls_20160102 PARTITION OF interactions.calls - FOR VALUES FROM (20160102) TO (20160103); - -CREATE TABLE interactions.calls_20160103 PARTITION OF interactions.calls - FOR VALUES FROM (20160103) TO (20160104); - -CREATE TABLE interactions.calls_20160104 PARTITION OF interactions.calls - FOR VALUES FROM (20160104) TO (20160105); - -CREATE TABLE interactions.calls_20160105 PARTITION OF interactions.calls - FOR VALUES FROM (20160105) TO (20160106); - -CREATE TABLE interactions.calls_20160106 PARTITION OF interactions.calls - FOR VALUES FROM (20160106) TO (20160107); - -CREATE TABLE interactions.calls_20160107 PARTITION OF interactions.calls - FOR VALUES FROM (20160107) TO (20160108); - -/* sms */ - -CREATE TABLE interactions.sms_20160101 PARTITION OF interactions.sms - FOR VALUES FROM (20160101) TO (20160102); - -CREATE TABLE interactions.sms_20160102 PARTITION OF interactions.sms - FOR VALUES FROM (20160102) TO (20160103); - -CREATE TABLE interactions.sms_20160103 PARTITION OF interactions.sms - FOR VALUES FROM (20160103) TO (20160104); - -CREATE TABLE interactions.sms_20160104 PARTITION OF interactions.sms - FOR VALUES FROM (20160104) TO (20160105); - -CREATE TABLE interactions.sms_20160105 PARTITION OF interactions.sms - FOR VALUES FROM (20160105) TO (20160106); - -CREATE TABLE interactions.sms_20160106 PARTITION OF interactions.sms - FOR VALUES FROM (20160106) TO (20160107); - -CREATE TABLE interactions.sms_20160107 PARTITION OF interactions.sms - FOR VALUES FROM (20160107) TO (20160108); - -/* mds */ - -CREATE TABLE interactions.mds_20160101 PARTITION OF interactions.mds - FOR VALUES FROM (20160101) TO (20160102); - -CREATE TABLE interactions.mds_20160102 PARTITION OF interactions.mds - FOR VALUES FROM (20160102) TO (20160103); - -CREATE TABLE interactions.mds_20160103 PARTITION OF interactions.mds - FOR VALUES FROM (20160103) TO (20160104); - -CREATE TABLE interactions.mds_20160104 PARTITION OF interactions.mds - FOR VALUES FROM (20160104) TO (20160105); - -CREATE TABLE interactions.mds_20160105 PARTITION OF interactions.mds - FOR VALUES FROM (20160105) TO (20160106); - -CREATE TABLE interactions.mds_20160106 PARTITION OF interactions.mds - FOR VALUES FROM (20160106) TO (20160107); - -CREATE TABLE interactions.mds_20160107 PARTITION OF interactions.mds - FOR VALUES FROM (20160107) TO (20160108); - -/* topup */ - -CREATE TABLE interactions.topup_20160101 PARTITION OF interactions.topup - FOR VALUES FROM (20160101) TO (20160102); - -CREATE TABLE interactions.topup_20160102 PARTITION OF interactions.topup - FOR VALUES FROM (20160102) TO (20160103); - -CREATE TABLE interactions.topup_20160103 PARTITION OF interactions.topup - FOR VALUES FROM (20160103) TO (20160104); - -CREATE TABLE interactions.topup_20160104 PARTITION OF interactions.topup - FOR VALUES FROM (20160104) TO (20160105); - -CREATE TABLE interactions.topup_20160105 PARTITION OF interactions.topup - FOR VALUES FROM (20160105) TO (20160106); - -CREATE TABLE interactions.topup_20160106 PARTITION OF interactions.topup - FOR VALUES FROM (20160106) TO (20160107); - -CREATE TABLE interactions.topup_20160107 PARTITION OF interactions.topup - FOR VALUES FROM (20160107) TO (20160108); - -/* Populate calls */ - -WITH event_data AS (SELECT - caller_ident.subscriber_id, - caller_loc.location_id, - time_dim_id, - date_dim_id, - callee_ident.subscriber_id as called_subscriber_id, - callee_loc.location_id as called_party_location_id, - calling_party_msisdn, - called_party_msisdn, - duration, - event_timestamp - FROM - (SELECT id, duration as duration, datetime as event_timestamp, location_id as caller_location_id, - msisdn as calling_party_msisdn, tac as caller_tac FROM events.calls - WHERE outgoing) callers - LEFT JOIN (SELECT id, location_id as callee_location_id, - msisdn as called_party_msisdn, tac as callee_tac FROM events.calls - WHERE not outgoing) called - USING (id) - LEFT JOIN - interactions.subscriber AS caller_ident - ON caller_ident.msisdn=calling_party_msisdn AND caller_ident.tac=caller_tac - LEFT JOIN - interactions.subscriber AS callee_ident - ON callee_ident.msisdn=called_party_msisdn AND callee_ident.tac=callee_tac - LEFT JOIN - cell_id_mapping AS caller_loc - ON caller_location_id=caller_loc.mno_cell_id AND caller_loc.valid_period @> event_timestamp::date - LEFT JOIN - cell_id_mapping AS callee_loc - ON callee_location_id=callee_loc.mno_cell_id AND callee_loc.valid_period @> event_timestamp::date - LEFT JOIN - d_date ON event_timestamp::date = date_actual - LEFT JOIN - d_time ON - EXTRACT(HOUR from event_timestamp) = hour_of_day), - call_data AS - - (INSERT INTO interactions.event_supertable (subscriber_id, location_id, time_dim_id, date_dim_id, event_timestamp, event_type_id) - SELECT subscriber_id, location_id, time_dim_id, date_dim_id, event_timestamp, (SELECT event_type_id FROM interactions.d_event_type WHERE name='calls') - FROM event_data - RETURNING *) - -INSERT INTO interactions.calls (event_id, date_dim_id, called_subscriber_id, called_party_location_id, calling_party_msisdn, called_party_msisdn, duration) - SELECT event_id, date_dim_id, called_subscriber_id, called_party_location_id, calling_party_msisdn, called_party_msisdn, duration FROM call_data NATURAL JOIN event_data; - -/* Populate sms */ - -WITH event_data AS (SELECT caller_ident.subscriber_id, - caller_loc.location_id, - time_dim_id as time_dim_id, - date_dim_id as date_dim_id, - callee_ident.subscriber_id as called_subscriber_id, - callee_loc.location_id as called_party_location_id, - calling_party_msisdn, - called_party_msisdn, - event_timestamp - FROM - (SELECT id, datetime as event_timestamp, location_id as caller_location_id, - msisdn as calling_party_msisdn, tac as caller_tac FROM events.sms - WHERE outgoing) callers - LEFT JOIN (SELECT id, location_id as callee_location_id, - msisdn as called_party_msisdn, tac as callee_tac FROM events.sms - WHERE not outgoing) called - USING (id) - LEFT JOIN - interactions.subscriber AS caller_ident - ON caller_ident.msisdn=calling_party_msisdn AND caller_ident.tac=caller_tac - LEFT JOIN - interactions.subscriber AS callee_ident - ON callee_ident.msisdn=called_party_msisdn AND callee_ident.tac=callee_tac - LEFT JOIN - cell_id_mapping AS caller_loc - ON caller_location_id=caller_loc.mno_cell_id AND caller_loc.valid_period @> event_timestamp::date - LEFT JOIN - cell_id_mapping AS callee_loc - ON callee_location_id=callee_loc.mno_cell_id AND callee_loc.valid_period @> event_timestamp::date - LEFT JOIN - d_date ON event_timestamp::date = date_actual - LEFT JOIN - d_time ON - EXTRACT(HOUR from event_timestamp) = hour_of_day), - sms_data AS - (INSERT INTO interactions.event_supertable (subscriber_id, location_id, time_dim_id, date_dim_id, event_timestamp, event_type_id) - SELECT subscriber_id, location_id, time_dim_id, date_dim_id, event_timestamp, (SELECT event_type_id FROM interactions.d_event_type WHERE name='sms') - FROM event_data - RETURNING *) - -INSERT INTO interactions.sms (event_id, date_dim_id, called_subscriber_id, called_party_location_id, calling_party_msisdn, called_party_msisdn) - SELECT event_id, date_dim_id, called_subscriber_id, called_party_location_id, calling_party_msisdn, called_party_msisdn FROM sms_data NATURAL JOIN event_data; - -/* Populate topup */ - - -WITH event_data AS (SELECT caller_ident.subscriber_id, - caller_loc.location_id, - time_dim_id, - date_dim_id, - recharge_amount, - airtime_fee, - tax_and_fee, - pre_event_balance, - post_event_balance, - calling_party_msisdn, - caller_tac, - event_timestamp - FROM - (SELECT datetime as event_timestamp, location_id as caller_location_id, - msisdn as calling_party_msisdn, tac as caller_tac, recharge_amount, - airtime_fee, tax_and_fee, pre_event_balance, post_event_balance - FROM events.topups) topup - LEFT JOIN - interactions.subscriber AS caller_ident - ON caller_ident.msisdn=calling_party_msisdn AND caller_ident.tac=caller_tac - LEFT JOIN - cell_id_mapping AS caller_loc - ON caller_location_id=caller_loc.mno_cell_id AND caller_loc.valid_period @> event_timestamp::date - LEFT JOIN - d_date ON event_timestamp::date = date_actual - LEFT JOIN - d_time ON - EXTRACT(HOUR from event_timestamp) = hour_of_day), - topup_data AS - (INSERT INTO interactions.event_supertable (subscriber_id, location_id, time_dim_id, date_dim_id, event_timestamp, event_type_id) - SELECT subscriber_id, location_id, time_dim_id, date_dim_id, event_timestamp, (SELECT event_type_id FROM interactions.d_event_type WHERE name='topup') - FROM event_data - RETURNING *) - -INSERT INTO interactions.topup (event_id, date_dim_id, recharge_amount, airtime_fee, tax_and_fee, pre_event_balance, post_event_balance) - SELECT event_id, date_dim_id, recharge_amount, airtime_fee, tax_and_fee, pre_event_balance, post_event_balance FROM topup_data NATURAL JOIN event_data; - -/* Populate mds */ - - -WITH event_data AS (SELECT caller_ident.subscriber_id, - caller_loc.location_id, - time_dim_id, - date_dim_id, - volume_total as data_volume_total, - volume_upload as data_volume_up, - volume_download as data_volume_down, - duration, - event_timestamp - FROM - (SELECT datetime as event_timestamp, location_id as caller_location_id, - msisdn as calling_party_msisdn, tac as caller_tac, volume_total, volume_upload, volume_download, - duration - FROM events.mds) mds - LEFT JOIN - interactions.subscriber AS caller_ident - ON caller_ident.msisdn=calling_party_msisdn AND caller_ident.tac=caller_tac - LEFT JOIN - cell_id_mapping AS caller_loc - ON caller_location_id=caller_loc.mno_cell_id AND caller_loc.valid_period @> event_timestamp::date - LEFT JOIN - d_date ON event_timestamp::date = date_actual - LEFT JOIN - d_time ON - EXTRACT(HOUR from event_timestamp) = hour_of_day), - mds_data AS - (INSERT INTO interactions.event_supertable (subscriber_id, location_id, time_dim_id, date_dim_id, event_timestamp, event_type_id) - SELECT subscriber_id, location_id, time_dim_id, date_dim_id, event_timestamp, (SELECT event_type_id FROM interactions.d_event_type WHERE name='mds') - FROM event_data - RETURNING *) - -INSERT INTO interactions.mds (event_id, date_dim_id, data_volume_total, data_volume_up, - data_volume_down, - duration) - SELECT event_id, date_dim_id, data_volume_total, data_volume_up, - data_volume_down, - duration FROM mds_data NATURAL JOIN event_data; - -/* Populate geoms from the existing admin units */ - -INSERT INTO geography.geoms (short_name, long_name, geo_kind_id, spatial_resolution, geom) - SELECT admin3pcod as short_name, admin3name as long_name, 1 as geo_kind_id, 3 as spatial_resolution, geom - FROM geography.admin3; - -INSERT INTO geography.geoms (short_name, long_name, geo_kind_id, spatial_resolution, geom) - SELECT admin2pcod as short_name, admin2name as long_name, 1 as geo_kind_id, 2 as spatial_resolution, geom - FROM geography.admin2; - -INSERT INTO geography.geoms (short_name, long_name, geo_kind_id, spatial_resolution, geom) - SELECT admin1pcod as short_name, admin1name as long_name, 1 as geo_kind_id, 1 as spatial_resolution, geom - FROM geography.admin1; - -INSERT INTO geography.geoms (short_name, long_name, geo_kind_id, spatial_resolution, geom) - SELECT admin0pcod as short_name, admin0name as long_name, 1 as geo_kind_id, 0 as spatial_resolution, geom - FROM geography.admin0; - -INSERT INTO geography.geoms (short_name, long_name, geo_kind_id, spatial_resolution, geom) - SELECT district_c as short_name, district_n as long_name, 1 as geo_kind_id, 2 as spatial_resolution, geom - FROM public.gambia_admin2; - -/* Populate the geobridge */ - -INSERT INTO geography.geo_bridge (location_id, gid, valid_from, valid_to, linkage_method_id) - SELECT locations.location_id, geoms.gid, '-Infinity'::date as valid_from, 'Infinity'::date as valid_to, 1 as linkage_method_id from interactions.locations LEFT JOIN geography.geoms ON ST_Intersects(position, geom); - - -/* Populate subscriber sightings */ - -CREATE TABLE interactions.subscriber_sightings_20160101 PARTITION OF interactions.subscriber_sightings - FOR VALUES FROM (20160101) TO (20160102); - -CREATE TABLE interactions.subscriber_sightings_20160102 PARTITION OF interactions.subscriber_sightings - FOR VALUES FROM (20160102) TO (20160103); - -CREATE TABLE interactions.subscriber_sightings_20160103 PARTITION OF interactions.subscriber_sightings - FOR VALUES FROM (20160103) TO (20160104); - -CREATE TABLE interactions.subscriber_sightings_20160104 PARTITION OF interactions.subscriber_sightings - FOR VALUES FROM (20160104) TO (20160105); - -CREATE TABLE interactions.subscriber_sightings_20160105 PARTITION OF interactions.subscriber_sightings - FOR VALUES FROM (20160105) TO (20160106); - -CREATE TABLE interactions.subscriber_sightings_20160106 PARTITION OF interactions.subscriber_sightings - FOR VALUES FROM (20160106) TO (20160107); - -CREATE TABLE interactions.subscriber_sightings_20160107 PARTITION OF interactions.subscriber_sightings - FOR VALUES FROM (20160107) TO (20160108); - -INSERT INTO interactions.subscriber_sightings (event_id, subscriber_id, location_id, time_dim_id, date_dim_id, sighting_timestamp) - SELECT event_id, subscriber_id, location_id, time_dim_id, date_dim_id, event_timestamp FROM interactions.event_supertable; - -INSERT INTO interactions.subscriber_sightings (event_id, subscriber_id, location_id, time_dim_id, date_dim_id, sighting_timestamp) - SELECT event_id, called_subscriber_id as subscriber_id, called_party_location_id as location_id, time_dim_id, date_dim_id, event_timestamp - FROM interactions.event_supertable NATURAL JOIN interactions.calls; - -INSERT INTO interactions.subscriber_sightings (event_id, subscriber_id, location_id, time_dim_id, date_dim_id, sighting_timestamp) - SELECT event_id, called_subscriber_id as subscriber_id, called_party_location_id as location_id, time_dim_id, date_dim_id, event_timestamp - FROM interactions.event_supertable NATURAL JOIN interactions.sms; - -COMMIT; \ No newline at end of file diff --git a/flowdb/testdata/bin/migrate_synth_data.py b/flowdb/testdata/bin/migrate_synth_data.py new file mode 100644 index 0000000000..0a44f5f10f --- /dev/null +++ b/flowdb/testdata/bin/migrate_synth_data.py @@ -0,0 +1,374 @@ +from pathlib import Path + +import os +import argparse +import datetime +from concurrent.futures.thread import ThreadPoolExecutor +from contextlib import contextmanager +from multiprocessing import cpu_count + +import sqlalchemy as sqlalchemy +from sqlalchemy.exc import ResourceClosedError + +import structlog +import json + +structlog.configure( + processors=[ + structlog.stdlib.PositionalArgumentsFormatter(), + structlog.processors.TimeStamper(fmt="iso"), + structlog.processors.StackInfoRenderer(), + structlog.processors.format_exc_info, + structlog.processors.JSONRenderer(serializer=json.dumps), + ] +) +logger = structlog.get_logger(__name__) + + +@contextmanager +def log_duration(job: str, **kwargs): + """ + Small context handler that logs the duration of the with block. + + Parameters + ---------- + job: str + Description of what is being run, will be shown under the "job" key in log + kwargs: dict + Any kwargs will be shown in the log as "key":"value" + """ + start_time = datetime.datetime.now() + logger.info("Started", job=job, **kwargs) + yield + logger.info( + "Finished", job=job, runtime=str(datetime.datetime.now() - start_time), **kwargs + ) + + +parser = argparse.ArgumentParser(description="Flowminder Synthetic CDR Migrator\n") +parser.add_argument( + "--n-days", + type=int, + default=os.getenv("N_DAYS", 7), + help="Number of days of data to migrate.", +) + +if __name__ == "__main__": + args = parser.parse_args() + with log_duration("Migrating synthetic data..", **vars(args)): + num_days = args.n_days + engine = sqlalchemy.create_engine( + f"postgresql://{os.getenv('POSTGRES_USER')}@/{os.getenv('POSTGRES_DB')}", + echo=False, + strategy="threadlocal", + pool_size=min(cpu_count(), int(os.getenv("MAX_CPUS", cpu_count()))), + pool_timeout=None, + ) + logger.info( + "Connected.", + num_connections=min(cpu_count(), int(os.getenv("MAX_CPUS", cpu_count()))), + ) + + def do_exec(args): + msg, sql = args + with log_duration(msg): + with engine.begin() as trans: + res = trans.execute(sql) + try: + logger.info(f"SQL result", job=msg, result=res.fetchall()) + except ResourceClosedError: + pass # Nothing to do here + except Exception as exc: + logger.error("Hit an issue.", exc=exc) + raise exc + + start_time = datetime.datetime.now() + + for date in ( + datetime.date(2016, 1, 1) + datetime.timedelta(days=i) + for i in range(num_days) + ): + with log_duration("Migrating day.", day=date): + partition_period = f"FROM ({date.strftime('%Y%M%d')}) TO ({(date + datetime.timedelta(days=1)).strftime('%Y%M%d')})" + with log_duration("Creating partitions.", day=date): + with engine.begin() as trans: + trans.execute( + f"CREATE TABLE interactions.events_supertable_{date.strftime('%Y%M%d')} PARTITION OF interactions.event_supertable FOR VALUES {partition_period};" + ) + trans.execute( + f"CREATE TABLE interactions.subscriber_sightings_{date.strftime('%Y%M%d')} PARTITION OF interactions.subscriber_sightings FOR VALUES {partition_period};" + ) + for event_type in ("calls", "sms", "mds", "topups"): + trans.execute( + f"CREATE TABLE interactions.calls_{date.strftime('%Y%M%d')} PARTITION OF interactions.{event_type} FOR VALUES {partition_period};" + ) + with log_duration("Migrate subscribers."): + with engine.begin() as trans: + trans.execute( + """ + INSERT INTO interactions.subscriber (msisdn, imei, imsi, tac) + SELECT msisdn, imei, imsi, tac FROM events.calls group by msisdn, imei, imsi, tac + UNION + SELECT msisdn, imei, imsi, tac FROM events.sms group by msisdn, imei, imsi, tac + UNION + SELECT msisdn, imei, imsi, tac FROM events.mds group by msisdn, imei, imsi, tac + UNION + SELECT msisdn, imei, imsi, tac FROM events.topups group by msisdn, imei, imsi, tac; + """ + ) + trans.execute( + """ + INSERT INTO interactions.locations (site_id, cell_id, position) + SELECT sites.site_id as site_id, cells.cell_id AS cell_id, cells.geom_point as position FROM + infrastructure.cells LEFT JOIN + infrastructure.sites ON + cells.site_id=sites.id AND cells.version=sites.version; + """ + ) + trans.execute( + """ + CREATE VIEW cell_id_mapping AS ( + SELECT * FROM + interactions.locations + LEFT JOIN ( + SELECT cell_id, id as mno_cell_id, daterange(date_of_first_service, date_of_last_service, '[]') as valid_period FROM + infrastructure.cells) c + USING (cell_id) + ); + """ + ) + + with log_duration("Migrate geography"): + with engine.begin() as trans: + trans.execute( + """ + INSERT INTO geography.geoms (short_name, long_name, geo_kind_id, spatial_resolution, geom) + SELECT admin3pcod as short_name, admin3name as long_name, 1 as geo_kind_id, 3 as spatial_resolution, geom + FROM geography.admin3; + + INSERT INTO geography.geoms (short_name, long_name, geo_kind_id, spatial_resolution, geom) + SELECT admin2pcod as short_name, admin2name as long_name, 1 as geo_kind_id, 2 as spatial_resolution, geom + FROM geography.admin2; + + INSERT INTO geography.geoms (short_name, long_name, geo_kind_id, spatial_resolution, geom) + SELECT admin1pcod as short_name, admin1name as long_name, 1 as geo_kind_id, 1 as spatial_resolution, geom + FROM geography.admin1; + + INSERT INTO geography.geoms (short_name, long_name, geo_kind_id, spatial_resolution, geom) + SELECT admin0pcod as short_name, admin0name as long_name, 1 as geo_kind_id, 0 as spatial_resolution, geom + FROM geography.admin0; + + INSERT INTO geography.geoms (short_name, long_name, geo_kind_id, spatial_resolution, geom) + SELECT district_c as short_name, district_n as long_name, 1 as geo_kind_id, 2 as spatial_resolution, geom + FROM public.gambia_admin2; + + /* Populate the geobridge */ + + INSERT INTO geography.geo_bridge (location_id, gid, valid_from, valid_to, linkage_method_id) + SELECT locations.location_id, geoms.gid, '-Infinity'::date as valid_from, 'Infinity'::date as valid_to, 1 as linkage_method_id from interactions.locations LEFT JOIN geography.geoms ON ST_Intersects(position, geom); + """ + ) + + events = [ + ( + """ + WITH event_data AS (SELECT + caller_ident.subscriber_id, + caller_loc.location_id, + time_dim_id, + date_dim_id, + callee_ident.subscriber_id as called_subscriber_id, + callee_loc.location_id as called_party_location_id, + calling_party_msisdn, + called_party_msisdn, + duration, + event_timestamp + FROM + (SELECT id, duration as duration, datetime as event_timestamp, location_id as caller_location_id, + msisdn as calling_party_msisdn, tac as caller_tac FROM events.calls + WHERE outgoing) callers + LEFT JOIN (SELECT id, location_id as callee_location_id, + msisdn as called_party_msisdn, tac as callee_tac FROM events.calls + WHERE not outgoing) called + USING (id) + LEFT JOIN + interactions.subscriber AS caller_ident + ON caller_ident.msisdn=calling_party_msisdn AND caller_ident.tac=caller_tac + LEFT JOIN + interactions.subscriber AS callee_ident + ON callee_ident.msisdn=called_party_msisdn AND callee_ident.tac=callee_tac + LEFT JOIN + cell_id_mapping AS caller_loc + ON caller_location_id=caller_loc.mno_cell_id AND caller_loc.valid_period @> event_timestamp::date + LEFT JOIN + cell_id_mapping AS callee_loc + ON callee_location_id=callee_loc.mno_cell_id AND callee_loc.valid_period @> event_timestamp::date + LEFT JOIN + d_date ON event_timestamp::date = date_actual + LEFT JOIN + d_time ON + EXTRACT(HOUR from event_timestamp) = hour_of_day), + call_data AS + + (INSERT INTO interactions.event_supertable (subscriber_id, location_id, time_dim_id, date_dim_id, event_timestamp, event_type_id) + SELECT subscriber_id, location_id, time_dim_id, date_dim_id, event_timestamp, (SELECT event_type_id FROM interactions.d_event_type WHERE name='calls') + FROM event_data + RETURNING *) + + INSERT INTO interactions.calls (event_id, date_dim_id, called_subscriber_id, called_party_location_id, calling_party_msisdn, called_party_msisdn, duration) + SELECT event_id, date_dim_id, called_subscriber_id, called_party_location_id, calling_party_msisdn, called_party_msisdn, duration FROM call_data NATURAL JOIN event_data; + """, + "Call events", + ), + ( + """ + WITH event_data AS (SELECT caller_ident.subscriber_id, + caller_loc.location_id, + time_dim_id as time_dim_id, + date_dim_id as date_dim_id, + callee_ident.subscriber_id as called_subscriber_id, + callee_loc.location_id as called_party_location_id, + calling_party_msisdn, + called_party_msisdn, + event_timestamp + FROM + (SELECT id, datetime as event_timestamp, location_id as caller_location_id, + msisdn as calling_party_msisdn, tac as caller_tac FROM events.sms + WHERE outgoing) callers + LEFT JOIN (SELECT id, location_id as callee_location_id, + msisdn as called_party_msisdn, tac as callee_tac FROM events.sms + WHERE not outgoing) called + USING (id) + LEFT JOIN + interactions.subscriber AS caller_ident + ON caller_ident.msisdn=calling_party_msisdn AND caller_ident.tac=caller_tac + LEFT JOIN + interactions.subscriber AS callee_ident + ON callee_ident.msisdn=called_party_msisdn AND callee_ident.tac=callee_tac + LEFT JOIN + cell_id_mapping AS caller_loc + ON caller_location_id=caller_loc.mno_cell_id AND caller_loc.valid_period @> event_timestamp::date + LEFT JOIN + cell_id_mapping AS callee_loc + ON callee_location_id=callee_loc.mno_cell_id AND callee_loc.valid_period @> event_timestamp::date + LEFT JOIN + d_date ON event_timestamp::date = date_actual + LEFT JOIN + d_time ON + EXTRACT(HOUR from event_timestamp) = hour_of_day), + sms_data AS + (INSERT INTO interactions.event_supertable (subscriber_id, location_id, time_dim_id, date_dim_id, event_timestamp, event_type_id) + SELECT subscriber_id, location_id, time_dim_id, date_dim_id, event_timestamp, (SELECT event_type_id FROM interactions.d_event_type WHERE name='sms') + FROM event_data + RETURNING *) + + INSERT INTO interactions.sms (event_id, date_dim_id, called_subscriber_id, called_party_location_id, calling_party_msisdn, called_party_msisdn) + SELECT event_id, date_dim_id, called_subscriber_id, called_party_location_id, calling_party_msisdn, called_party_msisdn FROM sms_data NATURAL JOIN event_data; + """, + "SMS events", + ), + ( + """ + WITH event_data AS (SELECT caller_ident.subscriber_id, + caller_loc.location_id, + time_dim_id, + date_dim_id, + volume_total as data_volume_total, + volume_upload as data_volume_up, + volume_download as data_volume_down, + duration, + event_timestamp + FROM + (SELECT datetime as event_timestamp, location_id as caller_location_id, + msisdn as calling_party_msisdn, tac as caller_tac, volume_total, volume_upload, volume_download, + duration + FROM events.mds) mds + LEFT JOIN + interactions.subscriber AS caller_ident + ON caller_ident.msisdn=calling_party_msisdn AND caller_ident.tac=caller_tac + LEFT JOIN + cell_id_mapping AS caller_loc + ON caller_location_id=caller_loc.mno_cell_id AND caller_loc.valid_period @> event_timestamp::date + LEFT JOIN + d_date ON event_timestamp::date = date_actual + LEFT JOIN + d_time ON + EXTRACT(HOUR from event_timestamp) = hour_of_day), + mds_data AS + (INSERT INTO interactions.event_supertable (subscriber_id, location_id, time_dim_id, date_dim_id, event_timestamp, event_type_id) + SELECT subscriber_id, location_id, time_dim_id, date_dim_id, event_timestamp, (SELECT event_type_id FROM interactions.d_event_type WHERE name='mds') + FROM event_data + RETURNING *) + + INSERT INTO interactions.mds (event_id, date_dim_id, data_volume_total, data_volume_up, + data_volume_down, + duration) + SELECT event_id, date_dim_id, data_volume_total, data_volume_up, + data_volume_down, + duration FROM mds_data NATURAL JOIN event_data; + """, + "MDS events", + ), + ( + """ + WITH event_data AS (SELECT caller_ident.subscriber_id, + caller_loc.location_id, + time_dim_id, + date_dim_id, + recharge_amount, + airtime_fee, + tax_and_fee, + pre_event_balance, + post_event_balance, + calling_party_msisdn, + caller_tac, + event_timestamp + FROM + (SELECT datetime as event_timestamp, location_id as caller_location_id, + msisdn as calling_party_msisdn, tac as caller_tac, recharge_amount, + airtime_fee, tax_and_fee, pre_event_balance, post_event_balance + FROM events.topups) topup + LEFT JOIN + interactions.subscriber AS caller_ident + ON caller_ident.msisdn=calling_party_msisdn AND caller_ident.tac=caller_tac + LEFT JOIN + cell_id_mapping AS caller_loc + ON caller_location_id=caller_loc.mno_cell_id AND caller_loc.valid_period @> event_timestamp::date + LEFT JOIN + d_date ON event_timestamp::date = date_actual + LEFT JOIN + d_time ON + EXTRACT(HOUR from event_timestamp) = hour_of_day), + topup_data AS + (INSERT INTO interactions.event_supertable (subscriber_id, location_id, time_dim_id, date_dim_id, event_timestamp, event_type_id) + SELECT subscriber_id, location_id, time_dim_id, date_dim_id, event_timestamp, (SELECT event_type_id FROM interactions.d_event_type WHERE name='topup') + FROM event_data + RETURNING *) + + INSERT INTO interactions.topup (event_id, date_dim_id, recharge_amount, airtime_fee, tax_and_fee, pre_event_balance, post_event_balance) + SELECT event_id, date_dim_id, recharge_amount, airtime_fee, tax_and_fee, pre_event_balance, post_event_balance FROM topup_data NATURAL JOIN event_data; + """, + "Topup events", + ), + ] + with log_duration("Migrate events."): + with ThreadPoolExecutor( + min(cpu_count(), int(os.getenv("MAX_CPUS", cpu_count()))) + ) as tp: + list(tp.map(do_exec, events)) + with log_duration("Migrate sightings."): + with engine.begin() as trans: + trans.execute( + """ + INSERT INTO interactions.subscriber_sightings (event_id, subscriber_id, location_id, time_dim_id, date_dim_id, sighting_timestamp) + SELECT event_id, subscriber_id, location_id, time_dim_id, date_dim_id, event_timestamp FROM interactions.event_supertable; + + INSERT INTO interactions.subscriber_sightings (event_id, subscriber_id, location_id, time_dim_id, date_dim_id, sighting_timestamp) + SELECT event_id, called_subscriber_id as subscriber_id, called_party_location_id as location_id, time_dim_id, date_dim_id, event_timestamp + FROM interactions.event_supertable NATURAL JOIN interactions.calls; + + INSERT INTO interactions.subscriber_sightings (event_id, subscriber_id, location_id, time_dim_id, date_dim_id, sighting_timestamp) + SELECT event_id, called_subscriber_id as subscriber_id, called_party_location_id as location_id, time_dim_id, date_dim_id, event_timestamp + FROM interactions.event_supertable NATURAL JOIN interactions.sms; + """ + ) diff --git a/flowdb_testdata.Dockerfile.dockerignore b/flowdb_testdata.Dockerfile.dockerignore index fa6b71a5cc..1463f7ac31 100644 --- a/flowdb_testdata.Dockerfile.dockerignore +++ b/flowdb_testdata.Dockerfile.dockerignore @@ -12,7 +12,8 @@ # !flowdb/testdata/bin/9900_ingest_test_data.sh !flowdb/testdata/bin/9910_run_synthetic_dfs_data_generation_script.sh -!flowdb/testdata/bin/9910_migrate_test_data.sql +!flowdb/testdata/bin/9910_migrate_test_data.sh +!flowdb/testdata/bin/migrate_synth_data.py !flowdb/testdata/test_data/Pipfile !flowdb/testdata/test_data/Pipfile.lock !flowdb/testdata/test_data/sql/ From 85e8ff0ddf27e1fcf8c0fe0017b12823409e5cae Mon Sep 17 00:00:00 2001 From: Jonathan Gray Date: Mon, 14 Mar 2022 12:24:31 +0000 Subject: [PATCH 16/31] Wrong path --- flowdb/testdata/bin/9910_migrate_test_data.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flowdb/testdata/bin/9910_migrate_test_data.sh b/flowdb/testdata/bin/9910_migrate_test_data.sh index e8673a1db3..aeb43dbf81 100644 --- a/flowdb/testdata/bin/9910_migrate_test_data.sh +++ b/flowdb/testdata/bin/9910_migrate_test_data.sh @@ -12,4 +12,4 @@ export PGUSER="$POSTGRES_USER" # Migrate synthetic data. # -python3 migrate_synth_data.py \ No newline at end of file +python3 /docker-entrypoint-initdb.d/migrate_synth_data.py \ No newline at end of file From 4c5f026efbafb6cab956f0f0ce181995bc1aa187 Mon Sep 17 00:00:00 2001 From: Jonathan Gray Date: Mon, 14 Mar 2022 12:37:57 +0000 Subject: [PATCH 17/31] No structlog --- flowdb/testdata/bin/migrate_synth_data.py | 49 +++++++++++++++-------- 1 file changed, 33 insertions(+), 16 deletions(-) diff --git a/flowdb/testdata/bin/migrate_synth_data.py b/flowdb/testdata/bin/migrate_synth_data.py index 0a44f5f10f..13da240740 100644 --- a/flowdb/testdata/bin/migrate_synth_data.py +++ b/flowdb/testdata/bin/migrate_synth_data.py @@ -9,20 +9,26 @@ import sqlalchemy as sqlalchemy from sqlalchemy.exc import ResourceClosedError - -import structlog import json -structlog.configure( - processors=[ - structlog.stdlib.PositionalArgumentsFormatter(), - structlog.processors.TimeStamper(fmt="iso"), - structlog.processors.StackInfoRenderer(), - structlog.processors.format_exc_info, - structlog.processors.JSONRenderer(serializer=json.dumps), - ] -) -logger = structlog.get_logger(__name__) +try: + import structlog + + structlog.configure( + processors=[ + structlog.stdlib.PositionalArgumentsFormatter(), + structlog.processors.TimeStamper(fmt="iso"), + structlog.processors.StackInfoRenderer(), + structlog.processors.format_exc_info, + structlog.processors.JSONRenderer(serializer=json.dumps), + ] + ) + logger = structlog.get_logger(__name__) +except ImportError: + import logging + + logger = logging.getLogger(__name__) + logger.setLevel("DEBUG") @contextmanager @@ -38,11 +44,22 @@ def log_duration(job: str, **kwargs): Any kwargs will be shown in the log as "key":"value" """ start_time = datetime.datetime.now() - logger.info("Started", job=job, **kwargs) + try: + logger.info("Started", job=job, **kwargs) + except: + logger.info(f"Started {job}: {kwargs}") yield - logger.info( - "Finished", job=job, runtime=str(datetime.datetime.now() - start_time), **kwargs - ) + try: + logger.info( + "Finished", + job=job, + runtime=str(datetime.datetime.now() - start_time), + **kwargs, + ) + except: + logger.info( + f"Finished {job}. runtime={str(datetime.datetime.now() - start_time)}, {kwargs}" + ) parser = argparse.ArgumentParser(description="Flowminder Synthetic CDR Migrator\n") From f0ce2eb77dfe0ebfa76abd4f716d7660aa1b81ea Mon Sep 17 00:00:00 2001 From: Jonathan Gray Date: Mon, 14 Mar 2022 12:46:19 +0000 Subject: [PATCH 18/31] No threadlocal --- flowdb/testdata/bin/migrate_synth_data.py | 1 - 1 file changed, 1 deletion(-) diff --git a/flowdb/testdata/bin/migrate_synth_data.py b/flowdb/testdata/bin/migrate_synth_data.py index 13da240740..2cefeb0567 100644 --- a/flowdb/testdata/bin/migrate_synth_data.py +++ b/flowdb/testdata/bin/migrate_synth_data.py @@ -77,7 +77,6 @@ def log_duration(job: str, **kwargs): engine = sqlalchemy.create_engine( f"postgresql://{os.getenv('POSTGRES_USER')}@/{os.getenv('POSTGRES_DB')}", echo=False, - strategy="threadlocal", pool_size=min(cpu_count(), int(os.getenv("MAX_CPUS", cpu_count()))), pool_timeout=None, ) From eea308aa2835976ee95f5b7494563f6da1237b85 Mon Sep 17 00:00:00 2001 From: Jonathan Gray Date: Mon, 14 Mar 2022 12:57:05 +0000 Subject: [PATCH 19/31] Silly logging --- flowdb/testdata/bin/migrate_synth_data.py | 35 +++++++++++------------ 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/flowdb/testdata/bin/migrate_synth_data.py b/flowdb/testdata/bin/migrate_synth_data.py index 2cefeb0567..b5d243a73a 100644 --- a/flowdb/testdata/bin/migrate_synth_data.py +++ b/flowdb/testdata/bin/migrate_synth_data.py @@ -31,6 +31,13 @@ logger.setLevel("DEBUG") +def log(msg, **kwargs): + try: + logger.info(msg, **kwargs) + except: + logger.info(f"{msg}: {kwargs}") + + @contextmanager def log_duration(job: str, **kwargs): """ @@ -44,22 +51,14 @@ def log_duration(job: str, **kwargs): Any kwargs will be shown in the log as "key":"value" """ start_time = datetime.datetime.now() - try: - logger.info("Started", job=job, **kwargs) - except: - logger.info(f"Started {job}: {kwargs}") + log("Started", job=job, **kwargs) yield - try: - logger.info( - "Finished", - job=job, - runtime=str(datetime.datetime.now() - start_time), - **kwargs, - ) - except: - logger.info( - f"Finished {job}. runtime={str(datetime.datetime.now() - start_time)}, {kwargs}" - ) + log( + "Finished", + job=job, + runtime=str(datetime.datetime.now() - start_time), + **kwargs, + ) parser = argparse.ArgumentParser(description="Flowminder Synthetic CDR Migrator\n") @@ -80,7 +79,7 @@ def log_duration(job: str, **kwargs): pool_size=min(cpu_count(), int(os.getenv("MAX_CPUS", cpu_count()))), pool_timeout=None, ) - logger.info( + log( "Connected.", num_connections=min(cpu_count(), int(os.getenv("MAX_CPUS", cpu_count()))), ) @@ -91,11 +90,11 @@ def do_exec(args): with engine.begin() as trans: res = trans.execute(sql) try: - logger.info(f"SQL result", job=msg, result=res.fetchall()) + log(f"SQL result", job=msg, result=res.fetchall()) except ResourceClosedError: pass # Nothing to do here except Exception as exc: - logger.error("Hit an issue.", exc=exc) + log("Hit an issue.", exc=exc) raise exc start_time = datetime.datetime.now() From 553b682dc57e579c70d232b9350632a1453a283b Mon Sep 17 00:00:00 2001 From: Jonathan Gray Date: Mon, 14 Mar 2022 13:18:18 +0000 Subject: [PATCH 20/31] Update migrate_synth_data.py --- flowdb/testdata/bin/migrate_synth_data.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flowdb/testdata/bin/migrate_synth_data.py b/flowdb/testdata/bin/migrate_synth_data.py index b5d243a73a..9d52b3879f 100644 --- a/flowdb/testdata/bin/migrate_synth_data.py +++ b/flowdb/testdata/bin/migrate_synth_data.py @@ -115,7 +115,7 @@ def do_exec(args): ) for event_type in ("calls", "sms", "mds", "topups"): trans.execute( - f"CREATE TABLE interactions.calls_{date.strftime('%Y%M%d')} PARTITION OF interactions.{event_type} FOR VALUES {partition_period};" + f"CREATE TABLE interactions.{event_type}_{date.strftime('%Y%M%d')} PARTITION OF interactions.{event_type} FOR VALUES {partition_period};" ) with log_duration("Migrate subscribers."): with engine.begin() as trans: From 5bcd20d78cc7e450466227ea6f2993deb874906c Mon Sep 17 00:00:00 2001 From: Jonathan Gray Date: Mon, 14 Mar 2022 13:22:55 +0000 Subject: [PATCH 21/31] Run for synth data as well --- flowdb_synthetic_data.Dockerfile | 2 ++ flowdb_synthetic_data.Dockerfile.dockerignore | 2 ++ 2 files changed, 4 insertions(+) diff --git a/flowdb_synthetic_data.Dockerfile b/flowdb_synthetic_data.Dockerfile index 73e7ee0d1b..44652f0c5c 100644 --- a/flowdb_synthetic_data.Dockerfile +++ b/flowdb_synthetic_data.Dockerfile @@ -38,6 +38,8 @@ RUN mkdir -p /docker-entrypoint-initdb.d/sql/syntheticdata/ && \ COPY --chown=postgres flowdb/testdata/bin/9900_ingest_synthetic_data.sh /docker-entrypoint-initdb.d/ COPY --chown=postgres flowdb/testdata/bin/9800_population_density.sql.gz /docker-entrypoint-initdb.d/ +COPY --chown=postgres flowdb/testdata/bin/9910_migrate_test_data.sh /docker-entrypoint-initdb.d/ +COPY --chown=postgres flowdb/testdata/bin/migrate_synth_data.py /docker-entrypoint-initdb.d/ COPY --chown=postgres flowdb/testdata/bin/9910_run_synthetic_dfs_data_generation_script.sh /docker-entrypoint-initdb.d/ COPY --chown=postgres flowdb/testdata/test_data/py/* /docker-entrypoint-initdb.d/py/testdata/ diff --git a/flowdb_synthetic_data.Dockerfile.dockerignore b/flowdb_synthetic_data.Dockerfile.dockerignore index 0ed8d196c7..c77be87c4d 100644 --- a/flowdb_synthetic_data.Dockerfile.dockerignore +++ b/flowdb_synthetic_data.Dockerfile.dockerignore @@ -12,6 +12,8 @@ !flowdb/testdata/bin/9900_ingest_synthetic_data.sh !flowdb/testdata/bin/9800_population_density.sql.gz !flowdb/testdata/bin/generate_synthetic_data*.py +!flowdb/testdata/bin/9910_migrate_test_data.sh +!flowdb/testdata/bin/migrate_synth_data.py !flowdb/testdata/test_data/sql/admin*.sql !flowdb/testdata/synthetic_data/data/NPL_admbnda_adm3_Districts_simplified.geojson !flowdb/testdata/synthetic_data/Pipfile* From 863821a05f4551248f1ca274834aefb9e8e89598 Mon Sep 17 00:00:00 2001 From: Jonathan Gray Date: Mon, 14 Mar 2022 13:37:52 +0000 Subject: [PATCH 22/31] Update migrate_synth_data.py --- flowdb/testdata/bin/migrate_synth_data.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flowdb/testdata/bin/migrate_synth_data.py b/flowdb/testdata/bin/migrate_synth_data.py index 9d52b3879f..8edb16f1c0 100644 --- a/flowdb/testdata/bin/migrate_synth_data.py +++ b/flowdb/testdata/bin/migrate_synth_data.py @@ -113,7 +113,7 @@ def do_exec(args): trans.execute( f"CREATE TABLE interactions.subscriber_sightings_{date.strftime('%Y%M%d')} PARTITION OF interactions.subscriber_sightings FOR VALUES {partition_period};" ) - for event_type in ("calls", "sms", "mds", "topups"): + for event_type in ("calls", "sms", "mds", "topup"): trans.execute( f"CREATE TABLE interactions.{event_type}_{date.strftime('%Y%M%d')} PARTITION OF interactions.{event_type} FOR VALUES {partition_period};" ) From cb3033fba230a7eec8eb19477eecc33191644d4e Mon Sep 17 00:00:00 2001 From: Jonathan Gray Date: Mon, 14 Mar 2022 13:48:02 +0000 Subject: [PATCH 23/31] Update migrate_synth_data.py --- flowdb/testdata/bin/migrate_synth_data.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flowdb/testdata/bin/migrate_synth_data.py b/flowdb/testdata/bin/migrate_synth_data.py index 8edb16f1c0..0f21715b46 100644 --- a/flowdb/testdata/bin/migrate_synth_data.py +++ b/flowdb/testdata/bin/migrate_synth_data.py @@ -85,7 +85,7 @@ def log_duration(job: str, **kwargs): ) def do_exec(args): - msg, sql = args + sql, msg = args with log_duration(msg): with engine.begin() as trans: res = trans.execute(sql) From 5af983761d45b15ad09bb83fe944ae92e6dc2ca7 Mon Sep 17 00:00:00 2001 From: Jonathan Gray Date: Mon, 14 Mar 2022 13:59:31 +0000 Subject: [PATCH 24/31] Update migrate_synth_data.py --- flowdb/testdata/bin/migrate_synth_data.py | 1 + 1 file changed, 1 insertion(+) diff --git a/flowdb/testdata/bin/migrate_synth_data.py b/flowdb/testdata/bin/migrate_synth_data.py index 0f21715b46..d0a563ca3c 100644 --- a/flowdb/testdata/bin/migrate_synth_data.py +++ b/flowdb/testdata/bin/migrate_synth_data.py @@ -105,6 +105,7 @@ def do_exec(args): ): with log_duration("Migrating day.", day=date): partition_period = f"FROM ({date.strftime('%Y%M%d')}) TO ({(date + datetime.timedelta(days=1)).strftime('%Y%M%d')})" + print(partition_period) with log_duration("Creating partitions.", day=date): with engine.begin() as trans: trans.execute( From 912fd2f991a8a92ff4e530f06fe9f3fdeefd3712 Mon Sep 17 00:00:00 2001 From: Jonathan Gray Date: Mon, 14 Mar 2022 14:34:01 +0000 Subject: [PATCH 25/31] Fix partitions and logging --- flowdb/testdata/bin/migrate_synth_data.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/flowdb/testdata/bin/migrate_synth_data.py b/flowdb/testdata/bin/migrate_synth_data.py index d0a563ca3c..e59377bff2 100644 --- a/flowdb/testdata/bin/migrate_synth_data.py +++ b/flowdb/testdata/bin/migrate_synth_data.py @@ -1,3 +1,4 @@ +import sys from pathlib import Path import os @@ -27,6 +28,7 @@ except ImportError: import logging + logging.basicConfig(stream=sys.stdout, level=logging.DEBUG) logger = logging.getLogger(__name__) logger.setLevel("DEBUG") @@ -104,19 +106,18 @@ def do_exec(args): for i in range(num_days) ): with log_duration("Migrating day.", day=date): - partition_period = f"FROM ({date.strftime('%Y%M%d')}) TO ({(date + datetime.timedelta(days=1)).strftime('%Y%M%d')})" - print(partition_period) + partition_period = f"FROM ({date.strftime('%Y%m%d')}) TO ({(date + datetime.timedelta(days=1)).strftime('%Y%m%d')})" with log_duration("Creating partitions.", day=date): with engine.begin() as trans: trans.execute( - f"CREATE TABLE interactions.events_supertable_{date.strftime('%Y%M%d')} PARTITION OF interactions.event_supertable FOR VALUES {partition_period};" + f"CREATE TABLE interactions.events_supertable_{date.strftime('%Y%m%d')} PARTITION OF interactions.event_supertable FOR VALUES {partition_period};" ) trans.execute( - f"CREATE TABLE interactions.subscriber_sightings_{date.strftime('%Y%M%d')} PARTITION OF interactions.subscriber_sightings FOR VALUES {partition_period};" + f"CREATE TABLE interactions.subscriber_sightings_{date.strftime('%Y%m%d')} PARTITION OF interactions.subscriber_sightings FOR VALUES {partition_period};" ) for event_type in ("calls", "sms", "mds", "topup"): trans.execute( - f"CREATE TABLE interactions.{event_type}_{date.strftime('%Y%M%d')} PARTITION OF interactions.{event_type} FOR VALUES {partition_period};" + f"CREATE TABLE interactions.{event_type}_{date.strftime('%Y%m%d')} PARTITION OF interactions.{event_type} FOR VALUES {partition_period};" ) with log_duration("Migrate subscribers."): with engine.begin() as trans: From 7ee97a5ba99f97be76f4ab34e674bdbb9e849142 Mon Sep 17 00:00:00 2001 From: Jonathan Gray Date: Mon, 14 Mar 2022 15:06:09 +0000 Subject: [PATCH 26/31] Deal with missing gambia --- flowdb/testdata/bin/migrate_synth_data.py | 25 +++++++++++++++-------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/flowdb/testdata/bin/migrate_synth_data.py b/flowdb/testdata/bin/migrate_synth_data.py index e59377bff2..cc95425039 100644 --- a/flowdb/testdata/bin/migrate_synth_data.py +++ b/flowdb/testdata/bin/migrate_synth_data.py @@ -173,15 +173,22 @@ def do_exec(args): INSERT INTO geography.geoms (short_name, long_name, geo_kind_id, spatial_resolution, geom) SELECT admin0pcod as short_name, admin0name as long_name, 1 as geo_kind_id, 0 as spatial_resolution, geom - FROM geography.admin0; - - INSERT INTO geography.geoms (short_name, long_name, geo_kind_id, spatial_resolution, geom) - SELECT district_c as short_name, district_n as long_name, 1 as geo_kind_id, 2 as spatial_resolution, geom - FROM public.gambia_admin2; - - /* Populate the geobridge */ - - INSERT INTO geography.geo_bridge (location_id, gid, valid_from, valid_to, linkage_method_id) + FROM geography.admin0;""" + ) + with engine.begin() as trans: + try: + trans.execute( + """ + INSERT INTO geography.geoms (short_name, long_name, geo_kind_id, spatial_resolution, geom) + SELECT district_c as short_name, district_n as long_name, 1 as geo_kind_id, 2 as spatial_resolution, geom + FROM public.gambia_admin2;""" + ) + except: + pass # No gambia table + with engine.begin() as trans: + # Populate the geobridge + trans.execute( + """INSERT INTO geography.geo_bridge (location_id, gid, valid_from, valid_to, linkage_method_id) SELECT locations.location_id, geoms.gid, '-Infinity'::date as valid_from, 'Infinity'::date as valid_to, 1 as linkage_method_id from interactions.locations LEFT JOIN geography.geoms ON ST_Intersects(position, geom); """ ) From 519c56164906af70225ca6399fdab74ff3661c37 Mon Sep 17 00:00:00 2001 From: Jonathan Gray Date: Mon, 14 Mar 2022 15:11:35 +0000 Subject: [PATCH 27/31] Lint --- flowdb/testdata/bin/migrate_synth_data.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flowdb/testdata/bin/migrate_synth_data.py b/flowdb/testdata/bin/migrate_synth_data.py index cc95425039..58ef9dfe77 100644 --- a/flowdb/testdata/bin/migrate_synth_data.py +++ b/flowdb/testdata/bin/migrate_synth_data.py @@ -184,7 +184,7 @@ def do_exec(args): FROM public.gambia_admin2;""" ) except: - pass # No gambia table + pass # No gambia table with engine.begin() as trans: # Populate the geobridge trans.execute( From 3da74f476cf659ad2ff2d1aaca1e1dc004274bff Mon Sep 17 00:00:00 2001 From: Jonathan Gray Date: Mon, 14 Mar 2022 15:56:26 +0000 Subject: [PATCH 28/31] Longer timeout, fix number of days --- .circleci/config.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 9d0634106b..287a1a4bab 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -38,7 +38,7 @@ defaults: - &wait_for_flowdb name: Wait for flowdb to start command: | - dockerize -wait tcp://localhost:5432 -timeout 10m + dockerize -wait tcp://localhost:5432 -timeout 20m - &run_always_org_context context: org-global filters: @@ -818,6 +818,7 @@ jobs: name: python_with_flowdb flowdb_image: "testdata" python_version: "3.8.5" + num_days: 7 # To avoid overriding fixed number of days working_directory: /home/circleci/project/integration_tests steps: - checkout: From b2ad188079db7b82269dd80180b92a52f9ab8539 Mon Sep 17 00:00:00 2001 From: Jonathan Gray Date: Wed, 14 Jun 2023 14:43:32 +0100 Subject: [PATCH 29/31] Use pipenv python --- flowdb/testdata/bin/9910_migrate_test_data.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flowdb/testdata/bin/9910_migrate_test_data.sh b/flowdb/testdata/bin/9910_migrate_test_data.sh index aeb43dbf81..86fe269277 100644 --- a/flowdb/testdata/bin/9910_migrate_test_data.sh +++ b/flowdb/testdata/bin/9910_migrate_test_data.sh @@ -12,4 +12,4 @@ export PGUSER="$POSTGRES_USER" # Migrate synthetic data. # -python3 /docker-entrypoint-initdb.d/migrate_synth_data.py \ No newline at end of file +pipenv run python /docker-entrypoint-initdb.d/migrate_synth_data.py From 37d3bcf386fd15a0352e1f6a2a964f71cb082581 Mon Sep 17 00:00:00 2001 From: Thingus Date: Wed, 14 Jun 2023 16:19:45 +0100 Subject: [PATCH 30/31] Dropping events schema, bringing up to date with master --- flowdb/testdata/bin/migrate_synth_data.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/flowdb/testdata/bin/migrate_synth_data.py b/flowdb/testdata/bin/migrate_synth_data.py index 58ef9dfe77..0f25b6186d 100644 --- a/flowdb/testdata/bin/migrate_synth_data.py +++ b/flowdb/testdata/bin/migrate_synth_data.py @@ -396,3 +396,10 @@ def do_exec(args): FROM interactions.event_supertable NATURAL JOIN interactions.sms; """ ) + with log_duration("Drop events tables"): + with engine.begin() as trans: + trans.execute( + """ + DROP SCHEMA events CASCADE; + """ + ) From b7488223a6128a8120135fc7a01f4902b5db620c Mon Sep 17 00:00:00 2001 From: Jonathan Gray Date: Wed, 14 Jun 2023 16:57:00 +0100 Subject: [PATCH 31/31] skip synth dfs events because they use the events tables --- ...un_synthetic_dfs_data_generation_script.sh | 23 ------------------- 1 file changed, 23 deletions(-) delete mode 100755 flowdb/testdata/bin/9910_run_synthetic_dfs_data_generation_script.sh diff --git a/flowdb/testdata/bin/9910_run_synthetic_dfs_data_generation_script.sh b/flowdb/testdata/bin/9910_run_synthetic_dfs_data_generation_script.sh deleted file mode 100755 index 85402d9f89..0000000000 --- a/flowdb/testdata/bin/9910_run_synthetic_dfs_data_generation_script.sh +++ /dev/null @@ -1,23 +0,0 @@ -#!/bin/sh -# This Source Code Form is subject to the terms of the Mozilla Public -# License, v. 2.0. If a copy of the MPL was not distributed with this -# file, You can obtain one at http://mozilla.org/MPL/2.0/. - - -set -e -export PGUSER="$POSTGRES_USER" - -# -# Generate synthetic DFS data. -# -# Note that the only purpose of this script is to -# call the Python script which does the actual data -# data generation, but we need this shell script as -# a wrapper because the PostgreSQL entrypoint script -# does not pick up .py files on its own. -# - -export DIR=/docker-entrypoint-initdb.d/py/testdata/ - -echo "Running Python script to generate synthetic DFS data." -pipenv run python ${DIR}/zz_generate_synthetic_dfs_data.py