
per-index binlog

Open sanikolaev opened this issue 3 years ago • 3 comments

We've tested inserts into a sharded table where each shard is located in a separate Manticore instance and got very interesting results: it can increase maximum insert throughput by 66%, at least on a server with an SSD. The task is to implement per-index binary logging.

Script, config, start instructions, etc. for better understanding and future reference.

root@perf3 ~/rt_load # cat test_sharding_multiple_instances.php    
#!/usr/bin/php    
<?php    
if (count($argv) < 6) die("Usage: ".__FILE__." <batch size> <concurrency> <docs> <shards> <multiplier>\n");    
    
require_once 'vendor/autoload.php';    
    
// This function waits for an idle mysql connection for the $query, runs it and exits    
function process($query) {    
    global $all_links;    
    global $requests;    
    foreach ($all_links as $k=>$link) {    
        if (@$requests[$k]) continue;    
        mysqli_query($link, $query, MYSQLI_ASYNC);    
        @$requests[$k] = microtime(true);    
        return true;    
    }    
    do {    
        $links = $errors = $reject = array();    
        foreach ($all_links as $link) {    
            $links[] = $errors[] = $reject[] = $link;    
        }    
        $count = @mysqli_poll($links, $errors, $reject, 0, 1000);    
        if ($count > 0) {    
            foreach ($links as $j=>$link) {    
                $res = @mysqli_reap_async_query($links[$j]);    
                foreach ($all_links as $i=>$link_orig) if ($all_links[$i] === $links[$j]) break;    
                if ($link->error) {    
                    echo "ERROR: {$link->error}\n";    
                    if (!mysqli_ping($link)) {    
                        echo "ERROR: mysql connection is down, removing it from the pool\n";    
                        unset($all_links[$i]); // remove the original link from the pool    
                        unset($requests[$i]); // and from the $requests too    
                    }    
                    return false;    
                }    
                if ($res === false and !$link->error) continue;    
                if (is_object($res)) {    
                    mysqli_free_result($res);    
                }    
                $requests[$i] = microtime(true);    
                mysqli_query($link, $query, MYSQLI_ASYNC); // making next query
                return true;    
            }    
        };    
    } while (true);    
    return true;    
}    
    
$all_links = [];    
$requests = [];    
$c = 0;    
for ($i=0;$i<$argv[2];$i++) {
  $m = @mysqli_connect('127.0.0.1', '', '', '', 9400 + $i % $argv[4]);
  if (mysqli_connect_error()) die("Cannot connect to Manticore\n");    
  $all_links[] = $m;    
}    
    
// init    
for ($n=0;$n<$argv[4];$n++) {
    $m = @mysqli_connect('127.0.0.1', '', '', '', 9400 + $n);
    mysqli_query($m, "drop table if exists user");    
    mysqli_query($m, "create table user(name text, email string, description text, age int, active bit(1))");    
}    
    
$batch = [];    
$query_start = "insert into user(id, name, email, description, age, active) values ";    
    
$faker = Faker\Factory::create();    
    
echo "preparing...\n";    
$error = false;    
$cache_file_name = '/tmp/'.md5($query_start).'_'.$argv[1].'_'.$argv[3];    
$c = 0;    
if (!file_exists($cache_file_name)) {    
    $batches = [];    
    while ($c < $argv[3]) {    
      $ar = [addslashes($faker->name()), addslashes($faker->email()), addslashes($faker->text()), rand(10,90), rand(0,1)];    
      $batch[] = "(0,'".$ar[0]."','".$ar[1]."','".$ar[2]."',".$ar[3].",".$ar[4].")";    
      $c++;
      if (floor($c/1000) == $c/1000) echo "\r".($c/$argv[3]*100)."%       ";    
        if (count($batch) == $argv[1]) {    
          $batches[] = $query_start.implode(',', $batch);    
          $batch = [];    
        }    
    }    
    if ($batch) $batches[] = $query_start.implode(',', $batch);    
    file_put_contents($cache_file_name, serialize($batches));    
} else {    
    echo "found in cache $cache_file_name\n";    
    $batches = unserialize(file_get_contents($cache_file_name));    
}    
    
echo "querying...\n";    
    
$t = microtime(true);    
    
for ($n=0;$n<$argv[5];$n++) {
  foreach ($batches as $batch) {    
    if (!process($batch)) die("ERROR\n");    
  }    
}    
    
// wait until all the workers finish    
do {    
  $links = $errors = $reject = array();    
  foreach ($all_links as $link)  $links[] = $errors[] = $reject[] = $link;    
  $count = @mysqli_poll($links, $errors, $reject, 0, 100);    
} while (count($all_links) != count($links) + count($errors) + count($reject));
    
echo "finished inserting\n";    
echo "Total time: ".(microtime(true) - $t)."\n";    
echo round($argv[3]*$argv[5] / (microtime(true) - $t))." docs per sec\n";    

Config:

root@perf3 ~/rt_load # cat pool/manticore.conf    
#!/usr/bin/php    
<?php    
    
$port = 9400 + getenv('shard');
file_put_contents('/tmp/t', $port);    
echo "    
common {    
    plugin_dir = /usr/local/manticore/lib    
}    
    
searchd {    
    listen = 127.0.0.1:".$port.":mysql    
    log = /dev/null    
    pid_file = ".$port.".pid    
    data_dir = data".$port."    
    max_packet_size = 128M    
    binlog_path = data".$port."    
}    
";    

How to start the instances pool:

for n in `seq 0 19`; do shard=$n searchd -c manticore.conf; done;    
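To stop the pool later, a matching loop should work; the assumption here is that searchd --stopwait re-reads the same executable config, so shard must be set the same way for it to find the right pid_file:

for n in `seq 0 19`; do shard=$n searchd -c manticore.conf --stopwait; done;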

How the test was run:

root@perf3 ~/rt_load # php test_sharding_multiple_instances.php 10000 64 32000000 20 1    
preparing...    
found in cache /tmp/bc9719fb0d26e18fc53d6d5aaaf847b4_10000_32000000    
querying...    
finished inserting    
Total time: 35.07500910759    
912330 docs per sec    
root@perf3 ~/rt_load # php test_sharding_multiple_instances.php 10000 64 32000000 20 1    
preparing...    
found in cache /tmp/bc9719fb0d26e18fc53d6d5aaaf847b4_10000_32000000    
querying...    
finished inserting    
Total time: 35.026075124741    
913605 docs per sec    
    
root@perf3 ~/rt_load/pool # (for n in `seq 9400 9419`; do mysql -P$n -h0 -NB -e "select count(*) from user"; done;)|awk '{sum+=$1} END {print sum}'    
32000000    

Script to stress-load sharded indexes on the same instance:

root@perf3 ~/rt_load # cat test_sharding.php    
#!/usr/bin/php    
<?php    
if (count($argv) < 6) die("Usage: ".__FILE__." <batch size> <concurrency> <docs> <shards> <multiplier>\n");    
    
require_once 'vendor/autoload.php';    
    
// This function waits for an idle mysql connection for the $query, runs it and exits    
function process($query, $shard) {    
    $query = str_replace('insert into user', 'insert into user'.$shard, $query);    
    global $all_links;    
    global $requests;    
    foreach ($all_links as $k=>$link) {    
        if (@$requests[$k]) continue;    
        mysqli_query($link, $query, MYSQLI_ASYNC);    
        @$requests[$k] = microtime(true);    
        return true;    
    }    
    do {    
        $links = $errors = $reject = array();    
        foreach ($all_links as $link) {    
            $links[] = $errors[] = $reject[] = $link;    
        }    
        $count = @mysqli_poll($links, $errors, $reject, 0, 1000);    
        if ($count > 0) {    
            foreach ($links as $j=>$link) {    
                $res = @mysqli_reap_async_query($links[$j]);    
                foreach ($all_links as $i=>$link_orig) if ($all_links[$i] === $links[$j]) break;    
                if ($link->error) {    
                    echo "ERROR: {$link->error}\n";    
                    if (!mysqli_ping($link)) {    
                        echo "ERROR: mysql connection is down, removing it from the pool\n";    
                        unset($all_links[$i]); // remove the original link from the pool    
                        unset($requests[$i]); // and from the $requests too    
                    }    
                    return false;    
                }    
                if ($res === false and !$link->error) continue;    
                if (is_object($res)) {    
                    mysqli_free_result($res);    
                }    
                $requests[$i] = microtime(true);    
                mysqli_query($link, $query, MYSQLI_ASYNC); // making next query
                return true;    
            }    
        };    
    } while (true);    
    return true;    
}    
    
$all_links = [];    
$requests = [];    
$c = 0;    
for ($i=0;$i<$argv[2];$i++) {
  $m = @mysqli_connect('127.0.0.1', '', '', '', 9306);
  if (mysqli_connect_error()) die("Cannot connect to Manticore\n");
  $all_links[] = $m;
}
    
// init    
$dist = [];    
for ($n=1;$n<=$argv[4];$n++) {
    mysqli_query($all_links[0], "drop table if exists user$n");    
    mysqli_query($all_links[0], "create table user$n(name text, email string, description text, age int, active bit(1))");    
    $dist[] = "local='user$n'";    
}    
mysqli_query($all_links[0], "drop table if exists user");    
mysqli_query($all_links[0], "create table user type='distributed' ".implode(' ', $dist));    
    
$batch = [];    
$query_start = "insert into user(id, name, email, description, age, active) values ";    
    
$faker = Faker\Factory::create();    
    
echo "preparing...\n";    
$error = false;    
$cache_file_name = '/tmp/'.md5($query_start).'_'.$argv[1].'_'.$argv[3];    
$c = 0;    
if (!file_exists($cache_file_name)) {    
    $batches = [];    
    while ($c < $argv[3]) {    
      $ar = [addslashes($faker->name()), addslashes($faker->email()), addslashes($faker->text()), rand(10,90), rand(0,1)];    
      $batch[] = "(0,'".$ar[0]."','".$ar[1]."','".$ar[2]."',".$ar[3].",".$ar[4].")";    
      $c++;
      if (floor($c/1000) == $c/1000) echo "\r".($c/$argv[3]*100)."%       ";    
        if (count($batch) == $argv[1]) {    
          $batches[] = $query_start.implode(',', $batch);    
          $batch = [];    
        }    
    }    
    if ($batch) $batches[] = $query_start.implode(',', $batch);    
    file_put_contents($cache_file_name, serialize($batches));    
} else {    
    echo "found in cache $cache_file_name\n";    
    $batches = unserialize(file_get_contents($cache_file_name));    
}    
    
echo "querying...\n";    
    
$t = microtime(true);    
    
for ($n=0;$n<$argv[5];$n++) {
  foreach ($batches as $batch) {    
    if (!process($batch, rand(1, $argv[4]))) die("ERROR\n");    
  }    
}    
    
// wait until all the workers finish    
do {    
  $links = $errors = $reject = array();    
  foreach ($all_links as $link)  $links[] = $errors[] = $reject[] = $link;    
  $count = @mysqli_poll($links, $errors, $reject, 0, 100);    
} while (count($all_links) != count($links) + count($errors) + count($reject));
    
echo "finished inserting\n";    
echo "Total time: ".(microtime(true) - $t)."\n";    
echo round($argv[3]*$argv[5] / (microtime(true) - $t))." docs per sec\n";    

Summary

| Index | Binlog | QPS | Perf. boost, % |
|---|---|---|---|
| On disk, single instance | On disk | 551K | 0% |
| On disk, single instance | In RAM | 762K | 38% |
| On disk, single instance | No | 907K | 19% vs binlog in RAM / 65% vs binlog on disk |
| In RAM, single instance | No | 931K | 2.6% vs prev |
| On disk, separate instances | On disk | 913K | **66% vs single instance** |
| On disk, separate instances | In RAM | 935K | 23% vs single instance |
| On disk, separate instances | No | 968K | 7% vs single instance |
| In RAM, separate instances | No | 976K | 7% vs single instance |

Just in case, I've also re-tested some of the configurations with auto_optimize = 0:

| Index | Binlog | QPS | QPS (no optimize) |
|---|---|---|---|
| On disk, single instance | On disk | 551K | 550K |
| On disk, single instance | No | 907K | 912K |
| On disk, separate instances | On disk | 913K | 914K |
| On disk, separate instances | No | 968K | 977K |

I.e., no difference, which is not surprising, because the optimize cutoff is higher than the number of chunks (the server has 32 cores).

Conclusion

A separate binlog per index has great potential: it can increase throughput by 66%.

Task

Implement per-index binlog. As discussed, we can first try not putting the binlog into the index's directory in RT mode; instead, we can keep the same naming approach as now and simply have more binlog files, each dedicated to a separate index.


sanikolaev · Sep 01 '22 07:09

Also consider the following case: we have a few indexes (e.g. chunk_1...chunk_N) and want to reindex some of them (schema or morphology changes) or add a new one (resharding). If we keep inserting rows into the old chunks (e.g. chunk_1) via normal continuous indexing and start reindexing in parallel, the binlog will grow until the chunk with the oldest insert (chunk_1) gets flushed. There are workarounds: tune rt_flush_period (requires a restart and may lead to fragmentation) or flush chunks manually (requires additional automation), but neither looks very convenient.

I think the feature should solve this at the cost of extra fsyncs.
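As a rough illustration of the manual-flush workaround, here is a minimal sketch assuming chunk tables named chunk_1..chunk_N on a single instance; FLUSH RTINDEX persists a table's RAM chunk to disk so its binlog entries are no longer needed for recovery:

#!/usr/bin/php
<?php
// Hypothetical helper: periodically flush every chunk so the binlog can be
// truncated even while chunk_1 keeps receiving inserts.
$shards = 20; // assumption: chunk_1..chunk_20
$m = mysqli_connect('127.0.0.1', '', '', '', 9306);
if (mysqli_connect_error()) die("Cannot connect to Manticore\n");
for ($n = 1; $n <= $shards; $n++) {
    // FLUSH RTINDEX forces the table's RAM chunk to disk
    if (!mysqli_query($m, "FLUSH RTINDEX chunk_$n")) {
        echo "ERROR: ".mysqli_error($m)."\n";
    }
}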

subnix · Jan 16 '24 10:01

As discussed on call with Alexey:

  • A new setting named binlog_parallel_write = 0/1 (or something similar) will be introduced to control whether per-table binlogs are written in parallel or sequentially. This could be significant for users relying on the upcoming JOIN functionality.
  • Enabling parallel writes presents an opportunity to enhance binlog replay performance. This will be addressed in a separate issue.

sanikolaev · Feb 05 '24 04:02

I want to store the cluster `seqno` in the binlog and want a dedicated binlog to make cluster binlog truncation independent of index flushes. I'll wait for the refactoring of the binlog code to continue the development of https://github.com/manticoresoftware/manticoresearch/issues/1894

tomatolog · May 07 '24 13:05

I've merged https://github.com/manticoresoftware/manticoresearch/pull/2390 into the master branch.

sanikolaev · Jul 11 '24 15:07

Great work! Thanks, especially for the ability to disable binary logging for a table in real time! However, maybe it's better to set binlog_common = 1 by default to maintain backward compatibility and reduce fsyncs?

subnix · Jul 11 '24 16:07

However, maybe it's better to set binlog_common = 1 by default to maintain backward compatibility and reduce fsyncs?

We aim to enable a default setting that could increase write throughput by approximately 30%, e.g. inserting 32M docs into 20 shards:

binlog_common = 1:

root@perf3 ~/rt_load # php test_sharding.php 10000 64 32000000 20 1
preparing...
found in cache /tmp/bc9719fb0d26e18fc53d6d5aaaf847b4_10000_32000000
querying...
finished inserting
Total time: 50.223057985306

637157 docs per sec
root@perf3 ~/rt_load # php test_sharding.php 10000 64 32000000 20 1
preparing...
found in cache /tmp/bc9719fb0d26e18fc53d6d5aaaf847b4_10000_32000000
querying...
finished inserting
Total time: 50.185677051544
637632 docs per sec

Default:

root@perf3 ~/rt_load # php test_sharding.php 10000 64 32000000 20 1
preparing...
found in cache /tmp/bc9719fb0d26e18fc53d6d5aaaf847b4_10000_32000000
querying...
finished inserting
Total time: 38.961874961853
821315 docs per sec

root@perf3 ~/rt_load # php test_sharding.php 10000 64 32000000 20 1
preparing...
found in cache /tmp/bc9719fb0d26e18fc53d6d5aaaf847b4_10000_32000000
querying...
finished inserting
Total time: 38.949145078659
821584 docs per sec

The rationale for changing the default setting is:

  • Users with only a few tables will notice minimal impact.
  • Users who shard their tables will benefit from the above performance boost.
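For users who still prefer the previous single-binlog behavior (e.g. to minimize fsyncs), a minimal sketch of the override, assuming binlog_common is a searchd-section directive as discussed above:

searchd {
    # assumption: 1 = one common binlog for all tables (previous behavior),
    # 0 = per-table binlogs (the default measured above)
    binlog_common = 1
}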

sanikolaev · Jul 13 '24 08:07