orc icon indicating copy to clipboard operation
orc copied to clipboard

Add C++ API for Columnar Encryption

Open dongjoon-hyun opened this issue 2 years ago • 22 comments

This is created based on the following dev@orc mail on June 11st.

https://lists.apache.org/thread/pkp6ffh9pqok7v618zxtox708mv26sz0

dongjoon-hyun avatar Jul 20 '22 16:07 dongjoon-hyun

cc @wgtmac , @williamhyun , @guiyanakuang , @stiga-huang

dongjoon-hyun avatar Jul 20 '22 16:07 dongjoon-hyun

I looked at the commit history of the Java api. It seems like a lot of work. Maybe we should follow the steps of the Java implementation and divide it into smaller topics. image

guiyanakuang avatar Jul 23 '22 03:07 guiyanakuang

I agree with you, @guiyanakuang . It will take lots of efforts.

dongjoon-hyun avatar Jul 23 '22 16:07 dongjoon-hyun

+1 @guiyanakuang @dongjoon-hyun.

This is a large feature. We can follow the java implementation and break it down into smaller work items first.

wgtmac avatar Jul 25 '22 03:07 wgtmac

Thank you, @wgtmac .

dongjoon-hyun avatar Jul 25 '22 15:07 dongjoon-hyun

I set a milestone 1.9.0 as a goal.

dongjoon-hyun avatar Jul 25 '22 15:07 dongjoon-hyun

Hi @dongjoon-hyun Do we have jira to track the work?

deshanxiao avatar Sep 27 '22 05:09 deshanxiao

Hi @dongjoon-hyun Do we have jira to track the work?

@deshanxiao Not yet. @coderex2522 is working on it already. Will create JIRAs later.

wgtmac avatar Sep 27 '22 09:09 wgtmac

Hi @dongjoon-hyun Do we have jira to track the work?

@deshanxiao Not yet. @coderex2522 is working on it already. Will create JIRAs later.

@coderex2522 Has the ORC file encryption and decryption functionality been implemented?

zxf216 avatar Sep 13 '23 12:09 zxf216

@dongjoon-hyun @coderex2522 I want to try implementing ORC encryption and decryption feature in C++. Are there any other developers working on this feature?

zxf216 avatar Sep 21 '23 03:09 zxf216

@coderex2522 Thank you for your support. I would like to know more about the design plan for ORC encryption function. Do you have any materials that you can provide me with? Below is a document I have summarized by reading the Java source code. I'm not sure if it's correct. document:https://www.processon.com/view/link/6507c37f464f0d50af3e2ae8

zxf216 avatar Sep 21 '23 03:09 zxf216

@zxf216 If you are interested in this part of the work, welcome to provide the implementation of the C++ encryption module to the Apache ORC community! Afterward, @dongjoon-hyun @wgtmac @deshanxiao can help with a thorough review of this part.

coderex2522 avatar Sep 21 '23 03:09 coderex2522

Thanks @zxf216 for taking up this! AFAIK, there aren't many materials for the design except the spec and the Java code. Let me spend some time to recall the detail. Feel free to discuss it in detail if you want.

wgtmac avatar Sep 21 '23 16:09 wgtmac

@wgtmac @coderex2522 My email is [email protected]. Can you share your email or WeChat with me? This way we can communicate more efficiently.

zxf216 avatar Sep 21 '23 22:09 zxf216

@wgtmac @coderex2522 My email is [email protected]. Can you share your email or WeChat with me? This way we can communicate more efficiently.

You can find me via [email protected]

wgtmac avatar Sep 22 '23 01:09 wgtmac

This is removed from Milestone 2.0.0.

dongjoon-hyun avatar Dec 27 '23 19:12 dongjoon-hyun

@dongjoon-hyun @wgtmac @coderex2522 We are developing a C++ feature to read encrypted columns in ORC, but have encountered an issue where the program throws an error when it needs to skip ahead due to a set filter condition. Below is a sample code of our attempt to read the encrypted columns. Define DecryptionInputStream:

class DecryptionInputStream : public SeekableInputStream {
   public:
    DecryptionInputStream(std::unique_ptr<SeekableInputStream> input,std::vector<unsigned char> key,
                          std::vector<unsigned char> iv,const EVP_CIPHER* cipher,MemoryPool& pool);
    virtual ~DecryptionInputStream();

    virtual bool Next(const void** data, int* size) override;
    virtual void BackUp(int count) override;
    virtual bool Skip(int count) override;
    virtual google::protobuf::int64 ByteCount() const override;
    virtual void seek(PositionProvider& position) override;
    virtual std::string getName() const override;

   private:
    std::unique_ptr<SeekableInputStream> input_;
    std::vector<unsigned char> key_;
    std::vector<unsigned char> iv_;
    EVP_CIPHER_CTX* ctx_;
    const EVP_CIPHER* cipher;
    MemoryPool& pool;
    std::unique_ptr<DataBuffer<unsigned char>> inputBuffer_;
    std::unique_ptr<DataBuffer<unsigned char>> outputBuffer_;
  };
}  // namespace orc

Implement DecryptionInputStream class:

DecryptionInputStream::DecryptionInputStream(std::unique_ptr<SeekableInputStream> input,
                                               std::vector<unsigned char> key,
                                               std::vector<unsigned char> iv,
                                               const EVP_CIPHER* cipher,MemoryPool& pool)
      : input_(std::move(input)),
        key_(key),
        iv_(iv),
        cipher(cipher),
        pool(pool){
    EVP_CIPHER_CTX* ctx = EVP_CIPHER_CTX_new();
    if (ctx == nullptr) {
      throw std::runtime_error("Failed to create EVP cipher context");
    }
    int ret = EVP_DecryptInit_ex(ctx, cipher, NULL, key_.data(), iv_.data());
    if (ret != 1) {
      EVP_CIPHER_CTX_free(ctx);
      EVP_CIPHER_free(const_cast<evp_cipher_st*>(cipher));
      throw std::runtime_error("Failed to initialize EVP cipher context");
    }
    ctx_ = ctx;
    outputBuffer_.reset(new DataBuffer<unsigned char>(pool));
    inputBuffer_.reset(new DataBuffer<unsigned char>(pool));
  }

  DecryptionInputStream::~DecryptionInputStream() {
    EVP_CIPHER_CTX_free(ctx_);
    EVP_CIPHER_free(const_cast<evp_cipher_st*>(cipher));
  }

  bool DecryptionInputStream::Next(const void** data, int* size) {
    int bytesRead = 0;
    //const void* ptr;
    const void* inptr = static_cast<void*>(inputBuffer_->data());
    input_->Next(&inptr, &bytesRead);
    if (bytesRead == 0) {
      return false;
    }
    //
    const unsigned char* result = static_cast<const unsigned char*>(inptr);
    int outlen = 0;
    //int blockSize = EVP_CIPHER_block_size(this->cipher);
    outputBuffer_->resize(bytesRead*2);
    int ret = EVP_DecryptUpdate(ctx_, outputBuffer_->data(), &outlen, result, bytesRead);
    if (ret != 1) {
      throw std::runtime_error("Failed to decrypt data");
    }
    outputBuffer_->resize(outlen);
    *data = outputBuffer_->data();
    *size = outputBuffer_->size();
    return true;
  }
  void DecryptionInputStream::BackUp(int count) {
    this->input_->BackUp(count);
  }

  bool DecryptionInputStream::Skip(int count) {
    return this->input_->Skip(count);

  }

  google::protobuf::int64 DecryptionInputStream::ByteCount() const {
    return input_->ByteCount();
  }

  void DecryptionInputStream::seek(PositionProvider& position) {
    input_->seek(position);
  }

  std::string DecryptionInputStream::getName() const {
    return "DecryptionInputStream("+input_->getName()+")";
  }
  

Used in StripeStream.cc:

std::unique_ptr<SeekableInputStream> StripeStreamsImpl::getStream(uint64_t columnId,
                                                                   proto::Stream_Kind kind,
                                                                   bool shouldStream) const{
   MemoryPool* pool = reader.getFileContents().pool;
   const std::string skey = std::to_string(columnId) + ":" + std::to_string(kind);
   StreamInformation* streamInformation = streamMap[skey].get();
   if(streamInformation == nullptr){
     return nullptr;
   }
   
   uint64_t myBlock = shouldStream ? input.getNaturalReadSize() : streamInformation->getLength();
   auto inputStream = std::make_unique<SeekableFileInputStream>(
       &input, streamInformation->getOffset(), streamInformation->getLength(), *pool, myBlock);
   ReaderEncryptionVariant* variant = reader.getReaderEncryption()->getVariant(columnId);
   if (variant != nullptr) {
     ReaderEncryptionKey* encryptionKey = variant->getKeyDescription();
     const int ivLength = encryptionKey->getAlgorithm()->getIvLength();
     std::vector<unsigned char> iv(ivLength);
     orc::CryptoUtil::modifyIvForStream(columnId, kind, originalStripeId, iv.data(), ivLength);
     const EVP_CIPHER* cipher = encryptionKey->getAlgorithm()->createCipher();
     // FooterKey
     std::vector<unsigned char> key = variant->getStripeKey(stripeIndex)->getEncoded();
     std::unique_ptr<SeekableInputStream> decompressStream = createDecompressorAndDecryption(
         reader.getCompression(), std::move(inputStream), reader.getCompressionSize(), *pool,
         reader.getFileContents().readerMetrics, key,
         iv, const_cast<EVP_CIPHER*>(cipher));
     return decompressStream;
   } else {
     return createDecompressor(reader.getCompression(), std::move(inputStream),
                               reader.getCompressionSize(), *pool,
                               reader.getFileContents().readerMetrics);
   }
 }
 
std::unique_ptr<SeekableInputStream> createDecompressorAndDecryption(
     CompressionKind kind, std::unique_ptr<SeekableInputStream> input, uint64_t blockSize,
     MemoryPool& pool, ReaderMetrics* metrics,std::vector<unsigned char> key,
     std::vector<unsigned char> iv,const EVP_CIPHER* cipher){
     
     auto dec = std::make_unique<DecryptionInputStream>(std::move(input),key,iv,cipher,pool);
     return createDecompressor(kind,std::move(dec),blockSize,pool,metrics);
}

Main function to read a file

int main(){
  try {
    orc::RowReaderOptions rowReaderOptions;
    
    orc::ReaderOptions readerOpts;
   
    std::shared_ptr<orc::InMemoryKeystore> keyStore = std::make_shared<orc::InMemoryKeystore>();
    std::vector<int> int_values = {9,10,-1,46,-125,13,-4,-6,18,96,-96,-20,62,-112,-125,-52};
    std::vector<unsigned char> vecOutput(int_values.begin(), int_values.end());
    keyStore->addKey("AES_CTR_128",0,orc::EncryptionAlgorithm::AES_CTR_128,vecOutput);
    readerOpts.setKeyProvider(keyStore);

    std::unique_ptr<orc::SearchArgumentBuilder> sarg = orc::SearchArgumentFactory::newBuilder();
    sarg->equals("imei", orc::PredicateDataType::STRING, orc::Literal("111111111111111",15));
    std::unique_ptr<orc::SearchArgument> final_sarg = sarg->build();
    rowReaderOptions.searchArgument(std::move(final_sarg));
    printContents("/root/starrocks/part-00001-0.snappy.orc",readerOpts, rowReaderOptions);
    
  } catch (std::exception& ex) {
    std::cerr << "Caught exception in : " << ex.what() << "\n";
    return 1;
  }
}

there is an error when reading encrypted columns

Caught exception in : Entry index out of range in StringDictionaryColumn

If no filter condition is added, the data in the ORC file can be read normally.

zxf216 avatar Jan 23 '24 06:01 zxf216

@zxf216 This error happens when reading dictionary-encoded string values. Please check following:

  • Check whether the dictionary data is correct. Especially pay attention to the number of entries in the dictionary. The dictionary will anyway be read regardless of any filter, so I think it should be good according to your description. But please check it just in case.
  • Check what is the entry id when the exception happen. It probably is an undefined value.
  • Please also check the positions in the StringDictionaryColumnReader::seekToRowGroup call to see if any value is suspicious.

wgtmac avatar Jan 27 '24 16:01 wgtmac

@wgtmac I found in the Java version of ORC code that IV is updated during seeking. I will study this first.

zxf216 avatar Feb 03 '24 03:02 zxf216

Solution @wgtmac In the method DecryptionInputStream::seek, update the value of IV.

void DecryptionInputStream::seek(PositionProvider& position) {
  changeIv(position.current());
  input_->seek(position);
}

The changeIv method is defined as follows.

void DecryptionInputStream::changeIv(long offset) {
      int blockSize = EVP_CIPHER_key_length(cipher);
      long encryptionBlocks = offset / blockSize;
      long extra = offset % blockSize;
      std::fill(iv_.end() - 8, iv_.end(), 0);
      if (encryptionBlocks != 0) {
          // Add the encryption blocks into the initial iv, to compensate for
          // skipping over decrypting those bytes.
          int posn = iv_.size() - 1;
          while (encryptionBlocks > 0) {
              long sum = (iv_[posn] & 0xff) + encryptionBlocks;
              iv_[posn--] = (unsigned char) sum;
              encryptionBlocks = sum / 0x100;
          }
      }
      EVP_DecryptInit_ex(ctx_, cipher, NULL, key_.data(), iv_.data());
      // If the range starts at an offset that doesn't match the encryption
      // block, we need to advance some bytes within an encryption block.
      if (extra > 0) {
          std::vector<unsigned char> decrypted(extra);
          int decrypted_len;
          EVP_DecryptUpdate(ctx_, decrypted.data(), &decrypted_len, decrypted.data(), extra);
      }
  }

zxf216 avatar Feb 29 '24 06:02 zxf216