Plain is headed towards 1.0! Subscribe for development updates →

  1import os
  2import shlex
  3import subprocess
  4
  5from plain.runtime import APP_PATH, settings
  6
  7SNAPSHOT_DB_PREFIX = "plaindb_snapshot_"
  8
  9
 10class DBContainer:
 11    def __init__(self):
 12        project_root = APP_PATH.parent
 13        tmp_dir = settings.PLAIN_TEMP_PATH
 14
 15        name = os.path.basename(project_root) + "-postgres-1"
 16
 17        if "DATABASE_URL" in os.environ:
 18            from plain.models import database_url
 19
 20            postgres_version = os.environ.get("POSTGRES_VERSION")
 21            parsed_db_url = database_url.parse(os.environ.get("DATABASE_URL"))
 22
 23        self.name = name
 24        self.tmp_dir = os.path.abspath(tmp_dir)
 25        self.postgres_version = postgres_version or "13"
 26        self.postgres_port = parsed_db_url.get("PORT", "5432")
 27        self.postgres_db = parsed_db_url.get("NAME", "postgres")
 28        self.postgres_user = parsed_db_url.get("USER", "postgres")
 29        self.postgres_password = parsed_db_url.get("PASSWORD", "postgres")
 30
 31    def execute(self, command, *args, **kwargs):
 32        docker_flags = kwargs.pop("docker_flags", "-it")
 33        return subprocess.run(
 34            [
 35                "docker",
 36                "exec",
 37                docker_flags,
 38                self.name,
 39                *shlex.split(command),
 40            ]
 41            + list(args),
 42            check=True,
 43            **kwargs,
 44        )
 45
 46    def reset(self, create=False):
 47        try:
 48            self.execute(
 49                f"dropdb {self.postgres_db} --force -U {self.postgres_user}",
 50                stdout=subprocess.PIPE,
 51                stderr=subprocess.PIPE,
 52            )
 53        except subprocess.CalledProcessError as e:
 54            if "does not exist" not in e.stdout.decode():
 55                print(e.stderr.decode())
 56                raise
 57
 58        if create:
 59            self.execute(
 60                f"createdb {self.postgres_db} -U {self.postgres_user}",
 61                stdout=subprocess.PIPE,
 62                stderr=subprocess.PIPE,
 63            )
 64
 65    def terminate_connections(self):
 66        self.execute(
 67            f"psql -U {self.postgres_user} {self.postgres_db} -c",
 68            f"SELECT pg_terminate_backend(pg_stat_activity.pid) FROM pg_stat_activity WHERE pg_stat_activity.datname = '{self.postgres_db}' AND pid <> pg_backend_pid();",
 69            stdout=subprocess.DEVNULL,
 70        )
 71
 72    def create_snapshot(self, name):
 73        snapshot_name = f"{SNAPSHOT_DB_PREFIX}{name}"
 74        current_git_branch = (
 75            subprocess.check_output(["git", "rev-parse", "--abbrev-ref", "HEAD"])
 76            .decode()
 77            .strip()
 78        )
 79        description = f"branch={current_git_branch}"
 80
 81        self.terminate_connections()
 82        try:
 83            self.execute(
 84                f"createdb {snapshot_name} '{description}' -U {self.postgres_user} -T {self.postgres_db}",
 85                stdout=subprocess.PIPE,
 86                stderr=subprocess.PIPE,
 87            )
 88        except subprocess.CalledProcessError as e:
 89            if "already exists" in e.stdout.decode():
 90                return False
 91            else:
 92                raise
 93
 94        return True
 95
 96    def list_snapshots(self):
 97        self.execute(
 98            f"psql -U {self.postgres_user} {self.postgres_db} -c",
 99            f"SELECT REPLACE(datname, '{SNAPSHOT_DB_PREFIX}', '') as name, pg_size_pretty(pg_database_size(datname)) as size, pg_catalog.shobj_description(oid, 'pg_database') AS description, (pg_stat_file('base/'||oid ||'/PG_VERSION')).modification as created FROM pg_catalog.pg_database WHERE datname LIKE '{SNAPSHOT_DB_PREFIX}%' ORDER BY created;",
100        )
101
102    def delete_snapshot(self, name):
103        snapshot_name = f"{SNAPSHOT_DB_PREFIX}{name}"
104        try:
105            self.execute(
106                f"dropdb {snapshot_name} -U {self.postgres_user}",
107                stdout=subprocess.PIPE,
108                stderr=subprocess.PIPE,
109            )
110        except subprocess.CalledProcessError as e:
111            if "does not exist" in e.stdout.decode():
112                return False
113            else:
114                raise
115
116        return True
117
118    def restore_snapshot(self, name):
119        snapshot_name = f"{SNAPSHOT_DB_PREFIX}{name}"
120        self.reset(create=False)
121        self.execute(
122            f"createdb {self.postgres_db} -U {self.postgres_user} -T {snapshot_name}",
123        )
124
125    def export(self, export_path):
126        successful = (
127            subprocess.run(
128                [
129                    "docker",
130                    "exec",
131                    self.name,
132                    "/bin/bash",
133                    "-c",
134                    f"pg_dump -U {self.postgres_user} {self.postgres_db}",
135                ],
136                stdout=open(export_path, "w+"),
137            ).returncode
138            == 0
139        )
140        return successful
141
142    def import_sql(self, sql_file):
143        self.reset(create=True)
144        successful = (
145            subprocess.run(
146                f"docker exec -i {self.name} psql -U {self.postgres_user} {self.postgres_db} < {shlex.quote(sql_file)}",
147                shell=True,
148            ).returncode
149            == 0
150        )
151        return successful