ACE_TAO
Are AIO (asynchronous I/O) operations silently converted to synchronous when using ACE_POSIX_Proactor on Linux?
Dear DOCGroup team,
I'm porting a server that runs on Windows to Linux.
I ran into a problem when converting the source code from ACE_WIN32_Proactor to ACE_POSIX_SIG_Proactor.
I can't write to the socket (writer_->write()) until I have first started a read on it (reader_->read()).
This looks like the same problem reported here: http://list.isis.vanderbilt.edu/pipermail/ace-users/2016-March/003340.html
Version
ACE 6.5.9 / ACE 6.5.4
Host machine and operating system
Ubuntu 18.04 - 64bit.
AREA/CLASS/EXAMPLE AFFECTED:
ACE_Proactor
The problem effects & Synopsis & Description
I was converting from ACE_WIN32_Proactor to ACE_POSIX_SIG_Proactor, and I never get the handle_write_stream callback after calling writer_->write(*mb, mb->length()). The write is issued after handling a message taken from the Channel Msg Queue (messages are enqueued in handle_read_stream).
My code is below:
#ifdef WIN32
ACE_WIN32_Proactor *proactor_impl = new ACE_WIN32_Proactor(30);
#else
ACE_POSIX_Proactor *proactor_impl = new ACE_POSIX_SIG_Proactor();
#endif
ACE_Proactor *proactor = new ACE_Proactor(proactor_impl, 1);
ACE_Proactor::instance(proactor, 1/*true*/);
....
ACE_Asynch_Read_Stream* reader_;
ACE_Asynch_Write_Stream* writer_;
void MWI_Service_Handler::handle_read_stream(const ACE_Asynch_Read_Stream::Result &result)
{
    LOG->info(ACE_TEXT("READ - EVENT success: %d; transfered: %ld"), result.success(), result.bytes_transferred());
    if(!result.success() || result.bytes_transferred() == 0) {
        LOG->info(ACE_TEXT("[EVENT]::READ STREAM - Socket closed, close it : Handler( %I64u ), Peer( %s ), Err( %lu )"), handler_id_, pi_.peer_id().c_str(), result.error());
        close(0);
        return;
    } else {
        if(!rd_mblk_) {
            LOG->error(ACE_TEXT("[EVENT]::READ STREAM - Read message block is null, already connection closed : Handler( %I64u ), MappedAddr( %s )"), handler_id_, remote_address_.c_str());
            close(0);
            return;
        }
        ACE_Byte* rd_ptr = NULL;
        int ret = 0;
        int ret2 = 0;
        size_t size = 0;
        size = rd_mblk_->length();
        rd_ptr = (ACE_Byte*)rd_mblk_->rd_ptr();
        do {
            MWI_Message* output = NULL;
            ret = ar_reader_.deserialize_async(rd_ptr, size, msg_factory_, output, ret2);
            if(CHANNEL_MGR->putMessage(this->handlerID(), output, last_runtime_) < 0) {
                LOG->error(ACE_TEXT("[EVENT]::READ STREAM - Fail to insert message to Channel Msg Queue"));
                delete output;
            }
            // msg will be handle and write in another thread;
        } while(size > 0);
    }
    if(!reader_) {
        LOG->error(ACE_TEXT("[EVENT]::READ STREAM - Asynch read stream is null, maybe closed connection : Handler( %I64u ), MappedAddr( %s )"), handler_id_, remote_address_.c_str());
        close(0);
        return;
    }
    // Reset pointers.
    result.message_block().rd_ptr()[result.bytes_transferred()] = '\0';
    // Set the pointers back in the message block.
    result.message_block().wr_ptr(result.message_block().rd_ptr());
    rd_mblk_->reset();
    if(reader_->read(*rd_mblk_, rd_mblk_->size() - 1)) {
        LOG->error(ACE_TEXT("[EVENT]::READ STREAM - Read message fail : Handler( %I64u ), MappedAddr( %s )"), handler_id_, remote_address_.c_str());
        close(0);
        return;
    }
}
void MWI_Service_Handler::handle_write_stream(const ACE_Asynch_Write_Stream::Result &result)
{
    MWI_EventCounter eventCounter;
    LOG->info(ACE_TEXT("[MWI_Service_Handler] WRITE STREAM success: %d; transfered: %ld"), result.success(), result.bytes_transferred());
    size_t s1 = result.bytes_to_write();
    size_t s2 = result.bytes_transferred();
    if(!result.success() || result.bytes_transferred() == 0) {
        LOG->error(ACE_TEXT("[EVENT]::WRITE STREAM - Write fail.. Transferred bytes is %d.. Remove It : Handler( %I64u ), MappedAddr( %s )")
            , result.bytes_transferred(), handler_id_, remote_address_.c_str());
        result.message_block().release();
        close(0);
        return;
    } else if(result.bytes_transferred() < result.bytes_to_write()) {
        LOG->error(ACE_TEXT("[EVENT]::WRITE STREAM - Write fail ( %d / %d ).. Remove It : Handler( %I64u ), MappedAddr( %s )")
            , result.bytes_transferred(), result.bytes_to_write(), handler_id_, remote_address_.c_str());
        result.message_block().release();
        close(0);
        return;
    } else {
        result.message_block().release();
    }
}
I read a blog post that discusses this: https://stevehuston.wordpress.com/2008/11/25/when-is-it-ok-to-use-ace-proactor-on-linux/
It says:
I/O locks up and all progress stops. Why?
Because the aio facility upon which ACE Proactor builds is very restricted for socket I/O on Linux (at least through the Linuxes I’ve worked on).
The issue is that the I/O operations initiated using aio from the application are silently converted to synchronous and executed in order based on the handle used.
The only way to make a Proactor-based application work on Linux is to follow a strict lock-step protocol. A ping-pong model, if you will.
Please help: is this blog post correct? If so, what can I do?