-
Notifications
You must be signed in to change notification settings - Fork 0
/
migration_script.py
90 lines (77 loc) · 3.46 KB
/
migration_script.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import os
import json
import psycopg
import importlib
import uuid
from typing import Dict
from pathlib import Path
def execute_sql_scripts(con: psycopg.Connection, migration_sql: Dict):
with open(os.path.join(os.getcwd(), "migrations", migration_sql['folder'], migration_sql['file'])) as f:
script = str(f.read())
try:
with con.cursor() as cur:
cur.execute(script)
con.commit()
except Exception as e:
print(e)
def execute_python_scripts(con, migration_py):
module = importlib.import_module(f"database.python.{migration_py['file']}")
module.transform(con)
config_filename = os.path.join(os.getcwd(), 'config', f'{os.environ["FLASK_ENV"]}.py')
try:
with open(config_filename, mode="rb") as config_file:
exec(compile(config_file.read(), config_filename, "exec"))
except IOError as e:
print(e)
if os.path.isfile("migration.json"):
with psycopg.connect(DATABASE_STRING) as con:
with con.cursor() as cur:
cur.execute("""SELECT EXISTS (
SELECT FROM
pg_tables
WHERE
schemaname = 'public' AND
tablename = 'migrations'
);""")
migration_table_exists = cur.fetchone()[0]
if not migration_table_exists:
cur.execute("""CREATE TABLE IF NOT EXISTS migrations (
uuid varchar(200) PRIMARY KEY,
migration text
);""")
con.commit()
migrations = []
with open("migration.json") as migrations_file:
migrations = json.load(migrations_file)
for migration in migrations:
if os.environ["FLASK_ENV"] == "development":
answer = input(f"Should process {migration['file']}? (y/N)")
if answer.lower() != 'y':
continue
if os.environ['FLASK_ENV'] == "production":
migrated_path = os.path.join(os.getcwd(), "migrated")
if not Path(migrated_path).is_file():
with open(migrated_path, 'w') as f:
f.write('')
with open(migrated_path) as f:
content = f.readlines()
content = [x.strip() for x in content]
if migration['file'] in content:
continue
with con.cursor() as cur:
cur.execute("""SELECT * FROM migrations WHERE migration = %s""", (f"{migration['type']}-{migration['file']}",))
migration_exists = cur.fetchall()
if len(migration_exists) == 0:
if migration["type"] == "sql":
execute_sql_scripts(con, migration)
if migration["type"] == "py":
execute_python_scripts(con, migration)
print(f"Processed {migration['file']}")
with con.cursor() as cur:
cur.execute("""INSERT INTO migrations (uuid, migration) VALUES (%s, %s)""", (str(uuid.uuid4()), f"{migration['type']}-{migration['file']}"))
con.commit()
if os.environ["FLASK_ENV"] != "development":
answer = input(f"Delete migrations file? (Y/n)")
if answer.lower() != 'n':
os.remove("migration.json")
print("\nMigrations done")