Manticore Search is a highly distributed system that provides all the necessary components to create a highly available and scalable database for search. This includes:
Manticore Search offers great flexibility in terms of how you set up your cluster. There are no limitations, so it's up to you to design your cluster according to your needs. Simply learn about the tools mentioned above and use them to achieve your desired goal.
To add a new node to a cluster, simply start another instance of Manticore and ensure that it is accessible by the other nodes in the cluster. Connect the new node to the rest of the cluster using a distributed table and ensure data safety with replication.
To understand how to add a distributed table with remote agents, it is important to first have a basic understanding of distributed tables In this article, we will focus on how to use a distributed table as the basis for creating a cluster of Manticore instances.
Here is an example of how to split data over 4 servers, each serving one of the shards:
table mydist {
type = distributed
agent = box1:9312:shard1
agent = box2:9312:shard2
agent = box3:9312:shard3
agent = box4:9312:shard4
}
In the event of a server failure, the distributed table will still work, but the results from the failed shard will be missing.
Now that we've added mirrors, each shard is found on 2 servers. By default, the master (the searchd instance with the distributed table) will randomly pick one of the mirrors.
The mode used for picking mirrors can be set using the ha_strategy setting. In addition to the default random
mode there's also ha_strategy = roundrobin
.
More advanced strategies based on latency-weighted probabilities include noerrors
and nodeads
. These not only take out mirrors with issues but also monitor response times and do balancing. If a mirror responds slower (for example, due to some operations running on it), it will receive fewer requests. When the mirror recovers and provides better times, it will receive more requests.
table mydist {
type = distributed
agent = box1:9312|box5:9312:shard1
agent = box2:9312:|box6:9312:shard2
agent = box3:9312:|box7:9312:shard3
agent = box4:9312:|box8:9312:shard4
}
Agent mirrors can be used interchangeably when processing a search query. The Manticore instance(s) hosting the distributed table where the mirrored agents are defined keeps track of mirror status (alive or dead) and response times, and performs automatic failover and load balancing based on this information.
agent = node1|node2|node3:9312:shard2
The above example declares that node1:9312
, node2:9312
, and node3:9312
all have a table called shard2, and can be used as interchangeable mirrors. If any of these servers go down, the queries will be distributed between the remaining two. When the server comes back online, the master will detect it and begin routing queries to all three nodes again.
A mirror may also include an individual table list, as follows:
agent = node1:9312:node1shard2|node2:9312:node2shard2
This works similarly to the previous example, but different table names will be used when querying different servers. For example, node1shard2 will be used when querying node1:9312, and node2shard will be used when querying node2:9312.
By default, all queries are routed to the best of the mirrors. The best mirror is selected based on recent statistics, as controlled by the ha_period_karma config directive. The master stores metrics (total query count, error count, response time, etc.) for each agent and groups these by time spans. The karma is the length of the time span. The best agent mirror is then determined dynamically based on the last two such time spans. The specific algorithm used to pick a mirror can be configured with the ha_strategy directive.
The karma period is in seconds and defaults to 60 seconds. The master stores up to 15 karma spans with per-agent statistics for instrumentation purposes (see SHOW AGENT STATUS
statement). However, only the last two spans out of these are used for HA/LB logic.
When there are no queries, the master sends a regular ping command every ha_ping_interval milliseconds in order to collect statistics and check if the remote host is still alive. The ha_ping_interval defaults to 1000 msec. Setting it to 0 disables pings, and statistics will only be accumulated based on actual queries.
Example:
# sharding table over 4 servers total
# in just 2 shards but with 2 failover mirrors for each shard
# node1, node2 carry shard1 as local
# node3, node4 carry shard2 as local
# config on node1, node2
agent = node3:9312|node4:9312:shard2
# config on node3, node4
agent = node1:9312|node2:9312:shard1
Load balancing is turned on by default for any distributed table that uses mirroring. By default, queries are distributed randomly among the mirrors. You can change this behavior by using the ha_strategy.
ha_strategy = {random|nodeads|noerrors|roundrobin}
The mirror selection strategy for load balancing is optional and is set to random
by default.
The strategy used for mirror selection, or in other words, choosing a specific agent mirror in a distributed table, is controlled by this directive. Essentially, this directive controls how master performs the load balancing between the configured mirror agent nodes. The following strategies are implemented:
The default balancing mode is simple linear random distribution among the mirrors. This means that equal selection probabilities are assigned to each mirror. This is similar to round-robin (RR), but does not impose a strict selection order.
ha_strategy = random
The default simple random strategy does not take into account the status of mirrors, error rates, and most importantly, actual response latencies. To address heterogeneous clusters and temporary spikes in agent node load, there are a group of balancing strategies that dynamically adjust the probabilities based on the actual query latencies observed by the master.
The adaptive strategies based on latency-weighted probabilities work as follows:
Initially, the probabilities are equal. On every step, they are scaled by the inverse of the latencies observed during the last karma period, and then renormalized. For example, if during the first 60 seconds after the master startup, 4 mirrors had latencies of 10 ms, 5 ms, 30 ms, and 3 ms respectively, the first adjustment step would go as follows:
This means that the first mirror would have a 15% chance of being chosen during the next karma period, the second one a 30% chance, the third one (slowest at 30 ms) only a 5% chance, and the fourth and fastest one (at 3 ms) a 50% chance. After that period, the second adjustment step would update those chances again, and so on.
The idea is that once the observed latencies stabilize, the latency weighted probabilities will stabilize as well. All these adjustment iterations are meant to converge at a point where the average latencies are roughly equal across all mirrors.
Latency-weighted probabilities, but dead mirrors are excluded from the selection. A "dead" mirror is defined as a mirror that has resulted in multiple hard errors (e.g. network failure, or no answer, etc) in a row.
ha_strategy = nodeads
Latency-weighted probabilities, but mirrors with a worse error/success ratio are excluded from selection.
ha_strategy = noerrors
Simple round-robin selection, that is, selecting the first mirror in the list, then the second one, then the third one, etc, and then repeating the process once the last mirror in the list is reached. Unlike with the randomized strategies, RR imposes a strict querying order (1, 2, 3, ..., N-1, N, 1, 2, 3, ..., and so on) and guarantees that no two consecutive queries will be sent to the same mirror.
ha_strategy = roundrobin
ha_period_karma = 2m
ha_period_karma
defines the size of the agent mirror statistics window, in seconds (or a time suffix). Optional, the default is 60.
For a distributed table with agent mirrors, the server tracks several different per-mirror counters. These counters are then used for failover and balancing. (The server picks the best mirror to use based on the counters.) Counters are accumulated in blocks of ha_period_karma
seconds.
After beginning a new block, the master may still use the accumulated values from the previous one until the new one is half full. Thus, any previous history stops affecting the mirror choice after at most 1.5 times ha_period_karma seconds.
Although at most 2 blocks are used for mirror selection, up to 15 last blocks are actually stored for instrumentation purposes. They can be inspected using the SHOW AGENT STATUS
statement.
ha_ping_interval = 3s
ha_ping_interval
directive defines the interval between pings sent to the agent mirrors, in milliseconds (or with a time suffix). This option is optional and its default value is 1000.
For a distributed table with agent mirrors, the server sends all mirrors a ping command during idle periods to track their current status (whether they are alive or dead, network roundtrip time, etc.). The interval between pings is determined by the ha_ping_interval setting.
If you want to disable pings, set ha_ping_interval to 0.
With Manticore, write transactions (such as INSERT
, REPLACE
, DELETE
, TRUNCATE
, UPDATE
, COMMIT
) can be replicated to other cluster nodes before the transaction is fully applied on the current node. Currently, replication is supported for percolate
, rt
and distributed
tables in Linux and macOS. However, Manticore Search packages for Windows do not provide replication support.
Manticore's replication is powered by the Galera library and boasts several impressive features:
To set up replication in Manticore Search:
server_id
.If there is no replication
listen directive set, Manticore will use the first two free ports in the range of 200 ports after the default protocol listening port for each created cluster. To set replication ports manually, the listen directive (of replication
type) port range must be defined and the address/port range pairs must not intersect between different nodes on the same server. As a rule of thumb, the port range should specify at least two ports per cluster.
A replication cluster is a group of nodes in which a write transaction is replicated. Replication is set up on a per-table basis, meaning that one table can only belong to one cluster. There is no limit on the number of tables that a cluster can have. All transactions such as INSERT
, REPLACE
, DELETE
, TRUNCATE
on any percolate or real-time table that belongs to a cluster are replicated to all the other nodes in that cluster. Distributed tables can also be part of the replication process. Replication is multi-master, so writes to any node or multiple nodes simultaneously will work just as well.
To create a cluster, you can typically use the command create cluster with CREATE CLUSTER <cluster name>
, and to join a cluster, you can use join cluster with JOIN CLUSTER <cluster name> at 'host:port'
. However, in some rare cases, you may want to fine-tune the behavior of CREATE/JOIN CLUSTER
. The available options are:
This option specifies the name of the cluster. It should be unique among all the clusters in the system.
Note: The maximum allowable hostname length for the
JOIN
command is 253 characters. If you exceed this limit, searchd will generate an error.
The path option specifies the data directory for write-set cache replication and incoming tables from other nodes. This value should be unique among all the clusters in the system and should be specified as a relative path to the data_dir. directory. By default, it is set to the value of data_dir.
The nodes
option is a list of address:port pairs for all the nodes in the cluster, separated by commas. This list should be obtained using the node's API interface and can include the address of the current node as well. It is used to join the node to the cluster and to rejoin it after a restart.
The options
option allows you to pass additional options directly to the Galera replication plugin, as described in the Galera Documentation Parameters
When working with a replication cluster, all write statements such as INSERT
, REPLACE
, DELETE
, TRUNCATE
, UPDATE
that modify the content of a cluster's table must use thecluster_name:index_name
expression instead of the table name. This ensures that the changes are propagated to all replicas in the cluster. If the correct expression is not used, an error will be triggered.
In the JSON interface, the cluster
property must be set along with the table
name for all write statements to a cluster's table. Failure to set the cluster
property will result in an error.
The Auto ID for a table in a cluster should be valid as long as the server_id is correctly configured.
INSERT INTO posts:weekly_index VALUES ( 'iphone case' )
TRUNCATE RTINDEX click_query:weekly_index
UPDATE INTO posts:rt_tags SET tags=(101, 302, 304) WHERE MATCH ('use') AND id IN (1,101,201)
DELETE FROM clicks:rt WHERE MATCH ('dumy') AND gid>206
POST /insert -d '
{
"cluster":"posts",
"index":"weekly_index",
"doc":
{
"title" : "iphone case",
"price" : 19.85
}
}'
POST /delete -d '
{
"cluster":"posts",
"index": "weekly_index",
"id":1
}'
$index->addDocuments([
1, ['title' => 'iphone case', 'price' => 19.85]
]);
$index->deleteDocument(1);
indexApi.insert({"cluster":"posts","index":"weekly_index","doc":{"title":"iphone case","price":19.85}})
indexApi.delete({"cluster":"posts","index":"weekly_index","id":1})
res = await indexApi.insert({"cluster":"posts","index":"weekly_index","doc":{"title":"iphone case","price":19.85}});
res = await indexApi.delete({"cluster":"posts","index":"weekly_index","id":1});
InsertDocumentRequest newdoc = new InsertDocumentRequest();
HashMap<String,Object> doc = new HashMap<String,Object>(){{
put("title","Crossbody Bag with Tassel");
put("price",19.85);
}};
newdoc.index("weekly_index").cluster("posts").id(1L).setDoc(doc);
sqlresult = indexApi.insert(newdoc);
DeleteDocumentRequest deleteRequest = new DeleteDocumentRequest();
deleteRequest.index("weekly_index").cluster("posts").setId(1L);
indexApi.delete(deleteRequest);
Dictionary<string, Object> doc = new Dictionary<string, Object>();
doc.Add("title", "Crossbody Bag with Tassel");
doc.Add("price", 19.85);
InsertDocumentRequest newdoc = new InsertDocumentRequest(index: "weekly_index", cluster:posts, id: 1, doc: doc);
var sqlresult = indexApi.Insert(newdoc);
DeleteDocumentRequest deleteDocumentRequest = new DeleteDocumentRequest(index: "weekly_index", cluster: "posts", id: 1);
indexApi.Delete(deleteDocumentRequest);
Read statements such as SELECT
, CALL PQ
, DESCRIBE
can either use regular table names that are not prepended with a cluster name, or they can use the cluster_name:index_name
format. If the latter is used, the cluster_name
component is ignored.
When using the HTTP endpoint json/search
, the cluster
property can be specified if desired, but it can also be omitted.
SELECT * FROM weekly_index
CALL PQ('posts:weekly_index', 'document is here')
POST /search -d '
{
"cluster":"posts",
"index":"weekly_index",
"query":{"match":{"title":"keyword"}}
}'
POST /search -d '
{
"index":"weekly_index",
"query":{"match":{"title":"keyword"}}
}'
Replication plugin options can be adjusted using the SET
statement.
A list of available options can be found in the Galera Documentation Parameters .
SET CLUSTER click_query GLOBAL 'pc.bootstrap' = 1
POST /cli -d "
SET CLUSTER click_query GLOBAL 'pc.bootstrap' = 1
"
It's possible for replicated nodes to diverge from one another, leading to a state where all nodes are labeled as non-primary
. This can occur as a result of a network split between nodes, a cluster crash, or if the replication plugin experiences an exception when determining the primary component
. In such a scenario, it's necessary to select a node and promote it to the role of primary component
.
To identify the node that needs to be promoted, you should compare the last_committed
cluster status variable value on all nodes. If all the servers are currently running, there's no need to restart the cluster. Instead, you can simply promote the node with the highest last_committed value to the primary component
using the SET
statement (as demonstrated in the example).
The other nodes will then reconnect to the primary component and resynchronize their data based on this node.
SET CLUSTER posts GLOBAL 'pc.bootstrap' = 1
POST /cli -d "
SET CLUSTER posts GLOBAL 'pc.bootstrap' = 1
"
To use replication, you need to define one listen port for SphinxAPI protocol and one listen for replication address and port range in the configuration file. Also, specify the data_dir folder to receive incoming tables.
searchd {
listen = 9312
listen = 192.168.1.101:9360-9370:replication
data_dir = /var/lib/manticore/
...
}
To replicate tables, you must create a cluster on the server that has the local tables to be replicated.
CREATE CLUSTER posts
POST /cli -d "
CREATE CLUSTER posts
"
$params = [
'cluster' => 'posts'
]
];
$response = $client->cluster()->create($params);
utilsApi.sql('CREATE CLUSTER posts')
res = await utilsApi.sql('CREATE CLUSTER posts');
utilsApi.sql("CREATE CLUSTER posts");
utilsApi.Sql("CREATE CLUSTER posts");
Add these local tables to the cluster
ALTER CLUSTER posts ADD pq_title
ALTER CLUSTER posts ADD pq_clicks
POST /cli -d "
ALTER CLUSTER posts ADD pq_title
"
POST /cli -d "
ALTER CLUSTER posts ADD pq_clicks
"
$params = [
'cluster' => 'posts',
'body' => [
'operation' => 'add',
'index' => 'pq_title'
]
];
$response = $client->cluster()->alter($params);
$params = [
'cluster' => 'posts',
'body' => [
'operation' => 'add',
'index' => 'pq_clicks'
]
];
$response = $client->cluster()->alter($params);
utilsApi.sql('ALTER CLUSTER posts ADD pq_title')
utilsApi.sql('ALTER CLUSTER posts ADD pq_clicks')
res = await utilsApi.sql('ALTER CLUSTER posts ADD pq_title');
res = await utilsApi.sql('ALTER CLUSTER posts ADD pq_clicks');
utilsApi.sql("ALTER CLUSTER posts ADD pq_title");
utilsApi.sql("ALTER CLUSTER posts ADD pq_clicks");
utilsApi.Sql("ALTER CLUSTER posts ADD pq_title");
utilsApi.Sql("ALTER CLUSTER posts ADD pq_clicks");
All other nodes that wish to receive a replica of the cluster's tables should join the cluster as follows:
JOIN CLUSTER posts AT '192.168.1.101:9312'
POST /cli -d "
JOIN CLUSTER posts AT '192.168.1.101:9312'
"
$params = [
'cluster' => 'posts',
'body' => [
'192.168.1.101:9312'
]
];
$response = $client->cluster->join($params);
utilsApi.sql('JOIN CLUSTER posts AT \'192.168.1.101:9312\'')
res = await utilsApi.sql('JOIN CLUSTER posts AT \'192.168.1.101:9312\'');
utilsApi.sql("JOIN CLUSTER posts AT '192.168.1.101:9312'");
utilsApi.Sql("JOIN CLUSTER posts AT '192.168.1.101:9312'");
When running queries, prepend the table name with the cluster name posts
: or use the cluster
property for HTTP request object.
INSERT INTO posts:pq_title VALUES ( 3, 'test me' )
POST /insert -d '
{
"cluster":"posts",
"index":"pq_title",
"id": 3
"doc":
{
"title" : "test me"
}
}'
$index->addDocuments([
3, ['title' => 'test me']
]);
indexApi.insert({"cluster":"posts","index":"pq_title","id":3"doc":{"title":"test me"}})
res = await indexApi.insert({"cluster":"posts","index":"pq_title","id":3"doc":{"title":"test me"}});
InsertDocumentRequest newdoc = new InsertDocumentRequest();
HashMap<String,Object> doc = new HashMap<String,Object>(){{
put("title","test me");
}};
newdoc.index("pq_title").cluster("posts").id(3L).setDoc(doc);
sqlresult = indexApi.insert(newdoc);
Dictionary<string, Object> doc = new Dictionary<string, Object>();
doc.Add("title", "test me");
InsertDocumentRequest newdoc = new InsertDocumentRequest(index: "pq_title", cluster: "posts", id: 3, doc: doc);
var sqlresult = indexApi.Insert(newdoc);
All queries that modify tables in the cluster are now replicated to all nodes in the cluster.
To create a replication cluster, you must set its name at a minimum.
If you are creating a single cluster or the first cluster, you may omit the path option. In this case, the data_dir option will be used as the cluster path. However, for all subsequent clusters, you must specify the path and the path must be available. The nodes option may also be set to list all nodes in the cluster.
CREATE CLUSTER posts
CREATE CLUSTER click_query '/var/data/click_query/' as path
CREATE CLUSTER click_query '/var/data/click_query/' as path, 'clicks_mirror1:9312,clicks_mirror2:9312,clicks_mirror3:9312' as nodes
POST /cli -d "
CREATE CLUSTER posts
"
POST /cli -d "
CREATE CLUSTER click_query '/var/data/click_query/' as path
"
POST /cli -d "
CREATE CLUSTER click_query '/var/data/click_query/' as path, 'clicks_mirror1:9312,clicks_mirror2:9312,clicks_mirror3:9312' as nodes
"
$params = [
'cluster' => 'posts',
]
];
$response = $client->cluster()->create($params);
$params = [
'cluster' => 'click_query',
'body' => [
'path' => '/var/data/click_query/'
]
]
];
$response = $client->cluster()->create($params);
$params = [
'cluster' => 'click_query',
'body' => [
'path' => '/var/data/click_query/',
'nodes' => 'clicks_mirror1:9312,clicks_mirror2:9312,clicks_mirror3:9312'
]
]
];
$response = $client->cluster()->create($params);
utilsApi.sql('CREATE CLUSTER posts')
utilsApi.sql('CREATE CLUSTER click_query \'/var/data/click_query/\' as path')
utilsApi.sql('CREATE CLUSTER click_query \'/var/data/click_query/\' as path, \'clicks_mirror1:9312,clicks_mirror2:9312,clicks_mirror3:9312\' as nodes')
res = await utilsApi.sql('CREATE CLUSTER posts');
res = await utilsApi.sql('CREATE CLUSTER click_query \'/var/data/click_query/\' as path');
res = await utilsApi.sql('CREATE CLUSTER click_query \'/var/data/click_query/\' as path, \'clicks_mirror1:9312,clicks_mirror2:9312,clicks_mirror3:9312\' as nodes');
utilsApi.sql("CREATE CLUSTER posts");
utilsApi.sql("CREATE CLUSTER click_query '/var/data/click_query/' as path");
utilsApi.sql("CREATE CLUSTER click_query '/var/data/click_query/' as path, 'clicks_mirror1:9312,clicks_mirror2:9312,clicks_mirror3:9312' as nodes");
utilsApi.Sql("CREATE CLUSTER posts");
utilsApi.Sql("CREATE CLUSTER click_query '/var/data/click_query/' as path");
utilsApi.Sql("CREATE CLUSTER click_query '/var/data/click_query/' as path, 'clicks_mirror1:9312,clicks_mirror2:9312,clicks_mirror3:9312' as nodes");
If the nodes option is not specified when creating a cluster, the first node that joins the cluster will be saved as the nodes option.
To join an existing cluster, you must specify at least:
host:port
of another node in the cluster you are joiningJOIN CLUSTER posts AT '10.12.1.35:9312'
POST /cli -d "
JOIN CLUSTER posts AT '10.12.1.35:9312'
"
$params = [
'cluster' => 'posts',
'body' => [
'10.12.1.35:9312'
]
];
$response = $client->cluster->join($params);
utilsApi.sql('JOIN CLUSTER posts AT \'10.12.1.35:9312\'')
{u'error': u'', u'total': 0, u'warning': u''}
res = await utilsApi.sql('JOIN CLUSTER posts AT \'10.12.1.35:9312\'');
{"total":0,"error":"","warning":""}
utilsApi.sql("JOIN CLUSTER posts AT '10.12.1.35:9312'");
utilsApi.Sql("JOIN CLUSTER posts AT '10.12.1.35:9312'");
In most cases, the above is sufficient when there is a single replication cluster. However, if you are creating multiple replication clusters, you must also set the path and ensure that the directory is available.
JOIN CLUSTER c2 at '127.0.0.1:10201' 'c2' as path
A node joins a cluster by obtaining data from a specified node and, if successful, updates the node lists across all other cluster nodes in the same way as if it was done manually through ALTER CLUSTER ... UPDATE nodes. This list is used to re-join nodes to the cluster upon restart.
There are two lists of nodes:
1.cluster_<name>_nodes_set
: used to re-join nodes to the cluster upon restart. It is updated across all nodes in the same way as ALTER CLUSTER ... UPDATE nodes does. JOIN CLUSTER
command performs this update automatically. The Cluster status displays this list as cluster_<name>_nodes_set
.
2. cluster_<name>_nodes_view
: this list contains all active nodes used for replication and does not require manual management. ALTER CLUSTER ... UPDATE nodes actually copies this list of nodes to the list of nodes used to re-join upon restart. The Cluster status displays this list as cluster_<name>_nodes_view
.
When nodes are located in different network segments or data centers, the nodes option may be set explicitly. This minimizes traffic between nodes and utilizes gateway nodes for intercommunication between data centers. The following code joins an existing cluster using the nodes option.
Note: The cluster
cluster_<name>_nodes_set
list is not updated automatically when this syntax is used. To update it, use ALTER CLUSTER ... UPDATE nodes.
JOIN CLUSTER click_query 'clicks_mirror1:9312;clicks_mirror2:9312;clicks_mirror3:9312' as nodes
POST /cli -d "
JOIN CLUSTER click_query 'clicks_mirror1:9312;clicks_mirror2:9312;clicks_mirror3:9312' as nodes
"
$params = [
'cluster' => 'posts',
'body' => [
'nodes' => 'clicks_mirror1:9312;clicks_mirror2:9312;clicks_mirror3:9312'
]
];
$response = $client->cluster->join($params);
utilsApi.sql('JOIN CLUSTER click_query \'clicks_mirror1:9312;clicks_mirror2:9312;clicks_mirror3:9312\' as nodes')
{u'error': u'', u'total': 0, u'warning': u''}
res = await utilsApi.sql('JOIN CLUSTER click_query \'clicks_mirror1:9312;clicks_mirror2:9312;clicks_mirror3:9312\' as nodes');
{"total":0,"error":"","warning":""}
utilsApi.sql("JOIN CLUSTER click_query 'clicks_mirror1:9312;clicks_mirror2:9312;clicks_mirror3:9312' as nodes");
utilsApi.Sql("JOIN CLUSTER click_query 'clicks_mirror1:9312;clicks_mirror2:9312;clicks_mirror3:9312' as nodes");
The JOIN CLUSTER
command works synchronously and completes as soon as the node receives all data from the other nodes in the cluster and is in sync with them.
The DELETE CLUSTER
statement removes the specified cluster with its name. Once the cluster is deleted, it is removed from all nodes, but its tables remain intact and become active local non-replicated tables.
DELETE CLUSTER click_query
POST /cli -d "DELETE CLUSTER click_query"
$params = [
'cluster' => 'click_query',
'body' => []
];
$response = $client->cluster()->delete($params);
utilsApi.sql('DELETE CLUSTER click_query')
{u'error': u'', u'total': 0, u'warning': u''}
res = await utilsApi.sql('DELETE CLUSTER click_query');
{"total":0,"error":"","warning":""}
utilsApi.sql("DELETE CLUSTER click_query");
utilsApi.Sql("DELETE CLUSTER click_query");
ALTER CLUSTER <cluster_name> ADD <table_name>
adds an existing local table to the cluster. The node that receives the ALTER query sends the table to the other nodes in the cluster. All the local tables with the same name on the other nodes of the cluster are replaced with the new table.
Once the table is replicated, write statements can be performed on any node, but the table name must be prefixed with the cluster name, like INSERT INTO <clusterName>:<table_name>
.
ALTER CLUSTER click_query ADD clicks_daily_index
POST /cli -d "
ALTER CLUSTER click_query ADD clicks_daily_index
"
$params = [
'cluster' => 'click_query',
'body' => [
'operation' => 'add',
'index' => 'clicks_daily_index'
]
];
$response = $client->cluster()->alter($params);
utilsApi.sql('ALTER CLUSTER click_query ADD clicks_daily_index')
{u'error': u'', u'total': 0, u'warning': u''}
res = await utilsApi.sql('ALTER CLUSTER click_query ADD clicks_daily_index');
{"total":0,"error":"","warning":""}
utilsApi.sql("ALTER CLUSTER click_query ADD clicks_daily_index");
utilsApi.Sql("ALTER CLUSTER click_query ADD clicks_daily_index");
ALTER CLUSTER <cluster_name> DROP <table_name>
forgets about a local table, meaning it does not remove the table files on the nodes, but rather just makes it an inactive, non-replicated table.
Once a table is removed from a cluster, it becomes a local
table, and write statements must use just the table name, like INSERT INTO <table_name>
, without the cluster prefix.
ALTER CLUSTER posts DROP weekly_index
POST /cli -d "
ALTER CLUSTER posts DROP weekly_index
"
$params = [
'cluster' => 'posts',
'body' => [
'operation' => 'drop',
'index' => 'weekly_index'
]
];
$response = $client->cluster->alter($params);
utilsApi.sql('ALTER CLUSTER posts DROP weekly_index')
{u'error': u'', u'total': 0, u'warning': u''}
res = await utilsApi.sql('ALTER CLUSTER posts DROP weekly_index');
{"total":0,"error":"","warning":""}
utilsApi.sql("ALTER CLUSTER posts DROP weekly_index");
utilsApi.Sql("ALTER CLUSTER posts DROP weekly_index");
The ALTER CLUSTER <cluster_name> UPDATE nodes
statement updates the node lists on each node within the specified cluster to include all active nodes in the cluster. For more information on node lists, see Joining a cluster.
ALTER CLUSTER posts UPDATE nodes
POST /cli -d "
ALTER CLUSTER posts UPDATE nodes
"
$params = [
'cluster' => 'posts',
'body' => [
'operation' => 'update',
]
];
$response = $client->cluster()->alter($params);
utilsApi.sql('ALTER CLUSTER posts UPDATE nodes')
{u'error': u'', u'total': 0, u'warning': u''}
res = await utilsApi.sql('ALTER CLUSTER posts UPDATE nodes');
{"total":0,"error":"","warning":""}
utilsApi.sql("ALTER CLUSTER posts UPDATE nodes");
utilsApi.Sql("ALTER CLUSTER posts UPDATE nodes");
For instance, when the cluster was initially established, the list of nodes used to rejoin the cluster was 10.10.0.1:9312,10.10.1.1:9312
. Since then, other nodes joined the cluster and now the active nodes are 10.10.0.1:9312,10.10.1.1:9312,10.15.0.1:9312,10.15.0.3:9312
.However, the list of nodes used to rejoin the cluster has not been updated.
To rectify this, you can run the ALTER CLUSTER ... UPDATE nodes
statement to copy the list of active nodes to the list of nodes used to rejoin the cluster. After this, the list of nodes used to rejoin the cluster will include all the active nodes in the cluster.
Both lists of nodes can be viewed using the Cluster status statement (cluster_post_nodes_set
and cluster_post_nodes_view
).
To remove a node from the replication cluster, follow these steps:
1. Stop the node
2. Remove the information about the cluster from <data_dir>/manticore.json
(usually /var/lib/manticore/manticore.json
) on the node that has been stopped.
3. Run ALTER CLUSTER cluster_name UPDATE nodes
on any other node.
After these steps, the other nodes will forget about the detached node and the detached node will forget about the cluster. This action will not impact the tables in the cluster or on the detached node.
You can view the cluster status information by checking the node status. This can be done using the Node status command, which displays various information about the node, including the cluster status variables.
The output format for the cluster status variables is as follows: cluster_name_variable_name
variable_value
. Most of the variables are described in the Galera Documentation Status Variables. In addition to these variables, Manticore Search also displays:
closed
, destroyed
, joining
, donor
, synced
CREATE
, JOIN
or ALTER UPDATE
commandsSHOW STATUS
+----------------------------+-------------------------------------------------------------------------------------+
| Counter | Value |
+----------------------------+-------------------------------------------------------------------------------------+
| cluster_name | post |
| cluster_post_state_uuid | fba97c45-36df-11e9-a84e-eb09d14b8ea7 |
| cluster_post_conf_id | 1 |
| cluster_post_status | primary |
| cluster_post_size | 5 |
| cluster_post_local_index | 0 |
| cluster_post_node_state | synced |
| cluster_post_indexes_count | 2 |
| cluster_post_indexes | pq1,pq_posts |
| cluster_post_nodes_set | 10.10.0.1:9312 |
| cluster_post_nodes_view | 10.10.0.1:9312,10.10.0.1:9320:replication,10.10.1.1:9312,10.10.1.1:9320:replication |
POST /cli -d "
SHOW STATUS
"
"
{"columns":[{"Counter":{"type":"string"}},{"Value":{"type":"string"}}],
"data":[
{"Counter":"cluster_name", "Value":"post"},
{"Counter":"cluster_post_state_uuid", "Value":"fba97c45-36df-11e9-a84e-eb09d14b8ea7"},
{"Counter":"cluster_post_conf_id", "Value":"1"},
{"Counter":"cluster_post_status", "Value":"primary"},
{"Counter":"cluster_post_size", "Value":"5"},
{"Counter":"cluster_post_local_index", "Value":"0"},
{"Counter":"cluster_post_node_state", "Value":"synced"},
{"Counter":"cluster_post_indexes_count", "Value":"2"},
{"Counter":"cluster_post_indexes", "Value":"pq1,pq_posts"},
{"Counter":"cluster_post_nodes_set", "Value":"10.10.0.1:9312"},
{"Counter":"cluster_post_nodes_view", "Value":"10.10.0.1:9312,10.10.0.1:9320:replication,10.10.1.1:9312,10.10.1.1:9320:replication"}
],
"total":0,
"error":"",
"warning":""
}
$params = [
'body' => []
];
$response = $client->nodes()->status($params);
(
"cluster_name" => "post",
"cluster_post_state_uuid" => "fba97c45-36df-11e9-a84e-eb09d14b8ea7",
"cluster_post_conf_id" => 1,
"cluster_post_status" => "primary",
"cluster_post_size" => 5,
"cluster_post_local_index" => 0,
"cluster_post_node_state" => "synced",
"cluster_post_indexes_count" => 2,
"cluster_post_indexes" => "pq1,pq_posts",
"cluster_post_nodes_set" => "10.10.0.1:9312",
"cluster_post_nodes_view" => "10.10.0.1:9312,10.10.0.1:9320:replication,10.10.1.1:9312,10.10.1.1:9320:replication"
)
utilsApi.sql('SHOW STATUS')
{u'columns': [{u'Key': {u'type': u'string'}},
{u'Value': {u'type': u'string'}}],
u'data': [
{u'Key': u'cluster_name', u'Value': u'post'},
{u'Key': u'cluster_post_state_uuid', u'Value': u'fba97c45-36df-11e9-a84e-eb09d14b8ea7'},
{u'Key': u'cluster_post_conf_id', u'Value': u'1'},
{u'Key': u'cluster_post_status', u'Value': u'primary'},
{u'Key': u'cluster_post_size', u'Value': u'5'},
{u'Key': u'cluster_post_local_index', u'Value': u'0'},
{u'Key': u'cluster_post_node_state', u'Value': u'synced'},
{u'Key': u'cluster_post_indexes_count', u'Value': u'2'},
{u'Key': u'cluster_post_indexes', u'Value': u'pq1,pq_posts'},
{u'Key': u'cluster_post_nodes_set', u'Value': u'10.10.0.1:9312'},
{u'Key': u'cluster_post_nodes_view', u'Value': u'10.10.0.1:9312,10.10.0.1:9320:replication,10.10.1.1:9312,10.10.1.1:9320:replication'}],
u'error': u'',
u'total': 0,
u'warning': u''}
res = await utilsApi.sql('SHOW STATUS');
{"columns": [{"Key": {"type": "string"}},
{"Value": {"type": "string"}}],
"data": [
{"Key": "cluster_name", "Value": "post"},
{"Key": "cluster_post_state_uuid", "Value": "fba97c45-36df-11e9-a84e-eb09d14b8ea7"},
{"Key": "cluster_post_conf_id", "Value": "1"},
{"Key": "cluster_post_status", "Value": "primary"},
{"Key": "cluster_post_size", "Value": "5"},
{"Key": "cluster_post_local_index", "Value": "0"},
{"Key": "cluster_post_node_state", "Value": "synced"},
{"Key": "cluster_post_indexes_count", "Value": "2"},
{"Key": "cluster_post_indexes", "Value": "pq1,pq_posts"},
{"Key": "cluster_post_nodes_set", "Value": "10.10.0.1:9312"},
{"Key": "cluster_post_nodes_view", "Value": "10.10.0.1:9312,10.10.0.1:9320:replication,10.10.1.1:9312,10.10.1.1:9320:replication"}],
"error": "",
"total": 0,
"warning": ""}
utilsApi.sql("SHOW STATUS");
{columns=[{ Key : { type=string }},
{ Value : { type=string }}],
data : [
{ Key=cluster_name, Value=post},
{ Key=cluster_post_state_uuid, Value=fba97c45-36df-11e9-a84e-eb09d14b8ea7},
{ Key=cluster_post_conf_id, Value=1},
{ Key=cluster_post_status, Value=primary},
{ Key=cluster_post_size, Value=5},
{ Key=cluster_post_local_index, Value=0},
{ Key=cluster_post_node_state, Value=synced},
{ Key=cluster_post_indexes_count, Value=2},
{ Key=cluster_post_indexes, Value=pq1,pq_posts},
{ Key=cluster_post_nodes_set, Value=10.10.0.1:9312},
{ Key=cluster_post_nodes_view, Value=10.10.0.1:9312,10.10.0.1:9320:replication,10.10.1.1:9312,10.10.1.1:9320:replication}],
error= ,
total=0,
warning= }
utilsApi.sql("SHOW STATUS");
{columns=[{ Key : { type=String }},
{ Value : { type=String }}],
data : [
{ Key=cluster_name, Value=post},
{ Key=cluster_post_state_uuid, Value=fba97c45-36df-11e9-a84e-eb09d14b8ea7},
{ Key=cluster_post_conf_id, Value=1},
{ Key=cluster_post_status, Value=primary},
{ Key=cluster_post_size, Value=5},
{ Key=cluster_post_local_index, Value=0},
{ Key=cluster_post_node_state, Value=synced},
{ Key=cluster_post_indexes_count, Value=2},
{ Key=cluster_post_indexes, Value=pq1,pq_posts},
{ Key=cluster_post_nodes_set, Value=10.10.0.1:9312},
{ Key=cluster_post_nodes_view, Value=10.10.0.1:9312,10.10.0.1:9320:replication,10.10.1.1:9312,10.10.1.1:9320:replication}],
error="" ,
total=0,
warning="" }
In a multi-master replication cluster, a reference point must be established before other nodes can join and form the cluster. This is called cluster bootstrapping and involves starting a single node as the primary component
. Restarting a single node or reconnecting after a shutdown can be done normally.
In case of a full cluster shutdown, the server that was stopped last should be started first with the --new-cluster
command line option or by running manticore_new_cluster
through systemd. To ensure that the server is capable of being the reference point, the grastate.dat
file located at the cluster path should be updated with a value of 1 for the safe_to_bootstrap
option. Both conditions, --new-cluster
and safe_to_bootstrap=1
, must be met. If any other node is started without these options set, an error will occur. The --new-cluster-force
command line option can be used to override this protection and start the cluster from another server forcibly. Alternatively, you can run manticore_new_cluster --force
to use systemd.
In the event of a hard crash or an unclean shutdown of all servers in the cluster, the most advanced node with the largest seqno
in the grastate.dat
file located at the cluster path must be identified and started with the --new-cluster-force
command line key.
In the event that the Manticore search daemon stops with no remaining nodes in the cluster to serve requests, recovery is necessary. Due to the multi-master nature of the Galera library used for replication, Manticore replication cluster is a single logical entity that maintains the consistency of its nodes and data, and the status of the entire cluster. This allows for safe writes on multiple nodes simultaneously and ensures the integrity of the cluster.
However, this also poses challenges. Let's examine several scenarios, using a cluster of nodes A, B, and C, to see what needs to be done when some or all nodes become unavailable.
When node A is stopped, the other nodes receive a "normal shutdown" message. The cluster size is reduced, and a quorum re-calculation takes place.
Upon starting node A, it joins the cluster and will not serve any write transactions until it is fully synchronized with the cluster. If the writeset cache on donor nodes B or C (which can be controlled with the Galera cluster's gcache.size) still contains all of the transactions missed at node A, node A will receive a fast incremental state transfer (IST), that is, a transfer of only missed transactions. If not, a snapshot state transfer (SST) will occur, which involves the transfer of table files.
In the scenario where nodes A and B are stopped, the cluster size is reduced to one, with node C forming the primary component to handle write transactions.
Nodes A and B can then be started as usual and will join the cluster after start-up. Node C acts as the donor, providing the state transfer to nodes A and B.
All nodes are stopped as usual and the cluster is off.
The problem now is how to initialize the cluster. It's important that on a clean shutdown of searchd the nodes write the number of last executed transaction into the cluster directory grastate.dat file along with flag safe_to_bootstrap
. The node which was stopped last will have option safe_to_bootstrap: 1
and the most advanced seqno
number.
It is important that this node starts first to form the cluster. To bootstrap a cluster the server should be started on this node with flag --new-cluster. On Linux you can also run manticore_new_cluster
which will start Manticore in --new-cluster
mode via systemd.
If another node starts first and bootstraps the cluster, then the most advanced node joins that cluster, performs full SST and receives a table file where some transactions are missed in comparison with the table files it got before. That is why it is important to start first the node which was shut down last, it should have flag safe_to_bootstrap: 1
in grastate.dat.
In the event of a crash or network failure causing Node A to disappear from the cluster, nodes B and C will attempt to reconnect with Node A. Upon failure, they will remove Node A from the cluster. With two out of the three nodes still running, the cluster maintains its quorum and continues to operate normally.
When Node A is restarted, it will join the cluster automatically, as outlined in Case 1.
Nodes A and B have gone offline. Node C is unable to form a quorum on its own as 1 node is less than half of the total nodes (3). As a result, the cluster on node C is shifted to a non-primary state and rejects any write transactions with an error message.
Meanwhile, node C waits for the other nodes to connect and also tries to connect to them. If this happens, and the network is restored and nodes A and B are back online, the cluster will automatically reform. If nodes A and B are just temporarily disconnected from node C but can still communicate with each other, they will continue to operate as normal, as they still form the quorum.
However, if both nodes A and B have crashed or restarted due to a power failure, someone must activate the primary component on node C using the following command:
SET CLUSTER posts GLOBAL 'pc.bootstrap' = 1
POST /cli -d "
SET CLUSTER posts GLOBAL 'pc.bootstrap' = 1
"
t's important to note that before executing this command, you must confirm that the other nodes are truly unreachable. Otherwise, a split-brain scenario may occur and separate clusters may form.
All nodes have crashed. In this situation, the grastate.dat file in the cluster directory has not been updated and does not contain a valid seqno
sequence number.
If this occurs, someone needs to locate the node with the most recent data and start the server on it using the --new-cluster-force command line key. All other nodes will start as normal, as described in Case 3).
On Linux, you can also use the manticore_new_cluster --force
, command, which will start Manticore in --new-cluster-force
mode via systemd.
Split-brain can cause the cluster to transition into a non-primary state. For example, consider a cluster comprised of an even number of nodes (four), such as two pairs of nodes located in different data centers. If a network failure interrupts the connection between the data centers, split-brain occurs as each group of nodes holds exactly half of the quorum. As a result, both groups stop handling write transactions, since the Galera replication model prioritizes data consistency, and the cluster cannot accept write transactions without a quorum. However, nodes in both groups attempt to reconnect with the nodes from the other group in an effort to restore the cluster.
If someone wants to restore the cluster before the network is restored, the same steps outlined in Case 5 hould be taken, but only at one group of nodes.
After the statement is executed, the group with the node that it was run on will be able to handle write transactions once again.
SET CLUSTER posts GLOBAL 'pc.bootstrap' = 1
POST /cli -d "
SET CLUSTER posts GLOBAL 'pc.bootstrap' = 1
"
However, it's important to note that if the statement is issued at both groups, it will result in the formation of two separate clusters, and the subsequent network recovery will not result in the groups rejoining.