Replication milestone 1

Timo Sirainen
5 milestones planned currently. I'll start with the always-on
multi-master replication, because that's the most difficult one to get
working correctly and efficiently. So if during its implementation I
find some design problems, less code needs to be fixed.

Milestone 0 will hopefully be done within a month. This includes reading
and replying to the rest of the mails on this list and getting a v1.1.0
release out. :) There will most likely also be a v1.2.0 release this
summer that has some new features on top of v1.1.

Several companies have asked for replication within the past year. At
least two are willing to contribute some money into its development this
summer, but having more wouldn't hurt of course. :) Paying also
guarantees (within reasonable limits) that I'll listen to what kind of
installation you're planning and make sure that the replication will
work in that kind of a setup.

So, below's the milestone 1 fully described with a couple of FIXMEs
left. The rest of the milestones are described only briefly.

0. Prerequisites:

 - Change v1.2 CONDSTORE implementation to allocate modseqs differently.
Currently log file sequence + offset determines the modseq. This can't work
in a cluster since their log files are different. Instead add a new
"UIDs [..] have modseq X" transaction log record that gets updated when
needed. UID list can be 0 to update highest-modseq. To avoid adding
modseq for all changes, we can be optimistic and assume that usually
multiple servers aren't doing changes at the same time. So the next
transaction after modseq=5 record will have modseq=6 and the following one
modseq=7, etc.

 - Update v2.0 framework to include v1.2 changes and make it work. Base the
replication code on top of v2.0.

 - Shared mailboxes, making all mailboxes of all users available for
read/write within the same (replication) process.
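
A rough sketch of the optimistic modseq allocation from the first prerequisite, in Python for brevity (the real code would be C inside Dovecot). The record shapes ("modseq" and "tx" tuples) and the function name are made up for illustration; only the rule itself comes from the text above: an explicit record sets the modseq, and each following transaction gets the next value.

```python
def assign_modseqs(log_records):
    """Replay a transaction log and return (highest_modseq, per-UID modseqs).

    Hypothetical record shapes:
      ("modseq", uids, value)  explicit "UIDs [..] have modseq X" record;
                               an empty UID list only updates highest-modseq
      ("tx", uids)             ordinary change touching the given UIDs
    """
    highest = 0
    per_uid = {}
    for record in log_records:
        if record[0] == "modseq":
            _, uids, value = record
            highest = max(highest, value)
            for uid in uids:
                per_uid[uid] = value
        else:
            _, uids = record
            # Optimistic case: nobody else changed anything, so just count up.
            highest += 1
            for uid in uids:
                per_uid[uid] = highest
    return highest, per_uid
```

With a modseq=5 record followed by two transactions, the transactions get modseq 6 and 7, as in the description.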

1. Incremental replication for existing mailboxes.

IMAP, POP3 and deliver use a replication plugin, which connects to a
replication process via UNIX socket (or TCP optionally for chroots). The
plugin sends the SAVE, EXPUNGE and STORE changes to the replication
process, which forwards them to other servers. The other servers receive
the commands and store them to local filesystem.

The sending and the receiving sides have quite distinct jobs, so they
should probably be implemented as separate processes. They could still use
the same TCP socket though, replication sending process writes to the
socket and replication receiving process reads the socket.

The processes still need to share some state though. Most importantly
keeping track of the mailbox master servers. This is probably the most
cleanly implemented by having a separate master tracking process which the
sending/receiving processes talk to via UNIX socket.

The rest of the data sharing is about forwarding command replies (SAVE,
STORE with CONDSTORE flag) to replication sender, so it can pass them on to
the originating mail process. This can be done easily using an extra
pipe/socket between the processes.

Examples use:

 - C1 = Client 1
 - C2 = Client 2
 - S = Replication sending process
 - R = Remote replication receiving process
 - Small letter before commands = IMAP-like command tag
 - * before commands = tag for a command that doesn't expect a reply

1.1. Master tracking process

Master tracking process is connected to sending and receiving replication
processes.

LOCK: (mail process -> replication sender -> master tracker)
 - User name
 - Mailbox name
Reply: success.

LOCK is sent by replication plugin from mail processes when it wants to
assign UIDs for new messages. Replication sender proxies this command to
master tracker. If server is already the master, the command returns
success immediately. Otherwise tracker looks up from its cache who the
current master is and requests it using the REQUEST command. Once GIVE
command is received, reply success to the mail process.

FIXME: handle timeouts (and master disconnections) somehow?

UNLOCK: (mail process -> replication sender -> master tracker)
 - User name
 - Mailbox name
No reply.

UNLOCK is sent by mail processes after they've finished assigning UIDs and
sending replication SAVEs. If this was the last lock, pending TAKE requests
for this mailbox are handled.

GIVE: (replication receiver -> master tracker)
 - User name
 - Mailbox name
No reply.

Notification that this server is now the master for the specified mailbox.
If there are any pending LOCK requests for this mailbox, they're replied to
in here.

TAKE ([mail process ->] replication receiver -> master tracker):
 - User name
 - Mailbox name
 - Server ID
No reply.

Request to give master state to the specified server. When this mailbox
contains no locks, the TAKE command is forwarded to replication sending
process which passes it to the requesting server. While waiting for
existing locks to be released, handling new LOCK commands for the mailbox
must be delayed to avoid lock starvation.

When forwarding TAKE command, the destination server ID is cached as being
the mailbox's current master.

If we're not the current master for this mailbox, forward the request using
REQUEST command. This also happens if there are multiple pending TAKE
requests. The first one gets to become the master and the rest of them are
handled as REQUEST commands.

REQUEST (master tracker -> replication sender):
 - User name
 - Mailbox name
 - Server ID of the current assumed master
 - Server ID who wants to be the new master (e.g. ourself)

Request master status to be moved to the specified server. If server ID
isn't ours, cache it as being the mailbox's current master. If we don't
have any idea who the master might be currently, use the root server ID.
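
The LOCK/UNLOCK/GIVE/TAKE/REQUEST handling above could be sketched roughly like this. It's Python only to keep it short, and every name in it (MasterTracker, MailboxState, outbox, replies) is invented for the example. Timeouts, disconnections and the ROOT-* commands are left out, and commands for the replication sender are simply collected into a list.

```python
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class MailboxState:
    is_master: bool = False
    lock_count: int = 0
    waiting_locks: list = field(default_factory=list)  # delayed LOCK requesters
    pending_takes: list = field(default_factory=list)  # server IDs wanting master
    cached_master: Optional[str] = None


class MasterTracker:
    def __init__(self, my_id, root_id):
        self.my_id, self.root_id = my_id, root_id
        self.boxes = defaultdict(MailboxState)
        self.outbox = []   # commands handed to the replication sender
        self.replies = []  # (requester, "success") replies to LOCK

    def _request(self, key, box, new_master):
        # Ask the assumed master (or the root if unknown) to move master state.
        target = box.cached_master or self.root_id
        self.outbox.append(("REQUEST", key, target, new_master))
        if new_master != self.my_id:
            box.cached_master = new_master

    def lock(self, key, requester):
        box = self.boxes[key]
        if box.is_master and not box.pending_takes:
            box.lock_count += 1
            self.replies.append((requester, "success"))
            return
        first_waiter = not box.waiting_locks
        box.waiting_locks.append(requester)  # delayed until GIVE arrives
        if not box.is_master and first_waiter:
            self._request(key, box, self.my_id)

    def unlock(self, key):
        box = self.boxes[key]
        box.lock_count -= 1
        if box.lock_count == 0 and box.pending_takes:
            self._hand_over(key, box)

    def give(self, key):
        box = self.boxes[key]
        box.is_master = True
        for requester in box.waiting_locks:
            box.lock_count += 1
            self.replies.append((requester, "success"))
        box.waiting_locks.clear()

    def take(self, key, server_id):
        box = self.boxes[key]
        if not box.is_master:
            self._request(key, box, server_id)
            return
        box.pending_takes.append(server_id)
        if box.lock_count == 0:
            self._hand_over(key, box)

    def _hand_over(self, key, box):
        # The first pending TAKE wins, the rest are turned into REQUESTs.
        new_master, *rest = box.pending_takes
        box.pending_takes.clear()
        box.is_master = False
        box.cached_master = new_master
        self.outbox.append(("TAKE", key, new_master))
        for server_id in rest:
            self._request(key, box, server_id)
        if box.waiting_locks:
            self._request(key, box, self.my_id)
```

Note how a LOCK that arrives while a TAKE is pending is queued instead of granted, which is the starvation protection described for TAKE.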

ROOT-SET (replication receiver -> master tracker):
 - Server ID

This command causes all cached master states to be dropped. Any new LOCK
requests are delayed. Once all mailboxes are unlocked, drop being a master
from all mailboxes and send ROOT-CLEAR command to replication sender, which
passes on the information that we've cleared the master states. The given
server ID is set to be the new root.

ROOT-CLEAR (replication receiver -> master tracker):
 - Server ID

Given server ID has notified that it has cleared its master states. If
we're not the current root, this command is forwarded to it. If we're the
root and this was the last pending ROOT-CLEAR, start handling the pending
REQUEST and LOCK commands.

FIXME: How is initial root selection done? How is it decided who's the new
one when previous one dies?

1.2. Replication sending

1.2.1. Saving

(For now we ignore the possibility of saving as non-master.)

When a mail process wants to give UIDs for newly saved messages, it first
requests for master lock for the mailbox from the master tracker process.
Once UIDs have been assigned for the new messages, the messages are sent to
replication sender using SAVE commands. After the last one is sent, the
master lock is dropped. Note that this lock duration should be as small as
possible, so in Dovecot's code this means doing both locking and unlocking
in mailbox_transaction_commit().

If multiple processes are sending SAVEs at the same time, the replication
sender may receive the UIDs in wrong order. It must reorder them before
forwarding the SAVEs to other servers. Since all master LOCK/UNLOCK
commands pass through the replication sender, it can figure out when it has
seen all the SAVEs based on them. For example:

C1-C3 -> S: a LOCK [1]
S -> C1-C3: a OK

[Client 1 sends the first mail]
C1 -> S: b SAVE uid=5 ..
[SAVE uid=5 depends on dropping [1] locks (currently clients 1,2,3)]
C1 -> S: * UNLOCK [1] (left 2)

[Client 1 starts sending more mails in a separate transaction, lock counter
is incremented because of the previous SAVE]
C1 -> S: d LOCK [2] (remember lock[2].prev_uid=5)
S -> C1: d OK

[Client 2 sends its mail]
C2 -> S: b SAVE uid=9 ..
[SAVE uid=9 depends on dropping [1-2] locks (currently clients 1,2,3)]
C2 -> S: * UNLOCK [1] (left 1)

[Client 3 sends its mail]
C3 -> S: b SAVE uid=3 ..
[SAVE uid=3 depends on dropping [1] lock (currently client 3). It doesn't
depend on [2] lock, because uid < lock[2].prev_uid (3 < 5)]
C3 -> S: * UNLOCK [1] (left 0)

[SAVEs depending on [1] locks are ordered and forwarded.]
S -> R: x SAVE uid=3 ..
S -> R: y SAVE uid=5 ..

[Client 1 finishes its second transaction]
C1 -> S: f SAVE uid=7
[SAVE uid=7 depends on dropping [2] lock (currently client 1)]
C1 -> S: * UNLOCK [2] (left 0)

[SAVEs depending on [2] locks are ordered and forwarded]
S -> R: x SAVE uid=7 ..
S -> R: y SAVE uid=9 ..

The idea is that locks have a counter which is incremented if there was a
SAVE command sent after the previous LOCK command. The previous SAVE's UID
(prev_uid) is also stored to the lock. When the first SAVE after LOCK
arrives, it's assigned a dependency on all existing locks for which the
SAVE's uid > the lock's prev_uid. The lock counter order doesn't guarantee
an ascending prev_uid order. (An actual lock counter probably isn't even
needed, prev_uid should be enough as the lock ID. Lock counter simplifies
the above example.)

All subsequent SAVEs from the client before the next UNLOCK will have the
same dependency as the first SAVE. This is because by that time the
transaction has been committed and all messages within the transaction have
been given UIDs. So in fact it wouldn't really matter in which order the
SAVEs are even sent to the replication sender. Although sending them in
order allows an extra optimization: If only a single client is holding a
lock, the SAVEs can be immediately forwarded as they're being received.

After all lock dependencies have been dropped, it's guaranteed that all new
SAVEs will have higher UIDs, so the SAVEs can be ordered and forwarded.
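
Here's a small sketch of the lock counter / prev_uid bookkeeping, written in Python just to make the rules concrete. The class and attribute names are invented, SAVEs carry only a UID, and forwarding is modeled as appending a sorted batch to a list. It is one possible reading of the rules above, not a finished design.

```python
class SaveReorderer:
    """Buffers SAVEs and releases them in ascending UID order."""

    def __init__(self):
        self.locks = {}         # lock id -> [holder count, prev_uid]
        self.held = {}          # client -> id of the lock it holds
        self.client_deps = {}   # client -> dependencies of its current transaction
        self.counter = 0        # id of the newest lock
        self.save_seen = False  # SAVE received since the newest lock was created?
        self.prev_uid = 0       # UID of the most recent SAVE
        self.pending = []       # buffered (uid, lock ids it depends on)
        self.forwarded = []     # batches sent to remote servers, in order

    def lock(self, client):
        # Start a new lock if the newest one is gone or a SAVE was seen since.
        if self.counter not in self.locks or self.save_seen:
            self.counter += 1
            self.locks[self.counter] = [0, self.prev_uid]
            self.save_seen = False
        self.locks[self.counter][0] += 1
        self.held[client] = self.counter
        return self.counter

    def save(self, client, uid):
        deps = self.client_deps.get(client)
        if deps is None:
            # First SAVE after LOCK: depend on every open lock whose
            # prev_uid is below this UID. Later SAVEs reuse the same set.
            deps = {lid for lid, (_, prev) in self.locks.items() if uid > prev}
            self.client_deps[client] = deps
        self.pending.append((uid, deps))
        self.save_seen = True
        self.prev_uid = uid

    def unlock(self, client):
        lid = self.held.pop(client)
        self.client_deps.pop(client, None)
        self.locks[lid][0] -= 1
        if self.locks[lid][0] > 0:
            return
        del self.locks[lid]
        # Forward every SAVE that no longer depends on an open lock.
        ready = sorted(u for u, d in self.pending if d.isdisjoint(self.locks))
        if ready:
            self.pending = [(u, d) for u, d in self.pending
                            if not d.isdisjoint(self.locks)]
            self.forwarded.append(ready)
```

Replaying the example above (three clients lock, C1 saves uid=5 and relocks, C2 saves uid=9, C3 saves uid=3, C1 saves uid=7) leaves `forwarded` with the batches 3,5 and then 7,9.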

A further optimization could use some kind of weak locks that are dropped
after the client's first SAVE command. After the weak lock count has
dropped to zero, the messages can start being sent. For example assume
two clients having the same lock counter. Both clients would be sending
10000 messages within the same transaction. Waiting for and buffering all
of them would waste a lot of memory. Instead it would be possible to read
the first SAVE from the first client and then block until the first SAVE is
read from the second client. The SAVE with a smaller UID can be forwarded
to remote servers. The next message is then read from that connection, and
this is continued until all messages have been processed.
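
The streaming idea is basically a merge of per-client streams that are each already in ascending UID order. A minimal Python sketch, assuming each client connection can be treated as an iterator of (uid, message) tuples (that representation and the function name are assumptions for the example):

```python
import heapq


def merge_saves(*client_streams):
    """Lazily yield (uid, message) tuples from all streams in UID order.

    heapq.merge() reads one item from each stream, yields the smallest and
    then reads the next item from the stream it came from, so at most one
    SAVE per client is buffered at a time.
    """
    return heapq.merge(*client_streams, key=lambda save: save[0])
```

This only gives the right order if every individual stream is ascending, which is why sending SAVEs in order matters for this optimization.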

A simpler solution would have been to order and forward SAVEs only when
lock counter dropped to zero, but under very high load it's (at least
theoretically) possible that the counter never drops to zero.

In synchronous setups (recommended whenever possible) the SAVE commands use
a tag (not "*"), which requests an ack reply from the remote. After
replication sender has sent SAVE to a remote server, the remote server
eventually replies back. This reply is forwarded from replication receiver
to sender, which in turn forwards it to the mail process. After mail
process has finished sending all SAVEs, it waits until it has received at
least one ack for each SAVEd message. If the replication sender sent the
SAVE to multiple servers, mail process may receive multiple acks for each
message. It should just ignore the ones it doesn't care about.
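
On the mail process side the ack handling could be as simple as the following sketch (Python, with a made-up class name; tags are whatever the SAVE commands were sent with):

```python
class AckWaiter:
    """Tracks which SAVE tags still need at least one ack."""

    def __init__(self, tags):
        self.pending = set(tags)

    def on_ack(self, tag):
        # Extra acks from other servers and unknown tags are ignored.
        self.pending.discard(tag)
        return not self.pending  # True once every SAVE has been acked
```

The mail process would keep reading replies until on_ack() returns True.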

1.2.2. Modseq updating

If all mailbox modifications were done only while the server was the
mailbox master, there wouldn't be any problems with modseq updates, because
only one server could modify the mailbox at a time. This scales badly
though, so it's preferable to move the master only when absolutely
required (for SAVEs) and handle modseqs in other ways.

Modseqs are incremented by each mailbox modification. All modification
commands contain a modseq parameter. When applying the modification command
make sure the local modseq is at least as high as the modification
command's modseq. If the local modseq is higher, broadcast the new modseq
to all other servers using MODSEQ command. When receiving a MODSEQ command,
update the modseq only if its current value is lower than the received one.
Remember that modseqs can also be updated for expunged messages.

All this means that eventually everyone's modseq will be the highest
assigned modseq for the change in the cluster. It doesn't matter if
temporarily some clients saw lower modseqs. The worst that can happen is
that some extra FLAG/MODSEQ updates are sent to the client.
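
The modseq rule is a "highest value wins" merge. A minimal Python sketch of just that rule, with invented names; it doesn't model the flag changes themselves or any further modseq increment done by the local change:

```python
class Replica:
    def __init__(self):
        self.modseq = {}  # uid -> modseq

    def apply_change(self, uid, cmd_modseq):
        """Apply a replicated modification.

        Returns a ("MODSEQ", uid, value) tuple to broadcast if the local
        modseq was already higher than the command's, otherwise None.
        """
        local = self.modseq.get(uid, 0)
        if local > cmd_modseq:
            return ("MODSEQ", uid, local)
        self.modseq[uid] = cmd_modseq
        return None

    def receive_modseq(self, uid, modseq):
        # Only ever move forward, never lower an existing modseq.
        if self.modseq.get(uid, 0) < modseq:
            self.modseq[uid] = modseq
```

Because both operations only raise the value, the order in which servers see the updates doesn't change the final result.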

Although there is a problem if a client can jump from a server where it had
higher modseqs to a server that hadn't yet received the incremented modseqs
and performed a sync using the higher modseq. If the updated modseqs come
after the sync, they're lost from the client. This could be avoided by
issuing a cluster-wide sync wait before such syncing commands are run.

1.2.3. Expunging

EXPUNGE commands can be replicated without waiting for anything. There are
problems with messages getting expunged while COPY is being handled, but
this will be resolved later when COPY is implemented.

1.2.4. Flag updates

When STORE is received by a master, it checks if it has higher modseqs in
messages than the modseq in STORE parameter. Then it applies all the
changes and forwards the changes to other servers using possibly updated
modseqs. For messages that had higher initial modseqs their current flags
are sent back to the originating server to fix a potential desync.

If STORE is initiated by the master, there are no flag desynchronization
problems.

The correctness of applying conflicting changes could be improved by
including a "highest modseq of messages before this STORE" parameter. The
applying server (master) could then find the flags from that point in time
and apply changes on top of it. And finally on the top would be applied
newer changes (that were already once applied). There could still be
conflicting changes, but the result would be better for updates that had
been delayed for a long time. Finding these flag changes and old flag
values isn't simple though, so this won't be at least in the initial
implementation.

STORE UNCHANGEDSINCE (CONDSTORE extension) is initially implemented by
grabbing a master lock. If there are simultaneous STOREs in other servers,
they'll temporarily have lower modseqs and different message flags.
Eventually when the STOREs reach the master, they will be applied on top of
the CONDSTORE modification. So even if normal STOREs were sent slightly
earlier than CONDSTORE, they'd eventually be applied on top of the
CONDSTORE change. This makes the STORE UNCHANGEDSINCE work correctly (even
if with a slight delay), and our multi-master replication should be fully
compatible with the CONDSTORE spec.

1.3. Replication receiving

Replication receiver receives 3 different types of data from remote sender:

 1. Replies to commands this server's replication sender sent
 2. Message texts (as part of SAVE)
 3. Metadata updates (SAVEs, STORE, EXPUNGE, etc.)

Command replies are handled by directly forwarding them to the replication
sender process via a pipe/socket.

1.3.1. Receiving SAVEs

The message text part of the SAVE is stored in a file. The file format
depends on the server's configured default data format. For now we consider
only maildir format where the data is written as-is to the file. This file
is later hard linked to the destination maildir directory.

If multiple partitions are used, the file should be written to the correct
partition. The initial code could write the file to the destination
mailbox's tmp/ directory to make sure the partition is correct. There may
be some permission problems with this approach though, a separate
replication drop directory would be nicer.

The rest of the SAVE is processed similarly to other metadata updates. The
path to the written file is added as new metadata to the SAVE command.

1.3.2. Metadata updates

The primary replication receiver process's job is to read data from the
network as fast as possible. Processing the actual updates can take a
while, so the work is distributed to multiple worker processes.

As discussed above, the primary replication receiver process still writes
message texts to files. The workers could do this as well, but it would
mean that the text needs to be sent via IPC to the workers, which adds
extra overhead. If the message writing really becomes the bottleneck, this
change could be done.

The replication receiver tries to pass the same user's updates to the same
workers so they can benefit from keeping mailboxes open longer. Each worker
could be limited to e.g. 100 open mailboxes.
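
One simple way to get both properties, sketched in Python with invented names: hash the user name to pick the worker, and have each worker keep its open mailboxes in least-recently-used order. The CRC32 hash and the LRU eviction are assumptions of this sketch, the text above only asks for "same user, same worker" and a limit.

```python
import zlib
from collections import OrderedDict


def pick_worker(username, worker_count):
    """Map a user to a worker index; the same user always gets the same one."""
    return zlib.crc32(username.encode("utf-8")) % worker_count


class OpenMailboxes:
    """Per-worker cache of open mailboxes with an upper limit."""

    def __init__(self, limit=100):
        self.limit = limit
        self.boxes = OrderedDict()  # mailbox name -> handle, oldest first

    def get(self, name):
        if name in self.boxes:
            self.boxes.move_to_end(name)  # mark as most recently used
        else:
            self.boxes[name] = object()   # stand-in for really opening it
            if len(self.boxes) > self.limit:
                self.boxes.popitem(last=False)  # close least recently used
        return self.boxes[name]
```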

The worker handles the SAVE/EXPUNGE/STORE commands using the normal mailbox
handling functions. After the changes were successfully committed, commands
that had tags are replied as having succeeded. If a change failed, a
failure reply is sent instead for it. Replication receiver passes this
reply to the replication sender (not to be confused with remote command
replies which are also forwarded), which in turn passes it to the
originating server.


2. Mailbox synchronization for existing mailboxes.

 - As described in the replication protocol.
 - IMAP UID conflict resolution.

3. Mailbox list synchronization.

 - Track creates, deletes, renames.
 - Add unique global mailbox ID which is preserved across renames and
allows replication to perform renaming (and symlinking?)

4. More features

4.1. Global UIDs

 - Save GUIDs to messages (dbox: metadata, maildir: dovecot-uidlist)
 - Tracking GUIDs globally
 - Implement COPY command
 - Implement SAVEs sending only global UIDs and FETCHing unknown ones

4.2. Support for untrusted user-run replication processes

 - Make SAVE and work without being a master
 - Make CONDSTORE's STORE UNCHANGEDSINCE work without being a master

4.3. Transactions

 - Wrap changes into transactions just like they're in the originating
Dovecot's transaction (e.g. TX-BEGIN, TX-END commands)
 - On receiving side a transaction is handled in a single Dovecot transaction
by a single worker.

4.4. Cluster-wide sync

 - Broadcast a ping packet and wait for a reply from all servers on the
cluster. This ensures that all changes before the ping packet was sent have
been received.
 - Then ping replication receiver process and have it ping worker processes
to wait until all received changes have been committed.
 - CHECK command could do it.
 - CONDSTORE/QRESYNC commands that take modseq parameters could maybe do it?
Or would it be too slow? It would at least increase reliability. We could
probably also figure out some rules when the sync is needed and when not.

5. dbox support

 - Message text can't just be written to a file and renamed. At minimum
some dbox metadata needs to be written to it.
 - Files can't be shared between mailboxes by hard linking like they can be
with maildir


Re: Replication milestone 1

Timo Sirainen
One more thing came to my mind:

On Sat, 2008-05-03 at 05:04 +0300, Timo Sirainen wrote:
> 5. dbox support
>  - Message text can't just be written to a file and renamed. At minimum
> some dbox metadata needs to be written to it.
>  - Files can't be shared between mailboxes by hard linking like they can be
> with maildir

 - dbox to support message checksums (one checksum / 64 kB of data or
so?). When reading message data compare the checksum. If it's a
mismatch, log an error and if possible fetch the data from another
server and fix the local file with it.
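
To make the checksum idea concrete, a small Python sketch. CRC32 and the function names are only assumptions for the example (the checksum algorithm isn't decided above), and the 64 kB block size is the "or so" figure from the text.

```python
import zlib
from typing import List, Optional

CHUNK_SIZE = 64 * 1024  # one checksum per 64 kB of message data


def chunk_checksums(data: bytes, chunk_size: int = CHUNK_SIZE) -> List[int]:
    """Return one CRC32 per chunk of the message data."""
    return [zlib.crc32(data[i:i + chunk_size])
            for i in range(0, len(data), chunk_size)]


def first_corrupt_chunk(data: bytes, expected: List[int],
                        chunk_size: int = CHUNK_SIZE) -> Optional[int]:
    """Return the index of the first mismatching chunk, or None if all match."""
    actual = chunk_checksums(data, chunk_size)
    for index, (got, want) in enumerate(zip(actual, expected)):
        if got != want:
            return index
    if len(actual) != len(expected):
        return min(len(actual), len(expected))  # data truncated or extended
    return None
```

Knowing the chunk index would allow fetching only the damaged 64 kB block from another server instead of the whole message.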
