bdit_data-sources
WYS: mobile_api_id MATERIALIZED VIEW doesn't account for signs changing over time
Not a priority, but the wys_monthly_summary DAG will fail at its first task, wys_views, which refreshes the mobile_api_id view. That view matches mobile sign installations from the spreadsheets with the locations (and api_id) pulled from the API, but it doesn't account for start_date: when that api_id first appears in the API.
It happened
*** Reading local file: /etc/airflow/logs/wys_monthly_summary/wys_views/2021-08-02T07:00:00+00:00/1.log
[2021-09-02 03:00:14,467] {taskinstance.py:670} INFO - Dependencies all met for <TaskInstance: wys_monthly_summary.wys_views 2021-08-02T07:00:00+00:00 [queued]>
[2021-09-02 03:00:14,505] {taskinstance.py:670} INFO - Dependencies all met for <TaskInstance: wys_monthly_summary.wys_views 2021-08-02T07:00:00+00:00 [queued]>
[2021-09-02 03:00:14,505] {taskinstance.py:880} INFO -
--------------------------------------------------------------------------------
[2021-09-02 03:00:14,505] {taskinstance.py:881} INFO - Starting attempt 1 of 1
[2021-09-02 03:00:14,505] {taskinstance.py:882} INFO -
--------------------------------------------------------------------------------
[2021-09-02 03:00:14,537] {taskinstance.py:901} INFO - Executing <Task(PostgresOperator): wys_views> on 2021-08-02T07:00:00+00:00
[2021-09-02 03:00:14,545] {standard_task_runner.py:54} INFO - Started process 14850 to run task
[2021-09-02 03:00:14,665] {standard_task_runner.py:77} INFO - Running: ['airflow', 'run', 'wys_monthly_summary', 'wys_views', '2021-08-02T07:00:00+00:00', '--job_id', '15161', '--pool', 'default_pool', '--raw', '-sd', 'DAGS_FOLDER/refresh_wys_monthly.py', '--cfg_path', '/tmp/tmpdqpv6qir']
[2021-09-02 03:00:14,674] {standard_task_runner.py:78} INFO - Job 15161: Subtask wys_views
[2021-09-02 03:00:14,830] {logging_mixin.py:120} INFO - Running <TaskInstance: wys_monthly_summary.wys_views 2021-08-02T07:00:00+00:00 [running]> on host ip-10-160-2-198
[2021-09-02 03:00:14,920] {postgres_operator.py:62} INFO - Executing: SELECT wys.refresh_mat_views()
[2021-09-02 03:00:14,946] {base_hook.py:89} INFO - Using connection to: id: wys_bot. Host: 10.160.12.47, Port: 5432, Schema: bigdata, Login: wys_bot, Password: XXXXXXXX, extra: None
[2021-09-02 03:00:14,959] {dbapi_hook.py:176} INFO - SELECT wys.refresh_mat_views()
[2021-09-02 03:00:16,250] {taskinstance.py:1150} ERROR - duplicate key value violates unique constraint "mobile_api_id_location_id_idx"
DETAIL: Key (location_id)=(1483594) already exists.
CONTEXT: SQL statement "INSERT INTO wys.mobile_api_id SELECT (diff.newdata).* FROM pg_temp_18.pg_temp_1348702135_2 diff WHERE tid IS NULL"
SQL function "refresh_mat_views" statement 1
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 984, in _run_raw_task
result = task_copy.execute(context=context)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/operators/postgres_operator.py", line 65, in execute
self.hook.run(self.sql, self.autocommit, parameters=self.parameters)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/hooks/dbapi_hook.py", line 177, in run
cur.execute(s)
psycopg2.errors.UniqueViolation: duplicate key value violates unique constraint "mobile_api_id_location_id_idx"
DETAIL: Key (location_id)=(1483594) already exists.
CONTEXT: SQL statement "INSERT INTO wys.mobile_api_id SELECT (diff.newdata).* FROM pg_temp_18.pg_temp_1348702135_2 diff WHERE tid IS NULL"
SQL function "refresh_mat_views" statement 1
[2021-09-02 03:00:16,264] {taskinstance.py:1194} INFO - Marking task as FAILED. dag_id=wys_monthly_summary, task_id=wys_views, execution_date=20210802T070000, start_date=20210902T070014, end_date=20210902T070016
[2021-09-02 03:00:16,285] {base_hook.py:89} INFO - Using connection to: id: slack. Host: https://hooks.slack.com/services, Port: None, Schema: None, Login: None, Password: XXXXXXXX, extra: None
[2021-09-02 03:00:16,307] {base_hook.py:89} INFO - Using connection to: id: slack. Host: https://hooks.slack.com/services, Port: None, Schema: None, Login: None, Password: XXXXXXXX, extra: None
[2021-09-02 03:00:16,309] {http_hook.py:136} INFO - Sending 'POST' to url: https://hooks.slack.com/services/
[2021-09-02 03:00:16,320] {logging_mixin.py:120} WARNING - /usr/lib/python3/dist-packages/urllib3/connectionpool.py:860: InsecureRequestWarning: Unverified HTTPS request is being made. Adding certificate verification is strongly advised. See: https://urllib3.readthedocs.io/en/latest/advanced-usage.html#ssl-warnings
InsecureRequestWarning)
[2021-09-02 03:00:19,439] {local_task_job.py:102} INFO - Task exited with return code 1
There are two signs (api_id values) matching location 1483594:
SELECT a.id AS location_id,
a.ward_no,
a.location,
a.from_street,
a.to_street,
a.direction,
a.installation_date,
a.removal_date,
a.comments,
a.combined,
b.api_id
FROM ( SELECT mobile_sign_installations.ward_no,
mobile_sign_installations.location,
mobile_sign_installations.from_street,
mobile_sign_installations.to_street,
mobile_sign_installations.direction,
mobile_sign_installations.installation_date,
mobile_sign_installations.removal_date,
mobile_sign_installations.new_sign_number,
mobile_sign_installations.comments,
mobile_sign_installations.id,
CASE
WHEN mobile_sign_installations.new_sign_number !~~ 'W%'::text THEN (('Ward '::text || mobile_sign_installations.ward_no) || ' - S'::text) || mobile_sign_installations.new_sign_number
WHEN mobile_sign_installations.new_sign_number ~~ 'W%'::text AND mobile_sign_installations.new_sign_number ~~ '% - S%'::text THEN 'Ward '::text || "substring"(mobile_sign_installations.new_sign_number, 2, 10)
WHEN mobile_sign_installations.new_sign_number ~~ 'W%'::text AND mobile_sign_installations.new_sign_number !~~ '% - S%'::text THEN (('Ward '::text || "substring"(mobile_sign_installations.new_sign_number, 2, "position"(mobile_sign_installations.new_sign_number, ' - '::text) + 1)) || 'S'::text) || "right"(mobile_sign_installations.new_sign_number, 1)
ELSE NULL::text
END AS combined
FROM wys.mobile_sign_installations) a
LEFT JOIN ( SELECT DISTINCT locations.api_id,
locations.sign_name
FROM wys.locations
WHERE locations.sign_name ~~ 'Ward%'::text
ORDER BY locations.sign_name) b ON a.combined = b.sign_name
WHERE id = 1483594
As part of fixing this, the wys_views task defined below, which calls the wys.refresh_mat_views() function, should be broken up so that mobile data doesn't depend on stationary data and vice versa.
https://github.com/CityofToronto/bdit_data-sources/blob/b4cc5dbf4930d5957a87eb2745e16c363412198f/dags/refresh_wys_monthly.py#L53-L58
There should be two independent tracks for the two types of data.

So this part of the SQL, which joins locations so that each mobile installation has an api_id to match with speed data, should account for both the sign start_date and the mobile installation dates. Bear in mind, though, that a sign's start_date can be later than a mobile installation, because the earliest start_date for any sign is the first time we started pulling data.
https://github.com/CityofToronto/bdit_data-sources/blob/b4cc5dbf4930d5957a87eb2745e16c363412198f/wys/api/sql/create-view-mobile_api_id.sql#L17-L22
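A minimal sketch of the date-aware matching described above, written in plain Python rather than SQL so the logic is easy to check. It assumes each row of wys.locations is one version of a sign, valid from its start_date until the next start_date for the same api_id, and that the earliest version is treated as open-ended into the past (since the first start_date only reflects when data pulls began). The api_id values (101, 102) and the 2020-01-01 start dates are made up for illustration; only the two sign names and the 2021-08-18 swap come from this issue. Matching on installation_date alone is also an assumption.

```python
from datetime import date

# Hypothetical versions of two signs that swap names on 2021-08-18:
# (api_id, sign_name, start_date)
LOCATIONS = [
    (101, "Ward 10 - S2", date(2020, 1, 1)),
    (101, "Ward 10 - S7", date(2021, 8, 18)),
    (102, "Ward 10 - S7", date(2020, 1, 1)),
    (102, "Ward 10 - S2", date(2021, 8, 18)),
]


def build_windows(locations):
    """Turn location versions into (api_id, sign_name, valid_from, valid_until).

    valid_until is exclusive. The first version of each api_id is valid from
    date.min, because its start_date only marks the first data pull.
    """
    by_api = {}
    for api_id, sign_name, start in locations:
        by_api.setdefault(api_id, []).append((start, sign_name))

    windows = []
    for api_id, versions in by_api.items():
        versions.sort()  # order each sign's versions by start_date
        for i, (start, sign_name) in enumerate(versions):
            valid_from = date.min if i == 0 else start
            if i + 1 < len(versions):
                valid_until = versions[i + 1][0]
            else:
                valid_until = date.max
            windows.append((api_id, sign_name, valid_from, valid_until))
    return windows


def match_api_ids(combined, installation_date, windows):
    """Return every api_id whose name matched `combined` on installation_date."""
    return sorted(
        api_id
        for api_id, sign_name, valid_from, valid_until in windows
        if sign_name == combined and valid_from <= installation_date < valid_until
    )


WINDOWS = build_windows(LOCATIONS)

# An installation before the swap and one after it resolve to different signs,
# instead of both api_ids matching the same installation row.
before_swap = match_api_ids("Ward 10 - S7", date(2021, 6, 1), WINDOWS)
after_swap = match_api_ids("Ward 10 - S7", date(2021, 9, 1), WINDOWS)
```

With this kind of window, each installation matches at most one api_id per date, so the unique index on location_id would not be violated by a name swap. In PostgreSQL the same windows could presumably be built with LEAD(start_date) OVER (PARTITION BY api_id ORDER BY start_date).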
The wys.stationary_signs materialized view sort of accounts for this over time by looking at the previous and subsequent start_date for any given sign (api_id).
https://github.com/CityofToronto/bdit_data-sources/blob/b4cc5dbf4930d5957a87eb2745e16c363412198f/wys/api/sql/mat-view-stationary-signs.sql#L27-L31
This is then used in the stationary sign summary function
https://github.com/CityofToronto/bdit_data-sources/blob/e98182d30de5511a2a4c246e4771f14c937c6b0f/wys/api/sql/function-stationary-sign-summary.sql#L56-L59
as well as the stationary locations open data materialized view
https://github.com/CityofToronto/bdit_data-sources/blob/e98182d30de5511a2a4c246e4771f14c937c6b0f/wys/api/sql/open_data/mat-view-stationary-locations.sql#L9-L13
Weirdly, this issue came about not because a sign was replaced, but because two signs swapped names on 2021-08-18. The query below lists every 'Ward' sign name that has been used by more than one api_id:
SELECT *
FROM wys.locations
NATURAL JOIN (
    SELECT sign_name
    FROM wys.locations
    WHERE sign_name LIKE 'Ward%'
    GROUP BY sign_name
    HAVING COUNT(DISTINCT api_id) > 1
) replaced_signs
ORDER BY sign_name, start_date;
Temporarily replacing the wys.mobile_api_id mat view to exclude those two signs until a fix is created.
DROP MATERIALIZED VIEW IF EXISTS wys.mobile_api_id;
CREATE MATERIALIZED VIEW IF NOT EXISTS wys.mobile_api_id
TABLESPACE pg_default
AS
SELECT a.id AS location_id,
a.ward_no,
a.location,
a.from_street,
a.to_street,
a.direction,
a.installation_date,
a.removal_date,
a.comments,
a.combined,
b.api_id
FROM ( SELECT mobile_sign_installations.ward_no,
mobile_sign_installations.location,
mobile_sign_installations.from_street,
mobile_sign_installations.to_street,
mobile_sign_installations.direction,
mobile_sign_installations.installation_date,
mobile_sign_installations.removal_date,
mobile_sign_installations.new_sign_number,
mobile_sign_installations.comments,
mobile_sign_installations.id,
CASE
WHEN mobile_sign_installations.new_sign_number !~~ 'W%'::text THEN (('Ward '::text || mobile_sign_installations.ward_no) || ' - S'::text) || mobile_sign_installations.new_sign_number
WHEN mobile_sign_installations.new_sign_number ~~ 'W%'::text AND mobile_sign_installations.new_sign_number ~~ '% - S%'::text THEN 'Ward '::text || "substring"(mobile_sign_installations.new_sign_number, 2, 10)
WHEN mobile_sign_installations.new_sign_number ~~ 'W%'::text AND mobile_sign_installations.new_sign_number !~~ '% - S%'::text THEN (('Ward '::text || "substring"(mobile_sign_installations.new_sign_number, 2, "position"(mobile_sign_installations.new_sign_number, ' - '::text) + 1)) || 'S'::text) || "right"(mobile_sign_installations.new_sign_number, 1)
ELSE NULL::text
END AS combined
FROM wys.mobile_sign_installations
) a
LEFT JOIN ( SELECT DISTINCT locations.api_id,
locations.sign_name
FROM wys.locations
WHERE locations.sign_name ~~ 'Ward%'::text
ORDER BY locations.sign_name) b ON a.combined = b.sign_name
WHERE combined NOT IN('Ward 10 - S7',
'Ward 10 - S2')
WITH DATA;
ALTER TABLE IF EXISTS wys.mobile_api_id
OWNER TO rdumas;
GRANT ALL ON TABLE wys.mobile_api_id TO rdumas;
GRANT SELECT ON TABLE wys.mobile_api_id TO bdit_humans;
CREATE UNIQUE INDEX mobile_api_id_location_id_idx
ON wys.mobile_api_id USING btree
(location_id)
TABLESPACE pg_default;