TiDB Data Migration (DM) is an integrated data transfer and replication management platform that supports full data migration or incremental data replication from MySQL or MariaDB instances into a TiDB cluster.
A common real-life use case is using TiDB DM to connect sharded MySQL or MariaDB instances to TiDB, treating TiDB almost as a secondary, and then running analytical workloads on this TiDB cluster to fulfill real-time reporting needs. TiDB DM provides good support if you need to manage multiple data replication tasks at the same time, or need to merge multiple MySQL/MariaDB instances into a single TiDB cluster.
TiDB DM consists of three components: DM-master, DM-worker, and dmctl. It supports migrating the data of multiple upstream MySQL instances to multiple downstream TiDB clusters. The architecture design is as follows:
- DM-master: manages and schedules the operations of data replication tasks.
- DM-worker: executes specific data replication tasks.
- dmctl: the command line tool used to control the DM cluster.
Now, I'll introduce TiDB DM's implementation principles in detail.
A single TiDB DM cluster can perform multiple data replication tasks simultaneously. Each task can be split into multiple subtasks that are undertaken by multiple DM-worker nodes, and each DM-worker node is responsible for replicating the data of the corresponding upstream MySQL instance.
The following diagram shows the data migration process of a single subtask of data replication on each DM-worker node.
In the whole process, the upper data flow is the full backup migration and the lower data flow is the incremental data replication.
In each DM-worker node, processing units such as dumper, loader, relay, and syncer (binlog replication) perform the specific data replication steps to complete a specific data replication subtask.
For full backup migration:

- The dumper unit exports the full data from the upstream MySQL instance as SQL files in the local storage.
- The loader unit reads these SQL files and loads them into the downstream TiDB.

For incremental data replication:

- The relay unit acts as a secondary of the upstream MySQL instance, fetches the binlog from the upstream, and persists it in the local storage as the relay log.
- The syncer unit reads the relay log, constructs DML and DDL statements from the binlog events, and replicates them to the downstream TiDB.
This process is similar to the primary-secondary replication in MySQL, but the main difference is that in TiDB DM, the relay log persisted in the local storage can be used simultaneously by the syncer units of multiple different subtasks, which avoids multiple tasks repeatedly fetching the binlog from the upstream MySQL.
In order to accelerate data migration, TiDB DM applies a concurrency model in parts of both the full backup migration process and the incremental data replication process.
For full backup migration
Dumper calls Mydumper, a data exporting tool, to implement data exporting. For the corresponding concurrency model, see Mydumper source code.
Loader is used to load the data. For the corresponding concurrency model, see the following diagram:
During the data exporting process with Mydumper, a single table can be split into multiple SQL files with the --chunk-filesize and other parameters. Each of these SQL files corresponds to a static snapshot of the upstream MySQL data at a specific moment, and no correlation exists between any two SQL files. So when importing the data with loader, you can directly start multiple worker goroutines in a loader unit; each worker goroutine reads the to-be-imported SQL files independently and concurrently and applies them to the downstream. That is to say, loader loads data concurrently at the level of the SQL file. In the task configuration, TiDB DM controls the number of worker goroutines with the pool-size parameter in the loader unit.
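The following Go sketch illustrates this file-level concurrency model under simplified assumptions. It is not the actual loader code; the function and file names (loadConcurrently, restoreFile, and the sample SQL file names) are purely illustrative:

```go
// A minimal sketch of the loader's file-level concurrency model described
// above, not the actual DM implementation: worker goroutines consume SQL
// files from a shared channel, and pool-size controls the number of workers.
package main

import (
	"fmt"
	"sync"
)

// restoreFile stands in for reading one exported SQL file and applying
// its statements to the downstream TiDB.
func restoreFile(file string) {
	fmt.Println("importing", file)
}

func loadConcurrently(files []string, poolSize int) {
	fileCh := make(chan string)
	var wg sync.WaitGroup

	// Start pool-size worker goroutines; each one imports SQL files
	// independently, which is safe because the exported files are
	// uncorrelated static snapshots.
	for i := 0; i < poolSize; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for f := range fileCh {
				restoreFile(f)
			}
		}()
	}

	for _, f := range files {
		fileCh <- f
	}
	close(fileCh)
	wg.Wait()
}

func main() {
	loadConcurrently([]string{"schema.table.000000001.sql", "schema.table.000000002.sql"}, 4)
}
```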
For incremental data replication
When fetching the binlog from the upstream MySQL to persist it in the local storage, the binlog can only be handled serially because the upstream MySQL generates and sends the binlog in a stream.
When importing the data using syncer, you can import data concurrently under limited conditions. The corresponding model architecture is as follows:
Syncer reads and parses the local relay log in a stream, which is executed serially. When syncer parses the binlog events and builds the to-be-synchronized jobs, it delivers the jobs to different to-be-synchronized job channels after computing a hash based on the primary key, index, and other information of the corresponding row.
At the other end of each channel, a worker goroutine concurrently fetches jobs from the corresponding channel and replicates them to the downstream TiDB.
That is to say, syncer imports data concurrently at the level of the binlog event. In the task configuration, TiDB DM controls the number of worker goroutines with the worker-count parameter in the syncer unit.
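The following Go sketch illustrates this hash-based dispatch under simplified assumptions. It is not the actual syncer code; the job structure and the names used here are purely illustrative:

```go
// A minimal sketch of the syncer's dispatch model described above, not the
// actual DM implementation: jobs are hashed by key (for example, the primary
// key of the modified row) to a fixed channel, so changes to the same row are
// applied in order, while worker-count workers apply jobs concurrently.
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

// job stands in for one to-be-replicated DML built from a binlog event.
type job struct {
	key string // primary key / unique key of the modified row
	sql string
}

func runSyncer(jobs []job, workerCount int) {
	chans := make([]chan job, workerCount)
	var wg sync.WaitGroup
	for i := range chans {
		chans[i] = make(chan job)
		wg.Add(1)
		go func(ch chan job) {
			defer wg.Done()
			for j := range ch {
				// Apply the DML to the downstream TiDB (stubbed out here).
				fmt.Println("executing", j.sql)
			}
		}(chans[i])
	}

	// Dispatch serially in binlog order: the hash of the row key picks the
	// channel, so jobs touching the same row never run concurrently.
	for _, j := range jobs {
		h := fnv.New32a()
		h.Write([]byte(j.key))
		chans[h.Sum32()%uint32(workerCount)] <- j
	}
	for _, ch := range chans {
		close(ch)
	}
	wg.Wait()
}

func main() {
	runSyncer([]job{
		{key: "id=1", sql: "UPDATE t SET c = 1 WHERE id = 1"},
		{key: "id=2", sql: "UPDATE t SET c = 2 WHERE id = 2"},
	}, 4)
}
```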
However, some limitations exist in this process as follows:
For a DDL operation, the downstream table schema will change, so the DDL can only be replicated after all the DML events corresponding to the previous table schema have been successfully replicated.
In other words, DDL and DML events are not replicated concurrently, and the DML events before and after the DDL operation are not replicated concurrently either.
For DML operations, conflicts exist when multiple DML statements possibly modify the data of the same row, or even the same primary key or unique key. If these DML events are replicated concurrently, data inconsistency might occur. The detection and resolution of DML event conflicts in TiDB DM are similar to those in TiDB Binlog. For more details of the specific principles, see TiDB Binlog Architecture Evolution and Implementation Principles.
When using MySQL to handle a large amount of data, manual sharding is commonly applied. After the data has been replicated to TiDB, the sharded tables need to be merged logically.
This section introduces the features of TiDB DM that support merging and replicating data from sharded tables.
Let's start with an example as shown in the diagram below:
In this example, there are two MySQL instances in the upstream; each instance has two schemas and each schema has two tables; there are eight tables in total. After we replicate the data to the downstream TiDB, the eight tables should be merged and replicated into one table.
To replicate these tables with different names from different schemas of different instances to the same table, data of different tables should be routed to the same downstream table according to the predefined rules. In TiDB DM, these rules are router-rules.
For instance, the router rule for the above example is:
name-of-router-rule:
schema-pattern: "schema_*"
table-pattern: "table_*"
target-schema: "schema"
target-table: "table"
- name-of-router-rule: the rule name, specified by the user. When the same rule needs to be applied to multiple upstream instances, you can define only one rule that can be referenced by different instances.
- schema-pattern: the pattern that matches the upstream schema name. It supports wildcard characters (such as "*") as the suffix. In this example, schema_* matches both of the two schemas.
- table-pattern: the pattern that matches the upstream table name. Its usage is similar to that of schema-pattern. In this example, table_* matches both of the two tables.
- target-schema: the name of the target schema. The matched data will be routed into this schema.
- target-table: the name of the target table. The data that matches the schema name and table name is routed into this table in the target-schema.

Now let's take a look at the internals of TiDB DM:
- When starting the replication task, TiDB DM builds a trie from the schema-pattern/table-pattern of the configured router rules and stores the rules in the trie nodes.
- During the replication, TiDB DM queries the trie to obtain the corresponding rules via the schema name and table name in the upstream, and replaces the original schema name and table name in the SQL statement based on the rules.

With the table router feature, we can implement the basic function of replicating data from sharded tables. But in a database, auto-increment columns are widely used as the primary keys. If the primary keys of the sharded tables in the upstream generate their numbers automatically and independently, a conflict between primary keys might occur and result in data mismatch after merging and replicating them into the downstream. Let's see another example as follows:
In this example, there are four tables that need to be merged and replicated into one table in the downstream TiDB. Each table has a record whose value in the id column is 1. Suppose that this id column is the primary key of the table. During the replication process, because some update operations use the id column as the condition to identify the records that need to be updated, the data replicated later might overwrite the data that has already been replicated, resulting in data loss.
Therefore, during the data replication process, the column mapping function is developed to convert the data of related columns based on specified rules, so as to avoid the data conflict and loss.
For instance, the column mapping rule for MySQL instance 1 is:
mapping-rule-of-instance-1:
schema-pattern: "schema_*"
table-pattern: "table_*"
expression: "partition id"
source-column: "id"
target-column: "id"
arguments: ["1", "schema_", "table_"]
- mapping-rule-of-instance-1: the rule name, which is specified by the user. Because different upstream MySQL instances need to be converted into different values, each MySQL instance usually applies a unique rule.
- schema-pattern/table-pattern: the matching patterns of the upstream schema name and table name, which are the same as the configuration of router-rules.
- expression: the name of the expression that does the data conversion. Currently, the commonly used expression is "partition id". For details, see what follows in this section.
- source-column: the name of the source column that inputs the data into the conversion expression. "id" indicates that this expression is applied to the column named "id" in the table. Currently, only the data conversion from a single source column is supported.
- target-column: the name of the target column into which the conversion expression outputs the data. Its usage is similar to that of source-column. Currently, only the data conversion to a single target column is supported, and the target column must exist.
- arguments: the arguments of the conversion expression. The number and meanings of the arguments depend on the specific expression.

Currently, partition id is the mainly supported conversion expression. It resolves the conflicts caused by merging and replicating data from different tables by adding a binary prefix to values of the bigint type. partition id takes three arguments:

- The MySQL instance ID, which identifies the instance where the data comes from. The user specifies it directly (for example, "1"), and it is added to the conversion result as part of the prefix in a binary form.
- The prefix of the schema name. If the schema name is schema_2, the remaining part after removing the prefix (i.e. the number "2") is added to the conversion result as part of the prefix in a binary form.
- The prefix of the table name. If the table name is table_3, the remaining part after removing the prefix (i.e. the number "3") is added to the conversion result as part of the prefix in a binary form.

By default, each part occupies the following number of bits in the 64-bit conversion result: 1 bit for the sign, 4 bits for the instance ID, 7 bits for the schema name suffix, 8 bits for the table name suffix, and the remaining 44 bits for the original data.
Suppose that the original data is “123” before conversion and the arguments are set up as above, then the conversion result is:
1<<(64-1-4) | 2<<(64-1-4-7) | 3<<(64-1-4-7-8) | 123
In addition, each of the three arguments can be set as an empty string (""), which means that the corresponding part is not added to the conversion result and does not occupy extra bits. For example, if you set the arguments as ["1", "", "table_"], then the conversion result is:
1<<(64-1-4) | 3<<(64-1-4-8) | 123
For the detailed implementation of the column mapping, see column-mapping pkg source code in TiDB-Tools.
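To make the bit arithmetic above concrete, here is a minimal Go sketch of the partition id computation. It is not the column-mapping pkg itself; the partitionID function and its signature are purely illustrative, and the bit widths are the defaults described above:

```go
// A minimal sketch of the "partition id" conversion described above, not the
// column-mapping pkg itself: an instance ID, a schema-name suffix, and a
// table-name suffix are packed into the high bits of a bigint value, using
// the default bit widths of 1 (sign), 4, 7, and 8.
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// partitionID packs the prefix parts in front of the original id.
// Empty arguments are skipped and occupy no bits, matching the example
// with arguments ["1", "", "table_"].
func partitionID(origID int64, instanceID, schemaName, schemaPrefix, tableName, tablePrefix string) int64 {
	shift := 64 - 1 // reserve 1 sign bit
	result := int64(0)
	if instanceID != "" {
		n, _ := strconv.ParseInt(instanceID, 10, 64)
		shift -= 4 // instance ID occupies 4 bits
		result |= n << shift
	}
	if schemaPrefix != "" {
		n, _ := strconv.ParseInt(strings.TrimPrefix(schemaName, schemaPrefix), 10, 64)
		shift -= 7 // schema-name suffix occupies 7 bits
		result |= n << shift
	}
	if tablePrefix != "" {
		n, _ := strconv.ParseInt(strings.TrimPrefix(tableName, tablePrefix), 10, 64)
		shift -= 8 // table-name suffix occupies 8 bits
		result |= n << shift
	}
	return result | origID
}

func main() {
	// 1<<(64-1-4) | 2<<(64-1-4-7) | 3<<(64-1-4-7-8) | 123
	fmt.Println(partitionID(123, "1", "schema_2", "schema_", "table_3", "table_"))
	// 1<<(64-1-4) | 3<<(64-1-4-8) | 123
	fmt.Println(partitionID(123, "1", "schema_2", "", "table_3", "table_"))
}
```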
With the table router and column mapping function, we can replicate DML statements from sharded tables in a smooth manner. But in the process of incrementally replicating data, if DDL statements are executed on the upstream sharded tables that are waiting for merging, then an error might occur. Let's see a simple example of executing DDL statements on the sharded tables.
In this example, we simplify the replication process, in which there are only two MySQL instances in the upstream and each instance has only one table. Suppose that during the replication, we mark the schema version of two sharded tables as schema V1
, and mark the schema version after executing DDL statements as schema V2
.
Now, suppose that the binlog data received from the two upstream sharded tables has the following time sequence in the synchronization process:
- Before t1, the DML events received from both sharded tables belong to schema V1.
- At t1, the sharding DDL events on instance 1 are received.
- From t2 on, the DML events received from instance 1 belong to schema V2, but the DML events received from instance 2 still belong to schema V1.
- At t3, the sharding DDL events on instance 2 are received.
- From t4 on, the DML events received from instance 2 belong to schema V2 as well.

Suppose that we do nothing special with the DDL of the sharded tables during data replication. When the DDL of instance 1 is replicated to the downstream, the downstream table structure is changed to schema V2. But DM-worker still receives the DML of schema V1 during the period between t2 and t3. When DM-worker tries to replicate this schema V1 DML to the downstream, the inconsistency between the DML and the table structure may lead to errors, and the data cannot be replicated correctly. Let's look at the example above again to see how we handle the DDL replication when merging sharded tables in TiDB DM.
In this example, DM-worker-1 replicates the data from MySQL Instance 1 and DM-worker-2 replicates the data from MySQL Instance 2. DM-master coordinates the DDL replication among multiple DM-workers. After DM-worker-1 receives DDL, simplified procedures of DDL replication are as follows:
As for the sharding DDL replication within one TiDB DM, we can generalize some characteristics from the above procedures:
From the above characteristics, we can see some functional restrictions as follows:
In the example above, there is only one sharded table to be merged in the upstream MySQL instance that corresponds to each DM-worker. But in actual scenarios, there may be multiple sharded tables to be merged in one MySQL instance, such as the scenario used above to introduce the table router and column mapping features. In this case, the replication of sharding DDL becomes more complex. Assume that a MySQL instance has two tables to be merged, table_1 and table_2. See the following figure:
Because data comes from the same MySQL instance, all the data is obtained from the same binlog flow. In this case, the time sequence is as follows:
- Both sharded tables are at schema V1 when the replication begins.
- At t1, the DM-worker receives the DDL of table_1.
- Between t2 and t3, the data received includes the DML of table_1 in schema V2 and the DML of table_2 in schema V1.
- At t3, the DM-worker receives the DDL of table_2.
- From t4 on, both tables receive the DML of schema V2.

If we do nothing special with the DDL during the data replication, when the DDL of table_1 is replicated to the downstream and changes the downstream table structure, the DML of table_2 in schema V1 cannot be replicated normally. Therefore, within a single DM-worker, we have created logical sharding groups that are similar to those within DM-master, but the members of these groups are the different sharded tables in the same upstream MySQL instance.
But when DM-worker coordinates the replication among sharding groups within a DM-worker, the coordination is not entirely the same as that performed by DM-master. The reasons are:
- When the DM-worker receives the DDL of table_1, it cannot pause the replication and must continue parsing the binlog to get the DDL of the following table_2, namely continuing to parse from t2 to t3.
- During the period from t2 to t3, the DML of schema V2 belonging to table_1 cannot be replicated to the downstream until the sharding DDL is replicated and executed successfully.

In TiDB DM, the simplified replication process of sharding DDL within the DM-worker is as described below:
1. When receiving the DDL of table_1 at t1, the DM-worker records the DDL information and the current position of the binlog.
2. The DM-worker continues parsing the binlog between t2 and t3.
3. Ignore the schema V2 DML statement if it belongs to table_1; replicate the schema V1 DML statement normally to the downstream if it belongs to table_2.
4. When receiving the DDL of table_2 at t3, the DM-worker records the DDL statement and the current position of the binlog.
5. After the sharding DDL is replicated and executed successfully in the downstream, the DM-worker parses the binlog events again from t2 to t3.
6. Replicate the schema V2 DML statement normally to the downstream if it belongs to table_1; ignore the schema V1 DML statement that belongs to table_2.
7. After finishing re-parsing the binlog between t2 and t3, the DM-worker continues the normal replication from t4.

As you can see, when handling the replication of sharding DDL, TiDB DM mainly uses a two-level sharding group for coordination and control: DM-master coordinates the DDL replication among the DM-workers that replicate from different upstream instances, and each DM-worker coordinates the DDL replication among the sharded tables of its own upstream MySQL instance.
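The following Go sketch illustrates the per-worker decision logic described above under simplified assumptions. It is not the actual DM implementation; the shardingGroup type and its methods are purely illustrative:

```go
// A minimal sketch (not the actual DM implementation) of the per-worker
// sharding group decision described above: during the first pass over the
// binlog, DML events of tables that have already seen the sharding DDL are
// skipped, while DML events of tables still on the old schema are replicated.
package main

import "fmt"

// shardingGroup tracks which sharded tables in one upstream MySQL instance
// have already received the pending sharding DDL.
type shardingGroup struct {
	ddlReceived map[string]bool // table name -> DDL received?
}

// onDDL records that a table has received the sharding DDL and reports
// whether every member of the group has now received it.
func (g *shardingGroup) onDDL(table string) bool {
	g.ddlReceived[table] = true
	for _, ok := range g.ddlReceived {
		if !ok {
			return false
		}
	}
	return true
}

// replicateInFirstPass reports whether a DML event of the given table should
// be replicated during the first pass (before the sharding DDL is executed
// downstream): only tables that are still on the old schema are replicated.
func (g *shardingGroup) replicateInFirstPass(table string) bool {
	return !g.ddlReceived[table]
}

func main() {
	g := &shardingGroup{ddlReceived: map[string]bool{"table_1": false, "table_2": false}}
	g.onDDL("table_1")                             // DDL of table_1 received at t1
	fmt.Println(g.replicateInFirstPass("table_1")) // false: ignore schema V2 DML of table_1
	fmt.Println(g.replicateInFirstPass("table_2")) // true: replicate schema V1 DML of table_2
}
```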
During data replication, sometimes it is not necessary to replicate all upstream data to downstream. This is a scenario where we could use certain rules to filter out the unwanted part of the data. In TiDB DM, we support two replication filters that apply to different levels.
TiDB DM allows you to configure inclusive/exclusive replication of a specific part of tables or schemas for processing units including Dumper, Loader, and Syncer.
For example, if we only want to export data from tables t1 and t2 in the test schema, we can configure the following rule for the dumper unit:
name-of-dump-rule:
extra-args: "-B test -T t1,t2"
- name-of-dump-rule: the name of the rule, specified by the user. Multiple upstream instances can share a common rule by referencing the rule name.
- extra-args: extra parameters for the dumper unit. Any Mydumper configuration option that is not explicitly defined in the dumper unit must be passed in through this parameter. The format is consistent with that of Mydumper.

For more information on support for the block and allow list, see the Mydumper parameters and its source code.
The corresponding table and schema block and allow list rule for Loader and Syncer is block-allow-list. Assuming you only want to replicate data from tables t1 and t2 in the test schema, you can configure the rule as below:
name-of-bwl-rule:
do-tables:
- db-name: "test"
tbl-name: "t1"
- db-name: "test"
tbl-name: "t2"
Only part of the configuration options are used in the sample above. For complete configuration options and their definitions, see the user documentation for this feature. The rule used in TiDB DM is similar to the primary-secondary filter rule in MySQL, so you can also refer to Evaluation of Database-Level Replication and Binary Logging Options and Evaluation of Table-Level Replication Options.
For the Loader unit, after getting the schema name and table name by parsing the SQL file name, it checks them against the configured block and allow list rules. If the result indicates that no replication is required, the entire SQL file is ignored. For the Syncer unit, after getting the schema name and table name by parsing the binlog event, it checks them against the configured block and allow list rules. If the result indicates that no replication is required, the corresponding binlog event data is ignored.
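As an illustration only (not the actual filter pkg in DM), the following Go sketch shows how a do-tables allow list could be checked against the schema name and table name parsed from an exported SQL file name; the file naming pattern and the helper names are assumptions:

```go
// A minimal sketch of applying a do-tables allow list, not the actual DM
// implementation: the (schema, table) pair parsed from an SQL file name or a
// binlog event is checked against the configured list, and the whole file or
// binlog event is skipped when it does not match.
package main

import (
	"fmt"
	"strings"
)

type tableName struct{ schema, table string }

// allowed reports whether a (schema, table) pair is in the do-tables list.
func allowed(doTables []tableName, schema, table string) bool {
	for _, t := range doTables {
		if t.schema == schema && t.table == table {
			return true
		}
	}
	return false
}

func main() {
	doTables := []tableName{{"test", "t1"}, {"test", "t2"}}

	// Mydumper-style file names look roughly like "<schema>.<table>.sql"
	// (chunked exports add a number); here we only split out schema and table.
	file := "test.t3.sql"
	parts := strings.SplitN(file, ".", 3)
	fmt.Println(allowed(doTables, parts[0], parts[1])) // false: skip the whole file
}
```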
During incremental data replication, sometimes you may want to filter out specific types of binlog events. There are two typical scenarios:

- keeping the already replicated data in the downstream when TRUNCATE TABLE is executed in the upstream;
- keeping the merged table in the downstream when DROP TABLE is executed on the upstream sharded tables.

TiDB DM allows you to filter by binlog event type. For the TRUNCATE TABLE and DROP TABLE scenarios mentioned above, you can configure the rule as below:
name-of-filter-rule:
schema-pattern: "test_*"
table-pattern: "t_*"
events: ["truncate table", "drop table"]
action: Ignore
The matching pattern of the rule is similar to table routing and column mapping. For detailed configurations, see the user documentation for the feature.
To implement this, after getting the schema name, table name, and binlog event type, the TiDB DM processing unit will identify the configured rule, and decide whether to filter based on the action configuration. For detailed implementation of the filter function, see binlog-filter pkg under TiDB-tools.
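As an illustration only (not the binlog-filter pkg itself), the following Go sketch shows how such a rule could be matched against the schema name, table name, and binlog event type, with the Ignore action dropping the event; all names here are illustrative:

```go
// A minimal sketch of binlog event filtering as described above, not the
// binlog-filter pkg itself: a rule matches schema/table by wildcard-suffix
// patterns plus an event type, and an Ignore action drops the event.
package main

import (
	"fmt"
	"strings"
)

type filterRule struct {
	schemaPattern string   // e.g. "test_*"
	tablePattern  string   // e.g. "t_*"
	events        []string // e.g. ["truncate table", "drop table"]
	action        string   // "Ignore" or "Do"
}

// matchPattern supports a trailing "*" wildcard, as in schema-pattern/table-pattern.
func matchPattern(pattern, name string) bool {
	if strings.HasSuffix(pattern, "*") {
		return strings.HasPrefix(name, strings.TrimSuffix(pattern, "*"))
	}
	return pattern == name
}

// shouldIgnore reports whether a binlog event should be filtered out.
func shouldIgnore(r filterRule, schema, table, event string) bool {
	if !matchPattern(r.schemaPattern, schema) || !matchPattern(r.tablePattern, table) {
		return false
	}
	for _, e := range r.events {
		if e == event && r.action == "Ignore" {
			return true
		}
	}
	return false
}

func main() {
	rule := filterRule{"test_*", "t_*", []string{"truncate table", "drop table"}, "Ignore"}
	fmt.Println(shouldIgnore(rule, "test_1", "t_1", "truncate table")) // true: filtered out
	fmt.Println(shouldIgnore(rule, "test_1", "t_1", "insert"))         // false: replicated
}
```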
As an integrated data transfer and replication management platform, TiDB DM plays an important role in the TiDB ecosystem. It works well in providing full data migration and incremental data replication services, and has gained increasing popularity among customers. In the future, it will remain a key focus of our development team, and we look forward to more contributors joining us to improve its reliability, stability, and usability together.