
[RFC]: Tianmu In-Memory Column Store Engine


  • 1 Why do we use secondary engine?
  • 2 Why do we employ an in-memory column-based engine?
  • 3 Overview of In-memory column-based engine, Tianmu
    • 3.1 MySQL Buffer Pool
    • 3.2 MySQL Change Buffer
    • 3.3 MySQL Log Buffer
    • 3.4 Tianmu In-memory engine
      • 3.4.1 Location and Size
      • 3.4.2 Memory pools in Tianmu in-memory
  • 4 Tianmu In-memory engine
    • 4.1 Data Organization
    • 4.2 Data Loading
      • 4.2.1 Data Loading
      • 4.2.2 Data Loading workers
      • 4.2.3 Settings
    • 4.3 DML operations and Changes propagation

This is a part of #436.

The in-memory engine has been employed since StoneDB version 2.0. In version 1.0, StoneDB only supported the Tianmu on-disk column-based engine, which acts as another primary engine, parallel to InnoDB. In StoneDB version 2.0, we will use the secondary engine, a new feature introduced in MySQL 8.0.2.

1 Why do we use secondary engine?

Before we describe the Tianmu in-memory engine, we want to say a few more words on this topic. Other solutions are available; for example, version 1.0 achieves this with two primary engines.

  • Firstly, the secondary engine is a MySQL framework that provides multi-engine capability. With universal interfaces and this framework, MySQL can route each workload to the corresponding engine according to its type, leveraging each engine's advantages to deliver excellent service. The secondary engine is also a chance for MySQL to enhance its multi-model capability, for example by making ClickHouse a secondary engine to provide analytical services.

  • Secondly, routing some sub-tasks to a secondary engine while the main work is done in the primary engine comes to mind naturally.

  • Thirdly, as we know, the secondary engine feature is already used by Oracle in their online service, MySQL HeatWave, an in-memory query accelerator with built-in ML. From the HeatWave page:

It increases MySQL performance by orders of magnitude for analytics and mixed workloads, without any changes to current applications. With HeatWave enabled, MySQL HeatWave is 6.5X faster than Amazon Redshift at half the cost, 7X faster than Snowflake at one-fifth the cost, and 1,400X faster than Amazon Aurora at half the cost. Customers run analytics on data that’s stored in MySQL databases without a separate analytics database and ETL duplication. https://www.oracle.com/mysql/heatwave/

2 Why do we employ an in-memory column-based engine?

Before giving the answer, let's talk about some challenges for analytical processing systems.

Traditionally, obtaining good performance for analytic queries meant satisfying several requirements. In a typical data warehouse or mixed-use database, the requirements include the following: (1) you must understand user access patterns; (2) you must provide good performance, which typically requires creating indexes, materialized views, and OLAP cubes.

To improve the performance of StoneDB, version 1.0 uses a column-based data format. The column-based format organizes data by column, not by row. For example, in a large sales table, the sale IDs reside in one column and the sales regions reside in a different column.

Analytical workloads usually access only a few columns, but a scan operation fetches the entire data set. For this reason, the column-based format is the most efficient for analytical workloads: because the columns are stored separately, an analytical query can access only the columns it needs and avoid reading inessential data. For instance, a report on sales totals by region can rapidly process many rows while accessing only a few columns.
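
To make the row/column difference concrete, here is a minimal C++ sketch; the SaleRow/SalesColumns structures are illustrative only, not StoneDB's actual data structures.

#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Row-based layout: each record keeps all of its fields together,
// so scanning one field still reads whole records.
struct SaleRow {
  int64_t sale_id;
  std::string region;
  int64_t amount;
};
using RowStore = std::vector<SaleRow>;

// Column-based layout: each field lives in its own contiguous array,
// so an analytic query touches only the columns it needs.
struct SalesColumns {
  std::vector<int64_t> sale_ids;
  std::vector<std::string> regions;
  std::vector<int64_t> amounts;
};

// A "sales total by region" scan reads only two columns and never
// touches sale_ids.
int64_t total_for_region(const SalesColumns &cols, const std::string &region) {
  int64_t total = 0;
  for (std::size_t i = 0; i < cols.regions.size(); ++i) {
    if (cols.regions[i] == region) total += cols.amounts[i];
  }
  return total;
}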

Database systems typically force users to choose between a column-based and row-based format. For example, if the data format is column-based, then the database stores data in column-based format both in memory and on disk. Gaining the advantages of one format means losing the advantages of the alternate format.

Hence, applications either achieve rapid analytics or rapid transactions, but not both. The performance problems of mixed-workload databases are not solved by storing data in only ONE format.

Based on the discussion above, in version 2.0 we introduce a new data-format engine, an in-memory column-based store, for analytical workloads. This in-memory column-based engine is still called Tianmu, the same name as in version 1.0.

The in-memory feature set includes the IM column store, advanced query optimizations, and availability solutions. These features accelerate analytic query performance by orders of magnitude without sacrificing OLTP performance or availability.

3 Overview of In-memory column-based engine, Tianmu

In #436, some brief descriptions of the in-memory column-based engine are given. The data is compressed and encoded before being loaded into the in-memory column-based engine. Not all types of data are suitable for encoding and compressing; in #423, we define which types of data can be encoded and compressed.

The in-memory column-based engine holds copies of tables, partitions, or columns in a compressed columnar format, which is optimized for scan operations.

3.1 MySQL Buffer Pool

In MySQL, the InnoDB buffer pool is typically in the multi-gigabyte range, and its memory is distributed across different NUMA nodes. Cross-NUMA access is a performance bottleneck on multi-core systems, so the memory allocation algorithm (or policy) across NUMA nodes should be chosen carefully. In storage/innobase/buf/buf0buf.cc, the buf_block_alloc function allocates a buffer block and makes sure allocations are spread across all buffer pool instances.

buf_block_t *buf_block_alloc(
    buf_pool_t *buf_pool) /*!< in/out: buffer pool instance,
                          or NULL for round-robin selection
                          of the buffer pool */
{
  buf_block_t *block;
  ulint index;
  static ulint buf_pool_index;

  if (buf_pool == nullptr) {
    /* We are allocating memory from any buffer pool, ensure
    we spread the grace on all buffer pool instances. */
    index = buf_pool_index++ % srv_buf_pool_instances;
    buf_pool = buf_pool_from_array(index);
  }

  block = buf_LRU_get_free_block(buf_pool);

  buf_block_set_state(block, BUF_BLOCK_MEMORY);

  return (block);
}

Refer to the NUMA memory allocation policies documented by Red Hat.

In MySQL, we recommend using the interleave memory allocation policy on NUMA architectures. The struct set_numa_interleave_t
is used to set the memory allocation policy to MPOL_INTERLEAVE.

struct set_numa_interleave_t {
  set_numa_interleave_t() {
    if (srv_numa_interleave) {
      ib::info(ER_IB_MSG_47) << "Setting NUMA memory policy to"
                                " MPOL_INTERLEAVE";
      struct bitmask *numa_nodes = numa_get_mems_allowed();
      if (set_mempolicy(MPOL_INTERLEAVE, numa_nodes->maskp, numa_nodes->size) !=
          0) {
        ib::warn(ER_IB_MSG_48) << "Failed to set NUMA memory"
                                  " policy to MPOL_INTERLEAVE: "
                               << strerror(errno);
      }
      numa_bitmask_free(numa_nodes);
    }
  }

  ~set_numa_interleave_t() {
    if (srv_numa_interleave) {
      ib::info(ER_IB_MSG_49) << "Setting NUMA memory policy to"
                                " MPOL_DEFAULT";
      if (set_mempolicy(MPOL_DEFAULT, nullptr, 0) != 0) {
        ib::warn(ER_IB_MSG_50) << "Failed to set NUMA memory"
                                  " policy to MPOL_DEFAULT: "
                               << strerror(errno);
      }
    }
  }
};

In general, MySQL creates more than one buffer pool instance; the number of instances is governed by innodb_buffer_pool_instances. The number of buffer pool instances should be adjusted according to the size of the buffer pool.

For systems with buffer pools in the multi-gigabyte range, dividing the buffer pool into separate instances can improve concurrency, by reducing contention as different threads read and write to cached pages.

The MySQL buffer pool consists of buffer blocks and control blocks; index pages, data pages, undo pages, the insert buffer, the AHI (adaptive hash index), lock information, the data dictionary, etc. all reside in the buffer pool.

The buffer pool size is set by innodb_buffer_pool_size. The size of each buffer pool instance satisfies

 size_of_an_instance = innodb_buffer_pool_size / innodb_buffer_pool_instances
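
For example, with innodb_buffer_pool_size = 8G and innodb_buffer_pool_instances = 8, each buffer pool instance gets 1 GB.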

In storage/innobase/buf/buf0buf.cc, the function buf_pool_init creates the buffer pool when MySQL is starting up.

/** Creates the buffer pool.
@param[in]  total_size    Size of the total pool in bytes.
@param[in]  n_instances   Number of buffer pool instances to create.
@return DB_SUCCESS if success, DB_ERROR if not enough memory or error */
dberr_t buf_pool_init(ulint total_size, ulint n_instances) {
  ulint i;
  const ulint size = total_size / n_instances;
  ...
  NUMA_MEMPOLICY_INTERLEAVE_IN_SCOPE;

  /* Usually buf_pool_should_madvise is protected by buf_pool_t::chunk_mutex-es,
  but at this point in time there is no buf_pool_t instances yet, and no risk of
  race condition with sys_var modifications or buffer pool resizing because we
  have just started initializing the buffer pool.*/
  buf_pool_should_madvise = innobase_should_madvise_buf_pool();

  buf_pool_resizing = false;

  buf_pool_ptr =
      (buf_pool_t *)ut_zalloc_nokey(n_instances * sizeof *buf_pool_ptr);

  buf_chunk_map_reg = UT_NEW_NOKEY(buf_pool_chunk_map_t());

  std::vector<dberr_t> errs;

  errs.assign(n_instances, DB_SUCCESS);

#ifdef UNIV_LINUX
  ulint n_cores = sysconf(_SC_NPROCESSORS_ONLN);

  /* Magic nuber 8 is from empirical testing on a
  4 socket x 10 Cores x 2 HT host. 128G / 16 instances
  takes about 4 secs, compared to 10 secs without this
  optimisation.. */

  if (n_cores > 8) {
    n_cores = 8;
  }
#else
  ulint n_cores = 4;
#endif /* UNIV_LINUX */

  dberr_t err = DB_SUCCESS;

  for (i = 0; i < n_instances; /* no op */) { // initialize every instance using multiple threads
    ulint n = i + n_cores;

    if (n > n_instances) {
      n = n_instances;
    }

    std::vector<std::thread> threads;

    std::mutex m;

    for (ulint id = i; id < n; ++id) { // create threads to do the initialization concurrently
      threads.emplace_back(std::thread(buf_pool_create, &buf_pool_ptr[id], size,
                                       id, &m, std::ref(errs[id])));
    }

    ...

    /* Do the next block of instances */
    i = n;
  }

  buf_pool_set_sizes();
  buf_LRU_old_ratio_update(100 * 3 / 8, FALSE);

  btr_search_sys_create(buf_pool_get_curr_size() / sizeof(void *) / 64);

  buf_stat_per_index =
      UT_NEW(buf_stat_per_index_t(), mem_key_buf_stat_per_index_t);

  return (DB_SUCCESS);
}


Other buffer pool operation functions can be found in the same file, storage/innobase/buf/buf0buf.cc.

3.2 MySQL Change Buffer

The change buffer is a special buffer that caches changes to secondary index pages when those pages are not in the buffer pool. When DML operations occur, the changes are written into the change buffer and merged later, when the pages are loaded into the buffer pool by other read operations.


The change buffer improves the performance of DML operations by reorganizing random I/O into sequential I/O, so fewer I/O operations are needed. In memory, the change buffer is a part of the buffer pool; it is written to the system tablespace when MySQL is shut down.

storage/innobase/ibuf/ibuf0ibuf.cc contains all the change buffer functions.

Figure: the layout of an ibuf record.

3.3 MySQL Log Buffer

The log buffer is a type of memory object in MySQL that stores data to be written to the log files on disk; its default size is 16 MB. The data in the log buffer is periodically written to disk. A large log buffer lets long transactions run without writing redo log data to disk before they commit. Therefore, if you run transactions that update, insert, or delete many rows, increasing the size of the log buffer can reduce disk I/O costs.
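
For example, a write-heavy workload might enlarge the log buffer in my.cnf; the value below is only an illustration and should be tuned to the actual transaction volume.

[mysqld]

# enlarge the redo log buffer (the default is 16 MB) so that large
# transactions write redo data to disk less often before they commit
innodb_log_buffer_size = 64M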

storage/innobase/log/log0buf.cc defines the log buffer operations; for example,

lsn_t log_buffer_write(log_t &log, const Log_handle &handle, const byte *str,
                       size_t str_len, lsn_t start_lsn)

describes how to write to the redo log buffer.

3.4 Tianmu In-memory engine

3.4.1 Location and Size

In StoneDB version 2.0, the Tianmu in-memory engine is employed. First of all, there are two questions to answer. Where does the Tianmu in-memory engine reside? And how large should it be? As discussed above, we think the buffer pool is a good place to host the Tianmu in-memory engine: when the server boots up, we take a part of the memory away from the buffer pool to serve as the Tianmu in-memory engine's memory pool. We think this is a convenient way to allocate system memory for the Tianmu in-memory engine. The second question is how much memory the Tianmu in-memory engine should take. As far as we know, the larger the buffer pool, the better the performance we can achieve; but memory capacity has its limits.

We add two system variables to control the memory usage of the Tianmu in-memory engine: TIANMU_IN_MEM_SIZE and TIANMU_IN_MEM_SIZE_PCT.


TIANMU_IN_MEM_SIZE is the memory size of the Tianmu in-memory engine; it should be less than innodb_buffer_pool_size. TIANMU_IN_MEM_SIZE_PCT describes the Tianmu in-memory engine's memory size as a percentage of the MySQL buffer pool. The value of TIANMU_IN_MEM_SIZE_PCT should be less than 0.5, which means the Tianmu in-memory engine SHOULD NOT take more than half of the MySQL buffer pool. In the mysqld section of my.cnf, add these lines to configure the memory usage of the Tianmu in-memory engine.

[mysqld]

#configure the mem usage of tianmu in-memory engine
TIANMU_IN_MEM_SIZE = 100GB
TIANMU_IN_MEM_SIZE_PCT = 0.4

You can use the SHOW command to display the values of these variables.
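
Assuming the two variables are registered as ordinary system variables under these names, they could be inspected like any other variable:

mysql> SHOW VARIABLES LIKE 'tianmu_in_mem%';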

3.4.2 Memory pools in Tianmu in-memory

The in-memory space is divided into two subpools: one for columnar data and one for metadata.

  • Columnar Data Pool: stores the in-memory column-based units (IMCUs), which were first briefly described in #436.
  • Meta-Data Pool: stores the metadata of the column-based units.

image

The size of each subpool is set by StoneDB itself, not by the user.

4 Tianmu In-memory engine

4.1 Data Organization

In StoneDB version 2.0, all data is stored in memory; that memory was discussed above. Now, let's dive into the details.

As in StoneDB version 1.0, all table data is stored separately in column format: one column, one file. Different from the on-disk engine, the in-memory data organization changes slightly.
The columnar data subpool is divided into N partitions (known as in-memory column-based units, IMCUs), where N depends on the size of the loaded table. That means all data of a table is loaded into memory as partitions. As described in #423, if a column is defined as TIANMU_COLUMN, it will be loaded into the columnar data pool in StoneDB version 2.0; the partition is where that data is loaded. Each partition stores data of only one database object (table). For example, if two tables, table_a and table_b, are loaded into Tianmu, the data of table_a is stored only in partition 1 and the data of table_b only in partition 2; the data of table_a and table_b is never mixed in the same partition.

Memory management involves sophisticated mechanisms, such as allocation, deallocation, and alignment, and even the cross-NUMA access problem, so we MUST design the data organization carefully; otherwise, performance will suffer.

Each partition contains several in-memory column-based data packs (IMCDPs, abbreviated as chunks). Every chunk contains 65536 rows of data.


How many chunks (IMCDPs) are stored in a partition depends on the total size of the table loaded into the Tianmu in-memory engine.
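
As a minimal sketch of that sizing rule (the helper below is hypothetical; it only assumes the fixed chunk capacity of 65536 rows stated above):

#include <cstdint>

// Each IMCDP (chunk) holds a fixed number of rows.
constexpr uint64_t kRowsPerChunk = 65536;

// Hypothetical helper: how many chunks a partition needs to hold
// row_count rows of a loaded table (rounded up).
uint64_t chunks_needed(uint64_t row_count) {
  return (row_count + kRowsPerChunk - 1) / kRowsPerChunk;
}

// e.g. a table with 1,000,000 rows needs chunks_needed(1000000) == 16 chunks.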

  • Metadata of IMCUs. Before we discuss the IMCU further, we will first talk about the metadata management of IMCUs, which we think is very important. StoneDB can process gigabytes or even terabytes of data, which means we will have a large number of IMCUs. How to locate an IMCU and how to manage all these IMCUs becomes a very challenging problem.

Every IMCU has a metadata object, IMCU_Header, that describes its metadata, such as its address, the number of columns, the number of IMCDPs, the table ID, the ID of the buffer pool instance, etc.

typedef struct IMCU_Header_t {
  ...
  int64_t id;             // identifier of this IMCU
  OID table_id;           // table this IMCU belongs to
  short num_of_columns;   // number of columns stored in this IMCU
  int id_of_buffer_pool;  // buffer pool instance hosting this IMCU
  ...
} IMCU_Header;

All IMCU_Header objects reside in the metadata pool described in section 3.4.2.

  • In-memory column-based data pack (IMCDP). Data in the in-memory column-oriented store is organized hierarchically. At the top is the IMCU; an IMCU contains some IMCDPs, and an IMCDP contains some data packs. In short, data packs make up an IMCDP, IMCDPs make up an IMCU, and IMCUs make up the Tianmu in-memory column-oriented store.

If a column is defined as TIANMU_COLUMN, it will be loaded into the Tianmu in-memory column-oriented store. If a table has three columns defined as TIANMU_COLUMN, all three columns will be loaded into the store, each stored separately. A data pack stores ONLY one column, and holds about 65536 rows of that column.

Figure: a data pack and its metadata.

Each DP has a header that stores metadata about the data in the DP. The metadata includes the minimum and maximum values, aggregation values, the number of rows, NULL information, etc., within the DP. A DP is thus divided into a DP header and a DP body.

typedef struct DP_t {
  typedef struct DP_header_t {
    int64_t max_, min_, avg_, sum_;  // aggregates over this pack
    int64_t count_;                  // number of rows in this pack
    int64_t null_flags_;             // NULL information
    ...
  } DP_header;
  ...
  void *data_;                       // the DP body: raw column data
} DP;
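
As a sketch of how those header fields could be maintained, the following fills a simplified DP (a single int64 column, a plain bool vector for the NULL bitmap; all names are illustrative) and computes the aggregates in one pass:

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <vector>

// Simplified single-column DP mirroring DP_header above.
struct DataPack {
  int64_t max_ = std::numeric_limits<int64_t>::min();
  int64_t min_ = std::numeric_limits<int64_t>::max();
  int64_t sum_ = 0;
  int64_t count_ = 0;             // number of non-NULL values
  std::vector<bool> null_flags_;  // one flag per row
  std::vector<int64_t> data_;     // the DP body
};

// One pass over the incoming rows fills the body and computes
// min/max/sum/count plus the NULL bitmap for the header.
void fill_data_pack(DataPack &dp, const std::vector<int64_t> &values,
                    const std::vector<bool> &is_null) {
  for (std::size_t i = 0; i < values.size(); ++i) {
    dp.null_flags_.push_back(is_null[i]);
    dp.data_.push_back(values[i]);  // the slot is kept even for NULL rows
    if (is_null[i]) continue;       // NULLs do not enter the aggregates
    dp.min_ = std::min(dp.min_, values[i]);
    dp.max_ = std::max(dp.max_, values[i]);
    dp.sum_ += values[i];
    ++dp.count_;
  }
}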

The IMCU stores and manages the IMCDPs.

typedef struct IMCU_t {
  // header info of the IMCU.
  typedef struct IMCU_header_t {
    int instance_id_of_buffer_pool_;
    OID table_id_;
  } IMCU_header;
  ...
  // header of this IMCU.
  IMCU_header *header_;
  // data of this IMCU.
  void *data_;
} IMCU;

We also need to manage all the DPs that logically belong to the same column. For example, the first-column DPs in IMCDP_M, IMCDP_N, IMCDP_O, and IMCDP_P all belong to Col1 logically. Therefore, we call the set of DPs of one column a column set (CU).


* The NULL information is also stored.

  • In-memory column-based unit index (IMCU Index)

As discussed in the previous section, in production there are many IMCUs in memory, each storing a part of a table's data; the number of IMCUs depends on the size and number of the loaded tables. To avoid scanning all the IMCUs and to accelerate query performance, an index of an IMCU is created at the same time the IMCU is created. The index contains the max and min values of the IMCU and the column information in the IMCU. Similar to a partitioned table, one IMCU stores some of the data of a table. Hence, to avoid unnecessary scanning, we can prune the unrelated IMCUs with these indexes when running a query; only the related IMCUs, which satisfy the query predicates, are checked. For instance, for SELECT * FROM orders WHERE order_id > 100 AND order_id < 1000, we only check the IMCUs that can satisfy the conditions order_id > 100 and order_id < 1000.

class IMCU_index {
 public:
  IMCU_index();
  ~IMCU_index();
  IMCU_header get_IMCU_header(oid column_index);
  // statistics over the whole IMCU
  int64_t max_of_imcu();
  int64_t min_of_imcu();
  double avg_of_imcu();
  // statistics over one column set (CU)
  int64_t max_of_cu(int index);
  int64_t min_of_cu(int index);
  double avg_of_cu(int index);
  ...
 private:
  IMCU_header imcu_headers_[MAX_COLUMN_NUMS];
  oid column_id_[MAX_COLUMN_NUMS];
  int64_t max_of_imcu_, min_of_imcu_;  // cached IMCU-level statistics
  double avg_of_imcu_;
  ...
};
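
Here is a sketch of how such an index could prune IMCUs for a range predicate like order_id > 100 AND order_id < 1000; the (min, max) pairs stand in for IMCU_index lookups, and the helper names are hypothetical:

#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// An IMCU can be skipped when its [min, max] range for the predicate
// column cannot intersect the open query range (lo, hi).
bool can_skip_imcu(int64_t imcu_min, int64_t imcu_max, int64_t lo, int64_t hi) {
  return imcu_max <= lo || imcu_min >= hi;
}

// Driver sketch: keep only the IMCUs that survive pruning; each pair
// is the (min, max) of the predicate column in one IMCU.
std::vector<std::size_t> imcus_to_scan(
    const std::vector<std::pair<int64_t, int64_t>> &min_max, int64_t lo,
    int64_t hi) {
  std::vector<std::size_t> survivors;
  for (std::size_t i = 0; i < min_max.size(); ++i) {
    if (!can_skip_imcu(min_max[i].first, min_max[i].second, lo, hi))
      survivors.push_back(i);
  }
  return survivors;
}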

4.2 Data Loading

4.2.1 Data Loading

Population is the operation in which the database reads row-format data from InnoDB, transforms it into column-format data, and then loads it into the Tianmu in-memory column-based store. For instance, we run the command defined in #423, mysql> ALTER TABLE orders SECONDARY_LOAD;. After the command launches, StoneDB starts the population operation. First, the execution flow enters sql/sql_alter.cc.

bool Sql_cmd_secondary_load_unload::execute(THD *thd) {
  // One of the SECONDARY_LOAD/SECONDARY_UNLOAD flags must have been set.
  assert(((m_alter_info->flags & Alter_info::ALTER_SECONDARY_LOAD) == 0) !=
         ((m_alter_info->flags & Alter_info::ALTER_SECONDARY_UNLOAD) == 0));

  // No other flags should've been set.
  assert(!(m_alter_info->flags & ~(Alter_info::ALTER_SECONDARY_LOAD |
                                   Alter_info::ALTER_SECONDARY_UNLOAD)));

  TABLE_LIST *table_list = thd->lex->query_block->get_table_list();

  if (check_access(thd, ALTER_ACL, table_list->db, &table_list->grant.privilege,
                   &table_list->grant.m_internal, false, false))
    return true;

  if (check_grant(thd, ALTER_ACL, table_list, false, UINT_MAX, false))
    return true;

  return mysql_secondary_load_or_unload(thd, table_list);
}

After the access rights check, it invokes mysql_secondary_load_or_unload.

bool Sql_cmd_secondary_load_unload::mysql_secondary_load_or_unload(
    THD *thd, TABLE_LIST *table_list) {
  ...
}

In this function, the tables specified in our SECONDARY_LOAD command are opened.

BTW: the isolation level used is always READ_COMMITTED. From the MySQL comments:

// Always use isolation level READ_COMMITTED to ensure consistent view of
// table data during entire load operation. Higher isolation levels provide no
// benefits for this operation and could impact performance, so it's fine to
// downgrade from both REPEATABLE_READ and SERIALIZABLE.

Then it starts setting the column bits to indicate which columns are eligible for loading. This is done by the following code.

  bitmap_clear_all(table_list->table->read_set);
  for (Field **field = table_list->table->field; *field != nullptr; ++field) {
    // Skip hidden generated columns.
    if (bitmap_is_set(&table_list->table->fields_for_functional_indexes,
                      (*field)->field_index()))
      continue;

    // Skip columns marked as NOT SECONDARY.
    if ((*field)->is_flag_set(NOT_SECONDARY_FLAG)) continue;

    // Mark column as eligible for loading.
    table_list->table->mark_column_used(*field, MARK_COLUMNS_READ);
  }

Then, StoneDB performs the loading operation.

  // Initiate loading into or unloading from secondary engine.
  const bool error =
      is_load
          ? secondary_engine_load_table(thd, *table_list->table)
          : secondary_engine_unload_table(
                thd, table_list->db, table_list->table_name, *table_def, true);
  if (error) return true;

StoneDB invokes secondary_engine_load_table for loading and secondary_engine_unload_table for unloading. When we start the unload operation, StoneDB performs a data persistence operation and stores the Tianmu in-memory data to disk. This brings StoneDB the following benefits: (1) faster recovery; (2) it makes a hierarchical storage system possible.

After that, StoneDB starts loading the data from the primary engine into the secondary engine and adds the table to change propagation.

  // The engine must support being used as a secondary engine.
  handlerton *hton = plugin_data<handlerton *>(plugin);
  if (!(hton->flags & HTON_IS_SECONDARY_ENGINE)) {
    my_error(ER_SECONDARY_ENGINE, MYF(0),
             "Unsupported secondary storage engine");
    return true;
  }

  // Get handler to the secondary engine into which the table will be loaded.
  const bool is_partitioned = table.s->m_part_info != nullptr;
  unique_ptr_destroy_only<handler> handler(
      get_new_handler(table.s, is_partitioned, thd->mem_root, hton));

  // Load table from primary into secondary engine and add to change
  // propagation if that is enabled.
  return handler->ha_load_table(table);

/**
 * Loads a table into its defined secondary storage engine: public interface.
 *
 * @param table The table to load into the secondary engine. Its read_set tells
 * which columns to load.
 *
 * @sa handler::load_table()
 */
int handler::ha_load_table(const TABLE &table) { return load_table(table); }

/**
 * Unloads a table from its defined secondary storage engine: public interface.
 *
 * @sa handler::unload_table()
 */
int handler::ha_unload_table(const char *db_name, const char *table_name,
                             bool error_if_not_loaded) {
  return unload_table(db_name, table_name, error_if_not_loaded);
}

In sql/handler.h, the secondary engine overrides load_table to implement the table-scanning logic, transform the data from row format into column format, and then load the data into the IMCUs of the Tianmu in-memory store.

  /**
   * Loads a table into its defined secondary storage engine.
   *
   * @param table Table opened in primary storage engine. Its read_set tells
   * which columns to load.
   *
   * @return 0 if success, error code otherwise.
   */
  virtual int load_table(const TABLE &table MY_ATTRIBUTE((unused))) {
    /* purecov: begin inspected */
    assert(false);
    return HA_ERR_WRONG_COMMAND;
    /* purecov: end */
  }

4.2.2 Data Loading workers

As discussed above, in the function load_table we must implement the table scanning logic, the transformation, and so on. Considering the size of the tables, load_table can take a long time, such as minutes or even hours. To shorten the loading operation, StoneDB uses parallel loading to speed it up. MySQL 8.0 implements parallel scanning functions, so StoneDB uses these functions to implement parallel loading.

int ha_innobase::parallel_scan_init(void *&scan_ctx, size_t *num_threads,
                                    bool use_reserved_threads) {
  ...
  size_t max_threads = thd_parallel_read_threads(m_prebuilt->trx->mysql_thd);

  max_threads =
      Parallel_reader::available_threads(max_threads, use_reserved_threads);
  ...
}

In parallel_scan_init, InnoDB does the initialization first. Then the parallel scan itself is performed by ha_innobase::parallel_scan.

int ha_innobase::parallel_scan(void *scan_ctx, void **thread_ctxs,
                               Reader::Init_fn init_fn, Reader::Load_fn load_fn,
                               Reader::End_fn end_fn) {
  if (dict_table_is_discarded(m_prebuilt->table)) {
    ib_senderrf(ha_thd(), IB_LOG_LEVEL_ERROR, ER_TABLESPACE_DISCARDED,
                m_prebuilt->table->name.m_name);

    return (HA_ERR_NO_SUCH_TABLE);
  }

  ut_a(scan_ctx != nullptr);

  update_thd();

  build_template(true);

  auto adapter = static_cast<Parallel_reader_adapter *>(scan_ctx);

  auto err = adapter->run(thread_ctxs, init_fn, load_fn, end_fn);

  return (convert_error_code_to_mysql(err, 0, ha_thd()));
}

Lastly, a table to be loaded into the Tianmu in-memory column store must have a primary key.
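
To illustrate the idea of parallel loading (this is a self-contained sketch, not MySQL's actual Parallel_reader API), the following splits the row-to-column conversion across threads, the way the parallel-scan callbacks would feed per-thread chunks:

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <thread>
#include <vector>

struct Row { int64_t id; int64_t amount; };  // illustrative row format

// Destination column arrays, pre-sized so threads write disjoint
// slots and need no locking.
struct Columns {
  std::vector<int64_t> ids;
  std::vector<int64_t> amounts;
};

// Each worker converts one contiguous slice of rows to column format,
// mimicking how one parallel-scan thread would fill one chunk.
void convert_slice(const std::vector<Row> &rows, Columns &cols,
                   std::size_t begin, std::size_t end) {
  for (std::size_t i = begin; i < end; ++i) {
    cols.ids[i] = rows[i].id;
    cols.amounts[i] = rows[i].amount;
  }
}

// Assumes n_threads >= 1, analogous to innodb_parallel_read_threads.
Columns parallel_convert(const std::vector<Row> &rows, std::size_t n_threads) {
  Columns cols{std::vector<int64_t>(rows.size()),
               std::vector<int64_t>(rows.size())};
  std::vector<std::thread> workers;
  const std::size_t stride = (rows.size() + n_threads - 1) / n_threads;
  for (std::size_t t = 0; t < n_threads; ++t) {
    const std::size_t begin = t * stride;
    const std::size_t end = std::min(rows.size(), begin + stride);
    if (begin >= end) break;
    workers.emplace_back(convert_slice, std::cref(rows), std::ref(cols),
                         begin, end);
  }
  for (auto &w : workers) w.join();
  return cols;
}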

4.2.3 Settings

In MySQL, you can use the innodb_parallel_read_threads parameter to control the degree of parallelism (DoP). Hence, in StoneDB, we also use this parameter to control the DoP. For more information, please refer to: https://dev.mysql.com/doc/refman/8.0/en/innodb-parameters.html#sysvar_innodb_parallel_read_threads

4.3 DML operations and Changes propagation

4.3.1 Changes propagation

One of the challenges of an HTAP system is data freshness, which means that the OLAP workloads should use the latest version of the data that OLTP generated. If OLAP and OLTP shared one copy of the data, data freshness would not be a problem we had to think about. But, following the architecture described in #436, we adopt a one-system, two-copies architecture to implement StoneDB version 2.0. The data we want to analyze is loaded into the Tianmu in-memory column store, so we have two copies of the data in the system: one row-format copy for OLTP and one column-format copy for OLAP.

This leads to the problem of how to synchronize the data between TP and AP. In a TP-intensive application, the TP workloads generate a large number of data changes, and these changes must be synchronized to the AP side; otherwise, the AP workloads will use stale data, which could lead to wrong decisions. So an HTAP database must make sure the AP workloads can use the latest version of the data to keep the data consistent. As we know, query operations such as SELECT do not change any data, so they have no synchronization problem; DML operations do. If a TP workload modifies some data, the changes should become visible to AP workloads immediately; otherwise, the AP workloads read stale data.

Now we will give our solution to this data freshness problem. First, some prerequisites should be clarified: (1) Which transaction isolation level do we use? (2) Which transaction mode do we use? For simplicity of implementation, we use the READ COMMITTED isolation level and auto-commit mode.

If we supported manual transactions, keeping all the changes of a long transaction would become the biggest challenge: a long (big) transaction contains many TP operations, which can generate many versions of a row, and it is expensive to keep the version chains. Therefore, in the first implementation of StoneDB version 2.0, we only support auto-commit mode and the READ COMMITTED transaction isolation level.

  • Snapshot Metadata Unit (SMU). Keeping all the changes requires StoneDB version 2.0 to know all the active transaction information, the row-column mapping information, the information of the IMCUs, and the checkpoints. This metadata is used to generate the latest version of the changed rows from their version chains.

  • Transaction Recorder: population buffer. As a part of the snapshot metadata unit, the transaction recorder contains the changed rows and their transaction information. When you run a DML operation, the changes are applied in the buffer pool. In order not to degrade InnoDB DML performance, we also copy the changed data into the population buffer, and then a worker merges the latest version of the data into the IMCUs (converting row format to column format) asynchronously, according to the row ID and column ID (see the sketch after this list). Another problem is rebuilding the population buffer when a StoneDB instance crashes, so StoneDB should have some way to get back to the before-crash state. Suppose that in an AP-intensive application the population buffer stores a large number of changed rows, and StoneDB hits a bug and crashes: all the changes in the population buffer are lost. You can re-run SECONDARY_LOAD to reload the data and re-fill the Tianmu in-memory column store.
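
As a sketch of what the transaction recorder might keep per change (all field names here are hypothetical, not an agreed-upon format):

#include <cstdint>
#include <vector>

// Hypothetical record kept in the population buffer for one changed row.
struct PopulationEntry {
  uint64_t trx_id;    // the committing transaction (auto-commit mode)
  uint64_t table_id;  // which loaded table the row belongs to
  uint64_t row_id;    // locates the row slot inside its IMCU/IMCDP
  enum class Op : uint8_t { kInsert, kUpdate, kDelete } op;
  std::vector<uint8_t> new_row_image;  // row data to be re-encoded columnar
};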

4.3.2 Staleness Threshold

The changed data is copied to the population buffer, and the changes are merged into the IMCUs to produce the latest version of the data. There are three thresholds; when any one of them is met, the workers are triggered to merge the changes asynchronously.

  • 1: The population buffer is filled up;
  • 2: A timeout expires (a periodic timer is set);
  • 3: The data in the population buffer is referenced by an AP workload, which means an AP workload checks the population buffer for newer versions of the data before it starts.

4.3.3 Propagation Workers

When StoneDB starts up, a background worker is launched to merge the changes in the population buffer. The initial status of this worker is sleeping. When threshold 1 or threshold 2 is met, a signal is sent to wake up the worker to perform the synchronization (see the sketch below).
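
A minimal sketch of this wakeup protocol with a condition variable (the names are hypothetical; threshold 3 is checked by the AP query itself rather than by this worker):

#include <chrono>
#include <condition_variable>
#include <mutex>

std::mutex mu;
std::condition_variable cv;
bool buffer_full = false;    // threshold 1: population buffer filled up
bool shutting_down = false;

void merge_changes_into_imcus() { /* drain the population buffer ... */ }

// Background propagation worker: sleeps until threshold 1 is signalled
// or the timeout (threshold 2) expires, then merges asynchronously.
// A timed-out wait simply merges whatever has accumulated so far.
void propagation_worker() {
  std::unique_lock<std::mutex> lk(mu);
  while (!shutting_down) {
    cv.wait_for(lk, std::chrono::seconds(1),
                [] { return buffer_full || shutting_down; });
    if (shutting_down) break;
    buffer_full = false;
    lk.unlock();
    merge_changes_into_imcus();
    lk.lock();
  }
}

// The DML path marks the buffer full and wakes the worker (threshold 1).
void on_population_buffer_full() {
  { std::lock_guard<std::mutex> g(mu); buffer_full = true; }
  cv.notify_one();
}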

4.4 Transaction

Currently, we only support auto-commit transactions, and the isolation level is READ COMMITTED.

4.5 Indexing

The Tianmu in-memory column store in version 2.0 does not require building any indexes explicitly.

4.6 Checkpoint

4.6.1 Checkpoint

The checkpoint in the Tianmu in-memory column store does the same thing as the checkpoint in InnoDB. Without a checkpoint mechanism, the latest version of the data would exist only in the Tianmu in-memory column store: if all the data in the store has been modified, the latest version comes entirely from merging the population buffer. Therefore, to reduce the recovery time, we need checkpoints to flush the latest version of the data to disk.

4.6.2 Checkpoint Worker

When StoneDB starts up, a checkpoint worker is launched to perform checkpoint operations periodically.

4.6.3 Settings

None

4.7 Persistence

To speed up recovery, StoneDB does not redo the loading operation; it can recover to the previous state. In the recovery stage, StoneDB only reads the data from the Tianmu on-disk column store engine, instead of loading the data from InnoDB and converting it from row format to column format. Hence, StoneDB flushes the changed data to disk periodically; this is called persistence.

4.8 Resource Management

Resource management is another challenge for an HTAP database. The AP workloads should not interfere with the TP workloads; thus, we must manage resource usage. We limit the share of system resources that AP workloads may use. The system resources include main memory, CPU, file handles, etc. The main memory is limited by StoneDB system parameters. If you deploy StoneDB in single-instance mode, the AP and TP workloads run on the same host, so system resource contention is unavoidable. But in MPP mode, the AP workloads are offloaded to AP nodes, which have independent physical resources, so the AP workloads do not interfere with the TP workloads. For the single-node deployment flavor, we will provide some system parameters to control the usage of system resources. We will discuss these in detail in #xxx.

Figure: the access patterns of TP, AP, and HTAP.

5 Miscellaneous

None.
