ACE_TAO icon indicating copy to clipboard operation
ACE_TAO copied to clipboard

AIO (Asynchronous IO) are silently converted to synchronous when using ACE_POSIX_Proactor on linux???

Open dungbk1995 opened this issue 4 years ago • 0 comments

Dear DOCGroup team,
I'm working on task: porting a server running on windows to linux. And i have a problem when converting source code using ACE_WIN32_Proactor to ACE_POSIX_SIG_Proactor.
I can't write stream (writer_->write()) to the socket util i open read stream before (reader_->read()).

I have the same problem with this guy: http://list.isis.vanderbilt.edu/pipermail/ace-users/2016-March/003340.html

Version

ACE 6.5.9 / ACE 6.5.4

Host machine and operating system

Ubuntu 18.04 - 64bit.

AREA/CLASS/EXAMPLE AFFECTED:

ACE_Proactor

The problem effects & Synopsis & Description

I was converting ACE_WIN32_Proactor to ACE_POSIX_SIG_Proactor
and i can't got callback handle_write_stream when call: (writer_->write(*mb, mb->length()) < 0) after handle message get from Channel Msg Queue (enqueued in handle_read_stream)

Here is my code as below:

#ifdef WIN32
	ACE_WIN32_Proactor *proactor_impl = new ACE_WIN32_Proactor(30);
#else
	ACE_POSIX_Proactor *proactor_impl = new ACE_POSIX_SIG_Proactor();
#endif
	ACE_Proactor *proactor = new ACE_Proactor(proactor_impl, 1);
	ACE_Proactor::instance(proactor, 1/*true*/);
       .... 

ACE_Asynch_Read_Stream* reader_;
ACE_Asynch_Write_Stream* writer_;

void MWI_Service_Handler::handle_read_stream(const ACE_Asynch_Read_Stream::Result &result)
{
    LOG->info(ACE_TEXT("READ - EVENT success: %d; transfered: %ld"), result.success(), result.bytes_transferred());

    if(!result.success() || result.bytes_transferred() == 0) {
		LOG->info(ACE_TEXT("[EVENT]::READ STREAM - Socket closed, close it : Handler( %I64u ), Peer( %s ), Err( %lu )"), handler_id_, pi_.peer_id().c_str(), result.error());
		close(0);
		return;
	} else {
		if(!rd_mblk_) {
			LOG->error(ACE_TEXT("[EVENT]::READ STREAM - Read message block is null, already connection closed : Handler( %I64u ), MappedAddr( %s )"), handler_id_, remote_address_.c_str());
			close(0);
			return;
		}

		ACE_Byte* rd_ptr = NULL;
		int ret = 0;
		int ret2 = 0;
		size_t size = 0;
		size = rd_mblk_->length();
		rd_ptr = (ACE_Byte*)rd_mblk_->rd_ptr();

		do {
			MWI_Message* output = NULL;
			ret = ar_reader_.deserialize_async(rd_ptr, size, msg_factory_, output, ret2);
		
            if(CHANNEL_MGR->putMessage(this->handlerID(), output, last_runtime_) < 0) {
                LOG->error(ACE_TEXT("[EVENT]::READ STREAM - Fail to insert message to Channel Msg Queue"));
                delete output;
            }
                //   msg will be handle and write in another thread;
	    } while(size > 0);
	}

	if(!reader_) {
		LOG->error(ACE_TEXT("[EVENT]::READ STREAM - Asynch read stream is null, maybe closed connection : Handler( %I64u ), MappedAddr( %s )"), handler_id_, remote_address_.c_str());
		close(0);
		return;
	}

	// Reset pointers.
	result.message_block().rd_ptr()[result.bytes_transferred()] = '\0';
	// Set the pointers back in the message block.
	result.message_block().wr_ptr(result.message_block().rd_ptr());

	rd_mblk_->reset();

	if(reader_->read(*rd_mblk_, rd_mblk_->size() - 1)) {
		LOG->error(ACE_TEXT("[EVENT]::READ STREAM - Read message fail : Handler( %I64u ), MappedAddr( %s )"), handler_id_, remote_address_.c_str());
		close(0);
		return;
	}
}

void MWI_Service_Handler::handle_write_stream(const ACE_Asynch_Write_Stream::Result &result)
{
	MWI_EventCounter eventCounter;

    LOG->info(ACE_TEXT("[MWI_Service_Handler] WRITE STREAM success: %d; transfered: %ld"), result.success(), result.bytes_transferred());

    size_t s1 = result.bytes_to_write();
	size_t s2 = result.bytes_transferred();

	if(!result.success() || result.bytes_transferred() == 0) {
		LOG->error(ACE_TEXT("[EVENT]::WRITE STREAM - Write fail.. Transferred bytes is %d.. Remove It : Handler( %I64u ), MappedAddr( %s )")
			, result.bytes_transferred(), handler_id_, remote_address_.c_str());
		result.message_block().release();
		close(0);
		return;
	} else if(result.bytes_transferred() < result.bytes_to_write()) {
		LOG->error(ACE_TEXT("[EVENT]::WRITE STREAM - Write fail ( %d / %d ).. Remove It : Handler( %I64u ), MappedAddr( %s )")
			, result.bytes_transferred(), result.bytes_to_write(), handler_id_, remote_address_.c_str());
		result.message_block().release();
		close(0);
		return;
	} else {
		result.message_block().release();
	}
}

I read a blog that talk about it: https://stevehuston.wordpress.com/2008/11/25/when-is-it-ok-to-use-ace-proactor-on-linux/
It says:

I/O locks up and all progress stops. Why?
Because the aio facility upon which ACE Proactor builds is very restricted for socket I/O on Linux (at least through the Linuxes I’ve worked on).
The issue is that the I/O operations initiated using aio from the application are silently converted to synchronous and executed in order based on the handle used.
The only way to make a Proactor-based application work on Linux is to follow a strict lock-step protocol. A ping-pong model, if you will.

Please help me, Does this blog right? How i can do?

dungbk1995 avatar Jun 12 '20 06:06 dungbk1995