
[C++] IpcWriter to memory stream race condition

Status: Open. gitmodimo opened this issue 1 year ago (0 comments).

Describe the bug, including details regarding any error messages, version, and platform.

Arrow version: 15.0.1
Platform: x64-windows

My Acero use case:

dataset scan node
↓
filter/projection (not relevant)
↓
consuming_sink
↓
custom SinkNodeConsumer writing batches to a RecordBatchWriter

The RecordBatchWriter is created with ipc::MakeStreamWriter and writes to a BufferOutputStream.

In this setup, several threads write to the same BufferOutputStream concurrently, which causes a race in BufferOutputStream::Write.

SinkNodeConsumer:

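```cpp
#include <memory>
#include <mutex>
#include <utility>

#include <arrow/acero/options.h>
#include <arrow/api.h>
#include <arrow/compute/exec.h>
#include <arrow/ipc/writer.h>

namespace ar = arrow;
namespace ac = arrow::acero;
namespace cp = arrow::compute;

// Forwards every batch reaching the consuming_sink node to an IPC
// RecordBatchWriter. Acero can call Consume() from several threads at once.
struct RecordBatchWriterNodeConsumer : public ac::SinkNodeConsumer {
    explicit RecordBatchWriterNodeConsumer(std::shared_ptr<ar::ipc::RecordBatchWriter> _writer)
        : writer(std::move(_writer)) {}

    arrow::Status Init(const std::shared_ptr<arrow::Schema>& _schema,
        ac::BackpressureControl* backpressure_control,
        ac::ExecPlan* plan) override {
        schema = _schema;
        return arrow::Status::OK();
    }

    arrow::Status Consume(cp::ExecBatch batch) override {
        ARROW_ASSIGN_OR_RAISE(auto rb, batch.ToRecordBatch(schema));
        // Enabling this lock serializes the writes and makes the crash go away.
        //const std::lock_guard<std::mutex> lock(consume_mutex);
        return writer->WriteRecordBatch(*rb);
    }

    // Close() returns a Status, which converts implicitly to a finished Future<>.
    arrow::Future<> Finish() override {
        return writer->Close();
    }

    std::shared_ptr<ar::ipc::RecordBatchWriter> writer;
    std::shared_ptr<arrow::Schema> schema;
    //std::mutex consume_mutex;
};
```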

AddressSanitizer dump:

=================================================================
==22728==ERROR: AddressSanitizer: attempting double-free on 0x038f001c4800 in thread T48:
    #0 0x7ffc9ae00798 in _asan_wrap_RtlValidateHeap+0x288 (\\?\C:\Users\USER\project\dist_MXP_web\libs\clang_rt.asan_dynamic-x86_64.dll+0x180040798)
    #1 0x7ffc99f31a7b in arrow::BaseMemoryPoolImpl<arrow::`anonymous namespace'::SystemAllocator>::Reallocate C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\memory_pool.cc:487
    #2 0x7ffc99f321e7 in arrow::PoolBuffer::Resize C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\memory_pool.cc:891
    #3 0x7ffc99ff2db2 in arrow::io::BufferOutputStream::Reserve C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\io\memory.cc:123
    #4 0x7ffc99ff3544 in arrow::io::BufferOutputStream::Write C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\io\memory.cc:105
    #5 0x7ffc99ff0770 in arrow::io::Writable::Write C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\io\interfaces.cc:203
    #6 0x7ffc9a9d7f93 in arrow::ipc::WriteIpcPayload C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\ipc\writer.cc:757
    #7 0x7ffc9a9d82ab in arrow::ipc::internal::PayloadStreamWriter::WritePayload C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\ipc\writer.cc:1406
    #8 0x7ffc9a9d8756 in arrow::ipc::internal::IpcFormatWriter::WriteRecordBatch C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\ipc\writer.cc:1195
    #9 0x7ffc9a9d8375 in arrow::ipc::internal::IpcFormatWriter::WriteRecordBatch C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\ipc\writer.cc:1176
    #10 0x7ffcadac9af5 in RecordBatchWriterNodeConsumer::Consume C:\Users\USER\project\src\mxp\electron\cmake\src\analysis\PdwProcessing.cpp:358
    #11 0x7ffcac3365e5 in arrow::acero::`anonymous namespace'::ConsumingSinkNode::Process C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\acero\sink_node.cc:399
    #12 0x7ffcac334d67 in arrow::acero::`anonymous namespace'::ConsumingSinkNode::InputReceived C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\acero\sink_node.cc:390
    #13 0x7ffcac32851a in arrow::acero::MapNode::InputReceived C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\acero\map_node.cc:79
    #14 0x7ffcac344069 in <lambda_7d84ce2741d5a383b7c7277f0b787d5c>::operator() C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\acero\source_node.cc:157
    #15 0x7ffcac349121 in std::_Func_impl_no_alloc<<lambda_7d84ce2741d5a383b7c7277f0b787d5c>,arrow::Status>::_Do_call C:\Program Files (x86)\Microsoft Visual Studio\2019\Community\VC\Tools\MSVC\14.29.30133\include\functional:822
    #16 0x7ffcac331b8f in std::_Call_binder<std::_Unforced,0,1,arrow::detail::ContinueFuture,std::tuple<arrow::Future<arrow::internal::Empty>,std::function<arrow::Status __cdecl(void)> >,std::tuple<> > C:\Program Files (x86)\Microsoft Visual Studio\2019\Community\VC\Tools\MSVC\14.29.30133\include\functional:1307
    #17 0x7ffcac33309d in arrow::internal::FnOnce<void __cdecl(void)>::FnImpl<std::_Binder<std::_Unforced,arrow::detail::ContinueFuture,arrow::Future<arrow::internal::Empty> &,std::function<arrow::Status __cdecl(void)> > >::invoke C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\util\functional.h:152
    #18 0x7ffc9a056b64 in std::thread::_Invoke<std::tuple<<lambda_72d791419a94ce2df79e1afeb11637d7> >,0> C:\Program Files (x86)\Microsoft Visual Studio\2019\Community\VC\Tools\MSVC\14.29.30133\include\thread:55
    #19 0x7ffd30571bb1 in configthreadlocale+0x91 (C:\WINDOWS\System32\ucrtbase.dll+0x180021bb1)
    #20 0x7ffc9ae0ebde in _asan_default_suppressions__dll+0x122e (\\?\C:\Users\USER\project\dist_MXP_web\libs\clang_rt.asan_dynamic-x86_64.dll+0x18004ebde)
    #21 0x7ffd32227343 in BaseThreadInitThunk+0x13 (C:\WINDOWS\System32\KERNEL32.DLL+0x180017343)
    #22 0x7ffd328226b0 in RtlUserThreadStart+0x20 (C:\WINDOWS\SYSTEM32\ntdll.dll+0x1800526b0)

0x038f001c4800 is located 0 bytes inside of 65536-byte region [0x038f001c4800,0x038f001d4800)
freed by thread T52 here:
    #0 0x7ffc9ae00798 in _asan_wrap_RtlValidateHeap+0x288 (\\?\C:\Users\USER\project\dist_MXP_web\libs\clang_rt.asan_dynamic-x86_64.dll+0x180040798)
    #1 0x7ffc99f31a7b in arrow::BaseMemoryPoolImpl<arrow::`anonymous namespace'::SystemAllocator>::Reallocate C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\memory_pool.cc:487
    #2 0x7ffc99f321e7 in arrow::PoolBuffer::Resize C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\memory_pool.cc:891
    #3 0x7ffc99ff2db2 in arrow::io::BufferOutputStream::Reserve C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\io\memory.cc:123
    #4 0x7ffc99ff3544 in arrow::io::BufferOutputStream::Write C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\io\memory.cc:105
    #5 0x7ffc99ff0770 in arrow::io::Writable::Write C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\io\interfaces.cc:203
    #6 0x7ffc9a9d7f93 in arrow::ipc::WriteIpcPayload C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\ipc\writer.cc:757
    #7 0x7ffc9a9d82ab in arrow::ipc::internal::PayloadStreamWriter::WritePayload C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\ipc\writer.cc:1406
    #8 0x7ffc9a9d8756 in arrow::ipc::internal::IpcFormatWriter::WriteRecordBatch C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\ipc\writer.cc:1195
    #9 0x7ffc9a9d8375 in arrow::ipc::internal::IpcFormatWriter::WriteRecordBatch C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\ipc\writer.cc:1176
    #10 0x7ffcadac9af5 in RecordBatchWriterNodeConsumer::Consume C:\Users\USER\project\src\mxp\electron\cmake\src\analysis\PdwProcessing.cpp:358
    #11 0x7ffcac3365e5 in arrow::acero::`anonymous namespace'::ConsumingSinkNode::Process C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\acero\sink_node.cc:399
    #12 0x7ffcac334d67 in arrow::acero::`anonymous namespace'::ConsumingSinkNode::InputReceived C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\acero\sink_node.cc:390
    #13 0x7ffcac32851a in arrow::acero::MapNode::InputReceived C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\acero\map_node.cc:79
    #14 0x7ffcac344069 in <lambda_7d84ce2741d5a383b7c7277f0b787d5c>::operator() C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\acero\source_node.cc:157
    #15 0x7ffcac349121 in std::_Func_impl_no_alloc<<lambda_7d84ce2741d5a383b7c7277f0b787d5c>,arrow::Status>::_Do_call C:\Program Files (x86)\Microsoft Visual Studio\2019\Community\VC\Tools\MSVC\14.29.30133\include\functional:822
    #16 0x7ffcac331b8f in std::_Call_binder<std::_Unforced,0,1,arrow::detail::ContinueFuture,std::tuple<arrow::Future<arrow::internal::Empty>,std::function<arrow::Status __cdecl(void)> >,std::tuple<> > C:\Program Files (x86)\Microsoft Visual Studio\2019\Community\VC\Tools\MSVC\14.29.30133\include\functional:1307
    #17 0x7ffcac33309d in arrow::internal::FnOnce<void __cdecl(void)>::FnImpl<std::_Binder<std::_Unforced,arrow::detail::ContinueFuture,arrow::Future<arrow::internal::Empty> &,std::function<arrow::Status __cdecl(void)> > >::invoke C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\util\functional.h:152
    #18 0x7ffc9a056b64 in std::thread::_Invoke<std::tuple<<lambda_72d791419a94ce2df79e1afeb11637d7> >,0> C:\Program Files (x86)\Microsoft Visual Studio\2019\Community\VC\Tools\MSVC\14.29.30133\include\thread:55
    #19 0x7ffd30571bb1 in configthreadlocale+0x91 (C:\WINDOWS\System32\ucrtbase.dll+0x180021bb1)
    #20 0x7ffc9ae0ebde in _asan_default_suppressions__dll+0x122e (\\?\C:\Users\USER\project\dist_MXP_web\libs\clang_rt.asan_dynamic-x86_64.dll+0x18004ebde)
    #21 0x7ffd32227343 in BaseThreadInitThunk+0x13 (C:\WINDOWS\System32\KERNEL32.DLL+0x180017343)
    #22 0x7ffd328226b0 in RtlUserThreadStart+0x20 (C:\WINDOWS\SYSTEM32\ntdll.dll+0x1800526b0)

previously allocated by thread T52 here:
    #0 0x7ffc9ae00a1a in _asan_wrap_RtlValidateHeap+0x50a (\\?\C:\Users\USER\project\dist_MXP_web\libs\clang_rt.asan_dynamic-x86_64.dll+0x180040a1a)
    #1 0x7ffc99f3197c in arrow::BaseMemoryPoolImpl<arrow::`anonymous namespace'::SystemAllocator>::Reallocate C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\memory_pool.cc:487
    #2 0x7ffc99f321e7 in arrow::PoolBuffer::Resize C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\memory_pool.cc:891
    #3 0x7ffc99ff2db2 in arrow::io::BufferOutputStream::Reserve C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\io\memory.cc:123
    #4 0x7ffc99ff3544 in arrow::io::BufferOutputStream::Write C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\io\memory.cc:105
    #5 0x7ffc99ff0770 in arrow::io::Writable::Write C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\io\interfaces.cc:203
    #6 0x7ffc9a9d7f93 in arrow::ipc::WriteIpcPayload C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\ipc\writer.cc:757
    #7 0x7ffc9a9d82ab in arrow::ipc::internal::PayloadStreamWriter::WritePayload C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\ipc\writer.cc:1406
    #8 0x7ffc9a9d8756 in arrow::ipc::internal::IpcFormatWriter::WriteRecordBatch C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\ipc\writer.cc:1195
    #9 0x7ffc9a9d8375 in arrow::ipc::internal::IpcFormatWriter::WriteRecordBatch C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\ipc\writer.cc:1176
    #10 0x7ffcadac9af5 in RecordBatchWriterNodeConsumer::Consume C:\Users\USER\project\src\mxp\electron\cmake\src\analysis\PdwProcessing.cpp:358
    #11 0x7ffcac3365e5 in arrow::acero::`anonymous namespace'::ConsumingSinkNode::Process C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\acero\sink_node.cc:399
    #12 0x7ffcac334d67 in arrow::acero::`anonymous namespace'::ConsumingSinkNode::InputReceived C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\acero\sink_node.cc:390
    #13 0x7ffcac32851a in arrow::acero::MapNode::InputReceived C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\acero\map_node.cc:79
    #14 0x7ffcac344069 in <lambda_7d84ce2741d5a383b7c7277f0b787d5c>::operator() C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\acero\source_node.cc:157
    #15 0x7ffcac349121 in std::_Func_impl_no_alloc<<lambda_7d84ce2741d5a383b7c7277f0b787d5c>,arrow::Status>::_Do_call C:\Program Files (x86)\Microsoft Visual Studio\2019\Community\VC\Tools\MSVC\14.29.30133\include\functional:822
    #16 0x7ffcac331b8f in std::_Call_binder<std::_Unforced,0,1,arrow::detail::ContinueFuture,std::tuple<arrow::Future<arrow::internal::Empty>,std::function<arrow::Status __cdecl(void)> >,std::tuple<> > C:\Program Files (x86)\Microsoft Visual Studio\2019\Community\VC\Tools\MSVC\14.29.30133\include\functional:1307
    #17 0x7ffcac33309d in arrow::internal::FnOnce<void __cdecl(void)>::FnImpl<std::_Binder<std::_Unforced,arrow::detail::ContinueFuture,arrow::Future<arrow::internal::Empty> &,std::function<arrow::Status __cdecl(void)> > >::invoke C:\Users\USER\project\vcpkg\buildtrees\arrow\src\e-arrow-15-f337ad0391.clean\cpp\src\arrow\util\functional.h:152
    #18 0x7ffc9a056b64 in std::thread::_Invoke<std::tuple<<lambda_72d791419a94ce2df79e1afeb11637d7> >,0> C:\Program Files (x86)\Microsoft Visual Studio\2019\Community\VC\Tools\MSVC\14.29.30133\include\thread:55
    #19 0x7ffd30571bb1 in configthreadlocale+0x91 (C:\WINDOWS\System32\ucrtbase.dll+0x180021bb1)
    #20 0x7ffc9ae0ebde in _asan_default_suppressions__dll+0x122e (\\?\C:\Users\USER\project\dist_MXP_web\libs\clang_rt.asan_dynamic-x86_64.dll+0x18004ebde)
    #21 0x7ffd32227343 in BaseThreadInitThunk+0x13 (C:\WINDOWS\System32\KERNEL32.DLL+0x180017343)
    #22 0x7ffd328226b0 in RtlUserThreadStart+0x20 (C:\WINDOWS\SYSTEM32\ntdll.dll+0x1800526b0)
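All three stack traces above follow the same growth path: BufferOutputStream::Write → Reserve → PoolBuffer::Resize → BaseMemoryPoolImpl::Reallocate. A minimal sketch of why that path can double-free under concurrent callers, assuming a simplified growable buffer that grows with std::realloc. ToyBufferStream is a hypothetical toy, not Arrow code, and Arrow's allocator does not necessarily use realloc.

```cpp
#include <cassert>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <new>

// Hypothetical toy, NOT Arrow's implementation: a growable in-memory output
// stream whose Write() first reserves capacity (possibly moving the backing
// block) and then copies bytes at the current position. Nothing here is
// synchronized, so concurrent Write() calls are a data race.
class ToyBufferStream {
 public:
  ToyBufferStream() = default;
  ToyBufferStream(const ToyBufferStream&) = delete;
  ToyBufferStream& operator=(const ToyBufferStream&) = delete;
  ~ToyBufferStream() { std::free(data_); }

  void Write(const void* src, int64_t nbytes) {
    assert(nbytes >= 0);
    if (nbytes == 0) return;
    Reserve(position_ + nbytes);
    std::memcpy(data_ + position_, src, static_cast<size_t>(nbytes));
    position_ += nbytes;
  }

  int64_t size() const { return position_; }
  const uint8_t* data() const { return data_; }

 private:
  // Grows capacity by doubling, loosely like the Reserve -> Resize ->
  // Reallocate path in the traces. realloc() may release the old block; if
  // two threads read the same data_ and both pass it to realloc(), the
  // second call can release a block the first one already released.
  void Reserve(int64_t needed) {
    if (needed <= capacity_) return;
    int64_t new_capacity = capacity_ == 0 ? 64 : capacity_;
    while (new_capacity < needed) new_capacity *= 2;
    void* grown = std::realloc(data_, static_cast<size_t>(new_capacity));
    if (grown == nullptr) throw std::bad_alloc();
    data_ = static_cast<uint8_t*>(grown);
    capacity_ = new_capacity;
  }

  uint8_t* data_ = nullptr;
  int64_t capacity_ = 0;
  int64_t position_ = 0;
};
```

Called from a single thread, this works. When two threads both read the same data_ before either realloc() returns, the same old block can reach the allocator twice. That matches the pattern in the report, where the region is freed by thread T52 and then freed again by thread T48.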

Adding a lock in RecordBatchWriterNodeConsumer::Consume fixes the problem, but this does not seem like a good solution. I think that either arrow::io::BufferOutputStream::Write should take a write lock, or the stream writer should take a write lock (since it is the one writing to the stream), or maybe both?
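The two options above are not equivalent. As far as I can tell, arrow::ipc::WriteIpcPayload (seen in the traces) emits one record batch as several Write calls on the stream: metadata first, then the body buffers. A lock inside BufferOutputStream::Write alone would then likely stop the heap corruption but could still interleave the pieces of two batches. A writer-level lock, equivalent to the commented-out consume_mutex, keeps each message contiguous. The sketch below illustrates this with hypothetical classes (LockedStream, ToyMessageWriter, CountWellFormedMessages) and a made-up framing format. It is not Arrow's IPC format.

```cpp
#include <cassert>
#include <cctype>
#include <cstddef>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Hypothetical stand-in for an output stream whose Write() takes its own
// lock (the first option). Each Write() is atomic, but nothing keeps two
// consecutive Write() calls from the same caller together.
class LockedStream {
 public:
  void Write(const std::string& bytes) {
    std::lock_guard<std::mutex> lock(mutex_);
    data_ += bytes;
  }
  std::string Contents() {
    std::lock_guard<std::mutex> lock(mutex_);
    return data_;
  }

 private:
  std::mutex mutex_;
  std::string data_;
};

// Hypothetical stand-in for a stream-format writer: one message is emitted
// as two Write() calls (header, then body), much like an IPC payload is
// written as metadata followed by body buffers.
class ToyMessageWriter {
 public:
  explicit ToyMessageWriter(LockedStream* out) : out_(out) {}

  // Writer-level lock (the second option): held for the whole message, so
  // another thread's header cannot land between this header and body.
  void WriteMessage(char tag, int body_len) {
    assert(body_len >= 0);
    std::lock_guard<std::mutex> lock(mutex_);
    out_->Write(std::string(1, tag) + std::to_string(body_len) + ":");
    out_->Write(std::string(static_cast<std::size_t>(body_len), tag));
  }

 private:
  LockedStream* out_;
  std::mutex mutex_;
};

// Returns the number of messages if `s` is a clean sequence of
// "<tag><len>:<len copies of tag>" frames, or -1 if any frame is broken.
int CountWellFormedMessages(const std::string& s) {
  int count = 0;
  std::size_t i = 0;
  while (i < s.size()) {
    const char tag = s[i++];
    std::size_t len = 0;
    bool has_digit = false;
    while (i < s.size() && std::isdigit(static_cast<unsigned char>(s[i]))) {
      len = len * 10 + static_cast<std::size_t>(s[i++] - '0');
      has_digit = true;
    }
    if (!has_digit || i >= s.size() || s[i++] != ':') return -1;
    if (s.size() - i < len) return -1;
    for (std::size_t k = 0; k < len; ++k) {
      if (s[i + k] != tag) return -1;
    }
    i += len;
    ++count;
  }
  return count;
}
```

With four threads each writing 100 messages, the writer-level lock yields 400 intact frames. If the lock_guard in WriteMessage is removed, every individual Write() is still safe, but the frame check can fail whenever one thread's header lands between another thread's header and body.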

Component(s)

C++

gitmodimo, Mar 08 '24 09:03