2025-11-10 20:06:38 +03:00

205 lines
7.0 KiB
Python

# -*- coding: utf-8 -*-
import json
from operator import itemgetter
from pathlib import Path
from django.core.management import call_command
from django.core.management.base import BaseCommand, CommandError
from django.db import DEFAULT_DB_ALIAS, connections
from django.db.backends.base.base import BaseDatabaseWrapper
from django.db.migrations.loader import MigrationLoader
from django.db.migrations.recorder import MigrationRecorder
from django.utils import timezone
from django_extensions.management.utils import signalcommand
DEFAULT_FILENAME = "managestate.json"
DEFAULT_STATE = "default"
class Command(BaseCommand):
help = "Manage database state in the convenient way."
_applied_migrations = None
migrate_args: dict
migrate_options: dict
filename: str
verbosity: int
database: str
conn: BaseDatabaseWrapper
def add_arguments(self, parser):
parser.add_argument(
"action",
choices=("dump", "load"),
help="An action to do. "
"Dump action saves applied migrations to a file. "
"Load action applies migrations specified in a file.",
)
parser.add_argument(
"state",
nargs="?",
default=DEFAULT_STATE,
help="A name of a state. Usually a name of a git branch."
f'Defaults to "{DEFAULT_STATE}"',
)
parser.add_argument(
"-d",
"--database",
default=DEFAULT_DB_ALIAS,
help="Nominates a database to synchronize. "
f'Defaults to the "{DEFAULT_DB_ALIAS}" database.',
)
parser.add_argument(
"-f",
"--filename",
default=DEFAULT_FILENAME,
help=f'A file to write to. Defaults to "{DEFAULT_FILENAME}"',
)
# migrate command arguments
parser.add_argument(
"--noinput",
"--no-input",
action="store_false",
dest="interactive",
help='The argument for "migrate" command. '
"Tells Django to NOT prompt the user for input of any kind.",
)
parser.add_argument(
"--fake",
action="store_true",
help='The argument for "migrate" command. '
"Mark migrations as run without actually running them.",
)
parser.add_argument(
"--fake-initial",
action="store_true",
help='The argument for "migrate" command. '
"Detect if tables already exist and fake-apply initial migrations if so. "
"Make sure that the current database schema matches your initial migration "
"before using this flag. "
"Django will only check for an existing table name.",
)
parser.add_argument(
"--plan",
action="store_true",
help='The argument for "migrate" command. '
"Shows a list of the migration actions that will be performed.",
)
parser.add_argument(
"--run-syncdb",
action="store_true",
help='The argument for "migrate" command. '
"Creates tables for apps without migrations.",
)
parser.add_argument(
"--check",
action="store_true",
dest="check_unapplied",
help='The argument for "migrate" command. '
"Exits with a non-zero status if unapplied migrations exist.",
)
@signalcommand
def handle(self, action, database, filename, state, *args, **options):
self.migrate_args = args
self.migrate_options = options
self.verbosity = options["verbosity"]
self.conn = connections[database]
self.database = database
self.filename = filename
getattr(self, action)(state)
def dump(self, state: str):
"""Save applied migrations to a file."""
migrated_apps = self.get_migrated_apps()
migrated_apps.update(self.get_applied_migrations())
self.write({state: migrated_apps})
self.stdout.write(
self.style.SUCCESS(
f'Migrations for state "{state}" have been successfully '
f"saved to {self.filename}."
)
)
def load(self, state: str):
"""Apply migrations from a file."""
migrations = self.read().get(state)
if migrations is None:
raise CommandError(f"No such state saved: {state}")
kwargs = {
**self.migrate_options,
"database": self.database,
"verbosity": self.verbosity - 1 if self.verbosity > 1 else 0,
}
for app, migration in migrations.items():
if self.is_applied(app, migration):
continue
if self.verbosity > 1:
self.stdout.write(
self.style.WARNING(f'Applying migrations for "{app}"')
)
args = (app, migration, *self.migrate_args)
call_command("migrate", *args, **kwargs)
self.stdout.write(
self.style.SUCCESS(
f'Migrations for "{state}" have been successfully applied.'
)
)
def get_migrated_apps(self) -> dict:
"""Installed apps having migrations."""
apps = MigrationLoader(self.conn).migrated_apps
migrated_apps = dict.fromkeys(apps, "zero")
if self.verbosity > 1:
self.stdout.write(
"Apps having migrations: " + ", ".join(sorted(migrated_apps))
)
return migrated_apps
def get_applied_migrations(self) -> dict:
"""Installed apps with last applied migrations."""
if self._applied_migrations:
return self._applied_migrations
migrations = MigrationRecorder(self.conn).applied_migrations()
last_applied = sorted(migrations.keys(), key=itemgetter(1))
self._applied_migrations = dict(last_applied)
return self._applied_migrations
def is_applied(self, app: str, migration: str) -> bool:
"""Check whether a migration for an app is applied or not."""
applied = self.get_applied_migrations().get(app)
if applied == migration:
if self.verbosity > 1:
self.stdout.write(
self.style.WARNING(f'Migrations for "{app}" are already applied.')
)
return True
return False
def read(self) -> dict:
"""Get saved state from the file."""
path = Path(self.filename)
if not path.exists() or not path.is_file():
raise CommandError(f"No such file: {self.filename}")
with open(self.filename) as file:
return json.load(file)
def write(self, data: dict):
"""Write new data to the file using existent one."""
try:
saved = self.read()
except CommandError:
saved = {}
saved.update(data, updated_at=str(timezone.now()))
with open(self.filename, "w") as file:
json.dump(saved, file, indent=2, sort_keys=True)