horaedb icon indicating copy to clipboard operation
horaedb copied to clipboard

Refactor EngineBuilder to remove duplicate code

Open chunshao90 opened this issue 2 years ago • 20 comments

Description Refactor EngineBuilder to remove duplicate code

Proposal code: https://github.com/CeresDB/ceresdb/blob/890fecf10848a3d09b485af52ff516deccf771fc/analytic_engine/src/setup.rs#L92 https://github.com/CeresDB/ceresdb/blob/890fecf10848a3d09b485af52ff516deccf771fc/analytic_engine/src/setup.rs#L120 https://github.com/CeresDB/ceresdb/blob/890fecf10848a3d09b485af52ff516deccf771fc/analytic_engine/src/setup.rs#L155

Additional context

chunshao90 avatar Jun 29 '22 02:06 chunshao90

hi, chunshao90.

It seems we can make Wal as an associate type of EngineBuilder, then the build logic can be reused, like:

#[async_trait]
pub trait EngineBuilder: Default + Send + Sync {
    type Wal: WalManager + Send + Sync + 'static;

    /// Build the analytic engine from `config` and `engine_runtimes`.
    async fn build(
        &self,
        config: Config,
        engine_runtimes: Arc<EngineRuntimes>,
    ) -> Result<TableEngineRef> {
        assert!(!config.obkv_wal.enable);

        match config.storage {
            crate::storage_options::StorageOptions::Local(ref opts) => {
                let store = open_storage_local(opts.clone()).await?;
                let instance = self
                    .open_instance::<LocalFileSystem>(config.clone(), engine_runtimes, store)
                    .await?;
                Ok(Arc::new(TableEngineImpl::new(instance)))
            }
            crate::storage_options::StorageOptions::Aliyun(ref opts) => {
                let store = open_storage_aliyun(opts.clone()).await?;
                let instance = self
                    .open_instance::<AliyunOSS>(config.clone(), engine_runtimes, store)
                    .await?;
                Ok(Arc::new(TableEngineImpl::new(instance)))
            }
        }
    }

    async fn open_instance<Store>(
        &self,
        config: Config,
        engine_runtimes: Arc<EngineRuntimes>,
        store: Store,
    ) -> Result<InstanceRef<Self::Wal, ManifestImpl<Self::Wal>, Store, FactoryImpl>>
    where
        Store: ObjectStore;
}

Is this the expected to this task?

ygf11 avatar Jul 08 '22 03:07 ygf11

@ygf11 This is a good proposal but considering that more implements of these components will be provided in the future, Wal, Store and other components needed by Instance are planned to be made as Trait Object.

ShiKaiWi avatar Jul 08 '22 09:07 ShiKaiWi

I agree with @ShiKaiWi

@ygf11 This is good proposal but considering that more implements of these components will be provided in the future, Wal, Store and other components needed by Instance are planned to be made as Trait Object.

chunshao90 avatar Jul 09 '22 07:07 chunshao90

Thanks for your response. I can think the usage of Trait Object here.

Instance has four generic parameters now, which are all only used by space_store.

pub struct Instance<Wal, Meta, Store, Fa> {
    /// Space storage
    space_store: Arc<SpaceStore<Wal, Meta, Store, Fa>>,
    ...
}

The main issue is that Wal and its associate type are not Object Safe, this needs some refactors.

I still do not have idea about it, what do you think?

ygf11 avatar Jul 09 '22 13:07 ygf11

Vote for removing some type params (I prefer to remove all four params) either. Let's make endeavors to achieve this.

trait WalManager presents in Wal and Meta. It's a bit complicated in my perspective:

pub trait WalManager: LogWriter + LogReader{
    async fn sequence_num(&self, region_id: RegionId) -> Result<SequenceNumber>;
    async fn mark_delete_entries_up_to(...) -> Result<()>;
    async fn close_gracefully(&self) -> Result<()>;
}

pub trait LogWriter {
    async fn write<P: Payload>(...) -> Result<SequenceNumber>;
}

pub trait LogReader {
    type BatchIter: BatchLogIterator + Send;

    async fn read_batch(...) -> Result<Self::BatchIter>;
}

pub trait BatchLogIterator {
    async fn next_log_entries<D: PayloadDecoder + Send + 'static>(...) -> Result<VecDeque<LogEntry<D::Target>>>;
}

Besides other constrains, IMO LogWriter and LogReader should be parts of WalManager rather than bounds. i.e., a WalManager itself can read and write.

The main issue is LogReader. I propose to remove the BatchLogIterator abstraction -- This is bound to WAL implementation and doesn't need to be dynamic. User only needs to specify the decoded data type via PayloadDecoder just like the write() method. The method now looks like

pub trait WalManager {
    // with some batching operations
    async fn read_batch<D: PayloadDecoder>(..., mut buffer: VecDeque<D::Target>) -> Result<VecDeque<D::Target>>;
}

waynexia avatar Jul 12 '22 07:07 waynexia

Thanks for your response. I can think the usage of Trait Object here.

Instance has four generic parameters now, which are all only used by space_store.

pub struct Instance<Wal, Meta, Store, Fa> {
    /// Space storage
    space_store: Arc<SpaceStore<Wal, Meta, Store, Fa>>,
    ...
}

The main issue is that Wal and its associate type are not Object Safe, this needs some refactors.

I still do not have idea about it, what do you think?

Sorry for the late repsonse. The trait of Wal indeed needs refactoring and @waynexia has given a feasible proposal. By the way, are you willing to take the refactoring work? :laughing:

ShiKaiWi avatar Jul 13 '22 02:07 ShiKaiWi

pub trait WalManager { async fn read_batch<D: PayloadDecoder>(..., mut buffer: VecDeque<D::Target>) -> Result<VecDeque<D::Target>>; }

@waynexia Why the buffer is a VecDeque? It seems a Vec is more reasonable. How about such a method signature:

pub trait WalManager {
    async fn read_batch<D: PayloadDecoder>(..., buffer: &mut Vec<D::Target>) -> Result<()>;
}

ShiKaiWi avatar Jul 13 '22 02:07 ShiKaiWi

@waynexia Why the buffer is a VecDeque? It seems a Vec is more reasonable. How about such a method signature:

Wal records need to be consumed from the head (pop_front) and replayed to the end (push_back). Thus VecDeque is used for this queue-like usage.

Take mutable reference is ok to me :+1:

waynexia avatar Jul 13 '22 02:07 waynexia

Sorry for the late repsonse. The trait of Wal indeed needs refactoring and @waynexia has given a feasible proposal. By the way, are you willing to take the refactoring work? 😆

I'm glad to work on this :D

But maybe I need some time to familiar with codebase.

ygf11 avatar Jul 13 '22 02:07 ygf11

Thanks! Just take this at your pace and feel free to reach out to us if anything is unclear :heart:

waynexia avatar Jul 13 '22 03:07 waynexia

@waynexia Why the buffer is a VecDeque? It seems a Vec is more reasonable. How about such a method signature:

Wal records need to be consumed from the head (pop_front) and replayed to the end (push_back). Thus VecDeque is used for this queue-like usage.

Take mutable reference is ok to me 👍

I guess no need to do pop_front operation because the buffer seems not reusable if pop_front is called.

ShiKaiWi avatar Jul 13 '22 04:07 ShiKaiWi

I guess no need to do pop_front operation because the buffer seems not reusable if pop_front is called.

that method won't shrink the container so the buffer is still there. But I'm afraid if we don't pop the element out we may need to copy it to get the ownership for the consumer.

waynexia avatar Jul 13 '22 05:07 waynexia

I guess no need to do pop_front operation because the buffer seems not reusable if pop_front is called.

that method won't shrink the container so the buffer is still there. But I'm afraid if we don't pop the element out we may need to copy it to get the ownership for the consumer.

I read the docs about VecDequeue and find it out that pop_front indeed doesn't shrink the underlying RawVec but the poped element is copied from the RawVec. Actually it seems that one more copy is necessary if we want to reuse the buffer and take the ownership of elements in it at same time. So I guess Vec is enough here.

ShiKaiWi avatar Jul 13 '22 06:07 ShiKaiWi

I see. So use the element under a reference and copy it if needed is the same with pop(). We can replace pop_front with iter over a Vec.

waynexia avatar Jul 13 '22 06:07 waynexia

Hi @ygf11, I have removed Store bounds in #101. It may cause conflict if you have changed it either. Sorry for the inconvenience 🙇

By the way, those traits are widely used. You can change them one by one to avoid a massive change set.

waynexia avatar Jul 18 '22 03:07 waynexia

Hi @ygf11, I have removed Store bounds in https://github.com/CeresDB/ceresdb/pull/101. It may cause conflict if you have changed it either. Sorry for the inconvenience 🙇

@waynexia Thanks for your kindness remind :D

I am removing Fa type param now.

ygf11 avatar Jul 18 '22 06:07 ygf11

I meet some troubles.

To make WalManager object safe, Generic types are not allowed in trait assosiate methods.

pub trait WalManager {
    // read payload
    async fn read_batch<D: PayloadDecoder>(..., mut buffer: Vec<D::Target>) -> Result<Vec<D::Target>>;

    // write payload 
    async fn write<P: Payload>(&self, ctx: &WriteContext, batch: &LogWriteBatch<P>,) -> Result<SequenceNumber>;
}

We can simply use derives to remove Payload and PayloadDecoder, since we have two derives, MetaUpdatePayload and WritePayload, so:

pub trait WalManager {
    async fn read_meta_batch(..., mut buffer: ...)...
    async fn read_write_batch(..., mut buffer: ...)...
    async fn write_payload(&self, ctx: &WriteContext, batch: &LogWriteBatch<WritePayload>)...
    async fn write_meta_payload(&self, ctx: &WriteContext, batch: &LogWriteBatch<MetaUpdatePayload>)...
}

It has two drawbacks:

  1. WalManager only support read and write some payloads, if we want support other payload, we need add more methods.
  2. To avoid circle deps, WritePayload and MetaUpdatePayload need define in wal crate.

I think maybe we can define a trait in analytic_engine, which is object safe:

pub trait WalManagerWraper {
    async fn read_meta_batch(..., mut buffer: ...)...
    async fn read_write_batch(..., mut buffer: ...)...
    async fn write_payload(&self, ctx: &WriteContext, batch: &LogWriteBatch<WritePayload>)...
    async fn write_meta_payload(&self, ctx: &WriteContext, batch: &LogWriteBatch<MetaUpdatePayload>)...
}

// rocks version
pub struct RocksWalManagerWrapper {
    inner: RocksImpl,
}

impl WalManagerWraper for RocksWalManagerWrapper {
    ...
}

ygf11 avatar Jul 19 '22 01:07 ygf11

I think maybe we can define a trait in analytic_engine, which is object safe:

This is doable but enumerating all possible implementations is not a good way in my perspective. I've not considered the object safety problem before. I come up with another way to remove those type parameters:

  • Fow write, change the parameter to a trait object. like
pub trait WalManager {
    // write payload 
    async fn write(&self, ctx: &WriteContext, batch: &dyn Payload) -> Result<SequenceNumber>;
}
  • For read, add an indirect layer to place the type parameter. I write an example in https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=a0d46419f3910cf3c845352babbda71d. The main idea is to make read method to return a concrete type (Decoder in the playground), and put type parameter PayloadDecoder in Decoder's method. Decoder is only used to place type parameters for WalManager. Other non-generic logic like codec, batching, version checking, etc. should be placed elsewhere.

Please tell me what you think about this @ygf11 @ShiKaiWi


A side note, changing read and write methods in WalManager may involve lots of code. For now two different implementations have mixed their own data access logic up with data codec logic. I haven't given an actual try to refactor them so feel free to raise any problem to discuss.

waynexia avatar Jul 19 '22 07:07 waynexia

I come up with another way to remove those type parameters:

@waynexia Thanks. Your idea is the right direction.

For write, we need also make payload as object safe like you suggest.

pub trait Payload: Send + Sync + Debug {
    type Error: std::error::Error + Send + Sync + 'static;
    /// Compute size of the encoded payload.
    fn encode_size(&self) -> usize;
    /// Append the encoded payload to the `buf`.
    fn encode_to<B: MemBufMut>(&self, buf: &mut B) -> Result<(), Self::Error>;
}

Shall we replace Error with Box<dyn std::error::Error> here?

ygf11 avatar Jul 20 '22 03:07 ygf11

Shall we replace Error with Box<dyn std::error::Error> here?

It's ok to do so 👍

waynexia avatar Jul 20 '22 04:07 waynexia