Add C++ API for Columnar Encryption
This issue was created based on the following dev@orc mailing list thread from June 11th.
https://lists.apache.org/thread/pkp6ffh9pqok7v618zxtox708mv26sz0
cc @wgtmac , @williamhyun , @guiyanakuang , @stiga-huang
I looked at the commit history of the Java API. It seems like a lot of work. Maybe we should follow the steps of the Java implementation and divide it into smaller topics.
I agree with you, @guiyanakuang . It will take a lot of effort.
+1 @guiyanakuang @dongjoon-hyun.
This is a large feature. We can follow the Java implementation and break it down into smaller work items first.
Thank you, @wgtmac .
I set milestone 1.9.0 as the goal.
Hi @dongjoon-hyun, do we have a JIRA to track the work?
@deshanxiao Not yet. @coderex2522 is working on it already. Will create JIRAs later.
@coderex2522 Has the ORC file encryption and decryption functionality been implemented?
@dongjoon-hyun @coderex2522 I want to try implementing the ORC encryption and decryption feature in C++. Are there any other developers working on this feature?
@coderex2522 Thank you for your support. I would like to know more about the design plan for the ORC encryption feature. Do you have any materials that you can share with me? Below is a document I put together by reading the Java source code. I'm not sure if it's correct. Document: https://www.processon.com/view/link/6507c37f464f0d50af3e2ae8
@zxf216 If you are interested in this part of the work, you are welcome to contribute the implementation of the C++ encryption module to the Apache ORC community! Afterward, @dongjoon-hyun @wgtmac @deshanxiao can help with a thorough review of this part.
Thanks @zxf216 for taking this up! AFAIK, there aren't many materials for the design except the spec and the Java code. Let me spend some time recalling the details. Feel free to discuss it in detail if you want.
@wgtmac @coderex2522 My email is [email protected]. Can you share your email or WeChat with me? This way we can communicate more efficiently.
You can find me via [email protected]
This is removed from Milestone 2.0.0.
@dongjoon-hyun @wgtmac @coderex2522 We are developing a C++ feature to read encrypted columns in ORC, but have run into an issue: the program throws an error when a filter condition forces the reader to skip ahead. Below is sample code from our attempt to read the encrypted columns.
Define DecryptionInputStream:
namespace orc {
  class DecryptionInputStream : public SeekableInputStream {
   public:
    DecryptionInputStream(std::unique_ptr<SeekableInputStream> input,
                          std::vector<unsigned char> key,
                          std::vector<unsigned char> iv,
                          const EVP_CIPHER* cipher, MemoryPool& pool);
    virtual ~DecryptionInputStream();
    virtual bool Next(const void** data, int* size) override;
    virtual void BackUp(int count) override;
    virtual bool Skip(int count) override;
    virtual google::protobuf::int64 ByteCount() const override;
    virtual void seek(PositionProvider& position) override;
    virtual std::string getName() const override;

   private:
    std::unique_ptr<SeekableInputStream> input_;
    std::vector<unsigned char> key_;
    std::vector<unsigned char> iv_;
    EVP_CIPHER_CTX* ctx_;
    const EVP_CIPHER* cipher;
    MemoryPool& pool;
    std::unique_ptr<DataBuffer<unsigned char>> inputBuffer_;
    std::unique_ptr<DataBuffer<unsigned char>> outputBuffer_;
  };
}  // namespace orc
Implement DecryptionInputStream class:
DecryptionInputStream::DecryptionInputStream(std::unique_ptr<SeekableInputStream> input,
                                             std::vector<unsigned char> key,
                                             std::vector<unsigned char> iv,
                                             const EVP_CIPHER* cipher, MemoryPool& pool)
    : input_(std::move(input)),
      key_(key),
      iv_(iv),
      cipher(cipher),
      pool(pool) {
  EVP_CIPHER_CTX* ctx = EVP_CIPHER_CTX_new();
  if (ctx == nullptr) {
    throw std::runtime_error("Failed to create EVP cipher context");
  }
  int ret = EVP_DecryptInit_ex(ctx, cipher, NULL, key_.data(), iv_.data());
  if (ret != 1) {
    EVP_CIPHER_CTX_free(ctx);
    EVP_CIPHER_free(const_cast<evp_cipher_st*>(cipher));
    throw std::runtime_error("Failed to initialize EVP cipher context");
  }
  ctx_ = ctx;
  outputBuffer_.reset(new DataBuffer<unsigned char>(pool));
  inputBuffer_.reset(new DataBuffer<unsigned char>(pool));
}

DecryptionInputStream::~DecryptionInputStream() {
  EVP_CIPHER_CTX_free(ctx_);
  EVP_CIPHER_free(const_cast<evp_cipher_st*>(cipher));
}
bool DecryptionInputStream::Next(const void** data, int* size) {
  int bytesRead = 0;
  const void* inptr = nullptr;
  // Stop when the wrapped stream is exhausted or returns no data.
  if (!input_->Next(&inptr, &bytesRead) || bytesRead == 0) {
    return false;
  }
  const unsigned char* encrypted = static_cast<const unsigned char*>(inptr);
  int outlen = 0;
  // Leave headroom for the cipher output before trimming to the real length.
  outputBuffer_->resize(bytesRead * 2);
  int ret = EVP_DecryptUpdate(ctx_, outputBuffer_->data(), &outlen, encrypted, bytesRead);
  if (ret != 1) {
    throw std::runtime_error("Failed to decrypt data");
  }
  outputBuffer_->resize(outlen);
  *data = outputBuffer_->data();
  *size = static_cast<int>(outputBuffer_->size());
  return true;
}

void DecryptionInputStream::BackUp(int count) {
  this->input_->BackUp(count);
}

bool DecryptionInputStream::Skip(int count) {
  return this->input_->Skip(count);
}

google::protobuf::int64 DecryptionInputStream::ByteCount() const {
  return input_->ByteCount();
}

void DecryptionInputStream::seek(PositionProvider& position) {
  input_->seek(position);
}

std::string DecryptionInputStream::getName() const {
  return "DecryptionInputStream(" + input_->getName() + ")";
}
Used in StripeStream.cc:
std::unique_ptr<SeekableInputStream> StripeStreamsImpl::getStream(uint64_t columnId,
                                                                  proto::Stream_Kind kind,
                                                                  bool shouldStream) const {
  MemoryPool* pool = reader.getFileContents().pool;
  const std::string skey = std::to_string(columnId) + ":" + std::to_string(kind);
  StreamInformation* streamInformation = streamMap[skey].get();
  if (streamInformation == nullptr) {
    return nullptr;
  }
  uint64_t myBlock = shouldStream ? input.getNaturalReadSize() : streamInformation->getLength();
  auto inputStream = std::make_unique<SeekableFileInputStream>(
      &input, streamInformation->getOffset(), streamInformation->getLength(), *pool, myBlock);
  ReaderEncryptionVariant* variant = reader.getReaderEncryption()->getVariant(columnId);
  if (variant != nullptr) {
    ReaderEncryptionKey* encryptionKey = variant->getKeyDescription();
    const int ivLength = encryptionKey->getAlgorithm()->getIvLength();
    std::vector<unsigned char> iv(ivLength);
    orc::CryptoUtil::modifyIvForStream(columnId, kind, originalStripeId, iv.data(), ivLength);
    const EVP_CIPHER* cipher = encryptionKey->getAlgorithm()->createCipher();
    // FooterKey
    std::vector<unsigned char> key = variant->getStripeKey(stripeIndex)->getEncoded();
    std::unique_ptr<SeekableInputStream> decompressStream = createDecompressorAndDecryption(
        reader.getCompression(), std::move(inputStream), reader.getCompressionSize(), *pool,
        reader.getFileContents().readerMetrics, key, iv, cipher);
    return decompressStream;
  } else {
    return createDecompressor(reader.getCompression(), std::move(inputStream),
                              reader.getCompressionSize(), *pool,
                              reader.getFileContents().readerMetrics);
  }
}

std::unique_ptr<SeekableInputStream> createDecompressorAndDecryption(
    CompressionKind kind, std::unique_ptr<SeekableInputStream> input, uint64_t blockSize,
    MemoryPool& pool, ReaderMetrics* metrics, std::vector<unsigned char> key,
    std::vector<unsigned char> iv, const EVP_CIPHER* cipher) {
  auto dec = std::make_unique<DecryptionInputStream>(std::move(input), key, iv, cipher, pool);
  return createDecompressor(kind, std::move(dec), blockSize, pool, metrics);
}
Main function to read a file:
int main() {
  try {
    orc::RowReaderOptions rowReaderOptions;
    orc::ReaderOptions readerOpts;
    std::shared_ptr<orc::InMemoryKeystore> keyStore = std::make_shared<orc::InMemoryKeystore>();
    std::vector<int> int_values = {9, 10, -1, 46, -125, 13, -4, -6, 18, 96, -96, -20, 62, -112, -125, -52};
    std::vector<unsigned char> vecOutput(int_values.begin(), int_values.end());
    keyStore->addKey("AES_CTR_128", 0, orc::EncryptionAlgorithm::AES_CTR_128, vecOutput);
    readerOpts.setKeyProvider(keyStore);
    std::unique_ptr<orc::SearchArgumentBuilder> sarg = orc::SearchArgumentFactory::newBuilder();
    sarg->equals("imei", orc::PredicateDataType::STRING, orc::Literal("111111111111111", 15));
    std::unique_ptr<orc::SearchArgument> final_sarg = sarg->build();
    rowReaderOptions.searchArgument(std::move(final_sarg));
    printContents("/root/starrocks/part-00001-0.snappy.orc", readerOpts, rowReaderOptions);
  } catch (std::exception& ex) {
    std::cerr << "Caught exception in : " << ex.what() << "\n";
    return 1;
  }
}
Reading the encrypted columns fails with this error:
Caught exception in : Entry index out of range in StringDictionaryColumn
If no filter condition is added, the data in the ORC file can be read normally.
@zxf216 This error happens when reading dictionary-encoded string values. Please check the following:
- Check whether the dictionary data is correct. Pay particular attention to the number of entries in the dictionary. The dictionary is read regardless of any filter, so I think it should be fine according to your description. But please check it just in case.
- Check what the entry id is when the exception happens. It is probably an undefined value.
- Please also check the positions in the StringDictionaryColumnReader::seekToRowGroup call to see if any value is suspicious.
@wgtmac I found in the Java version of the ORC code that the IV is updated during seeking. I will study this first.
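For readers following along, here is a minimal sketch of why a counter-mode stream needs its counter adjusted after a seek. It uses a toy keystream function rather than AES, so every name in it (toyKeystreamBlock, toyCtrXor, seekDemo, the 8-byte block size) is hypothetical and for illustration only. It is not the ORC or OpenSSL implementation.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

// Toy block size in bytes (real AES uses 16).
constexpr std::size_t kToyBlockSize = 8;

// Toy stand-in for a block cipher: scrambles the key and a block counter
// into 8 keystream bytes. NOT secure; it only makes the keystream depend
// on the block counter, as a counter-mode cipher's keystream does.
inline std::uint64_t toyKeystreamBlock(std::uint64_t key, std::uint64_t counter) {
  std::uint64_t x = key ^ (counter * 0x9E3779B97F4A7C15ULL);
  x ^= x >> 33;
  x *= 0xFF51AFD7ED558CCDULL;
  x ^= x >> 33;
  return x;
}

// XORs `data` with the keystream, treating data[0] as byte `offset` of the
// encrypted stream. Encryption and decryption are the same operation.
inline std::string toyCtrXor(const std::string& data, std::uint64_t key,
                             std::uint64_t offset) {
  std::string out(data.size(), '\0');
  for (std::size_t i = 0; i < data.size(); ++i) {
    const std::uint64_t pos = offset + i;
    const std::uint64_t block = toyKeystreamBlock(key, pos / kToyBlockSize);
    const unsigned char ks =
        static_cast<unsigned char>(block >> (8 * (pos % kToyBlockSize)));
    out[i] = static_cast<char>(static_cast<unsigned char>(data[i]) ^ ks);
  }
  return out;
}

// Decrypting from a seek position only works when the keystream is
// positioned at that same offset.
inline void seekDemo() {
  const std::uint64_t key = 42;
  const std::string plain = "dictionary-encoded string values";
  const std::string cipher = toyCtrXor(plain, key, 0);
  const std::string tail = cipher.substr(13);            // pretend we seeked to byte 13
  assert(toyCtrXor(tail, key, 13) == plain.substr(13));  // counter advanced: correct
  assert(toyCtrXor(tail, key, 0) != plain.substr(13));   // stale counter: garbage
}
```

The second assertion mirrors the symptom reported here: after a seek with a stale counter the decrypted bytes are wrong, so a dictionary index decoded from them can fall out of range.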
Solution: @wgtmac, the fix is to update the IV in the DecryptionInputStream::seek method.
void DecryptionInputStream::seek(PositionProvider& position) {
  changeIv(position.current());
  input_->seek(position);
}
The changeIv method is defined as follows.
void DecryptionInputStream::changeIv(long offset) {
  // AES uses 16-byte blocks whatever the key length. EVP_CIPHER_key_length()
  // only equals 16 for AES-128, and EVP_CIPHER_block_size() reports 1 for
  // CTR-mode ciphers, so neither gives the value needed here.
  const long blockSize = 16;
  long encryptionBlocks = offset / blockSize;
  long extra = offset % blockSize;
  // Reset the 8-byte counter at the end of the IV.
  std::fill(iv_.end() - 8, iv_.end(), 0);
  if (encryptionBlocks != 0) {
    // Add the encryption blocks into the initial iv, to compensate for
    // skipping over decrypting those bytes.
    int posn = iv_.size() - 1;
    while (encryptionBlocks > 0) {
      long sum = (iv_[posn] & 0xff) + encryptionBlocks;
      iv_[posn--] = (unsigned char) sum;
      encryptionBlocks = sum / 0x100;
    }
  }
  if (EVP_DecryptInit_ex(ctx_, cipher, NULL, key_.data(), iv_.data()) != 1) {
    throw std::runtime_error("Failed to reinitialize EVP cipher context");
  }
  // If the range starts at an offset that doesn't match the encryption
  // block, we need to advance some bytes within an encryption block.
  if (extra > 0) {
    std::vector<unsigned char> wasted(extra);
    int wastedLen = 0;
    if (EVP_DecryptUpdate(ctx_, wasted.data(), &wastedLen, wasted.data(),
                          static_cast<int>(extra)) != 1) {
      throw std::runtime_error("Failed to advance within the encryption block");
    }
  }
}
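The counter arithmetic in changeIv can be checked without OpenSSL. The sketch below is a standalone version, assuming a 16-byte IV whose last 8 bytes hold a big-endian block counter and a 16-byte AES block. The names ivForOffset, SeekState, kAesBlockSize and kCounterBytes are hypothetical helpers, not part of the ORC API. Because the counter is cleared first, adding the block count with carries is the same as writing it big-endian, which is what this version does.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// Assumptions for this sketch: 16-byte AES blocks and an IV whose trailing
// 8 bytes are a big-endian block counter.
constexpr std::size_t kAesBlockSize = 16;
constexpr std::size_t kCounterBytes = 8;

struct SeekState {
  std::vector<unsigned char> iv;  // IV to hand to the cipher's init call
  std::size_t discard;            // keystream bytes to consume after init
};

// Computes the IV and the number of bytes to discard so that decryption can
// resume at byte `offset` of an encrypted stream.
inline SeekState ivForOffset(std::vector<unsigned char> iv, std::uint64_t offset) {
  assert(iv.size() >= kCounterBytes);
  const std::uint64_t blocks = offset / kAesBlockSize;
  const std::size_t extra = static_cast<std::size_t>(offset % kAesBlockSize);
  // Overwrite the counter with the block count, least significant byte last.
  for (std::size_t i = 0; i < kCounterBytes; ++i) {
    iv[iv.size() - 1 - i] = static_cast<unsigned char>(blocks >> (8 * i));
  }
  return SeekState{std::move(iv), extra};
}
```

For example, seeking to offset 16 * 258 + 5 gives a counter ending in the bytes 0x01 0x02 (258 in big-endian) and leaves 5 bytes to discard, while the first 8 bytes of the IV stay untouched.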