
[RFC]: Load data from InnoDB to in-memory column store (Tianmu_rapid)


This is a sub-task of Version 2.0, focusing on how data is loaded from InnoDB into the in-memory column store of StoneDB version 2.0. This issue gives all the information about this part.

After a table with a secondary engine has been created via a CREATE statement, the next step is to load data into the secondary engine. Once all the data we need has been loaded, we can do query processing. The load data syntax is defined in #. The load operation is performed with an ALTER TABLE statement carrying the SECONDARY_LOAD option.

We list it again here:

ALTER TABLE tb_name SECONDARY_LOAD;

1: Summary

When the load statement is executed, it performs the load operation. Overall, much like an INSERT INTO ... SELECT ... statement, the system first does a table scan, either via an index or as a full table scan.

1: It scans the target table, which is usually an InnoDB table. One problem must be clarified first: which data is visible to the scan operation and which is not. We define that only committed data is visible to the scan operation. In other words, an auto-committed transaction is used to do the table scan, and the transaction runs at the READ COMMITTED isolation level.

Data inserted while the table scan is in progress will not be seen by the scan operation.

2: Besides the core functions, there must be system parameters to monitor the load operation, for example, how much data has been loaded, how much remains, and so on. Some parallelism-related parameters will also be introduced, such as POD (degree of parallelism). Therefore, a set of system parameters will be introduced.

2: Implementation

At present, InnoDB has only a few parallel table scan abilities, such as index-based counting operations and CHECK TABLE.

An InnoDB index is organized as a B+tree, and each node in the B+tree has a fixed size (generally 16 KB, denoted by UNIV_PAGE_SIZE in the source code). InnoDB reads data from disk into the buffer pool and evicts old pages to make room for newly read pages, because the buffer pool is NOT unlimited; usually the buffer pool is configured to 70%-80% of physical memory.


Data on disk is managed at three levels: (1) segment; (2) extent; (3) page. InnoDB does in-place updates, so accessing a specific page requires latch protection.

2.1 Full table scan

Full table scan works just as InnoDB already does, so we do not discuss it further here. We mainly focus on parallel index scan.

2.2 Parallel index scan

The basic idea of parallel index scan is to divide the index B+tree into several workloads, each of which is scanned by ONE thread. This follows from the structure of an InnoDB index, which is organized as an index-organized table (IOT).

An index lookup follows these steps: 1: Start from the root node. 2: Go to the left or right child according to the comparison result (if the search key is less than the node's key, the cursor goes left; otherwise it goes right). 3: Walk down the B+tree until a leaf node is reached. 4: Check the records in that leaf node; if the key is found, return it, otherwise return NOT_FOUND.

If we do a FULL table scan, every node in the B+tree is checked.

In version 2.0, the semantics of the load operation are defined as follows: StoneDB does a full table scan to find all the records in the target table, transfers them from row-based tuples into column-based tuples, and then loads them into the Tianmu_rapid engine.

Now we can scan the B+tree in parallel by dividing it into sub-trees that are scanned simultaneously. For example, StoneDB performs the table scan in parallel according to the POD and the height of the index.
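To make the idea concrete, here is a minimal standalone C++ sketch (not StoneDB or InnoDB code): the key space is split into POD contiguous [begin, end) ranges and each range is scanned by one thread, mirroring the "one sub-tree per thread" approach described above.

// Minimal standalone sketch (not StoneDB/InnoDB code): split a sorted key
// space into POD contiguous [begin, end) ranges and scan each range in its
// own thread, mirroring the "one sub-tree per thread" idea.
#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <mutex>
#include <thread>
#include <vector>

int main() {
  std::vector<uint64_t> keys;                    // stand-in for leaf records
  for (uint64_t k = 0; k < 1000; ++k) keys.push_back(k);

  const size_t pod = 4;                          // degree of parallelism (POD)
  const size_t per_worker = (keys.size() + pod - 1) / pod;

  std::mutex out_mutex;
  std::vector<std::thread> workers;
  for (size_t w = 0; w < pod; ++w) {
    const size_t begin = w * per_worker;
    const size_t end = std::min(keys.size(), begin + per_worker);
    workers.emplace_back([&, w, begin, end] {
      uint64_t sum = 0;                          // "scan" = visit every key in [begin, end)
      for (size_t i = begin; i < end; ++i) sum += keys[i];
      std::lock_guard<std::mutex> g(out_mutex);
      std::printf("worker %zu scanned [%zu, %zu), checksum %llu\n", w, begin,
                  end, static_cast<unsigned long long>(sum));
    });
  }
  for (auto &t : workers) t.join();
  return 0;
}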

StoneDB uses multi-version concurrency control (MVCC) to achieve consistent lock-free reads, so newly inserted records and deleted records (so-called tuples) are invisible to the scan operation.

2.2.1 InnoDB basic parallel scan

Before we start to support more parallel operations in InnoDB, we first re-examine the existing work. The relevant MySQL worklogs are summarized below.

The worklog search was done with the keyword parallel. From the results we can see that parallel operations mainly include: (1) parallel query execution; (2) InnoDB parallel read of index. Several worklogs deserve attention: (1) WL#12978 InnoDB: Fix imbalance during parallel scan; (2) WL#11720 InnoDB: Parallel read of index; (3) WL#1092 Parallel load data infile and bulk insert.

For more information, refer to: https://dev.mysql.com/worklog/?sc=&sd=&k=parallel&s=

PS: to find out what changed in WL#11720, we can use git log --grep WL#11720 to figure out what changed in that commit; the commit id is dbfc59ffaf8096a2dc5b76766fedb45ff2fb8cbf.

/** The core idea is to find the left and right paths down the B+Tree.These
paths correspond to the scan start and scan end search. Follow the links
at the appropriate btree level from the left to right and split the scan
on each of these sub-tree root nodes.

If the user has set the maximum number of threads to use at say 4 threads
and there are 5 sub-trees at the selected level then we will split the 5th
sub-tree dynamically when it is ready for scan.

We want to allow multiple parallel range scans on different indexes at the
same time. To achieve this split out the scan  context (Scan_ctx) from the
execution context (Ctx). The Scan_ctx has the index  and transaction
information and the Ctx keeps track of the cursor for a specific thread
during the scan.

To start a scan we need to instantiate a Parallel_reader. A parallel reader
can contain several Scan_ctx instances and a Scan_ctx can contain several
Ctx instances. Its' the Ctx instances that are eventually executed.

This design allows for a single Parallel_reader to scan multiple indexes
at once.  Each index range scan has to be added via its add_scan() method.
This functionality is required to handle parallel partition scans because
partitions are separate indexes. This can be used to scan completely
different indexes and tables by one instance of a Parallel_reader.

To solve the imbalance problem we dynamically split the sub-trees as and
when required. e.g., If you have 5 sub-trees to scan and 4 threads then
it will tag the 5th sub-tree as "to_be_split" during phase I (add_scan()),
the first thread that finishes scanning the first set of 4 partitions will
then dynamically split the 5th sub-tree and add the newly created sub-trees
to the execution context (Ctx) run queue in the Parallel_reader. As the
other threads complete their sub-tree scans they will pick up more execution
contexts (Ctx) from the Parallel_reader run queue and start scanning the
sub-partitions as normal.

Note: The Ctx instances are in a virtual list. Each Ctx instance has a
range to scan. The start point of this range instance is the end point
of the Ctx instance scanning values less than its start point. A Ctx
will scan from [Start, End) rows. We use std::shared_ptr to manage the
reference counting, this allows us to dispose of the Ctx instances
without worrying about dangling pointers. */
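To illustrate the imbalance fix described above, the following is a standalone toy (not the real Parallel_reader): worker threads pop ranges from a shared run queue, and an oversized range is split at pop time with the remainder pushed back, so idle threads pick up the newly created work.

// Standalone sketch of the dynamic-split idea (not the real Parallel_reader):
// worker threads pop ranges from a shared run queue; an oversized range is
// split and its second half is pushed back, so idle threads pick up new work.
#include <cstdio>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

struct Range {          // analogue of an execution context (Ctx): scan [begin, end)
  size_t begin;
  size_t end;
};

int main() {
  std::deque<Range> run_queue = {{0, 100}, {100, 200}, {200, 300}, {300, 1000}};
  std::mutex queue_mutex;
  const size_t split_threshold = 200;   // ranges bigger than this get split

  auto worker = [&](size_t id) {
    for (;;) {
      Range r;
      {
        std::lock_guard<std::mutex> g(queue_mutex);
        if (run_queue.empty()) return;
        r = run_queue.front();
        run_queue.pop_front();
        if (r.end - r.begin > split_threshold) {   // dynamic split
          const size_t mid = r.begin + (r.end - r.begin) / 2;
          run_queue.push_back({mid, r.end});
          r.end = mid;
        }
      }
      // "scan" the rows in [r.begin, r.end)
      std::printf("thread %zu scanned [%zu, %zu)\n", id, r.begin, r.end);
    }
  };

  std::vector<std::thread> threads;
  for (size_t i = 0; i < 4; ++i) threads.emplace_back(worker, i);
  for (auto &t : threads) t.join();
  return 0;
}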

2.2.2 InnoDB basic parallel scan implementation

The changes for parallel scan are mainly in these files: (1) sql/handler.h; (2) innobase/handler/ha_innodb.h; (3) innobase/handler/handle0alter.cc; (4) include/row0pread.h. The related system parameters are mainly defined in srv/srv0srv.cc.

Limitation: parallel scan on secondary indexes is NOT supported.

1: Key Data structures and functions

class Parallel_reader {
};

  /** Specifies the range from where to start the scan and where to end it. */
  struct Scan_range {
};

  /** Thread related context information. */
  struct Thread_ctx {
};

/** Parallel reader context. */
class Parallel_reader::Scan_ctx {
};


class handler {
...
/**
    Initializes a parallel scan. It creates a parallel_scan_ctx that has to
    be used across all parallel_scan methods. Also, gets the number of
    threads that would be spawned for parallel scan.
*/
  virtual int parallel_scan_init(void *&scan_ctx [[maybe_unused]],
                                 size_t *num_threads [[maybe_unused]],
                                 bool use_reserved_threads [[maybe_unused]]) {
    return 0;
  }

/**
    Run the parallel read of data.
*/
  virtual int parallel_scan(void *scan_ctx [[maybe_unused]],
                            void **thread_ctxs [[maybe_unused]],
                            Load_init_cbk init_fn [[maybe_unused]],
                            Load_cbk load_fn [[maybe_unused]],
                            Load_end_cbk end_fn [[maybe_unused]]) {
    return 0;
  }

/**
    End of the parallel scan.
*/
  virtual void parallel_scan_end(void *scan_ctx [[maybe_unused]]) { return; }
...

/** Start row of the scan range. */
struct Key_reader_row {
};

/** Parallel read implementation. */
template <typename T, typename R>
class Reader {
}; 

**2.3 Load Data for Secondary Engine**

Like a normal load data operation, the semantics of a secondary-engine load is to load data from the primary engine into the secondary engine.

When the ALTER statement is executed, mysql_execute_command is invoked, and in this function lex->m_sql_cmd is a Sql_cmd_secondary_load_unload, so Sql_cmd_secondary_load_unload::execute is executed. Eventually the execution reaches Tianmu_secondary_engine::ha_tianmu_secondary::load_table, and in this function it starts to scan the target table to read the corresponding data.

/**
  Represents ALTER TABLE SECONDARY_LOAD/SECONDARY_UNLOAD statements.
*/
class Sql_cmd_secondary_load_unload final : public Sql_cmd_common_alter_table {
};

/**
 * Loads a table from its primary engine into its secondary engine.
 *
 * This call assumes that MDL_SHARED_NO_WRITE/SECLOAD_SCAN_START_MDL lock
 * on the table have been acquired by caller. During its execution it may
 * downgrade this lock to MDL_SHARED_UPGRADEABLE/SECLOAD_PAR_SCAN_MDL.
 *
 * @param thd              Thread handler.
 * @param table            Table in primary storage engine.
 *
 * @return True if error, false otherwise.
 */
static bool secondary_engine_load_table(THD *thd, const TABLE &table);

class ha_tianmu_secondary : public handler {
 public:
  ha_tianmu_secondary(handlerton *hton, TABLE_SHARE *table_share);

 private:
  int create(const char *, TABLE *, HA_CREATE_INFO *, dd::Table *) override;

  int open(const char *name, int mode, unsigned int test_if_locked,
           const dd::Table *table_def) override;

  int close() override { return 0; }

  int rnd_init(bool) override { return 0; }

  int rnd_next(unsigned char *) override { return HA_ERR_END_OF_FILE; }

  int rnd_pos(unsigned char *, unsigned char *) override {
    return HA_ERR_WRONG_COMMAND;
  }

  int info(unsigned int) override;

  ha_rows records_in_range(unsigned int index, key_range *min_key,
                           key_range *max_key) override;

  void position(const unsigned char *) override {}

  unsigned long index_flags(unsigned int, unsigned int, bool) const override;

  THR_LOCK_DATA **store_lock(THD *thd, THR_LOCK_DATA **to,
                             thr_lock_type lock_type) override;

  Table_flags table_flags() const override;

  const char *table_type() const override { return "TIANMU_RAPID"; }

  int load_table(const TABLE &table) override;

  int unload_table(const char *db_name, const char *table_name,
                   bool error_if_not_loaded) override;

  THR_LOCK_DATA m_lock;
};
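To show how these pieces fit together, here is a self-contained mock (MockPrimaryEngine, MockColumnStore and MockRow are invented names for illustration, not StoneDB classes): a load_table-style function drives a three-phase parallel scan of the primary engine, and the per-row callback splits every row into per-column storage (section 3 covers that step in detail).

// Self-contained mock of the load flow; all Mock* names are hypothetical.
#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

struct MockRow {              // toy two-column schema
  uint64_t col1;
  int64_t col2;
};

// Toy stand-in for the primary engine's parallel_scan_init/scan/end trio.
class MockPrimaryEngine {
 public:
  explicit MockPrimaryEngine(std::vector<MockRow> rows) : rows_(std::move(rows)) {}

  size_t parallel_scan_init(size_t requested_threads) {   // returns thread count
    threads_ = std::min(requested_threads, static_cast<size_t>(4));
    return threads_;
  }

  // Each worker thread scans a disjoint slice and calls load_fn once per row.
  void parallel_scan(const std::function<void(size_t, const MockRow &)> &load_fn) {
    const size_t per = (rows_.size() + threads_ - 1) / threads_;
    std::vector<std::thread> workers;
    for (size_t t = 0; t < threads_; ++t) {
      workers.emplace_back([&, t] {
        const size_t begin = t * per;
        const size_t end = std::min(rows_.size(), begin + per);
        for (size_t i = begin; i < end; ++i) load_fn(t, rows_[i]);
      });
    }
    for (auto &w : workers) w.join();
  }

  void parallel_scan_end() {}                              // release scan resources

 private:
  std::vector<MockRow> rows_;
  size_t threads_ = 1;
};

// Toy column store: one vector per column, protected by a single mutex.
struct MockColumnStore {
  std::mutex mtx;
  std::vector<uint64_t> col1;
  std::vector<int64_t> col2;
};

// Sketch of what a load_table-style function conceptually does.
int load_table(MockPrimaryEngine &primary, MockColumnStore &imcs) {
  const size_t threads = primary.parallel_scan_init(/*requested_threads=*/4);
  std::printf("scanning with %zu threads\n", threads);
  primary.parallel_scan([&](size_t /*thread_id*/, const MockRow &row) {
    std::lock_guard<std::mutex> g(imcs.mtx);               // split the row into columns
    imcs.col1.push_back(row.col1);
    imcs.col2.push_back(row.col2);
  });
  primary.parallel_scan_end();
  return 0;
}

int main() {
  std::vector<MockRow> rows;
  for (uint64_t i = 0; i < 1000; ++i) rows.push_back({i, static_cast<int64_t>(i * 2)});
  MockPrimaryEngine primary(std::move(rows));
  MockColumnStore imcs;
  load_table(primary, imcs);
  std::printf("loaded %zu rows into the column store\n", imcs.col1.size());
  return 0;
}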

3: Split the row data into columns

After the data has been read from InnoDB, the next step is to split the row data into columns. For the data layout of the IMCS, please refer to issue #the_data_format_layout. In order to identify which row a value belongs to, every value in a column has its own row id, which can be obtained from the InnoDB tuple. Another important attribute of a value is the transaction id, which identifies the transaction that performed the operation on the record. As we know, each record in InnoDB carries a row id, a transaction id and a rollback pointer (all information about these can be found in https://dev.mysql.com/doc/refman/5.6/en/innodb-multi-versioning.html).

ONLY committed records are read from InnoDB and split into the corresponding columns; the data is organized in column-based format and stored in its columns, keeping the same order as in InnoDB.

Uncommitted records will not be read.

The metadata of the loaded records is updated during the loading stage: the maximum, minimum, average, median, etc. In a word, the meta-information about the data is updated continuously.
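As a simple illustration (not the actual StoneDB code), chunk-level metadata can be maintained incrementally while values stream in, so min/max/average are always current without a second pass over the data:

// Toy sketch (not the actual StoneDB code): chunk-level metadata is updated
// incrementally for every value appended during the load.
#include <algorithm>
#include <cstdint>
#include <cstdio>

struct ChunkMeta {
  int64_t min = INT64_MAX;
  int64_t max = INT64_MIN;
  int64_t sum = 0;
  uint64_t count = 0;

  void update(int64_t v) {          // called once per loaded value
    min = std::min(min, v);
    max = std::max(max, v);
    sum += v;
    ++count;
  }

  double avg() const { return count ? static_cast<double>(sum) / count : 0.0; }
};

int main() {
  ChunkMeta meta;
  for (int64_t v : {7, 42, -3, 15}) meta.update(v);
  std::printf("min=%lld max=%lld avg=%.2f\n",
              static_cast<long long>(meta.min),
              static_cast<long long>(meta.max), meta.avg());
  return 0;
}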

The data in the IMCS is divided hierarchically. The first level is the column; all data is organized in column-based format. A column is then split into smaller parts that we call chunks, and a chunk is built from vectors; a group of rows forms a vector (bucket).

Data types supported: for now, we only support two types, numbers (int, double, etc.) and strings.

> create table foo(col1 int, col2 varchar(5), col3 int not secondary,
                   col4 numeric) engine=innodb, secondary_engine=tianmu_rapid;
> alter table foo secondary_load;

Layout of table foo in the IMCS memory pool: (figure omitted)

The data is organized as chunks, buckets, tiles, etc.; a number of chunks make up an IMCU. The definitions of these data structures are given in issue #the_data_format_layout.
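The authoritative definitions live in issue #the_data_format_layout; the following is only a hypothetical C++ illustration of the hierarchy described above: a column is split into chunks, a chunk holds buckets (vectors) of values, and every value keeps the rowid and trx id of the row it came from (these two system columns are discussed next).

// Hypothetical illustration only; the real layout is defined in issue
// #the_data_format_layout. Column -> Chunk -> Bucket, with per-value rowid/trxid.
#include <cstdint>
#include <string>
#include <vector>

using RowID = uint64_t;
using TxnID = uint64_t;

struct CellMeta {            // per-value bookkeeping kept alongside the data
  RowID rowid;               // which InnoDB row this value came from
  TxnID trxid;               // transaction that last touched that row
};

struct Bucket {              // a small group of rows ("vector") inside a chunk
  std::vector<int64_t> values;
  std::vector<CellMeta> meta;
};

struct Chunk {               // a slice of one column, made up of buckets
  std::vector<Bucket> buckets;
};

struct Column {              // one column of the table inside the IMCS
  std::string name;
  std::vector<Chunk> chunks;
};

int main() {
  Column col{"col1", {}};
  col.chunks.emplace_back();
  col.chunks.back().buckets.emplace_back();
  Bucket &b = col.chunks.back().buckets.back();
  b.values.push_back(42);                     // the column value itself
  b.meta.push_back({/*rowid=*/1, /*trxid=*/100});
  return 0;
}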

Besides the original fields of an InnoDB row, some invisible fields should also be loaded. Two system columns should be loaded during the loading stage: first, the RowID; second, the TransactionID.

The rowid is used to identify which row a column value belongs to. For example, consider a row with three columns, colA, colB, and colC. The layout of this row in InnoDB may look like the following (figure omitted).

A row's data can also be depicted as 'variable-length field list', 'null bit flags', 'record header', and 'column data'. When row data is loaded into the IMCS, the column data is split into several independent columns and stored as chunks. In order to identify which row and which transaction a value in a chunk belongs to, every row in a chunk also carries a rowid and a trx id.

Therefore, a row in a chunk can be defined as:

class Row {
 public:
  Row() {}
  Row(const uchar *buf) {}
  virtual ~Row() {}

  // Stores the record data.
  int store(const uchar *buf);

  // Cleans up the record data.
  int reset();

  // Whether this row has any null field.
  inline bool has_null() { return null_list_ != 0; }

  // The null-field bitmap of this row.
  inline uint8 null_list() { return null_list_; }

  // Gets the number of null fields; every null field has one bit in null_list_.
  inline uint8 n_nulls() { return n_nulls_; }

  // Gets the fields of this row.
  inline Field *get_fields() { return fields_; }

  // Gets the number of fields.
  inline uint8 n_fields() { return n_fields_; }

  // Gets the record data.
  inline uchar *data() { return data_; }

 private:
  // Stores the row data.
  uchar data_[MAX_ROW_LENGTH];

  // Null bitmap indicating which columns are null.
  uint8 null_list_;

  // Number of nulls.
  uint8 n_nulls_;

  // Fields belonging to this row.
  Field *fields_;

  // Number of fields.
  uint8 n_fields_;

  // RowID of this row.
  RowID rowid_;

  // Trx ID of this row.
  TxnID trxid_;
};

How do we get the rowid and transaction id from a row? In row_search_mvcc, the InnoDB row format is converted to the MySQL row format by the following function.

/** Convert a row in the Innobase format to a row in the MySQL format.
Note that the template in prebuilt may advise us to copy only a few
columns to mysql_rec, other columns are left blank. All columns may not
be needed in the query.
@param[out]     mysql_rec           row in the MySQL format
@param[in,out]  prebuilt            prebuilt structure
@param[in]      rec                 Innobase record in the index
                                    which was described in prebuilt's
                                    template, or in the clustered index;
                                    must be protected by a page latch
@param[in]      vrow                virtual columns
@param[in]      rec_clust           true if rec is in the clustered index
                                    instead of index which could belong to
                                    prebuilt->index
@param[in]      rec_index           index of rec
@param[in]      prebuilt_index      prebuilt->index
@param[in]      offsets             array returned by rec_get_offsets(rec)
@param[in]      clust_templ_for_sec true if rec belongs to secondary index
                                    but the prebuilt->template is in
                                    clustered index format and it
                                    is used only for end range comparison
@param[in]      lob_undo            the LOB undo information.
@param[in,out]  blob_heap           If not null then use this heap for BLOBs
@return true on success, false if not all columns could be retrieved */
// clang-format on
bool row_sel_store_mysql_rec(byte *mysql_rec, row_prebuilt_t *prebuilt,
                             const rec_t *rec, const dtuple_t *vrow,
                             bool rec_clust, const dict_index_t *rec_index,
                             const dict_index_t *prebuilt_index,
                             const ulint *offsets, bool clust_templ_for_sec,
                             lob::undo_vers_t *lob_undo,
                             mem_heap_t *&blob_heap);

4: Unit Tests

All the functions should have corresponding unit tests.

5: Limitation

If a column is defined as secondary, it will be loaded into the in-memory column store (Tianmu_rapid). But there is one exception to consider: if a column is defined as nullable and no data has been inserted, then every row value of this column is null. What do we do then? The metadata of this column should be created whether it contains data or not.

6: The system variables

Some system variables are employed to monitor the loading status. The SHOW VARIABLES and SHOW ENGINE STATUS statements are used to display these variables.

7: Exception handling

When an exception occurs during the loading stage, the load operation is interrupted, and the default behavior is to discard all the data loaded so far. You can also enable crash-safe mode: when this option is set, the loaded data is kept in memory and also written to disk, and a load log is written. If an instance crashes and restarts, it loads the data from disk directly; re-reading from InnoDB and splitting the records into columns again is not necessary.

The mechanism for recovering from exceptions or faults will be discussed in the recovery and fast recovery part.

RingsC — Feb 13 '23 06:02