214 lines
6.5 KiB
Python
214 lines
6.5 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""
|
|
sqldsn.py
|
|
|
|
Prints Data Source Name on stdout
|
|
"""
|
|
|
|
import sys
|
|
import warnings
|
|
from typing import List
|
|
|
|
from django.conf import settings
|
|
from django.core.management.base import BaseCommand, CommandError
|
|
from django.core.management.color import color_style
|
|
from django.db import DEFAULT_DB_ALIAS
|
|
from django_extensions.settings import SQLITE_ENGINES, POSTGRESQL_ENGINES, MYSQL_ENGINES
|
|
from django_extensions.utils.deprecation import RemovedInNextVersionWarning
|
|
|
|
|
|
def _sqlite_name(dbhost, dbport, dbname, dbuser, dbpass):
|
|
return dbname
|
|
|
|
|
|
def _mysql_keyvalue(dbhost, dbport, dbname, dbuser, dbpass):
|
|
dsnstr = f'host="{dbhost}", db="{dbname}", user="{dbuser}", passwd="{dbpass}"'
|
|
if dbport is not None:
|
|
dsnstr += f', port="{dbport}"'
|
|
return dsnstr
|
|
|
|
|
|
def _mysql_args(dbhost, dbport, dbname, dbuser, dbpass):
|
|
dsnstr = f'-h "{dbhost}" -D "{dbname}" -u "{dbuser}" -p "{dbpass}"'
|
|
if dbport is not None:
|
|
dsnstr += f" -P {dbport}"
|
|
return dsnstr
|
|
|
|
|
|
def _postgresql_keyvalue(dbhost, dbport, dbname, dbuser, dbpass):
|
|
dsnstr = f"host='{dbhost}' dbname='{dbname}' user='{dbuser}' password='{dbpass}'"
|
|
if dbport is not None:
|
|
dsnstr += f" port='{dbport}'"
|
|
return dsnstr
|
|
|
|
|
|
def _postgresql_kwargs(dbhost, dbport, dbname, dbuser, dbpass):
|
|
dsnstr = (
|
|
f"host={dbhost!r}, database={dbname!r}, user={dbuser!r}, password={dbpass!r}"
|
|
)
|
|
if dbport is not None:
|
|
dsnstr += f", port={dbport!r}"
|
|
return dsnstr
|
|
|
|
|
|
def _postgresql_pgpass(dbhost, dbport, dbname, dbuser, dbpass):
|
|
return ":".join(str(s) for s in [dbhost, dbport, dbname, dbuser, dbpass])
|
|
|
|
|
|
def _uri(engine):
|
|
def inner(dbhost, dbport, dbname, dbuser, dbpass):
|
|
host = dbhost or ""
|
|
if dbport is not None and dbport != "":
|
|
host += f":{dbport}"
|
|
if dbuser is not None and dbuser != "":
|
|
user = dbuser
|
|
if dbpass is not None and dbpass != "":
|
|
user += f":{dbpass}"
|
|
host = f"{user}@{host}"
|
|
return f"{engine}://{host}/{dbname}"
|
|
|
|
return inner
|
|
|
|
|
|
# Registry of ``(engines, style, formatter)`` triples, grouped per engine
# family. A style of ``None`` marks the formatter used when no style is
# requested. Order is significant: it is the order in which DSNs are printed
# for the "all" style and in which available styles are listed.
_FORMATTERS = [
    (engines, style, formatter)
    for engines, variants in (
        (
            SQLITE_ENGINES,
            (
                (None, _sqlite_name),
                ("filename", _sqlite_name),
                ("uri", _uri("sqlite")),
            ),
        ),
        (
            MYSQL_ENGINES,
            (
                (None, _mysql_keyvalue),
                ("keyvalue", _mysql_keyvalue),
                ("args", _mysql_args),
                ("uri", _uri("mysql")),
            ),
        ),
        (
            POSTGRESQL_ENGINES,
            (
                (None, _postgresql_keyvalue),
                ("keyvalue", _postgresql_keyvalue),
                ("kwargs", _postgresql_kwargs),
                ("uri", _uri("postgresql")),
                ("pgpass", _postgresql_pgpass),
            ),
        ),
    )
    for style, formatter in variants
]
|
|
|
|
|
|
class Command(BaseCommand):
    """Management command that prints the DSN of configured databases."""

    help = "Prints DSN on stdout, as specified in settings.py"
    requires_system_checks: List[str] = []
    can_import_settings = True

    def add_arguments(self, parser):
        """Register the database-selection, style and quiet options."""
        super().add_arguments(parser)
        # --router, --database and --all are mutually exclusive ways of
        # choosing which database(s) to print.
        dbspec = parser.add_mutually_exclusive_group()
        dbspec.add_argument(
            "-R",
            "--router",
            action="store",
            dest="router",
            default=DEFAULT_DB_ALIAS,
            help=(
                "Use this router-database other than default "
                "(deprecated: use --database instead)"
            ),
        )
        dbspec.add_argument(
            "--database",
            default=DEFAULT_DB_ALIAS,
            help=(
                "Nominates a database to run command for. "
                'Defaults to the "%s" database.'
            )
            % DEFAULT_DB_ALIAS,
        )
        # Every named style known to any engine, de-duplicated and sorted.
        styles = sorted(
            {style for _, style, _ in _FORMATTERS if style is not None}
        )
        parser.add_argument(
            "-s",
            "--style",
            action="store",
            dest="style",
            default=None,
            choices=styles + ["all"],
            help="DSN format style.",
        )
        dbspec.add_argument(
            "-a",
            "--all",
            action="store_true",
            dest="all",
            default=False,
            help="Show DSN for all database routes",
        )
        parser.add_argument(
            "-q",
            "--quiet",
            action="store_true",
            dest="quiet",
            default=False,
            help="Quiet mode only show DSN",
        )

    def handle(self, *args, **options):
        """Print the DSN of each selected database, separated by blank lines.

        With ``--all`` every alias in ``settings.DATABASES`` is shown.
        Otherwise the ``--database`` alias is used, unless the deprecated
        ``--router`` option was given a non-default value, in which case a
        ``RemovedInNextVersionWarning`` is issued and the router value wins.
        """
        self.style = color_style()
        all_databases = options["all"]

        if all_databases:
            databases = settings.DATABASES.keys()
        else:
            databases = [options["database"]]
            if options["router"] != DEFAULT_DB_ALIAS:
                warnings.warn(
                    "--router is deprecated. You should use --database.",
                    RemovedInNextVersionWarning,
                    stacklevel=2,
                )
                databases = [options["router"]]

        for i, database in enumerate(databases):
            if i != 0:
                sys.stdout.write("\n")
            self.show_dsn(database, options)

    def show_dsn(self, database, options):
        """Write the DSN(s) of one database alias to ``sys.stdout``.

        Reads ``options["style"]`` and ``options["quiet"]``. When no
        formatter matches the engine/style combination, an error line is
        written in place of the DSN instead of raising.

        Raises:
            CommandError: if ``database`` is not a key of
                ``settings.DATABASES``.
        """
        dbinfo = settings.DATABASES.get(database)
        quiet = options["quiet"]
        dsn_style = options["style"]

        if dbinfo is None:
            raise CommandError("Unknown database %s" % database)

        engine = dbinfo.get("ENGINE")
        dbuser = dbinfo.get("USER")
        dbpass = dbinfo.get("PASSWORD")
        dbname = dbinfo.get("NAME")
        dbhost = dbinfo.get("HOST")
        dbport = dbinfo.get("PORT")
        # An empty port is treated as unspecified.
        if dbport == "":
            dbport = None

        # An exact style match (including None for the default formatter),
        # or every named formatter of the engine when "all" is requested.
        dsn = [
            formatter(dbhost, dbport, dbname, dbuser, dbpass)
            for engines, style, formatter in _FORMATTERS
            if engine in engines
            and (
                dsn_style == style
                or (dsn_style == "all" and style is not None)
            )
        ]

        if not dsn:
            available = ", ".join(
                style
                for engines, style, _ in _FORMATTERS
                if engine in engines and style is not None
            )
            # No styles at all means the engine is not in any known family.
            dsn = [
                self.style.ERROR(
                    f"Invalid style {dsn_style} for {engine} (available: {available})"
                    if available
                    else "Unknown database, can't generate DSN"
                )
            ]

        if not quiet:
            sys.stdout.write(
                self.style.SQL_TABLE(
                    f"DSN for database {database!r} with engine {engine!r}:\n"
                )
            )

        for output in dsn:
            sys.stdout.write(f"{output}\n")
|