PyDAQmx icon indicating copy to clipboard operation
PyDAQmx copied to clipboard

example / MultiChannelAnalogInput.py is synchronization for every Channel

Open rosickey opened this issue 11 years ago • 2 comments

Does the example read every channel synchronously?

rosickey avatar Jun 24 '14 04:06 rosickey

What exactly are you asking? It creates task handles for every channel.

zfaizal avatar Jul 17 '15 05:07 zfaizal

Hi! I have a question which I think is similar to this thread.

I am working with an NI CompactDAQ with a 9222, a 4-channel high-frequency voltage input. So far I have succeeded in reading the data for each channel, one at a time. But what I want to achieve is to read the channels simultaneously, in sync. Can some of you wizards in here help me with this? I have copied my code below.

Please bear with me, I am fairly new to Python.

Best

Jeppe

import numpy from PyDAQmx.DAQmxFunctions import * from PyDAQmx.DAQmxConstants import * import time class MultiChannelAnalogInput(): """Class to create a multi-channel analog input

Usage: AI = MultiChannelAnalogInput(physicalChannel)
    physicalChannel: a string or a list of strings
optional parameter: limit: tuple or list of tuples, the AI limit values
                    reset: Boolean
Methods:
    read(name), return the value of the input name
    readAll(), return a dictionary name:value
"""

#==============================================================================

# Initiate analog channels

#==============================================================================

def __init__(self, physicalChannel, limit=None, reset=False):
    """Store the channel list and the voltage limits for each channel.

    Args:
        physicalChannel: a single channel name (string) or a list of
            channel names.
        limit: None to use (-5.0, 5.0) for every channel, one (min, max)
            tuple applied to every channel, or a sequence holding one
            (min, max) entry per channel in channel order.
        reset: when true, reset the device named by the first channel.
    """
    # Normalise to a list first: everything below must work on the list,
    # otherwise a bare string is measured and indexed per character.
    if isinstance(physicalChannel, str):
        self.physicalChannel = [physicalChannel]
    else:
        self.physicalChannel = physicalChannel
    self.numberOfChannel = len(self.physicalChannel)

    if limit is None:
        self.limit = {name: (-5.0, 5.0) for name in self.physicalChannel}
    elif isinstance(limit, tuple):
        self.limit = {name: limit for name in self.physicalChannel}
    else:
        self.limit = {name: limit[i]
                      for i, name in enumerate(self.physicalChannel)}

    if reset:
        # The device name is the part of the channel name before the "/".
        DAQmxResetDevice(self.physicalChannel[0].split('/')[0])

    # Set unconditionally so the attribute exists whether or not the
    # device was reset.
    self.BufferSize = 10

#==============================================================================

#==============================================================================
def configure(self):
    """Create one DAQmx task per physical channel.

    Each task gets a single differential voltage channel, in volts, using
    the limits stored for that channel. The handles are kept in
    self.taskHandles, keyed by channel name.
    """
    handles = {}
    for channel in self.physicalChannel:
        handles[channel] = TaskHandle(0)

    for channel in self.physicalChannel:
        handle = handles[channel]
        DAQmxCreateTask("", byref(handle))
        DAQmxCreateAIVoltageChan(handle, channel, "", DAQmx_Val_Diff,
                                 self.limit[channel][0],
                                 self.limit[channel][1],
                                 DAQmx_Val_Volts, None)

    self.taskHandles = handles
def readAll(self):
    """Read every configured channel, one after another.

    Returns a dictionary mapping each channel name to the result of
    self.read(name) for that channel.
    """
    values = {}
    for channel in self.physicalChannel:
        values[channel] = self.read(channel)
    return values

def read(self, name=None):
    """Acquire one finite burst of samples from a single channel.

    Args:
        name: channel to read; defaults to the first configured channel.

    Returns:
        A numpy float64 array of length 833 that was passed to
        DAQmxReadAnalogF64 as the read buffer.
    """
    start_time = time.time()

    dataSize = 833
    data = numpy.zeros((dataSize,), dtype=numpy.float64)
    if name is None:
        name = self.physicalChannel[0]
    taskHandle = self.taskHandles[name]
    # dataSize is passed both as the sample clock rate and as the number
    # of samples per channel to acquire.
    DAQmxCfgSampClkTiming(taskHandle, "", dataSize, DAQmx_Val_Rising,
                          DAQmx_Val_FiniteSamps, dataSize)
    DAQmxStartTask(taskHandle)
    try:
        samplesRead = int32()
        DAQmxReadAnalogF64(taskHandle, dataSize, 10.0,
                           DAQmx_Val_GroupByChannel, data, dataSize,
                           byref(samplesRead), None)
    finally:
        # Stop the task even when the read raises, so the task is not
        # left started for the next call.
        DAQmxStopTask(taskHandle)

    print("--- %s seconds ---" % (time.time() - start_time))
    return data

if __name__ == '__main__':
    # Example: create one task per listed channel when run as a script.
    DAQ = MultiChannelAnalogInput(["M1/ai2", "M1/ai1"])
    DAQ.configure()

.

CPHLIVE avatar Oct 05 '15 14:10 CPHLIVE