Archive for the ‘distributed-systems’ Category

Eventual consistency with MySQL

October 3rd, 2011
tl;dr : New 'automatic' optimistic conflict detection functions are available, giving the best of both optimistic and pessimistic replication on the same data.

MySQL replication supports a number of topologies, and one of the most interesting is an active-active, or master-master topology, where two or more Servers accept read and write traffic, with asynchronous replication between them.

This topology has a number of attractions, including :
  • Potentially higher availability
  • Potentially low impact on read/write latency
  • Service availability insensitive to replication failures
  • Conceptually simple

However, data consistency is hard to maintain in this environment. Data, and access to it, must usually be partitioned or otherwise controlled, so that the consistency of reads is acceptable, and to avoid lost writes or badly merged concurrent writes. Implementing a distributed data access partitioning scheme which can safely handle communication failures is not simple.

Relaxed read consistency

Relaxed read consistency is a fairly well understood concept, with many Master-Slave topologies deployed where some read traffic is routed to the Slave to offload the Master and get 'read scaling'.
Generally this is acceptable as :
  1. A Read-only Slave's state is self-consistent. It is a state which, at least logically, existed on the Master at some time in the past.
  2. The reading application can tolerate some level of read-staleness w.r.t. the most recently committed writes to the Master

A surprisingly large number of applications can manage with a stale view as long as it is self-consistent.

Read-your-writes


Applications requiring 'read your writes' consistency (or session consistency) must either read from the Master, or wait until the Slave has replicated up to at least the point in time where the application's last write committed on the Master before reading from it. It is simpler and less delay-prone to just read from the Master, but this increases the load on the Master, reducing the ability of a system to read-scale. When the Master is unavailable, some sort of failover is required, and therefore, some sort of recovery process is also required.
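
The read-your-writes rule above can be sketched as a small routing decision. This is only an illustration: the `Replica` class, the integer `applied_position` and the function name are assumptions made for the example, not part of MySQL.

```python
from dataclasses import dataclass


@dataclass
class Replica:
    name: str
    applied_position: int  # highest Master commit position applied so far (assumed integer)


def choose_read_target(last_write_position, master, slave):
    """Return the Slave only if it has replicated the session's last write."""
    if slave.applied_position >= last_write_position:
        return slave
    # The Slave is still behind this session's last write, so read from the Master.
    return master


master = Replica("master", applied_position=120)
slave = Replica("slave", applied_position=100)
print(choose_read_target(90, master, slave).name)   # last write already on the Slave
print(choose_read_target(110, master, slave).name)  # Slave has not caught up yet
```

A session whose last write is already on the Slave can read there; any other session falls back to the Master, which is where the extra Master load comes from.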

Partitioned Active-Active / Balanced Master-Slave

Rather than treating a whole replica as either Master or Slave, we can have each replica be both a Master and a Slave. The partitioning could be on database level, table level, or some function of the rows contained in tables, perhaps some key prefix which maps to application entities. Balancing the Master/Slave role in this way allows the request load to be balanced, reducing issues with a single system providing the Master 'role' having to do more work.

In this configuration, rather than talking about Master and Slave, it makes more sense to talk about some partition of data being 'Active' on one replica, and 'Backup' on the others. Read requests routed to the Active replica will be guaranteed to get the latest state, whereas the Backup replicas can potentially return stale states. Write requests should always be routed to the Active replica to avoid potential races between concurrent writes.

Implementing a partitioned replicated system like this generally requires application knowledge to choose a partitioning scheme that aligns with expected transaction footprints, so that cross-partition transactions are rare or non-existent. Additionally, it requires application modification, or a front-end routing mechanism to ensure that requests are correctly routed. The routing system must also be designed to re-route in cases of communication or system failure, to ensure availability, and avoid data divergence. After a failure, recovery must take care to ensure replicas are resynchronised before restoring Active status to partitions in a recovered replica.
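
To make the moving parts concrete, here is a minimal sketch of such a router. Everything in it is hypothetical: the key-prefix partitioning ("user:42" belongs to partition "user"), the replica names and the `PartitionRouter` class are assumptions for illustration, and a real system would also need failure detection and the resynchronisation itself.

```python
class PartitionRouter:
    """Tracks which replica is Active and which is Backup for each partition."""

    def __init__(self, active, backup):
        # Both arguments map a partition (key prefix) to a replica name.
        self.active = dict(active)
        self.backup = dict(backup)

    def route(self, key):
        # Assumed key format: "<partition>:<id>".
        prefix = key.split(":", 1)[0]
        return self.active[prefix]

    def fail(self, replica):
        # Promote the Backup for every partition that was Active on the failed replica.
        for prefix, owner in list(self.active.items()):
            if owner == replica:
                self.active[prefix] = self.backup[prefix]
                self.backup[prefix] = owner

    def restore(self, replica, preferred, resynchronised):
        # Never hand Active status back before the replica has been resynchronised.
        if not resynchronised:
            return
        for prefix, owner in preferred.items():
            if owner == replica and self.active[prefix] != replica:
                self.backup[prefix] = self.active[prefix]
                self.active[prefix] = replica


preferred = {"user": "A", "order": "B"}
router = PartitionRouter(active=preferred, backup={"user": "B", "order": "A"})
print(router.route("user:42"))   # normal operation
router.fail("A")
print(router.route("user:42"))   # re-routed after replica A fails
router.restore("A", preferred, resynchronised=True)
print(router.route("user:42"))   # Active status restored after resync
```

Even this toy version needs three pieces of shared state (Active map, Backup map, resync status) to be agreed on by every client, which hints at why the real thing is hard.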

Implementing a partitioned replicated system with request routing, failover and recovery is a complex undertaking. Additionally, it can be considered a pessimistic system. For embarrassingly parallel applications, with constrained behaviours, most transactions are non-overlapping in their data footprint in space and (reasonable lengths of) time. Enforced routing of requests to a primary replica adds cost and complexity that is most often unnecessary. Is it possible to take a more optimistic approach?

Optimistic Active-Active replication

An optimistic active-active replication system assumes that conflicting operations are rare, and prefers to handle conflicts after they happen rather than make them impossible by imposing delays or overheads on every operation. The one-time cost of recovering from a conflict after it happens may be higher than the one-time cost of preventing a conflict, but this can be a win if conflicts are rare enough.

Practically, optimistic active-active replication involves allowing transactions to execute and commit at all replicas, and asynchronously propagating their effects between replicas. When applying replicated changes to a replica, checks are made to determine whether any conflicts have occurred.

Benefits of optimism include :
  • Local reads - low latency, higher availability
  • Local writes - low latency, higher availability
  • No need to route requests, failover, recover
    Recovery from network failure is the same as for normal async replication - catch up the backlog.

A pessimist is never disappointed, as they always expect the worst, but an optimist is occasionally (often?) disappointed. With active-active replication, this disappointment can include reading stale data, as with relaxed read consistency, or having committed writes later rejected due to a conflict. This is the price of optimism. Not all applications are suited to the slings and arrows inherent in optimism. Some prefer the safety of a pessimistic outlook.

Benefits of pessimism include :
  • Only durable data returned by reads
  • Committed writes are durable

MySQL Cluster replication has supported symmetric optimistic conflict detection functions since the 6.3 release. These provide detection of conflicts for optimistic active-active replication, allowing data to be written on any cluster, and write-write conflicts to be detected for handling. The functions use an application defined comparison value to determine when a conflict has occurred, and optionally, which change should 'win'. This is very flexible, but can be difficult to understand, and requires application and schema changes before it can be used.

When presented with an either-or decision, why not ask for both? Is it possible to have the benefits of both optimistic and pessimistic replication? Can we have them both on the same data at the same time?


Asymmetric optimistic Active-Active replication

I have recently been working on new asymmetric conflict detection functions for MySQL Cluster replication. These functions do not require schema or application modifications. They are asymmetric in that one data replica is regarded as the Active replica. However, unlike a pessimistic partitioned replicated system, writes can be made at Active or Backup replicas - they do not have to be routed to the Active replica. Writes made at the Backup replica will asynchronously propagate to the Active replica and be applied, but only if they do not conflict with writes made concurrently at the Active replica.

Having a first class Active replica and a second class Backup replica may seem like a weakness. However, it allows optimistic and pessimistic replication to be mixed, on the same data for different use-cases.

Where a pessimistic approach is required, requests can be routed to the Active replica. At the Active replica, they will be guaranteed to read durable data, and once committed, writes will not be rejected later.

Where an optimistic approach is acceptable, requests can also be routed to the Backup replica. At the Backup replica, committed writes may later be rejected, and reads may return data which will later be rejected. The potential for disappointment is there, and applications must be able to cope with that, but in return, they can read and write locally, with latency and availability independent of network conditions between replicas.

A well understood application and schema can use pessimistic replication, with request routing, where appropriate, and write-anywhere active-active where the application and schema can cope with the relaxed consistency.
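
The mixed mode boils down to a per-request choice. A minimal sketch, where the function name and the replica labels are made up for illustration:

```python
def pick_replica(needs_pessimistic, local_replica, active_replica):
    """Pessimistic requests go to the Active replica; optimistic ones stay local."""
    return active_replica if needs_pessimistic else local_replica


# Hypothetical client located next to the Backup replica.
print(pick_replica(True, "backup", "active"))   # must not be rejected later
print(pick_replica(False, "backup", "active"))  # can tolerate a later rejection
```

The decision is made by the application per use-case, not per table, which is what allows both approaches on the same data.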

New conflict functions - NDB$EPOCH, NDB$EPOCH_TRANS

The new NDB$EPOCH function implements asymmetric conflict detection, on a row basis. One replica of a table is considered Active (or Primary), and the other(s) are Backup (or Secondary). Writes originating from the Backup replica are checked at the Active replica to ensure that they don't conflict with concurrent writes originating at the Active replica. If they do conflict, then they are rejected, and the Backup is realigned to the Active replica's state. In this way, data divergence is avoided, and the replicated system eventually becomes consistent.

The conflict detection, and realignment to give eventual consistency all occur asynchronously as part of the normal MySQL replication mechanisms.
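
The following toy model illustrates the idea of asymmetric, row-based detection. It is not the NDB$EPOCH implementation: the `ActiveReplica` class, the integer epochs, and the rule "a Backup write conflicts if the Active replica wrote the same row in an epoch the Backup had not yet applied" are all assumptions made for the sketch.

```python
class ActiveReplica:
    def __init__(self):
        self.epoch = 0          # assumed monotonically increasing commit epoch
        self.rows = {}          # key -> current value
        self.written_at = {}    # key -> epoch of the last write made at the Active replica
        self.exceptions = []    # keys of rows which have experienced a conflict

    def local_write(self, key, value):
        self.epoch += 1
        self.rows[key] = value
        self.written_at[key] = self.epoch

    def apply_from_backup(self, key, value, backup_seen_epoch):
        """Apply a replicated Backup write; return the value the Backup should end up with."""
        if self.written_at.get(key, 0) > backup_seen_epoch:
            # Concurrent Active write: reject and realign the Backup to the Active state.
            self.exceptions.append(key)
            return self.rows[key]
        self.rows[key] = value
        return value


active = ActiveReplica()
active.local_write("k1", "a1")                                    # Active epoch 1
print(active.apply_from_backup("k1", "b1", backup_seen_epoch=1))  # accepted
active.local_write("k1", "a2")                                    # Active epoch 2
print(active.apply_from_backup("k1", "b2", backup_seen_epoch=1))  # rejected, realigned
print(active.exceptions)
```

In the second case the Backup wrote the row before it had seen the Active replica's newer write, so the Backup's change loses and both replicas converge on the Active value.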

As with the existing conflict detection functions, an exceptions table can be defined which will be populated with the primary keys of rows which have experienced a conflict. This can be used to take application specific actions when a conflict is detected.

Unlike the existing conflict detection functions, no schema changes or application changes are required. However, as with any optimistic replication system, applications must be able to cope with the relaxed consistency on offer. Applications which cannot cope can still access the data, but should route their requests to Active replicas only, as with a more traditional pessimistic system.

As these functions build on the existing MySQL Cluster asynchronous replication, the existing features are all available :
  • Slave batching performance optimisations
  • High availability - redundant replication channels
  • Transactional replication and progress tracking
  • Normal MySQL replication features : DDL replication, Binlog, replicate to other engines etc.

Ok, that's long enough for one post - I'll describe NDB$EPOCH_TRANS and its motivations in a follow-up. If you're interested in trying this out, then download the latest versions of MySQL Cluster. If you're interested in the optimistic replication concept in general, I recommend reading Saito and Shapiro's survey.


Journey upriver to the dark heart of ha_ndbcluster

April 2nd, 2011
Unlike most other MySQL storage engines, Ndb does not perform all of its work in the MySQLD process. The Ndb table handler maps Storage Engine Api calls onto NdbApi calls, which eventually result in communication with data nodes. In terms of layers, we have SQL -> Handler Api -> NdbApi -> Communication. At each of these layer boundaries, the mapping between operations at the upper layer to operations at the lower layer is non trivial, based on runtime state, statistics, optimisations etc.

The MySQL status variables can be used to understand the behaviour of the MySQL Server in terms of user commands processed, and also how these map to some of the Storage Engine Handler Api calls.

Status variables tracking user commands start with 'Com_'

mysql> show status like 'Com\_%';
+---------------------------+-------+
| Variable_name | Value |
+---------------------------+-------+
| Com_admin_commands | 0 |
| Com_assign_to_keycache | 0 |
| Com_alter_db | 0 |
| Com_alter_db_upgrade | 0 |
| Com_alter_event | 0 |
| Com_alter_function | 0 |
| Com_alter_procedure | 0 |
| Com_alter_server | 0 |
| Com_alter_table | 0 |
| Com_alter_tablespace | 0 |
| Com_analyze | 0 |
| Com_backup_table | 0 |
| Com_begin | 0 |
| Com_binlog | 0 |
| Com_call_procedure | 0 |
| Com_change_db | 1 |
| Com_change_master | 0 |
| Com_check | 0 |
| Com_checksum | 0 |
.........
| Com_stmt_reset | 0 |
| Com_stmt_send_long_data | 0 |
| Com_truncate | 0 |
| Com_uninstall_plugin | 0 |
| Com_unlock_tables | 0 |
| Com_update | 1 |
| Com_update_multi | 0 |
| Com_xa_commit | 0 |
| Com_xa_end | 0 |
| Com_xa_prepare | 0 |
| Com_xa_recover | 0 |
| Com_xa_rollback | 0 |
| Com_xa_start | 0 |
+---------------------------+-------+
144 rows in set (0.01 sec)

mysql>

Status variables tracking Handler (Storage engine) Api calls start with 'Handler_'.

mysql> show status like 'Handler\_%';
+----------------------------+-------+
| Variable_name | Value |
+----------------------------+-------+
| Handler_commit | 1 |
| Handler_delete | 0 |
| Handler_discover | 0 |
| Handler_prepare | 0 |
| Handler_read_first | 0 |
| Handler_read_key | 0 |
| Handler_read_next | 0 |
| Handler_read_prev | 0 |
| Handler_read_rnd | 0 |
| Handler_read_rnd_next | 21 |
| Handler_rollback | 0 |
| Handler_savepoint | 0 |
| Handler_savepoint_rollback | 0 |
| Handler_update | 4 |
| Handler_write | 14 |
+----------------------------+-------+
15 rows in set (0.00 sec)

mysql>

The 'Com_%' and 'Handler_%' variables are maintained by the Server for all storage engines. The server maintains these on a per-session, and global basis. By default the session status is shown, but the GLOBAL keyword shows the global view, aggregated across all sessions.

mysql> show global status like 'Handler\_%';
+----------------------------+--------+
| Variable_name | Value |
+----------------------------+--------+
| Handler_commit | 167 |
| Handler_delete | 494041 |
| Handler_discover | 0 |
| Handler_prepare | 0 |
| Handler_read_first | 3 |
| Handler_read_key | 1 |
| Handler_read_next | 0 |
| Handler_read_prev | 0 |
| Handler_read_rnd | 0 |
| Handler_read_rnd_next | 561132 |
| Handler_rollback | 6 |
| Handler_savepoint | 0 |
| Handler_savepoint_rollback | 0 |
| Handler_update | 24 |
| Handler_write | 43442 |
+----------------------------+--------+
15 rows in set (0.00 sec)

mysql>

The SHOW STATUS command is handy for a quick check, but for more interesting analysis, the INFORMATION_SCHEMA tables SESSION_STATUS and GLOBAL_STATUS contain the same data, and support all SQL queries and views.

mysql> select * from information_schema.session_status where Variable_name like 'Handler\_%';
+----------------------------+----------------+
| VARIABLE_NAME | VARIABLE_VALUE |
+----------------------------+----------------+
| HANDLER_COMMIT | 1 |
| HANDLER_DELETE | 0 |
| HANDLER_DISCOVER | 0 |
| HANDLER_PREPARE | 0 |
| HANDLER_READ_FIRST | 0 |
| HANDLER_READ_KEY | 0 |
| HANDLER_READ_NEXT | 0 |
| HANDLER_READ_PREV | 0 |
| HANDLER_READ_RND | 0 |
| HANDLER_READ_RND_NEXT | 85 |
| HANDLER_ROLLBACK | 0 |
| HANDLER_SAVEPOINT | 0 |
| HANDLER_SAVEPOINT_ROLLBACK | 0 |
| HANDLER_UPDATE | 4 |
| HANDLER_WRITE | 89 |
+----------------------------+----------------+
15 rows in set (0.40 sec)

mysql>

Unfortunately the entries in the information_schema are in shouty 1960s CAPITALS. Also note that some Handler calls are made as part of using the information_schema 'database' to fetch the data. This is Heisenberg's principle in action!

To shine some light into the depths of the Ndb storage engine, a set of new status variables has been added to recent cluster-7.0 and cluster-7.1 releases. These status variables track activity at the NdbApi and data node communication layers of the stack. Currently they are divided into 4 subsets :
  • Global counters
    show status like 'ndb_api_%_count';
  • Session counters
    show status like 'ndb_api_%_session';
  • Slave counters
    show status like 'ndb_api_%_slave';
  • Binlog injector counters
    show status like 'ndb_api_%_injector';

Unfortunately the mysql-5.1 server does not allow Storage Engines to differentiate GLOBAL or SESSION status variables. The Global and Session specific versions of these variables are therefore differentiated by name, and are visible from both GLOBAL and SESSION views of status, so it doesn't matter which you look at.

Global counters
mysql> show status like 'ndb_api_%_count';
+------------------------------------+------------+
| Variable_name | Value |
+------------------------------------+------------+
| Ndb_api_wait_exec_complete_count | 10 |
| Ndb_api_wait_scan_result_count | 19295 |
| Ndb_api_wait_meta_request_count | 67 |
| Ndb_api_wait_nanos_count | 6361966340 |
| Ndb_api_bytes_sent_count | 415704 |
| Ndb_api_bytes_received_count | 116921552 |
| Ndb_api_trans_start_count | 14 |
| Ndb_api_trans_commit_count | 3 |
| Ndb_api_trans_abort_count | 1 |
| Ndb_api_trans_close_count | 14 |
| Ndb_api_pk_op_count | 6 |
| Ndb_api_uk_op_count | 0 |
| Ndb_api_table_scan_count | 11 |
| Ndb_api_range_scan_count | 0 |
| Ndb_api_pruned_scan_count | 0 |
| Ndb_api_scan_batch_count | 25850 |
| Ndb_api_read_row_count | 103371 |
| Ndb_api_trans_local_read_row_count | 51625 |
| Ndb_api_event_data_count | 0 |
| Ndb_api_event_nondata_count | 0 |
| Ndb_api_event_bytes_count | 0 |
+------------------------------------+------------+
21 rows in set (0.00 sec)

mysql>

These counts are aggregated across all MySQL clients in the Server accessing tables in Ndb, since this MySQL Server was started. Note that this *does not* include accesses by clients of other MySQL Servers, or other NdbApi clients.

Session counters
mysql> show status like 'ndb_api_%_session';
+--------------------------------------------+----------+
| Variable_name | Value |
+--------------------------------------------+----------+
| Ndb_api_wait_exec_complete_count_session | 0 |
| Ndb_api_wait_scan_result_count_session | 38 |
| Ndb_api_wait_meta_request_count_session | 2 |
| Ndb_api_wait_nanos_count_session | 11064398 |
| Ndb_api_bytes_sent_count_session | 872 |
| Ndb_api_bytes_received_count_session | 230764 |
| Ndb_api_trans_start_count_session | 1 |
| Ndb_api_trans_commit_count_session | 0 |
| Ndb_api_trans_abort_count_session | 0 |
| Ndb_api_trans_close_count_session | 1 |
| Ndb_api_pk_op_count_session | 0 |
| Ndb_api_uk_op_count_session | 0 |
| Ndb_api_table_scan_count_session | 1 |
| Ndb_api_range_scan_count_session | 0 |
| Ndb_api_pruned_scan_count_session | 0 |
| Ndb_api_scan_batch_count_session | 51 |
| Ndb_api_read_row_count_session | 204 |
| Ndb_api_trans_local_read_row_count_session | 136 |
+--------------------------------------------+----------+
18 rows in set (0.00 sec)

mysql>

These counts are for the current session (MySQL client connection).

Slave counters
mysql> show status like 'ndb_api_%_slave';
+------------------------------------------+-------+
| Variable_name | Value |
+------------------------------------------+-------+
| Ndb_api_wait_exec_complete_count_slave | 0 |
| Ndb_api_wait_scan_result_count_slave | 0 |
| Ndb_api_wait_meta_request_count_slave | 0 |
| Ndb_api_wait_nanos_count_slave | 0 |
| Ndb_api_bytes_sent_count_slave | 0 |
| Ndb_api_bytes_received_count_slave | 0 |
| Ndb_api_trans_start_count_slave | 0 |
| Ndb_api_trans_commit_count_slave | 0 |
| Ndb_api_trans_abort_count_slave | 0 |
| Ndb_api_trans_close_count_slave | 0 |
| Ndb_api_pk_op_count_slave | 0 |
| Ndb_api_uk_op_count_slave | 0 |
| Ndb_api_table_scan_count_slave | 0 |
| Ndb_api_range_scan_count_slave | 0 |
| Ndb_api_pruned_scan_count_slave | 0 |
| Ndb_api_scan_batch_count_slave | 0 |
| Ndb_api_read_row_count_slave | 0 |
| Ndb_api_trans_local_read_row_count_slave | 0 |
+------------------------------------------+-------+
18 rows in set (0.00 sec)

mysql>

Hopefully you are seeing a pattern here. These counters are the NdbApi operations performed by the Slave SQL thread as part of replicating Binlogs into Ndb tables. These counts will only increase from zero if the MySQLD is acting, or has acted as a Slave, and has accessed tables stored in Ndb.

Binlog Injector counters
mysql> show status like 'ndb_api_%_injector';
+--------------------------------------+-------+
| Variable_name | Value |
+--------------------------------------+-------+
| Ndb_api_event_data_count_injector | 0 |
| Ndb_api_event_nondata_count_injector | 0 |
| Ndb_api_event_bytes_count_injector | 0 |
+--------------------------------------+-------+
3 rows in set (0.01 sec)

mysql>

These counts track the data change events received by the Ndb Binlog Injector thread. The Binlog Injector is responsible for recording Cluster changes in the Binlog, but even when Binlogs are not being written, it receives events related to schema changes and other system management functions, so these counts can be non zero on Servers which are not writing a Binlog.

Counter definitions
As you've hopefully noticed, there is naming overlap between each set of status counters. The same events are being counted, and recorded globally, per-session, against the Slave SQL thread and against the Ndb Binlog injector thread.

So what do these different counts actually mean?

Ndb_api_wait_exec_complete_count[_session|_slave]

The number of times a user thread has blocked waiting for some batch of primary key, or secondary unique hash key operations to complete. From the point of view of the MySQL Server, this is idle time, waiting for data nodes to send some response. An alternative name for it could be 'round trip count', and minimising it through operation batching is a good way to reduce response time and increase throughput.

Ndb_api_wait_scan_result_count[_session|_slave]

The number of times a user thread has blocked waiting for some scan operation to complete. It could be waiting for a batch of scan results, or waiting for an acknowledgement of a scan close. In any case, it indicates time spent waiting on communication related to scan processing.

Ndb_api_wait_meta_request_count[_session|_slave]

The number of times a user thread has blocked waiting for some metadata operation to complete. This is quite a catch-all term, which can include DDL (Create/Drop table etc), and some transaction initialisation steps.

Ndb_api_wait_nanos_count[_session|_slave]

The number of nanoseconds a user thread has blocked in one of the three scenarios above. This is kind of an 'IO_WAIT' time for Ndb operations. It tracks how long the thread was blocked waiting for the data nodes to complete their operations. The resolution is nanoseconds, but this requires support from the operating system. On operating systems with lower resolution, this count will be coarser, and some operations may complete with zero observed wait time.

Ndb_api_bytes_sent_count[_session|_slave]

The number of bytes sent to the Ndb data nodes. This includes all request types, rows inserted etc. It does not include regular heartbeating as that generally adds too much noise to make the counters useful.

Ndb_api_bytes_received_count[_session|_slave]

The number of bytes received from the Ndb data nodes. This includes all request types, rows read etc. It does not include regular heartbeating as that generally adds too much noise to make the counters useful.

Ndb_api_trans_start_count[_session|_slave]

The number of NdbApi transactions started. Note that, as an optimisation, NdbApi transactions are not always started immediately on a BEGIN statement.

Ndb_api_trans_commit_count[_session|_slave]

The NdbApi transactions which have been explicitly committed. Not all transactions started are committed or aborted, some are started, and then closed.

Ndb_api_trans_abort_count[_session|_slave]

The NdbApi transactions which have been explicitly aborted.

Ndb_api_trans_close_count[_session|_slave]

The NdbApi transactions which have been closed. It should closely track the number which have been started.

Ndb_api_pk_op_count[_session|_slave]

The number of Primary Key (pk) operations which have been executed. Each pk operation affects zero or one rows. This includes read, insert, update, write, delete. Note that operations on tables with Blobs can also generate pk and uk operations.

Ndb_api_uk_op_count[_session|_slave]

The number of Unique key (uk) operations which have been executed. Each uk operation affects zero or one rows. This includes read, update, write, delete by unique key.

Ndb_api_table_scan_count[_session|_slave]

The number of table scans which have been started. Table scans can have pushed-down filters, so although they must access all data in a table, they might not return it all to the MySQL Server. Also, started scans may be stopped before all table data is accessed.

Ndb_api_range_scan_count[_session|_slave]

The number of range (Ordered Index) scans which have been started. Range scans take bounds and pushed-down filters, so they may access, and return to the MySQL Server, anywhere from zero rows to all rows.

Ndb_api_pruned_scan_count[_session|_slave]

The number of scans which have been successfully pruned to one partition of the scanned table/index. See my previous Blog entries for details about scan partition pruning.

Ndb_api_scan_batch_count[_session|_slave]

The number of batches of rows returned to the MySQL Server from scans. Each scanned table/index fragment returns matching rows in batches, whose size is controlled by the batchsize parameters. NdbApi handles one batch from each fragment at a time. Fetching the next batch from a fragment requires a round-trip to the data nodes, although multiple fragments can be asked for their next batch in one trip. Minimising the Ndb_api_scan_batch_count, by increasing batchsize and improving scan selectivity can improve throughput, latency and efficiency.

Ndb_api_read_row_count[_session|_slave]

The number of rows returned to the MySQL Server from Primary Key reads, Unique Key reads, and Table and Index scans.

Ndb_api_trans_local_read_row_count[_session|_slave]

The number of rows returned to the MySQL Server from the same data node where the reading transaction has its Transaction Coordinator (TC). Where the TC and the data read reside on the same data node, one hop in the data reading control protocols is avoided. This is the goal of transaction hinting and distribution awareness, and its effectiveness can be checked by comparing Ndb_api_trans_local_read_row_count to Ndb_api_read_row_count. The higher the proportion of local reads, the better.
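
As a small worked example, the proportion of local reads can be computed from the two row counters. The helper name is made up; the sample values are the global counters from the listing earlier in this post (Ndb_api_read_row_count and Ndb_api_trans_local_read_row_count).

```python
def local_read_ratio(counters):
    """Fraction of rows read from the data node hosting the transaction's TC."""
    total = counters["Ndb_api_read_row_count"]
    local = counters["Ndb_api_trans_local_read_row_count"]
    return local / total if total else 0.0


# Values taken from the 'Global counters' output shown earlier.
sample = {
    "Ndb_api_read_row_count": 103371,
    "Ndb_api_trans_local_read_row_count": 51625,
}
print(f"{local_read_ratio(sample):.3f}")
```

For that sample roughly half of the rows were read locally, which suggests the reads were not distribution aware.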

Ndb_api_event_data_count[_injector]

The number of data change events (row insert, delete, update notifications) received from the data nodes. On a Binlogging MySQL Server, this count can give a measure of the rate of data change in a cluster in terms of rows/second.

Ndb_api_event_nondata_count[_injector]

The number of non-data events (table alter/drop notifications etc) received from the data nodes.

Ndb_api_event_bytes_count[_injector]

The total number of bytes of event data (data and nondata events) received from the data nodes. This gives another measure of the Cluster change rate in terms of bytes/second.
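
Since the event counters only ever increase, a rate needs two samples and the time between them. A minimal sketch, with a made-up helper name and made-up sample values:

```python
def change_rate(earlier, later, interval_seconds):
    """Return (rows/second, bytes/second) between two counter samples."""
    rows = later["Ndb_api_event_data_count"] - earlier["Ndb_api_event_data_count"]
    size = later["Ndb_api_event_bytes_count"] - earlier["Ndb_api_event_bytes_count"]
    return rows / interval_seconds, size / interval_seconds


# Hypothetical samples taken 10 seconds apart (the values are invented).
first = {"Ndb_api_event_data_count": 1000, "Ndb_api_event_bytes_count": 250000}
second = {"Ndb_api_event_data_count": 1500, "Ndb_api_event_bytes_count": 400000}
rows_per_second, bytes_per_second = change_rate(first, second, 10.0)
print(rows_per_second, bytes_per_second)
```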

At NdbApi level
As the names suggest, all of these counters are reflecting things happening in the NdbApi implementation. The data collection is built into NdbApi, and is therefore also available to any NdbApi client, not just the MySQL Server.

Using the counters
During implementation of these counters, I found it easiest to create a temporary table to store 'base' values for the counters, and define a view containing the difference between the current values and the base values. This made it easy to see the effect on the counters of various different SQL statements, slave operations etc.

Setting up a baseline and a view
Please excuse my schoolboy SQL :

mysql> create table test.counts_base(variable_name varchar(255) primary key, variable_value bigint);
Query OK, 0 rows affected (0.04 sec)

mysql> replace into test.counts_base (variable_name, variable_value) select * from information_schema.session_status where variable_name like 'ndb_api%';
Query OK, 60 rows affected (0.01 sec)
Records: 60 Duplicates: 0 Warnings: 0

mysql> create view test.counts_diff as select test.counts_base.variable_name, information_schema.session_status.variable_value - test.counts_base.variable_value as diff from test.counts_base, information_schema.session_status where test.counts_base.variable_name = information_schema.session_status.variable_name and (information_schema.session_status.variable_value - test.counts_base.variable_value) > 0;
Query OK, 0 rows affected (0.05 sec)

mysql> select * from test.counts_diff where variable_name like 'ndb_api%_session';
Empty set (0.05 sec)

mysql>
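
For anyone collecting the counters from a client program rather than from SQL, the same baseline-and-difference idea can be sketched in a few lines. The function name and the sample values are invented; only the filtering rule (keep counters whose difference is greater than zero) mirrors the view above.

```python
def counts_diff(base, current):
    """Mirror of the counts_diff view: keep only counters that have increased."""
    return {
        name: current[name] - base_value
        for name, base_value in base.items()
        if name in current and current[name] - base_value > 0
    }


# Hypothetical snapshots of two session counters (the values are invented).
base = {"NDB_API_TABLE_SCAN_COUNT_SESSION": 1, "NDB_API_PK_OP_COUNT_SESSION": 0}
after = {"NDB_API_TABLE_SCAN_COUNT_SESSION": 2, "NDB_API_PK_OP_COUNT_SESSION": 0}
print(counts_diff(base, base))    # nothing has changed since the baseline
print(counts_diff(base, after))   # only the counter that moved is reported
```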


Looking at the effects of a SQL statement
First check that the baseline is ok :

mysql> select * from test.counts_diff where variable_name like 'ndb_api%_session';
Empty set (0.05 sec)

Now run the statement :

mysql> select count(1) from demo_table;
+----------+
| count(1) |
+----------+
| 103343 |
+----------+
1 row in set (0.00 sec)

Now look at the difference :

mysql> select * from test.counts_diff where variable_name like 'ndb_api%_session';
+--------------------------------------------+--------+
| variable_name | diff |
+--------------------------------------------+--------+
| NDB_API_WAIT_SCAN_RESULT_COUNT_SESSION | 4 |
| NDB_API_WAIT_META_REQUEST_COUNT_SESSION | 1 |
| NDB_API_WAIT_NANOS_COUNT_SESSION | 619143 |
| NDB_API_BYTES_SENT_COUNT_SESSION | 116 |
| NDB_API_BYTES_RECEIVED_COUNT_SESSION | 268 |
| NDB_API_TRANS_START_COUNT_SESSION | 1 |
| NDB_API_TRANS_CLOSE_COUNT_SESSION | 1 |
| NDB_API_TABLE_SCAN_COUNT_SESSION | 1 |
| NDB_API_SCAN_BATCH_COUNT_SESSION | 2 |
| NDB_API_READ_ROW_COUNT_SESSION | 2 |
| NDB_API_TRANS_LOCAL_READ_ROW_COUNT_SESSION | 1 |
+--------------------------------------------+--------+
11 rows in set (0.05 sec)

mysql>

The select count(1) statement blocked 4 times on scan results, sent 116 bytes and received 268 bytes of data. Two batches of rows were received, and two rows were received in total. One of these rows was from the same node as the transaction's transaction coordinator.
This indicates that select count(1) is optimised in the Ndb handler !

Let's try a more tricky select count. First we must reset the baseline to get a 'clean' difference.

mysql> replace into test.counts_base (variable_name, variable_value) select * from information_schema.session_status where variable_name like 'ndb_api%';
Query OK, 120 rows affected (0.01 sec)
Records: 60 Duplicates: 60 Warnings: 0

mysql> select * from test.counts_diff where variable_name like 'ndb_api%_session';
Empty set (0.05 sec)

mysql> select count(length(string_value) > 10) from demo_table;
+----------------------------------+
| count(length(string_value) > 10) |
+----------------------------------+
| 103343 |
+----------------------------------+
1 row in set (6.77 sec)

mysql> select * from test.counts_diff where variable_name like 'ndb_api%_session';
+--------------------------------------------+------------+
| variable_name | diff |
+--------------------------------------------+------------+
| NDB_API_WAIT_SCAN_RESULT_COUNT_SESSION | 21139 |
| NDB_API_WAIT_NANOS_COUNT_SESSION | 5322402052 |
| NDB_API_BYTES_SENT_COUNT_SESSION | 441628 |
| NDB_API_BYTES_RECEIVED_COUNT_SESSION | 109026900 |
| NDB_API_TRANS_START_COUNT_SESSION | 1 |
| NDB_API_TRANS_CLOSE_COUNT_SESSION | 1 |
| NDB_API_TABLE_SCAN_COUNT_SESSION | 1 |
| NDB_API_SCAN_BATCH_COUNT_SESSION | 25836 |
| NDB_API_READ_ROW_COUNT_SESSION | 103343 |
| NDB_API_TRANS_LOCAL_READ_ROW_COUNT_SESSION | 51735 |
+--------------------------------------------+------------+
10 rows in set (0.05 sec)

mysql>

This select count waited for scan results ~21 thousand times, sent ~440kB of data, and received ~109MB of data from the data nodes. 103,343 rows were read in ~25 thousand scan batches, and roughly half came from the same node as the transaction coordinator. 5.3 seconds of the 6.77 second runtime were spent waiting for responses from the data nodes.
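
A few derived figures make the cost of this scan easier to see. The arithmetic below uses the counter differences and the runtime reported above; the variable names are just for the example.

```python
# Counter differences copied from the counts_diff output for the second query.
diff = {
    "NDB_API_WAIT_NANOS_COUNT_SESSION": 5322402052,
    "NDB_API_BYTES_RECEIVED_COUNT_SESSION": 109026900,
    "NDB_API_SCAN_BATCH_COUNT_SESSION": 25836,
    "NDB_API_READ_ROW_COUNT_SESSION": 103343,
}
runtime_seconds = 6.77  # as reported by the mysql client

wait_seconds = diff["NDB_API_WAIT_NANOS_COUNT_SESSION"] / 1e9
wait_fraction = wait_seconds / runtime_seconds
rows_per_batch = diff["NDB_API_READ_ROW_COUNT_SESSION"] / diff["NDB_API_SCAN_BATCH_COUNT_SESSION"]
bytes_per_row = diff["NDB_API_BYTES_RECEIVED_COUNT_SESSION"] / diff["NDB_API_READ_ROW_COUNT_SESSION"]

print(f"waiting on data nodes: {wait_fraction:.0%} of runtime")
print(f"rows per scan batch:   {rows_per_batch:.1f}")
print(f"bytes received per row: {bytes_per_row:.0f}")
```

Close to four fifths of the runtime was spent blocked on the data nodes, with only about four rows arriving per batch at roughly 1kB per row, so larger batches or a pushed-down condition look like the obvious things to try.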

Other uses
Hopefully this gives some notion of the possibilities with these new counters. Some other ideas :
  • Debug slow queries
  • Optimise data distribution and table partitioning
  • Get real post-execution costs for queries, DML etc.
  • Understand how data transfer for Blobs is batched
  • Check bulk insert/deletes are functioning efficiently
  • Verify Ndb slave batching is in operation
  • Draw cool graphs


Data distribution in MySQL Cluster

March 26th, 2011
MySQL Cluster distributes rows amongst the data nodes in a cluster, and also provides data replication. How does this work? What are the trade offs?

Table fragments

Tables are 'horizontally fragmented' into table fragments each containing a disjoint subset of the rows of the table. The union of rows in all table fragments is the set of rows in the table. Rows are always identified by their primary key. Tables with no primary key are given a hidden primary key by MySQLD.

By default, one table fragment is created for each data node in the cluster at the time the table is created.

Node groups and Fragment replicas

The data nodes in a cluster are logically divided into Node groups. The size of each Node group is controlled by the NoOfReplicas parameter. All data nodes in a Node group store the same data. In other words, where the NoOfReplicas parameter is two or greater, each table fragment has a number of replicas, stored on multiple separate data nodes in the same nodegroup for availability.

One replica of each fragment is considered primary, and the other(s) are considered backup replicas. Normally, each node contains a mix of primary and backup fragments for every table, which encourages system balance.
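To make the node group and replica layout concrete, here is a minimal Python sketch. The placement rule (fragments dealt round-robin to node groups, with the primary role rotating inside the group) is an assumption chosen for illustration, not the actual Ndb placement algorithm, and the function names are hypothetical.

```python
def node_groups(data_nodes, no_of_replicas):
    """Split the data nodes into node groups of NoOfReplicas nodes each."""
    if len(data_nodes) % no_of_replicas != 0:
        raise ValueError("node count must be a multiple of NoOfReplicas")
    return [data_nodes[i:i + no_of_replicas]
            for i in range(0, len(data_nodes), no_of_replicas)]


def place_fragments(data_nodes, no_of_replicas):
    """Return {fragment_id: [primary_node, backup_node, ...]}.

    One fragment is created per data node. Illustrative rule only:
    fragments go round-robin to node groups, and the primary role
    rotates so every node holds both primary and backup replicas.
    """
    groups = node_groups(data_nodes, no_of_replicas)
    placement = {}
    for frag in range(len(data_nodes)):
        group = groups[frag % len(groups)]
        shift = (frag // len(groups)) % len(group)
        placement[frag] = group[shift:] + group[:shift]
    return placement


# Four data nodes with NoOfReplicas=2 gives two node groups: [1, 2] and [3, 4].
placement = place_fragments([1, 2, 3, 4], no_of_replicas=2)
for frag, replicas in placement.items():
    print(f"fragment {frag}: primary on node {replicas[0]}, backup on {replicas[1:]}")
```

With this rule each node ends up as primary for one fragment and backup for another, which is the kind of balance described above.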

Which replica to use?

The primary fragment replica is used to serialise locking between transactions concurrently accessing the same row. Write operations update all fragment replicas synchronously, ensuring no committed data loss on node failure. Read operations normally access the primary fragment replica, ensuring consistency. Reads with a special lock mode can access the backup fragment replicas.

Primary key read protocol

When an NdbApi client (for example a MySQLD process) wants to read a row by primary key, it sends a read request to a data node acting as a Transaction Coordinator (TC).
The TC node will determine which fragment the row would be stored in from the primary key, decide which replica to access (usually the primary), and send a read request to the data node containing that fragment replica. The data node containing the fragment replica then sends the row's data (if present) directly back to the requesting NdbApi client, and also sends a read acknowledgement or failure notification back to the TC node, which also propagates it back to the NdbApi client.

Minimising inter data node hops

The 'critical path' for this protocol in terms of potential inter-data-node hops is four hops :

Client -> TC -> Fragment -> TC -> Client

To minimise the latency experienced by a remote client, two of these inter-node hops can ideally be avoided by placing the TC and the fragment replica on the same node. This requires controlling the choice of node for TC based on the primary key of the data which will be read. Where a transaction only reads rows stored on the same node as its TC, this can improve latency and system efficiency.
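The hop arithmetic can be sketched as a toy model in Python. The node names are made up, and the model only counts the hops on the critical path shown above; it says nothing about the cost of each hop.

```python
def inter_node_hops(client_node, tc_node, fragment_node):
    """Count the hops on the Client -> TC -> Fragment -> TC -> Client
    path that cross from one node to a different node."""
    path = [client_node, tc_node, fragment_node, tc_node, client_node]
    return sum(1 for src, dst in zip(path, path[1:]) if src != dst)


# TC and fragment replica on different data nodes: all four hops are remote.
remote_hops = inter_node_hops("api", "data_node_1", "data_node_2")

# TC colocated with the fragment replica: the two middle hops stay local.
colocated_hops = inter_node_hops("api", "data_node_1", "data_node_1")

print(remote_hops, colocated_hops)
```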

Distribution awareness

From NdbApi, users can specify a table and key when starting a transaction. The transaction will then choose a TC data node based on where the corresponding row's primary fragment replica is located in the system. This mechanism is sometimes referred to as 'transaction hinting'.

The Ndb handler in MySQLD generally waits for the first primary key lookup in a user session before starting an NdbApi transaction, so that it can choose a TC node based on this. This is a best-effort attempt at having the data node acting as TC colocated with the accessed data. This feature is usually referred to as 'Distribution Awareness'.

Write operations also benefit from distribution awareness, but not to the same extent in systems with NoOfReplicas > 1. Write operations must update all fragment replicas, which must be stored on different nodes, in the same nodegroup, so for NoOfReplicas > 1, distribution awareness avoids inter-node-group communication, and some intra-node-group communication, but some inter-data-node communication is always required. In a system with good data partitioning and distribution awareness, most read transactions will access only one data node, and write transactions will result in messaging between the data nodes of a single node group. Messaging between node groups will be minimal.

Distribution keys

By default, the whole of a table's primary key is used to determine which fragment replica will store a row. However, any subset of the columns in the primary key can be used. The key columns used to determine the row distribution are called the 'distribution key'.

Where a table's primary key contains only one column, the distribution key must be the full primary key. Where the primary key has more than one column, the distribution key can be different to (a subset of) the primary key.

From MySQLD, a distribution key can be set using the normal PARTITION BY KEY() syntax. The effect of using a distribution key which is a subset of the primary key is that rows with different primary key values, but the same distribution key values are guaranteed to be stored in the same table fragment.

For example, if we create a table :

CREATE TABLE user_accounts (user_id BIGINT,
account_type VARCHAR(255),
username VARCHAR(60),
state INT,
PRIMARY KEY (user_id, account_type))
engine = ndb partition by key (user_id);


Then insert some rows :


INSERT INTO user_accounts VALUES (22, "Twitter", "Bader", 2),
(22, "Facebook", "Bd77", 2),
(22, "Flickr", "BadB", 3),
(23, "Facebook", "JJ", 2);


Then we know that all rows with the same value(s) for the distribution key (user_id), will be stored on the same fragment. If we know that individual transactions are likely to access rows with the same distribution key value then this will increase the effectiveness of distribution awareness. Many schemas are 'partitionable' like this, though not all.
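A small Python sketch of the idea follows. It assumes an MD5-based hash (this post mentions MD5 further down), but the encoding of the key and the modulo step are my own simplifications, so the fragment numbers will not match what Ndb actually computes.

```python
import hashlib


def fragment_for(dist_key_values, num_fragments):
    """Map distribution key values to a fragment id (illustrative hash only)."""
    encoded = repr(tuple(dist_key_values)).encode("utf-8")
    digest = hashlib.md5(encoded).digest()
    return int.from_bytes(digest[:4], "little") % num_fragments


# Primary keys (user_id, account_type) of the rows inserted above.
rows = [(22, "Twitter"), (22, "Facebook"), (22, "Flickr"), (23, "Facebook")]
NUM_FRAGMENTS = 4  # assumed fragment count for the example

# Distribution key = user_id only: rows of one user always share a fragment.
by_user_id = {pk: fragment_for(pk[:1], NUM_FRAGMENTS) for pk in rows}

# Distribution key = full primary key: rows of one user may be spread out.
by_full_pk = {pk: fragment_for(pk, NUM_FRAGMENTS) for pk in rows}

for pk in rows:
    print(pk, "user_id key ->", by_user_id[pk], "| full key ->", by_full_pk[pk])
```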

Note that partitioning is a performance hint in Ndb - correctness is not affected in any way, and transactions can always span table fragments on the same or different data nodes. This allows applications to take advantage of the performance advantages of distribution awareness without requiring that all transactions affect only one node etc as required by simpler 'sharding' mechanisms.

Correlated distribution keys across tables

A further guarantee from Ndb is that two tables with the same number of fragments, and the same number and type of distribution keys will have rows distributed in the same way.

For example, if we add another table :


CREATE TABLE user_prefs (user_id BIGINT,
type VARCHAR(60),
value VARCHAR(255),
PRIMARY KEY (user_id, type))
engine = ndb partition by key (user_id);


Then insert some rows :


INSERT INTO user_prefs VALUES (22, "Coffee", "Milk + 6 sugars"),
(22, "Eggs", "Over easy"),
(23, "Custard", "With skin");


Then we know that the rows with the same user_id in the user_prefs and user_accounts tables will be stored on the same data node. Again, this helps with distribution awareness. In this example, we are ensuring that rows related to a single user, as identified by a common user_id, will be located on one data node, maximising system efficiency, and minimising latency.

Ordered index scan pruning

MySQL Cluster supports arbitrary ordered indexes. Ordered indexes are defined on one or more columns and support range scan operations. Range scans are defined by supplying optional lower and upper bounds. All rows between these bounds are returned.

Each Ndb ordered index is implemented as a number of in memory tree structures (index fragments), distributed with the fragments of the indexed table. Each index fragment contains the index entries for the local table fragment. Having ordered indexes local to the table fragments makes index maintenance more efficient, but means that there may not be much locality to exploit when scanning as rows in a range may be spread across all index fragments of an index.

The only case where an ordered index scan does not need to scan all index fragments is where it is known that all rows in the range will be found in one table fragment.
This is the case where both :
  1. The ordered index has all of the table's distribution keys as a prefix
  2. The range is contained within one value of the table's distribution keys

NdbApi detects this case when a range scan is defined, and 'prunes' the scan to one index fragment (and therefore one data node). For all other cases, all index fragments must be scanned.
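The two conditions can be written down as a simple check. This is a simplified Python sketch of the rule as stated here, not NdbApi's actual detection code; the function and parameter names are hypothetical, and a range is described only by the set of columns it pins to a single value.

```python
def can_prune(index_columns, distribution_key, equality_columns):
    """Return True if a range scan can be limited to one index fragment.

    index_columns    -- the ordered index columns, in index order
    distribution_key -- the table's distribution key columns
    equality_columns -- columns that the range pins to a single value
    """
    n = len(distribution_key)
    # Condition 1: the distribution key columns form a prefix of the index.
    has_prefix = set(index_columns[:n]) == set(distribution_key)
    # Condition 2: the range covers exactly one distribution key value.
    one_value = set(distribution_key) <= set(equality_columns)
    return has_prefix and one_value


INDEX = ["user_id", "account_type"]  # ordered index on the primary key
DIST_KEY = ["user_id"]

pinned_user = can_prune(INDEX, DIST_KEY, ["user_id"])        # user_id = 22
pinned_type = can_prune(INDEX, DIST_KEY, ["account_type"])   # account_type = ...
open_range = can_prune(INDEX, DIST_KEY, [])                  # user_id > 20 ...

print(pinned_user, pinned_type, open_range)
```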

Continuing the example above, assuming an ordered index on the primary key, the following ordered index scans can be pruned :


SELECT * FROM user_accounts WHERE user_id = 22;
SELECT * FROM user_accounts WHERE user_id = 22 AND account_type LIKE 'F%';


However, the following ordered index scans cannot be pruned, as matching rows are not guaranteed to be stored in one table fragment :


SELECT * FROM user_accounts WHERE account_type = "Facebook";
SELECT * FROM user_accounts WHERE user_id > 20 AND user_id <>


MySQLD partitioning variants and manually controlling distribution

Since MySQL 5.1, table partitioning has been supported. Tables can be partitioned based on functions of the distribution keys such as :

  • KEY
  • LINEAR KEY
  • HASH
  • RANGE
  • LIST

For engines other than Ndb, partitioning is implemented in the Server, with each partition implemented as a separate table in the Storage engine. Ndb implements these partition functions natively, using them to control data distribution across table fragments in a single table.

From Ndb's point of view, KEY and LINEAR KEY are native partitioning functions. Ndb knows how to determine which table fragment to use for a row from a table's distribution key, based on an MD5 hash of the distribution key.

HASH, RANGE and LIST are not natively supported by Ndb. When accessing tables defined using these functions, MySQLD must supply information to NdbApi to indicate which fragments to access. For example before primary key insert, update, delete and read operations, the table fragment to perform the operation on must be supplied. From MySQLD, the partitioning layer supplies this information.
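As an illustration of what 'supplying the fragment' means, here is a toy Python model of RANGE partitioning where the caller works out the fragment before each primary key operation. The bounds, the in-memory fragments and the function names are all hypothetical, and none of this is NdbApi code.

```python
import bisect

# Hypothetical RANGE partitioning: partition i holds keys below UPPER_BOUNDS[i],
# similar in spirit to VALUES LESS THAN (100), (200), (300).
UPPER_BOUNDS = [100, 200, 300]

# One dict per table fragment stands in for the data nodes' storage.
fragments = [dict() for _ in UPPER_BOUNDS]


def range_partition(key):
    """Partitioning layer: decide which fragment a key belongs to."""
    idx = bisect.bisect_right(UPPER_BOUNDS, key)
    if idx == len(UPPER_BOUNDS):
        raise ValueError(f"no partition for key {key}")
    return idx


def insert_row(key, row):
    # The fragment is supplied by the caller, not computed by the storage layer.
    fragments[range_partition(key)][key] = row


def read_row(key):
    return fragments[range_partition(key)].get(key)


insert_row(99, "last row of partition 0")
insert_row(100, "first row of partition 1")
print(read_row(99), "|", read_row(100))
```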

Any NdbApi application can use the same mechanisms to manually control data distribution across table fragments. At the NdbApi level this is referred to as 'User Defined' partitioning. This feature is rarely used. One downside of using User Defined partitioning is that online data redistribution is not supported. I'll discuss Online data redistribution in a future post here.

PlanetMySQL Voting: Vote UP / Vote DOWN

Some MySQL projects I think are cool — Spider Storage Engine

September 27th, 2010
One thing that has puzzled me about MySQL Server is that it became famous for sharded scale-out deployments in well known web sites and yet has no visible support for such deployments. The MySQL killer feature for some time has been built-in asynchronous replication and gigabytes of blogs have been written about how to setup, use, debug and optimise replication, but when it comes to 'sharding' there is nothing built in. Perhaps to have attempted to implement something would have artificially constrained user's imaginations, whereas having no support at all has allowed 1,000 solutions to sprout? Perhaps there just wasn't MySQL developer bandwidth available, or perhaps it just wasn't the best use of the available time. In any case, it remains unclaimed territory to this day.

On first hearing of the Federated storage engine some years ago, I mistakenly assumed that this could be the basis of some MySQL scale-out solution. Perhaps a layer of front end 'proxy' MySQLDs could federate tables from a layer of backend MySQLDs giving some level of distribution transparency to sharded data. However as I found out, the Federated engine was not designed with such a scenario in mind. It has a certain internal elegance and simplicity, but unfortunately it is a little too simple for anything other than light duties.

The Spider storage engine extends the Federated concept of a table definition being a 'link' to a table on a remote MySQL server. However, it also integrates with the table partitioning features of MySQL 5.1, allowing each partition of a table to be specified as a 'link' to a table on a remote MySQL server. This effectively allows the built-in partitioning mechanisms of MySQLD (PARTITION BY RANGE/LIST/HASH) to be used to shard/partition rows across multiple MySQL servers transparently.

One of the major drawbacks of the Federated engine was that it had very little support for 'pushing conditions' to the MySQLD instance storing the source tables. This meant that well behaved selective queries issued on the 'front-end' MySQLD instance could result in non-selective queries being issued to the 'back-end' MySQLD instances, and large volumes of data being unnecessarily transferred back to the 'front-end' MySQLD where query processing then discarded it.

Spider attempts to improve this situation by pushing conditions down to the MySQLDs containing the source data. Combined with the partition pruning available from the MySQLD partitioning engine this should significantly reduce the amount of redundant data transferred in some cases.
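The difference that pushing conditions makes can be shown with a toy Python model. The row data and the fetch function are hypothetical stand-ins for a back-end MySQLD, not Federated or Spider code; the point is only to compare how many rows cross the 'network' in each case.

```python
# Rows held by a hypothetical back-end server; one row in ten is 'open'.
backend_rows = [
    {"id": i, "status": "open" if i % 10 == 0 else "closed"}
    for i in range(1000)
]


def fetch(rows, pushed_condition=None):
    """Simulate the back-end: apply a pushed condition, if there is one,
    before 'sending' rows to the front-end."""
    if pushed_condition is None:
        return list(rows)
    return [row for row in rows if pushed_condition(row)]


def is_open(row):
    return row["status"] == "open"


# Without pushdown: every row is transferred, then filtered on the front-end.
transferred_plain = fetch(backend_rows)
result_plain = [row for row in transferred_plain if is_open(row)]

# With pushdown: the back-end filters, so only matching rows are transferred.
transferred_pushed = fetch(backend_rows, is_open)
result_pushed = transferred_pushed

print(len(transferred_plain), "rows transferred without pushdown")
print(len(transferred_pushed), "rows transferred with pushdown")
```

Both approaches return the same result set, but in this toy setup the pushed version transfers a tenth of the rows.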

So I think Spider is a pretty cool project. Like MySQL Cluster, it bears the burden of making MySQLD more data-distribution-aware and I think they're doing great work. It'd be great to hear stories about how Spider is being used, especially if anyone is using it *with* MySQL Cluster.


ACID tradeoffs, modularity, plugins, Drizzle

March 25th, 2010
Most software people are aware of the ACID acronym coined by Jim Gray. With the growth of the web and open source, the scaling and complexity constraints imposed on DBMS implementations supporting ACID are more visible, and new (or at least new terms for known) compromises and tradeoffs are being discussed widely. The better known NoSQL systems are giving insight by example into particular choices of tradeoffs.

Working at MySQL, I have often been surprised at the variety of potential alternatives when implementing a DBMS, and the number of applications which don't need the full set of ACID letters in the strictest form. The original MySQL storage engine, MyISAM is one of the first and most successful examples of an 'ACID remix'. The people drawn to DBMS development work often have a perfectionist streak, which can cause them to tend to prefer 'nothing' over 'imperfect'. MyISAM was and still is a flag-bearer for 'good enough'. Perhaps we should be less modest and call it 'more than good enough'.

One seldom discussed benefit of MySQL's storage engine architecture is that pressure to make 'The One True Storage Engine' is reduced. DBMS products with one fixed database engine need to optimise for all supported use cases. This is a great engineering challenge, but increases design effort, requirements for configuration and auto-tuning, constraints on any design change or reoptimisation etc. With MySQL, there are multiple existing storage engines, each with a (sub)set of target use-cases in mind. A single MySQL server can maintain and access tables in different storage engines, each tuned as closely as possible to the use-case for the data, without adding complexity to unrelated engines. Engines can be wildly optimised for a narrow use case as there are plausible alternative engines available for other use cases.

I understand that one aim of the Drizzle project is to extend the modularity of the MySQL Server on multiple axes, allowing diversity to flourish. As a one-time Java coder, who enjoyed the pleasures of design-by-interface, I can see the attraction. While the effort is guided by an actual need for modularity and real examples of alternative plugins, it can be a great force multiplier. There is always the risk of modularity for its own sake - a branch of Architecture Astronautics. Sure symptoms, which I may have suffered from in the past, include the class names FactoryFactory..., PolicyPolicy, or [Anything]Broker.

Another good vibe from Drizzle is the microkernel concept, although I would say that there's some terminological abuse occurring here! Perhaps it could more reasonably be said that MySQL has a TeraKernel and Drizzle has a MegaKernel? In any case the motivations are good. Decoupling the huge chunks of functionality glued together inside MySQLD is great for long term software integrity, understanding dependencies, finding (and introducing) bugs, and might make it easier to start adding functionality again. Replication seems especially ripe ground for alternative plugins. User authentication is another often requested 'chunk'. It will take longer to crystallise interfaces for more deeply embedded areas like the query Optimizer/Executor, but if these interfaces are arising from a real need then that can drive the API design.

One aspect of storage engine modularity that is not often mentioned is that some MySQL storage engines also moonlight with other products. The Berkeley database (BDB) is probably the oldest and most promiscuous, embedded in DNS daemons, LDAP servers and all sorts of other places. Ndb is unusual in that it can be used from separate MySQLD and other NdbApi processes at the same time. InnoDB has also recently added an embedded variant. This trend will accelerate, especially when some of the distributed NoSQL systems start supporting 'pluggable local storage' APIs. I imagine that a NoSQL local storage engine API could be somewhat simpler to implement than the MySQL SE API, at least to start with!


Ndb software architecture

September 29th, 2009
I'm sure that someone else can describe the actual history of Ndb development much better, but here's my limited and vague understanding.

  • Ndb is developed in an environment (Ericsson AXE telecoms switch) where Ericsson's PLEX is the language of choice
    PLEX supports multiple state machines (known as blocks) sending messages (known as signals) between them with some system-level conventions for starting up, restart and message classes. Blocks maintain internal state and define signal handling routines for different signal types. Very little abstraction within a block beyond subroutines is supported. (I'd love to hear some more detail on PLEX and how it has evolved). This architecture maps directly to the AXE processor design (APZ) which is unusual in having signal buffers implemented directly in silicon rather than software. This hard-coding drove Ndb's initial max supported signal size of 25 x 32-bit words.
  • An emulated PLEX environment (VM) is made available on Unix systems, written in C++
    The VM runs as a Unix process. PLEX code for blocks is interpreted. Signals are routed between blocks by the VM. This allows development and deployment of PLEX based systems on standard Unix systems. It also allows PLEX based systems to easily interact with Unix software. Each VM instance is a single threaded process routing incoming signals to the signal handling functions in each block class.
  • A PLEX to C++ translation system is designed
    Blocks are mapped to large C++ classes with signal handling methods and per-block global state mapped to member variables. The limited labelling and abstraction encoded in the PLEX source are mapped to C style code within C++ classes.
  • The VM environment is 'branched' from the original PLEX/AXE environment and starts to evolve independently as a base for Ndb.
    It offers access to more OS services such as communication, disk IO etc. PLEX interpretation functionality is removed as all relevant PLEX code has been mapped to native C++. VM instances can communicate with each other over various channels and form a distributed system.
  • At some point in the timeline around here the Ndb team and product leave Ericsson
  • Over time, common block functionality is abstracted into base and utility classes.
    Hardware and system-convention sourced constraints are eased, the level of abstraction is raised. New blocks are designed and implemented without a PLEX heritage, making use of C++ abstraction facilities. Existing blocks are refactored.
  • Multi-threaded Ndbd (ndbmtd) is introduced, with groups of block instances running on different threads.
    Rather than being a radical design, it's a move back towards the original PLEX design point of 1 block instance per processor.

Today, Ndb executes a blocks communicating via signals model. Signals are no longer limited to 25 words. In single threaded Ndb (ndbd), all blocks share a single thread, with separate threads used for inter-VM communication setup and disk IO. In multi threaded Ndb (ndbmtd), block instances are grouped, and different functional groups share threads. In all cases, each block instance remains single-threaded, although the thread may be shared with other blocks.

The blocks and signals model is reminiscent of Erlang and Hoare's CSP – where concurrency is modelled as serial (or sequential) processes communicating with explicit messages, as opposed to a shared-memory model where communication occurs via memory with correctness controlled by locks, memory barriers and atomic instructions. It can also be considered similar to MPI and the Active object / Actor model.

Using explicit messaging for synchronisation/communication has costs – at runtime a given algorithm may require more data copying. At design time, potential concurrency must be explicitly designed-in with messaging and state changes. Mapping sequential algorithms to message passing state machines may require a bigger code transformation than mapping to a naive multithread safe shared-memory and locks implementation.

However I believe that these costs are generally paid off by the benefit of improved code clarity. Inter state-machine synchronisation becomes clearly visible, making synchronisation costs easier to visualise and understand. With explicit messaging as the main mechanism for inter-thread and inter-process communication, there is only a small kernel of multithreaded code to be implemented, proved correct and optimised. The bulk of the code can be implemented in single threaded style. There is no need for diverse libraries of multithread-optimised data structures. Processor and system architecture specific code and tradeoffs are minimised.

Internally, Ndb's VM supports only asynchronous messages between blocks. Using an asynchronous message passing style has many benefits. As the sending thread does not block awaiting a response to a message sent, it can work on other jobs, perhaps including the message just sent. This allows it to make the best use of warm instruction and data caches, reduces voluntary context switches and can reduce the likelihood of deadlock. Blocking IO (network, disk) is outsourced to a pool of threads. The signal processing thread(s) never block, except when no signals are available to process. The responsiveness of the system can be ensured by using prioritised job queues to determine the job to execute next and minimising the time spent processing individual jobs. From a formal point of view the number of possible multithreaded interactions is vastly reduced as thread-interleaving is only significant at signal processing boundaries. These limitations can make it easier to reason about the correctness and timing properties of the system.
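To give a feel for the model, here is a minimal Python sketch of blocks exchanging asynchronous signals through a prioritised job queue on a single thread. The class names, signal names and priority levels are invented for the example and bear no relation to Ndb's real VM classes.

```python
import heapq
import itertools


class Scheduler:
    """Single-threaded signal scheduler with a prioritised job queue."""

    def __init__(self):
        self._queue = []
        self._seq = itertools.count()  # tie-breaker: FIFO within a priority
        self.blocks = {}

    def register(self, name, block):
        self.blocks[name] = block

    def send(self, dest, signal, data=None, priority=1):
        """Queue a signal and return at once; the sender never blocks."""
        heapq.heappush(self._queue, (priority, next(self._seq), dest, signal, data))

    def run(self):
        # Execute one short job at a time until no signals remain.
        while self._queue:
            _, _, dest, signal, data = heapq.heappop(self._queue)
            self.blocks[dest].handle(self, signal, data)


class Counter:
    """A block: private state plus a handler per signal type."""

    def __init__(self):
        self.total = 0
        self.log = []

    def handle(self, sched, signal, data):
        self.log.append((signal, data))
        if signal == "ADD":
            self.total += data
        elif signal == "REPORT":
            sched.send("printer", "PRINT", self.total)


class Printer:
    def __init__(self):
        self.lines = []

    def handle(self, sched, signal, data):
        self.lines.append(f"total={data}")


sched = Scheduler()
counter, printer = Counter(), Printer()
sched.register("counter", counter)
sched.register("printer", printer)

sched.send("counter", "ADD", 2)
sched.send("counter", "REPORT")
sched.send("counter", "ADD", 3, priority=0)  # lower number runs first
sched.run()
print(counter.log, printer.lines)
```

Because only the scheduler touches the queue and each handler runs to completion, the blocks themselves need no locks.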

However, coding in this asynchronous, event-driven style can be demanding. Any blocking operations (disk access, blocking communications, requests to other threads or processes etc.) must be implemented as an asynchronous request and response pair. This style can have an abstraction-dissolving property as many published data structures and algorithms are implemented assuming a synchronous model and making much use of the caller's stack for state storage and managing control flow. It can be difficult to design abstractions for the asynchronous style which don't leak so much messy detail as to be pointless. Additionally, the asynchronous style tends to flatten a system – as the need to return control to the lowest-level call point whenever concurrency is possible acts as a force against deep layers of abstraction. Side effects of this can include a tendency for error handling code to be non-localised to the source of the error. However, that is part of the charm of working on the system. The C++ environment gives a wide set of tools for designing such abstractions, and each improvement made simplifies future work.
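The request and response split can be sketched in Python as follows. The fake disk, the job queue and all the names are hypothetical; the point is that the code which would follow a blocking read in a synchronous design has to move into a separate handler, and its state has to live somewhere other than the caller's stack.

```python
from collections import deque

jobs = deque()  # pending (handler, argument) pairs for the single thread

FAKE_DISK = {"a.dat": b"hello"}  # stand-in for real file contents


def disk_read_req(filename, on_conf):
    """Hypothetical async read: queue the reply rather than return the data."""
    jobs.append((on_conf, FAKE_DISK.get(filename)))


class Checksummer:
    """A block which sums the bytes of a file without ever blocking."""

    def __init__(self):
        self.results = {}

    def start(self, filename):
        # Request half: issue the read, then return to the scheduler at once.
        disk_read_req(filename, lambda data: self.read_conf(filename, data))

    def read_conf(self, filename, data):
        # Response half: resume where a synchronous version would have continued.
        self.results[filename] = sum(data) if data is not None else None


checker = Checksummer()
checker.start("a.dat")
before = dict(checker.results)  # nothing has been computed yet

# The scheduler loop delivers the response as a separate job.
while jobs:
    handler, argument = jobs.popleft()
    handler(argument)

print(before, checker.results)
```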

Comments, corrections?


MySQL Cluster development

September 10th, 2009
MySQL Cluster is the name given to one or more MySQL Server processes, connected to an Ndb Cluster database. From the point of view of the MySQL Server processes, the Ndb Cluster is a Storage Engine, implementing transactional storage of tables containing rows. From the point of view of the Ndb Cluster database, the MySQL Server processes are API nodes, performing DDL and DML transactions on tables stored in the cluster. Both exist independently – Ndb Cluster can be used without attached MySQL Server processes, but almost all users of Ndb Cluster connect at least one MySQL Server for DDL and administration.

Ndb stands for Network DataBase. This is a telecoms phrase where Network usually refers to a fixed or wireless telephone network, rather than the database topology definition of the term. Ndb was originally designed as a platform for implementing databases required to operate telecoms networks - HLR, VLR, Number Portability, Fraud Detection etc. At the time Ndb was first designed, Network Databases were generally implemented in-house on exotic 'switch' hardware by telecoms equipment vendors, often with hard-coded schemas and very inflexible query capabilities. These databases were expensive to develop and maintain, but had superb reliability and exceptional performance on minimal spec. hardware. The aim of the original Ndb design was to couple these desirable properties with more general purpose database functionality and deliver the result on a more standard hardware and OS stack.

I first discovered Ndb Cluster around 2001, when looking at potential designs for the next generation of an existing HLR database. I read the paper by Mikael Ronström in Ericsson Review (No 4,1997) which gives a good overview of the Ndb functionality. This paper describes functionality in the current tense when in fact some of the features described are yet to be implemented in 2009! This sort of optimism and vision has helped Ndb to survive and thrive over the years. The Ericsson Review paper was written while Ndb was one of multiple telecoms-database projects at Ericsson. Since then the Ndb product and team were spun out as a separate company, before being sold to MySQL AB in 2003 as a result of the dot com affair.

Ndb was originally designed for :
  • High throughput – sustaining tens to hundreds of thousands of transactions per second
  • Low latency – bounded transaction latencies which can be reliably factored into end-to-end latency budgets, implying main-memory storage
  • High update to read ratio – 50/50 as the norm
  • Transactional properties : Atomicity, Consistency, Isolation, Durability
  • Fault tolerance + HA – No single point of failure, automatic failover and recovery with minimal user or application involvement. Online upgrade. N-way synchronous and asynchronous replication. Fail-fast fault isolation.
  • Persistence – disk checkpointing and logging with automated recovery
  • Scalability – Parallel query execution. Distributed system can utilise > 1 system's resources. Capacity can be expanded horizontally with extra systems.

In the original Ndb design, high volume low latency transactions are submitted directly to the cluster using simple access primitives on the client. More complex queries are submitted to a separate query processor which itself uses combinations of the simpler primitives to access the cluster. An early example of a higher-level query processor was created by Martin Sköld who extended an Object Oriented query processor to create 'QDB' which could perform queries against data stored in Ndb. Numerous high level front-end processors have been implemented since.

Using MySQLD as a higher-level query processing front end we come to the architecture of MySQL Cluster, with MySQLD providing SQL based access to data stored in the cluster. In this sense MySQLD and Ndb cluster are a perfect fit and were designed for each other before they first met! Despite MySQLD being the default and most prominent front end to Ndb cluster, a number of others exist including several open and closed-source LDAP servers (OpenLDAP, OpenDS), several Java APIs and an Apache module giving HTTP access to data stored in Ndb.

The separation of low level, simple, fast access and higher level, more flexible access allows MySQL Cluster to offer many benefits of a full RDBMS without always incurring the drawback of over-generality. This fits well with many large transaction processing systems, where most heavy transaction processing does not require the full flexibility of the RDBMS, but some less frequent analysis does. Separating the central database engine (which in Ndb is referred to as the kernel) from the query processing layer can also help with workload management – even the most complex queries are subdivided into manageable components and resources can be shared fairly.

The original Ndb design was not aimed at :
  • Disk resident storage
    Where data larger-than-aggregate-system-memory-capacity can be stored on disk. This functionality was later added in the MySQL 5.1 timeframe
  • Complex query processing
    Where multiple tables are joined. This was always possible, but not always efficient. Improving the efficiency of MySQL and Ndb on complex query processing is ongoing work - as it is in all actively developed RDBMS, for some definition of complex :).
  • Storing large rows
    Ndb currently has a per-row size limit of around 8kB, ignoring Blob and Text column types.
  • One size fits all
    Being a drop-in replacement for an existing MySQL engine such as MyISAM or InnoDB
    Many initial users were not aware of the history of Ndb, and expected it to be (MySQL + InnoDB/MyISAM) + 'Clustering'. Issuing 'ALTER TABLE xxx ENGINE=ndbcluster;' appeared to be all that was required to gain fault tolerance, but the performance of queries on the resulting tables was not always as expected!

Since the initial integration of Ndb Cluster with MySQLD in 2003+, there have been many improvements to bring Ndb closer in behaviour to the most popular MySQL engines, and to optimise MySQLD for Ndb's strengths, including :
  • Support for Autoincrement and primary key-less tables
  • Synchronisation of schemas across connected MySQLD instances
  • Support for MySQL character sets and collations
  • Storage and retrieval of Blob and Text columns
  • Support for pushed-down filter conditions
  • Support for batching of operations
  • Integration with MySQL asynchronous replication
  • 'Distribution awareness' in MySQLD for efficiency

These improvements have required work in the Ndb table handler - the code which maps MySQL storage engine API calls from the generic SQL layer to the underlying storage engine. Some improvements have also required enhancements in the storage engine API and Server, for example a new API to expose conditions (WHERE or HAVING clause predicates) to the storage engine, enabling it to perform more efficient filtering. These changes add complexity to MySQLD and the storage engine API, but as they are implemented generically, they can be reused by other engines. The pushed conditions API is now being used by the Spider engine for similar reasons to Ndb – e.g. to push filtering functionality as close to the data as possible. The Batched Key Access (BKA) improvements made to the MySQLD join executor benefit Ndb, but also benefit MyISAM and InnoDB to a lesser extent. This Functionality push-down pattern – increasing the granularity and complexity of work items which can be passed to the storage engine - will continue and benefit all storage engines.

The next large step to be taken by the MySQL Server team in this direction is referred to as Query Fragment Pushdown, where MySQLD can pass parts of queries to a storage engine for execution. Storage engines which support SQL natively could perhaps use their own implementation-aware optimisation and execution engines to efficiently evaluate query fragments. For Ndb, we are designing composite primitives at the NdbApi level for evaluating query fragments more efficiently - in parallel and closer to the data. This work will increase the number of query types that Ndb can handle efficiently, increasing the number of applications where Ndb is a good fit.

For an in-depth description of the original Ndb requirements, design approach and some specific design solutions, Mikael's PhD thesis is the place to go. This is probably the best source of information on the design philosophy of Ndb Cluster. However, as it is a frozen document it does not reflect the current state of the system, and as it is an academic paper, it does not describe the lower level, more software engineering oriented aspects of the system implementation.

I hope to cover some of these aspects in a future post.
