Commit 33755e8e authored by Heikki Linnakangas

Change the way encoding and locale checks are done in pg_upgrade.

Lc_collate and lc_ctype have been per-database settings since server version
8.4, but pg_upgrade was still treating them as cluster-wide options. It
fetched the values for the template0 database in the old and new clusters,
and compared them. That's backwards; the encoding and locale of the template0
database don't matter, as template0 is guaranteed to contain only ASCII
characters. But if there are any other databases that exist in both clusters
(in particular the template1 and postgres databases), their encodings and
locales must be compatible.

Also, make the locale comparison more lenient. If the locale names are not
equal, try to canonicalize both of them by passing them to setlocale(). We
used to do that only when upgrading from 9.1 or below, but it seems like a
good idea even with newer versions. If we change the canonical form of a
locale, this allows pg_upgrade to still work. I'm about to do just that to
fix bug #11431, by mapping a locale name that contains non-ASCII characters
to a pure-ASCII alias of the same locale.

No backpatching, because earlier versions of pg_upgrade still support
upgrading from 8.3 servers, where the locale is still a cluster-wide setting.
Handling that case as well would be more complicated, so it doesn't seem
worth it, given that we haven't received any complaints about this from
users.
parent f19f0ee7
...@@ -14,12 +14,10 @@ ...@@ -14,12 +14,10 @@
#include "pg_upgrade.h" #include "pg_upgrade.h"
static void set_locale_and_encoding(ClusterInfo *cluster);
static void check_new_cluster_is_empty(void); static void check_new_cluster_is_empty(void);
static void check_locale_and_encoding(ControlData *oldctrl, static void check_databases_are_compatible(void);
ControlData *newctrl); static void check_locale_and_encoding(DbInfo *olddb, DbInfo *newdb);
static bool equivalent_locale(const char *loca, const char *locb); static bool equivalent_locale(int category, const char *loca, const char *locb);
static bool equivalent_encoding(const char *chara, const char *charb);
static void check_is_install_user(ClusterInfo *cluster); static void check_is_install_user(ClusterInfo *cluster);
static void check_for_prepared_transactions(ClusterInfo *cluster); static void check_for_prepared_transactions(ClusterInfo *cluster);
static void check_for_isn_and_int8_passing_mismatch(ClusterInfo *cluster); static void check_for_isn_and_int8_passing_mismatch(ClusterInfo *cluster);
...@@ -81,8 +79,6 @@ check_and_dump_old_cluster(bool live_check) ...@@ -81,8 +79,6 @@ check_and_dump_old_cluster(bool live_check)
if (!live_check) if (!live_check)
start_postmaster(&old_cluster, true); start_postmaster(&old_cluster, true);
set_locale_and_encoding(&old_cluster);
get_pg_database_relfilenode(&old_cluster); get_pg_database_relfilenode(&old_cluster);
/* Extract a list of databases and tables from the old cluster */ /* Extract a list of databases and tables from the old cluster */
...@@ -127,13 +123,10 @@ check_and_dump_old_cluster(bool live_check) ...@@ -127,13 +123,10 @@ check_and_dump_old_cluster(bool live_check)
void void
check_new_cluster(void) check_new_cluster(void)
{ {
set_locale_and_encoding(&new_cluster);
check_locale_and_encoding(&old_cluster.controldata, &new_cluster.controldata);
get_db_and_rel_infos(&new_cluster); get_db_and_rel_infos(&new_cluster);
check_new_cluster_is_empty(); check_new_cluster_is_empty();
check_databases_are_compatible();
check_loadable_libraries(); check_loadable_libraries();
...@@ -278,94 +271,26 @@ check_cluster_compatibility(bool live_check) ...@@ -278,94 +271,26 @@ check_cluster_compatibility(bool live_check)
} }
/*
* set_locale_and_encoding()
*
* query the database to get the template0 locale
*/
static void
set_locale_and_encoding(ClusterInfo *cluster)
{
ControlData *ctrl = &cluster->controldata;
PGconn *conn;
PGresult *res;
int i_encoding;
int cluster_version = cluster->major_version;
conn = connectToServer(cluster, "template1");
/* for pg < 80400, we got the values from pg_controldata */
if (cluster_version >= 80400)
{
int i_datcollate;
int i_datctype;
res = executeQueryOrDie(conn,
"SELECT datcollate, datctype "
"FROM pg_catalog.pg_database "
"WHERE datname = 'template0' ");
assert(PQntuples(res) == 1);
i_datcollate = PQfnumber(res, "datcollate");
i_datctype = PQfnumber(res, "datctype");
if (GET_MAJOR_VERSION(cluster->major_version) < 902)
{
/*
* Pre-9.2 did not canonicalize the supplied locale names to match
* what the system returns, while 9.2+ does, so convert pre-9.2 to
* match.
*/
ctrl->lc_collate = get_canonical_locale_name(LC_COLLATE,
pg_strdup(PQgetvalue(res, 0, i_datcollate)));
ctrl->lc_ctype = get_canonical_locale_name(LC_CTYPE,
pg_strdup(PQgetvalue(res, 0, i_datctype)));
}
else
{
ctrl->lc_collate = pg_strdup(PQgetvalue(res, 0, i_datcollate));
ctrl->lc_ctype = pg_strdup(PQgetvalue(res, 0, i_datctype));
}
PQclear(res);
}
res = executeQueryOrDie(conn,
"SELECT pg_catalog.pg_encoding_to_char(encoding) "
"FROM pg_catalog.pg_database "
"WHERE datname = 'template0' ");
assert(PQntuples(res) == 1);
i_encoding = PQfnumber(res, "pg_encoding_to_char");
ctrl->encoding = pg_strdup(PQgetvalue(res, 0, i_encoding));
PQclear(res);
PQfinish(conn);
}
/* /*
* check_locale_and_encoding() * check_locale_and_encoding()
* *
* Check that old and new locale and encoding match. Even though the backend * Check that locale and encoding of a database in the old and new clusters
* tries to canonicalize stored locale names, the platform often doesn't * are compatible.
* cooperate, so it's entirely possible that one DB thinks its locale is
* "en_US.UTF-8" while the other says "en_US.utf8". Try to be forgiving.
*/ */
static void static void
check_locale_and_encoding(ControlData *oldctrl, check_locale_and_encoding(DbInfo *olddb, DbInfo *newdb)
ControlData *newctrl)
{ {
if (!equivalent_locale(oldctrl->lc_collate, newctrl->lc_collate)) if (olddb->db_encoding != newdb->db_encoding)
pg_fatal("lc_collate cluster values do not match: old \"%s\", new \"%s\"\n", pg_fatal("encodings for database \"%s\" do not match: old \"%s\", new \"%s\"\n",
oldctrl->lc_collate, newctrl->lc_collate); olddb->db_name,
if (!equivalent_locale(oldctrl->lc_ctype, newctrl->lc_ctype)) pg_encoding_to_char(olddb->db_encoding),
pg_fatal("lc_ctype cluster values do not match: old \"%s\", new \"%s\"\n", pg_encoding_to_char(newdb->db_encoding));
oldctrl->lc_ctype, newctrl->lc_ctype); if (!equivalent_locale(LC_COLLATE, olddb->db_collate, newdb->db_collate))
if (!equivalent_encoding(oldctrl->encoding, newctrl->encoding)) pg_fatal("lc_collate values for database \"%s\" do not match: old \"%s\", new \"%s\"\n",
pg_fatal("encoding cluster values do not match: old \"%s\", new \"%s\"\n", olddb->db_name, olddb->db_collate, newdb->db_collate);
oldctrl->encoding, newctrl->encoding); if (!equivalent_locale(LC_CTYPE, olddb->db_ctype, newdb->db_ctype))
pg_fatal("lc_ctype values for database \"%s\" do not match: old \"%s\", new \"%s\"\n",
olddb->db_name, olddb->db_ctype, newdb->db_ctype);
} }
/* /*
...@@ -373,61 +298,46 @@ check_locale_and_encoding(ControlData *oldctrl, ...@@ -373,61 +298,46 @@ check_locale_and_encoding(ControlData *oldctrl,
* *
* Best effort locale-name comparison. Return false if we are not 100% sure * Best effort locale-name comparison. Return false if we are not 100% sure
* the locales are equivalent. * the locales are equivalent.
*
* Note: The encoding parts of the names are ignored. This function is
* currently used to compare locale names stored in pg_database, and
* pg_database contains a separate encoding field. That's compared directly
* in check_locale_and_encoding().
*/ */
static bool static bool
equivalent_locale(const char *loca, const char *locb) equivalent_locale(int category, const char *loca, const char *locb)
{ {
const char *chara = strrchr(loca, '.'); const char *chara = strrchr(loca, '.');
const char *charb = strrchr(locb, '.'); const char *charb = strrchr(locb, '.');
int lencmp; char *canona;
char *canonb;
/* If they don't both contain an encoding part, just do strcasecmp(). */ int lena;
if (!chara || !charb) int lenb;
return (pg_strcasecmp(loca, locb) == 0);
/* /*
* Compare the encoding parts. Windows tends to use code page numbers for * If the names are equal, the locales are equivalent. Checking this
* the encoding part, which equivalent_encoding() won't like, so accept if * first avoids calling setlocale() in the common case that the names
* the strings are case-insensitive equal; otherwise use * are equal. That's a good thing, if setlocale() is buggy, for example.
* equivalent_encoding() to compare.
*/ */
if (pg_strcasecmp(chara + 1, charb + 1) != 0 && if (pg_strcasecmp(loca, locb) == 0)
!equivalent_encoding(chara + 1, charb + 1)) return true;
return false;
/* /*
* OK, compare the locale identifiers (e.g. en_US part of en_US.utf8). * Not identical. Canonicalize both names, remove the encoding parts,
* * and try again.
* It's tempting to ignore non-alphanumeric chars here, but for now it's
* not clear that that's necessary; just do case-insensitive comparison.
*/ */
lencmp = chara - loca; canona = get_canonical_locale_name(category, loca);
if (lencmp != charb - locb) chara = strrchr(canona, '.');
return false; lena = chara ? (chara - canona) : strlen(canona);
return (pg_strncasecmp(loca, locb, lencmp) == 0); canonb = get_canonical_locale_name(category, locb);
} charb = strrchr(canonb, '.');
lenb = charb ? (charb - canonb) : strlen(canonb);
/* if (lena == lenb && pg_strncasecmp(canona, canonb, lena) == 0)
* equivalent_encoding() return true;
*
* Best effort encoding-name comparison. Return true only if the encodings
* are valid server-side encodings and known equivalent.
*
* Because the lookup in pg_valid_server_encoding() does case folding and
* ignores non-alphanumeric characters, this will recognize many popular
* variant spellings as equivalent, eg "utf8" and "UTF-8" will match.
*/
static bool
equivalent_encoding(const char *chara, const char *charb)
{
int enca = pg_valid_server_encoding(chara);
int encb = pg_valid_server_encoding(charb);
if (enca < 0 || encb < 0)
return false; return false;
return (enca == encb);
} }
...@@ -450,7 +360,35 @@ check_new_cluster_is_empty(void) ...@@ -450,7 +360,35 @@ check_new_cluster_is_empty(void)
new_cluster.dbarr.dbs[dbnum].db_name); new_cluster.dbarr.dbs[dbnum].db_name);
} }
} }
}
/*
* Check that every database that already exists in the new cluster is
* compatible with the corresponding database in the old one.
*/
static void
check_databases_are_compatible(void)
{
int newdbnum;
int olddbnum;
DbInfo *newdbinfo;
DbInfo *olddbinfo;
for (newdbnum = 0; newdbnum < new_cluster.dbarr.ndbs; newdbnum++)
{
newdbinfo = &new_cluster.dbarr.dbs[newdbnum];
/* Find the corresponding database in the old cluster */
for (olddbnum = 0; olddbnum < old_cluster.dbarr.ndbs; olddbnum++)
{
olddbinfo = &old_cluster.dbarr.dbs[olddbnum];
if (strcmp(newdbinfo->db_name, olddbinfo->db_name) == 0)
{
check_locale_and_encoding(olddbinfo, newdbinfo);
break;
}
}
}
} }
......
...@@ -122,10 +122,6 @@ get_control_data(ClusterInfo *cluster, bool live_check) ...@@ -122,10 +122,6 @@ get_control_data(ClusterInfo *cluster, bool live_check)
pg_fatal("Could not get control data using %s: %s\n", pg_fatal("Could not get control data using %s: %s\n",
cmd, getErrorText(errno)); cmd, getErrorText(errno));
/* Only pre-8.4 has these so if they are not set below we will check later */
cluster->controldata.lc_collate = NULL;
cluster->controldata.lc_ctype = NULL;
/* Only in <= 9.2 */ /* Only in <= 9.2 */
if (GET_MAJOR_VERSION(cluster->major_version) <= 902) if (GET_MAJOR_VERSION(cluster->major_version) <= 902)
{ {
...@@ -404,36 +400,6 @@ get_control_data(ClusterInfo *cluster, bool live_check) ...@@ -404,36 +400,6 @@ get_control_data(ClusterInfo *cluster, bool live_check)
cluster->controldata.data_checksum_version = str2uint(p); cluster->controldata.data_checksum_version = str2uint(p);
got_data_checksum_version = true; got_data_checksum_version = true;
} }
/* In pre-8.4 only */
else if ((p = strstr(bufin, "LC_COLLATE:")) != NULL)
{
p = strchr(p, ':');
if (p == NULL || strlen(p) <= 1)
pg_fatal("%d: controldata retrieval problem\n", __LINE__);
p++; /* remove ':' char */
/* skip leading spaces and remove trailing newline */
p += strspn(p, " ");
if (strlen(p) > 0 && *(p + strlen(p) - 1) == '\n')
*(p + strlen(p) - 1) = '\0';
cluster->controldata.lc_collate = pg_strdup(p);
}
/* In pre-8.4 only */
else if ((p = strstr(bufin, "LC_CTYPE:")) != NULL)
{
p = strchr(p, ':');
if (p == NULL || strlen(p) <= 1)
pg_fatal("%d: controldata retrieval problem\n", __LINE__);
p++; /* remove ':' char */
/* skip leading spaces and remove trailing newline */
p += strspn(p, " ");
if (strlen(p) > 0 && *(p + strlen(p) - 1) == '\n')
*(p + strlen(p) - 1) = '\0';
cluster->controldata.lc_ctype = pg_strdup(p);
}
} }
if (output) if (output)
......
...@@ -239,11 +239,15 @@ get_db_infos(ClusterInfo *cluster) ...@@ -239,11 +239,15 @@ get_db_infos(ClusterInfo *cluster)
DbInfo *dbinfos; DbInfo *dbinfos;
int i_datname, int i_datname,
i_oid, i_oid,
i_encoding,
i_datcollate,
i_datctype,
i_spclocation; i_spclocation;
char query[QUERY_ALLOC]; char query[QUERY_ALLOC];
snprintf(query, sizeof(query), snprintf(query, sizeof(query),
"SELECT d.oid, d.datname, %s " "SELECT d.oid, d.datname, d.encoding, d.datcollate, d.datctype, "
"%s AS spclocation "
"FROM pg_catalog.pg_database d " "FROM pg_catalog.pg_database d "
" LEFT OUTER JOIN pg_catalog.pg_tablespace t " " LEFT OUTER JOIN pg_catalog.pg_tablespace t "
" ON d.dattablespace = t.oid " " ON d.dattablespace = t.oid "
...@@ -252,12 +256,15 @@ get_db_infos(ClusterInfo *cluster) ...@@ -252,12 +256,15 @@ get_db_infos(ClusterInfo *cluster)
"ORDER BY 2", "ORDER BY 2",
/* 9.2 removed the spclocation column */ /* 9.2 removed the spclocation column */
(GET_MAJOR_VERSION(cluster->major_version) <= 901) ? (GET_MAJOR_VERSION(cluster->major_version) <= 901) ?
"t.spclocation" : "pg_catalog.pg_tablespace_location(t.oid) AS spclocation"); "t.spclocation" : "pg_catalog.pg_tablespace_location(t.oid)");
res = executeQueryOrDie(conn, "%s", query); res = executeQueryOrDie(conn, "%s", query);
i_oid = PQfnumber(res, "oid"); i_oid = PQfnumber(res, "oid");
i_datname = PQfnumber(res, "datname"); i_datname = PQfnumber(res, "datname");
i_encoding = PQfnumber(res, "encoding");
i_datcollate = PQfnumber(res, "datcollate");
i_datctype = PQfnumber(res, "datctype");
i_spclocation = PQfnumber(res, "spclocation"); i_spclocation = PQfnumber(res, "spclocation");
ntups = PQntuples(res); ntups = PQntuples(res);
...@@ -267,6 +274,9 @@ get_db_infos(ClusterInfo *cluster) ...@@ -267,6 +274,9 @@ get_db_infos(ClusterInfo *cluster)
{ {
dbinfos[tupnum].db_oid = atooid(PQgetvalue(res, tupnum, i_oid)); dbinfos[tupnum].db_oid = atooid(PQgetvalue(res, tupnum, i_oid));
dbinfos[tupnum].db_name = pg_strdup(PQgetvalue(res, tupnum, i_datname)); dbinfos[tupnum].db_name = pg_strdup(PQgetvalue(res, tupnum, i_datname));
dbinfos[tupnum].db_encoding = atoi(PQgetvalue(res, tupnum, i_encoding));
dbinfos[tupnum].db_collate = pg_strdup(PQgetvalue(res, tupnum, i_datcollate));
dbinfos[tupnum].db_ctype = pg_strdup(PQgetvalue(res, tupnum, i_datctype));
snprintf(dbinfos[tupnum].db_tablespace, sizeof(dbinfos[tupnum].db_tablespace), "%s", snprintf(dbinfos[tupnum].db_tablespace, sizeof(dbinfos[tupnum].db_tablespace), "%s",
PQgetvalue(res, tupnum, i_spclocation)); PQgetvalue(res, tupnum, i_spclocation));
} }
......
...@@ -180,6 +180,9 @@ typedef struct ...@@ -180,6 +180,9 @@ typedef struct
char *db_name; /* database name */ char *db_name; /* database name */
char db_tablespace[MAXPGPATH]; /* database default tablespace char db_tablespace[MAXPGPATH]; /* database default tablespace
* path */ * path */
char *db_collate;
char *db_ctype;
int db_encoding;
RelInfoArr rel_arr; /* array of all user relinfos */ RelInfoArr rel_arr; /* array of all user relinfos */
} DbInfo; } DbInfo;
...@@ -218,9 +221,6 @@ typedef struct ...@@ -218,9 +221,6 @@ typedef struct
bool date_is_int; bool date_is_int;
bool float8_pass_by_value; bool float8_pass_by_value;
bool data_checksum_version; bool data_checksum_version;
char *lc_collate;
char *lc_ctype;
char *encoding;
} ControlData; } ControlData;
/* /*
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment