
ProtoTree: x->right == x caused an infinite loop

Open honglei opened this issue 1 year ago • 41 comments

Call Stack:

 	Norm.dll!ProtoTree::Bit(const char * key=0x000001db527ca670, unsigned int keysize=96, unsigned int index=64, ProtoTree::Endian keyEndian=ENDIAN_BIG) Line 232	C++
>	Norm.dll!ProtoTree::Remove(ProtoTree::Item & item={...}) Line 459	C++
 	[Inline Frame] Norm.dll!ProtoTreeTemplate<ProtoSortedTree::Item>::Remove(ProtoSortedTree::Item &) Line 354	C++
 	Norm.dll!ProtoSortedTree::Remove(ProtoSortedTree::Item & item={...}) Line 1640	C++
 	Norm.dll!ProtoTimerMgr::RemoveLongTimer(ProtoTimer & theTimer={...}) Line 508	C++
 	Norm.dll!ProtoTimerMgr::OnPulseTimeout(ProtoTimer & __formal) Line 218	C++
 	[Inline Frame] Norm.dll!ProtoTimer::DoTimeout() Line 206	C++
 	Norm.dll!ProtoTimerMgr::OnSystemTimeout() Line 181	C++
 	Norm.dll!ProtoDispatcher::Dispatch() Line 2702	C++
 	Norm.dll!ProtoDispatcher::Run(bool oneShot) Line 1003	C++
 	Norm.dll!ProtoDispatcher::DoThreadStart(void * param=0x000001db52299b18) Line 1061	C++

Code (the loop in ProtoTree::Remove that calls ProtoTree::Bit):

        do                      
        {     
            q = x;              
            if (Bit(key, keysize, x->bit, keyEndian))
                x = x->right; // !!! x == x->right  !!!
            else                
                x = x->left;    
        } while (x != &item);

honglei avatar Aug 24 '22 14:08 honglei

This infinite loop causes calls into norm.dll to block forever:

 	[Inline Frame] Norm.dll!ProtoDispatcher::Lock(_RTL_CRITICAL_SECTION &) Line 306	C++
	Norm.dll!ProtoDispatcher::SuspendThread() Line 1233	C++
 	Norm.dll!NormObjectRetain(const void * objectHandle=0x000001db5b9ce9b0) Line 2726	C++

honglei avatar Aug 24 '22 14:08 honglei

Hi @honglei - Do you have some test code that produces this condition? Also, the current Protolib code does not have the "x->right == x" you show here so I am a little confused about which version of NORM/Protolib code you are using?

bebopagogo avatar Aug 27 '22 17:08 bebopagogo

I mean that x->right and x are the same object (that is, x->right == x) while x != &item, so the loop never terminates. The code is: https://github.com/USNavalResearchLaboratory/protolib/blob/6db401d30dd36ee4ddc777d8c626c88cdcd5a28f/src/common/protoTree.cpp#L424

Test case: I send 10K files of 20-25 KB each using NORM, and then move each file to another directory after it has been sent successfully. The problem occurred after 2K~6K files had been sent.

honglei avatar Aug 28 '22 06:08 honglei

create_test_files.py:

import os
dirPath = r'E:\PythonPrj/testFiles'

if not os.path.isdir(dirPath):
    os.mkdir(dirPath)
for i in range(1, 10000):
    with open(os.path.join(dirPath, f"文件{i:04d}.txt"), "w") as f:
        f.write(f"{i} "*5_000)

Env: Win10 x64/VC 2022 x64/Python3.10

honglei avatar Aug 28 '22 06:08 honglei

So you are copying the received files to another directory upon NORM_RX_OBJECT_COMPLETED notifications? Are you making any NORM API calls during that time?

bebopagogo avatar Sep 07 '22 00:09 bebopagogo

Yes. The error occurred on the NORM sender side. On the sender side, I move the sent files (from the send file dir) to another directory (the sent file dir) after NORM_TX_QUEUE_EMPTY or TX_OBJECT_PURGED. On the NORM receiver side, I move the received files (temp file name) to another directory (real file name) upon NORM_RX_OBJECT_COMPLETED notifications.

the sender side:

async def proc_sender_event(event:pynorm.event.Event):
    #print(f"{str(event)}")
    obj = event.object
    evtType:pynorm.EventType = event.type
    session:pynorm.session.Session = event.session
    linkID:int = session.conf['linkID']
    userName:str = session.conf['userName']
    
    if obj:   #and obj.handle not in (None,pynorm.NORM_OBJECT_INVALID):
        objType:ObjectType = obj.type
        info:str|None =obj.info.decode() if obj.info else None
        print(event, linkID, obj.type, info)
    else:
        print (event)
    
    if evtType == EventType.TX_OBJECT_SENT:
        if objType == ObjectType.FILE and obj.info:
            session.sendFiles.add(obj.info.decode())
            
    elif evtType == EventType.TX_QUEUE_EMPTY:
        '''
        '''
        for key, obj in session.id2obj.items():
            obj.cancel()
        session.id2obj.clear()
        # used to close obj's file handle
        del obj
        del event 
        
        #listProcOpenFiles()  # for testing whether any file handles remain open
        for file in session.sendFiles:
            try:
                moveFile2SendedDir(file)
            except Exception as e:
                logging.warning( traceback.format_exc() )
        session.sendFiles.clear()
        files = await get_some_waitingfiles_of_link(settings.sendFileDir,linkID)
        pushFiles2session(session, waitingFiles=files)
        
    elif evtType == EventType.TX_OBJECT_PURGED:
        '''
            
        '''
        if obj.handle in session.id2obj:
            session.id2obj.pop(obj.handle)
        info = obj.info.decode()
        if info in session.sendFiles:
            session.sendFiles.remove( info )
        obj.cancel()
        del obj
        del event
        try:
            moveFile2SendedDir( info )
        except Exception as e:
            logging.warning( traceback.format_exc() )
            
        
    elif evtType == EventType.TX_FLUSH_COMPLETED:
        '''
            the flushing process the NORM sender observes when it no longer has data ready for transmission has completed
        '''
        for file in session.sendFiles:
            moveFile2SendedDir(file)
        session.sendFiles.clear()        
        files = await get_some_waitingfiles_of_link(settings.sendFileDir,linkID)
        pushFiles2session(session, waitingFiles=files)
        
    elif event.type == EventType.TX_OBJECT_PURGED:
        
        # see the NormSetTxCacheBounds documentation
        handle:int = obj.handle
        if objType and handle in event.session.id2obj:
            event.session.id2obj.pop(handle) 
            
        if objType == ObjectType.FILE and obj.info:
            '''
                move sent files to the sent dir
            '''
            info:str = obj.info.decode()
            obj.cancel()
            del obj
            del event
            moveFile2SendedDir(info)
            if info in session.sendFiles:
                session.sendFiles.remove(info)
            filePath = await get_one_waitingfile_of_link(settings.sendFileDir, session.conf['linkID'])
            pushFiles2session(session, waitingFiles=[filePath])

the NORM receiver side:


async def proc_recver_file_event(event:pynorm.event.Event):
    '''
         file info is {channelName}/{fileName}, encoding: utf-8
    '''
    #mqttClient:MQTTClient = mqtt_client()
    
    evtType:EventType = event.type
    session:pynorm.session.Session = event.session
    obj:pynorm.object.Object = event.object
    #linkID = session.conf['linkID']
    info:str = obj.info.decode() if obj and obj.info else None
    print (f" {evtType}  {info=}")

    if evtType == EventType.RX_OBJECT_NEW:
        now:str = datetime.datetime.now().isoformat()
        
        channelName, fileName = info.split("/")
        obj.begin = now
        obj.fileName = fileName
        obj.channelName = channelName
        session.id2obj[ info ] =  obj


    elif evtType == EventType.RX_OBJECT_COMPLETED:
        '''
        This event is posted when a receive object is completely received, including available NORM_INFO content. 
        Unless the application specifically retains the "object" handle, the indicated NormObjectHandle becomes invalid and must no longer be referenced.
        '''
        if info in session.id2obj:
            session.id2obj.pop( info )
        #obj.cancel()
        
        newFilePath:str = os.path.join(settings.recvFileDir, obj.channelName, obj.fileName )
        oldPath:str = event.object.filename.decode( locale.getpreferredencoding() )
        print (f"{oldPath=} {newFilePath}")
        try:
            #remove conflict file 
            if os.path.isfile(newFilePath):
                os.remove(newFilePath)
            newDirPath:str = os.path.join(settings.recvFileDir, obj.channelName )
            #create dest dir.
            if not os.path.isdir(newDirPath):
                os.makedirs( newDirPath )           
            #move the temp file to the new dir and give it its final real name.
            os.rename(src=oldPath, dst=newFilePath)
        except Exception as e:
            logging.warning ( traceback.format_exc() )  
            
        logging.info(f"{obj.channelName, obj.fileName} completed")
        
    elif evtType == EventType.RX_OBJECT_ABORTED:
        oldPath:str = event.object.filename.decode( locale.getpreferredencoding() )
        obj.cancel()
        os.remove(oldPath)  # remove the temporary file

        logging.warning( f'{obj.channelName, obj.fileName} aborted' )    

honglei avatar Sep 07 '22 01:09 honglei

Are you making any NORM API calls during that time?

Yes, the user may add new files to send (by calling session.fileEnqueue) while the event notification processing has not yet completed.

honglei avatar Sep 07 '22 02:09 honglei

The TX_QUEUE_EMPTY notification should only be used as a cue for the sender application to enqueue new content for transmission.

The TX_OBJECT_PURGED notification is how NORM lets the sender application know that it is done with the associated file or memory content. NORM itself will delete the object after the notification is issued and the application does not need to (and should not) do that itself. It is an opportunity for the application to dispose of (or otherwise manage) the actual file (or memory block) associated with the NormObject, but NORM itself will clean up the NormObject state.

I think the error is occurring because somehow a ProtoTree::Item is being removed from a tree twice (possibly when your code cancels/deletes the obj and then when NORM tries to also do the same) and the Item left/right pointers are not valid. I should probably "harden" the ProtoTree code a little more to avoid the loop issue here ... or at least make sure there is an ASSERT() that will fail instead of an infinite loop.
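
The hardening idea mentioned above can be sketched in Python. This is only a simplified model and not Protolib's code: `Node`, `find_parent`, and `max_steps` are hypothetical names, and keys are plain sequences of 0/1 bits. It shows how a bounded walk turns a stale or self-referencing link into an error instead of an endless loop.

```python
from typing import Optional, Sequence


class Node:
    """Hypothetical stand-in for a tree item; not ProtoTree::Item."""

    def __init__(self, bit: int) -> None:
        self.bit = bit                      # index of the key bit tested here
        self.left: Optional["Node"] = None
        self.right: Optional["Node"] = None


def find_parent(start: Node, item: Node, key: Sequence[int],
                max_steps: int = 1024) -> Node:
    """Walk from `start` until the link that leads to `item` is found.

    Same shape as the do/while loop in the issue, but it gives up after
    `max_steps` iterations instead of spinning forever.
    """
    x = start
    for _ in range(max_steps):
        q = x
        x = x.right if key[x.bit] else x.left
        if x is item:
            return q
    raise RuntimeError("item not reachable; tree links are inconsistent")
```

With an item that was already removed, the walk can end up on a node whose link points back to itself; the bound makes that case fail loudly, which is roughly what an ASSERT() would do in the C++ code.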

My recommendation is to remove the calls you make to:

obj.cancel()
del obj
del event

from your handling of those two notifications since the NORM code does that itself under the hood. Also you should only move the sender file during the PURGED notification.
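
A minimal sketch of that split of responsibilities, written without pynorm so it runs on its own. The `EventType` enum here is a stand-in with only two members, and `handle_sender_event`, `enqueue_more`, and `move_sent_file` are hypothetical names for illustration; the real handler would receive a pynorm event instead.

```python
from enum import Enum, auto
from typing import Callable, Optional


class EventType(Enum):
    """Stand-in for the pynorm event types; only the two used here."""
    TX_QUEUE_EMPTY = auto()
    TX_OBJECT_PURGED = auto()


def handle_sender_event(
    event_type: EventType,
    info: Optional[str],
    enqueue_more: Callable[[], None],
    move_sent_file: Callable[[str], None],
) -> None:
    """Dispatch one sender event without cancelling or deleting NORM objects."""
    if event_type is EventType.TX_QUEUE_EMPTY:
        # Cue to hand NORM more content; no files are moved here.
        enqueue_more()
    elif event_type is EventType.TX_OBJECT_PURGED and info is not None:
        # NORM is done with the object; only the application's file is managed.
        move_sent_file(info)
```

Note that neither branch calls `obj.cancel()` or deletes the object; the file move happens only on the purge notification.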

The "norm/examples/python/normMsgr.py" illustrates the handling of TX_QUEUE_EMPTY and TX_OBJECT_PURGED notifications. Even though it is using NORM_OBJECT_DATA (memory blocks) instead of NORM_OBJECT_FILE (files), the handling of those notifications is similar. It simply is deleting the application message buffer upon purge where you want to move the sent files to another directory or something. I do plan to create a "normCast.py" example at some point that is similar to the "normCast.cpp" example. The "normMsgr.cpp" and "normMsgr.py" examples are very similar to an example of file transmission/reception.

bebopagogo avatar Sep 07 '22 02:09 bebopagogo

With 1 NORM sender and 1 NORM receiver, the sender side has never received a TX_OBJECT_PURGED event, although TX_QUEUE_EMPTY and TX_FLUSH_COMPLETED were received.

the events received in the NORM sender side:

GRTT_UPDATED
CC_ACTIVE
TX_RATE_CHANGED
CC_ACTIVE
GRTT_UPDATED
TX_OBJECT_SENT 1 ObjectType.FILE a中文频道1/文件0001.txt
GRTT_UPDATED
GRTT_UPDATED
TX_OBJECT_SENT 1 ObjectType.FILE a中文频道1/文件0002.txt
GRTT_UPDATED
TX_OBJECT_SENT 1 ObjectType.FILE a中文频道1/文件0003.txt
TX_OBJECT_SENT 1 ObjectType.FILE a中文频道1/文件0004.txt
TX_OBJECT_SENT 1 ObjectType.FILE a中文频道1/文件0005.txt
GRTT_UPDATED
TX_OBJECT_SENT 1 ObjectType.FILE a中文频道1/文件0006.txt
TX_OBJECT_SENT 1 ObjectType.FILE a中文频道1/文件0007.txt
TX_OBJECT_SENT 1 ObjectType.FILE a中文频道1/文件0008.txt
TX_OBJECT_SENT 1 ObjectType.FILE a中文频道1/文件0009.txt
GRTT_UPDATED
TX_OBJECT_SENT 1 ObjectType.FILE a中文频道1/文件0010.txt
TX_QUEUE_EMPTY
GRTT_UPDATED
GRTT_UPDATED
TX_FLUSH_COMPLETED

honglei avatar Sep 07 '22 03:09 honglei

A TX_OBJECT_PURGED notification is issued depending upon the sender tx cache parameters which consist of min count, max count, and max size. For example if min_count/max_count is 8, a purge notification for object #1 will be issued when object #9 is enqueued. For NACK-based operation, the NORM sender keeps transmit objects cached for possible response to NACKs ... the cache limit parameters determine when the oldest object is purged from the cache to make room for a newly enqueued object. (and newly enqueued objects are subject to the default timer-based flow control or if the sender app implements flow control using the ACK option)
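
The count-based part of that behavior can be modeled in a few lines of Python. This is a simplified sketch, not NORM's implementation: `enqueue_with_purge` and `count_max` are hypothetical names, and the size limit and the separate min/max counts are ignored.

```python
from collections import deque
from typing import Deque, List


def enqueue_with_purge(cache: Deque[int], obj_id: int, count_max: int) -> List[int]:
    """Add obj_id to the cache and return the ids purged to make room.

    Simplified model: only a count limit is applied.
    """
    purged = []
    while len(cache) >= count_max:
        purged.append(cache.popleft())  # the oldest object leaves first
    cache.append(obj_id)
    return purged


cache: Deque[int] = deque()
# Map each enqueued object id to the ids purged by that enqueue.
events = {obj_id: enqueue_with_purge(cache, obj_id, count_max=8)
          for obj_id in range(1, 11)}
```

In this model the first eight enqueues purge nothing, enqueuing object 9 purges object 1, and enqueuing object 10 purges object 2. With only ten files and a larger cache limit, no purge would occur at all, which matches the event log above.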

bebopagogo avatar Sep 07 '22 03:09 bebopagogo

So, can I manually and safely remove all sent files when TX_FLUSH_COMPLETED occurs?

      elif evtType == EventType.TX_FLUSH_COMPLETED:
        '''
            the flushing process the NORM sender observes when it no longer has data ready for transmission has completed
        '''
        for key, obj in session.id2obj.items():
            obj.cancel()
        session.id2obj.clear()
        del obj
        
        for file in session.sendFiles:
            moveFile2SendedDir(file)
        session.sendFiles.clear()   
 
        files = await get_some_waitingfiles_of_link(settings.sendFileDir,linkID)
        pushFiles2session(session, waitingFiles=files)

honglei avatar Sep 07 '22 03:09 honglei

I need to do

del obj
del event

because when TX_OBJECT_PURGED occurs (the point at which I want to move the file to the sent file dir), the file handle of the related obj has still not been closed by NORM:

TX_OBJECT_PURGED 1 ObjectType.FILE a中文频道1/文件0081.txt
11:44:46.996ERROR|deps.py|watch_norm_events|Traceback (most recent call last):
  File "E:\PythonPrj\crud_test\norm\deps.py", line 310, in watch_norm_events
    await proc_event(event)
  File "E:\PythonPrj\crud_test\server\proc_norm.py", line 129, in proc_sender_event
    moveFile2SendedDir(info)
  File "E:\PythonPrj\crud_test\server\proc_norm.py", line 53, in moveFile2SendedDir
    os.rename( currentFilePath, newFilePath )  # move to the sent directory
PermissionError: [WinError 32] The process cannot access the file because it is being used by another process: '../sendFiles\\a中文频道1/文件0081.txt' -> '../sendedFiles\\a中文频道1\\文件0081.txt'

honglei avatar Sep 07 '22 03:09 honglei

Even after the TX_OBJECT_PURGED event processing completes, the file handle is still not closed by NORM. I tried moving the file after 1 second:

    elif event.type == EventType.TX_OBJECT_PURGED:
        # see the NormSetTxCacheBounds documentation
        handle:int = obj.handle
        if objType and handle in event.session.id2obj:
            event.session.id2obj.pop(handle) 
            
        if objType == ObjectType.FILE and obj.info:
            info:str = obj.info.decode()
            #obj.cancel()
            #del obj
            #del event
            #moveFile2SendedDir(info)
            if info in session.sendFiles:
                session.sendFiles.remove(info)
            filePath = await get_one_waitingfile_of_link(settings.sendFileDir, linkID, not_in=session.sendFiles)
            pushFiles2session(session, waitingFiles=[filePath])
            loop = asyncio.get_running_loop()
            loop.call_later(1, moveFile2SendedDir, info)  

honglei avatar Sep 07 '22 04:09 honglei

OK ... so the event is dispatched to your application before the file is closed? I probably should update the code to close the file handle before the PURGE is issued. By the way, you could also instead use the NormFileRename() function to rename/move the file. I need to double check if that function is part of the Python binding. If not, it should be added. But it's a good idea to have the file handle closed before the PURGE notification in any case, so this is another good find by you. Thanks! I appreciate your rigor and patience.

So I just pushed a commit that adds and calls a NormFileObject::CloseFile() method just before the TX_OBJECT_PURGED notification is issued. This lets your code manipulate the file as desired. For NORM_OBJECT_DATA, the object was left "open" so the application could have the option of accessing and managing the associated memory block (e.g., freeing it if allocated by the application, etc). But for manipulating a file associated with a NORM_OBJECT_FILE, the file handle needs to be closed as you point out.

bebopagogo avatar Sep 07 '22 15:09 bebopagogo

NORM is a wonderful and amazing project! I think it's the best multicast file transfer protocol right now. uftp and udpcast are not actively maintained.

After changing the wait before the rename that follows the TX_OBJECT_PURGED event to 30 seconds, it works now:

elif event.type == EventType.TX_OBJECT_PURGED:
    if info in session.info2obj:
        session.info2obj.pop(info)

    # 1, to avoid same file enqueued twice.
    #if info in session.sendFiles:
        #session.sendFiles.remove( info )        
        
    if objType == ObjectType.FILE and obj.info:
        filePath = await get_one_waitingfile_of_link(settings.sendFileDir, linkID, not_in=session.sendFiles)
        pushFiles2session(session, waitingFiles=[filePath])
        
        #2, waiting 30seconds for NORM to close file handle. 
        loop = asyncio.get_running_loop()
        loop.call_later(30, moveFile2SendedDir, info) 

honglei avatar Sep 08 '22 00:09 honglei

Did you try the new code I committed? I would not expect you to have to wait to move the file since the newly committed code closes the file handle before the PURGE notification is issued. Also, as mentioned, you could use the NormFileRename() call to move/rename the file as it does what is needed (for example, Linux lets you move/rename files while they are open but Windows does not ... although we recently learned if you are using a Samba file system with Linux it is better to close the file while it is being renamed and then re-open it).
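
For application code that moves the file itself rather than calling NormFileRename(), a defensive move helper might look like the sketch below. `move_file`, `attempts`, and `delay` are hypothetical names and are not part of pynorm. It relies on `os.replace`, which overwrites an existing destination file, and retries briefly on `PermissionError`, which is what Windows raises while another handle still holds the file open.

```python
import os
import time


def move_file(src: str, dst: str, attempts: int = 5, delay: float = 0.2) -> None:
    """Move src to dst, retrying briefly if the file is still held open."""
    # Create the destination directory if it is missing.
    os.makedirs(os.path.dirname(dst) or ".", exist_ok=True)
    for attempt in range(attempts):
        try:
            os.replace(src, dst)  # replaces dst if it already exists
            return
        except PermissionError:
            if attempt == attempts - 1:
                raise             # still locked after the last attempt
            time.sleep(delay)
```

With the file handle closed before the PURGE notification, the first attempt should succeed and the retry path is only a safety net.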

In any case, I am curious whether the newly committed code works as expected; you should not have to wait to move the file after the PURGE notification is issued. BTW, have you looked at the examples/normCast.cpp example? It provides some useful file cast features such as iterating through directories, supporting optional positive acknowledgment/flow control, etc. As mentioned my plan is to also make a normCast.py example that has the same features / command-line syntax.

bebopagogo avatar Sep 08 '22 01:09 bebopagogo

the commit https://github.com/USNavalResearchLaboratory/norm/commit/b896f5408fc98b1355041609f0a5d23ca46de2c0 works with the following code:

if info in session.info2obj:
    session.info2obj.pop(info)
      
if objType == ObjectType.FILE and obj.info:
    moveFile2SendedDir(info)
    
    if info in session.sendFiles:
        session.sendFiles.remove( info )
    
    filePath = await get_one_waitingfile_of_link(settings.sendFileDir, linkID, not_in=session.sendFiles)
    pushFiles2session(session, waitingFiles=[filePath])

So, the issue is solved!

honglei avatar Sep 08 '22 01:09 honglei

I changed pynorm to add type hints and to use enums instead of int variables; this will be committed later.
https://github.com/honglei/fastapi-norm/tree/main/pynorm

As mentioned my plan is to also make a normCast.py example that has the same features / command-line syntax.

I’m willing to do this work.

honglei avatar Sep 08 '22 02:09 honglei

The same problem occurred again when I tried to send 100K small files! Env: Win10/1Gbps Ethernet/VC 2022 x64/Python3.10.6 Args: ccEnable=False txRate=5_000_000

    for i in range(1, 100_000):
        with open(os.path.join(dirPath, f"文件{i:05d}.txt"), "w") as f:
            f.write(f"{i} "*5_000)    

honglei avatar Sep 09 '22 01:09 honglei

This is caused by https://github.com/USNavalResearchLaboratory/norm/blob/40717bb5dd6f5dfd1ae2f207e8f1d4487603ed1e/src/common/normObject.cpp#L58-L63

Python can create two different Objects with the same object_id, so when the Python Objects are deleted, the following code:

https://github.com/USNavalResearchLaboratory/norm/blob/40717bb5dd6f5dfd1ae2f207e8f1d4487603ed1e/src/pynorm/object.py#L122-L123

causes NormObjectRelease to be called twice or more, so "delete this" is executed more than once, which is undefined behavior in C++.

honglei avatar Sep 17 '22 09:09 honglei

like this: https://github.com/USNavalResearchLaboratory/norm/blob/40717bb5dd6f5dfd1ae2f207e8f1d4487603ed1e/src/pynorm/instance.py#L88-L91 and https://github.com/USNavalResearchLaboratory/norm/blob/40717bb5dd6f5dfd1ae2f207e8f1d4487603ed1e/src/pynorm/session.py#L124-L125

Maybe NormFileEnqueue and NormGetNextEvent could keep track of the NORM Object's reference_count; other Python code may not need to do such things.

honglei avatar Sep 17 '22 09:09 honglei

OK - I think I see the problem. The session.fileEnqueue() creates an Object that is returned, but it is not cached in the instance._objects dictionary. Later when a sender notification for that object is issued, a second Python Object for that same object handle is created and placed into the instance._objects dictionary. That then leads to the error you have found. I think the correct solution is to have the file/data/stream methods that create sender Objects add a reference to those objects to the instance._objects dictionary so duplicate Python objects are not created. Then all the retain/release reference counting will be managed properly.
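
The caching idea can be sketched as a small registry keyed by object handle. This is an illustration only and not the pynorm code: `ObjectWrapper`, `WrapperRegistry`, `get`, and `forget` are hypothetical names.

```python
from typing import Dict


class ObjectWrapper:
    """Hypothetical stand-in for a Python object wrapping a native handle."""

    def __init__(self, handle: int) -> None:
        self.handle = handle


class WrapperRegistry:
    """Return the same wrapper for a handle, whether the handle comes from
    an enqueue call or from a later event notification."""

    def __init__(self) -> None:
        self._objects: Dict[int, ObjectWrapper] = {}

    def get(self, handle: int) -> ObjectWrapper:
        wrapper = self._objects.get(handle)
        if wrapper is None:
            wrapper = ObjectWrapper(handle)
            self._objects[handle] = wrapper
        return wrapper

    def forget(self, handle: int) -> None:
        # Called once the native object is gone (for example after a purge).
        self._objects.pop(handle, None)
```

If both the enqueue path and the event path go through `get`, only one Python wrapper exists per handle, so there is a single point where the native object is retained and released.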

FYI, I also noticed that the session.fileEnqueue() (and dataEnqueue() and streamOpen()) function does not handle the case for when the underlying NormFileEnqueue() returns NORM_OBJECT_INVALID due to flow control / buffer limits. I think it should return 'None' in this case (instead of throwing the error check exception it does now) so I will also be making that change. I'm not sure how this one slipped through as the normMsgr.py testing should have revealed this, but I may have overlooked testing it in a way that would have revealed this problem.

bebopagogo avatar Sep 17 '22 14:09 bebopagogo

After thinking about it more, I am not sure this is the problem. Since NormObjectRetain() is called in the pynorm.Object.__init__() method and NormObjectRelease() is called in the pynorm.Object.__del__() method, the underlying NormObject.reference_count should be maintained properly even if multiple Python objects are created that reference the same NormObject. So I think there is something else involved here. I think I need to get your test code and try it myself. It may be related to your event handling somehow.
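
The reasoning about balanced retain/release can be checked with a toy model. `NativeObject` and `Wrapper` below are hypothetical stand-ins, not the NORM or pynorm classes, and the sketch assumes CPython, where `del` on the last reference runs `__del__` immediately.

```python
class NativeObject:
    """Hypothetical model of a reference-counted native object."""

    def __init__(self) -> None:
        self.reference_count = 1   # reference held by the library itself
        self.deleted = False

    def retain(self) -> None:
        self.reference_count += 1

    def release(self) -> None:
        if self.deleted:
            raise RuntimeError("release after delete (double free)")
        self.reference_count -= 1
        if self.reference_count == 0:
            self.deleted = True


class Wrapper:
    """Retains in __init__ and releases in __del__."""

    def __init__(self, native: NativeObject) -> None:
        self._native = native
        native.retain()

    def __del__(self) -> None:
        self._native.release()
```

Two wrappers around one `NativeObject` raise the count to 3 and bring it back to 1 when both are deleted, so duplicates alone do not over-release in this model. An over-release only appears when some release has no matching retain, for example an extra explicit cancel or release outside the wrapper.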

Note I do think the change mentioned above to cache the Python objects created with fileEnqueue(), etc so the same Python pynorm.Object returned is referenced in the Event handling may be useful. Typically it doesn't matter since the pynorm.Object just references the underlying C++ NormObject and doesn't have any state variables of its own. But if a user application did extend the pynorm.Object, they may want to be able to access the same Python object upon event notifications.

By the way, what is the session.id2obj() in your code? As mentioned, I may need to get a copy of your code that is having the problem for me to be able to diagnose it.

bebopagogo avatar Sep 17 '22 15:09 bebopagogo

The infinite loop always occurs when ProtoTimerMgr tries to remove the report ProtoTimer (OnReportTimeout).

session.id2obj is used to cache the objects in the tx queue of a session, since no NORM API can do this right now.

honglei avatar Sep 27 '22 08:09 honglei

On the NORM sender side, I use one instance with two sender sessions: link 224.1.2.4 uses dir ../sendFiles/a中文频道1 with 10K small files, and link 224.1.2.3 uses dir ../sendFiles/频道2 with 100K small files.

import os
if os.name=='nt':
    #os.add_dll_directory(os.path.dirname(__file__))
    os.add_dll_directory(r'E:\PythonPrj\NORM\norm159\makefiles\win32\x64\Release')
import asyncio
import logging
import socket
import traceback
import re

import win32event
import netifaces

def get_interface_by_ip(ip:str):
    '''
       Get the interface that corresponds to an IP address.
       Takes about 0.07 s on Win7 and 0.001 s on Deepin/SW.
    '''
    for iface in netifaces.interfaces():
        allAddrs = netifaces.ifaddresses(iface)        
        if addrs:= allAddrs.get(socket.AF_INET):
            for addr in addrs:
                if addr['addr'] == ip:
                    return iface
        elif addrs:= allAddrs.get(socket.AF_INET6):
            for addr in addrs:
                if addr['addr'] == ip:
                    return iface
import pynorm

def pushFiles2session(session:pynorm.session.Session, waitingFiles:list[str], watermark=False) -> list[str]:
    '''
        Try to enqueue some files to `session` and return those that were
        enqueued successfully.
    '''
    added_files =[]
    for index, filePath in enumerate(waitingFiles):
        channelName, fileName = filePath.split("/")
        fullFilePath:str = os.path.join(settings.sendFileDir, channelName, fileName )
        
        # skip files whose filePath is already in the session's send queue
        if filePath in session.info2obj:
            continue 
            #obj = session.info2obj.pop(filePath)
            #obj.cancel()
        obj:pynorm.Object = session.fileEnqueue( fullFilePath , info=filePath.encode() )
        handle:int|None = obj.handle
        if handle in (None, pynorm.NORM_OBJECT_INVALID):
            logging.info(f'pushed files:{added_files}')
            return added_files
        else:
            added_files.append( filePath )
            session.info2obj[filePath] = obj 
        if watermark:
            session.setWatermark(obj)
       
    logging.info(f'pushed files:{added_files}')
    return added_files

def moveFile2SendedDir(info:str) -> bool:
    currentFilePath = os.path.join(settings.sendFileDir, info)
    if not os.path.isfile(currentFilePath):
        return False
        #raise FileNotFoundError(currentFilePath)
    channelName, fileName = info.split('/')
    #1, create the dir if it does not exist
    dirPath = os.path.join(settings.sendedFileDir, channelName )
    if not os.path.exists(dirPath):
        os.makedirs(dirPath)
    newFilePath = os.path.join(dirPath, fileName)
    
    # remove an existing file with the same name
    if os.path.isfile(newFilePath):
        os.remove(newFilePath)
    
    os.rename( currentFilePath, newFilePath )  # move to the sent directory
    logging.info( f"{info} moved!")
    return True

def moveFiles2SendedDir(infos:list[str])-> bool:
    for info in infos:
        moveFile2SendedDir(info)
        

def _get_some_waitingfiles_of_link(basicPath:str, channnelName:str, limit:int=20, not_in:dict|None=None):
    '''
        Get the list of files waiting to be sent for one channel.
        limit: maximum number of files to return
    '''
    path:str = os.path.join(basicPath, channnelName)
    # the channel directory has not been created
    if not os.path.isdir(path):
        #os.makedirs(path)
        return []
    
    waiting_files=[]
    dir_or_files = os.listdir(path)
    fileNum =0 
    for fileName in dir_or_files:
        file_path = os.path.join(path,fileName)
        # do not send temporary or completed files
        if os.path.isfile(file_path) and not re.findall(r"\.tmp$|\.end$", fileName): 
            filePath =  f"{channnelName}/{fileName}"
            if not_in is None or filePath not in not_in:   # not_in is unset, or the file is not in the pending list
                waiting_files.append(filePath)
                fileNum +=1
                if fileNum >=limit:
                    break   
    return waiting_files

from aiofiles.os import wrap

get_some_waitingfiles_of_link = wrap(_get_some_waitingfiles_of_link)



import ipaddress
import typing
from pydantic import BaseSettings
class Settings(BaseSettings):
    sendFileDir:str=""
    sendedFileDir:str =""
    
settings = Settings()

import pynorm    
from pynorm.constants import EventType,NackingMode

SENDER_BUFFER_SPACE = 100*1024*1024

#from util.netifaces_ext import get_interface_by_ip
def create_sender_session(instance:pynorm.Instance, 
                          destAddr:str,
                          destPort:int, 
                          localAddr:str|None=None, 
                          localPort:int=0,                            
                          iface:str|None=None, # the interface of <localAddr>
                          srcAddr:str|None=None, #used for setSSM
                          sessionIndex:typing.Hashable=None,
                          ccEnable:bool = True,
                          rateMin:float|None=-1,
                          rateMax:float|None=-1,
                          txRate:float|None =None,
                          bufferSpace:int = SENDER_BUFFER_SPACE,
                          segmentSize:int = 1400,
                          blockSize:int = 128,
                          numParity:int = 0,
                          loopbackEnable:bool = False
                        
                          ) -> pynorm.session.Session:

    session = instance.createSession(destAddr, destPort,  localId=ipaddress.IPv4Address(localAddr)._ip, index=sessionIndex  )
    if session._session == pynorm.NORM_SESSION_INVALID:
        raise Exception( f"createSession: NORM_SESSION_INVALID {destAddr}")
    if localAddr or localPort:
        session.setTxPort(localPort,txBindAddr=localAddr) #
    if iface:
        session.setMulticastInterface(iface)
    if srcAddr:
        session.setSSM(srcAddr=srcAddr)    
    session.setTxOnly(txOnly=True) 
    if txRate:
        session.setTxRate(txRate*1000)
    session.setCongestionControl(ccEnable=ccEnable) # 
    if loopbackEnable:
        session.setLoopback(True)
    if rateMin>=0 or rateMax>=0:
        session.setTxRateBounds(rateMin=rateMin*1000,rateMax=rateMax*1000)

    #sessionID = randint(0, 1000)
    success:bool = session.startSender(sessionIndex, bufferSpace, segmentSize=segmentSize, blockSize=blockSize, numParity=numParity ) 
    print (f"startSender:{success}")
    #session.setGroupSize(4)
    return session 
                
                
instance:pynorm.Instance = pynorm.Instance()
async def start_one_link(sendFileDir:str, link:dict, pushFileNum:int=10):
    global instance
    localAddr =link['localAddr'] 
    iface = link.get('iface')
    if os.name =='posix' and iface is None:
        loop = asyncio.get_running_loop()
        iface:str|None  = await loop.run_in_executor(None, get_interface_by_ip, localAddr)
        if not iface:
            raise Exception(f"{localAddr} is not valid local Addr")   
        
    session = create_sender_session(instance=instance,
                          destAddr=link['privateMulticastAddr'], destPort=link['destPort'],
                          localAddr = localAddr ,
                          #localPort:int=0,                            
                          iface= iface, # the interface of <localAddr>
                          #srcAddr:str|None=None, #used for setSSM
                          sessionIndex = link['linkID'],
                          ccEnable = link['ccEnable'],
                          rateMin =link['rateMin'],
                          rateMax =link['rateMax'],
                          txRate =link['txRate'], 
                          bufferSpace = 100*1024*1024,
                          segmentSize = link['segmentSize'],
                          blockSize = link['blockSize'],
                          numParity = link['numParity'],
                          loopbackEnable=False,
                          )
    session.conf= link
    
    if ttl:=link.get('TTL'):
        session.setTTL(ttl)
    if tos:=link.get('TOS'):
        session.setTOS(tos)
    if autoParity:= link.get("autoParity"):
        session.setAutoParity(autoParity) 
    if waitingFiles:= link.get('waitingFiles'):
        pushFiles2session(session, waitingFiles[:pushFileNum])
        
    session.setTxCacheBounds(50_000_000, 200, 200) #NormSetTxCacheBounds
    files = await  get_some_waitingfiles_of_link(settings.sendFileDir,channnelName=link['channels'][0], )
    pushFiles2session(session, waitingFiles=files)
    

from pynorm.constants import  ObjectType

import copy           
async def proc_sender_event(event:pynorm.Event):
    '''
    
    '''
    #print(f"{str(event)}")
    obj = event.object
    evtType:pynorm.EventType = event.type
    session:pynorm.Session = event.session
    linkID:int = session.conf['linkID']
    
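    if obj:
        objType:ObjectType = obj.type
        info:str|None =obj.info.decode() if obj.info else None
        print(event, linkID, obj.type, info)
    else:
        # no object attached to this event; keep the names defined for the branches below
        objType = None
        info = None
        #print (event)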
    
    if evtType == EventType.TX_OBJECT_SENT:
        '''
            Objects that have been sent need to be cached, for when TX_FLUSH_COMPLETED, TX_OBJECT_PURGED or TX_QUEUE_EMPTY is received.
        '''
        pass
        
    elif event.type == EventType.TX_OBJECT_PURGED:

            
        ## Do not delete here, to avoid the same file being enqueued twice.
        if objType == ObjectType.FILE:
            loop=asyncio.get_running_loop()
            #_future:asyncio.Future  = loop.run_in_executor(None, moveFile2SendedDir, info)  
            # Problem: this cannot be fire-and-forget. Removing the entry from info2obj before the move completes would cause the file to be added again.
            await loop.run_in_executor(None, moveFile2SendedDir, info) 
            if info in session.info2obj:
                session.info2obj.pop(info)        
                
                
    elif evtType == EventType.TX_QUEUE_EMPTY:
        if len(session.info2obj)>200:
            return 
        files = await  get_some_waitingfiles_of_link(settings.sendFileDir, channnelName=session.conf['channels'][0], limit=10, not_in=session.info2obj )
        if files:
            pushFiles2session(session, waitingFiles=files)
        
    elif evtType == EventType.TX_FLUSH_COMPLETED:
        '''
            The flushing process, which the NORM sender observes when it no longer has data ready for transmission, has completed.
        '''
        try:
            
            files =await get_some_waitingfiles_of_link(settings.sendFileDir,channnelName=session.conf['channels'][0],  not_in=session.info2obj)
            if files:
                pushFiles2session(session, waitingFiles=files)
            else:
                # When there are no new files to send, move the files still in the queue to the sent directory.
                infos =[]
                for info, obj in session.info2obj.items():
                    obj.cancel()
                    infos.append(info)
                session.info2obj.clear()
                loop=asyncio.get_running_loop()
                _future:asyncio.Future  = loop.run_in_executor(None, moveFiles2SendedDir, infos)
        except Exception as e:
            print (traceback.format_exc())
        
import typing
from tornado.ioloop import PeriodicCallback

import pynorm
import select
import time
from pynorm import DebugLevel as NormDebugLevel
import logging
async def watch_norm_events(proc_event: typing.Callable[[pynorm.event.Event],None ], timeout:int=1):
    global instance
    instance.setDebugLevel(level=NormDebugLevel.INFO) #2 Warning 3 INFO 4 DEBUG


    handle = instance.getDescriptor()
    while True:
        try:
            if os.name =='nt':
                '''
                    DWORD WaitForSingleObject(
                     [in] HANDLE hHandle,
                     [in] DWORD  dwMilliseconds
                   );
                   [in] dwMilliseconds

                   The time-out interval, in milliseconds. If a nonzero value is specified, 
                   the function waits until the object is signaled or the interval elapses. 
                   If dwMilliseconds is zero, the function does not enter a wait state if the object is not signaled; 
                   it always returns immediately.
                   If dwMilliseconds is INFINITE, the function will return only when the object is signaled.

                '''
                value:int = await asyncio.to_thread(win32event.WaitForSingleObject, handle,1_000 ) # win32event.INFINITE
                if value == win32event.WAIT_TIMEOUT:
                    await asyncio.sleep(0)
                    continue
                elif value == win32event.WAIT_FAILED:
                    print ( f"error:{win32api.GetLastError()}" )
                elif value == win32event.WAIT_ABANDONED:
                    pass 
                elif value == win32event.WAIT_OBJECT_0:       
                    while event:= instance.getNextEvent( ):
                        beg = time.time()
                        await proc_event(event)
                        print ( f" {event.type.name} {time.time()-beg:.4f}")
                    #print(event) 
            else:
                readable, writable, exceptional = await asyncio.to_thread(select.select, [handle],[],[handle]) 
                if readable:
                    event: pynorm.event.Event = instance.getNextEvent( )
                    await proc_event(event)              
                    
        except Exception as e:
            logging.error ( traceback.format_exc() )
    print("watch_norm_events finished" )
    return 0
        
        
async def periodic_func():
    for session in instance._sessions.values():
        print (f"{session.multiAddr},  {session.getSenderReport().get_dict()}")
   
import tornado 
if __name__ =='__main__':
    logging.getLogger().setLevel(logging.INFO)
    link =  { 'linkID': 1, 'userID': 1, 'linkName': 'a', 'destPort': 6003, 'localAddr': '10.65.39.191',
              'TTL': None, 'TOS': None, 'ccEnable': False,
              'rateMin': -1, 'rateMax': 8000000, 'txRate': 8000000,
              'segmentSize': 1400, 'blockSize': 128, 'numParity': 4, 'autoParity': 1,
              'waitingFiles': [], 'channels': ['a中文频道1'],
              'sendFileDir': '../sendFiles',
              'privateMulticastAddr': '224.1.2.4'
              }

    link2 =  { 'linkID': 1, 'userID': 1, 'linkName': 'a', 'destPort': 6003, 'localAddr': '10.65.39.191',
              'TTL': None, 'TOS': None, 'ccEnable': False,
              'rateMin': -1, 'rateMax': 8000000, 'txRate': 8000000,
              'segmentSize': 1400, 'blockSize': 128, 'numParity': 4, 'autoParity': 1,
              'waitingFiles': [], 'channels': ['频道2'],
              'sendFileDir': '../sendFiles',
              'privateMulticastAddr': '224.1.2.3'
              }
    
    
    
    settings.sendFileDir= '../sendFiles'  # read files from subdir, each subdir is a channel 
    settings.sendedFileDir = '../sendedFiles'
    #loop = asyncio.get_event_loop_policy().new_event_loop() 
    t_loop = tornado.ioloop.IOLoop.current()
    loop = t_loop.asyncio_loop
    
    periodic_check:PeriodicCallback = PeriodicCallback( periodic_func , callback_time=10_000) 
    periodic_check.start()
    
    loop.create_task( watch_norm_events( proc_sender_event) )
    asyncio.run(  start_one_link( settings.sendFileDir, link) )
    asyncio.run(  start_one_link( settings.sendFileDir, link2) )
    loop.run_forever()

honglei avatar Sep 27 '22 09:09 honglei

The ProtoTree has two different report timers (each belonging to one session), but with the same timeout value: Report1.timeout == Report2.timeout


honglei avatar Sep 28 '22 01:09 honglei

Each instance has a single ProtoDispatcher, so two sessions in one instance share a single ProtoTree, and both report timers are added to that same ProtoTree.

void ProtoTree::Remove(ProtoTree::Item& item)
{
    ASSERT(0 != item.GetKeysize());
    if (((&item == item.left) || (&item == item.right)) && (NULL != item.parent))  //1, False !!!! 
    {
        //....
    }
    else
    {
        // Root or "item" with no self-pointers 
        // (a.k.a an "internal entry"?)
        // 1) Find terminal "q"  with backpointer to "item"  
        const char* key = item.GetKey();
        unsigned int keysize = item.GetKeysize();
        Endian keyEndian = item.GetEndian();
        Item* x = &item;
        Item* q;
        do                      
        {     
            q = x;              
            if (Bit(key, keysize, x->bit, keyEndian)) //2, always true: the two report timers have the same timeout value.
                x = x->right;
            else                
                x = x->left;    
        } while (x != &item); //3, always true, since they are two different report timers (session 224.1.2.3 and session 224.1.2.4).

The loop uses the item's address as its exit condition, but uses the key value to walk the tree. That is the problem.
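A toy Python model of that walk, to make the mismatch concrete. This is only a sketch: the `Node` class, `bit_set`, `find_backpointer`, and the hand-built links are hypothetical stand-ins, not Protolib code, and the step limit exists only so the example terminates. It assumes a state where a second node with the same key ends at the first node's self-pointing uplink; it does not show how the real tree would get into that state.

```python
class Node:
    """Hypothetical stand-in for a trie item: a key, a bit index, two links."""
    def __init__(self, name, key, bit):
        self.name = name
        self.key = key       # integer key, examined bit by bit
        self.bit = bit       # index of the key bit this node tests
        self.left = self     # a fresh node's links point back at itself
        self.right = self

def bit_set(key, index):
    """True if bit `index` of `key` is 1."""
    return (key >> index) & 1 == 1

def find_backpointer(item, max_steps=1000):
    """Follow item's key bits until the walk arrives back at `item` (by identity)."""
    x = item
    for _ in range(max_steps):
        q = x
        x = x.right if bit_set(item.key, x.bit) else x.left
        if x is item:        # exit test compares addresses, not keys
            return q
    raise RuntimeError("walk never returned to " + item.name)

# Case 1: a single node whose right link is an uplink to itself.
a = Node("a", key=0b1, bit=0)
q = find_backpointer(a)      # terminates immediately

# Case 2: a second node with the SAME key, hand-linked so its walk reaches `a`.
b = Node("b", key=0b1, bit=0)
b.right = a
try:
    find_backpointer(b)      # reaches a, then a.right is a, forever
    stuck = False
except RuntimeError:
    stuck = True
```

In case 2 the walk settles on `a`, where `x.right is x`, which matches the `x == x->right` state shown in the debugger; the exit test never succeeds because it is waiting for `b`.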

honglei avatar Sep 28 '22 02:09 honglei

The ProtoTimer class uses the ProtoSortedTree for the timer items. The ProtoSortedTree::Insert() and ProtoSortedTree::Remove() methods only allow a single item for a given value to be in the underlying ProtoTree used. If two items have the same value (e.g., two ProtoTimers with the same timeout), only one of them is in the tree while the other is present only in the ordered linked list that is maintained as part of the ProtoSortedTree. So, I don't understand how the condition you describe might occur and I am going to look into it.

Note that ProtoTree is a Patricia trie implementation, and an item's left/right pointers can be uplinks to the item itself. That produces the case "x == &item", which is the condition that ends the while loop above.

bebopagogo avatar Sep 28 '22 14:09 bebopagogo

ProtoSortedTree.unique_items_only is false by default, so ProtoTimerTable allows two items to have the same value.

#include "protoDefs.h"   // for ProtoSystemTime()  
#include "protoTimer.h"

int main(int argc, char* argv[])
{
    ProtoTimerTable table;
    ProtoTimer cc_timer, ack_timer;
    cc_timer.SetInterval(1);
    cc_timer.SetRepeat(1);
    cc_timer.UpdateKey();

    ack_timer.SetInterval(2);
    ack_timer.SetRepeat(2);
    cc_timer.UpdateKey();

    table.Insert(cc_timer);
    table.Insert(ack_timer);

    table.Remove(cc_timer);
    table.Remove(ack_timer);
}

honglei avatar Sep 29 '22 01:09 honglei

Does that code produce the problem? It is OK for the ProtoSortedTree to have multiple (but different) items with the same value. For example, when there are two items with the same value, only one of them is inserted into the internal ProtoTree, while both are part of an ordered linked list. If the item that is in the tree is removed, the other one from the linked list takes the first item's place in the tree structure. I actually wrote some similar test code today that worked, just to double check:

int main(int argc, char* argv[])
{
    TestItem a(1.0);
    TestItem b(2.0);
    TestItem c(1.0);

    TestTree tree;

    tree.Insert(a);
    tree.Insert(b);
    tree.Insert(c);

    TestTree::Iterator iter(tree);
    TestItem* next;
    while (NULL != (next = iter.GetNextItem()))
        TRACE("item value: %lf\n", next->GetValue());

    TRACE("now removing ...\n");

    tree.Remove(b);
    tree.Remove(c);
    tree.Remove(a);

    TRACE("done.\n");

}  // end main()

This code worked without problem. The TestTree and TestItem are simple ProtoSortedTree and ProtoSortedTree::Item subclasses but use a floating point 'double' as the key.
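
The intended behavior described above (one representative per value in the tree, every item in the ordered list, and promotion when the representative is removed) can be modeled roughly as follows. This is a Python sketch under stated assumptions, not the Protolib implementation: `SortedModel`, its `index` dict (standing in for the internal ProtoTree), and its `order` list (standing in for the linked list) are all hypothetical names.

```python
import bisect

class Item:
    def __init__(self, value):
        self.value = value

class SortedModel:
    """Hypothetical model: at most one indexed item per value, all items in order."""
    def __init__(self):
        self.index = {}    # value -> the one item representing it (stand-in for the trie)
        self.order = []    # every item, sorted by value (stand-in for the linked list)

    def insert(self, item):
        # New duplicates go after existing equal values, so the indexed item stays first.
        pos = bisect.bisect_right([i.value for i in self.order], item.value)
        self.order.insert(pos, item)
        self.index.setdefault(item.value, item)

    def remove(self, item):
        # Locate by identity, since several items may share a value.
        pos = next(i for i, other in enumerate(self.order) if other is item)
        del self.order[pos]
        if self.index.get(item.value) is item:
            # The indexed item left: promote the next equal-valued item, if any.
            if pos < len(self.order) and self.order[pos].value == item.value:
                self.index[item.value] = self.order[pos]
            else:
                del self.index[item.value]

# Same shape as the test above: two items share the value 1.0.
a, b, c = Item(1.0), Item(2.0), Item(1.0)
model = SortedModel()
for it in (a, b, c):
    model.insert(it)
first_rep = model.index[1.0]     # a, the first one inserted
model.remove(a)
promoted = model.index[1.0]      # c takes a's place
model.remove(c)
model.remove(b)
```

In this model an item that shares a value with the indexed one never appears in `index` at all, so a key-driven lookup cannot land on it. That is the invariant the real structure would need to hold for the removal walk to be safe.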

bebopagogo avatar Sep 29 '22 01:09 bebopagogo