Usage question (or bug)
I'm trying to write a basic pipeline I can use for testing, and am getting some read failures, which I presume are due to my misunderstanding how to read from and write to the ring. Here's my code:
"""
# test_block.py
Test pipeline with prototype blocks
"""
import numpy as np
import threading
import json
import bifrost
from bifrost.ring import Ring
class BfPipeline(list):
    """Minimal list-based pipeline.

    Append block objects (anything exposing a ``main()`` method) to the
    pipeline, then call ``run()`` to execute every block's ``main`` in its
    own daemon thread and wait for all of them to finish.
    """

    def run(self):
        """Start one daemon thread per block, then wait for all to exit."""
        workers = [threading.Thread(target=block.main) for block in self]
        # Start every worker before joining any, so the blocks run concurrently.
        for worker in workers:
            worker.daemon = True
            worker.start()
        # Block the caller until each block's main() has returned.
        for worker in workers:
            worker.join()
class BfBlock(object):
    """Base class for all pipeline blocks.

    Holds the configuration shared by source, sink and task blocks.
    """

    def __init__(self, gulp_size=4096, core=-1, guarantee=True):
        # gulp_size: span size (bytes) used by subclasses for ring resize/reserve/read
        # core: value handed to bifrost.affinity.set_core() in _main()
        # guarantee: passed by reading subclasses to Ring.read()
        self.gulp_size, self.core, self.guarantee = gulp_size, core, guarantee

    def _main(self):  # Launched in thread
        """Common per-thread start-up: apply ``self.core`` via bifrost.affinity."""
        bifrost.affinity.set_core(self.core)
class BfSourceBlock(BfBlock):
    """Base class for I/O source blocks (blocks that produce data).

    A source block requires an output ring, stored as ``self.oring``.
    """

    def __init__(self, oring, *args, **kwargs):
        super(BfSourceBlock, self).__init__(*args, **kwargs)
        self.oring = oring

    def write_sequence(self, oring, name, header, data):
        """Write ``data`` to ``oring`` as one new named sequence.

        Args:
            oring: object providing ``begin_sequence`` (presumably the writer
                returned by ``Ring.begin_writing()`` -- confirm).
            name: sequence name passed to ``begin_sequence``.
            header: JSON-serialisable object stored as the sequence header.
            data: numpy array whose raw bytes fill exactly one span of
                ``self.gulp_size`` bytes.
        """
        header_json = json.dumps(header)
        with oring.begin_sequence(name, header=header_json) as sequence:
            self.write_to_sequence(sequence, data)

    def write_to_sequence(self, sequence, data):
        """Reserve one ``gulp_size`` span on ``sequence`` and copy ``data`` in.

        Raises:
            ValueError: if ``data`` does not contain exactly as many bytes as
                the reserved span.
        """
        with sequence.reserve(self.gulp_size) as span:
            # Reinterpret as flat raw bytes; ascontiguousarray makes the byte
            # view valid for non-contiguous (e.g. strided/sliced) input arrays.
            raw = np.ascontiguousarray(data).view('uint8').ravel()
            dest = span.data[0]
            # Without this check numpy broadcasting would silently replicate a
            # 1-byte array across the whole span instead of failing.
            if raw.size != dest.size:
                raise ValueError(
                    "data is %d bytes but the reserved span is %d bytes"
                    % (raw.size, dest.size))
            dest[:] = raw
class BfSinkBlock(BfBlock):
    """Base class for I/O sink blocks (blocks that consume data from a ring).

    A sink block requires an input ring, stored as ``self.iring``.
    """

    def __init__(self, iring, *args, **kwargs):
        super(BfSinkBlock, self).__init__(*args, **kwargs)
        self.iring = iring
class BfTaskBlock(BfBlock):
    """Base class for task blocks (blocks that transform data in some way).

    A task block requires both an input ring and an output ring, stored as
    ``self.iring`` and ``self.oring``.
    """

    def __init__(self, iring, oring, *args, **kwargs):
        super(BfTaskBlock, self).__init__(*args, **kwargs)
        self.iring = iring
        self.oring = oring
#####################
## ##
## Test classes ##
## ##
#####################
class NumpyTestSource(BfSourceBlock):
    """Test source that writes a stream of float32 ramps to the output ring.

    For each seed k = 1 .. ``max_seed`` it writes one sequence named ``str(k)``
    with header ``{'seed': k}`` and a single span holding
    ``np.arange(n) + k`` as float32, where ``n`` is the number of float32
    values that fit in ``gulp_size`` bytes (1024 for the default 4096).
    """

    # Last seed (inclusive) written by main(); one sequence is written per seed.
    max_seed = 100

    def __init__(self, *args, **kwargs):
        super(NumpyTestSource, self).__init__(*args, **kwargs)
        self.seed = 1

    def generate_new_data(self):
        """Return the float32 ramp for the current seed, then advance the seed."""
        # Size the ramp from gulp_size so it always fills exactly one span.
        nelem = self.gulp_size // np.dtype('float32').itemsize
        data = np.arange(nelem).astype('float32') + self.seed
        self.seed += 1
        return data

    def generate_header(self):
        """Return the header dict for the current seed (does not advance it)."""
        hdr = {'seed': self.seed}
        return hdr

    def main(self):  # Launched in thread
        """Write one sequence per seed up to ``max_seed``, then return."""
        self._main()
        self.oring.resize(self.gulp_size)
        with self.oring.begin_writing() as oring:
            while self.seed <= self.max_seed:
                # The header must be built before generate_new_data() increments
                # self.seed, so that header and data carry the same seed.
                ohdr = self.generate_header()
                data = self.generate_new_data()
                name = str(ohdr['seed'])
                self.write_sequence(oring, name=name, header=ohdr, data=data)
class NumpyTestSink(BfSinkBlock):
    """Test sink that reads sequences from the input ring and verifies them.

    For each sequence, the JSON header's ``seed`` is used to rebuild the
    expected float32 ramp ``np.arange(n) + seed`` (``n`` = float32 values per
    ``gulp_size`` bytes), and every span read is compared against it.
    Mismatches are printed as ``ERROR: <seed>`` followed by the received and
    expected arrays.
    """

    def main(self):  # Launched in thread
        """Read every sequence on ``iring`` and report any mismatching span."""
        self._main()
        self.iring.resize(self.gulp_size)
        nelem = self.gulp_size // np.dtype('float32').itemsize
        base_ramp = np.arange(nelem).astype('float32')
        # NOTE(review): in the reported runs, failing spans hold data whose seed
        # is exactly 4 ahead of the header seed. That suggests the span data was
        # overwritten by a later sequence before this reader got to it (the ring
        # is only resized to gulp_size here). Confirm against bifrost's
        # Ring.resize default total span and the read guarantee semantics.
        for iseq in self.iring.read(guarantee=self.guarantee):
            header = json.loads(iseq.header.tostring())
            # The expected data depends only on the header: build it once per sequence.
            data_check = base_ramp + header['seed']
            for ispan in iseq.read(self.gulp_size):
                data = ispan.data.view('float32')
                # Explicit check instead of assert + bare except: asserts are
                # stripped under `python -O`, which would silently disable this
                # test, and the bare except also hid unrelated errors. A length
                # mismatch (previously a broadcast error) still counts as a failure.
                matches = (data.shape[-1] == data_check.size
                           and np.allclose(data_check, data))
                if not matches:
                    print("ERROR: %s" % header['seed'])
                    print(data)
                    print(data_check)
if __name__ == "__main__":
    import time
    # Bring the pipeline up and down several times; each trial uses a fresh
    # ring shared by one NumpyTestSource (writer) and one NumpyTestSink (reader).
    # print(...) with a single argument behaves the same on Python 2 and 3.
    n_trials = 10
    for ii in range(n_trials):
        print("Pipeline bringup %i of %i" % (ii + 1, n_trials))
        ring1 = Ring()
        pipeline = BfPipeline()
        np_source = NumpyTestSource(ring1, core=1)
        np_sink = NumpyTestSink(ring1, core=2, guarantee=True)
        pipeline.append(np_source)
        pipeline.append(np_sink)
        # run() returns only after both blocks' threads have been joined.
        pipeline.run()
        time.sleep(0.1)
When I run this, I get output like:
Pipeline bringup 1 of 10
Pipeline bringup 2 of 10
ERROR: 9
[[ 13. 14. 15. ..., 1034. 1035. 1036.]]
[ 9. 10. 11. ..., 1030. 1031. 1032.]
ERROR: 10
[[ 14. 15. 16. ..., 1035. 1036. 1037.]]
[ 10. 11. 12. ..., 1031. 1032. 1033.]
ERROR: 12
[[ 16. 17. 18. ..., 1037. 1038. 1039.]]
[ 12. 13. 14. ..., 1033. 1034. 1035.]
Pipeline bringup 3 of 10
ERROR: 12
[[ 16. 17. 18. ..., 1037. 1038. 1039.]]
[ 12. 13. 14. ..., 1033. 1034. 1035.]
ERROR: 13
[[ 17. 18. 19. ..., 1038. 1039. 1040.]]
[ 13. 14. 15. ..., 1034. 1035. 1036.]
Pipeline bringup 4 of 10
ERROR: 6
[[ 10. 11. 12. ..., 1031. 1032. 1033.]]
[ 6. 7. 8. ..., 1027. 1028. 1029.]
ERROR: 21
[[ 25. 26. 27. ..., 1046. 1047. 1048.]]
[ 21. 22. 23. ..., 1042. 1043. 1044.]
Pipeline bringup 5 of 10
Pipeline bringup 6 of 10
Pipeline bringup 7 of 10
Pipeline bringup 8 of 10
Pipeline bringup 9 of 10
Pipeline bringup 10 of 10
ERROR: 8
[[ 12. 13. 14. ..., 1033. 1034. 1035.]]
[ 8. 9. 10. ..., 1029. 1030. 1031.]
ERROR: 9
[[ 13. 14. 15. ..., 1034. 1035. 1036.]]
[ 9. 10. 11. ..., 1030. 1031. 1032.]
ERROR: 77
[[ 81. 82. 83. ..., 1102. 1103. 1104.]]
[ 77. 78. 79. ..., 1098. 1099. 1100.]
ERROR: 81
[[ 85. 86. 87. ..., 1106. 1107. 1108.]]
[ 81. 82. 83. ..., 1102. 1103. 1104.]
i.e. my test fails. Where am I going wrong?
Interesting test, took me a bit to get my head around it :)
To confirm, it's failing on some trials but not others? Does it still happen if you make the sleep much longer?
On 20 Jul 2016 6:44 pm, "Danny Price" [email protected] wrote:
I'm trying to write a basic pipeline I can use for testing, and am getting some read failures, which I presume are due me misunderstanding how to read/write to the ring. Here's my code:
"""# test_block.pyTest pipeline with prototype blocks""" import numpy as npimport threadingimport json import bifrostfrom bifrost.ring import Ring
class BfPipeline(list): """ Simple pipeline class. Blocks are appended to the pipeline, which is list-like. The Pipeline class provides a run() function, which will start the pipeline. """ def run(self):
threads = [] for item in self: threads.append(threading.Thread(target=item.main)) for thread in threads: thread.daemon = True thread.start() for thread in threads: # wait for thread to terminate thread.join()class BfBlock(object): """ Simple block class """ def init(self, gulp_size=4096, core=-1, guarantee=True): self.gulp_size = gulp_size self.core = core self.guarantee = guarantee
def _main(self): # Launched in thread bifrost.affinity.set_core(self.core)class BfSourceBlock(BfBlock): """ Block class for I/O sources (readers) A source block requires an output ring. """
def __init__(self, oring, *args, **kwargs): super(BfSourceBlock, self).__init__(*args, **kwargs) self.oring = oring def write_sequence(self, oring, name, header, data): """ Write """ header_json = json.dumps(header) with oring.begin_sequence(name, header=header_json) as sequence: self.write_to_sequence(sequence, data) def write_to_sequence(self, sequence, data): with sequence.reserve(self.gulp_size) as span: data = data.view('uint8').ravel() span.data[0][:] = dataclass BfSinkBlock(BfBlock): """ Block class for I/O sinks (writers) A source block requires an input ring. """
def __init__(self, iring, *args, **kwargs): super(BfSinkBlock, self).__init__(*args, **kwargs) self.iring = iringclass BfTaskBlock(BfBlock): """ Block class for tasks (transform the data in some way A source block requires an output ring AND and input ring. """ def init(self, iring, oring, _args, *_kwargs): super(BfTaskBlock, self).init(_args, *_kwargs) self.iring = iring self.oring = oring
####################### #### Test classes #### ####################### class NumpyTestSource(BfSourceBlock): """ This generates a stream of numpy arrays """
def __init__(self, *args, **kwargs): super(NumpyTestSource, self).__init__(*args, **kwargs) self.seed = 1 def generate_new_data(self): data = np.arange(1024).astype('float32') + self.seed self.seed += 1 return data def generate_header(self): hdr = {'seed': self.seed} return hdr def main(self): # Launched in thread self._main() self.oring.resize(self.gulp_size) with self.oring.begin_writing() as oring: while self.seed <= 100: ohdr = self.generate_header() data = self.generate_new_data() name = str(ohdr['seed']) self.write_sequence(oring, name=name, header=ohdr, data=data)class NumpyTestSink(BfSinkBlock): """ This receives a stream of numpy arrays and verifies them """ def main(self): self._main()
self.iring.resize(self.gulp_size) for iseq in self.iring.read(guarantee=self.guarantee): header = json.loads(iseq.header.tostring()) for ispan in iseq.read(self.gulp_size): data = ispan.data.view('float32') data_check = np.arange(1024).astype('float32') + header['seed'] try: assert np.allclose(data_check, data) except: print "ERROR: %s" % header['seed'] print data print data_checkif name == "main":
import time # Bring pipeline up and down several times n_trials = 10 for ii in range(n_trials): print "Pipeline bringup %i of %i" % (ii+1, n_trials) ring1 = Ring() pipeline = BfPipeline() np_source = NumpyTestSource(ring1, core=1) np_sink = NumpyTestSink(ring1, core=2, guarantee=True) pipeline.append(np_source) pipeline.append(np_sink) pipeline.run() time.sleep(0.1)When I run this, I get output like:
Pipeline bringup 1 of 10 Pipeline bringup 2 of 10 ERROR: 9 [[ 13. 14. 15. ..., 1034. 1035. 1036.]] [ 9. 10. 11. ..., 1030. 1031. 1032.] ERROR: 10 [[ 14. 15. 16. ..., 1035. 1036. 1037.]] [ 10. 11. 12. ..., 1031. 1032. 1033.] ERROR: 12 [[ 16. 17. 18. ..., 1037. 1038. 1039.]] [ 12. 13. 14. ..., 1033. 1034. 1035.] Pipeline bringup 3 of 10 ERROR: 12 [[ 16. 17. 18. ..., 1037. 1038. 1039.]] [ 12. 13. 14. ..., 1033. 1034. 1035.] ERROR: 13 [[ 17. 18. 19. ..., 1038. 1039. 1040.]] [ 13. 14. 15. ..., 1034. 1035. 1036.] Pipeline bringup 4 of 10 ERROR: 6 [[ 10. 11. 12. ..., 1031. 1032. 1033.]] [ 6. 7. 8. ..., 1027. 1028. 1029.] ERROR: 21 [[ 25. 26. 27. ..., 1046. 1047. 1048.]] [ 21. 22. 23. ..., 1042. 1043. 1044.] Pipeline bringup 5 of 10 Pipeline bringup 6 of 10 Pipeline bringup 7 of 10 Pipeline bringup 8 of 10 Pipeline bringup 9 of 10 Pipeline bringup 10 of 10 ERROR: 8 [[ 12. 13. 14. ..., 1033. 1034. 1035.]] [ 8. 9. 10. ..., 1029. 1030. 1031.] ERROR: 9 [[ 13. 14. 15. ..., 1034. 1035. 1036.]] [ 9. 10. 11. ..., 1030. 1031. 1032.] ERROR: 77 [[ 81. 82. 83. ..., 1102. 1103. 1104.]] [ 77. 78. 79. ..., 1098. 1099. 1100.] ERROR: 81 [[ 85. 86. 87. ..., 1106. 1107. 1108.]] [ 81. 82. 83. ..., 1102. 1103. 1104.]
i.e. my test fails. Where am I going wrong?
— You are receiving this because you are subscribed to this thread. Reply to this email directly, view it on GitHub https://github.com/ledatelescope/bifrost/issues/35, or mute the thread https://github.com/notifications/unsubscribe-auth/ADy3WNH39TiCaiHuOm77wlUzA3VVrCjKks5qXs8FgaJpZM4JRXjN .
Sorry if it is a little strange! I just wanted to test sending numpy arrays through a pipeline.
Yep, seems to fail some trials but not others. Sleep doesn't seem to make a difference. Setting guarantee=False on the second ring causes a crash (it's probably trying to read before the ring is initialized).
The data are "correct" (not just random bits), but the header value appears to be offset by 4 from the data (e.g. header seed 9 arrives with data starting at 13). IIRC 4 is the default size of the ring, right?