¶ 10

Manticore cluster

Manticore Search is a highly distributed system that provides all the necessary components to create a highly available and scalable database for search. This includes:

- distributed tables for sharding data across nodes
- agent mirroring for high availability
- load balancing for scalability
- replication for data safety

Manticore Search offers great flexibility in terms of how you set up your cluster. There are no limitations, so it's up to you to design your cluster according to your needs. Simply learn about the tools mentioned above and use them to achieve your desired goal.

¶ 10.1

Adding a new node

To add a new node to a cluster, simply start another instance of Manticore and ensure that it is accessible by the other nodes in the cluster. Connect the new node to the rest of the cluster using a distributed table and ensure data safety with replication.
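
For illustration, here is a minimal sketch of connecting the new node through a distributed table, assuming the new instance is reachable at node2:9312 and serves a table called shard2 (both names are examples):

ini
table mydist {
          type  = distributed
          local = shard1
          agent = node2:9312:shard2
}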

¶ 10.2

Adding a distributed table with remote agents

To understand how to add a distributed table with remote agents, it is important to first have a basic understanding of distributed tables. In this article, we will focus on how to use a distributed table as the basis for creating a cluster of Manticore instances.

Here is an example of how to split data over 4 servers, each serving one of the shards:

ini
table mydist {
          type  = distributed
          agent = box1:9312:shard1
          agent = box2:9312:shard2
          agent = box3:9312:shard3
          agent = box4:9312:shard4
}

In the event of a server failure, the distributed table will still work, but the results from the failed shard will be missing.

To avoid this, we can add mirrors so that each shard is found on 2 servers (see the configuration below). By default, the master (the searchd instance with the distributed table) will randomly pick one of the mirrors.

The mode used for picking mirrors can be set using the ha_strategy setting. In addition to the default random mode there's also ha_strategy = roundrobin.

More advanced strategies based on latency-weighted probabilities include noerrors and nodeads. These not only take out mirrors with issues but also monitor response times and do balancing. If a mirror responds slower (for example, due to some operations running on it), it will receive fewer requests. When the mirror recovers and provides better times, it will receive more requests.

ini
table mydist {
          type  = distributed
          agent = box1:9312|box5:9312:shard1
          agent = box2:9312|box6:9312:shard2
          agent = box3:9312|box7:9312:shard3
          agent = box4:9312|box8:9312:shard4
}
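
For instance, a sketch of enabling one of the adaptive strategies for this table (ha_strategy is a per-table option, covered in the Load balancing section below):

ini
table mydist {
          type  = distributed
          agent = box1:9312|box5:9312:shard1
          ...
          ha_strategy = nodeads
}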
¶ 10.2.1

Mirroring

Agent mirrors can be used interchangeably when processing a search query. The Manticore instance(s) hosting the distributed table where the mirrored agents are defined keeps track of mirror status (alive or dead) and response times, and performs automatic failover and load balancing based on this information.

Agent mirrors

agent = node1|node2|node3:9312:shard2

The above example declares that node1:9312, node2:9312, and node3:9312 all have a table called shard2, and can be used as interchangeable mirrors. If any of these servers go down, the queries will be distributed between the remaining two. When the server comes back online, the master will detect it and begin routing queries to all three nodes again.

A mirror may also include an individual table list, as follows:

agent = node1:9312:node1shard2|node2:9312:node2shard2

This works similarly to the previous example, but different table names will be used when querying different servers. For example, node1shard2 will be used when querying node1:9312, and node2shard2 will be used when querying node2:9312.

By default, all queries are routed to the best of the mirrors. The best mirror is selected based on recent statistics, as controlled by the ha_period_karma config directive. The master stores metrics (total query count, error count, response time, etc.) for each agent and groups these by time spans. The karma is the length of the time span. The best agent mirror is then determined dynamically based on the last two such time spans. The specific algorithm used to pick a mirror can be configured with the ha_strategy directive.

The karma period is in seconds and defaults to 60 seconds. The master stores up to 15 karma spans with per-agent statistics for instrumentation purposes (see SHOW AGENT STATUS statement). However, only the last two spans out of these are used for HA/LB logic.
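
These per-span statistics can be inspected at any time:

SQL
SHOW AGENT STATUS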

When there are no queries, the master sends a regular ping command every ha_ping_interval milliseconds in order to collect statistics and check if the remote host is still alive. The ha_ping_interval defaults to 1000 msec. Setting it to 0 disables pings, and statistics will only be accumulated based on actual queries.

Example:

# sharding table over 4 servers total
# in just 2 shards but with 2 failover mirrors for each shard
# node1, node2 carry shard1 as local
# node3, node4 carry shard2 as local

# config on node1, node2
agent = node3:9312|node4:9312:shard2

# config on node3, node4
agent = node1:9312|node2:9312:shard1
¶ 10.2.2

Load balancing

Load balancing is turned on by default for any distributed table that uses mirroring. By default, queries are distributed randomly among the mirrors. You can change this behavior by using the ha_strategy directive.

ha_strategy

ha_strategy = {random|nodeads|noerrors|roundrobin}

The mirror selection strategy for load balancing is optional and is set to random by default.

The strategy used for mirror selection, or in other words, choosing a specific agent mirror in a distributed table, is controlled by this directive. Essentially, this directive controls how the master performs load balancing between the configured mirror agent nodes. The following strategies are implemented:

Simple random balancing

The default balancing mode is simple linear random distribution among the mirrors. This means that equal selection probabilities are assigned to each mirror. This is similar to round-robin (RR), but does not impose a strict selection order.

Example
ha_strategy = random

Adaptive randomized balancing

The default simple random strategy does not take into account the status of mirrors, error rates, and most importantly, actual response latencies. To address heterogeneous clusters and temporary spikes in agent node load, there is a group of balancing strategies that dynamically adjust the probabilities based on the actual query latencies observed by the master.

The adaptive strategies based on latency-weighted probabilities work as follows:

  1. Latency stats are accumulated in blocks of ha_period_karma seconds.
  2. Latency-weighted probabilities are recomputed once per karma period.
  3. The "dead or alive" flag is adjusted once per request, including ping requests.

Initially, the probabilities are equal. On every step, they are scaled by the inverse of the latencies observed during the last karma period, and then renormalized. For example, if during the first 60 seconds after the master startup, 4 mirrors had latencies of 10 ms, 5 ms, 30 ms, and 3 ms respectively, the first adjustment step would go as follows:

  1. Initial percentages: 0.25, 0.25, 0.25, 0.25.
  2. Observed latencies: 10 ms, 5 ms, 30 ms, 3 ms.
  3. Inverse latencies: 0.1, 0.2, 0.0333, 0.333.
  4. Scaled percentages: 0.025, 0.05, 0.008333, 0.0833.
  5. Renormalized percentages: 0.15, 0.30, 0.05, 0.50.

This means that the first mirror would have a 15% chance of being chosen during the next karma period, the second one a 30% chance, the third one (slowest at 30 ms) only a 5% chance, and the fourth and fastest one (at 3 ms) a 50% chance. After that period, the second adjustment step would update those chances again, and so on.

The idea is that once the observed latencies stabilize, the latency weighted probabilities will stabilize as well. All these adjustment iterations are meant to converge at a point where the average latencies are roughly equal across all mirrors.
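
As a minimal sketch (not Manticore's actual implementation), the adjustment step described above can be expressed in Python:

Python
# Latency-weighted probability update: scale each mirror's selection
# probability by the inverse of its observed latency, then renormalize.
def update_weights(weights, latencies_ms):
    scaled = [w / lat for w, lat in zip(weights, latencies_ms)]
    total = sum(scaled)
    return [s / total for s in scaled]

# The example from the text: 4 mirrors with equal initial probabilities
# and latencies of 10 ms, 5 ms, 30 ms and 3 ms in the first karma period.
print(update_weights([0.25, 0.25, 0.25, 0.25], [10, 5, 30, 3]))
# -> [0.15, 0.30, 0.05, 0.50] (approximately)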

nodeads

Latency-weighted probabilities, but dead mirrors are excluded from the selection. A "dead" mirror is defined as a mirror that has produced multiple hard errors (e.g., network failure or no response) in a row.

Example
ha_strategy = nodeads

noerrors

Latency-weighted probabilities, but mirrors with a worse error/success ratio are excluded from selection.

Example
ha_strategy = noerrors

Round-robin balancing

Simple round-robin selection, that is, selecting the first mirror in the list, then the second one, then the third one, etc, and then repeating the process once the last mirror in the list is reached. Unlike with the randomized strategies, RR imposes a strict querying order (1, 2, 3, ..., N-1, N, 1, 2, 3, ..., and so on) and guarantees that no two consecutive queries will be sent to the same mirror.

Example
ha_strategy = roundrobin

Instance-wide options

ha_period_karma

ha_period_karma = 2m

ha_period_karma defines the size of the agent mirror statistics window, in seconds (or with a time suffix). Optional; the default is 60.

For a distributed table with agent mirrors, the server tracks several different per-mirror counters. These counters are then used for failover and balancing. (The server picks the best mirror to use based on the counters.) Counters are accumulated in blocks of ha_period_karma seconds.

After beginning a new block, the master may still use the accumulated values from the previous one until the new one is half full. Thus, any previous history stops affecting the mirror choice after at most 1.5 times ha_period_karma seconds.

Although at most 2 blocks are used for mirror selection, up to 15 last blocks are actually stored for instrumentation purposes. They can be inspected using the SHOW AGENT STATUS statement.

ha_ping_interval

ha_ping_interval = 3s

The ha_ping_interval directive defines the interval between pings sent to the agent mirrors, in milliseconds (or with a time suffix). Optional; the default is 1000.

For a distributed table with agent mirrors, the server sends all mirrors a ping command during idle periods to track their current status (whether they are alive or dead, network roundtrip time, etc.). The interval between pings is determined by the ha_ping_interval setting.

If you want to disable pings, set ha_ping_interval to 0.

¶ 10.3

Setting up replication

With Manticore, write transactions (such as INSERT, REPLACE, DELETE, TRUNCATE, UPDATE, COMMIT) can be replicated to other cluster nodes before the transaction is fully applied on the current node. Currently, replication is supported for percolate, rt, and distributed tables on Linux and macOS. However, Manticore Search packages for Windows do not provide replication support.

Manticore's replication is powered by the Galera library, which provides true multi-master, virtually synchronous replication: transactions can be written to any node, there is no replica lag, and no data is lost after a node crash.

To set up replication in Manticore Search, you need to specify the data_dir directory in the searchd section of the configuration file and make sure the nodes can reach each other on their replication ports, which are configured as described below.

If there is no replication listen directive set, Manticore will use the first two free ports in the range of 200 ports after the default protocol listening port for each created cluster. To set replication ports manually, the listen directive (of replication type) port range must be defined and the address/port range pairs must not intersect between different nodes on the same server. As a rule of thumb, the port range should specify at least two ports per cluster.
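
For illustration, a sketch of two Manticore instances running on the same server with non-intersecting replication port ranges (addresses and ports are examples):

ini
# instance 1
listen = 192.168.1.101:9360-9369:replication

# instance 2
listen = 192.168.1.101:9370-9379:replication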

Replication cluster

A replication cluster is a group of nodes in which a write transaction is replicated. Replication is set up on a per-table basis, meaning that one table can only belong to one cluster. There is no limit on the number of tables that a cluster can have. All transactions such as INSERT, REPLACE, DELETE, TRUNCATE on any percolate or real-time table that belongs to a cluster are replicated to all the other nodes in that cluster. Distributed tables can also be part of the replication process. Replication is multi-master, so writes to any node or multiple nodes simultaneously will work just as well.

To create a cluster, you will typically just use CREATE CLUSTER <cluster name>, and to join a cluster, JOIN CLUSTER <cluster name> AT 'host:port'. However, in some rare cases, you may want to fine-tune the behavior of CREATE/JOIN CLUSTER. The available options are:

name

This option specifies the name of the cluster. It should be unique among all the clusters in the system.

Note: The maximum allowable hostname length for the JOIN command is 253 characters. If you exceed this limit, searchd will generate an error.

path

The path option specifies the data directory for write-set cache replication and incoming tables from other nodes. This value should be unique among all the clusters in the system and should be specified as a path relative to the data_dir directory. By default, it is set to the value of data_dir.

nodes

The nodes option is a list of address:port pairs for all the nodes in the cluster, separated by commas. This list should be obtained using the node's API interface and can include the address of the current node as well. It is used to join the node to the cluster and to rejoin it after a restart.

options

The options option allows you to pass additional options directly to the Galera replication plugin, as described in the Galera Documentation Parameters.

Write statements

When working with a replication cluster, all write statements such as INSERT, REPLACE, DELETE, TRUNCATE, UPDATE that modify the content of a cluster's table must use the cluster_name:index_name expression instead of the table name. This ensures that the changes are propagated to all replicas in the cluster. If the correct expression is not used, an error will be triggered.

In the JSON interface, the cluster property must be set along with the table name for all write statements to a cluster's table. Failure to set the cluster property will result in an error.

The Auto ID for a table in a cluster should be valid as long as the server_id is correctly configured.
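
For example, a minimal sketch of setting server_id in the configuration (the value itself is arbitrary, but it must be unique on each node of the cluster):

ini
searchd {
  ...
  server_id = 1
}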

SQL
INSERT INTO posts:weekly_index VALUES ( 'iphone case' )
TRUNCATE RTINDEX click_query:weekly_index
UPDATE posts:rt_tags SET tags=(101, 302, 304) WHERE MATCH ('use') AND id IN (1,101,201)
DELETE FROM clicks:rt WHERE MATCH ('dumy') AND gid>206
JSON
POST /insert -d '
{
  "cluster":"posts",
  "index":"weekly_index",
  "doc":
  {
    "title" : "iphone case",
    "price" : 19.85
  }
}'
POST /delete -d '
{
  "cluster":"posts",
  "index": "weekly_index",
  "id":1
}'
PHP
$index->addDocuments([
        1, ['title' => 'iphone case', 'price' => 19.85]
]);
$index->deleteDocument(1);
Python
indexApi.insert({"cluster":"posts","index":"weekly_index","doc":{"title":"iphone case","price":19.85}})
indexApi.delete({"cluster":"posts","index":"weekly_index","id":1})
Javascript
res = await indexApi.insert({"cluster":"posts","index":"weekly_index","doc":{"title":"iphone case","price":19.85}});
res = await indexApi.delete({"cluster":"posts","index":"weekly_index","id":1});
Java
InsertDocumentRequest newdoc = new InsertDocumentRequest();
HashMap<String,Object> doc = new HashMap<String,Object>(){{
    put("title","Crossbody Bag with Tassel");
    put("price",19.85);
}};
newdoc.index("weekly_index").cluster("posts").id(1L).setDoc(doc);
sqlresult = indexApi.insert(newdoc);

DeleteDocumentRequest deleteRequest = new DeleteDocumentRequest();
deleteRequest.index("weekly_index").cluster("posts").setId(1L);
indexApi.delete(deleteRequest);
C#
Dictionary<string, Object> doc = new Dictionary<string, Object>();
doc.Add("title", "Crossbody Bag with Tassel");
doc.Add("price", 19.85);
InsertDocumentRequest newdoc = new InsertDocumentRequest(index: "weekly_index", cluster: "posts", id: 1, doc: doc);
var sqlresult = indexApi.Insert(newdoc);

DeleteDocumentRequest deleteDocumentRequest = new DeleteDocumentRequest(index: "weekly_index", cluster: "posts", id: 1);
indexApi.Delete(deleteDocumentRequest);

Read statements

Read statements such as SELECT, CALL PQ, DESCRIBE can either use regular table names that are not prepended with a cluster name, or they can use the cluster_name:index_name format. If the latter is used, the cluster_name component is ignored.

When using the HTTP endpoint json/search, the cluster property can be specified if desired, but it can also be omitted.

SQL
SELECT * FROM weekly_index
CALL PQ('posts:weekly_index', 'document is here')
JSON
POST /search -d '
{
  "cluster":"posts",
  "index":"weekly_index",
  "query":{"match":{"title":"keyword"}}
}'
POST /search -d '
{
  "index":"weekly_index",
  "query":{"match":{"title":"keyword"}}
}'

Cluster parameters

Replication plugin options can be adjusted using the SET statement.

A list of available options can be found in the Galera Documentation Parameters .

SQL
SET CLUSTER click_query GLOBAL 'pc.bootstrap' = 1
JSON
POST /cli -d "
SET CLUSTER click_query GLOBAL 'pc.bootstrap' = 1
"

Cluster with diverged nodes

It's possible for replicated nodes to diverge from one another, leading to a state where all nodes are labeled as non-primary. This can occur as a result of a network split between nodes, a cluster crash, or if the replication plugin experiences an exception when determining the primary component. In such a scenario, it's necessary to select a node and promote it to the role of primary component.

To identify the node that needs to be promoted, you should compare the last_committed cluster status variable value on all nodes. If all the servers are currently running, there's no need to restart the cluster. Instead, you can simply promote the node with the highest last_committed value to the primary component using the SET statement (as demonstrated in the example).

The other nodes will then reconnect to the primary component and resynchronize their data based on this node.
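
To compare last_committed across the nodes, it can be checked on each node like this (assuming the cluster is named posts; cluster status variables are displayed with the cluster_<name>_ prefix):

SQL
SHOW STATUS LIKE 'cluster_posts_last_committed'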

SQL
SET CLUSTER posts GLOBAL 'pc.bootstrap' = 1
JSON
POST /cli -d "
SET CLUSTER posts GLOBAL 'pc.bootstrap' = 1
"

Replication and cluster

To use replication, you need to define one listen port for the SphinxAPI protocol and one listen directive with a replication address and port range in the configuration file. Also, specify the data_dir folder to receive incoming tables.

ini
searchd {
  listen   = 9312
  listen   = 192.168.1.101:9360-9370:replication
  data_dir = /var/lib/manticore/
  ...
}

To replicate tables, you must create a cluster on the server that has the local tables to be replicated.

SQL
CREATE CLUSTER posts
JSON
POST /cli -d "
CREATE CLUSTER posts
"
PHP
$params = [
    'cluster' => 'posts'
];
$response = $client->cluster()->create($params);
Python
utilsApi.sql('CREATE CLUSTER posts')
Javascript
res = await utilsApi.sql('CREATE CLUSTER posts');
Java
utilsApi.sql("CREATE CLUSTER posts");
C#
utilsApi.Sql("CREATE CLUSTER posts");

Add these local tables to the cluster

SQL
ALTER CLUSTER posts ADD pq_title
ALTER CLUSTER posts ADD pq_clicks
JSON
POST /cli -d "
ALTER CLUSTER posts ADD pq_title
"
POST /cli -d "
ALTER CLUSTER posts ADD pq_clicks
"
PHP
$params = [
  'cluster' => 'posts',
  'body' => [
     'operation' => 'add',
     'index' => 'pq_title'

  ]
];
$response = $client->cluster()->alter($params);
$params = [
  'cluster' => 'posts',
  'body' => [
     'operation' => 'add',
     'index' => 'pq_clicks'

  ]
];
$response = $client->cluster()->alter($params);   
Python
utilsApi.sql('ALTER CLUSTER posts ADD pq_title')
utilsApi.sql('ALTER CLUSTER posts ADD pq_clicks')
Javascript
res = await utilsApi.sql('ALTER CLUSTER posts ADD pq_title');
res = await utilsApi.sql('ALTER CLUSTER posts ADD pq_clicks');
Java
utilsApi.sql("ALTER CLUSTER posts ADD pq_title");
utilsApi.sql("ALTER CLUSTER posts ADD pq_clicks");
C#
utilsApi.Sql("ALTER CLUSTER posts ADD pq_title");
utilsApi.Sql("ALTER CLUSTER posts ADD pq_clicks");

All other nodes that wish to receive a replica of the cluster's tables should join the cluster as follows:

SQL
JOIN CLUSTER posts AT '192.168.1.101:9312'
JSON
POST /cli -d "
JOIN CLUSTER posts AT '192.168.1.101:9312'
"
PHP
$params = [
  'cluster' => 'posts',
  'body' => [
      '192.168.1.101:9312'
  ]
];
$response = $client->cluster()->join($params);
Python
utilsApi.sql('JOIN CLUSTER posts AT \'192.168.1.101:9312\'')
Javascript
res = await utilsApi.sql('JOIN CLUSTER posts AT \'192.168.1.101:9312\'');
Java
utilsApi.sql("JOIN CLUSTER posts AT '192.168.1.101:9312'");
C#
utilsApi.Sql("JOIN CLUSTER posts AT '192.168.1.101:9312'");

When running queries, prepend the table name with the cluster name (posts:) or set the cluster property in the HTTP request object.

SQL
INSERT INTO posts:pq_title VALUES ( 3, 'test me' )
JSON
POST /insert -d '
{
  "cluster":"posts",
  "index":"pq_title",
  "id": 3
  "doc":
  {
    "title" : "test me"
  }
}'
PHP
$index->addDocuments([
        3, ['title' => 'test me']
]);
Python
indexApi.insert({"cluster":"posts","index":"pq_title","id":3"doc":{"title":"test me"}})
Javascript
res = await indexApi.insert({"cluster":"posts","index":"pq_title","id":3"doc":{"title":"test me"}});
Java
InsertDocumentRequest newdoc = new InsertDocumentRequest();
HashMap<String,Object> doc = new HashMap<String,Object>(){{
    put("title","test me");
}};
newdoc.index("pq_title").cluster("posts").id(3L).setDoc(doc);
sqlresult = indexApi.insert(newdoc);
C#
Dictionary<string, Object> doc = new Dictionary<string, Object>();
doc.Add("title", "test me");
InsertDocumentRequest newdoc = new InsertDocumentRequest(index: "pq_title", cluster: "posts", id: 3, doc: doc);
var sqlresult = indexApi.Insert(newdoc);

All queries that modify tables in the cluster are now replicated to all nodes in the cluster.

¶ 10.3.1

Creating a replication cluster

To create a replication cluster, you must set its name at a minimum.

If you are creating a single cluster or the first cluster, you may omit the path option. In this case, the data_dir option will be used as the cluster path. However, for all subsequent clusters, you must specify the path and the path must be available. The nodes option may also be set to list all nodes in the cluster.

SQL
CREATE CLUSTER posts
CREATE CLUSTER click_query '/var/data/click_query/' as path
CREATE CLUSTER click_query '/var/data/click_query/' as path, 'clicks_mirror1:9312,clicks_mirror2:9312,clicks_mirror3:9312' as nodes
JSON
POST /cli -d "
CREATE CLUSTER posts
"
POST /cli -d "
CREATE CLUSTER click_query '/var/data/click_query/' as path
"
POST /cli -d "
CREATE CLUSTER click_query '/var/data/click_query/' as path, 'clicks_mirror1:9312,clicks_mirror2:9312,clicks_mirror3:9312' as nodes
"
PHP
$params = [
    'cluster' => 'posts'
];
$response = $client->cluster()->create($params);
$params = [
    'cluster' => 'click_query',
    'body' => [
        'path' => '/var/data/click_query/'
    ]
];
$response = $client->cluster()->create($params);
$params = [
    'cluster' => 'click_query',
    'body' => [
        'path' => '/var/data/click_query/',
        'nodes' => 'clicks_mirror1:9312,clicks_mirror2:9312,clicks_mirror3:9312'
    ]
];
$response = $client->cluster()->create($params);
Python
utilsApi.sql('CREATE CLUSTER posts')
utilsApi.sql('CREATE CLUSTER click_query \'/var/data/click_query/\' as path')
utilsApi.sql('CREATE CLUSTER click_query \'/var/data/click_query/\' as path, \'clicks_mirror1:9312,clicks_mirror2:9312,clicks_mirror3:9312\' as nodes')
Javascript
res = await utilsApi.sql('CREATE CLUSTER posts');
res = await utilsApi.sql('CREATE CLUSTER click_query \'/var/data/click_query/\' as path');
res = await utilsApi.sql('CREATE CLUSTER click_query \'/var/data/click_query/\' as path, \'clicks_mirror1:9312,clicks_mirror2:9312,clicks_mirror3:9312\' as nodes');
Java
utilsApi.sql("CREATE CLUSTER posts");
utilsApi.sql("CREATE CLUSTER click_query '/var/data/click_query/' as path");
utilsApi.sql("CREATE CLUSTER click_query '/var/data/click_query/' as path, 'clicks_mirror1:9312,clicks_mirror2:9312,clicks_mirror3:9312' as nodes");
C#
utilsApi.Sql("CREATE CLUSTER posts");
utilsApi.Sql("CREATE CLUSTER click_query '/var/data/click_query/' as path");
utilsApi.Sql("CREATE CLUSTER click_query '/var/data/click_query/' as path, 'clicks_mirror1:9312,clicks_mirror2:9312,clicks_mirror3:9312' as nodes");

If the nodes option is not specified when creating a cluster, the first node that joins the cluster will be saved as the nodes option.

¶ 10.3.2

Joining a replication cluster

To join an existing cluster, you must specify at least the name of the cluster and the host:port of one of its nodes:

SQL
JOIN CLUSTER posts AT '10.12.1.35:9312'
JSON
POST /cli -d "
JOIN CLUSTER posts AT '10.12.1.35:9312'
"
PHP
$params = [
  'cluster' => 'posts',
  'body' => [
      '10.12.1.35:9312'
  ]
];
$response = $client->cluster()->join($params);
Python
utilsApi.sql('JOIN CLUSTER posts AT \'10.12.1.35:9312\'')
{u'error': u'', u'total': 0, u'warning': u''}
Javascript
res = await utilsApi.sql('JOIN CLUSTER posts AT \'10.12.1.35:9312\'');
{"total":0,"error":"","warning":""}
Java
utilsApi.sql("JOIN CLUSTER posts AT '10.12.1.35:9312'");
C#
utilsApi.Sql("JOIN CLUSTER posts AT '10.12.1.35:9312'");

In most cases, the above is sufficient when there is a single replication cluster. However, if you are creating multiple replication clusters, you must also set the path and ensure that the directory is available.

SQL
JOIN CLUSTER c2 AT '127.0.0.1:10201' 'c2' as path

A node joins a cluster by obtaining data from a specified node and, if successful, updates the node lists across all other cluster nodes in the same way as if it was done manually through ALTER CLUSTER ... UPDATE nodes. This list is used to re-join nodes to the cluster upon restart.

There are two lists of nodes:
  1. cluster_<name>_nodes_set: used to re-join nodes to the cluster upon restart. It is updated across all nodes in the same way as ALTER CLUSTER ... UPDATE nodes does. The JOIN CLUSTER command performs this update automatically. The cluster status displays this list as cluster_<name>_nodes_set.
  2. cluster_<name>_nodes_view: contains all active nodes used for replication and does not require manual management. ALTER CLUSTER ... UPDATE nodes copies this list of nodes to the list of nodes used to re-join upon restart. The cluster status displays this list as cluster_<name>_nodes_view. Both lists can be inspected as shown below.
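
For example, assuming a cluster named posts (substitute your cluster name):

SQL
SHOW STATUS LIKE 'cluster_posts_nodes_%'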

When nodes are located in different network segments or data centers, the nodes option may be set explicitly. This minimizes traffic between nodes and utilizes gateway nodes for intercommunication between data centers. The following code joins an existing cluster using the nodes option.

Note: The cluster cluster_<name>_nodes_set list is not updated automatically when this syntax is used. To update it, use ALTER CLUSTER ... UPDATE nodes.

SQL
JOIN CLUSTER click_query 'clicks_mirror1:9312;clicks_mirror2:9312;clicks_mirror3:9312' as nodes
JSON
POST /cli -d "
JOIN CLUSTER click_query 'clicks_mirror1:9312;clicks_mirror2:9312;clicks_mirror3:9312' as nodes
"
PHP
$params = [
  'cluster' => 'click_query',
  'body' => [
      'nodes' => 'clicks_mirror1:9312;clicks_mirror2:9312;clicks_mirror3:9312'
  ]
];
$response = $client->cluster()->join($params);
Python
utilsApi.sql('JOIN CLUSTER click_query \'clicks_mirror1:9312;clicks_mirror2:9312;clicks_mirror3:9312\' as nodes')
{u'error': u'', u'total': 0, u'warning': u''}
Javascript
res = await utilsApi.sql('JOIN CLUSTER click_query \'clicks_mirror1:9312;clicks_mirror2:9312;clicks_mirror3:9312\' as nodes');
{"total":0,"error":"","warning":""}
Java
utilsApi.sql("JOIN CLUSTER click_query 'clicks_mirror1:9312;clicks_mirror2:9312;clicks_mirror3:9312' as nodes");
C#
utilsApi.Sql("JOIN CLUSTER click_query 'clicks_mirror1:9312;clicks_mirror2:9312;clicks_mirror3:9312' as nodes");

The JOIN CLUSTER command works synchronously and completes as soon as the node receives all data from the other nodes in the cluster and is in sync with them.

¶ 10.3.3

Deleting a replication cluster

The DELETE CLUSTER statement deletes the cluster specified by name. Once the cluster is deleted, it is removed from all nodes, but its tables remain intact and become active local non-replicated tables.

SQL
DELETE CLUSTER click_query
JSON
POST /cli -d "DELETE CLUSTER click_query"
PHP
$params = [
    'cluster' => 'click_query',
    'body' => []
];
$response = $client->cluster()->delete($params);                
Python
utilsApi.sql('DELETE CLUSTER click_query')
{u'error': u'', u'total': 0, u'warning': u''}
Javascript
res = await utilsApi.sql('DELETE CLUSTER click_query');
{"total":0,"error":"","warning":""}
Java
utilsApi.sql("DELETE CLUSTER click_query");
C#
utilsApi.Sql("DELETE CLUSTER click_query");
¶ 10.3.4

Adding and removing a table from a replication cluster

ALTER CLUSTER <cluster_name> ADD <table_name> adds an existing local table to the cluster. The node that receives the ALTER query sends the table to the other nodes in the cluster. All the local tables with the same name on the other nodes of the cluster are replaced with the new table.

Once the table is replicated, write statements can be performed on any node, but the table name must be prefixed with the cluster name, like INSERT INTO <clusterName>:<table_name>.

SQL
ALTER CLUSTER click_query ADD clicks_daily_index
JSON
POST /cli -d "
ALTER CLUSTER click_query ADD clicks_daily_index
"
PHP
$params = [
  'cluster' => 'click_query',
  'body' => [
     'operation' => 'add',
     'index' => 'clicks_daily_index'

  ]
];
$response = $client->cluster()->alter($params);        
Python
utilsApi.sql('ALTER CLUSTER click_query ADD clicks_daily_index')
{u'error': u'', u'total': 0, u'warning': u''}
Javascript
res = await utilsApi.sql('ALTER CLUSTER click_query ADD clicks_daily_index');
{"total":0,"error":"","warning":""}
Java
utilsApi.sql("ALTER CLUSTER click_query ADD clicks_daily_index");
C#
utilsApi.Sql("ALTER CLUSTER click_query ADD clicks_daily_index");

ALTER CLUSTER <cluster_name> DROP <table_name> forgets about a local table, meaning it does not remove the table files on the nodes, but rather just makes it an inactive, non-replicated table.

Once a table is removed from a cluster, it becomes a local table, and write statements must use just the table name, like INSERT INTO <table_name>, without the cluster prefix.

SQL
ALTER CLUSTER posts DROP weekly_index
JSON
POST /cli -d "
ALTER CLUSTER posts DROP weekly_index
"
PHP
$params = [
  'cluster' => 'posts',
  'body' => [
     'operation' => 'drop',
     'index' => 'weekly_index'

  ]
];
$response = $client->cluster()->alter($params);
Python
utilsApi.sql('ALTER CLUSTER posts DROP weekly_index')
{u'error': u'', u'total': 0, u'warning': u''}
Javascript
res = await utilsApi.sql('ALTER CLUSTER posts DROP weekly_index');
{"total":0,"error":"","warning":""}
Java
utilsApi.sql("ALTER CLUSTER posts DROP weekly_index");
C#
utilsApi.Sql("ALTER CLUSTER posts DROP weekly_index");
¶ 10.3.5

Managing replication nodes

The ALTER CLUSTER <cluster_name> UPDATE nodes statement updates the node lists on each node within the specified cluster to include all active nodes in the cluster. For more information on node lists, see Joining a cluster.

SQL
ALTER CLUSTER posts UPDATE nodes
JSON
POST /cli -d "
ALTER CLUSTER posts UPDATE nodes
"
PHP
$params = [
  'cluster' => 'posts',
  'body' => [
     'operation' => 'update',

  ]
];
$response = $client->cluster()->alter($params); 
Python
utilsApi.sql('ALTER CLUSTER posts UPDATE nodes')
{u'error': u'', u'total': 0, u'warning': u''}
Javascript
res = await utilsApi.sql('ALTER CLUSTER posts UPDATE nodes');
{"total":0,"error":"","warning":""}
Java
utilsApi.sql("ALTER CLUSTER posts UPDATE nodes");
C#
utilsApi.Sql("ALTER CLUSTER posts UPDATE nodes");

For instance, when the cluster was initially created, the list of nodes used to rejoin the cluster was 10.10.0.1:9312,10.10.1.1:9312. Since then, other nodes joined the cluster and now the active nodes are 10.10.0.1:9312,10.10.1.1:9312,10.15.0.1:9312,10.15.0.3:9312. However, the list of nodes used to rejoin the cluster has not been updated.

To rectify this, you can run the ALTER CLUSTER ... UPDATE nodes statement to copy the list of active nodes to the list of nodes used to rejoin the cluster. After this, the list of nodes used to rejoin the cluster will include all the active nodes in the cluster.

Both lists of nodes can be viewed using the Cluster status statement (cluster_post_nodes_set and cluster_post_nodes_view).

Removing node from cluster

To remove a node from the replication cluster, follow these steps:
1. Stop the node
2. Remove the information about the cluster from <data_dir>/manticore.json (usually /var/lib/manticore/manticore.json) on the node that has been stopped.
3. Run ALTER CLUSTER cluster_name UPDATE nodes on any other node.

After these steps, the other nodes will forget about the detached node and the detached node will forget about the cluster. This action will not impact the tables in the cluster or on the detached node.

¶ 10.3.6

Replication cluster status

You can view the cluster status information by checking the node status. This can be done using the Node status command, which displays various information about the node, including the cluster status variables.

The output format for the cluster status variables is as follows: cluster_name_variable_name variable_value. Most of the variables are described in the Galera Documentation Status Variables. In addition to these variables, Manticore Search also displays:

- cluster_name – name of the cluster
- cluster_<name>_node_state – current state of the node: closed, destroyed, joining, donor, synced
- cluster_<name>_indexes_count – number of tables managed by the cluster
- cluster_<name>_indexes – list of table names managed by the cluster
- cluster_<name>_nodes_set – list of nodes in the cluster defined with the CREATE, JOIN or ALTER UPDATE commands
- cluster_<name>_nodes_view – actual list of nodes in the cluster which this node sees

SQL
SHOW STATUS
+----------------------------+-------------------------------------------------------------------------------------+
| Counter                    | Value                                                                               |
+----------------------------+-------------------------------------------------------------------------------------+
| cluster_name               | post                                                                                |
| cluster_post_state_uuid    | fba97c45-36df-11e9-a84e-eb09d14b8ea7                                                |
| cluster_post_conf_id       | 1                                                                                   |
| cluster_post_status        | primary                                                                             |
| cluster_post_size          | 5                                                                                   |
| cluster_post_local_index   | 0                                                                                   |
| cluster_post_node_state    | synced                                                                              |
| cluster_post_indexes_count | 2                                                                                   |
| cluster_post_indexes       | pq1,pq_posts                                                                        |
| cluster_post_nodes_set     | 10.10.0.1:9312                                                                      |
| cluster_post_nodes_view    | 10.10.0.1:9312,10.10.0.1:9320:replication,10.10.1.1:9312,10.10.1.1:9320:replication |
+----------------------------+-------------------------------------------------------------------------------------+
JSON
POST /cli -d "
SHOW STATUS
"
"
{"columns":[{"Counter":{"type":"string"}},{"Value":{"type":"string"}}],
"data":[
{"Counter":"cluster_name", "Value":"post"},
{"Counter":"cluster_post_state_uuid", "Value":"fba97c45-36df-11e9-a84e-eb09d14b8ea7"},
{"Counter":"cluster_post_conf_id", "Value":"1"},
{"Counter":"cluster_post_status", "Value":"primary"},
{"Counter":"cluster_post_size", "Value":"5"},
{"Counter":"cluster_post_local_index", "Value":"0"},
{"Counter":"cluster_post_node_state", "Value":"synced"},
{"Counter":"cluster_post_indexes_count", "Value":"2"},
{"Counter":"cluster_post_indexes", "Value":"pq1,pq_posts"},
{"Counter":"cluster_post_nodes_set", "Value":"10.10.0.1:9312"},
{"Counter":"cluster_post_nodes_view", "Value":"10.10.0.1:9312,10.10.0.1:9320:replication,10.10.1.1:9312,10.10.1.1:9320:replication"}
],
"total":0,
"error":"",
"warning":""
}
PHP
$params = [
    'body' => []
];
$response = $client->nodes()->status($params);         
(
"cluster_name" => "post",
"cluster_post_state_uuid" => "fba97c45-36df-11e9-a84e-eb09d14b8ea7",
"cluster_post_conf_id" => 1,
"cluster_post_status" => "primary",
"cluster_post_size" => 5,
"cluster_post_local_index" => 0,
"cluster_post_node_state" => "synced",
"cluster_post_indexes_count" => 2,
"cluster_post_indexes" => "pq1,pq_posts",
"cluster_post_nodes_set" => "10.10.0.1:9312",
"cluster_post_nodes_view" => "10.10.0.1:9312,10.10.0.1:9320:replication,10.10.1.1:9312,10.10.1.1:9320:replication"
)
Python
utilsApi.sql('SHOW STATUS')
{u'columns': [{u'Key': {u'type': u'string'}},
              {u'Value': {u'type': u'string'}}],
 u'data': [
    {u'Key': u'cluster_name', u'Value': u'post'},
    {u'Key': u'cluster_post_state_uuid', u'Value': u'fba97c45-36df-11e9-a84e-eb09d14b8ea7'},
    {u'Key': u'cluster_post_conf_id', u'Value': u'1'},
    {u'Key': u'cluster_post_status', u'Value': u'primary'},
    {u'Key': u'cluster_post_size', u'Value': u'5'},
    {u'Key': u'cluster_post_local_index', u'Value': u'0'},
    {u'Key': u'cluster_post_node_state', u'Value': u'synced'},
    {u'Key': u'cluster_post_indexes_count', u'Value': u'2'},
    {u'Key': u'cluster_post_indexes', u'Value': u'pq1,pq_posts'},
    {u'Key': u'cluster_post_nodes_set', u'Value': u'10.10.0.1:9312'},
    {u'Key': u'cluster_post_nodes_view', u'Value': u'10.10.0.1:9312,10.10.0.1:9320:replication,10.10.1.1:9312,10.10.1.1:9320:replication'}],
 u'error': u'',
 u'total': 0,
 u'warning': u''}
Javascript
res = await utilsApi.sql('SHOW STATUS');
{"columns": [{"Key": {"type": "string"}},
              {"Value": {"type": "string"}}],
 "data": [
    {"Key": "cluster_name", "Value": "post"},
    {"Key": "cluster_post_state_uuid", "Value": "fba97c45-36df-11e9-a84e-eb09d14b8ea7"},
    {"Key": "cluster_post_conf_id", "Value": "1"},
    {"Key": "cluster_post_status", "Value": "primary"},
    {"Key": "cluster_post_size", "Value": "5"},
    {"Key": "cluster_post_local_index", "Value": "0"},
    {"Key": "cluster_post_node_state", "Value": "synced"},
    {"Key": "cluster_post_indexes_count", "Value": "2"},
    {"Key": "cluster_post_indexes", "Value": "pq1,pq_posts"},
    {"Key": "cluster_post_nodes_set", "Value": "10.10.0.1:9312"},
    {"Key": "cluster_post_nodes_view", "Value": "10.10.0.1:9312,10.10.0.1:9320:replication,10.10.1.1:9312,10.10.1.1:9320:replication"}],
 "error": "",
 "total": 0,
 "warning": ""}
Java
utilsApi.sql("SHOW STATUS");
{columns=[{ Key : { type=string }},
              { Value : { type=string }}],
  data : [
    { Key=cluster_name, Value=post},
    { Key=cluster_post_state_uuid, Value=fba97c45-36df-11e9-a84e-eb09d14b8ea7},
    { Key=cluster_post_conf_id, Value=1},
    { Key=cluster_post_status, Value=primary},
    { Key=cluster_post_size, Value=5},
    { Key=cluster_post_local_index, Value=0},
    { Key=cluster_post_node_state, Value=synced},
    { Key=cluster_post_indexes_count, Value=2},
    { Key=cluster_post_indexes, Value=pq1,pq_posts},
    { Key=cluster_post_nodes_set, Value=10.10.0.1:9312},
    { Key=cluster_post_nodes_view, Value=10.10.0.1:9312,10.10.0.1:9320:replication,10.10.1.1:9312,10.10.1.1:9320:replication}],
  error= ,
  total=0,
  warning= }
C#
utilsApi.sql("SHOW STATUS");
{columns=[{ Key : { type=String }},
              { Value : { type=String }}],
  data : [
    { Key=cluster_name, Value=post},
    { Key=cluster_post_state_uuid, Value=fba97c45-36df-11e9-a84e-eb09d14b8ea7},
    { Key=cluster_post_conf_id, Value=1},
    { Key=cluster_post_status, Value=primary},
    { Key=cluster_post_size, Value=5},
    { Key=cluster_post_local_index, Value=0},
    { Key=cluster_post_node_state, Value=synced},
    { Key=cluster_post_indexes_count, Value=2},
    { Key=cluster_post_indexes, Value=pq1,pq_posts},
    { Key=cluster_post_nodes_set, Value=10.10.0.1:9312},
    { Key=cluster_post_nodes_view, Value=10.10.0.1:9312,10.10.0.1:9320:replication,10.10.1.1:9312,10.10.1.1:9320:replication}],
  error="" ,
  total=0,
  warning="" }
¶ 10.3.7

Restarting a cluster

In a multi-master replication cluster, a reference point must be established before other nodes can join and form the cluster. This is called cluster bootstrapping and involves starting a single node as the primary component. Restarting a single node or reconnecting after a shutdown can be done normally.

In case of a full cluster shutdown, the server that was stopped last should be started first with the --new-cluster command line option or by running manticore_new_cluster through systemd. To ensure that the server is capable of being the reference point, the grastate.dat file located at the cluster path should be updated with a value of 1 for the safe_to_bootstrap option. Both conditions, --new-cluster and safe_to_bootstrap=1, must be met. If any other node is started without these options set, an error will occur. The --new-cluster-force command line option can be used to override this protection and start the cluster from another server forcibly. Alternatively, you can run manticore_new_cluster --force to do the same via systemd.
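
For example, on Linux (a sketch; the systemd wrappers are equivalent to the corresponding searchd options):

shell
manticore_new_cluster           # starts Manticore with --new-cluster via systemd
manticore_new_cluster --force   # starts Manticore with --new-cluster-force via systemd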

In the event of a hard crash or an unclean shutdown of all servers in the cluster, the most advanced node with the largest seqno in the grastate.dat file located at the cluster path must be identified and started with the --new-cluster-force command line key.

¶ 10.3.8

Cluster recovery

In the event that the Manticore search daemon stops with no remaining nodes in the cluster to serve requests, recovery is necessary. Due to the multi-master nature of the Galera library used for replication, a Manticore replication cluster is a single logical entity that maintains the consistency of its nodes and data, and the status of the entire cluster. This allows for safe writes on multiple nodes simultaneously and ensures the integrity of the cluster.

However, this also poses challenges. Let's examine several scenarios, using a cluster of nodes A, B, and C, to see what needs to be done when some or all nodes become unavailable.

Case 1

When node A is stopped, the other nodes receive a "normal shutdown" message. The cluster size is reduced, and a quorum re-calculation takes place.

Upon starting node A, it joins the cluster and will not serve any write transactions until it is fully synchronized with the cluster. If the writeset cache on donor nodes B or C (which can be controlled with the Galera cluster's gcache.size) still contains all of the transactions missed at node A, node A will receive a fast incremental state transfer (IST), that is, a transfer of only missed transactions. If not, a snapshot state transfer (SST) will occur, which involves the transfer of table files.

Case 2

In the scenario where nodes A and B are stopped, the cluster size is reduced to one, with node C forming the primary component to handle write transactions.

Nodes A and B can then be started as usual and will join the cluster after start-up. Node C acts as the donor, providing the state transfer to nodes A and B.

Case 3

All nodes are stopped as usual and the cluster is off.

The problem now is how to initialize the cluster. On a clean shutdown of searchd, the nodes write the number of the last executed transaction into the grastate.dat file in the cluster directory, along with the safe_to_bootstrap flag. The node that was stopped last will have safe_to_bootstrap: 1 and the most advanced seqno.
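
For reference, grastate.dat is a small text file maintained by the Galera library. On the node that was stopped last it looks roughly like this (the uuid and seqno values will differ):

# GALERA saved state
version: 2.1
uuid: fba97c45-36df-11e9-a84e-eb09d14b8ea7
seqno: 123
safe_to_bootstrap: 1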

It is important that this node starts first to form the cluster. To bootstrap a cluster, the server on this node should be started with the --new-cluster flag. On Linux, you can also run manticore_new_cluster, which will start Manticore in --new-cluster mode via systemd.

If another node starts first and bootstraps the cluster, the most advanced node will join that cluster, perform a full SST, and receive table files in which some transactions are missing compared with the table files it had before. That is why it is important to start first the node that was shut down last: it should have the flag safe_to_bootstrap: 1 in grastate.dat.

Case 4

In the event of a crash or network failure causing Node A to disappear from the cluster, nodes B and C will attempt to reconnect with Node A. Upon failure, they will remove Node A from the cluster. With two out of the three nodes still running, the cluster maintains its quorum and continues to operate normally.

When Node A is restarted, it will join the cluster automatically, as outlined in Case 1.

Case 5

Nodes A and B have gone offline. Node C is unable to form a quorum on its own as 1 node is less than half of the total nodes (3). As a result, the cluster on node C is shifted to a non-primary state and rejects any write transactions with an error message.

Meanwhile, node C waits for the other nodes to connect and also tries to connect to them. If this happens, and the network is restored and nodes A and B are back online, the cluster will automatically reform. If nodes A and B are just temporarily disconnected from node C but can still communicate with each other, they will continue to operate as normal, as they still form the quorum.

However, if both nodes A and B have crashed or restarted due to a power failure, someone must activate the primary component on node C using the following command:

SQL
SET CLUSTER posts GLOBAL 'pc.bootstrap' = 1
JSON
POST /cli -d "
SET CLUSTER posts GLOBAL 'pc.bootstrap' = 1
"

It's important to note that before executing this command, you must confirm that the other nodes are truly unreachable. Otherwise, a split-brain scenario may occur and separate clusters may form.

Case 6

All nodes have crashed. In this situation, the grastate.dat file in the cluster directory has not been updated and does not contain a valid sequence number (seqno).

If this occurs, someone needs to locate the node with the most recent data and start the server on it using the --new-cluster-force command line key. All other nodes will start as normal, as described in Case 3.
On Linux, you can also use the manticore_new_cluster --force command, which will start Manticore in --new-cluster-force mode via systemd.

Case 7

Split-brain can cause the cluster to transition into a non-primary state. For example, consider a cluster comprised of an even number of nodes (four), such as two pairs of nodes located in different data centers. If a network failure interrupts the connection between the data centers, split-brain occurs as each group of nodes holds exactly half of the quorum. As a result, both groups stop handling write transactions, since the Galera replication model prioritizes data consistency, and the cluster cannot accept write transactions without a quorum. However, nodes in both groups attempt to reconnect with the nodes from the other group in an effort to restore the cluster.

If someone wants to restore the cluster before the network is restored, the same steps outlined in Case 5 should be taken, but only on one group of nodes.

After the statement is executed, the group with the node that it was run on will be able to handle write transactions once again.

SQL
SET CLUSTER posts GLOBAL 'pc.bootstrap' = 1
JSON
POST /cli -d "
SET CLUSTER posts GLOBAL 'pc.bootstrap' = 1
"

However, it's important to note that if the statement is issued at both groups, it will result in the formation of two separate clusters, and the subsequent network recovery will not result in the groups rejoining.