Commit 51b363ec authored by Bruce Momjian's avatar Bruce Momjian

Please apply this patch to contrib/dbmirror

In incorperates changes from myself and a number of contributors.

This update to dbmirror provides:

-replication of sequence operations via setval/nextval
-DBMirror.pl support for logging to syslog
-changed the names of the tables to dbmirror_*  (no quotes required)
-Support for writitng SQL statements to files instead of directly to
 a slave database
-More options for DBMirror.pl in the config files.

Steven Singer
parent 6dfb2b25
...@@ -33,7 +33,7 @@ ...@@ -33,7 +33,7 @@
# #
# #
############################################################################## ##############################################################################
# $PostgreSQL: pgsql/contrib/dbmirror/DBMirror.pl,v 1.7 2003/11/29 22:39:19 pgsql Exp $ # $PostgreSQL: pgsql/contrib/dbmirror/DBMirror.pl,v 1.8 2004/02/17 03:34:35 momjian Exp $
# #
############################################################################## ##############################################################################
...@@ -79,17 +79,17 @@ sub mirrorCommand($$$$$$); ...@@ -79,17 +79,17 @@ sub mirrorCommand($$$$$$);
sub mirrorInsert($$$$$); sub mirrorInsert($$$$$);
sub mirrorDelete($$$$$); sub mirrorDelete($$$$$);
sub mirrorUpdate($$$$$); sub mirrorUpdate($$$$$);
sub sendQueryToSlaves($$);
sub logErrorMessage($); sub logErrorMessage($);
sub openSlaveConnection($); sub setupSlave($);
sub updateMirrorHostTable($$); sub updateMirrorHostTable($$);
sub extractData($$); sub extractData($$);
local $::masterHost; local $::masterHost;
local $::masterDb; local $::masterDb;
local $::masterUser; local $::masterUser;
local $::masterPassword; local $::masterPassword;
local $::errorThreshold=5; local $::errorThreshold=5;
local $::errorEmailAddr=undef; local $::errorEmailAddr=undef;
local $::sleepInterval=60;
my %slaveInfoHash; my %slaveInfoHash;
local $::slaveInfo = \%slaveInfoHash; local $::slaveInfo = \%slaveInfoHash;
...@@ -115,8 +115,25 @@ sub Main() { ...@@ -115,8 +115,25 @@ sub Main() {
die; die;
} }
if (defined($::syslog))
my $connectString = "host=$::masterHost dbname=$::masterDb user=$::masterUser password=$::masterPassword"; {
# log with syslog
require Sys::Syslog;
import Sys::Syslog qw(openlog syslog);
openlog($0, 'cons,pid', 'user');
syslog("info", '%s', "starting $0 script with $ARGV[0]");
}
my $connectString;
if(defined($::masterHost))
{
$connectString .= "host=$::masterHost ";
}
if(defined($::masterPort))
{
$connectString .= "port=$::masterPort ";
}
$connectString .= "dbname=$::masterDb user=$::masterUser password=$::masterPassword";
$masterConn = Pg::connectdb($connectString); $masterConn = Pg::connectdb($connectString);
...@@ -138,33 +155,29 @@ sub Main() { ...@@ -138,33 +155,29 @@ sub Main() {
my $firstTime = 1; my $firstTime = 1;
while(1) { while(1) {
if($firstTime == 0) { if($firstTime == 0) {
sleep 60; sleep $::sleepInterval;
} }
$firstTime = 0; $firstTime = 0;
# Open up the connection to the slave.
if(! defined $::slaveInfo->{"status"} ||
$::slaveInfo->{"status"} == -1) {
openSlaveConnection($::slaveInfo);
}
setupSlave($::slaveInfo);
sendQueryToSlaves(undef,"SET TRANSACTION ISOLATION LEVEL SERIALIZABLE");
sendQueryToSlaves(undef,"SET CONSTRAINTS ALL DEFERRED");
#Obtain a list of pending transactions using ordering by our approximation #Obtain a list of pending transactions using ordering by our approximation
#to the commit time. The commit time approximation is taken to be the #to the commit time. The commit time approximation is taken to be the
#SeqId of the last row edit in the transaction. #SeqId of the last row edit in the transaction.
my $pendingTransQuery = "SELECT pd.\"XID\",MAX(\"SeqId\") FROM \"Pending\" pd"; my $pendingTransQuery = "SELECT pd.XID,MAX(SeqId) FROM dbmirror_Pending pd";
$pendingTransQuery .= " LEFT JOIN \"MirroredTransaction\" mt INNER JOIN"; $pendingTransQuery .= " LEFT JOIN dbmirror_MirroredTransaction mt INNER JOIN";
$pendingTransQuery .= " \"MirrorHost\" mh ON mt.\"MirrorHostId\" = "; $pendingTransQuery .= " dbmirror_MirrorHost mh ON mt.MirrorHostId = ";
$pendingTransQuery .= " mh.\"MirrorHostId\" AND mh.\"HostName\"="; $pendingTransQuery .= " mh.MirrorHostId AND mh.SlaveName=";
$pendingTransQuery .= " '$::slaveInfo->{\"slaveHost\"}' "; $pendingTransQuery .= " '$::slaveInfo->{\"slaveName\"}' ";
$pendingTransQuery .= " ON pd.\"XID\""; $pendingTransQuery .= " ON pd.XID";
$pendingTransQuery .= " = mt.\"XID\" WHERE mt.\"XID\" is null "; $pendingTransQuery .= " = mt.XID WHERE mt.XID is null ";
$pendingTransQuery .= " GROUP BY pd.\"XID\" ";
$pendingTransQuery .= " ORDER BY MAX(pd.\"SeqId\")";
$pendingTransQuery .= " GROUP BY pd.XID";
$pendingTransQuery .= " ORDER BY MAX(pd.SeqId)";
my $pendingTransResults = $masterConn->exec($pendingTransQuery); my $pendingTransResults = $masterConn->exec($pendingTransQuery);
...@@ -185,13 +198,21 @@ sub Main() { ...@@ -185,13 +198,21 @@ sub Main() {
my $XID = $pendingTransResults->getvalue($curTransTuple,0); my $XID = $pendingTransResults->getvalue($curTransTuple,0);
my $maxSeqId = $pendingTransResults->getvalue($curTransTuple,1); my $maxSeqId = $pendingTransResults->getvalue($curTransTuple,1);
my $seqId; my $seqId;
my $pendingQuery = "SELECT pnd.\"SeqId\",pnd.\"TableName\",";
$pendingQuery .= " pnd.\"Op\",pnddata.\"IsKey\", pnddata.\"Data\" AS \"Data\" "; if($::slaveInfo->{'status'} eq 'FileClosed')
$pendingQuery .= " FROM \"Pending\" pnd, \"PendingData\" pnddata "; {
$pendingQuery .= " WHERE pnd.\"SeqId\" = pnddata.\"SeqId\" AND "; openTransactionFile($::slaveInfo,$XID);
}
$pendingQuery .= " pnd.\"XID\"=$XID ORDER BY \"SeqId\", \"IsKey\" DESC";
my $pendingQuery = "SELECT pnd.SeqId,pnd.TableName,";
$pendingQuery .= " pnd.Op,pnddata.IsKey, pnddata.Data AS Data ";
$pendingQuery .= " FROM dbmirror_Pending pnd, dbmirror_PendingData pnddata ";
$pendingQuery .= " WHERE pnd.SeqId = pnddata.SeqId ";
$pendingQuery .= " AND pnd.XID=$XID ORDER BY SeqId, IsKey DESC";
my $pendingResults = $masterConn->exec($pendingQuery); my $pendingResults = $masterConn->exec($pendingQuery);
...@@ -200,40 +221,47 @@ sub Main() { ...@@ -200,40 +221,47 @@ sub Main() {
die; die;
} }
sendQueryToSlaves($XID,"BEGIN");
my $numPending = $pendingResults->ntuples; my $numPending = $pendingResults->ntuples;
my $curTuple = 0; my $curTuple = 0;
sendQueryToSlaves(undef,"BEGIN");
while ($curTuple < $numPending) { while ($curTuple < $numPending) {
$seqId = $pendingResults->getvalue($curTuple,0); $seqId = $pendingResults->getvalue($curTuple,0);
my $tableName = $pendingResults->getvalue($curTuple,1); my $tableName = $pendingResults->getvalue($curTuple,1);
my $op = $pendingResults->getvalue($curTuple,2); my $op = $pendingResults->getvalue($curTuple,2);
$curTuple = mirrorCommand($seqId,$tableName,$op,$XID, $curTuple = mirrorCommand($seqId,$tableName,$op,$XID,
$pendingResults,$curTuple) +1; $pendingResults,$curTuple) +1;
if($::slaveInfo->{"status"}==-1) {
last;
}
} }
#Now commit the transaction.
if($::slaveInfo->{"status"}==-1) { if($::slaveInfo->{'status'} ne 'DBOpen' &&
$::slaveInfo->{'status'} ne 'FileOpen')
{
last; last;
} }
sendQueryToSlaves(undef,"COMMIT"); sendQueryToSlaves(undef,"COMMIT");
#Now commit the transaction.
updateMirrorHostTable($XID,$seqId); updateMirrorHostTable($XID,$seqId);
if($commandCount > 5000) {
$commandCount = 0;
$::slaveInfo->{"status"} = -1;
$::slaveInfo->{"slaveConn"}->reset;
#Open the connection right away.
openSlaveConnection($::slaveInfo);
}
$pendingResults = undef; $pendingResults = undef;
$curTransTuple = $curTransTuple +1; $curTransTuple = $curTransTuple +1;
if($::slaveInfo->{'status'} eq 'FileOpen')
{
close ($::slaveInfo->{'TransactionFile'});
}
elsif($::slaveInfo->{'status'} eq 'DBOpen')
{
if($commandCount > 5000) {
$commandCount = 0;
$::slaveInfo->{"status"} = 'DBClosed';
$::slaveInfo->{"slaveConn"}->reset;
#Open the connection right away.
openSlaveConnection($::slaveInfo);
}
}
}#while transactions left. }#while transactions left.
$pendingTransResults = undef; $pendingTransResults = undef;
...@@ -303,6 +331,7 @@ sub mirrorCommand($$$$$$) { ...@@ -303,6 +331,7 @@ sub mirrorCommand($$$$$$) {
my $pendingResults = $_[4]; my $pendingResults = $_[4];
my $currentTuple = $_[5]; my $currentTuple = $_[5];
if($op eq 'i') { if($op eq 'i') {
$currentTuple = mirrorInsert($seqId,$tableName,$transId,$pendingResults $currentTuple = mirrorInsert($seqId,$tableName,$transId,$pendingResults
,$currentTuple); ,$currentTuple);
...@@ -315,6 +344,10 @@ sub mirrorCommand($$$$$$) { ...@@ -315,6 +344,10 @@ sub mirrorCommand($$$$$$) {
$currentTuple = mirrorUpdate($seqId,$tableName,$transId,$pendingResults, $currentTuple = mirrorUpdate($seqId,$tableName,$transId,$pendingResults,
$currentTuple); $currentTuple);
} }
if($op eq 's') {
$currentTuple = mirrorSequence($seqId,$tableName,$transId,$pendingResults,
$currentTuple);
}
$commandCount = $commandCount +1; $commandCount = $commandCount +1;
if($commandCount % 100 == 0) { if($commandCount % 100 == 0) {
# print "Sent 100 commmands on SeqId $seqId \n"; # print "Sent 100 commmands on SeqId $seqId \n";
...@@ -411,7 +444,7 @@ sub mirrorInsert($$$$$) { ...@@ -411,7 +444,7 @@ sub mirrorInsert($$$$$) {
$firstIteration=0; $firstIteration=0;
} }
$valuesQuery .= ")"; $valuesQuery .= ")";
sendQueryToSlaves(undef,$insertQuery . $valuesQuery); sendQueryToSlaves($transId,$insertQuery . $valuesQuery);
return $currentTuple; return $currentTuple;
} }
...@@ -491,7 +524,6 @@ sub mirrorDelete($$$$$) { ...@@ -491,7 +524,6 @@ sub mirrorDelete($$$$$) {
$counter++; $counter++;
$firstField=0; $firstField=0;
} }
sendQueryToSlaves($transId,$deleteQuery); sendQueryToSlaves($transId,$deleteQuery);
return $currentTuple; return $currentTuple;
} }
...@@ -554,14 +586,12 @@ sub mirrorUpdate($$$$$) { ...@@ -554,14 +586,12 @@ sub mirrorUpdate($$$$$) {
my $transId = $_[2]; my $transId = $_[2];
my $pendingResult = $_[3]; my $pendingResult = $_[3];
my $currentTuple = $_[4]; my $currentTuple = $_[4];
my $counter; my $counter;
my $quotedValue; my $quotedValue;
my $updateQuery = "UPDATE $tableName SET "; my $updateQuery = "UPDATE $tableName SET ";
my $currentField; my $currentField;
my %keyValueHash; my %keyValueHash;
my %dataValueHash; my %dataValueHash;
my $firstIteration=1; my $firstIteration=1;
...@@ -615,12 +645,27 @@ sub mirrorUpdate($$$$$) { ...@@ -615,12 +645,27 @@ sub mirrorUpdate($$$$$) {
} }
$firstIteration=0; $firstIteration=0;
} }
sendQueryToSlaves($transId,$updateQuery); sendQueryToSlaves($transId,$updateQuery);
return $currentTuple+1; return $currentTuple+1;
} }
sub mirrorSequence($$$$$) {
my $seqId = $_[0];
my $sequenceName = $_[1];
my $transId = $_[2];
my $pendingResult = $_[3];
my $currentTuple = $_[4];
my $query;
my $sequenceValue = $pendingResult->getvalue($currentTuple,4);
$query = sprintf("select setval(%s,%s)",$sequenceName,$sequenceValue);
sendQueryToSlaves($transId,$query);
return $currentTuple;
}
=item sendQueryToSlaves(seqId,sqlQuery) =item sendQueryToSlaves(seqId,sqlQuery)
...@@ -647,7 +692,7 @@ sub sendQueryToSlaves($$) { ...@@ -647,7 +692,7 @@ sub sendQueryToSlaves($$) {
my $seqId = $_[0]; my $seqId = $_[0];
my $sqlQuery = $_[1]; my $sqlQuery = $_[1];
if($::slaveInfo->{"status"} == 0) { if($::slaveInfo->{"status"} eq 'DBOpen') {
my $queryResult = $::slaveInfo->{"slaveConn"}->exec($sqlQuery); my $queryResult = $::slaveInfo->{"slaveConn"}->exec($sqlQuery);
unless($queryResult->resultStatus == PGRES_COMMAND_OK) { unless($queryResult->resultStatus == PGRES_COMMAND_OK) {
my $errorMessage; my $errorMessage;
...@@ -660,10 +705,18 @@ sub sendQueryToSlaves($$) { ...@@ -660,10 +705,18 @@ sub sendQueryToSlaves($$) {
$::slaveInfo->{"status"} = -1; $::slaveInfo->{"status"} = -1;
} }
} }
elsif($::slaveInfo->{"status"} eq 'FileOpen' ) {
my $xfile = $::slaveInfo->{'TransactionFile'};
print $xfile $sqlQuery . ";\n";
}
} }
=item logErrorMessage(error) =item logErrorMessage(error)
Mails an error message to the users specified $errorEmailAddr Mails an error message to the users specified $errorEmailAddr
...@@ -707,41 +760,30 @@ sub logErrorMessage($) { ...@@ -707,41 +760,30 @@ sub logErrorMessage($) {
print mailPipe "\n\n\n=================================================\n"; print mailPipe "\n\n\n=================================================\n";
close mailPipe; close mailPipe;
} }
if (defined($::syslog))
{
syslog('err', '%s (%m)', $error);
}
warn($error); warn($error);
$lastErrorMsg = $error; $lastErrorMsg = $error;
} }
sub openSlaveConnection($) { sub setupSlave($) {
my $slavePtr = $_[0]; my $slavePtr = $_[0];
my $slaveConn;
my $slaveConnString = "host=" . $slavePtr->{"slaveHost"};
$slaveConnString .= " dbname=" . $slavePtr->{"slaveDb"};
$slaveConnString .= " user=" . $slavePtr->{"slaveUser"};
$slaveConnString .= " password=" . $slavePtr->{"slavePassword"};
$slaveConn = Pg::connectdb($slaveConnString);
if($slaveConn->status != PGRES_CONNECTION_OK) {
my $errorMessage = "Can't connect to slave database " ;
$errorMessage .= $slavePtr->{"slaveHost"} . "\n";
$errorMessage .= $slaveConn->errorMessage;
logErrorMessage($errorMessage);
$slavePtr->{"status"} = -1;
}
else {
$slavePtr->{"slaveConn"} = $slaveConn;
$slavePtr->{"status"} = 0; $slavePtr->{"status"} = 0;
#Determine the MirrorHostId for the slave from the master's database #Determine the MirrorHostId for the slave from the master's database
my $resultSet = $masterConn->exec('SELECT "MirrorHostId" FROM ' my $resultSet = $masterConn->exec('SELECT MirrorHostId FROM '
. ' "MirrorHost" WHERE "HostName"' . ' dbmirror_MirrorHost WHERE SlaveName'
. '=\'' . $slavePtr->{"slaveHost"} . '=\'' . $slavePtr->{"slaveName"}
. '\''); . '\'');
if($resultSet->ntuples !=1) { if($resultSet->ntuples !=1) {
my $errorMessage .= $slavePtr->{"slaveHost"} ."\n"; my $errorMessage .= $slavePtr->{"slaveName"} ."\n";
$errorMessage .= "Has no MirrorHost entry on master\n"; $errorMessage .= "Has no MirrorHost entry on master\n";
logErrorMessage($errorMessage); logErrorMessage($errorMessage);
$slavePtr->{"status"}=-1; $slavePtr->{"status"}=-1;
...@@ -749,14 +791,24 @@ sub openSlaveConnection($) { ...@@ -749,14 +791,24 @@ sub openSlaveConnection($) {
} }
$slavePtr->{"MirrorHostId"} = $resultSet->getvalue(0,0); $slavePtr->{"MirrorHostId"} = $resultSet->getvalue(0,0);
if(defined($::slaveInfo->{'slaveDb'})) {
# We talk directly to a slave database.
#
if($::slaveInfo->{"status"} ne 'DBOpen')
{
openSlaveConnection($::slaveInfo);
}
sendQueryToSlaves(undef,"SET TRANSACTION ISOLATION LEVEL SERIALIZABLE");
sendQueryToSlaves(undef,"SET CONSTRAINTS ALL DEFERRED");
}
else {
$::slaveInfo->{"status"} = 'FileClosed';
} }
} }
=item updateMirrorHostTable(lastTransId,lastSeqId) =item updateMirrorHostTable(lastTransId,lastSeqId)
Updates the MirroredTransaction table to reflect the fact that Updates the MirroredTransaction table to reflect the fact that
...@@ -783,39 +835,40 @@ sub updateMirrorHostTable($$) { ...@@ -783,39 +835,40 @@ sub updateMirrorHostTable($$) {
my $lastTransId = shift; my $lastTransId = shift;
my $lastSeqId = shift; my $lastSeqId = shift;
if($::slaveInfo->{"status"}==0) {
my $deleteTransactionQuery;
my $deleteResult; my $deleteTransactionQuery;
my $updateMasterQuery = "INSERT INTO \"MirroredTransaction\" "; my $deleteResult;
$updateMasterQuery .= " (\"XID\",\"LastSeqId\",\"MirrorHostId\")"; my $updateMasterQuery = "INSERT INTO dbmirror_MirroredTransaction ";
$updateMasterQuery .= " VALUES ($lastTransId,$lastSeqId,$::slaveInfo->{\"MirrorHostId\"}) "; $updateMasterQuery .= " (XID,LastSeqId,MirrorHostId)";
$updateMasterQuery .= " VALUES ($lastTransId,$lastSeqId,$::slaveInfo->{\"MirrorHostId\"}) ";
my $updateResult = $masterConn->exec($updateMasterQuery);
unless($updateResult->resultStatus == PGRES_COMMAND_OK) { my $updateResult = $masterConn->exec($updateMasterQuery);
my $errorMessage = $masterConn->errorMessage . "\n"; unless($updateResult->resultStatus == PGRES_COMMAND_OK) {
$errorMessage .= $updateMasterQuery; my $errorMessage = $masterConn->errorMessage . "\n";
logErrorMessage($errorMessage); $errorMessage .= $updateMasterQuery;
die; logErrorMessage($errorMessage);
} die;
}
# print "Updated slaves to transaction $lastTransId\n" ; # print "Updated slaves to transaction $lastTransId\n" ;
# flush STDOUT; # flush STDOUT;
#If this transaction has now been mirrored to all mirror hosts #If this transaction has now been mirrored to all mirror hosts
#then it can be deleted. #then it can be deleted.
$deleteTransactionQuery = 'DELETE FROM "Pending" WHERE "XID"=' $deleteTransactionQuery = 'DELETE FROM dbmirror_Pending WHERE XID='
. $lastTransId . ' AND (SELECT COUNT(*) FROM "MirroredTransaction"' . $lastTransId . ' AND (SELECT COUNT(*) FROM dbmirror_MirroredTransaction'
. ' WHERE "XID"=' . $lastTransId . ')=(SELECT COUNT(*) FROM' . ' WHERE XID=' . $lastTransId . ')=(SELECT COUNT(*) FROM'
. ' "MirrorHost")'; . ' dbmirror_MirrorHost)';
$deleteResult = $masterConn->exec($deleteTransactionQuery); $deleteResult = $masterConn->exec($deleteTransactionQuery);
if($deleteResult->resultStatus!=PGRES_COMMAND_OK) { if($deleteResult->resultStatus!=PGRES_COMMAND_OK) {
logErrorMessage($masterConn->errorMessage . "\n" . logErrorMessage($masterConn->errorMessage . "\n" .
$deleteTransactionQuery); $deleteTransactionQuery);
die; die;
}
} }
} }
...@@ -889,3 +942,69 @@ sub extractData($$) { ...@@ -889,3 +942,69 @@ sub extractData($$) {
return %valuesHash; return %valuesHash;
} }
sub openTransactionFile($$)
{
my $slaveInfo = shift;
my $XID =shift;
# my $now_str = localtime;
my $nowsec;
my $nowmin;
my $nowhour;
my $nowmday;
my $nowmon;
my $nowyear;
my $nowwday;
my $nowyday;
my $nowisdst;
($nowsec,$nowmin,$nowhour,$nowmday,$nowmon,$nowyear,$nowwday,$nowyday,$nowisdst) =
localtime;
my $fileName=sprintf(">%s/%s_%d-%d-%d_%d:%d:%dXID%d.sql", $::slaveInfo->{'TransactionFileDirectory'},
$::slaveInfo->{"MirrorHostId"},($nowyear+1900),($nowmon+1),$nowmday,$nowhour,$nowmin,
$nowsec,$XID);
my $xfile;
open($xfile,$fileName) or die "Can't open $fileName : $!";
$slaveInfo->{'TransactionFile'} = $xfile;
$slaveInfo->{'status'} = 'FileOpen';
}
sub openSlaveConnection($) {
my $slavePtr = $_[0];
my $slaveConn;
my $slaveConnString;
if(defined($slavePtr->{"slaveHost"}))
{
$slaveConnString .= "host=" . $slavePtr->{"slaveHost"} . " ";
}
if(defined($slavePtr->{"slavePort"}))
{
$slaveConnString .= "port=" . $slavePtr->{"slavePort"} . " ";
}
$slaveConnString .= " dbname=" . $slavePtr->{"slaveDb"};
$slaveConnString .= " user=" . $slavePtr->{"slaveUser"};
$slaveConnString .= " password=" . $slavePtr->{"slavePassword"};
$slaveConn = Pg::connectdb($slaveConnString);
if($slaveConn->status != PGRES_CONNECTION_OK) {
my $errorMessage = "Can't connect to slave database " ;
$errorMessage .= $slavePtr->{"slaveHost"} . "\n";
$errorMessage .= $slaveConn->errorMessage;
logErrorMessage($errorMessage);
$slavePtr->{"status"} = 'DBFailed';
}
else {
$slavePtr->{"slaveConn"} = $slaveConn;
$slavePtr->{"status"} = 'DBOpen';
}
}
BEGIN;
SET autocommit TO 'on';
CREATE FUNCTION "recordchange" () RETURNS trigger AS CREATE FUNCTION "recordchange" () RETURNS trigger AS
'/usr/local/pgsql/lib/pending.so', 'recordchange' LANGUAGE 'C'; '$libdir/pending.so', 'recordchange' LANGUAGE 'C';
CREATE TABLE "MirrorHost" (
"MirrorHostId" serial,
"HostName" varchar NOT NULL,
PRIMARY KEY("MirrorHostId")
);
CREATE TABLE dbmirror_MirrorHost (
MirrorHostId serial not null,
SlaveName varchar NOT NULL,
PRIMARY KEY(MirrorHostId)
);
CREATE TABLE "Pending" (
"SeqId" serial,
"TableName" varchar NOT NULL,
"Op" character,
"XID" int4 NOT NULL,
PRIMARY KEY ("SeqId")
CREATE TABLE dbmirror_Pending (
SeqId serial,
TableName Name NOT NULL,
Op character,
XID int4 NOT NULL,
PRIMARY KEY (SeqId)
); );
CREATE INDEX "Pending_XID_Index" ON "Pending" ("XID"); CREATE INDEX "dbmirror_Pending_XID_Index" ON dbmirror_Pending (XID);
CREATE TABLE "PendingData" ( CREATE TABLE dbmirror_PendingData (
"SeqId" int4 NOT NULL, SeqId int4 NOT NULL,
"IsKey" bool NOT NULL, IsKey bool NOT NULL,
"Data" varchar, Data varchar,
PRIMARY KEY ("SeqId", "IsKey") , PRIMARY KEY (SeqId, IsKey) ,
FOREIGN KEY ("SeqId") REFERENCES "Pending" ("SeqId") ON UPDATE CASCADE ON DELETE CASCADE FOREIGN KEY (SeqId) REFERENCES dbmirror_Pending (SeqId) ON UPDATE CASCADE ON DELETE CASCADE
); );
CREATE TABLE "MirroredTransaction" ( CREATE TABLE dbmirror_MirroredTransaction (
"XID" int4 NOT NULL, XID int4 NOT NULL,
"LastSeqId" int4 NOT NULL, LastSeqId int4 NOT NULL,
"MirrorHostId" int4 NOT NULL, MirrorHostId int4 NOT NULL,
PRIMARY KEY ("XID","MirrorHostId"), PRIMARY KEY (XID,MirrorHostId),
FOREIGN KEY ("MirrorHostId") REFERENCES "MirrorHost" ("MirrorHostId") ON UPDATE CASCADE ON DELETE CASCADE, FOREIGN KEY (MirrorHostId) REFERENCES dbmirror_MirrorHost (MirrorHostId) ON UPDATE CASCADE ON DELETE CASCADE,
FOREIGN KEY ("LastSeqId") REFERENCES "Pending" ("SeqId") ON UPDATE FOREIGN KEY (LastSeqId) REFERENCES dbmirror_Pending (SeqId) ON UPDATE
CASCADE ON DELETE CASCADE CASCADE ON DELETE CASCADE
); );
UPDATE pg_proc SET proname='nextval_pg' WHERE proname='nextval';
CREATE FUNCTION pg_catalog.nextval(text) RETURNS int8 AS
'/usr/local/postgresql-7.4/lib/pending.so', 'nextval' LANGUAGE 'C' STRICT;
UPDATE pg_proc set proname='setval_pg' WHERE proname='setval';
CREATE FUNCTION pg_catalog.setval(text,int4) RETURNS int8 AS
'/usr/local/postgresql-7.4/lib/pending.so', 'setval' LANGUAGE 'C' STRICT;
COMMIT;
\ No newline at end of file
...@@ -6,7 +6,7 @@ DBMirror is a database mirroring system developed for the PostgreSQL ...@@ -6,7 +6,7 @@ DBMirror is a database mirroring system developed for the PostgreSQL
database Written and maintained by Steven Singer(ssinger@navtechinc.com) database Written and maintained by Steven Singer(ssinger@navtechinc.com)
(c) 2001-2002 Navtech Systems Support Inc. (c) 2001-2004 Navtech Systems Support Inc.
ALL RIGHTS RESERVED ALL RIGHTS RESERVED
Permission to use, copy, modify, and distribute this software and its Permission to use, copy, modify, and distribute this software and its
...@@ -57,7 +57,7 @@ Pending tables. ...@@ -57,7 +57,7 @@ Pending tables.
Requirments: Requirments:
--------------------------------- ---------------------------------
-PostgreSQL-7.4 (Older versions are no longer supported) -PostgreSQL-7.4 (Older versions are no longer supported)
-Perl 5.6(Other versions might work) -Perl 5.6 or 5.8 (Other versions might work)
-PgPerl (http://gborg.postgresql.org/project/pgperl/projdisplay.php) -PgPerl (http://gborg.postgresql.org/project/pgperl/projdisplay.php)
...@@ -81,13 +81,8 @@ PostgreSQL-7.4 Make Instructions: ...@@ -81,13 +81,8 @@ PostgreSQL-7.4 Make Instructions:
You should now have a file named pending.so that contains the trigger. You should now have a file named pending.so that contains the trigger.
Install this file in /usr/local/pgsql/lib (or another suitable location). Install this file in your Postgresql lib directory (/usr/local/pgsql/lib)
If you choose a different location the MirrorSetup.sql script will need
to be modified to reflect your new location. The CREATE FUNCTION command
in the MirrorSetup.sql script associates the trigger function with the
pending.so shared library. Modify the arguments to this command if you
choose to install the trigger elsewhere.
2) Run MirrorSetup.sql 2) Run MirrorSetup.sql
...@@ -95,7 +90,8 @@ This file contains SQL commands to setup the Mirroring environment. ...@@ -95,7 +90,8 @@ This file contains SQL commands to setup the Mirroring environment.
This includes This includes
-Telling PostgreSQL about the "recordchange" trigger function. -Telling PostgreSQL about the "recordchange" trigger function.
-Creating the Pending,PendingData, MirrorHost, MirroredTransaction tables -Creating the dbmirror_Pending,dbmirror_PendingData,dbmirror_MirrorHost,
dbmirror_MirroredTransaction tables
To execute the script use psql as follows To execute the script use psql as follows
...@@ -114,17 +110,34 @@ DBMirror.pl script. See slaveDatabase.conf for a sample. ...@@ -114,17 +110,34 @@ DBMirror.pl script. See slaveDatabase.conf for a sample.
The master settings refer to the master database(The one that is The master settings refer to the master database(The one that is
being mirrored). being mirrored).
The slave settings refer to the database that the data is being mirrored to. The slave settings refer to the database that the data is being
The slaveHost parameter must refer to the machine name of the slave (Either mirrored to.
a resolvable hostname or an IP address). The value for slave host
must match the Hostname field in the MirrorHost table(See step 6).
The master user must have sufficient permissions to modify the Pending The slaveName setting in the configuration file must match the slave
tables and to read all of the tables being mirrored. name specified in the dbmirror_MirrorHost table.
DBMirror.pl can be run in two modes of operation:
A) It can connect directly to the slave database. To do this specify
a slave database name and optional host and port along with a username
and password. See slaveDatabase.conf for details.
The master user must have sufficient permissions to modify the Pending
tables and to read all of the tables being mirrored.
The slave user must have enough permissions on the slave database to
modify(INSERT,UPDATE,DELETE) any tables on the slave system that are being
mirrored.
B) The SQL statements that should be executed on the slave can be
written to files which can then be executed slave database through
psql. This would be suitable for setups where their is no direct
connection between the slave database and the master. A file is
generated for each transaction in the directory specified by
TransactionFileDirectory. The file name contains the date/time the
file was created along with the transaction id.
The slave user must have enough permissions on the slave database to
modify(INSERT,UPDATE,DELETE) any tables on the slave system that are being
mirrored.
4) Add the trigger to tables. 4) Add the trigger to tables.
...@@ -153,7 +166,7 @@ The name of the host in the MirrorHost table must exactly match the ...@@ -153,7 +166,7 @@ The name of the host in the MirrorHost table must exactly match the
slaveHost variable for that slave in the configuration file. slaveHost variable for that slave in the configuration file.
For example For example
INSERT INTO "MirrorHost" ("HostName") VALUES ('mySlaveMachine.mycompany.com'); INSERT INTO "MirrorHost" ("SlaveName") VALUES ('backup_system');
6) Start DBMirror.pl 6) Start DBMirror.pl
...@@ -171,7 +184,8 @@ Any errors are printed to standard out and emailed to the address specified in ...@@ -171,7 +184,8 @@ Any errors are printed to standard out and emailed to the address specified in
the configuration file. the configuration file.
DBMirror can be run from the master, the slave, or a third machine as long DBMirror can be run from the master, the slave, or a third machine as long
as it is able to access both the master and slave databases. as it is able to access both the master and slave databases(not
required if SQL files are being generated)
7) Periodically run clean_pending.pl 7) Periodically run clean_pending.pl
clean_pending.pl cleans out any entries from the Pending tables that clean_pending.pl cleans out any entries from the Pending tables that
...@@ -194,11 +208,28 @@ TODO(Current Limitations) ...@@ -194,11 +208,28 @@ TODO(Current Limitations)
---------- ----------
-Support for selective mirroring based on the content of data. -Support for selective mirroring based on the content of data.
-Support for BLOB's. -Support for BLOB's.
-Support for conflict resolution. -Support for multi-master mirroring with conflict resolution.
-Batching SQL commands in DBMirror for better performance over WAN's.
-Better support for dealing with Schema changes. -Better support for dealing with Schema changes.
Significant Changes Since 7.4
----------------
-Support for mirroring SEQUENCE's
-Support for unix domain sockets
-Support for outputting slave SQL statements to a file
-Changed the names of replication tables are now named
dbmirror_pending etc..
Credits
-----------
Achilleus Mantzios <achill@matrix.gatewaynet.com>
Steven Singer Steven Singer
Navtech Systems Support Inc. Navtech Systems Support Inc.
ssinger@navtechinc.com ssinger@navtechinc.com
/**************************************************************************** /****************************************************************************
* pending.c * pending.c
* $PostgreSQL: pgsql/contrib/dbmirror/pending.c,v 1.15 2003/11/29 22:39:19 pgsql Exp $ * $Id: pending.c,v 1.16 2004/02/17 03:34:35 momjian Exp $
* $PostgreSQL: pgsql/contrib/dbmirror/pending.c,v 1.16 2004/02/17 03:34:35 momjian Exp $
* *
* This file contains a trigger for Postgresql-7.x to record changes to tables * This file contains a trigger for Postgresql-7.x to record changes to tables
* to a pending table for mirroring. * to a pending table for mirroring.
...@@ -34,35 +35,60 @@ ...@@ -34,35 +35,60 @@
#include <executor/spi.h> #include <executor/spi.h>
#include <commands/trigger.h> #include <commands/trigger.h>
#include <utils/lsyscache.h> #include <utils/lsyscache.h>
#include <utils/array.h>
enum FieldUsage enum FieldUsage
{ {
PRIMARY = 0, NONPRIMARY, ALL, NUM_FIELDUSAGE PRIMARY = 0, NONPRIMARY, ALL, NUM_FIELDUSAGE
}; };
int storePending(char *cpTableName, HeapTuple tBeforeTuple, int storePending(char *cpTableName, HeapTuple tBeforeTuple,
HeapTuple tAfterTuple, HeapTuple tAfterTuple,
TupleDesc tTupdesc, TupleDesc tTupdesc,
TriggerData *tpTrigdata, char cOp); Oid tableOid,
char cOp);
int storeKeyInfo(char *cpTableName, HeapTuple tTupleData, TupleDesc tTuplDesc, int storeKeyInfo(char *cpTableName, HeapTuple tTupleData, TupleDesc tTuplDesc,
TriggerData *tpTrigdata); Oid tableOid);
int storeData(char *cpTableName, HeapTuple tTupleData, TupleDesc tTupleDesc, int storeData(char *cpTableName, HeapTuple tTupleData,
TriggerData *tpTrigData, int iIncludeKeyData); TupleDesc tTupleDesc,Oid tableOid,int iIncludeKeyData);
int2vector *getPrimaryKey(Oid tblOid); int2vector *getPrimaryKey(Oid tblOid);
char *packageData(HeapTuple tTupleData, TupleDesc tTupleDecs, char *packageData(HeapTuple tTupleData, TupleDesc tTupleDecs, Oid tableOid,
TriggerData *tTrigData,
enum FieldUsage eKeyUsage); enum FieldUsage eKeyUsage);
#define BUFFER_SIZE 256 #define BUFFER_SIZE 256
#define MAX_OID_LEN 10 #define MAX_OID_LEN 10
/*#define DEBUG_OUTPUT 1 */ #define DEBUG_OUTPUT 1
extern Datum recordchange(PG_FUNCTION_ARGS); extern Datum recordchange(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(recordchange); PG_FUNCTION_INFO_V1(recordchange);
#if defined DEBUG_OUTPUT
#define debug_msg2(x,y) elog(NOTICE,x,y)
#define debug_msg(x) elog(NOTICE,x)
#define debug_msg3(x,y,z) elog(NOTICE,x,y,z)
#else
#define debug_msg2(x,y)
#define debug_msg(x)
#define debug_msg(x,y,z)
#endif
extern Datum nextval(PG_FUNCTION_ARGS);
extern Datum setval(PG_FUNCTION_ARGS);
int saveSequenceUpdate(const text * sequenceName,
int nextSequenceValue);
/***************************************************************************** /*****************************************************************************
* The entry point for the trigger function. * The entry point for the trigger function.
* The Trigger takes a single SQL 'text' argument indicating the name of the * The Trigger takes a single SQL 'text' argument indicating the name of the
...@@ -81,13 +107,15 @@ recordchange(PG_FUNCTION_ARGS) ...@@ -81,13 +107,15 @@ recordchange(PG_FUNCTION_ARGS)
char op = 0; char op = 0;
char *schemaname; char *schemaname;
char *fullyqualtblname; char *fullyqualtblname;
char *pkxpress=NULL;
if (fcinfo->context != NULL) if (fcinfo->context != NULL)
{ {
if (SPI_connect() < 0) if (SPI_connect() < 0)
{ {
elog(NOTICE, "storePending could not connect to SPI"); ereport(ERROR,(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("dbmirror:recordchange could not connect to SPI")));
return -1; return -1;
} }
trigdata = (TriggerData *) fcinfo->context; trigdata = (TriggerData *) fcinfo->context;
...@@ -124,8 +152,15 @@ recordchange(PG_FUNCTION_ARGS) ...@@ -124,8 +152,15 @@ recordchange(PG_FUNCTION_ARGS)
beforeTuple = trigdata->tg_trigtuple; beforeTuple = trigdata->tg_trigtuple;
op = 'd'; op = 'd';
} }
else
{
ereport(ERROR,(errcode(ERRCODE_TRIGGERED_ACTION_EXCEPTION),
errmsg("dbmirror:recordchange Unknown operation")));
}
if (storePending(fullyqualtblname, beforeTuple, afterTuple, tupdesc, trigdata, op)) if (storePending(fullyqualtblname, beforeTuple, afterTuple,
tupdesc, retTuple->t_tableOid, op))
{ {
/* An error occoured. Skip the operation. */ /* An error occoured. Skip the operation. */
ereport(ERROR, ereport(ERROR,
...@@ -135,10 +170,11 @@ recordchange(PG_FUNCTION_ARGS) ...@@ -135,10 +170,11 @@ recordchange(PG_FUNCTION_ARGS)
return PointerGetDatum(NULL); return PointerGetDatum(NULL);
} }
#if defined DEBUG_OUTPUT debug_msg("dbmirror:recordchange returning on success");
elog(NOTICE, "returning on success");
#endif
SPI_pfree(fullyqualtblname); SPI_pfree(fullyqualtblname);
if(pkxpress != NULL)
SPI_pfree(pkxpress);
SPI_finish(); SPI_finish();
return PointerGetDatum(retTuple); return PointerGetDatum(retTuple);
} }
...@@ -160,41 +196,45 @@ int ...@@ -160,41 +196,45 @@ int
storePending(char *cpTableName, HeapTuple tBeforeTuple, storePending(char *cpTableName, HeapTuple tBeforeTuple,
HeapTuple tAfterTuple, HeapTuple tAfterTuple,
TupleDesc tTupDesc, TupleDesc tTupDesc,
TriggerData *tpTrigData, char cOp) Oid tableOid,
char cOp)
{ {
char *cpQueryBase = "INSERT INTO \"Pending\" (\"TableName\",\"Op\",\"XID\") VALUES ($1,$2,$3)"; char *cpQueryBase = "INSERT INTO dbmirror_pending (TableName,Op,XID) VALUES ($1,$2,$3)";
int iResult = 0; int iResult = 0;
HeapTuple tCurTuple; HeapTuple tCurTuple;
char nulls[3]=" ";
/* Points the current tuple(before or after) */ /* Points the current tuple(before or after) */
Datum saPlanData[4]; Datum saPlanData[3];
Oid taPlanArgTypes[3] = {NAMEOID, CHAROID, INT4OID}; Oid taPlanArgTypes[4] = {NAMEOID,
CHAROID,
INT4OID};
void *vpPlan; void *vpPlan;
tCurTuple = tBeforeTuple ? tBeforeTuple : tAfterTuple; tCurTuple = tBeforeTuple ? tBeforeTuple : tAfterTuple;
vpPlan = SPI_prepare(cpQueryBase, 3, taPlanArgTypes); vpPlan = SPI_prepare(cpQueryBase, 3, taPlanArgTypes);
if (vpPlan == NULL) if (vpPlan == NULL)
elog(NOTICE, "error creating plan"); ereport(ERROR,(errcode(ERRCODE_TRIGGERED_ACTION_EXCEPTION),
/* SPI_saveplan(vpPlan); */ errmsg("dbmirror:storePending error creating plan")));
saPlanData[0] = PointerGetDatum(cpTableName); saPlanData[0] = PointerGetDatum(cpTableName);
saPlanData[1] = CharGetDatum(cOp); saPlanData[1] = CharGetDatum(cOp);
saPlanData[2] = Int32GetDatum(GetCurrentTransactionId()); saPlanData[2] = Int32GetDatum(GetCurrentTransactionId());
iResult = SPI_execp(vpPlan, saPlanData, nulls, 1);
iResult = SPI_execp(vpPlan, saPlanData, NULL, 1);
if (iResult < 0) if (iResult < 0)
elog(NOTICE, "storedPending fired (%s) returned %d", cpQueryBase, iResult); elog(NOTICE, "storedPending fired (%s) returned %d",
cpQueryBase, iResult);
#if defined DEBUG_OUTPUT
elog(NOTICE, "row successfully stored in pending table"); debug_msg("dbmirror:storePending row successfully stored in pending table");
#endif
if (cOp == 'd') if (cOp == 'd')
{ {
...@@ -202,7 +242,8 @@ storePending(char *cpTableName, HeapTuple tBeforeTuple, ...@@ -202,7 +242,8 @@ storePending(char *cpTableName, HeapTuple tBeforeTuple,
* This is a record of a delete operation. * This is a record of a delete operation.
* Just store the key data. * Just store the key data.
*/ */
iResult = storeKeyInfo(cpTableName, tBeforeTuple, tTupDesc, tpTrigData); iResult = storeKeyInfo(cpTableName,
tBeforeTuple, tTupDesc, tableOid);
} }
else if (cOp == 'i') else if (cOp == 'i')
{ {
...@@ -210,20 +251,22 @@ storePending(char *cpTableName, HeapTuple tBeforeTuple, ...@@ -210,20 +251,22 @@ storePending(char *cpTableName, HeapTuple tBeforeTuple,
* An Insert operation. * An Insert operation.
* Store all data * Store all data
*/ */
iResult = storeData(cpTableName, tAfterTuple, tTupDesc, tpTrigData, TRUE); iResult = storeData(cpTableName, tAfterTuple,
tTupDesc, tableOid,TRUE);
} }
else else
{ {
/* op must be an update. */ /* op must be an update. */
iResult = storeKeyInfo(cpTableName, tBeforeTuple, tTupDesc, tpTrigData); iResult = storeKeyInfo(cpTableName, tBeforeTuple,
iResult = iResult ? iResult : storeData(cpTableName, tAfterTuple, tTupDesc, tTupDesc, tableOid);
tpTrigData, TRUE); iResult = iResult ? iResult :
storeData(cpTableName, tAfterTuple, tTupDesc,
tableOid,TRUE);
} }
#if defined DEBUG_OUTPUT
elog(NOTICE, "done storing keyinfo"); debug_msg("dbmirror:storePending done storing keyinfo");
#endif
return iResult; return iResult;
...@@ -231,12 +274,11 @@ storePending(char *cpTableName, HeapTuple tBeforeTuple, ...@@ -231,12 +274,11 @@ storePending(char *cpTableName, HeapTuple tBeforeTuple,
int int
storeKeyInfo(char *cpTableName, HeapTuple tTupleData, storeKeyInfo(char *cpTableName, HeapTuple tTupleData,
TupleDesc tTupleDesc, TupleDesc tTupleDesc, Oid tableOid)
TriggerData *tpTrigData)
{ {
Oid saPlanArgTypes[1] = {NAMEOID}; Oid saPlanArgTypes[1] = {NAMEOID};
char *insQuery = "INSERT INTO \"PendingData\" (\"SeqId\",\"IsKey\",\"Data\") VALUES(currval('\"Pending_SeqId_seq\"'),'t',$1)"; char *insQuery = "INSERT INTO dbmirror_pendingdata (SeqId,IsKey,Data) VALUES(currval('dbmirror_pending_seqid_seq'),'t',$1)";
void *pplan; void *pplan;
Datum saPlanData[1]; Datum saPlanData[1];
char *cpKeyData; char *cpKeyData;
...@@ -250,7 +292,7 @@ storeKeyInfo(char *cpTableName, HeapTuple tTupleData, ...@@ -250,7 +292,7 @@ storeKeyInfo(char *cpTableName, HeapTuple tTupleData,
} }
/* pplan = SPI_saveplan(pplan); */ /* pplan = SPI_saveplan(pplan); */
cpKeyData = packageData(tTupleData, tTupleDesc, tpTrigData, PRIMARY); cpKeyData = packageData(tTupleData, tTupleDesc,tableOid, PRIMARY);
if (cpKeyData == NULL) if (cpKeyData == NULL)
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT), (errcode(ERRCODE_UNDEFINED_OBJECT),
...@@ -258,9 +300,9 @@ storeKeyInfo(char *cpTableName, HeapTuple tTupleData, ...@@ -258,9 +300,9 @@ storeKeyInfo(char *cpTableName, HeapTuple tTupleData,
errmsg("there is no PRIMARY KEY for table %s", errmsg("there is no PRIMARY KEY for table %s",
cpTableName))); cpTableName)));
#if defined DEBUG_OUTPUT
elog(NOTICE, "key data: %s", cpKeyData); debug_msg2("dbmirror:storeKeyInfo key data: %s", cpKeyData);
#endif
saPlanData[0] = PointerGetDatum(cpKeyData); saPlanData[0] = PointerGetDatum(cpKeyData);
iRetCode = SPI_execp(pplan, saPlanData, NULL, 1); iRetCode = SPI_execp(pplan, saPlanData, NULL, 1);
...@@ -270,12 +312,12 @@ storeKeyInfo(char *cpTableName, HeapTuple tTupleData, ...@@ -270,12 +312,12 @@ storeKeyInfo(char *cpTableName, HeapTuple tTupleData,
if (iRetCode != SPI_OK_INSERT) if (iRetCode != SPI_OK_INSERT)
{ {
elog(NOTICE, "error inserting row in pendingDelete"); ereport(ERROR,(errcode(ERRCODE_TRIGGERED_ACTION_EXCEPTION)
,errmsg("error inserting row in pendingDelete")));
return -1; return -1;
} }
#if defined DEBUG_OUTPUT
elog(NOTICE, "insert successful"); debug_msg("insert successful");
#endif
return 0; return 0;
...@@ -318,12 +360,12 @@ getPrimaryKey(Oid tblOid) ...@@ -318,12 +360,12 @@ getPrimaryKey(Oid tblOid)
* Stores a copy of the non-key data for the row. * Stores a copy of the non-key data for the row.
*****************************************************************************/ *****************************************************************************/
int int
storeData(char *cpTableName, HeapTuple tTupleData, TupleDesc tTupleDesc, storeData(char *cpTableName, HeapTuple tTupleData,
TriggerData *tpTrigData, int iIncludeKeyData) TupleDesc tTupleDesc,Oid tableOid, int iIncludeKeyData)
{ {
Oid planArgTypes[1] = {NAMEOID}; Oid planArgTypes[1] = {NAMEOID};
char *insQuery = "INSERT INTO \"PendingData\" (\"SeqId\",\"IsKey\",\"Data\") VALUES(currval('\"Pending_SeqId_seq\"'),'f',$1)"; char *insQuery = "INSERT INTO dbmirror_pendingdata (SeqId,IsKey,Data) VALUES(currval('dbmirror_pending_seqid_seq'),'f',$1)";
void *pplan; void *pplan;
Datum planData[1]; Datum planData[1];
char *cpKeyData; char *cpKeyData;
...@@ -338,9 +380,10 @@ storeData(char *cpTableName, HeapTuple tTupleData, TupleDesc tTupleDesc, ...@@ -338,9 +380,10 @@ storeData(char *cpTableName, HeapTuple tTupleData, TupleDesc tTupleDesc,
/* pplan = SPI_saveplan(pplan); */ /* pplan = SPI_saveplan(pplan); */
if (iIncludeKeyData == 0) if (iIncludeKeyData == 0)
cpKeyData = packageData(tTupleData, tTupleDesc, tpTrigData, NONPRIMARY); cpKeyData = packageData(tTupleData, tTupleDesc,
tableOid, NONPRIMARY);
else else
cpKeyData = packageData(tTupleData, tTupleDesc, tpTrigData, ALL); cpKeyData = packageData(tTupleData, tTupleDesc,tableOid, ALL);
planData[0] = PointerGetDatum(cpKeyData); planData[0] = PointerGetDatum(cpKeyData);
iRetValue = SPI_execp(pplan, planData, NULL, 1); iRetValue = SPI_execp(pplan, planData, NULL, 1);
...@@ -353,9 +396,9 @@ storeData(char *cpTableName, HeapTuple tTupleData, TupleDesc tTupleDesc, ...@@ -353,9 +396,9 @@ storeData(char *cpTableName, HeapTuple tTupleData, TupleDesc tTupleDesc,
elog(NOTICE, "error inserting row in pendingDelete"); elog(NOTICE, "error inserting row in pendingDelete");
return -1; return -1;
} }
#if defined DEBUG_OUTPUT
elog(NOTICE, "insert successful"); debug_msg("dbmirror:storeKeyData insert successful");
#endif
return 0; return 0;
...@@ -376,8 +419,7 @@ storeData(char *cpTableName, HeapTuple tTupleData, TupleDesc tTupleDesc, ...@@ -376,8 +419,7 @@ storeData(char *cpTableName, HeapTuple tTupleData, TupleDesc tTupleDesc,
* ALL implies include all fields. * ALL implies include all fields.
*/ */
char * char *
packageData(HeapTuple tTupleData, TupleDesc tTupleDesc, packageData(HeapTuple tTupleData, TupleDesc tTupleDesc, Oid tableOid,
TriggerData *tpTrigData,
enum FieldUsage eKeyUsage) enum FieldUsage eKeyUsage)
{ {
int iNumCols; int iNumCols;
...@@ -391,14 +433,17 @@ packageData(HeapTuple tTupleData, TupleDesc tTupleDesc, ...@@ -391,14 +433,17 @@ packageData(HeapTuple tTupleData, TupleDesc tTupleDesc,
if (eKeyUsage != ALL) if (eKeyUsage != ALL)
{ {
tpPKeys = getPrimaryKey(tpTrigData->tg_relation->rd_id); tpPKeys = getPrimaryKey(tableOid);
if (tpPKeys == NULL) if (tpPKeys == NULL)
return NULL; return NULL;
} }
#if defined DEBUG_OUTPUT
if (tpPKeys != NULL) if (tpPKeys != NULL)
elog(NOTICE, "have primary keys"); {
#endif debug_msg("dbmirror:packageData have primary keys");
}
cpDataBlock = SPI_palloc(BUFFER_SIZE); cpDataBlock = SPI_palloc(BUFFER_SIZE);
iDataBlockSize = BUFFER_SIZE; iDataBlockSize = BUFFER_SIZE;
iUsedDataBlock = 0; /* To account for the null */ iUsedDataBlock = 0; /* To account for the null */
...@@ -417,49 +462,58 @@ packageData(HeapTuple tTupleData, TupleDesc tTupleDesc, ...@@ -417,49 +462,58 @@ packageData(HeapTuple tTupleData, TupleDesc tTupleDesc,
{ {
/* Determine if this is a primary key or not. */ /* Determine if this is a primary key or not. */
iIsPrimaryKey = 0; iIsPrimaryKey = 0;
for (iPrimaryKeyIndex = 0; (*tpPKeys)[iPrimaryKeyIndex] != 0; for (iPrimaryKeyIndex = 0;
(*tpPKeys)[iPrimaryKeyIndex] != 0;
iPrimaryKeyIndex++) iPrimaryKeyIndex++)
{ {
if ((*tpPKeys)[iPrimaryKeyIndex] == iColumnCounter) if ((*tpPKeys)[iPrimaryKeyIndex]
== iColumnCounter)
{ {
iIsPrimaryKey = 1; iIsPrimaryKey = 1;
break; break;
} }
} }
if (iIsPrimaryKey ? (eKeyUsage != PRIMARY) : (eKeyUsage != NONPRIMARY)) if (iIsPrimaryKey ? (eKeyUsage != PRIMARY) :
(eKeyUsage != NONPRIMARY))
{ {
/** /**
* Don't use. * Don't use.
*/ */
#if defined DEBUG_OUTPUT
elog(NOTICE, "skipping column"); debug_msg("dbmirror:packageData skipping column");
#endif
continue; continue;
} }
} /* KeyUsage!=ALL */ } /* KeyUsage!=ALL */
#ifndef NODROPCOLUMN
if(tTupleDesc->attrs[iColumnCounter-1]->attisdropped) if(tTupleDesc->attrs[iColumnCounter-1]->attisdropped)
{ {
/** /**
* This column has been dropped. * This column has been dropped.
* Do not mirror it. * Do not mirror it.
*/ */
continue; continue;
} }
#endif
cpFieldName = DatumGetPointer(NameGetDatum(&tTupleDesc->attrs cpFieldName = DatumGetPointer(NameGetDatum
[iColumnCounter - 1]->attname));
#if defined DEBUG_OUTPUT (&tTupleDesc->attrs
elog(NOTICE, "field name: %s", cpFieldName); [iColumnCounter - 1]->attname));
#endif
while (iDataBlockSize - iUsedDataBlock < strlen(cpFieldName) + 6) debug_msg2("dbmirror:packageData field name: %s", cpFieldName);
while (iDataBlockSize - iUsedDataBlock <
strlen(cpFieldName) + 6)
{ {
cpDataBlock = SPI_repalloc(cpDataBlock, iDataBlockSize + BUFFER_SIZE); cpDataBlock = SPI_repalloc(cpDataBlock,
iDataBlockSize +
BUFFER_SIZE);
iDataBlockSize = iDataBlockSize + BUFFER_SIZE; iDataBlockSize = iDataBlockSize + BUFFER_SIZE;
} }
sprintf(cpDataBlock + iUsedDataBlock, "\"%s\"=", cpFieldName); sprintf(cpDataBlock + iUsedDataBlock, "\"%s\"=", cpFieldName);
iUsedDataBlock = iUsedDataBlock + strlen(cpFieldName) + 3; iUsedDataBlock = iUsedDataBlock + strlen(cpFieldName) + 3;
cpFieldData = SPI_getvalue(tTupleData, tTupleDesc, iColumnCounter); cpFieldData = SPI_getvalue(tTupleData, tTupleDesc,
iColumnCounter);
cpUnFormatedPtr = cpFieldData; cpUnFormatedPtr = cpFieldData;
cpFormatedPtr = cpDataBlock + iUsedDataBlock; cpFormatedPtr = cpDataBlock + iUsedDataBlock;
...@@ -477,15 +531,17 @@ packageData(HeapTuple tTupleData, TupleDesc tTupleDesc, ...@@ -477,15 +531,17 @@ packageData(HeapTuple tTupleData, TupleDesc tTupleDesc,
continue; continue;
} }
#if defined DEBUG_OUTPUT debug_msg2("dbmirror:packageData field data: \"%s\"",
elog(NOTICE, "field data: \"%s\"", cpFieldData); cpFieldData);
elog(NOTICE, "starting format loop"); debug_msg("dbmirror:packageData starting format loop");
#endif
while (*cpUnFormatedPtr != 0) while (*cpUnFormatedPtr != 0)
{ {
while (iDataBlockSize - iUsedDataBlock < 2) while (iDataBlockSize - iUsedDataBlock < 2)
{ {
cpDataBlock = SPI_repalloc(cpDataBlock, iDataBlockSize + BUFFER_SIZE); cpDataBlock = SPI_repalloc(cpDataBlock,
iDataBlockSize
+ BUFFER_SIZE);
iDataBlockSize = iDataBlockSize + BUFFER_SIZE; iDataBlockSize = iDataBlockSize + BUFFER_SIZE;
cpFormatedPtr = cpDataBlock + iUsedDataBlock; cpFormatedPtr = cpDataBlock + iUsedDataBlock;
} }
...@@ -505,25 +561,218 @@ packageData(HeapTuple tTupleData, TupleDesc tTupleDesc, ...@@ -505,25 +561,218 @@ packageData(HeapTuple tTupleData, TupleDesc tTupleDesc,
while (iDataBlockSize - iUsedDataBlock < 3) while (iDataBlockSize - iUsedDataBlock < 3)
{ {
cpDataBlock = SPI_repalloc(cpDataBlock, iDataBlockSize + BUFFER_SIZE); cpDataBlock = SPI_repalloc(cpDataBlock,
iDataBlockSize +
BUFFER_SIZE);
iDataBlockSize = iDataBlockSize + BUFFER_SIZE; iDataBlockSize = iDataBlockSize + BUFFER_SIZE;
cpFormatedPtr = cpDataBlock + iUsedDataBlock; cpFormatedPtr = cpDataBlock + iUsedDataBlock;
} }
sprintf(cpFormatedPtr, "' "); sprintf(cpFormatedPtr, "' ");
iUsedDataBlock = iUsedDataBlock + 2; iUsedDataBlock = iUsedDataBlock + 2;
#if defined DEBUG_OUTPUT
elog(NOTICE, "data block: \"%s\"", cpDataBlock); debug_msg2("dbmirror:packageData data block: \"%s\"",
#endif cpDataBlock);
} /* for iColumnCounter */ } /* for iColumnCounter */
if (tpPKeys != NULL) if (tpPKeys != NULL)
SPI_pfree(tpPKeys); SPI_pfree(tpPKeys);
#if defined DEBUG_OUTPUT
elog(NOTICE, "returning DataBlockSize:%d iUsedDataBlock:%d", iDataBlockSize, debug_msg3("dbmirror:packageData returning DataBlockSize:%d iUsedDataBlock:%d",
iUsedDataBlock); iDataBlockSize,
#endif iUsedDataBlock);
memset(cpDataBlock + iUsedDataBlock, 0, iDataBlockSize - iUsedDataBlock); memset(cpDataBlock + iUsedDataBlock, 0, iDataBlockSize - iUsedDataBlock);
return cpDataBlock; return cpDataBlock;
} }
PG_FUNCTION_INFO_V1(setval);
Datum setval(PG_FUNCTION_ARGS)
{
text * sequenceName;
Oid setvalArgTypes[2] = {TEXTOID,INT4OID};
int nextValue;
void * setvalPlan=NULL;
Datum setvalData[2];
const char * setvalQuery = "SELECT setval_pg($1,$2)";
int ret;
sequenceName = PG_GETARG_TEXT_P(0);
nextValue = PG_GETARG_INT32(1);
setvalData[0] = PointerGetDatum(sequenceName);
setvalData[1] = Int32GetDatum(nextValue);
if (SPI_connect() < 0)
{
ereport(ERROR,(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
errmsg("dbmirror:setval could not connect to SPI")));
return -1;
}
setvalPlan = SPI_prepare(setvalQuery,2,setvalArgTypes);
if(setvalPlan == NULL)
{
ereport(ERROR,(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
errmsg("dbmirror:setval could not prepare plan")));
return -1;
}
ret = SPI_execp(setvalPlan,setvalData,NULL,1);
if(ret != SPI_OK_SELECT || SPI_processed != 1)
return -1;
debug_msg2("dbmirror:setval: setval_pg returned ok:%d",nextValue);
ret = saveSequenceUpdate(sequenceName,nextValue);
SPI_pfree(setvalPlan);
SPI_finish();
debug_msg("dbmirror:setval about to return");
return Int64GetDatum(nextValue);
}
PG_FUNCTION_INFO_V1(nextval);
Datum
nextval(PG_FUNCTION_ARGS)
{
text * sequenceName;
const char * nextvalQuery = "SELECT nextval_pg($1)";
Oid nextvalArgTypes[1] = {TEXTOID};
void * nextvalPlan=NULL;
Datum nextvalData[1];
int ret;
HeapTuple resTuple;
char isNull;
int nextSequenceValue;
debug_msg("dbmirror:nextval Starting pending.so:nextval");
sequenceName = PG_GETARG_TEXT_P(0);
if (SPI_connect() < 0)
{
ereport(ERROR,(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
errmsg("dbmirror:nextval could not connect to SPI")));
return -1;
}
nextvalPlan = SPI_prepare(nextvalQuery,1,nextvalArgTypes);
debug_msg("prepared plan to call nextval_pg");
if(nextvalPlan==NULL)
{
ereport(ERROR,(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
errmsg("dbmirror:nextval error creating plan")));
return -1;
}
nextvalData[0] = PointerGetDatum(sequenceName);
ret = SPI_execp(nextvalPlan,nextvalData,NULL,1);
debug_msg("dbmirror:Executed call to nextval_pg");
if(ret != SPI_OK_SELECT || SPI_processed != 1)
return -1;
resTuple = SPI_tuptable->vals[0];
debug_msg("dbmirror:nextval Set resTuple");
nextSequenceValue =*(DatumGetPointer(SPI_getbinval(resTuple,
SPI_tuptable->tupdesc,
1,&isNull)));
debug_msg2("dbmirror:nextval Set SPI_getbinval:%d",nextSequenceValue);
saveSequenceUpdate(sequenceName,nextSequenceValue);
SPI_pfree(resTuple);
SPI_pfree(nextvalPlan);
SPI_finish();
return Int64GetDatum(nextSequenceValue);
}
int
saveSequenceUpdate(const text * sequenceName,
int nextSequenceVal)
{
Oid insertArgTypes[2] = {TEXTOID,INT4OID};
Oid insertDataArgTypes[1] = {NAMEOID};
void * insertPlan=NULL;
void * insertDataPlan=NULL;
Datum insertDatum[2];
Datum insertDataDatum[1];
char nextSequenceText[32];
const char * insertQuery =
"INSERT INTO dbmirror_Pending (TableName,Op,XID) VALUES" \
"($1,'s',$2)";
const char * insertDataQuery =
"INSERT INTO dbmirror_PendingData(SeqId,IsKey,Data) VALUES " \
"(currval('dbmirror_pending_seqid_seq'),'t',$1)";
int ret;
insertPlan = SPI_prepare(insertQuery,2,insertArgTypes);
insertDataPlan = SPI_prepare(insertDataQuery,1,insertDataArgTypes);
debug_msg("Prepared insert query");
if(insertPlan == NULL || insertDataPlan == NULL)
{
ereport(ERROR,(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),errmsg("dbmirror:nextval error creating plan")));
}
insertDatum[1] = Int32GetDatum(GetCurrentTransactionId());
insertDatum[0] = PointerGetDatum(sequenceName);
sprintf(nextSequenceText,"%d",nextSequenceVal);
insertDataDatum[0] = PointerGetDatum(nextSequenceText);
debug_msg2("dbmirror:savesequenceupdate: Setting value %s",
nextSequenceText);
debug_msg("dbmirror:About to execute insert query");
ret = SPI_execp(insertPlan,insertDatum,NULL,1);
ret = SPI_execp(insertDataPlan,insertDataDatum,NULL,1);
debug_msg("dbmirror:Insert query finished");
SPI_pfree(insertPlan);
SPI_pfree(insertDataPlan);
return ret;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment