Commit 4fe8490b authored by Bruce Momjian's avatar Bruce Momjian

Add replication email.

parent 56720e52
...@@ -43,7 +43,7 @@ From owner-pgsql-hackers@hub.org Fri Dec 24 10:01:18 1999 ...@@ -43,7 +43,7 @@ From owner-pgsql-hackers@hub.org Fri Dec 24 10:01:18 1999
Received: from renoir.op.net (root@renoir.op.net [207.29.195.4]) Received: from renoir.op.net (root@renoir.op.net [207.29.195.4])
by candle.pha.pa.us (8.9.0/8.9.0) with ESMTP id LAA11295 by candle.pha.pa.us (8.9.0/8.9.0) with ESMTP id LAA11295
for <pgman@candle.pha.pa.us>; Fri, 24 Dec 1999 11:01:17 -0500 (EST) for <pgman@candle.pha.pa.us>; Fri, 24 Dec 1999 11:01:17 -0500 (EST)
Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.6 $) with ESMTP id KAA20310 for <pgman@candle.pha.pa.us>; Fri, 24 Dec 1999 10:39:18 -0500 (EST) Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.7 $) with ESMTP id KAA20310 for <pgman@candle.pha.pa.us>; Fri, 24 Dec 1999 10:39:18 -0500 (EST)
Received: from localhost (majordom@localhost) Received: from localhost (majordom@localhost)
by hub.org (8.9.3/8.9.3) with SMTP id KAA61760; by hub.org (8.9.3/8.9.3) with SMTP id KAA61760;
Fri, 24 Dec 1999 10:31:13 -0500 (EST) Fri, 24 Dec 1999 10:31:13 -0500 (EST)
...@@ -129,7 +129,7 @@ From owner-pgsql-hackers@hub.org Fri Dec 24 18:31:03 1999 ...@@ -129,7 +129,7 @@ From owner-pgsql-hackers@hub.org Fri Dec 24 18:31:03 1999
Received: from renoir.op.net (root@renoir.op.net [207.29.195.4]) Received: from renoir.op.net (root@renoir.op.net [207.29.195.4])
by candle.pha.pa.us (8.9.0/8.9.0) with ESMTP id TAA26244 by candle.pha.pa.us (8.9.0/8.9.0) with ESMTP id TAA26244
for <pgman@candle.pha.pa.us>; Fri, 24 Dec 1999 19:31:02 -0500 (EST) for <pgman@candle.pha.pa.us>; Fri, 24 Dec 1999 19:31:02 -0500 (EST)
Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.6 $) with ESMTP id TAA12730 for <pgman@candle.pha.pa.us>; Fri, 24 Dec 1999 19:30:05 -0500 (EST) Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.7 $) with ESMTP id TAA12730 for <pgman@candle.pha.pa.us>; Fri, 24 Dec 1999 19:30:05 -0500 (EST)
Received: from localhost (majordom@localhost) Received: from localhost (majordom@localhost)
by hub.org (8.9.3/8.9.3) with SMTP id TAA57851; by hub.org (8.9.3/8.9.3) with SMTP id TAA57851;
Fri, 24 Dec 1999 19:23:31 -0500 (EST) Fri, 24 Dec 1999 19:23:31 -0500 (EST)
...@@ -212,7 +212,7 @@ From owner-pgsql-hackers@hub.org Fri Dec 24 21:31:10 1999 ...@@ -212,7 +212,7 @@ From owner-pgsql-hackers@hub.org Fri Dec 24 21:31:10 1999
Received: from renoir.op.net (root@renoir.op.net [207.29.195.4]) Received: from renoir.op.net (root@renoir.op.net [207.29.195.4])
by candle.pha.pa.us (8.9.0/8.9.0) with ESMTP id WAA02578 by candle.pha.pa.us (8.9.0/8.9.0) with ESMTP id WAA02578
for <pgman@candle.pha.pa.us>; Fri, 24 Dec 1999 22:31:09 -0500 (EST) for <pgman@candle.pha.pa.us>; Fri, 24 Dec 1999 22:31:09 -0500 (EST)
Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.6 $) with ESMTP id WAA16641 for <pgman@candle.pha.pa.us>; Fri, 24 Dec 1999 22:18:56 -0500 (EST) Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.7 $) with ESMTP id WAA16641 for <pgman@candle.pha.pa.us>; Fri, 24 Dec 1999 22:18:56 -0500 (EST)
Received: from localhost (majordom@localhost) Received: from localhost (majordom@localhost)
by hub.org (8.9.3/8.9.3) with SMTP id WAA89135; by hub.org (8.9.3/8.9.3) with SMTP id WAA89135;
Fri, 24 Dec 1999 22:11:12 -0500 (EST) Fri, 24 Dec 1999 22:11:12 -0500 (EST)
...@@ -486,7 +486,7 @@ From owner-pgsql-hackers@hub.org Sun Dec 26 08:31:09 1999 ...@@ -486,7 +486,7 @@ From owner-pgsql-hackers@hub.org Sun Dec 26 08:31:09 1999
Received: from renoir.op.net (root@renoir.op.net [207.29.195.4]) Received: from renoir.op.net (root@renoir.op.net [207.29.195.4])
by candle.pha.pa.us (8.9.0/8.9.0) with ESMTP id JAA17976 by candle.pha.pa.us (8.9.0/8.9.0) with ESMTP id JAA17976
for <pgman@candle.pha.pa.us>; Sun, 26 Dec 1999 09:31:07 -0500 (EST) for <pgman@candle.pha.pa.us>; Sun, 26 Dec 1999 09:31:07 -0500 (EST)
Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.6 $) with ESMTP id JAA23337 for <pgman@candle.pha.pa.us>; Sun, 26 Dec 1999 09:28:36 -0500 (EST) Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.7 $) with ESMTP id JAA23337 for <pgman@candle.pha.pa.us>; Sun, 26 Dec 1999 09:28:36 -0500 (EST)
Received: from localhost (majordom@localhost) Received: from localhost (majordom@localhost)
by hub.org (8.9.3/8.9.3) with SMTP id JAA90738; by hub.org (8.9.3/8.9.3) with SMTP id JAA90738;
Sun, 26 Dec 1999 09:21:58 -0500 (EST) Sun, 26 Dec 1999 09:21:58 -0500 (EST)
...@@ -909,7 +909,7 @@ From owner-pgsql-hackers@hub.org Thu Dec 30 08:01:09 1999 ...@@ -909,7 +909,7 @@ From owner-pgsql-hackers@hub.org Thu Dec 30 08:01:09 1999
Received: from renoir.op.net (root@renoir.op.net [207.29.195.4]) Received: from renoir.op.net (root@renoir.op.net [207.29.195.4])
by candle.pha.pa.us (8.9.0/8.9.0) with ESMTP id JAA10317 by candle.pha.pa.us (8.9.0/8.9.0) with ESMTP id JAA10317
for <pgman@candle.pha.pa.us>; Thu, 30 Dec 1999 09:01:08 -0500 (EST) for <pgman@candle.pha.pa.us>; Thu, 30 Dec 1999 09:01:08 -0500 (EST)
Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.6 $) with ESMTP id IAA02365 for <pgman@candle.pha.pa.us>; Thu, 30 Dec 1999 08:37:10 -0500 (EST) Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.7 $) with ESMTP id IAA02365 for <pgman@candle.pha.pa.us>; Thu, 30 Dec 1999 08:37:10 -0500 (EST)
Received: from localhost (majordom@localhost) Received: from localhost (majordom@localhost)
by hub.org (8.9.3/8.9.3) with SMTP id IAA87902; by hub.org (8.9.3/8.9.3) with SMTP id IAA87902;
Thu, 30 Dec 1999 08:34:22 -0500 (EST) Thu, 30 Dec 1999 08:34:22 -0500 (EST)
...@@ -1006,7 +1006,7 @@ From owner-pgsql-patches@hub.org Sun Jan 2 23:01:38 2000 ...@@ -1006,7 +1006,7 @@ From owner-pgsql-patches@hub.org Sun Jan 2 23:01:38 2000
Received: from renoir.op.net (root@renoir.op.net [207.29.195.4]) Received: from renoir.op.net (root@renoir.op.net [207.29.195.4])
by candle.pha.pa.us (8.9.0/8.9.0) with ESMTP id AAA16274 by candle.pha.pa.us (8.9.0/8.9.0) with ESMTP id AAA16274
for <pgman@candle.pha.pa.us>; Mon, 3 Jan 2000 00:01:28 -0500 (EST) for <pgman@candle.pha.pa.us>; Mon, 3 Jan 2000 00:01:28 -0500 (EST)
Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.6 $) with ESMTP id XAA02655 for <pgman@candle.pha.pa.us>; Sun, 2 Jan 2000 23:45:55 -0500 (EST) Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.7 $) with ESMTP id XAA02655 for <pgman@candle.pha.pa.us>; Sun, 2 Jan 2000 23:45:55 -0500 (EST)
Received: from hub.org (hub.org [216.126.84.1]) Received: from hub.org (hub.org [216.126.84.1])
by hub.org (8.9.3/8.9.3) with ESMTP id XAA13828; by hub.org (8.9.3/8.9.3) with ESMTP id XAA13828;
Sun, 2 Jan 2000 23:40:47 -0500 (EST) Sun, 2 Jan 2000 23:40:47 -0500 (EST)
...@@ -1424,7 +1424,7 @@ From owner-pgsql-hackers@hub.org Tue Jan 4 10:31:01 2000 ...@@ -1424,7 +1424,7 @@ From owner-pgsql-hackers@hub.org Tue Jan 4 10:31:01 2000
Received: from renoir.op.net (root@renoir.op.net [207.29.195.4]) Received: from renoir.op.net (root@renoir.op.net [207.29.195.4])
by candle.pha.pa.us (8.9.0/8.9.0) with ESMTP id LAA17522 by candle.pha.pa.us (8.9.0/8.9.0) with ESMTP id LAA17522
for <pgman@candle.pha.pa.us>; Tue, 4 Jan 2000 11:31:00 -0500 (EST) for <pgman@candle.pha.pa.us>; Tue, 4 Jan 2000 11:31:00 -0500 (EST)
Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.6 $) with ESMTP id LAA01541 for <pgman@candle.pha.pa.us>; Tue, 4 Jan 2000 11:27:30 -0500 (EST) Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.7 $) with ESMTP id LAA01541 for <pgman@candle.pha.pa.us>; Tue, 4 Jan 2000 11:27:30 -0500 (EST)
Received: from localhost (majordom@localhost) Received: from localhost (majordom@localhost)
by hub.org (8.9.3/8.9.3) with SMTP id LAA09992; by hub.org (8.9.3/8.9.3) with SMTP id LAA09992;
Tue, 4 Jan 2000 11:18:07 -0500 (EST) Tue, 4 Jan 2000 11:18:07 -0500 (EST)
...@@ -1905,3 +1905,1723 @@ Great. ...@@ -1905,3 +1905,1723 @@ Great.
+ If your life is a hard drive, | 830 Blythe Avenue + If your life is a hard drive, | 830 Blythe Avenue
+ Christ can be your backup. | Drexel Hill, Pennsylvania 19026 + Christ can be your backup. | Drexel Hill, Pennsylvania 19026
From pgsql-general-owner+M805@postgresql.org Tue Nov 21 23:53:04 2000
Received: from mail.postgresql.org (webmail.postgresql.org [216.126.85.28])
by candle.pha.pa.us (8.9.0/8.9.0) with ESMTP id AAA19262
for <pgman@candle.pha.pa.us>; Wed, 22 Nov 2000 00:53:03 -0500 (EST)
Received: from mail.postgresql.org (webmail.postgresql.org [216.126.85.28])
by mail.postgresql.org (8.11.1/8.11.1) with SMTP id eAM5qYs47249;
Wed, 22 Nov 2000 00:52:34 -0500 (EST)
(envelope-from pgsql-general-owner+M805@postgresql.org)
Received: from racerx.cabrion.com (racerx.cabrion.com [166.82.231.4])
by mail.postgresql.org (8.11.1/8.11.1) with ESMTP id eAM5lJs46653
for <pgsql-general@postgresql.org>; Wed, 22 Nov 2000 00:47:19 -0500 (EST)
(envelope-from rob@cabrion.com)
Received: from cabrionhome (gso163-25-211.triad.rr.com [24.163.25.211])
by racerx.cabrion.com (8.8.7/8.8.7) with SMTP id AAA13731
for <pgsql-general@postgresql.org>; Wed, 22 Nov 2000 00:45:20 -0500
Message-ID: <006501c05447$fb9aa0c0$4100fd0a@cabrion.org>
From: "rob" <rob@cabrion.com>
To: <pgsql-general@postgresql.org>
Subject: [GENERAL] Synchronization Toolkit
Date: Wed, 22 Nov 2000 00:49:29 -0500
MIME-Version: 1.0
Content-Type: multipart/mixed;
boundary="----=_NextPart_000_0062_01C0541E.125CAF30"
X-Priority: 3
X-MSMail-Priority: Normal
X-Mailer: Microsoft Outlook Express 5.50.4133.2400
X-MimeOLE: Produced By Microsoft MimeOLE V5.50.4133.2400
Precedence: bulk
Sender: pgsql-general-owner@postgresql.org
Status: OR
This is a multi-part message in MIME format.
------=_NextPart_000_0062_01C0541E.125CAF30
Content-Type: text/plain; charset="iso-8859-1"
Content-Transfer-Encoding: 7bit
Not to be confused with replication, my concept of synchronization is to
manage changes between a server table (or tables) and one or more mobile,
disconnected databases (i.e. PalmPilot, laptop, etc.).
I read through the notes in the TODO for this topic and devised a tool kit
for doing synchronization. I hope that the Postgresql development community
will find this useful and will help me refine this concept by offering
insight, experience and some good old fashion hacking if you are so
inclined.
The bottom of this message describes how to use the attached files.
I look forward to your feedback.
--rob
Methodology:
I devised a concept that I call "session versioning". This means that every
time a row changes it does NOT get a new version. Rather it gets stamped
with the current session version common to all published tables. Clients,
when they connect for synchronization, will immediately increment this
common version number reserve the result as a "post version" and then
increment the session version again. This version number, implemented as a
sequence, is common to all synchronized tables and rows.
Any time the server makes changes to the row gets stamped with the current
session version, when the client posts its changes it uses the reserved
"post version". The client then makes all it's changes stamping the changed
rows with it's reserved "post version" rather than the current version. The
reason why is explained later. It is important that the client post all its
own changes first so that it does not end up receiving records which changed
since it's last session that it is about to update anyway.
Reserving the post version is a two step process. First, the number is
simply stored in a variable for later use. Second, the value is added to a
lock table (last_stable) to indicate to any concurrent sessions that rows
with higher version numbers are to be considered "unstable" at the moment
and they should not attempt to retrieve them at this time. Each client,
upon connection, will use the lowest value in this lock table (max_version)
to determine the upper boundary for versions it should retrieve. The lower
boundary is simply the previous session's "max_version" plus one. Thus
when the client retrieves changes is uses the following SQL "where"
expression:
WHERE row_version >= max_version and row_version <= last_stable_version and
version <> this_post_version
The point of reserving and locking a post version is important in that it
allows concurrent synchronization by multiple clients. The first, of many,
clients to connect basically dictates to all future clients that they must
not take any rows equal to or greater than the one which it just reserved
and locked. The reason the session version is incremented a second time is
so that the server may continue to post changes concurrent with any client
changes and be certain that these concurrent server changes will not taint
rows the client is about to retrieve. Once the client is finished with it's
session it removes the lock on it's post version.
Partitioning data for use by each node is the next challenge we face. How
can we control which "slice" of data each client receives? A slice can be
horizontal or vertical within a table. Horizontal slices are easy, it's
just the where clause of an SQL statement that says "give me the rows that
match X criteria". We handle this by storing and appending a where clause
to each client's retrieval statement in addition to where clause described
above. Actually, two where clauses are stored and appended. One is per
client and one is per publication (table).
We defined horizontal slices by filtering rows. Vertical slices are limits
by column. The tool kit does provide a mechanism for pseudo vertical
partitioning. When a client is "subscribed" to a publication, the toolkit
stores what columns that node is to receive during a session. These are
stored in the subscribed_cols table. While this does limit the number
columns transmitted, the insert/update/delete triggers do not recognize
changes based on columns. The "pseudo" nature of our vertical partitioning
is evident by example:
Say you have a table with name, address and phone number as columns. You
restrict a client to see only name and address. This means that phone
number information will not be sent to the client during synchronization,
and the client can't attempt to alter the phone number of a given entry.
Great, but . . . if, on the server, the phone number (but not the name or
address) is changed, the entire row gets marked with a new version. This
means that the name and address will get sent to the client even though they
didn't change.
Well, there's the flaw in vertical partitioning. Other than wasting
bandwidth, the extra row does no harm to the process. The workaround for
this is to highly normalize your schema when possible.
Collisions are the next crux one encounters with synchronization. When two
clients retrieve the same row and both make (different)changes, which one is
correct? So far the system operates totally independent of time. This is
good because it doesn't rely on the server or client to keep accurate time.
We can just ignore time all together, but then we force our clients to
synchronize on a strict schedule in order to avoid (or reduce) collisions.
If every node synchronized immediately after making changes we could just
stop here. Unfortunately this isn't reality. Reality dictates that of two
clients: Client A & B will each pick up the same record on Monday. A will
make changes on Monday, then leave for vacation. B will make changes on
Wednesday because new information was gathered in A's absence. Client B
posts those changes Wednesday. Meanwhile, client A returns from vacation on
Friday and synchronizes his changes. A over writes B's changes even though
A made changes before the most recent information was posted by B.
It is clear that we need some form of time stamp to cope with the above
example. While clocks aren't the most reliable, they are the only common
version control available to solve this problem. The system is set up to
accept (but not require) timestamps from clients and changes on the server
are time stamped. The system, when presented a time stamp with a row, will
compare them to figure out who wins in a tie. The system makes certain
"sanity" checks with regard to these time stamps. A client may not attempt
to post a change with a timestamp that is more than one hour in the future
(according to what the server thinks "now" is) nor one hour before it's last
synchronization date/time. The client row will be immediately placed into
the collision table if the timestamp is that far out of whack.
Implementations of the tool kit should take care to ensure that client &
server agree on what "now" is before attempting to submit changes with
timestamps.
Time stamps are not required. Should a client be incapable of tracking
timestamps, etc. The system will assume that any server row which has been
changed since the client's last session will win a tie. This is quite error
prone, so timestamps are encouraged where possible.
Inserts pose an interesting challenge. Since multiple clients cannot share
a sequence (often used as a primary key) while disconnected. They will be
responsible for their own unique "row_id" when inserting records. Inserts
accept any arbitrary key, and write back to the client a special kind of
update that gives the server's row_id. The client is responsible for making
sure that this update takes place locally.
Deletes are the last portion of the process. When deletes occur, the
row_id, version, etc. are stored in a "deleted" table. These entries are
retrieved by the client using the same version filter as described above.
The table is pruned at the end of each session by deleting all records with
versions that are less than the lowest 'last_version' stored for each
client.
Having wrapped up the synchronization process, I'll move on to describe some
points about managing clients, publications and the like.
The tool kit is split into two objects: SyncManagement and Synchronization.
The Synchronization object exposes an API that client implementations use to
communicate and receive changes. The management functions handle system
install and uninstall in addition to publication of tables and client
subscriptions.
Installation and uninstallation are handled by their corresponding functions
in the API. All system tables are prefixed and suffixed with four
underscores, in hopes that this avoids conflict with an existing tables.
Calling the install function more than once will generate an error message.
Uninstall will remove all related tables, sequences, functions and triggers
from the system.
The first step, after installing the system, is to publish a table. A table
can be published more than once under different names. Simply provide a
unique name as the second argument to the publish function. Since object
names are restricted to 32 characters in Postgres, each table is given a
unique id and this id is used to create the trigger and sequence names.
Since one table can be published multiple times, but only needs one set of
triggers and one sequence for change management a reference count is kept so
that we know when to add/drop triggers and functions. By default, all
columns are published, but the third argument to the publish function
accepts an array reference of column names that allows you to specify a
limited set. Information about the table is stored in the "tables" table,
info about the publication is in the "publications" table and column names
are stored in "subscribed_cols" table.
The next step is to subscribe a client to a table. A client is identified
by a user name and a node name. The subscribe function takes three
arguments: user, node & publication. The subscription process writes an
entry into the "subscribed" table with default values. Of note, the
"RefreshOnce" attribute is set to true whenever a table is published. This
indicates to the system that a full table refresh should be sent the next
time the client connects even if the client requests synchronization rather
than refresh.
The toolkit does not, yet, provide a way to manage the whereclause stored at
either the publication or client level. To use or test this feature, you
will need to set the whereclause attributes manually.
Tables and users can be unpublished and unsubscribed using the corresponding
functions within the tool kit's management interface. Because postgres
lacks an "ALTER TABLE DROP COLUMN" function, the unpublish function only
removes default values and indexes for those columns.
The API isn't the most robust thing in the world right now. All functions
return undef on success and an error string otherwise (like DBD). I hope to
clean up the API considerably over the next month. The code has not been
field tested at this time.
The files attached are:
1) SynKit.pm (A perl module that contains install/uninstall functions and a
simple api for synchronization & management)
2) sync_install.pl (Sample code to demonstrate the installation, publishing
and subscribe process)
3) sync_uninstall.pl (Sample code to demonstrate the uninstallation,
unpublishing and unsubscribe process)
To use them on Linux (don't know about Win32 but should work fine):
- set up a test database and make SURE plpgsql is installed
- install perl 5.05 along with Date::Parse(TimeDate-1.1) , DBI and DBD::Pg
modules [www.cpan.org]
- copy all three attached files to a test directory
- cd to your test directory
- edit all three files and change the three DBI variables to suit your
system (they are clearly marked)
- % perl sync_install.pl
- check out the tables, functions & triggers installed
- % perl sync.pl
- check out the 'sync_test' table, do some updates/inserts/deletes and run
sync.pl again
NOTE: Sanity checks default to allow no more than 50% of the table
to be changed by the client in a single session.
If you delete all (or most of) the rows you will get errors when
you run sync.pl again! (by design)
- % perl sync_uninstall.pl (when you are done)
- check out the sample scripts and the perl module code (commented, but
not documented)
------=_NextPart_000_0062_01C0541E.125CAF30
Content-Type: application/octet-stream; name="sync.pl"
Content-Transfer-Encoding: quoted-printable
Content-Disposition: attachment; filename="sync.pl"
# This script depicts the syncronization process for two users.
## CHANGE THESE THREE VARIABLE TO MATCH YOUR SYSTEM ###########
my $dbi_connect_string =3D 'dbi:Pg:dbname=3Dtest;host=3Dsnoopy'; #
my $db_user =3D 'test'; #
my $db_pass =3D 'test'; #
#################################################################
my $ret; #holds return value
use SynKit;
#create a synchronization object (pass dbi connection info)
my $s =3D Synchronize->new($dbi_connect_string,$db_user,$db_pass);
#start a session by passing a user name, "node" identifier and a collision =
queue name (client or server)
$ret =3D $s->start_session('JOE','REMOTE_NODE_NAME','server');
print "Handle this error: $ret\n\n" if $ret;
#call this once before attempting to apply individual changes
$ret =3D $s->start_changes('sync_test',['name']);
print "Handle this error: $ret\n\n" if $ret;
#call this for each change the client wants to make to the database
$ret =3D $s->apply_change(CLIENTROWID,'insert',undef,['ted']);
print "Handle this error: $ret\n\n" if $ret;
#call this for each change the client wants to make to the database
$ret =3D $s->apply_change(CLIENTROWID,'insert','1973-11-10 11:25:00 AM -05=
',['tim']);
print "Handle this error: $ret\n\n" if $ret;
#call this for each change the client wants to make to the database
$ret =3D $s->apply_change(999,'update',undef,['tom']);
print "Handle this error: $ret\n\n" if $ret;
#call this for each change the client wants to make to the database
$ret =3D $s->apply_change(1,'update',undef,['tom']);
print "Handle this error: $ret\n\n" if $ret;
#call this once after all changes have been submitted
$ret =3D $s->end_changes();
print "Handle this error: $ret\n\n" if $ret;
#call this to get updates from all subscribed tables
$ret =3D $s->get_all_updates();
print "Handle this error: $ret\n\n" if $ret;
print "\n\nSyncronization session is complete. (JOE) \n\n";
# make some changes to the database (server perspective)
print "\n\nMaking changes to the the database. (server side) \n\n";
use DBI;
my $dbh =3D DBI->connect($dbi_connect_string,$db_user,$db_pass);
$dbh->do("insert into sync_test values ('roger')");
$dbh->do("insert into sync_test values ('john')");
$dbh->do("insert into sync_test values ('harry')");
$dbh->do("delete from sync_test where name =3D 'roger'");
$dbh->do("update sync_test set name =3D 'tom' where name =3D 'harry'");
$dbh->disconnect;
#now do another session for a different user
#start a session by passing a user name, "node" identifier and a collision =
queue name (client or server)
$ret =3D $s->start_session('KEN','ANOTHER_REMOTE_NODE_NAME','server');
print "Handle this error: $ret\n\n" if $ret;
#call this to get updates from all subscribed tables
$ret =3D $s->get_all_updates();
print "Handle this error: $ret\n\n" if $ret;
print "\n\nSynchronization session is complete. (KEN)\n\n";
print "Now look at your database and see what happend, make changes to the =
test table, etc. and run this again.\n\n";
------=_NextPart_000_0062_01C0541E.125CAF30
Content-Type: application/octet-stream; name="sync_uninstall.pl"
Content-Transfer-Encoding: quoted-printable
Content-Disposition: attachment; filename="sync_uninstall.pl"
# this script uninstalls the synchronization system using the SyncManager o=
bject;
use SynKit;
### CHANGE THESE TO MATCH YOUR SYSTEM ########################
my $dbi_connect_string =3D 'dbi:Pg:dbname=3Dtest;host=3Dsnoopy'; #
my $db_user =3D 'test'; #
my $db_pass =3D 'test'; #
#################################################################
my $ret; #holds return value
#create an instance of the SyncManager object
my $m =3D SyncManager->new($dbi_connect_string,$db_user,$db_pass);
# call this to unsubscribe a user/node (not necessary if you are uninstalli=
ng)
print $m->unsubscribe('KEN','ANOTHER_REMOTE_NODE_NAME','sync_test');
#call this to unpublish a table (not necessary if you are uninstalling)
print $m->unpublish('sync_test');
#call this to uninstall the syncronization system
# NOTE: this will automatically unpublish & unsubscribe all users
print $m->UNINSTALL;
# now let's drop our little test table
use DBI;
my $dbh =3D DBI->connect($dbi_connect_string,$db_user,$db_pass);
$dbh->do("drop table sync_test");
$dbh->disconnect;
print "\n\nI hope you enjoyed this little demonstration\n\n";
------=_NextPart_000_0062_01C0541E.125CAF30
Content-Type: application/octet-stream; name="sync_install.pl"
Content-Transfer-Encoding: quoted-printable
Content-Disposition: attachment; filename="sync_install.pl"
# This script shows how to install the synchronization system=20
# using the SyncManager object
use SynKit;
### CHANGE THESE TO MATCH YOUR SYSTEM ##########################
my $dbi_connect_string =3D 'dbi:Pg:dbname=3Dtest;host=3Dsnoopy'; #
my $db_user =3D 'test'; #
my $db_pass =3D 'test'; #
#################################################################
my $ret; #holds return value
#create an instance of the sync manager object
my $m =3D SyncManager->new($dbi_connect_string,$db_user,$db_pass);
#Call this to install the syncronization management tables, etc.
$ret =3D $m->INSTALL;
die "Handle this error: $ret\n\n" if $ret;
#create a test table for us to demonstrate with
use DBI;
my $dbh =3D DBI->connect($dbi_connect_string,$db_user,$db_pass);
$dbh->do("create table sync_test (name text)");
$dbh->do("insert into sync_test values ('rob')");
$dbh->do("insert into sync_test values ('rob')");
$dbh->do("insert into sync_test values ('rob')");
$dbh->do("insert into sync_test values ('ted')");
$dbh->do("insert into sync_test values ('ted')");
$dbh->do("insert into sync_test values ('ted')");
$dbh->disconnect;
#call this to "publish" a table
$ret =3D $m->publish('sync_test');
print "Handle this error: $ret\n\n" if $ret;
#call this to "subscribe" a user/node to a publication (table)
$ret =3D $m->subscribe('JOE','REMOTE_NODE_NAME','sync_test');
print "Handle this error: $ret\n\n" if $ret;
#call this to "subscribe" a user/node to a publication (table)
$ret =3D $m->subscribe('KEN','ANOTHER_REMOTE_NODE_NAME','sync_test');
print "Handle this error: $ret\n\n" if $ret;
print "Now you can do: 'perl sync.pl' a few times to play\n\n";
print "Do 'perl sync_uninstall.pl' to uninstall the system\n";
------=_NextPart_000_0062_01C0541E.125CAF30
Content-Type: application/octet-stream; name="SynKit.pm"
Content-Transfer-Encoding: quoted-printable
Content-Disposition: attachment; filename="SynKit.pm"
# Perl DB synchronization toolkit
#created for postgres 7.0.2 +
use strict;
BEGIN {
use vars qw($VERSION);
# set the version for version checking
$VERSION =3D 1.00;
}
package Synchronize;
use DBI;
use Date::Parse;
# new requires 3 arguments: dbi connection string, plus the corresponding u=
sername and password to get connected to the database
sub new {
my $proto =3D shift;
my $class =3D ref($proto) || $proto;
my $self =3D {};
my $dbi =3D shift;
my $user =3D shift;
my $pass =3D shift;
$self->{DBH} =3D DBI->connect($dbi,$user,$pass) || die "Failed to connect =
to database: ".DBI->errstr();
$self->{user} =3D undef;
$self->{node} =3D undef;
$self->{status} =3D undef; # holds status of table update portion of sessi=
on
$self->{pubs} =3D {}; #holds hash of pubs available to sessiom with val =
=3D 1 if ok to request sync
$self->{orderpubs} =3D undef; #holds array ref of subscribed pubs ordered =
by sync_order
$self->{this_post_ver} =3D undef; #holds the version number under which th=
is session will post changes
$self->{max_ver} =3D undef; #holds the maximum safe version for getting up=
dates
$self->{current} =3D {}; #holds the current publication info to which chan=
ges are being applied
$self->{queue} =3D 'server'; # tells collide function what to do with coll=
isions. (default is to hold on server)
$self->{DBLOG}=3D DBI->connect($dbi,$user,$pass) || die "cannot log to DB:=
".DBI->errstr();=20
return bless ($self, $class);
}
sub dblog {=20
my $self =3D shift;
my $msg =3D $self->{DBLOG}->quote($_[0]);
my $quser =3D $self->{DBH}->quote($self->{user});
my $qnode =3D $self->{DBH}->quote($self->{node});
$self->{DBLOG}->do("insert into ____sync_log____ (username, nodename,stamp=
, message) values($quser, $qnode, now(), $msg)");
}
#start_session establishes session wide information and other housekeeping =
chores
# Accepts username, nodename and queue (client or server) as arguments;
sub start_session {
my $self =3D shift;
$self->{user} =3D shift || die 'Username is required';
$self->{node} =3D shift || die 'Nodename is required';
$self->{queue} =3D shift;
if ($self->{queue} ne 'server' && $self->{queue} ne 'client') {
die "You must provide a queue argument of either 'server' or 'client'";
}
my $quser =3D $self->{DBH}->quote($self->{user});
my $qnode =3D $self->{DBH}->quote($self->{node});
my $sql =3D "select pubname from ____subscribed____ where username =3D $qu=
ser and nodename =3D $qnode";
my @pubs =3D $self->GetColList($sql);
return 'User/Node has no subscriptions!' if !defined(@pubs);
# go though the list and check permissions and rules for each
foreach my $pub (@pubs) {
my $qpub =3D $self->{DBH}->quote($pub);
my $sql =3D "select disabled, pubname, fullrefreshonly, refreshonce,post_=
ver from ____subscribed____ where username =3D $quser and pubname =3D $qpub=
and nodename =3D $qnode";
my $sth =3D $self->{DBH}->prepare($sql) || die $self->{DBH}->errstr;
$sth->execute || die $self->{DBH}->errstr;
my @row;
while (@row =3D $sth->fetchrow_array) {
next if $row[0]; #publication is disabled
next if !defined($row[1]); #publication does not exist (should never occ=
ur)
if ($row[2] || $row[3]) { #refresh of refresh once flag is set
$self->{pubs}->{$pub} =3D 0; #refresh only
next;
}
if (!defined($row[4])) { #no previous session exists, must refresh
$self->{pubs}->{$pub} =3D 0; #refresh only
next;
}
$self->{pubs}->{$pub} =3D 1; #OK for sync
}
$sth->finish;
}
$sql =3D "select pubname from ____publications____ order by sync_order";
my @op =3D $self->GetColList($sql);
my @orderpubs;
#loop through ordered pubs and remove non subscribed publications
foreach my $pub (@op) {
push @orderpubs, $pub if defined($self->{pubs}->{$pub});
}
=09
$self->{orderpubs} =3D \@orderpubs;
# Now we obtain a session version number, etc.
$self->{DBH}->{AutoCommit} =3D 0; #allows "transactions"
$self->{DBH}->{RaiseError} =3D 1; #script [or eval] will automatically die=
on errors
eval { #start DB transaction
#lock the version sequence until we determin that we have gotten
#a good value. Lock will be released on commit.
$self->{DBH}->do('lock ____version_seq____ in access exclusive mode');
# remove stale locks if they exist
my $sql =3D "delete from ____last_stable____ where username =3D $quser an=
d nodename =3D $qnode";
$self->{DBH}->do($sql);
# increment version sequence & grab the next val as post_ver
my $sql =3D "select nextval('____version_seq____')";
my $sth =3D $self->{DBH}->prepare($sql);
$sth->execute;
($self->{this_post_ver}) =3D $sth->fetchrow_array();
$sth->finish;
# grab max_ver from last_stable
$sql =3D "select min(version) from ____last_stable____";=20
$sth =3D $self->{DBH}->prepare($sql);
$sth->execute;
($self->{max_ver}) =3D $sth->fetchrow_array();
$sth->finish;
# if there was no version in lock table, then take the ID that was in use
# when we started the session ($max_ver -1)
$self->{max_ver} =3D $self->{this_post_ver} -1 if (!defined($self->{max_v=
er}));
# lock post_ver by placing it in last_stable
$self->{DBH}->do("insert into ____last_stable____ (version, username, nod=
ename) values ($self->{this_post_ver}, $quser,$qnode)");
# increment version sequence again (discard result)
$sql =3D "select nextval('____version_seq____')";
$sth =3D $self->{DBH}->prepare($sql);
$sth->execute;
$sth->fetchrow_array();
$sth->finish;
}; #end eval/transaction
if ($@) { # part of transaction failed
return 'Start session failed';
$self->{DBH}->rollback;
} else { # all's well commit block
$self->{DBH}->commit;
}
$self->{DBH}->{AutoCommit} =3D 1;
$self->{DBH}->{RaiseError} =3D 0;
return undef;
}
#start changes should be called once before applying individual change requ=
ests
# Requires publication and ref to columns that will be updated as arguments
sub start_changes {
my $self =3D shift;
my $pub =3D shift || die 'Publication is required';
my $colref =3D shift || die 'Reference to column array is required';
$self->{status} =3D 'starting';
my $qpub =3D $self->{DBH}->quote($pub);
my $quser =3D $self->{DBH}->quote($self->{user});
my $qnode =3D $self->{DBH}->quote($self->{node});
my @cols =3D @{$colref};
my @subcols =3D $self->GetColList("select col_name from ____subscribed_col=
s____ where username =3D $quser and nodename =3D $qnode and pubname =3D $qp=
ub");
my %subcols;
foreach my $col (@subcols) {
$subcols{$col} =3D 1;
}
foreach my $col (@cols) {=09
return "User/node is not subscribed to column '$col'" if !$subcols{$col};
}
my $sql =3D "select pubname, readonly, last_session, post_ver, last_ver, w=
hereclause, sanity_limit,=20
sanity_delete, sanity_update, sanity_insert from ____subscribed____ where u=
sername =3D $quser and pubname =3D $qpub and nodename =3D $qnode";
my ($junk, $readonly, $last_session, $post_ver, $last_ver, $whereclause, $=
sanity_limit,=20
$sanity_delete, $sanity_update, $sanity_insert) =3D $self->GetOneRow($sql);
=09
return 'Publication is read only' if $readonly;
$sql =3D "select whereclause from ____publications____ where pubname =3D $=
qpub";
my ($wc) =3D $self->GetOneRow($sql);
$whereclause =3D '('.$whereclause.')' if $whereclause;
$whereclause =3D $whereclause.' and ('.$wc.')' if $wc;
my ($table) =3D $self->GetOneRow("select tablename from ____publications__=
__ where pubname =3D $qpub");
return 'Publication is not registered correctly' if !defined($table);
my %info;
$info{pub} =3D $pub;
$info{whereclause} =3D $whereclause;
$info{post_ver} =3D $post_ver;
$last_session =3D~ s/([+|-]\d\d?)$/ $1/; #put a space before timezone=09
$last_session =3D str2time ($last_session); #convert to perltime (seconds =
since 1970)
$info{last_session} =3D $last_session;
$info{last_ver} =3D $last_ver;
$info{table} =3D $table;
$info{cols} =3D \@cols;
my $sql =3D "select count(oid) from $table";
$sql =3D $sql .' '.$whereclause if $whereclause;
my ($rowcount) =3D $self->GetOneRow($sql);
#calculate sanity levels (convert from % to number of rows)
# limits defined as less than 1 mean no limit
$info{sanitylimit} =3D $rowcount * ($sanity_limit / 100) if $sanity_limit =
> 0;
$info{insertlimit} =3D $rowcount * ($sanity_insert / 100) if $sanity_inser=
t > 0;
$info{updatelimit} =3D $rowcount * ($sanity_update / 100) if $sanity_updat=
e > 0;
$info{deletelimit} =3D $rowcount * ($sanity_delete / 100) if $sanity_delet=
e > 0;
$self->{sanitycount} =3D 0;
$self->{updatecount} =3D 0;
$self->{insertcount} =3D 0;
$self->{deletecount} =3D 0;
$self->{current} =3D \%info;
$self->{DBH}->{AutoCommit} =3D 0; #turn on transaction behavior so we can =
roll back on sanity limits, etc.
$self->{status} =3D 'ready';
return undef;
}
#call this once all changes are submitted to commit them;
sub end_changes {
my $self =3D shift;
return undef if $self->{status} ne 'ready';
$self->{DBH}->commit;
$self->{DBH}->{AutoCommit} =3D 1;
$self->{status} =3D 'success';
return undef;
}
#call apply_change once for each row level client update
# Accepts 4 params: rowid, action, timestamp and reference to data array
# Note: timestamp can be undef, data can be undef
# timestamp MUST be in perl time (secs since 1970)
#this routine checks basic timestamp info and sanity limits, then passes th=
e info along to do_action() for processing
sub apply_change {
my $self =3D shift;
my $rowid =3D shift || return 'Row ID is required'; #don't die just for on=
e bad row
my $action =3D shift || return 'Action is required'; #don't die just for o=
ne bad row
my $timestamp =3D shift;
my $dataref =3D shift;
$action =3D lc($action);
$timestamp =3D str2time($timestamp) if $timestamp;
return 'Status failure, cannot accept changes: '.$self->{status} if $self-=
>{status} ne 'ready';
my %info =3D %{$self->{current}};
$self->{sanitycount}++;
if ($info{sanitylimit} && $self->{sanitycount} > $info{sanitylimit}) {
# too many changes from client
my $ret =3D $self->sanity('limit');
return $ret if $ret;
}
=09
if ($timestamp && $timestamp > time() + 3600) { # current time + one hour
#client's clock is way off, cannot submit changes in future
my $ret =3D $self->collide('future', $info{table}, $rowid, $action, undef=
, $timestamp, $dataref, $self->{queue});
return $ret if $ret;
}
if ($timestamp && $timestamp < $info{last_session} - 3600) { # last sessio=
n time less one hour
#client's clock is way off, cannot submit changes that occured before las=
t sync date
my $ret =3D $self->collide('past', $info{table}, $rowid, $action, undef, =
$timestamp, $dataref , $self->{queue});
return $ret if $ret;
}
my ($crow, $cver, $ctime); #current row,ver,time
if ($action ne 'insert') {
my $sql =3D "select ____rowid____, ____rowver____, ____stamp____ from $in=
fo{table} where ____rowid____ =3D $rowid";
($crow, $cver, $ctime) =3D $self->GetOneRow($sql);
if (!defined($crow)) {
my $ret =3D $self->collide('norow', $info{table}, $rowid, $action, undef=
, $timestamp, $dataref , $self->{queue});
return $ret if $ret;=09=09
}
$ctime =3D~ s/([+|-]\d\d?)$/ $1/; #put space between timezone
$ctime =3D str2time($ctime) if $ctime; #convert to perl time
if ($timestamp) {
if ($ctime < $timestamp) {
my $ret =3D $self->collide('time', $info{table}, $rowid, $action, undef=
, $timestamp, $dataref, $self->{queue} );=09=09
return $ret if $ret;
}
} else {
if ($cver > $self->{this_post_ver}) {
my $ret =3D $self->collide('version', $info{table}, $rowid, $action, un=
def, $timestamp, $dataref, $self->{queue} );
return $ret if $ret;
}
}
=09
}
if ($action eq 'insert') {
$self->{insertcount}++;
if ($info{insertlimit} && $self->{insertcount} > $info{insertlimit}) {
# too many changes from client
my $ret =3D $self->sanity('insert');
return $ret if $ret;
}
my $qtable =3D $self->{DBH}->quote($info{table});
my ($rowidsequence) =3D '_'.$self->GetOneRow("select table_id from ____ta=
bles____ where tablename =3D $qtable").'__rowid_seq';
return 'Table incorrectly registered, cannot get rowid sequence name: '.$=
self->{DBH}->errstr() if not defined $rowidsequence;
my @data;
foreach my $val (@{$dataref}) {
push @data, $self->{DBH}->quote($val);
}
my $sql =3D "insert into $info{table} (";
if ($timestamp) {
$sql =3D $sql . join(',',@{$info{cols}}) . ',____rowver____, ____stamp__=
__) values (';
$sql =3D $sql . join (',',@data) .','.$self->{this_post_ver}.',\''.local=
time($timestamp).'\')';
} else {
$sql =3D $sql . join(',',@{$info{cols}}) . ',____rowver____) values (';
$sql =3D $sql . join (',',@data) .','.$self->{this_post_ver}.')';
}
my $ret =3D $self->{DBH}->do($sql);
if (!$ret) {
my $ret =3D $self->collide($self->{DBH}->errstr(), $info{table}, $rowid,=
$action, undef, $timestamp, $dataref , $self->{queue});
return $ret if $ret;=09=09
}
my ($newrowid) =3D $self->GetOneRow("select currval('$rowidsequence')");
return 'Failed to get current rowid on inserted row'.$self->{DBH}->errstr=
if not defined $newrowid;
$self->changerowid($rowid, $newrowid);
}
if ($action eq 'update') {
$self->{updatecount}++;
if ($info{updatelimit} && $self->{updatecount} > $info{updatelimit}) {
# too many changes from client
my $ret =3D $self->sanity('update');
return $ret if $ret;
}
my @data;
foreach my $val (@{$dataref}) {
push @data, $self->{DBH}->quote($val);
}=09
my $sql =3D "update $info{table} set ";
my @cols =3D @{$info{cols}};
foreach my $col (@cols) {
my $val =3D shift @data;
$sql =3D $sql . "$col =3D $val,";
}
$sql =3D $sql." ____rowver____ =3D $self->{this_post_ver}";
$sql =3D $sql.", ____stamp____ =3D '".localtime($timestamp)."'" if $times=
tamp;
$sql =3D $sql." where ____rowid____ =3D $rowid";
$sql =3D $sql." and $info{whereclause}" if $info{whereclause};
my $ret =3D $self->{DBH}->do($sql);
if (!$ret) {
my $ret =3D $self->collide($self->{DBH}->errstr(), $info{table}, $rowid,=
$action, undef, $timestamp, $dataref , $self->{queue});
return $ret if $ret;=09=09
}
}
if ($action eq 'delete') {
$self->{deletecount}++;
if ($info{deletelimit} && $self->{deletecount} > $info{deletelimit}) {
# too many changes from client
my $ret =3D $self->sanity('delete');
return $ret if $ret;
}
if ($timestamp) {
my $sql =3D "update $info{table} set ____rowver____ =3D $self->{this_pos=
t_ver}, ____stamp____ =3D '".localtime($timestamp)."' where ____rowid____ =
=3D $rowid";
$sql =3D $sql . " where $info{whereclause}" if $info{whereclause};
$self->{DBH}->do($sql) || return 'Predelete update failed: '.$self->{DBH=
}->errstr;
} else {
my $sql =3D "update $info{table} set ____rowver____ =3D $self->{this_pos=
t_ver} where ____rowid____ =3D $rowid";
$sql =3D $sql . " where $info{whereclause}" if $info{whereclause};
$self->{DBH}->do($sql) || return 'Predelete update failed: '.$self->{DBH=
}->errstr;
}
my $sql =3D "delete from $info{table} where ____rowid____ =3D $rowid";
$sql =3D $sql . " where $info{whereclause}" if $info{whereclause};
my $ret =3D $self->{DBH}->do($sql);
if (!$ret) {
my $ret =3D $self->collide($self->{DBH}->errstr(), $info{table}, $rowid,=
$action, undef, $timestamp, $dataref , $self->{queue});
return $ret if $ret;=09=09
}
}
=09
=09
return undef;
}
sub changerowid {
my $self =3D shift;
my $oldid =3D shift;
my $newid =3D shift;
$self->writeclient('changeid',"$oldid\t$newid");
}
#writes info to client
sub writeclient {
my $self =3D shift;
my $type =3D shift;
my @info =3D @_;
print "$type: ",join("\t",@info),"\n";
return undef;
}
# Override this for custom behavior. Default is to echo back the sanity fa=
ilure reason.=20=20
# If you want to override a collision, you can do so by returning undef.
sub sanity {
my $self =3D shift;
my $reason =3D shift;
$self->{status} =3D 'sanity exceeded';
$self->{DBH}->rollback;
return $reason;
}
# Override this for custom behavior. Default is to echo back the failure r=
eason.=20=20
# If you want to override a collision, you can do so by returning undef.
sub collide {
my $self =3D shift;
my ($reason,$table,$rowid,$action,$rowver,$timestamp,$data, $queue) =3D @_;
my @data;
foreach my $val (@{$data}) {
push @data, $self->{DBH}->quote($val);
}=09
if ($reason =3D~ /integrity/i || $reason =3D~ /constraint/i) {
$self->{status} =3D 'intergrity violation';
$self->{DBH}->rollback;
}
my $datastring;
my @cols =3D @{$self->{current}->{cols}};
foreach my $col (@cols) {
my $val =3D shift @data;
$datastring =3D $datastring . "$col =3D $val,";
}
chop $datastring; #remove trailing comma
if ($queue eq 'server') {
$timestamp =3D localtime($timestamp) if defined($timestamp);
$rowid =3D $self->{DBH}->quote($rowid);
$rowid =3D 'null' if !defined($rowid);
$rowver =3D 'null' if !defined($rowver);
$timestamp =3D $self->{DBH}->quote($timestamp);
$data =3D $self->{DBH}->quote($data);
my $qtable =3D $self->{DBH}->quote($table);
my $qreason =3D $self->{DBH}->quote($reason);
my $qaction =3D $self->{DBH}->quote($action);
my $quser =3D $self->{DBH}->quote($self->{user});
my $qnode =3D $self->{DBH}->quote($self->{node});
$datastring =3D $self->{DBH}->quote($datastring);
my $sql =3D "insert into ____collision____ (rowid,
tablename, rowver, stamp, data, reason, action, username,
nodename, queue) values($rowid,$qtable, $rowver, $timestamp,$datastring,
$qreason, $qaction,$quser, $qnode)";
$self->{DBH}->do($sql) || die 'Failed to write to collision table: '.$sel=
f->{DBH}->errstr;
} else {
$self->writeclient('collision',$rowid,$table, $rowver, $timestamp,$reason=
, $action,$self->{user}, $self->{node}, $data);
}
return $reason;
}
#calls get_updates once for each publication the user/node is subscribed to=
in correct sync_order
sub get_all_updates {
my $self =3D shift;
my $quser =3D $self->{DBH}->quote($self->{user});
my $qnode =3D $self->{DBH}->quote($self->{node});
foreach my $pub (@{$self->{orderpubs}}) {
$self->get_updates($pub, 1); #request update as sync unless overrridden b=
y flags
}
}
# Call this once for each table the client needs refreshed or sync'ed AFTER=
all inbound client changes have been posted
# Accepts publication and sync flag as arguments
sub get_updates {
my $self =3D shift;
my $pub =3D shift || die 'Publication is required';
my $sync =3D shift;
my $qpub =3D $self->{DBH}->quote($pub);
my $quser =3D $self->{DBH}->quote($self->{user});
my $qnode =3D $self->{DBH}->quote($self->{node});
#enforce refresh and refreshonce flags
undef $sync if !$self->{pubs}->{$pub};=20
my %info =3D $self->{current};
my @cols =3D $self->GetColList("select col_name from ____subscribed_cols__=
__ where username =3D $quser and nodename =3D $qnode and pubname =3D $qpub"=
);;
my ($table) =3D $self->GetOneRow("select tablename from ____publications__=
__ where pubname =3D $qpub");
return 'Table incorrectly registered for read' if !defined($table);
my $qtable =3D $self->{DBH}->quote($table);=09
my $sql =3D "select pubname, last_session, post_ver, last_ver, whereclause=
from ____subscribed____ where username =3D $quser and pubname =3D $qpub an=
d nodename =3D $qnode";
my ($junk, $last_session, $post_ver, $last_ver, $whereclause) =3D $self->G=
etOneRow($sql);
my ($wc) =3D $self->GetOneRow("select whereclause from ____publications___=
_ where pubname =3D $qpub");
$whereclause =3D '('.$whereclause.')' if $whereclause;
$whereclause =3D $whereclause.' and ('.$wc.')' if $wc;
if ($sync) {
$self->writeclient('start synchronize', $pub);
} else {
$self->writeclient('start refresh', $pub);
$self->{DBH}->do("update ____subscribed____ set refreshonce =3D false whe=
re pubname =3D $qpub and username =3D $quser and nodename =3D $qnode") || r=
eturn 'Failed to clear RefreshOnce flag: '.$self->{DBH}->errstr;
}
$self->writeclient('columns',@cols);
my $sql =3D "select ____rowid____, ".join(',', @cols)." from $table";
if ($sync) {
$sql =3D $sql." where (____rowver____ <=3D $self->{max_ver} and ____rowve=
r____ > $last_ver)";
if (defined($self->{this_post_ver})) {
$sql =3D $sql . " and (____rowver____ <> $post_ver)";
}
} else {
$sql =3D $sql." where (____rowver____ <=3D $self->{max_ver})";
}
$sql =3D $sql." and $whereclause" if $whereclause;
=09
my $sth =3D $self->{DBH}->prepare($sql) || return 'Failed to get prepare S=
QL for updates: '.$self->{DBH}->errstr;
$sth->execute || return 'Failed to execute SQL for updates: '.$self->{DBH}=
->errstr;
my @row;
while (@row =3D $sth->fetchrow_array) {
$self->writeclient('update/insert',@row);
}
$sth->finish;
# now get deleted rows
if ($sync) {
$sql =3D "select rowid from ____deleted____ where (tablename =3D $qtable)=
";
$sql =3D $sql." and (rowver <=3D $self->{max_ver} and rowver > $last_ver)=
";
if (defined($self->{this_post_ver})) {
$sql =3D $sql . " and (rowver <> $self->{this_post_ver})";
}
$sql =3D $sql." and $whereclause" if $whereclause;
$sth =3D $self->{DBH}->prepare($sql) || return 'Failed to get prepare SQL=
for deletes: '.$self->{DBH}->errstr;
$sth->execute || return 'Failed to execute SQL for deletes: '.$self->{DBH=
}->errstr;
my @row;
while (@row =3D $sth->fetchrow_array) {
$self->writeclient('delete',@row);
}
$sth->finish;
}
if ($sync) {
$self->writeclient('end synchronize', $pub);
} else {
$self->writeclient('end refresh', $pub);
}
my $qpub =3D $self->{DBH}->quote($pub);
my $quser =3D $self->{DBH}->quote($self->{user});
my $qnode =3D $self->{DBH}->quote($self->{node});
$self->{DBH}->do("update ____subscribed____ set last_ver =3D $self->{max_v=
er}, last_session =3D now(), post_ver =3D $self->{this_post_ver} where user=
name =3D $quser and nodename =3D $qnode and pubname =3D $qpub");
return undef;
}
# Call this once when everything else is done. Does housekeeping.=20
# (MAKE THIS AN OBJECT DESTRUCTOR?)
sub DESTROY {
my $self =3D shift;
#release version from lock table (including old ones)
my $quser =3D $self->{DBH}->quote($self->{user});
my $qnode =3D $self->{DBH}->quote($self->{node});
my $sql =3D "delete from ____last_stable____ where username =3D $quser and=
nodename =3D $qnode";
$self->{DBH}->do($sql);
#clean up deleted table
my ($version) =3D $self->GetOneRow("select min(last_ver) from ____subscrib=
ed____");
return undef if not defined $version;
$self->{DBH}->do("delete from ____deleted____ where rowver < $version") ||=
return 'Failed to prune deleted table'.$self->{DBH}->errstr;;
#disconnect from DBD sessions
$self->{DBH}->disconnect;
$self->{DBLOG}->disconnect;
return undef;
}
############# Helper Subs ############
sub GetColList {
my $self =3D shift;
my $sql =3D shift || die 'Must provide sql select statement';
my $sth =3D $self->{DBH}->prepare($sql) || return undef;
$sth->execute || return undef;
my $val;
my @col;
while (($val) =3D $sth->fetchrow_array) {
push @col, $val;
}
$sth->finish;
return @col;
}
sub GetOneRow {
my $self =3D shift;
my $sql =3D shift || die 'Must provide sql select statement';
my $sth =3D $self->{DBH}->prepare($sql) || return undef;
$sth->execute || return undef;
my @row =3D $sth->fetchrow_array;
$sth->finish;
return @row;
}
=20
package SyncManager;
use DBI;
# new requires 3 arguments: dbi connection string, plus the corresponding u=
sername and password
sub new {
my $proto =3D shift;
my $class =3D ref($proto) || $proto;
my $self =3D {};
my $dbi =3D shift;
my $user =3D shift;
my $pass =3D shift;
$self->{DBH} =3D DBI->connect($dbi,$user,$pass) || die "Failed to connect =
to database: ".DBI->errstr();
$self->{DBLOG}=3D DBI->connect($dbi,$user,$pass) || die "cannot log to DB:=
".DBI->errstr();
=09
return bless ($self, $class);
}
sub dblog {=20
my $self =3D shift;
my $msg =3D $self->{DBLOG}->quote($_[0]);
my $quser =3D $self->{DBH}->quote($self->{user});
my $qnode =3D $self->{DBH}->quote($self->{node});
$self->{DBLOG}->do("insert into ____sync_log____ (username, nodename,stamp=
, message) values($quser, $qnode, now(), $msg)");
}
#this should never need to be called, but it might if a node bails without =
releasing their locks
sub ReleaseAllLocks {
my $self =3D shift;
$self->{DBH}->do("delete from ____last_stable____)");
}
# Adds a publication to the system. Also adds triggers, sequences, etc ass=
ociated with the table if approproate.
# accepts two argument: the name of a physical table and the name under wh=
ich to publish it=20
# NOTE: the publication name is optional and will default to the table na=
me if not supplied
# returns undef if ok, else error string;
sub publish {
my $self =3D shift;
my $table =3D shift || die 'You must provide a table name (and optionally =
a unique publication name)';
my $pub =3D shift;
$pub =3D $table if not defined($pub);
my $qpub =3D $self->{DBH}->quote($pub);
my $sql =3D "select tablename from ____publications____ where pubname =3D =
$qpub";
my ($junk) =3D $self->GetOneRow($sql);
return 'Publication already exists' if defined($junk);
my $qtable =3D $self->{DBH}->quote($table);
$sql =3D "select table_id, refcount from ____tables____ where tablename =
=3D $qtable";
my ($id, $refcount) =3D $self->GetOneRow($sql);
if(!defined($id)) {
$self->{DBH}->do("insert into ____tables____ (tablename, refcount) values=
($qtable,1)") || return 'Failed to register table: ' . $self->{DBH}->errst=
r;
my $sql =3D "select table_id from ____tables____ where tablename =3D $qta=
ble";
($id) =3D $self->GetOneRow($sql);
}
if (defined($refcount)) {
$self->{DBH}->do("update ____tables____ set refcount =3D refcount+1 where=
table_id =3D $id") || return 'Failed to update refrence count: ' . $self->=
{DBH}->errstr;
} else {
=09=09
$id =3D '_'.$id.'_';=20
my @cols =3D $self->GetTableCols($table, 1); # 1 =3D get hidden cols too
my %skip;
foreach my $col (@cols) {
$skip{$col} =3D 1;
}
=09=09
if (!$skip{____rowver____}) {
$self->{DBH}->do("alter table $table add column ____rowver____ int4"); #=
don't fail here in case table is being republished, just accept the error s=
ilently
}
$self->{DBH}->do("update $table set ____rowver____ =3D ____version_seq___=
_.last_value - 1") || return 'Failed to initialize rowver: ' . $self->{DBH}=
->errstr;
if (!$skip{____rowid____}) {
$self->{DBH}->do("alter table $table add column ____rowid____ int4"); #d=
on't fail here in case table is being republished, just accept the error si=
lently
}
my $index =3D $id.'____rowid____idx';
$self->{DBH}->do("create index $index on $table(____rowid____)") || retur=
n 'Failed to create rowid index: ' . $self->{DBH}->errstr;
my $sequence =3D $id.'_rowid_seq';
$self->{DBH}->do("create sequence $sequence") || return 'Failed to create=
rowver sequence: ' . $self->{DBH}->errstr;
$self->{DBH}->do("alter table $table alter column ____rowid____ set defau=
lt nextval('$sequence')"); #don't fail here in case table is being republis=
hed, just accept the error silently
$self->{DBH}->do("update $table set ____rowid____ =3D nextval('$sequence=
')") || return 'Failed to initialize rowid: ' . $self->{DBH}->errstr;
if (!$skip{____stamp____}) {
$self->{DBH}->do("alter table $table add column ____stamp____ timestamp"=
); #don't fail here in case table is being republished, just accept the err=
or silently
}
$self->{DBH}->do("update $table set ____stamp____ =3D now()") || return =
'Failed to initialize stamp: ' . $self->{DBH}->errstr;
my $trigger =3D $id.'_ver_ins';
$self->{DBH}->do("create trigger $trigger before insert on $table for eac=
h row execute procedure sync_insert_ver()") || return 'Failed to create tri=
gger: ' . $self->{DBH}->errstr;
my $trigger =3D $id.'_ver_upd';
$self->{DBH}->do("create trigger $trigger before update on $table for eac=
h row execute procedure sync_update_ver()") || return 'Failed to create tri=
gger: ' . $self->{DBH}->errstr;
my $trigger =3D $id.'_del_row';
$self->{DBH}->do("create trigger $trigger after delete on $table for each=
row execute procedure sync_delete_row()") || return 'Failed to create trig=
ger: ' . $self->{DBH}->errstr;
}
$self->{DBH}->do("insert into ____publications____ (pubname, tablename) va=
lues ('$pub','$table')") || return 'Failed to create publication entry: '.$=
self->{DBH}->errstr;
return undef;
}
# Removes a publication from the system. Also drops triggers, sequences, e=
tc associated with the table if approproate.
# accepts one argument: the name of a publication
# returns undef if ok, else error string;
sub unpublish {
my $self =3D shift;
my $pub =3D shift || return 'You must provide a publication name';
my $qpub =3D $self->{DBH}->quote($pub);
my $sql =3D "select tablename from ____publications____ where pubname =3D =
$qpub";
my ($table) =3D $self->GetOneRow($sql);
return 'Publication does not exist' if !defined($table);
my $qtable =3D $self->{DBH}->quote($table);
$sql =3D "select table_id, refcount from ____tables____ where tablename =
=3D $qtable";
my ($id, $refcount) =3D $self->GetOneRow($sql);
return 'Table: $table is not correctly registered!' if not defined($id);
$self->{DBH}->do("update ____tables____ set refcount =3D refcount -1 where=
tablename =3D $qtable") || return 'Failed to decrement reference count: ' =
. $self->{DBH}->errstr;
$self->{DBH}->do("delete from ____subscribed____ where pubname =3D $qpub")=
|| return 'Failed to delete user subscriptions: ' . $self->{DBH}->errstr;
$self->{DBH}->do("delete from ____subscribed_cols____ where pubname =3D $q=
pub") || return 'Failed to delete subscribed columns: ' . $self->{DBH}->err=
str;
$self->{DBH}->do("delete from ____publications____ where tablename =3D $qt=
able and pubname =3D $qpub") || return 'Failed to delete from publications:=
' . $self->{DBH}->errstr;
#if this is the last reference, we want to drop triggers, etc;
if ($refcount <=3D 1) {
$id =3D "_".$id."_";
$self->{DBH}->do("alter table $table alter column ____rowver____ drop def=
ault") || return 'Failed to alter column default: ' . $self->{DBH}->errstr;
$self->{DBH}->do("alter table $table alter column ____rowid____ drop defa=
ult") || return 'Failed to alter column default: ' . $self->{DBH}->errstr;
$self->{DBH}->do("alter table $table alter column ____stamp____ drop defa=
ult") || return 'Failed to alter column default: ' . $self->{DBH}->errstr;
my $trigger =3D $id.'_ver_upd';
$self->{DBH}->do("drop trigger $trigger on $table") || return 'Failed to =
drop trigger: ' . $self->{DBH}->errstr;
my $trigger =3D $id.'_ver_ins';
$self->{DBH}->do("drop trigger $trigger on $table") || return 'Failed to =
drop trigger: ' . $self->{DBH}->errstr;
my $trigger =3D $id.'_del_row';
$self->{DBH}->do("drop trigger $trigger on $table") || return 'Failed to =
drop trigger: ' . $self->{DBH}->errstr;
my $sequence =3D $id.'_rowid_seq';
$self->{DBH}->do("drop sequence $sequence") || return 'Failed to drop seq=
uence: ' . $self->{DBH}->errstr;
my $index =3D $id.'____rowid____idx';
$self->{DBH}->do("drop index $index") || return 'Failed to drop index: ' =
. $self->{DBH}->errstr;
$self->{DBH}->do("delete from ____tables____ where tablename =3D $qtable"=
) || return 'remove entry from tables: ' . $self->{DBH}->errstr;
}
return undef;
}
#Subscribe user/node to a publication
# Accepts 3 arguements: Username, Nodename, Publication
# NOTE: the remaining arguments can be supplied as column names to which =
the user/node should be subscribed
# Return undef if ok, else returns an error string
sub subscribe {
my $self =3D shift;
my $user =3D shift || die 'You must provide user, node and publication as =
arguments';
my $node =3D shift || die 'You must provide user, node and publication as =
arguments';
my $pub =3D shift || die 'You must provide user, node and publication as a=
rguments';
my @cols =3D @_;
my $quser =3D $self->{DBH}->quote($user);
my $qnode =3D $self->{DBH}->quote($node);
my $qpub =3D $self->{DBH}->quote($pub);
my $sql =3D "select tablename from ____publications____ where pubname =3D =
$qpub";
my ($table) =3D $self->GetOneRow($sql);
return "Publication $pub does not exist." if not defined $table;
my $qtable =3D $self->{DBH}->quote($table);
@cols =3D $self->GetTableCols($table) if !@cols; # get defaults if cols we=
re not spefified by caller
$self->{DBH}->do("insert into ____subscribed____ (username, nodename,pubna=
me,last_ver,refreshonce) values('$user', '$node','$pub',0, true)") || retur=
n 'Failes to create subscription: ' . $self->{DBH}->errstr;=09
foreach my $col (@cols) {
$self->{DBH}->do("insert into ____subscribed_cols____ (username, nodename=
, pubname, col_name) values ('$user','$node','$pub','$col')") || return 'Fa=
iles to subscribe column: ' . $self->{DBH}->errstr;=09
}
return undef;
}
#Unsubscribe user/node to a publication
# Accepts 3 arguements: Username, Nodename, Publication
# Return undef if ok, else returns an error string
sub unsubscribe {
my $self =3D shift;
my $user =3D shift || die 'You must provide user, node and publication as =
arguments';
my $node =3D shift || die 'You must provide user, node and publication as =
arguments';
my $pub =3D shift || die 'You must provide user, node and publication as a=
rguments';
my @cols =3D @_;
my $quser =3D $self->{DBH}->quote($user);
my $qnode =3D $self->{DBH}->quote($node);
my $qpub =3D $self->{DBH}->quote($pub);
my $sql =3D "select tablename from ____publications____ where pubname =3D =
$qpub";
my $table =3D $self->GetOneRow($sql);
return "Publication $pub does not exist." if not defined $table;
$self->{DBH}->do("delete from ____subscribed_cols____ where pubname =3D $q=
pub and username =3D $quser and nodename =3D $qnode") || return 'Failed to =
remove column subscription: '. $self->{DBH}->errstr;
$self->{DBH}->do("delete from ____subscribed____ where pubname =3D $qpub a=
nd username =3D $quser and nodename =3D $qnode") || return 'Failed to remov=
e subscription: '. $self->{DBH}->errstr;
return undef;
}
#INSTALL creates the necessary management tables.=20=20
#returns undef if everything is ok, else returns a string describing the e=
rror;
sub INSTALL {
my $self =3D shift;
#check to see if management tables are already installed
my ($test) =3D $self->GetOneRow("select * from pg_class where relname =3D '=
____publications____'");
if (defined($test)) {
return 'It appears that synchronization manangement tables are already ins=
talled here. Please uninstall before reinstalling.';
};
#install the management tables, etc.
$self->{DBH}->do("create table ____publications____ (pubname text primary k=
ey,description text, tablename text, sync_order int4, whereclause text)") |=
| return $self->{DBH}->errstr();
$self->{DBH}->do("create table ____subscribed_cols____ (nodename text, user=
name text, pubname text, col_name text, description text, primary key(noden=
ame, username, pubname,col_name))") || return $self->{DBH}->errstr();
$self->{DBH}->do("create table ____subscribed____ (nodename text, username =
text, pubname text, last_session timestamp, post_ver int4, last_ver int4, w=
hereclause text, sanity_limit int4 default 0, sanity_delete int4 default 0,=
sanity_update int4 default 0, sanity_insert int4 default 50, readonly bool=
ean, disabled boolean, fullrefreshonly boolean, refreshonce boolean, primar=
y key(nodename, username, pubname))") || return $self->{DBH}->errstr();
$self->{DBH}->do("create table ____last_stable____ (version int4, username =
text, nodename text, primary key(version, username, nodename))") || return =
$self->{DBH}->errstr();
$self->{DBH}->do("create table ____tables____ (tablename text, table_id int=
4, refcount int4, primary key(tablename, table_id))") || return $self->{DBH=
}->errstr();
$self->{DBH}->do("create sequence ____table_id_seq____") || return $self->{=
DBH}->errstr();
$self->{DBH}->do("alter table ____tables____ alter column table_id set defa=
ult nextval('____table_id_seq____')") || return $self->{DBH}->errstr();
$self->{DBH}->do("create table ____deleted____ (rowid int4, tablename text,=
rowver int4, stamp timestamp, primary key (rowid, tablename))") || return =
$self->{DBH}->errstr();
$self->{DBH}->do("create table ____collision____ (rowid text, tablename tex=
t, rowver int4, stamp timestamp, faildate timestamp default now(),data text=
,reason text, action text, username text, nodename text,queue text)") || re=
turn $self->{DBH}->errstr();
$self->{DBH}->do("create sequence ____version_seq____") || return $self->{D=
BH}->errstr();
$self->{DBH}->do("create table ____sync_log____ (username text, nodename te=
xt, stamp timestamp, message text)") || return $self->{DBH}->errstr();
$self->{DBH}->do("create function sync_insert_ver() returns opaque as
'begin
if new.____rowver____ isnull then
new.____rowver____ :=3D ____version_seq____.last_value;
end if;
if new.____stamp____ isnull then
new.____stamp____ :=3D now();
end if;
return NEW;
end;' language 'plpgsql'") || return $self->{DBH}->errstr();
$self->{DBH}->do("create function sync_update_ver() returns opaque as
'begin
if new.____rowver____ =3D old.____rowver____ then
new.____rowver____ :=3D ____version_seq____.last_value;
end if;
if new.____stamp____ =3D old.____stamp____ then
new.____stamp____ :=3D now();
end if;
return NEW;
end;' language 'plpgsql'") || return $self->{DBH}->errstr();
$self->{DBH}->do("create function sync_delete_row() returns opaque as=20
'begin=20
insert into ____deleted____ (rowid,tablename,rowver,stamp) values
(old.____rowid____, TG_RELNAME, old.____rowver____,old.____stamp____);=20
return old;=20
end;' language 'plpgsql'") || return $self->{DBH}->errstr();
return undef;
}
#removes all management tables & related stuff
#returns undef if ok, else returns an error message as a string
sub UNINSTALL {
my $self =3D shift;
#Make sure all tables are unpublished first
my $sth =3D $self->{DBH}->prepare("select pubname from ____publications____=
");
$sth->execute;
my $pub;
while (($pub) =3D $sth->fetchrow_array) {
$self->unpublish($pub);=09
}
$sth->finish;
$self->{DBH}->do("drop table ____publications____") || return $self->{DBH}-=
>errstr();
$self->{DBH}->do("drop table ____subscribed_cols____") || return $self->{DB=
H}->errstr();
$self->{DBH}->do("drop table ____subscribed____") || return $self->{DBH}->e=
rrstr();
$self->{DBH}->do("drop table ____last_stable____") || return $self->{DBH}->=
errstr();
$self->{DBH}->do("drop table ____deleted____") || return $self->{DBH}->errs=
tr();
$self->{DBH}->do("drop table ____collision____") || return $self->{DBH}->er=
rstr();
$self->{DBH}->do("drop table ____tables____") || return $self->{DBH}->errst=
r();
$self->{DBH}->do("drop table ____sync_log____") || return $self->{DBH}->err=
str();
$self->{DBH}->do("drop sequence ____table_id_seq____") || return $self->{DB=
H}->errstr();
$self->{DBH}->do("drop sequence ____version_seq____") || return $self->{DBH=
}->errstr();
$self->{DBH}->do("drop function sync_insert_ver()") || return $self->{DBH}-=
>errstr();
$self->{DBH}->do("drop function sync_update_ver()") || return $self->{DBH}-=
>errstr();
$self->{DBH}->do("drop function sync_delete_row()") || return $self->{DBH}-=
>errstr();
return undef;
}
sub DESTROY {
my $self =3D shift;
$self->{DBH}->disconnect;
$self->{DBLOG}->disconnect;
return undef;
}
############# Helper Subs ############
sub GetOneRow {
my $self =3D shift;
my $sql =3D shift || die 'Must provide sql select statement';
my $sth =3D $self->{DBH}->prepare($sql) || return undef;
$sth->execute || return undef;
my @row =3D $sth->fetchrow_array;
$sth->finish;
return @row;
}
#call this with second non-zero value to get hidden columns
sub GetTableCols {
my $self =3D shift;
my $table =3D shift || die 'Must provide table name';
my $wanthidden =3D shift;
my $sql =3D "select * from $table where 0 =3D 1";
my $sth =3D $self->{DBH}->prepare($sql) || return undef;
$sth->execute || return undef;
my @row =3D @{$sth->{NAME}};
$sth->finish;
return @row if $wanthidden;
my @cols;
foreach my $col (@row) {
next if $col eq '____rowver____';
next if $col eq '____stamp____';
next if $col eq '____rowid____';
push @cols, $col;=09
}
return @cols;
}
1; #happy require
------=_NextPart_000_0062_01C0541E.125CAF30--
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment