Archive for the ‘Cluster’ Category

Eventual consistency in MySQL Cluster — implementation part 1

December 8th, 2011
The last post described MySQL Cluster epochs and why they provide a good basis for conflict detection, with a few enhancements required. This post describes the enhancements.

The following four mechanisms are required to implement conflict detection via epochs :
  1. Slaves should 'reflect' information about replicated epochs they have applied
    Applied epoch numbers should be included in the Slave Binlog events returning to the originating cluster, in a Binlog position corresponding to the commit time of the replicated epoch transaction relative to Slave local transactions.
  2. Masters should maintain a maximum replicated epoch
    A cluster should use the reflected epoch information to track which of its epochs has been applied by a Slave cluster. This will be the maximum of all epochs applied by the Slave.
  3. Masters should track commit-time epoch per row
    To allow per-row detection of conflicts
  4. Masters should track commit-authorship per row
    To differentiate recent epochs due to replication or conflicting activity.

'Reflecting' epoch information and maintaining the maximum replicated epoch

Every epoch transaction in the Binlog contains a special WRITE_ROW event on the mysql.ndb_apply_status table which carries the epoch transaction's epoch number. This is designed to give an atomically consistent way to determine a Slave cluster's position relative to a Master cluster. Normally these WRITE_ROW events are applied by the Slave but not logged in the Slave's Binlog, even when --log-slave-updates is ON. A new MySQLD option, --ndb-log-apply-status, causes WRITE_ROW events applied to the mysql.ndb_apply_status table to be binlogged at a Slave, even when --log-slave-updates is OFF. These events are logged with the ServerId of the Slave MySQLD, so that they can be applied on the Master, but will not loop infinitely.

Allowing this applied epoch information to propagate through a Slave Cluster has the following effects :
  1. Downstream Clusters become aware of their position relative to all upstream Master clusters, not just their immediate Master cluster.
    They gain extra mysql.ndb_apply_status entries for all upstream Masters.
  2. Circularly replicating clusters become aware of which of their epochs, and epoch transactions, have been applied to all clusters in the circle.
    They gain extra mysql.ndb_apply_status entries for all Binlogging MySQLDs in the loop

Effect 1 is useful for replication failover with more than two replication-chained clusters where an intermediate cluster is being routed-around (A->B->C) -> (A->C). Cluster C knows the correct Binlog file and position to resume from on A, without consulting B.

Effect 2 could be used to allow clients to wait until their writes have been fully replicated and are globally visible, a kind of synchronous replication. More relevantly, effect 2 allows us to maintain a maximum replicated epoch value for detecting conflicts.

The visible result of using --ndb-log-apply-status on a Slave is that the mysql.ndb_apply_status table on the Master contains extra entries for the Binlogging MySQLDs attached to its Cluster. The maximum replicated epoch is the maximum of these epoch values.

 
        Cluster 1 Epoch transactions in flight in
               a circular configuration
             (Ignoring Cluster 2 epochs)

                  39      38      37
              ->---->----->----->----->--
             /                           \      (Queued epochs 36-26)
        Cluster 1                     Cluster 2
   (Queued epochs 23,24)
             \                           /
              -<---<------<----<----<----
                  25      26      27

        Current epoch        = 40
        Max replicated epoch = 22


A MySQLD acting as a conflict detecting Slave for a cluster needs to know the attached cluster's maximum replicated epoch for conflict detection. On Slave start, before the Slave starts applying replicated changes to the Ndb storage engine, it scans the mysql.ndb_apply_status table to find the highest reflected epoch value. Rows in mysql.ndb_apply_status whose server id is either the Slave's own server id or one of the ids in the CHANGE MASTER TO IGNORE_SERVER_IDS list are considered to belong to local servers, and the maximum replicated epoch is the maximum epoch value among these rows.


@ Slave start

max_replicated_epoch = SELECT MAX(epoch)
                         FROM mysql.ndb_apply_status
                        WHERE server_id IN (<IGNORE_SERVER_IDS list>, @@server_id);



Once the Max_replicated_epoch has been initialised at slave start, it is updated as each reflected epoch event (WRITE_ROW event to mysql.ndb_apply_status) arrives and is processed by the Slave SQL thread. The current Max_replicated_epoch can be seen by issuing the command SHOW STATUS LIKE 'Ndb_slave_max_replicated_epoch';. Note that this is really just a cached copy of the current result of the SELECT MAX(epoch) query from above. One subtlety is that the max_replicated_epoch is only changed when the Slave commits an epoch transaction, as it is only at this point that we know for sure that any event committed on the other cluster before the replicated epoch was applied has been handled.
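The initialise-then-update behaviour described above can be modelled in a few lines. This is a hypothetical sketch, not MySQLD's code: the class and method names are mine, epochs are small integers rather than real 64 bit values, and rows of mysql.ndb_apply_status are represented as (server_id, epoch) pairs. Note that a reflected epoch only becomes visible once the enclosing epoch transaction commits.

```python
from dataclasses import dataclass, field


@dataclass
class MaxReplicatedEpochTracker:
    """Hypothetical model of the Slave's cached maximum replicated epoch."""

    local_server_ids: set[int]      # IGNORE_SERVER_IDS list plus the Slave's own id
    max_replicated_epoch: int = 0
    _pending: int = field(default=0, repr=False)

    def init_from_apply_status(self, rows: list[tuple[int, int]]) -> None:
        # Slave start: MAX(epoch) over rows written by local servers only.
        epochs = [epoch for server_id, epoch in rows
                  if server_id in self.local_server_ids]
        self.max_replicated_epoch = max(epochs, default=0)
        self._pending = self.max_replicated_epoch

    def on_reflected_epoch(self, server_id: int, epoch: int) -> None:
        # A WRITE_ROW on mysql.ndb_apply_status inside the epoch transaction
        # currently being applied; rows for remote servers are ignored.
        if server_id in self.local_server_ids:
            self._pending = max(self._pending, epoch)

    def on_epoch_transaction_commit(self) -> None:
        # Only now is it safe to publish the new maximum.
        self.max_replicated_epoch = self._pending
```

With the numbers from the circular diagram, initialising from rows whose largest local epoch is 22 gives a maximum replicated epoch of 22; applying a transaction that reflects epoch 23 leaves it at 22 until that transaction commits.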

Per row last-modified epoch storage

Each row stored in Ndb has a built-in hidden metadata column called NDB$GCI64. This column stores the epoch number at which the row was last modified. For normal system recovery purposes, only the top 32 bits of the 64 bit epoch, called the Global Checkpoint Index or GCI, are used. NDB$EPOCH needs further bits to be stored per-row. Epoch values only use a few of the bits in the bottom 32 bits of the epoch, so by default 6 extra bits per row are used to enable a full 64 bit epoch to be stored for each row. The actual number of bits used can be controlled by a parameter to NDB$EPOCH. Where some epoch is not fully expressible in the number of bits available, the bottom 32 bits are saturated, which again errs on the side of safety, potentially causing false conflicts, but ensuring no real conflicts are missed. The ndb_select_all tool has a --gci64 option which shows each row's stored epoch value.
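To make the saturation rule concrete, here is a hypothetical encoding of a 64 bit epoch as the 32 bit GCI plus a small number of extra bits. The real storage layout is not described in this post; the function names, and the use of the all-ones pattern of the extra bits as a 'saturated' marker, are assumptions for illustration. The property that matters is that a value read back is never lower than the epoch actually committed, so a row can only look newer than it is.

```python
LOW_MASK = 0xFFFFFFFF  # bottom 32 bits of a 64 bit epoch


def store_row_epoch(epoch: int, extra_bits: int = 6) -> tuple[int, int]:
    """Split an epoch into (gci, stored_low), saturating the low part (hypothetical)."""
    gci = epoch >> 32
    low = epoch & LOW_MASK
    limit = (1 << extra_bits) - 1          # all-ones pattern doubles as 'saturated'
    return gci, min(low, limit)


def load_row_epoch(gci: int, stored_low: int, extra_bits: int = 6) -> int:
    """Rebuild the epoch; a saturated low part reads back as 0xFFFFFFFF."""
    limit = (1 << extra_bits) - 1
    low = LOW_MASK if stored_low == limit else stored_low
    return (gci << 32) | low
```

A saturated row therefore reads back as the last possible epoch within its GCI: later than the true value (risking a false conflict) but still earlier than any epoch of the next GCI.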

A conflict detecting slave detects conflicts between transactions already committed, whose rows have their commit-time epoch numbers, and incoming operations in an epoch transaction, which are considered to have been committed at the epoch given by the current Maximum Replicated Epoch. An incoming operation is considered to be in-conflict if the row it affects has a last-committed epoch that is greater than the current Maximum Replicated Epoch.

  in_conflict = (ndb_gci64 > max_replicated_epoch)


In other words, at the time the change was committed on the other Cluster, that other Cluster was only aware of our changes as-of our epoch (max_replicated_epoch). Therefore it was unaware of any changes committed in more recent epochs. If the row being changed has been locally modified since that epoch then there have been concurrent modifications and a conflict has been discovered.
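A minimal sketch of this epoch-only predicate, using the numbers from the circular diagram (maximum replicated epoch 22). The helper names are hypothetical and epochs are simplified to small integers.

```python
def in_conflict(row_gci64: int, max_replicated_epoch: int) -> bool:
    """Epoch-only predicate: the row was changed locally in an epoch that the
    other cluster had not yet applied when it committed the incoming change."""
    return row_gci64 > max_replicated_epoch


def conflicting_rows(row_epochs: dict[str, int], incoming: list[str],
                     max_replicated_epoch: int) -> list[str]:
    """Return the keys of incoming UPDATE/DELETE operations found in conflict."""
    return [key for key in incoming
            if in_conflict(row_epochs[key], max_replicated_epoch)]
```

A row last written in epoch 21 is safe to overwrite, because the other cluster had already applied that epoch; a row written in epoch 38 is not, because epoch 38 is still in flight.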

Note that this mechanism is purely based on monitoring serialisation of updates to rows. No semantic understanding of row data, or the meaning of applied changes is attempted. Even if both clusters update some row to contain exactly the same value it will be considered to be a conflict, as the updates were not serialised with respect to each other.

Per row hidden Author metacolumn

One advantage of reusing the row's last-modified epoch number for conflict detection is that it is automatically set on every commit. However the downside is that when a replicated modification is found to not be in conflict, and is applied, the row's epoch is automatically set to the current value at commit time as normal. By definition, the current epoch value is always greater than the maximum replicated epoch, and so if a further replicated modification to the same row were to arrive, it would find the row's epoch to be higher than the current maximum replicated epoch, and detect a false conflict.

In theory we could consider the current maximum replicated epoch to be the row's commit time epoch, but as the per-row epoch is used for other more critical DB recovery purposes it's not safe to abuse it in this way. Instead we use the observation that if we found a previous row update from some other cluster to be not-in-conflict, then further updates from it are also not-in-conflict.

To detect this, a new hidden metadata column is introduced called NDB$AUTHOR. This column is set to zero when a row is modified by any unmodified NdbApi client, including MySQLD, but when a row is modified by the MySQLD Slave SQL thread, it is set to one. More generally, NDB$AUTHOR could be set to a non-zero identifier of the other cluster which sourced an accepted change. Setting it only to one limits us to having a single other cluster originating potentially conflicting changes. The ndb_select_all tool has a --author option which shows each row's stored Author value.

By extending the conflict detecting function to examine the NDB$AUTHOR value, we avoid the problem of falsely detecting conflicts when applying consecutive replicated changes.
  in_conflict = (ndb$author != change_author) && (ndb_gci64 > max_replicated_epoch)


We are currently just using 1 to mean 'other author', so this simplifies to :

in_conflict = (ndb$author != 1) && (ndb_gci64 > max_replicated_epoch)

= (ndb$author == 0) && (ndb_gci64 > max_replicated_epoch)


This conflict detection function is encoded in an Ndb interpreted program and attached to the replicated DELETE and UPDATE NdbApi operations so that it can be quickly and atomically executed at the Ndb data nodes as a predicate prior to applying the operation.
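The interpreted program itself is not shown here, but the predicate it evaluates, and the false conflict it avoids, can be modelled in Python. This is a hypothetical sketch: rows are plain dicts whose 'gci64' and 'author' keys stand in for the hidden NDB$GCI64 and NDB$AUTHOR columns, and only the single 'other author' value of one is modelled.

```python
LOCAL_AUTHOR = 0   # written by an ordinary NdbApi client, including MySQLD
SLAVE_AUTHOR = 1   # written by the Slave SQL thread (the single 'other' cluster)


def in_conflict(row_gci64: int, row_author: int, max_replicated_epoch: int) -> bool:
    # in_conflict = (ndb$author == 0) && (ndb_gci64 > max_replicated_epoch)
    return row_author == LOCAL_AUTHOR and row_gci64 > max_replicated_epoch


def apply_replicated_update(row: dict, current_epoch: int,
                            max_replicated_epoch: int) -> bool:
    """Apply one replicated UPDATE to `row` unless the predicate rejects it."""
    if in_conflict(row["gci64"], row["author"], max_replicated_epoch):
        return False                  # conflict handling would take over here
    row["gci64"] = current_epoch      # commit stamps the current epoch as usual
    row["author"] = SLAVE_AUTHOR
    return True


def apply_local_update(row: dict, current_epoch: int) -> None:
    row["gci64"] = current_epoch
    row["author"] = LOCAL_AUTHOR
```

After the first replicated update the row's epoch (40) is above the maximum replicated epoch (22), but because its author is now 1 a second replicated update is still accepted. Only an intervening local write reopens the conflict window.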

Ndb binlog row event ordering and false conflicts

The happened-before relationship between reflected epoch events (WRITE_ROW to mysql.ndb_apply_status) and incoming row events is used to determine whether a conflict has occurred. As described in the last post, Ndb offers limited ordering guarantees on the row events within an epoch transaction. The only guarantee is that multiple changes to the same row will be recorded in the order they committed. This implies that the relative ordering of the reflected epoch WRITE_ROW event, on some row in mysql.ndb_apply_status, and other row events on other tables sharing the same epoch transaction is meaningless. The only ordering guarantees between different rows exist at epoch boundaries.

This means that if we see a reflected epoch WRITE_ROW event somewhere in replicated epoch j, then we can only safely assume that this happened before incoming row events in epoch j+1 and later. The row events appearing before and after the reflected epoch WRITE_ROW event in epoch j may have committed before or after the reflected epoch event.

The relaxed relative ordering gives us reduced precision in determining happened-before, and to be safe, we must err on the side of assuming that a conflict exists rather than that it does not. Consider a Master committing a change to row X, recorded in epoch N. This is then applied on the Slave in Slave epoch S. If the Slave then commits a local change affecting the same row X in the same epoch S, this will be returned to the Master in the same Slave epoch transaction, and the Master will be unable to determine whether it occurred before or after its original write to X, so must assume that it occurred before and is therefore in conflict. If the Slave had committed its change in epoch S+1 or later, the happened-before relationship would be clear and the change would not be considered in conflict.
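The Master-side effect can be sketched with a toy model in which each incoming Slave epoch transaction carries the Master epochs it reflects and the rows changed by the Slave's local clients. Because the maximum replicated epoch only advances when a whole epoch transaction commits, a row change that shares an epoch transaction with the reflection of the Master's write is checked against the older value. The names are hypothetical, and the NDB$AUTHOR check is omitted for brevity.

```python
from dataclasses import dataclass


@dataclass
class EpochTransaction:
    """One Slave epoch transaction as seen by the Master (hypothetical model)."""
    reflected_epochs: list[int]   # Master epochs this transaction reports as applied
    changed_rows: list[str]       # row keys changed by the Slave's local clients


def check_incoming(binlog: list[EpochTransaction], row_epochs: dict[str, int],
                   max_replicated_epoch: int) -> list[str]:
    """Return keys of incoming row changes judged to be in conflict."""
    conflicts = []
    for txn in binlog:
        # Within one epoch, order relative to the reflection is unknown, so
        # rows are checked against the value in force before this transaction.
        for key in txn.changed_rows:
            if row_epochs[key] > max_replicated_epoch:
                conflicts.append(key)
        # The reflected epochs take effect only once the transaction commits.
        max_replicated_epoch = max([max_replicated_epoch, *txn.reflected_epochs])
    return conflicts
```

With the Master's write to X in epoch 50 and a starting maximum of 45, a Slave change to X in the same epoch S as the reflection of epoch 50 is flagged, while the same change in epoch S+1 is not.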

These potential false conflicts are the price paid here for the lack of fine grained event ordering in the Ndb Binlog.

I'm lost

There's been a lot of information, or at least a lot of words. Let's summarise how NDB$EPOCH and NDB$EPOCH_TRANS detect row conflicts by following a change around the replication loop :

  • @Cluster A
    Transactions modify rows, automatically setting their hidden NDB$GCI64 column to the current epoch and their NDB$AUTHOR column to 0

    Binlogging MySQLDs record modified rows in epoch transactions in their Binlogs, together with MySQLD generated mysql.ndb_apply_status WRITE_ROW events

  • @Cluster B
    Slave MySQLDs apply replicated epoch transactions along with their generated mysql.ndb_apply_status WRITE_ROW events

    Other clients of Cluster B commit transactions against the same data.

    Binlogging MySQLDs 'reflect' the applied-replicated epoch information by recording the mysql.ndb_apply_status WRITE_ROW events in their Binlogs as a result of --ndb-log-apply-status.

    Binlogging MySQLDs also record the row changes made by local clients.

  • @Cluster A
    Slave MySQLDs track the incoming reflected epoch mysql.ndb_apply_status WRITE_ROW events to maintain their ndb_slave_max_replicated_epoch variables

    Slave MySQLDs attach NdbApi interpreted programs to UPDATE and DELETE operations as they are applied to the database, comparing the row's stored NDB$GCI64 and NDB$AUTHOR columns with constant values supplied in the program.

    If there are no conflicts, the UPDATE and DELETE operations are applied, and the rows' NDB$AUTHOR columns are set to one, indicating a successful Slave modification

    If there are conflicts then conflict handling for the conflicting rows begins.

Now does that make any sense? Assuming it does, then next we look at how detected conflicts are handled.

Once again, another wordy endurance test and we're not finished. Surely the end must be near?


Eventual Consistency in MySQL Cluster — using epochs

December 7th, 2011
Before getting to the details of how eventual consistency is implemented, we need to look at epochs. Ndb Cluster maintains an internal distributed logical clock known as the epoch, represented as a 64 bit number. This epoch serves a number of internal functions, and is atomically advanced across all data nodes.

Epochs and consistent distributed state

Ndb is a parallel database, with multiple internal transaction coordinator components starting, executing and committing transactions against rows stored in different data nodes. Concurrent transactions only interact where they attempt to lock the same row. This design minimises unnecessary system-wide synchronisation, enabling linear scalability of reads and writes.

The stream of changes made to rows stored at a data node is written to a local Redo log, used for node and system recovery. The change stream is also published to NdbApi event listeners, including MySQLD servers recording Binlogs. Each node's change stream contains the row changes it was involved in, as committed by multiple transactions, and coordinated by multiple independent transaction coordinators, interleaved in a partial order.

        Incoming independent transactions
             affecting multiple rows

                  T3  T4  T7
                  T1  T2  T5

        |          |          |
        V          V          V

     --------   --------   --------
     |  1   |   |  2   |   |  3   |
     |  TC  |   |  TC  |   |  TC  |    Data nodes with multiple
     |      |---|      |---|      |    transaction coordinators
     |------|   |------|   |------|    acting on data stored in
     |      |   |      |   |      |    different nodes
     | DATA |   | DATA |   | DATA |
     --------   --------   --------

        |          |          |
        V          V          V

       t4         t4         t3
       t1         t7         t2
       t2         t1         t7
       t5

        Outgoing row change event
        streams by causing
        transaction


These row event streams are generated independently by each data node in a cluster, but to be useful they need to be correlated together. For system recovery from a crash, the data nodes need to recover to a cluster-wide consistent state: a state which contains only whole transactions, and which, logically at least, existed at some point in time. This correlation could be done by an analysis of the transaction ids and row dependencies of each recorded row change to determine a valid order for the merged event streams, but this would add significant overhead. Instead, the Cluster uses a distributed logical clock known as the epoch to group large sets of committed transactions together.

Each epoch contains zero or more committed transactions. Each committed transaction is in only one epoch. The epoch clock advances periodically, every 100 milliseconds by default. When it is time for a new epoch to start, a distributed protocol known as the Global Commit Protocol (GCP) results in all of the transaction coordinators in the Cluster agreeing on a point of time in the flow of committing transactions at which to change epoch. This epoch boundary, between the commit of the last transaction in epoch n, and the commit of the first transaction in epoch n+1, is a cluster-wide consistent point in time. Obtaining this consistent point in time requires cluster-wide synchronisation, between all transaction coordinators, but it need only happen periodically.

Furthermore, each node ensures that all events for epoch n are published before any events for epoch n+1 are published. Effectively the event streams are sorted by epoch number, and the first time a new epoch is encountered signifies a precise epoch boundary.

        Incoming independent transactions

                  T3  T4  T7
                  T1  T2  T5

        |          |          |
        V          V          V

     --------   --------   --------
     |  1   |   |  2   |   |  3   |
     |  TC  |   |  TC  |   |  TC  |    Data nodes with multiple
     |      |---|      |---|      |    transaction coordinators
     |------|   |------|   |------|    acting on data stored in
     |      |   |      |   |      |    different nodes
     | DATA |   | DATA |   | DATA |
     --------   --------   --------

        |          |          |
        V          V          V

     t4(22)     t4(22)     t3(22)      Epoch 22
     ......     ......     ......
     t1(23)     t7(23)     t2(23)      Epoch 23
     t2(23)     t1(23)     t7(23)
     ......
     t5(24)                            Epoch 24

        Outgoing row change event
        streams by causing transaction
        with epoch numbers in ()



When these independent streams are merge-sorted by epoch number we get a unified change stream. Multiple orderings are possible; one is shown here :

      Events      Transactions
                  contained in epoch

      t4(22)
      t4(22)      {T4, T3}
      t3(22)
      ......
      t1(23)
      t2(23)
      t7(23)
      t1(23)      {T1, T2, T7}
      t2(23)
      t7(23)
      ......
      t5(24)      {T5}



Note that we can state from this that T4 -> T1 (happened before), and T1 -> T5. However we cannot say whether T4 -> T3 or T3 -> T4. In epoch 23 we see that the row events resulting from T1, T2 and T7 are interleaved.

Epoch boundaries act as markers in the flow of row events generated by each node, which are then used as consistent points to recover to. Epoch boundaries also allow a single system wide unified transaction log to be generated from each node's row change stream, by merge-sorting the per-node row change streams by epoch number. Note that the order of events within an epoch is still not tightly constrained. As concurrent transactions can only interact via row locks, the order of events on a single row (Table and Primary key value) signifies transaction commit order, but there is by definition no order between transactions affecting independent row sets.

To record a Binlog of Ndb row changes, MySQLD listens to the row change streams arriving from each data node, and merge-sorts them by epoch into a single, epoch-ordered stream. When all events for a given epoch have been received, MySQLD records a single Binlog transaction containing all row events for that epoch. This Binlog transaction is referred to as an 'Epoch transaction' as it describes all row changes that occurred in an epoch.

Epoch transactions in the Binlog

Epoch transactions in the Binlog have some interesting properties :
  • Efficiency : They can be considered a kind of Binlog group commit, where multiple user transactions are recorded in one Binlog (epoch) transaction. As an epoch normally contains 100 milliseconds of row changes from a cluster, this is a significant amortisation.
  • Consistency : Each epoch transaction contains the row operations which occurred when moving the cluster from epoch boundary consistent state A to epoch boundary consistent state B
    Therefore, when applied as a transaction by a slave, the slave will atomically move from consistent state A to consistent state B
  • Inter-epoch ordering : Any row event recorded in epoch n+1 logically happened after every row event in epoch n
  • Intra-epoch disorder : Any two row events recorded in epoch n, affecting different rows, may have happened in any order.
  • Intra-epoch key-order : Any two row events recorded in epoch n, affecting the same row, happened in the order they are recorded.

The ordering properties show that epochs give only a partial order, enough to subdivide the row change streams into self-consistent chunks. Within an epoch, row changes may be interleaved in any way, except that multiple changes to the same row will be recorded in the order they were committed.
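These rules amount to a small decision function that answers 'did a happen before b?' with yes, no, or 'cannot tell'. The RowEvent type and its seq field (the event's position within its epoch transaction) are hypothetical stand-ins for real Binlog row events.

```python
from typing import NamedTuple, Optional


class RowEvent(NamedTuple):
    epoch: int
    table: str
    pk: object
    seq: int   # position within the epoch transaction (Binlog order)


def happened_before(a: RowEvent, b: RowEvent) -> Optional[bool]:
    """True if a -> b, False if b -> a, None if the Binlog cannot tell."""
    if a.epoch != b.epoch:                      # inter-epoch ordering
        return a.epoch < b.epoch
    if (a.table, a.pk) == (b.table, b.pk):      # intra-epoch key-order
        return a.seq < b.seq
    return None                                 # intra-epoch disorder
```

For the example above, a T4 event in epoch 22 happened before a T1 event in epoch 23, while a T4 event and a T3 event on different rows in epoch 22 give None.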

Each epoch transaction contains the row changes for a particular epoch, and that information is recorded in the epoch transaction itself, as an extra WRITE_ROW event on a system table called mysql.ndb_apply_status. This WRITE_ROW event contains the binlogging MySQLD's server id and the epoch number. This event is added so that it will be atomically applied by the Slave along with the rest of the row changes in the epoch transaction, giving an atomically reliable indicator of the replication 'position' of the Slave relative to the Master Cluster in terms of epoch number. As the epoch number is abstracted from the details of a particular Master MySQLD's binlog files and offsets, it can be used to failover to an alternative Master.

We can visualise a MySQL Cluster Binlog as looking something like this. Each Binlog transaction contains one 'artificially generated' WRITE_ROW event at the start, and then RBR row events for all row changes that occurred in that epoch.

    BEGIN
    WRITE_ROW mysql.ndb_apply_status server_id=4, epoch=6998
    WRITE_ROW ...
    UPDATE_ROW ...
    DELETE_ROW ...
    ...
    COMMIT     # Consistent state of the database

    BEGIN
    WRITE_ROW mysql.ndb_apply_status server_id=4, epoch=6999
    ...
    COMMIT     # Consistent state of the database

    BEGIN
    WRITE_ROW mysql.ndb_apply_status server_id=4, epoch=7000
    ...
    COMMIT     # Consistent state of the database
    ...


A series of epoch transactions, each with a special WRITE_ROW event for recording the epoch on the Slave. You can see this structure using the mysqlbinlog tool with the --verbose option.
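A toy model of how a Binlogging MySQLD could turn per-node change streams into epoch transactions: merge the epoch-sorted streams, group by epoch, and prefix each group with the mysql.ndb_apply_status WRITE_ROW event. This is not MySQLD's implementation, which consumes NdbApi events; here each node's stream is simply a list of (epoch, causing transaction) pairs, as in the diagrams.

```python
import heapq
from itertools import groupby
from typing import Iterable, Iterator


def epoch_transactions(node_streams: list[Iterable[tuple[int, str]]],
                       server_id: int) -> Iterator[tuple[int, list[str]]]:
    """Yield (epoch, events) for each epoch, ndb_apply_status event first.

    Each stream must already be sorted by epoch, as each data node publishes
    all events for epoch n before any for epoch n+1.
    """
    merged = heapq.merge(*node_streams, key=lambda ev: ev[0])
    for epoch, events in groupby(merged, key=lambda ev: ev[0]):
        body = [f"WRITE_ROW mysql.ndb_apply_status server_id={server_id}, epoch={epoch}"]
        body.extend(txn for _, txn in events)
        yield epoch, body
```

With the three per-node streams from the diagrams, this yields epoch transactions 22, 23 and 24 whose row events come from {T4, T3}, {T1, T2, T7} and {T5}. Within an epoch, heapq.merge takes equal-epoch events from the streams in argument order, which is just one of the valid interleavings.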

Rows tagged with last-commit epoch

Each row in a MySQL Cluster stores a hidden metadata column which contains the epoch at which a write to the row was last committed. This information is used internally by the Cluster during node recovery and other operations. The ndb_select_all tool can be used to see the epoch numbers for rows in a table by supplying the --gci or --gci64 options. Note that the per-row epoch is not a row version, as two updates to a row in reasonably quick succession will have the same commit epoch.

Epochs and eventual consistency

Reviewing epochs from the point of view of my previous posts on eventual consistency we see that :
  • Epochs provide an incrementing logical clock
  • Epochs are recorded in the Binlog, and therefore shipped to Slaves
  • Epoch boundaries imply happened-before relationships between events before and after them in the Binlog

These properties mean that epochs are almost perfect for monitoring conflict windows in an active-active circular replication setup, with only a few enhancements required.

I'll describe these enhancements in the next post.


Speaking at Oracle UK User Group conference

November 25th, 2011

I will be speaking in the MySQL track of the UK Oracle User Group conference on 5th December in Birmingham UK. The title of the session is "Building Highly Available and Scalable, Real Time Services with MySQL Cluster" - full details here.

I'm not a regular conference attendee, never mind speaker. However I'm looking forward to meeting current and potential MySQL users, and also attending some of the talks in the MySQL and other tracks. Maybe I can learn something about RAC, or Exadata?

If you are attending and want to talk about MySQL or MySQL Cluster then please track me down and say hello.

Note that this is the first picture I have included in 3 years of posts - maybe I shouldn't wait 3 years for the next one?


MySQL Cluster, and NoSQL

November 2nd, 2011

Those are the topics we cover in the latest episode of our “Meet The MySQL Experts” podcast.

Mat Keep and Bernd Ocklin talk about new database requirements, and walk us through what's new in the second Development Milestone Release of MySQL Cluster 7.2, including impressive performance improvements, new NoSQL access via memcached, cross data center scalability, and more...

Enjoy the podcast!



Eventual Consistency — detecting conflicts

October 20th, 2011
In my previous posts I introduced two new conflict detection functions, NDB$EPOCH and NDB$EPOCH_TRANS, without explaining how these functions actually detect conflicts. To simplify the explanation I'll initially consider two circularly replicating MySQL Servers, A and B, rather than two replicating Clusters, but the principles are the same.

Commit ordering

Avoiding conflicts requires that data is only modified on one Server at a time. This can be done by defining Master/Slave roles or Active/Passive partitions etc. Where this is not done, and data can be modified anywhere, there can be conflicts. A conflict occurs when the same data is modified at both Servers concurrently, but what does concurrently mean? On a single server, modifications to the same data are serialised by locking or MVCC mechanisms, so that there is a defined order between them. e.g. two modifications MX and MY are committed either in order {MX, MY} or {MY, MX}.

For the purposes of replication, two modifications MX and MY on the same data are concurrent if the order of commit is different at different servers in the system. Each server will choose one order, but if they don't all choose the same order then there is a conflict. Having a different order means that the last modification on each server is different, and therefore the final state of the data can be different on different servers.

One way to avoid conflicts is to get all servers to agree on a commit order before processing an operation - this ensures that all replicas process operations in the same order, waiting if necessary for missing operations to arrive to ensure no commit-order variance.

Note that commit-ordering is only important between modifications affecting the same data - modifications which do not overlap in their data footprint are unrelated and can be committed in any order. A system which totally orders commits may be less efficient than one which only orders conflicting commits.

Happened before

For the NDB$EPOCH asynchronous conflict detection functions, commit orders are monitored to detect when two modifications to the same data have been committed in different orders.

Given two modifications MX and MY to the same data, each server will decide a happened before (denoted ->) relationship between them :

  1. MX -> MY (MX happened before MY)

    or

  2. MY -> MX (MY happened before MX)

If all servers agree on order 1, or all servers agree on order 2 then there is no conflict. If there is any disagreement then there is a conflict.
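Put another way, a conflict is any pair of modifications to the same data that two servers committed in different orders. A small sketch that finds such pairs, given each server's commit order for modifications to one row (the function name and input shape are illustrative only):

```python
from itertools import combinations


def pairwise_disagreements(orders_by_server: dict[str, list[str]]) -> set[tuple[str, str]]:
    """Return pairs of modifications that some two servers committed in different orders."""
    positions = {server: {mod: i for i, mod in enumerate(order)}
                 for server, order in orders_by_server.items()}
    conflicts = set()
    for s1, s2 in combinations(positions, 2):
        p1, p2 = positions[s1], positions[s2]
        # Only modifications both servers have seen can be compared.
        common = sorted(p1.keys() & p2.keys())
        for mx, my in combinations(common, 2):
            if (p1[mx] < p1[my]) != (p2[mx] < p2[my]):
                conflicts.add((mx, my))
    return conflicts
```

If A committed {MA1, MB1} and B committed {MB1, MA1}, the pair (MA1, MB1) is reported; if both agree, the result is empty.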

In practice, disagreement arises because the same data is modified at both Server A and Server B before the Server A modification is replicated to B and/or vice-versa.

Sometimes when reading about commit ordering, the reason why commit orders should not diverge is lost - the only reason to care about commit ordering is because it is related to conflicting modifications and the potential for data divergence.

Determining happened before from the Binlog


We assume a steady start state, where both Server A and Server B agree about the state of their data, and no modifications are in-flight. If a client of Server A then commits modification MA1 to row X, then from Server A's point of view, MA1 happened before any future modification to row X.

MA1 -> M*

If a client of Server B commits modification MB1 to row X around the same time (before, or after, or thereabouts), from Server B's point of view, MB1 happened before any future modification to row X.

MB1 -> M*

Both Servers are correct, and content with their world view. Note that in general, when committing a modification Mj, a server naturally asserts that from its point of view the modification happened before any as-yet-unseen modification Mk.

Some time will pass and the replication mechanisms will pull Binlogged changes across and apply them. When Server B pulls and applies Server A's Binlogged changes, modification MA1 will be applied to row X. Server B will then naturally be of the opinion that :

MB1 -> MA1

Independently, Server A will pull Server B's binlogged changes and apply modification MB1 to row X, and will come to the certain opinion that :

MA1 -> MB1

These happened before relationships are contradictory so there is a conflict. If nothing is done then A and B will have diverged, with Server A storing the outcome of MB1, and Server B storing the outcome of MA1.

Note that if the --log-slave-updates server option were on, then Server A's Binlog would have recorded {...MA1...MB1...}, whereas Server B's Binlog would have recorded {...MB1...MA1...}. By recording when the Slave applies replicated updates in the Binlog, we record the commit order of the replicated updates relative to other local updates, and encode the happened before relationship in the relative positions of events in the Binlog.

The Binlog is of course transferred between servers, so in a circular replication setup, Server A can become aware of the happened before information from Server B and vice-versa by examining the received Binlogs. The Slave SQL thread examines Binlogs as it applies them, so can be extended to extract happened before information, and use it to detect conflicts.

Recall that Server A asserts that its committed modification to row X (MA1) happened before any as-yet-unseen replicated modification :

MA1 -> M*

Therefore, to detect a conflict, Server A only needs to detect the case where the incoming Binlog from Server B implies that some modification MB* to row X happened before Server A's already committed modification MA1.

If Server B Binlog implies MB* -> MA1 then there has been a conflict

This is in essence how the NDB$EPOCH functions work - the Binlog is used to capture happened before relationships which are checked to determine whether conflicting concurrent modifications have occurred.
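A simplified model of that check from Server A's side. Assumptions: Server B runs with --log-slave-updates, so its Binlog interleaves B's own modifications with the A modifications it has applied, and each Binlog entry is reduced to a hypothetical (origin, modification id, row) tuple. A B modification to row X is flagged if B's Binlog places it before some A modification of X that A has already committed.

```python
def detect_conflicts(a_committed: dict[str, list[str]],
                     b_binlog: list[tuple[str, str, str]]) -> list[str]:
    """Server A's view: find B modifications which B's Binlog places before
    one of A's already committed modifications to the same row.

    a_committed maps row -> ids of A's modifications to that row.
    b_binlog entries are (origin_server, modification_id, row) in Binlog order.
    """
    unseen = {row: set(mods) for row, mods in a_committed.items()}
    conflicts = []
    for origin, mod_id, row in b_binlog:
        if origin == "A":
            # B has now applied this A modification: MA -> anything later.
            unseen.get(row, set()).discard(mod_id)
        elif unseen.get(row):
            # MB* -> MA for some MA that A has already committed.
            conflicts.append(mod_id)
    return conflicts
```

For the example above, B's Binlog {...MB1...MA1...} flags MB1, whereas {...MA1...MB2...} flags nothing.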


Conflict Windows

In the previous example, Server A commits MA1 modifying row X, and Server B commits MB1 also modifying row X. From Server A's point of view, as soon as it commits MA1, there is potential for a replicated modification from B such as MB1 to be found in-conflict with MA1. We say that from Server A's point of view a window of potential conflict on row X has opened when MA1 was committed. Server A monitors Server B's Binlog as it is applied and when it reaches the point where the commit of MA1 at Server B is recorded, Server A knows that any further MB* recorded in Server B's Binlog after this cannot have happened before MA1, therefore the window of potential conflict on row X has closed.

We define the window of potential conflict on a row X as the time between the commit of a modification M1, and the Slave processing of an event in a replicated Binlog indicating that modification M1 has been applied on the other server(s) in the replication loop.

Any incoming replicated modification M2 also affecting row X while it has an open conflict window is in conflict with M1, as it must appear to have happened-before M1 to the server which committed it.

Observations about the window of potential conflict :
  • It is defined per committed modification per disjoint data set
  • It can be extended by further modifications to the same data from the same server
    The window does not close until all further modifications have been fully replicated
  • Window duration is dependent on the replication round-trip delay
    Which can vary greatly
  • Once it closes, further modifications to the same data from anywhere are safe, but will each open their own window of potential conflict.
  • From the point of view of one Server, conflicts can occur at any time until the conflict window is closed
  • From the point of view of one Server, the duration of the window of potential conflict is similar to

    Replication Propagation Delay A to B + Replication Propagation Delay B to A

    These delays may not be symmetric.
  • From the point of view of an external observer/actor, the system will detect two modifications MA1 and MB1, committed at times tMA1 and tMB1, as in-conflict if

    tMA1 <= tMB1 and tMB1 - tMA1 < Replication Propagation Delay A to B

    ( A before B, but not by enough to avoid conflict )

    or

    tMB1 < tMA1 and tMA1 - tMB1 < Replication Propagation Delay B to A

    ( B before A, but not by enough to avoid conflict )
  • The window of potential conflict can only be as short as the replication propagation delay between systems, which can tend towards, but never reach, zero.
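The external-observer condition above can be written as a small function. This is a sketch under the simplified model of this post, assuming fixed one-way propagation delays and a single pair of modifications; the function and parameter names are illustrative.

```python
def detected_as_conflict(t_ma1: float, t_mb1: float,
                         delay_a_to_b: float, delay_b_to_a: float) -> bool:
    """Would MA1 (committed on A at t_ma1) and MB1 (on B at t_mb1) be flagged?"""
    if t_ma1 <= t_mb1:
        # A first: conflict unless MA1 reached B before MB1 was committed there.
        return t_mb1 - t_ma1 < delay_a_to_b
    # B first: conflict unless MB1 reached A before MA1 was committed there.
    return t_ma1 - t_mb1 < delay_b_to_a


# Times and delays in milliseconds (illustrative values only).
print(detected_as_conflict(0, 30, delay_a_to_b=50, delay_b_to_a=80))    # True
print(detected_as_conflict(0, 80, delay_a_to_b=50, delay_b_to_a=80))    # False
print(detected_as_conflict(100, 40, delay_a_to_b=50, delay_b_to_a=80))  # True
```

Equivalently, the pair is flagged when -(Delay B to A) < tMB1 - tMA1 < Delay A to B, an interval whose width is the sum of the two one-way delays, matching the round-trip estimate of the window duration.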

Tracking conflict windows with a logical clock

A row's conflict window opens when a modification is committed to it, and closes when the Slave processes an event indicating that the modification was committed on the other server(s). How can we track all of these independent conflict windows? If only we had a database :)

This is solved by maintaining a per-server logical clock, which increments periodically. Each modification to a row sets a hidden metacolumn of the row to the current value of the server's logical clock. This gives each row a kind of coarse logical timestamp. When the logical clock increments, an event is included in the Binlog to record the transition. Further, all row events for modifications with logical clock value X are stored in the Binlog before any row events for modifications with logical clock value X+1.

 Server A Binlog events    ClockVal stored in DB by modification

...
MA1                        39
MA2                        39
MA3                        39
ClockVal_A = 40
MA4                        40
MA5                        40
ClockVal_A = 41
MA6                        41


When a Slave applies the Binlog, the ClockVal events are passed through into its Binlog, and are then made available to the original server in a circular configuration.

 Server B Binlog events

...
MB1
MB2
ClockVal_A = 40
MB3
MB4
ClockVal_B = 234
MB5
MB6
ClockVal_A = 41
MB7
...



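Using the Binlog ordering, we can see that ClockVal_A = 40 happened before MB3 and MB4 at Server B. As all of Server A's modifications with clock value 39 precede ClockVal_A = 40 in Server A's Binlog, this implies that MA1, MA2 and MA3 happened before MB3 and MB4 at Server B.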

When applying Server B's Binlog to Server A, the Slave at Server A maintains a maximum replicated clock value, which increases as it observes its ClockVal_A events returned. When applying a row event originating from Server B, the affected row's stored clock value is first compared to the maximum replicated clock value to determine whether the row event from B conflicts with the latest committed change to the row at Server A.

The two modifications are in conflict if the stored row's clock value is greater than or equal to the maximum replicated clock value.

in_conflict = row_clockval >= maximum_replicated_clockval
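A toy Python model of the scheme may help make the rule concrete. It is a sketch, not how MySQL Cluster stores or advances its clock: `ServerA`, `row_clock` (standing in for the hidden metacolumn) and the method names are hypothetical, and rows r1 to r5 stand for the rows touched by MA1 to MA5 in the example above.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple


@dataclass
class ServerA:
    """Toy model of one server's logical clock and per-row clock metacolumn."""
    clock: int = 0                                            # current logical clock value
    row_clock: Dict[str, int] = field(default_factory=dict)  # hidden per-row clock value
    max_replicated_clock: int = 0                             # highest ClockVal_A seen returning
    binlog: List[Tuple[str, object]] = field(default_factory=list)

    def commit(self, row: str) -> None:
        # Stamping the row opens (or extends) its window of potential conflict.
        self.row_clock[row] = self.clock
        self.binlog.append(("row", row))

    def tick(self) -> None:
        # Advance the clock and log the transition, so every row event for
        # value N precedes every row event for value N + 1.
        self.clock += 1
        self.binlog.append(("ClockVal_A", self.clock))

    def observe_reflected_clock(self, value: int) -> None:
        # A ClockVal_A event has come back through Server B's Binlog.
        self.max_replicated_clock = max(self.max_replicated_clock, value)

    def in_conflict(self, row: str) -> bool:
        # A row never modified here has no open window.
        if row not in self.row_clock:
            return False
        return self.row_clock[row] >= self.max_replicated_clock


a = ServerA(clock=39)
for row in ("r1", "r2", "r3"):   # MA1..MA3 at clock value 39
    a.commit(row)
a.tick()                          # ClockVal_A = 40
for row in ("r4", "r5"):          # MA4, MA5 at clock value 40
    a.commit(row)

print(a.in_conflict("r1"))        # True: nothing reflected yet
a.observe_reflected_clock(40)     # ClockVal_A = 40 seen in B's Binlog
print(a.in_conflict("r1"))        # False: window for clock value 39 closed
print(a.in_conflict("r4"))        # True: window for clock value 40 still open
```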

Using a logical clock to track conflict windows has the following benefits :
  • Automatic update on commit of row modification, opening conflict window
  • Automatic extension of conflict window on further modification on row with open conflict window.
  • Automatic closure of conflict window on maximum replicated clock value exceeding row's stored value
  • Efficient storage cost per row - one clock value.
  • Efficient runtime processing cost - inequality comparison between maximum replicated clock value and row's stored clock value.

As you might have guessed, NDB$EPOCH uses the MySQL Cluster epoch values as a logical clock to detect conflicts. The details of this will have to wait for yet another post. In my first two posts on this subject I thought, 'one more post and I can finish describing this', but here I am at three posts and still not finished. Hopefully the next will get more concrete and finally describe the mysterious workings of NDB$EPOCH. We're getting closer, honest.


Eventual consistency with transactions

October 10th, 2011
In my last post I described the motivation for the new NDB$EPOCH conflict detection function in MySQL Cluster. This function detects when a row has been concurrently updated on two asynchronously replicating MySQL Cluster databases, and takes steps to keep the databases in alignment.

With NDB$EPOCH, conflicts are detected and handled on a row granularity, as opposed to column granularity, as this is the granularity of the epoch metadata used to detect conflicts. Dealing with conflicts on a row-by-row basis has implications for schema and application design. The NDB$EPOCH_TRANS function extends NDB$EPOCH, giving stronger consistency guarantees and reducing the impact on applications and schemas.

Concurrency control in a single synchronous system

MySQL Cluster is a relational system. Data is stored in tables with defined schemas of typed columns. As with any relational system, real-world concepts can be modelled in a number of ways with different trade-offs. One such consideration is the level of normalisation applied to a data model. Transactions and concurrency control ensure that data modelled using multiple tables, rows and columns appears to any external observer to move instantaneously between stable, self-consistent states. This is a powerful simplification, and eases the complexity burden on application writers. Each transaction provides the illusion of serialised access to the database. Multiple transactions can execute in parallel, so long as they do not interfere by accessing the same data. Where transactions do interfere, some real serialisation can occur. In practice, applications depend on the serialisation and atomicity guarantees given by transactions, often in ways not fully made explicit or understood by the application designers.

Concurrency control in independent, asynchronously replicated systems

Asynchronously replicating writes between two independent systems erodes the guarantees given by single system concurrency control. Each system maintains its transactional guarantees in parallel, and incorporates modifications from the other system asynchronously, at some time after they were originally committed. Where the same row is modified on both systems concurrently, two versions of the same row are produced, and there is no longer a single history of values for the given row. This can cause replicas to diverge. Note that the window of 'concurrency', or 'potential conflict', is related to the time taken for a committed update to be applied on all replicas. This is similar, or equivalent, to the commit delay experienced by a synchronous two-phase commit system.

Conflicts can be detected using some form of conflict detection. On detecting a conflict, steps can then be taken to avoid divergence, and resolve any unwanted effects of the concurrent writes.

Replica divergence, external effects and cascading impacts

Divergence can be avoided if conflicting writes can be merged in some way. Some write conflicts may be equivalent, associative or otherwise mergeable, especially if the operations are replicated rather than their resulting states. However merging requires specific schema and application knowledge to determine how to merge conflicting writes.

More generally, divergence can be avoided by rejecting one or both conflicting writes. This is the approach we have taken, with handling of rejected writes delegated, via the exceptions table mechanism, to the application, where the knowledge needed to handle them exists.

However write conflicts are handled, it is important to consider :
  1. Cascading impacts on dependent operations
    Operations based on the results of conflicting operations may themselves require handling to avoid divergence.
  2. Real world / other system effects based on conflicting writes
    Maintaining database consistency does not guarantee that real world effects have been correctly compensated.

A database system does not exist in a vacuum. Operations are performed to reflect external world, or external system events. When the effects of an operation are later reverted, the real world effects may also require some compensating actions. These external world compensating actions are beyond the scope of any DBMS and are application specific. In a real application of this technology, this is probably the most important part of the design.

Any particular conflict originates between two concurrent operations, but once a conflicting operation is committed, other operations can read its results, and commit their own, expanding the impact of the original conflict. Conflicts are discovered asynchronously, some time after the original operations are committed, so there can be a large number of subsequent operations in the replication pipeline which depend on the conflicting operations at the point they are discovered. All of the invalidated subsequent operations must be handled.

Row based conflict detection and data shearing

Using row-based conflict detection and re-alignment can counteract data divergence so that rows become consistent eventually, but this comes at the cost of eroding the atomicity of committed transactions. For example, a committed transaction which writes to three rows may, after conflict handling, have none, one, two or all three row changes reverted.

Within a single system, the two potentially visible states were :
  1. Before transaction (All rows at version 1 : Av1, Bv1, Cv1)
  2. After transaction (All rows at version 2 : Av2, Bv2, Cv2)

With row based conflict detection, ignoring the row variants we're actually conflicting with, there could be :

Before transaction : (All rows at version 1 : Av1, Bv1, Cv1)

After transaction
  1. Av2, Bv2, Cv2 (All rows at version 2)
  2. Av2, Bv2, Cv1 (Cv2 reverted)
  3. Av2, Bv1, Cv2 (Bv2 reverted)
  4. Av2, Bv1, Cv1 (Bv2, Cv2 reverted)
  5. Av1, Bv2, Cv2 (Av2 reverted)
  6. Av1, Bv2, Cv1 (Av2, Cv2 reverted)
  7. Av1, Bv1, Cv2 (Av2, Bv2 reverted)
  8. Av1, Bv1, Cv1 (Av2, Bv2, Cv2 reverted)
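The eight states are every keep/revert combination of the three row changes, 2^3 in total. A short Python sketch (the function name is illustrative) enumerates them in the same order as the list:

```python
from itertools import product
from typing import List, Sequence, Tuple


def post_conflict_states(rows: Sequence[str]) -> List[Tuple[str, ...]]:
    """Every combination of kept (v2) or reverted (v1) changes for the given rows."""
    states = []
    for kept in product([True, False], repeat=len(rows)):
        states.append(tuple(f"{r}v2" if k else f"{r}v1" for r, k in zip(rows, kept)))
    return states


for i, state in enumerate(post_conflict_states(["A", "B", "C"]), start=1):
    print(i, ", ".join(state))
```

A transaction writing n rows has 2^n possible outcomes under row-based handling, whereas transaction-based handling keeps just the two: all kept or all reverted.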

Depending on the concept that the distinct rows A,B,C represented, this can vastly increase the complexity of understanding the data. If A, B and C model entirely separate entities, which just happened to be transactionally updated together then there may be no problem if they fare differently in conflict detection. If they model portions of the state of a larger entity then reasoning about the state of that entity becomes complex.

This potential chopping up of changes committed in a transaction can be described as shearing of the data model represented by the schema. In practice, the potential for shearing between rows implies that for tables with conflicts handled on a row basis, cross row consistency is not available. This in turn implies that the schema must be modified to ensure that data items which cannot tolerate relative shear are placed in the same row so that they share the same fate and remain self-consistent. This single-row limit to consistency is native and natural to some NoSQL / key-value / wide column store products, but is a weakening of the normal guarantees in a transactional system.

Requiring that schemas and applications using conflict detection can tolerate shear between any two rows is quite a heavy burden to place on applications, especially those not written with eventual consistency in mind. Is there some way to support optimistic conflict detection without breaking up committed transactions, and shearing rows?

Transaction based conflict detection


One way to avoid inter-row shearing is to perform conflict detection on a row-by-row basis, but on discovering a conflict, take action on a transaction basis. More concretely, when a row conflict is discovered, any other rows written as part of the same transaction should also be considered in-conflict by implication. This reduces the set of stable states back to the original case - all rows at version 1 or all rows at version 2.

Where a row is found to be in-conflict with some replicated row operation, a further replicated row operation on the same row should also be found to be in-conflict, until the conflict condition has been cleared. This property is implicitly implemented in the existing row based conflict detection functions.

When the scope of a conflict is extended to include all row modifications in a transaction, this implies that all following replicated row operations which affect the same rows must also be in conflict by implication. To avoid row shearing, these implied-in-conflict rows must implicate the other rows in their transactions, and those rows may in turn implicate other rows. The overall effect is that a single row conflict must cause its transaction, and all dependent transactions, to be considered to be in conflict.

From our database-centric point of view, transactions can only become dependent on each other through the data they access in the database. If transaction X updates rows A and B, and transaction Y then reads row B and updates row C, then we can say that transaction Y has a read-write dependency on transaction X via row B. We cannot tell whether there is some other out-of-band communication between transactions.

By tracking this transaction 'footprint' information, and looking for row overlaps, we can determine transaction dependencies. This is how the new NDB$EPOCH_TRANS function provides transactional conflict detection.

NDB$EPOCH_TRANS conflict detection function

The NDB$EPOCH_TRANS conflict function uses the same mechanism as the NDB$EPOCH function to detect concurrent updates to the same row across two clusters. However, once a row conflict has been detected in an operation which is part of a replicated transaction, all other operations in that replicated transaction are considered to be in conflict. Furthermore, any transactions found to be dependent on that transaction are also considered in conflict. Once the full set of in-conflict transactions has been determined, the set of affected rows is handled in the same way as in NDB$EPOCH.

Specifically :
  • The replicated operations are not applied
  • The exceptions table(s) are populated with the affected primary keys
  • The affected row epochs are updated
  • Realignment Binlog events are generated to (eventually) realign the Secondary cluster

As with NDB$EPOCH, NDB$EPOCH_TRANS is asymmetric, so the Primary Cluster always wins when a conflict is detected. As with NDB$EPOCH, this allows applications needing pessimistic properties to obtain them by accessing the Primary Cluster. Applications which can handle the relaxed consistency of optimism can access either Cluster. With NDB$EPOCH_TRANS, transactions committed on the Secondary Cluster are guaranteed to be atomic, whether or not they are later found to be in conflict. Each committed transaction will either be unaffected by conflict detection, or be completely reverted. There will be no row shear.

This slightly stronger optimistic consistency guarantee may ease the implementation of relaxed consistency / eventually consistent applications. For example, where some concept is modelled by a number of different rows in different tables, any transactional modification will either be atomically applied, or not applied at all, so the relationships between the rows affected by a transaction will be preserved. The need to flatten a schema into single-row entities is reduced, although careful design is still required to get a good understanding of transaction boundaries, and the behaviour of the overall system when transactions are reverted.

Transaction dependency tracking


NDB$EPOCH_TRANS is built into the MySQL Cluster Storage Engine. It is active in the normal MySQL Slave SQL thread, as part of the normal table handler calls made when applying a replicated Binlog. The NDB$EPOCH_TRANS code in the Ndb storage engine tracks transaction dependencies based on the primary keys accessed by row events in the Binlog, and their transaction ids. If two row events have the same table and primary key values, then they affect the same row. If two events affect the same row, and are in different transactions, then the second transaction depends on the first. In this way, a transaction dependency graph is built by the MySQL Cluster Storage Engine as row events are applied by the Slave from a replicated Binlog. This graph is then used to find dependencies when a conflict is detected.
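The dependency tracking just described can be sketched in Python. This is a simplified model, not the storage engine code: row events are assumed to arrive as `(transaction_id, table, primary_key)` tuples in Binlog order, and only the most recent writer of each row is remembered, which is enough for the transitive search to reach every later writer. All names are hypothetical.

```python
from collections import defaultdict, deque
from typing import Dict, Hashable, Iterable, Set, Tuple

# A row event as seen by the Slave: (transaction_id, table, primary_key_values).
RowEvent = Tuple[int, str, Tuple[Hashable, ...]]


def build_dependencies(events: Iterable[RowEvent]) -> Dict[int, Set[int]]:
    """Map each transaction to the later transactions that wrote one of its rows."""
    last_writer: Dict[Tuple[str, Tuple[Hashable, ...]], int] = {}
    dependents: Dict[int, Set[int]] = defaultdict(set)
    for tx, table, pk in events:            # events in Binlog order
        row = (table, pk)                   # same table + primary key => same row
        prev = last_writer.get(row)
        if prev is not None and prev != tx:
            dependents[prev].add(tx)        # tx depends on prev via this row
        last_writer[row] = tx
    return dependents


def in_conflict_transactions(dependents: Dict[int, Set[int]], conflicting_tx: int) -> Set[int]:
    """The conflicting transaction plus every transaction transitively dependent on it."""
    seen = {conflicting_tx}
    queue = deque([conflicting_tx])
    while queue:
        tx = queue.popleft()
        for nxt in dependents.get(tx, ()):
            if nxt not in seen:
                seen.add(nxt)
                queue.append(nxt)
    return seen


events = [
    (1, "t1", (10,)), (1, "t1", (11,)),
    (2, "t1", (11,)), (2, "t2", (5,)),
    (3, "t2", (5,)),
    (4, "t1", (99,)),
]
deps = build_dependencies(events)
print(sorted(in_conflict_transactions(deps, 1)))  # [1, 2, 3]
```

Transaction 2 is implicated through row (t1, 11), transaction 3 through (t2, 5) written by transaction 2, while transaction 4 shares no rows and is unaffected.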

A Binlog only contains WRITE_ROW, UPDATE_ROW and DELETE_ROW events. This means that we only detect dependencies between transactions which write the same rows. We do not currently track dependencies between writers and readers. For example :

Transaction A : {Write row X, Write row Y}
Transaction B : {Read row Y, Write row Z}

Binlog : {{Tx A : Wr X, Wr Y}, {Tx B : Wr Z}}

In this example, the dependency of Transaction B on Transaction A is not recorded in the Binlog, and so the Slave is not aware of it. This would result in the write to row Z not being considered in conflict, when it should be.

A future improvement is to add selective tracking of reads to the Binlog, so that Write -> Read dependencies will implicate reading transactions when a conflict is discovered.
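The gap can be illustrated with a toy Python model of the example above. It is hypothetical throughout: transactions carry explicit read (`R`) and write (`W`) operations, and the `track_reads` switch stands in for the future improvement, since the Binlog today carries only writes.

```python
from typing import List, Set, Tuple

# (transaction_id, [(op, row), ...]) in commit order; op is "R" (read) or "W" (write).
Transaction = Tuple[str, List[Tuple[str, str]]]


def implicated(transactions: List[Transaction], conflicting_tx: str, track_reads: bool) -> Set[str]:
    """Transactions implicated by a conflict in `conflicting_tx`.

    With track_reads=False only writes are visible, as in today's Binlog.
    """
    result: Set[str] = set()
    tainted_rows: Set[str] = set()
    for tx_id, ops in transactions:
        visible = {row for op, row in ops if op == "W" or (track_reads and op == "R")}
        if tx_id == conflicting_tx or visible & tainted_rows:
            result.add(tx_id)
            # Rows written by an implicated transaction implicate later users of them.
            tainted_rows |= {row for op, row in ops if op == "W"}
    return result


txs = [("A", [("W", "X"), ("W", "Y")]),
       ("B", [("R", "Y"), ("W", "Z")])]
print(sorted(implicated(txs, "A", track_reads=False)))  # ['A']
print(sorted(implicated(txs, "A", track_reads=True)))   # ['A', 'B']
```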

There's more to come


Another long dry post, best consumed with your favourite drink in hand. As I mentioned last time, these functions are pushed, and available in the latest releases of MySQL Cluster. I'd be happy to hear from anyone who wants to try them out and give feedback. I've been deliberately light with implementation details thus far, as I'm saving those for yet another posting. I think that some of the implementation details are interesting from a replication point of view, even if you're not interested in these particular conflict detection algorithms. You may disagree :)


MySQL Cluster 7.2 (DMR2): NoSQL, Key/Value, Memcached

October 7th, 2011

70x Higher Performance, Cross Data Center Scalability and New NoSQL Interface

It's been an exciting week for all involved with MySQL Cluster, with the announcement of the second Development Milestone Release (7.2.1) at Oracle Open World. Highlights include:

- Enabling next generation web services: 70x higher complex query performance, native memcached API and integration with the latest MySQL 5.5 server

- Enhancing cross data center scalability: new multi-site clustering and enhanced active/active replication

- Simplified provisioning: consolidated user privileges.

You can download the DMR for evaluation now from: http://dev.mysql.com/downloads/cluster/ (select Development Milestone Release tab).

You can also read up on the detail of each of these features in the new article posted at the MySQL Developer Zone. In this blog, I’ll summarize the main parts of the announcement.

70x Higher Performance with Adaptive Query Localization (AQL)

Previewed as part of the first MySQL Cluster DMR, AQL is enabled by a new Index Statistics function that allows the SQL optimizer to build a better execution plan for each query.

As a result, JOIN operations are pushed down to the data nodes where the query executes in parallel on local copies of the data. A merged result set is then sent back to the MySQL Server, significantly enhancing performance by reducing network trips.

Take a look at how this is used by a web-based content management system to increase performance by 70x.

Adaptive Query Localization enables MySQL Cluster to better serve those use-cases that need to run real-time analytics across live data sets, along with high throughput OLTP operations. Examples include recommendation engines and clickstream analysis in web applications, pre-pay billing promotions in mobile telecoms networks or fraud detection in payment systems.

New NoSQL Interface and Schema-less Storage with the memcached API

The memcached interface released as an Early Access project with the first MySQL Cluster DMR is now integrated directly into the MySQL Cluster 7.2.1 trunk, enabling simpler evaluation.

The popularity of Key/Value stores has increased dramatically. With MySQL Cluster and the new memcached API, you have all the benefits of an ACID RDBMS, combined with the performance capabilities of a Key/Value store.

By default, every Key / Value is written to the same table with each Key / Value pair stored in a single row – thus allowing schema-less data storage. Alternatively, the developer can define a key-prefix so that each value is linked to a pre-defined column in a specific table.

Of course if the application needs to access the same data through SQL then developers can map key prefixes to existing table columns, enabling Memcached access to schema-structured data already stored in MySQL Cluster.

You can read more about the design goals and implementation of the memcached API for MySQL Cluster here.

Integration with MySQL 5.5

MySQL Cluster 7.2.1 is integrated with MySQL Server 5.5, providing binary compatibility to existing MySQL Server deployments. Users can now fully exploit the latest capabilities of both the InnoDB and MySQL Cluster storage engines within a single application.

Users simply install the new MySQL Cluster binary including the MySQL 5.5 release, restart the server and immediately have access to both InnoDB and MySQL Cluster!

Enhancing Cross Data Center Scalability: Simplified Active / Active Replication

MySQL Cluster has long offered Geographic Replication, distributing clusters to remote data centers to reduce the effects of geographic latency by pushing data closer to the user, as well as providing a capability for disaster recovery.

Geographic replication has always been designed around an Active / Active technology, so if applications are attempting to update the same row on different clusters at the same time, the conflict can be detected and resolved. With the release of MySQL Cluster 7.2.1, implementing Active / Active replication has become a whole lot simpler. Developers no longer need to implement and manage timestamp columns within their applications. Also rollbacks can be made to whole transactions rather than just individual operations.

You can learn more here.

Enhancing Cross Data Center Scalability: Multi-Site Clustering

MySQL Cluster 7.2.1 DMR provides a new option for cross data center scalability – multi-site clustering. For the first time splitting data nodes across data centers is a supported deployment option.

Improvements to MySQL Cluster’s heartbeating mechanism, including a new “ConnectivityCheckPeriod” parameter, enable greater resilience to temporary latency spikes on a WAN, thereby maintaining operation of the cluster.

With this deployment model, users can synchronously replicate updates between data centers without needing conflict detection and resolution, and automatically failover between those sites in the event of a node failure.

Users need to characterize their network bandwidth and latencies, and observe best practices in configuring both their network environment and Cluster. More guidance is available here.

User Privilege Consolidation

User privilege tables are now consolidated into the data nodes and centrally accessible by all MySQL servers accessing the cluster.

Previously the privilege tables were local to each MySQL server, meaning users and their associated privileges had to be managed separately on each server. By consolidating privilege data, users need only be defined once and managed centrally, saving Systems Administrators significant effort and reducing cost of operations.

Summary

The MySQL Cluster 7.2.1 DMR enables new classes of use-cases to benefit from web-scale performance with carrier-grade availability.

You can download the DMR for evaluation now from: http://dev.mysql.com/downloads/cluster/ (select Development Milestone Release tab).

You can learn more about the MySQL Cluster architecture from our Guide to scaling web databases

Let us know what you think of these enhancements directly in comments of this or the associated blogs. We look forward to working with the community to perfect these new features.



Synchronously Replicating Databases Across Data Centers – Are you Insane?

October 4th, 2011

Well actually….no. The second Development Milestone Release of MySQL Cluster 7.2 introduces support for what we call “Multi-Site Clustering”. In this post, I’ll provide an overview of this new capability, and considerations you need to make when considering it as a deployment option to scale geographically dispersed database services.

You can read more about MySQL Cluster 7.2.1 in the article posted on the MySQL Developer Zone

MySQL Cluster has long offered Geographic Replication, distributing clusters to remote data centers to reduce the effects of geographic latency by pushing data closer to the user, as well as providing a capability for disaster recovery.

Multi-Site Clustering provides a new option for cross data center scalability. For the first time splitting data nodes across data centers is a supported deployment option. With this deployment model, users can synchronously replicate updates between data centers without needing to modify their application or schema for conflict handling, and automatically failover between those sites in the event of a node failure.

MySQL Cluster offers high availability by maintaining a configurable number of data replicas.  All replicas are synchronously maintained by a built-in two-phase commit protocol.  Data node and communication failures are detected and handled automatically.  On recovery, data nodes automatically rejoin the cluster, synchronize with running nodes, and resume service.

All replicas of a given row are stored in a set of data nodes known as a nodegroup.  To provide service, a cluster must have at least one data node from each nodegroup available at all times.  When the cluster detects that the last node in a nodegroup has failed, the remaining cluster nodes will be gracefully shut down, to ensure the consistency of the stored databases on recovery.

Improvements to the heartbeating mechanism used by MySQL Cluster enable greater resilience to temporary latency spikes on a WAN, thereby maintaining operation of the cluster. A new “ConnectivityCheck” mechanism is introduced, which must be explicitly configured. This extra mechanism adds messaging overheads and failure handling latency, and so is not switched on by default.

When configuring Multi-Site clustering, the following factors must be considered:

Bandwidth
Low bandwidth between data nodes can slow data node recovery.  In normal operation, the available bandwidth can limit the maximum system throughput.  If link saturation causes latency on individual links to increase, then node failures, and potentially cluster failure, could occur.

Latency and performance
Synchronously committing transactions over a wide area increases the latency of operation execution and commit, therefore individual operations are slowed. To maintain the same overall throughput, higher client concurrency is required.  With the same client concurrency level, throughput will decrease relative to a lower latency configuration.
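The throughput/concurrency trade-off can be quantified with Little's law, a standard queueing result that is not part of the original text: for closed-loop clients that each keep one transaction in flight, throughput is roughly concurrency divided by per-transaction latency. The latencies and client counts below are illustrative assumptions, not measurements.

```python
def required_concurrency(target_tps: float, latency_s: float) -> float:
    """Little's law (L = lambda * W): transactions in flight needed for a target rate."""
    return target_tps * latency_s


def achievable_tps(concurrency: int, latency_s: float) -> float:
    """Throughput ceiling for a fixed number of synchronous clients."""
    return concurrency / latency_s


# Illustrative: 2 ms commit latency within one data center vs 20 ms across a WAN.
for latency in (0.002, 0.020):
    print(f"{latency * 1000:.0f} ms: "
          f"{required_concurrency(10_000, latency):.0f} clients for 10,000 TPS, "
          f"{achievable_tps(50, latency):.0f} TPS with 50 clients")
```

A tenfold increase in commit latency needs tenfold more concurrent clients to hold throughput steady, or cuts throughput tenfold at fixed concurrency.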

Latency and stability
Synchronous operation implies that clients wait to hear of the success or failure of each operation before continuing. Loss of communication to a node, and high latency communication to a node are indistinguishable in some cases.  To ensure availability, the Cluster monitors inter-node communication.  If a node experiences high communication latency, then it may be killed by another node, to prevent its high latency causing service loss.

Where inter-node latencies fluctuate, and are in the same range as the node-latency-monitoring trigger levels, node failures can result.  Node failures are expensive to recover from, and endanger Cluster availability. 

To avoid node failures, either the latency should be reduced, or the trigger levels should be raised.  Raising trigger levels can result in a longer time-to-detection of communication problems.

WAN latencies
Latency on an IP WAN may be a function of physical distance, routing hops, protocol layering, link failover times and rerouting times. The maximum expected latency on a link should be characterized as input to the cluster configuration.

Survivability of node failures
MySQL Cluster uses a fail-fast mechanism to minimize time-to-recovery. Nodes that are suspected of being unreachable or dead are quickly excluded from the Cluster.  This mechanism is simple and fast, but sometimes takes steps that result in unnecessary cluster failure.  For this reason, latency trigger levels should be configured with a safe margin above the maximum latency variation on inter-data node links.
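One way to apply "a safe margin above the maximum latency variation" is to derive the trigger level from measured latencies. The sketch below uses a hypothetical rule of thumb (worst observed latency plus a fixed headroom); the headroom value is an assumption, and translating the result into actual heartbeat or ConnectivityCheckPeriod settings should follow the MySQL Cluster documentation.

```python
from typing import Sequence


def suggested_trigger_ms(latency_samples_ms: Sequence[float], headroom_ms: float) -> float:
    """Worst observed inter-node latency plus a safety headroom (hypothetical rule)."""
    if not latency_samples_ms:
        raise ValueError("need at least one latency sample")
    return max(latency_samples_ms) + headroom_ms


def trigger_is_safe(trigger_ms: float, latency_samples_ms: Sequence[float], headroom_ms: float) -> bool:
    """True if a configured trigger level leaves the required headroom."""
    return trigger_ms >= suggested_trigger_ms(latency_samples_ms, headroom_ms)


samples = [4.1, 5.0, 4.7, 18.9, 5.3]   # e.g. latencies measured during a load test
print(suggested_trigger_ms(samples, headroom_ms=50))
print(trigger_is_safe(40, samples, headroom_ms=50))
```

Using the worst sample rather than the average matters here: a single spike like the 18.9 ms one is exactly what would trip a trigger set close to typical latency.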

Users can configure various MySQL Cluster parameters including heartbeats, ConnectivityCheckPeriod, GCP timeouts and transaction deadlock timeouts. You can read more about these parameters in the documentation.

Recommendations for Multi-Site Clustering
- Ensure minimal, stable latency;
- Provision the network with sufficient bandwidth for the expected peak load, and test with node recovery and system recovery;
- Configure the heartbeat period to ensure a safe margin above latency fluctuations;
- Configure the ConnectivityCheckPeriod to avoid unnecessary node failures;
- Configure other timeouts accordingly, including the GCP timeout, transaction deadlock timeout, and transaction inactivity timeout.

Example
The following is a recommendation of latency and bandwidth requirements for applications with high throughput and fast failure detection requirements:
- latency between remote data nodes must not exceed 20 milliseconds;
- bandwidth of the network link must be more than 1 Gigabit per second.

For applications that do not require this type of stringent operating environment, latency and bandwidth can be relaxed, subject to the testing recommended above.

As the recommendations demonstrate, there are a number of factors that need to be considered before deploying multi-site clustering. For geo-redundancy, Oracle recommends Geographic Replication, but multi-site clustering does present an alternative deployment, subject to the considerations and constraints discussed above.

You can learn more about scaling web databases with MySQL Cluster from our new Guide.  We look forward to hearing your experiences with the new MySQL Cluster 7.2.1 DMR!



Eventual consistency with MySQL

October 3rd, 2011
tl;dr : New 'automatic' optimistic conflict detection functions available giving the best of both optimistic and pessimistic replication on the same data

MySQL replication supports a number of topologies, and one of the most interesting is an active-active, or master-master topology, where two or more Servers accept read and write traffic, with asynchronous replication between them.

This topology has a number of attractions, including :
  • Potentially higher availability
  • Potentially low impact on read/write latency
  • Service availability insensitive to replication failures
  • Conceptually simple

However, data consistency is hard to maintain in this environment. Data, and access to it, must usually be partitioned or otherwise controlled, so that the consistency of reads is acceptable, and to avoid lost writes or badly merged concurrent writes. Implementing a distributed data access partitioning scheme which can safely handle communication failures is not simple.

Relaxed read consistency

Relaxed read consistency is a fairly well understood concept, with many Master-Slave topologies deployed where some read traffic is routed to the Slave to offload the Master and get 'read scaling'.
Generally this is acceptable as :
  1. A Read-only Slave's state is self-consistent. It is a state which, at least logically, existed on the Master at some time in the past.
  2. The reading application can tolerate some level of read-staleness w.r.t. the most recently committed writes to the Master

A surprisingly large number of applications can manage with a stale view as long as it is self-consistent.
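
To make 'stale but self-consistent' concrete, here is a toy model. It assumes, as a simplification, that the Slave applies each Master transaction atomically and in Master commit order; under that assumption every state the Slave exposes is one the Master actually passed through. The names are hypothetical and nothing here is a MySQL API.

```python
# Toy model (a simplifying assumption, not MySQL internals): the Slave applies
# each Master transaction atomically and in Master commit order.
from typing import Dict, List

Txn = Dict[str, int]    # one transaction: row key -> new value
State = Dict[str, int]  # the whole data set: row key -> value


def apply(state: State, txn: Txn) -> State:
    """Return a new state with all of txn's writes applied together."""
    new_state = dict(state)
    new_state.update(txn)
    return new_state


def master_history(log: List[Txn]) -> List[State]:
    """Every state the Master passed through: one per committed prefix."""
    states: List[State] = [{}]
    for txn in log:
        states.append(apply(states[-1], txn))
    return states


def slave_state(log: List[Txn], applied: int) -> State:
    """Slave state after applying the first `applied` transactions."""
    state: State = {}
    for txn in log[:applied]:
        state = apply(state, txn)
    return state


log = [{"a": 1, "b": 1}, {"a": 2}, {"b": 3}]
history = master_history(log)
lagging = slave_state(log, 2)  # the Slave is one transaction behind
print(lagging)                 # {'a': 2, 'b': 1}
print(lagging in history)      # True: stale, but a real past Master state
```

If the Slave could expose half of a transaction, the lagging state would no longer appear in the Master's history, which is why atomic, ordered apply matters here.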

Read-your-writes


Applications requiring 'read your writes' consistency (or session consistency) must either read from the Master, or wait until the Slave has replicated up to at least the point in time where the application's last write committed on the Master before reading from it. It is simpler and less delay-prone to just read from the Master, but this increases the load on the Master, reducing the ability of a system to read-scale. When the Master is unavailable, some sort of failover is required, and therefore, some sort of recovery process is also required.
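
The 'wait until the Slave has caught up' option can be sketched as a routing decision. This is a hypothetical illustration: replication positions are modelled as plain increasing integers, and `Session`, `choose_read_target` and the callback reporting the Slave's applied position are invented names, not MySQL APIs. The sketch waits for a bounded time and then falls back to the Master, trading some delay against Master load.

```python
# Hypothetical sketch: positions are increasing integers (e.g. a count of
# transactions committed on the Master). Not a MySQL API.
import time
from dataclasses import dataclass
from typing import Callable


@dataclass
class Session:
    last_write_pos: int = 0  # Master position at which our last write committed


def choose_read_target(session: Session,
                       slave_applied_pos: Callable[[], int],
                       max_wait_s: float = 0.0,
                       poll_s: float = 0.01) -> str:
    """Return 'slave' once it reflects our last write, else 'master' on timeout."""
    deadline = time.monotonic() + max_wait_s
    while True:
        if slave_applied_pos() >= session.last_write_pos:
            return "slave"   # the Slave already includes our writes
        if time.monotonic() >= deadline:
            return "master"  # stop waiting and read from the Master
        time.sleep(poll_s)


s = Session(last_write_pos=42)
print(choose_read_target(s, lambda: 40))  # master: Slave is still behind
print(choose_read_target(s, lambda: 42))  # slave: caught up
```

How the applied position is obtained in a real deployment depends on the progress markers the replication system exposes, and is outside this sketch.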

Partitioned Active-Active / Balanced Master-Slave

Rather than treating a whole replica as either Master or Slave, we can have each replica be both a Master and a Slave, for different parts of the data. The partitioning could be at database level, table level, or some function of the rows contained in tables, perhaps some key prefix which maps to application entities. Balancing the Master/Slave role in this way allows the request load to be balanced, rather than leaving a single system in the Master 'role' to do most of the work.

In this configuration, rather than talking about Master and Slave, it makes more sense to talk about some partition of data being 'Active' on one replica, and 'Backup' on the others. Read requests routed to the Active replica will be guaranteed to get the latest state, whereas the Backup replicas can potentially return stale states. Write requests should always be routed to the Active replica to avoid potential races between concurrent writes.

Implementing a partitioned replicated system like this generally requires application knowledge to choose a partitioning scheme where expected transaction footprints align with the partitioning scheme, and cross-partition transactions are rare or non-existent. Additionally, it requires application modification, or a front-end routing mechanism to ensure that requests are correctly routed. The routing system must also be designed to re-route in cases of communication or system failure, to ensure availability and avoid data divergence. After a failure, recovery must take care to ensure replicas are resynchronised before restoring Active status to partitions in a recovered replica.
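
A front-end router of this kind might look roughly like the following. It is a minimal, hypothetical sketch: partitions are assumed to be identified by a key prefix, failure detection is assumed to be reliable (which is exactly the hard part in practice), and the class and method names are invented. Note that a recovered replica is kept out of the Active role until it has been resynchronised.

```python
# Hypothetical router for partitioned Active-Active; not a MySQL component.
from typing import Dict, List, Set


class PartitionRouter:
    def __init__(self, placement: Dict[str, List[str]]):
        self.placement = placement        # key prefix -> replicas, preferred Active first
        self.down: Set[str] = set()       # replicas currently considered failed
        self.resyncing: Set[str] = set()  # recovered, but not yet caught up

    def partition_of(self, key: str) -> str:
        # Assumed scheme: the partition is the part of the key before ':'.
        return key.split(":", 1)[0]

    def active_for(self, key: str) -> str:
        """First healthy, fully synchronised replica for the key's partition."""
        for replica in self.placement[self.partition_of(key)]:
            if replica not in self.down and replica not in self.resyncing:
                return replica
        raise RuntimeError("no replica available for " + key)

    def mark_down(self, replica: str) -> None:
        self.down.add(replica)

    def mark_recovered(self, replica: str) -> None:
        # Back online, but must resynchronise before it can be Active again.
        self.down.discard(replica)
        self.resyncing.add(replica)

    def mark_resynchronised(self, replica: str) -> None:
        self.resyncing.discard(replica)


router = PartitionRouter({"eu": ["siteA", "siteB"], "us": ["siteB", "siteA"]})
print(router.active_for("eu:user42"))  # siteA
router.mark_down("siteA")
print(router.active_for("eu:user42"))  # siteB (failover)
router.mark_recovered("siteA")
print(router.active_for("eu:user42"))  # still siteB until resync completes
router.mark_resynchronised("siteA")
print(router.active_for("eu:user42"))  # siteA
```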

Implementing a partitioned replicated system with request routing, failover and recovery is a complex undertaking. Additionally, it can be considered a pessimistic system. For embarrassingly parallel applications, with constrained behaviours, most transactions are non-overlapping in their data footprint in space and (reasonable lengths of) time. Enforced routing of requests to a primary replica adds cost and complexity that is most often unnecessary. Is it possible to take a more optimistic approach?

Optimistic Active-Active replication

An optimistic active-active replication system assumes that conflicting operations are rare, and prefers to handle conflicts after they happen rather than make them impossible at the price of delays or overheads on every operation, all of the time. The one-time cost of recovering from a conflict after it happens may be higher than the one-time cost of preventing a conflict, but this can be a win if conflicts are rare enough.
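
As a rough, back-of-the-envelope model (the symbols are introduced here for illustration only), suppose N transactions run, a fraction p of them conflict, preventing conflicts costs c_prev on every transaction, and recovering from one conflict costs c_rec. Ignoring costs common to both approaches :

```latex
\begin{aligned}
C_{\text{pess}} &\approx N\,c_{\text{prev}} \\
C_{\text{opt}}  &\approx p\,N\,c_{\text{rec}} \\
C_{\text{opt}} < C_{\text{pess}} &\iff p < \frac{c_{\text{prev}}}{c_{\text{rec}}}
\end{aligned}
```

For example, if recovering from a conflict is 100 times as expensive as the per-transaction cost of prevention, optimism comes out ahead while fewer than 1% of transactions conflict.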

Practically, optimistic active-active replication involves allowing transactions to execute and commit at all replicas, and asynchronously propagating their effects between replicas. When applying replicated changes to a replica, checks are made to determine whether any conflicts have occurred.
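
The sketch below shows the general shape of optimistic replication between two replicas : writes commit locally and immediately, and are checked for conflicts only when applied remotely. The detection rule used here, where each change carries the row version its origin replica saw, is a generic technique chosen for illustration; it is not how MySQL Cluster detects conflicts, and all names are hypothetical.

```python
# Generic version-check detection, for illustration only; not the MySQL
# Cluster mechanism. All names are hypothetical.
from dataclasses import dataclass, field
from typing import Dict, List, Tuple


@dataclass
class Change:
    key: str
    value: str
    base_version: int  # row version the origin replica saw when it wrote


@dataclass
class Replica:
    name: str
    rows: Dict[str, Tuple[str, int]] = field(default_factory=dict)  # key -> (value, version)
    outbox: List[Change] = field(default_factory=list)  # committed, not yet shipped
    conflicts: List[str] = field(default_factory=list)

    def local_write(self, key: str, value: str) -> None:
        """Commit locally straight away; propagation happens later."""
        _, version = self.rows.get(key, ("", 0))
        self.rows[key] = (value, version + 1)
        self.outbox.append(Change(key, value, version))

    def apply_replicated(self, change: Change) -> bool:
        """Apply a remote change only if the row is unchanged since it was read."""
        _, version = self.rows.get(change.key, ("", 0))
        if version != change.base_version:
            self.conflicts.append(change.key)  # concurrent write detected
            return False
        self.rows[change.key] = (change.value, version + 1)
        return True


def ship(src: Replica, dst: Replica) -> None:
    """Deliver src's pending changes to dst (asynchronously, in a real system)."""
    for change in src.outbox:
        dst.apply_replicated(change)
    src.outbox.clear()


a, b = Replica("A"), Replica("B")
a.local_write("x", "1")
ship(a, b)                       # B had not touched x: applied cleanly
a.local_write("y", "from A")     # concurrent writes to y at both replicas
b.local_write("y", "from B")
ship(a, b)
ship(b, a)
print(a.conflicts, b.conflicts)  # ['y'] ['y']
```

Both replicas detect the concurrent write to `y`, but this sketch only detects conflicts; deciding which write survives, and realigning the replicas, is a separate step.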

Benefits of optimism include :
  • Local reads - low latency, higher availability
  • Local writes - low latency, higher availability
  • No need to route requests, failover, recover
    Recovery from network failure is the same as for normal async replication - catch up the backlog.

A pessimist is never disappointed, as they always expect the worst, but an optimist is occasionally (often?) disappointed. With active-active replication, this disappointment can include reading stale data, as with relaxed read consistency, or having committed writes later rejected due to a conflict. This is the price of optimism. Not all applications are suited to the slings and arrows inherent in optimism. Some prefer the safety of a pessimistic outlook.

Benefits of pessimism include :
  • Only durable data returned by reads
  • Committed writes are durable

MySQL Cluster replication has supported symmetric optimistic conflict detection functions since the 6.3 release. These detect conflicts in optimistic active-active replication, allowing data to be written on any cluster and write-write conflicts to be detected for handling. The functions use an application-defined comparison value to determine when a conflict has occurred and, optionally, which change should 'win'. This is very flexible, but can be difficult to understand, and requires application and schema changes before it can be used.
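
To illustrate what an application-defined comparison value means, here is a hypothetical sketch, not the MySQL Cluster implementation. It assumes each row carries a column, here called `ver`, that the application increments on every update, and that an incoming change is applied only if its `ver` is higher than the stored one; otherwise the local row is kept and its primary key recorded. Maintaining `ver` in the schema and in every writer is the kind of change this style of function demands.

```python
# Hypothetical sketch of comparison-value conflict detection. The column name
# "ver" and the "higher value wins" rule are illustrative assumptions.
from typing import Any, Dict, List

Row = Dict[str, Any]


def apply_with_comparison(table: Dict[int, Row], pk: int, incoming: Row,
                          exceptions: List[int], ver_col: str = "ver") -> bool:
    """Apply a replicated row change unless the local row is at least as new."""
    current = table.get(pk)
    if current is None or incoming[ver_col] > current[ver_col]:
        table[pk] = dict(incoming)  # incoming change wins
        return True
    exceptions.append(pk)           # conflict: keep local row, record its PK
    return False


table: Dict[int, Row] = {1: {"ver": 5, "name": "old"}}
exceptions: List[int] = []
apply_with_comparison(table, 1, {"ver": 6, "name": "newer"}, exceptions)  # applied
apply_with_comparison(table, 1, {"ver": 4, "name": "stale"}, exceptions)  # rejected
print(table[1]["name"], exceptions)  # newer [1]
```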

When presented with an either-or decision, why not ask for both? Is it possible to have the benefits of both optimistic and pessimistic replication? Can we have them both on the same data at the same time?


Asymmetric optimistic Active-Active replication

I have recently been working on new asymmetric conflict detection functions for MySQL Cluster replication. These functions do not require schema or application modifications. They are asymmetric in that one data replica is regarded as the Active replica. However, unlike a pessimistic partitioned replicated system, writes can be made at Active or Backup replicas - they do not have to be routed to the Active replica. Writes made at the Backup replica will asynchronously propagate to the Active replica and be applied, but only if they do not conflict with writes made concurrently at the Active replica.

Having a first class Active replica and a second class Backup replica may seem like a weakness. However, it allows optimistic and pessimistic replication to be mixed on the same data, for different use-cases.

Where a pessimistic approach is required, requests can be routed to the Active replica. At the Active replica, they will be guaranteed to read durable data, and once committed, writes will not be rejected later.

Where an optimistic approach is acceptable, requests can also be routed to the Backup replica. At the Backup replica, committed writes may later be rejected, and reads may return data which will later be rejected. The potential for disappointment is there, and applications must be able to cope with that, but in return, they can read and write locally, with latency and availability independent of network conditions between replicas.

A well understood application and schema can use pessimistic replication, with request routing, where appropriate, and write-anywhere active-active where the application and schema can cope with the relaxed consistency.
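
One way to express this mix is a per-request choice of replica. The sketch is hypothetical : the request names, the classification and the site names are made up, and a real deployment would still need the failover handling that any request routing requires.

```python
# Hypothetical per-request routing; request names and sites are made up.
from enum import Enum
from typing import Dict


class Consistency(Enum):
    PESSIMISTIC = "pessimistic"  # durable reads, committed writes never rejected
    OPTIMISTIC = "optimistic"    # may read data or commit writes later rejected


def route(consistency: Consistency, active_site: str, local_site: str) -> str:
    """Choose which replica serves a request."""
    if consistency is Consistency.PESSIMISTIC:
        return active_site  # accept the network cost for the stronger guarantee
    return local_site       # low latency; any conflict is detected later


# Example classification by an application (purely illustrative).
REQUEST_STYLE: Dict[str, Consistency] = {
    "debit_account": Consistency.PESSIMISTIC,
    "update_profile": Consistency.OPTIMISTIC,
}

for request, style in REQUEST_STYLE.items():
    print(request, "->", route(style, active_site="siteA", local_site="siteB"))
# debit_account -> siteA
# update_profile -> siteB
```

The data is the same in both cases; only the guarantees, and the latency paid for them, differ per request.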

New conflict functions - NDB$EPOCH, NDB$EPOCH_TRANS

The new NDB$EPOCH function implements asymmetric conflict detection, on a row basis. One replica of a table is considered Active (or Primary), and the other(s) are Backup (or Secondary). Writes originating from the Backup replica are checked at the Active replica to ensure that they don't conflict with concurrent writes originating at the Active replica. If they do conflict, then they are rejected, and the Backup is realigned to the Active replica's state. In this way, data divergence is avoided, and the replicated system eventually becomes consistent.
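
A simplified model of epoch-based asymmetric detection follows. It assumes the Active replica records, for each row, the epoch in which it was last committed and whether that commit was a local write, and that the Backup reports back the highest Active epoch it has applied. A Backup-originated change to a row whose latest local change the Backup had not yet seen is treated as a conflict : the primary key is recorded for an exceptions table and the Active's row is queued to be sent back to realign the Backup. All names are hypothetical, and the real NDB$EPOCH handles more cases than this.

```python
# Simplified, hypothetical model of epoch-based asymmetric conflict detection;
# not the NDB source code.
from dataclasses import dataclass, field
from typing import Dict, List, Tuple


@dataclass
class RowMeta:
    value: str
    commit_epoch: int   # Active epoch in which the row was last committed
    local_author: bool  # True if last written by an Active-side transaction


@dataclass
class ActiveReplica:
    current_epoch: int = 1
    max_replicated_epoch: int = 0  # newest Active epoch the Backup has applied
    rows: Dict[int, RowMeta] = field(default_factory=dict)
    exceptions: List[int] = field(default_factory=list)  # conflicting PKs
    realign: List[Tuple[int, str]] = field(default_factory=list)  # to send to Backup

    def local_write(self, pk: int, value: str) -> None:
        self.rows[pk] = RowMeta(value, self.current_epoch, local_author=True)

    def next_epoch(self) -> None:
        self.current_epoch += 1

    def reflected_epoch(self, epoch: int) -> None:
        """The Backup reports that it has applied Active epoch `epoch`."""
        self.max_replicated_epoch = max(self.max_replicated_epoch, epoch)

    def apply_from_backup(self, pk: int, value: str) -> bool:
        row = self.rows.get(pk)
        if (row is not None and row.local_author
                and row.commit_epoch > self.max_replicated_epoch):
            # The Backup wrote without having seen the Active's latest change.
            self.exceptions.append(pk)
            self.realign.append((pk, row.value))  # Active state wins
            return False
        self.rows[pk] = RowMeta(value, self.current_epoch, local_author=False)
        return True


a = ActiveReplica()
a.local_write(1, "active v1")               # committed in epoch 1
a.next_epoch()                              # now in epoch 2
a.reflected_epoch(1)                        # Backup has applied epoch 1
print(a.apply_from_backup(1, "backup v2"))  # True: Backup had seen epoch 1
a.local_write(2, "active")                  # epoch 2, not yet seen by Backup
print(a.apply_from_backup(2, "backup"))     # False: concurrent write
print(a.exceptions, a.realign)              # [2] [(2, 'active')]
```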

Conflict detection and the realignment that gives eventual consistency both occur asynchronously, as part of the normal MySQL replication mechanisms.

As with the existing conflict detection functions, an exceptions table can be defined which will be populated with the primary keys of rows which have experienced a conflict. This can be used to take application specific actions when a conflict is detected.

Unlike the existing conflict detection functions, no schema changes or application changes are required. However, as with any optimistic replication system, applications must be able to cope with the relaxed consistency on offer. Applications which cannot cope can still access the data, but should route their requests to Active replicas only, as with a more traditional pessimistic system.

As these functions build on the existing MySQL Cluster asynchronous replication, the existing features are all available :
  • Slave batching performance optimisations
  • High availability - redundant replication channels
  • Transactional replication and progress tracking
  • Normal MySQL replication features : DDL replication, Binlog, replicate to other engines etc.

Ok, that's long enough for one post - I'll describe NDB$EPOCH_TRANS and its motivations in a follow-up. If you're interested in trying this out, then download the latest versions of MySQL Cluster. If you're interested in the optimistic replication concept in general, I recommend reading Saito and Shapiro's survey.
