Source code for diaGrabber.source.stream
# -*- coding: utf-8 *-*
#from copy import deepcopy
#import select
#import time
import sys, os
import fcntl
import subprocess
from . import _dimension
from _source import _source
[docs]class stream(_source):
def __init__(self, command, start_via, stop_via, dim_seperator,
data_type = "unknown",
key_to_end_process = "",
readoutEverNLine = 1,
infoEveryNLines = 1000):
super(stream,self).__init__(data_type)
self.dimension = _dimension.dimension
self._command = command
self.file_name = command
self.key_to_end_process = key_to_end_process
self.dim_seperator = dim_seperator
self.start_via = start_via
self.stop_via = stop_via
self._readoutEverNLine = readoutEverNLine
self._infoEveryNLines = infoEveryNLines
[docs] def setInfoEveryNLines(self,nLines):
self._infoEveryNLines = int(nLines)
[docs] def getInfoEveryNLines(self):
return self._infoEveryNLines
[docs] def setReadoutEverNLine(self,nLine):
self._readoutEverNLine = int(nLine)
[docs] def getReadoutEverNLine(self):
return self._readoutEverNLine
def _getMinMax(self,dims):
sys.exit("ERROR: source.stream need a given dimension.includeFromTo()-range")
def _non_block_read(self, output):
''' even in a thread, a normal read with block until the buffer is full '''
fd = output.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
try:
return output.read().strip().split("\n")
except:
return []
def _prepareReadOut(self, matrixClass):
self._prepareStandard(matrixClass)
#start stream
self.engine = subprocess.Popen(
self._command,
bufsize=-1,
stdin=sys.stdin,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
if self.start_via != "":
self.engine.stdin.write(b"%s\n" %self.start_via)
self.engine.stdin.flush()
self.done_reading_old_output = True
self.done_readout = False
self.output = []
self.len_output = 0
self.pos_output = 0
self.read_n_lines = 0
self.step_n_lines = 0
def _endReadOut(self):
try:
if self.key_to_end_process != "":
print "finish process via key ' %s '" %self.key_to_end_process
for h in range(10): # to ensure ending
#sys.stdin.write(b"%s\n" %self.key_to_end_process)
###dont work at the moment
self.engine.stdin.write(b"%s" %self.key_to_end_process)
#self.engine.stdin.flush()
#self.engine.stdin.close()
except (IOError, AttributeError):
pass
try:
print "closing process"
time.sleep(1)
#for h in range(10):
#self.engine.close()
self.engine.kill()
#self.engine.wait()
except (IOError, AttributeError):
pass
finally:
return True
def _readOut(self, readout_interrupt, end_readOut):
# ensure, that the process will terminate
if end_readOut:
done_readout = self._endReadOut()
else:
done_readout = False
done_one_line = False
while not done_readout and not done_one_line:
try:
if self.done_reading_old_output:
#get new output
self.output = self._non_block_read(self.engine.stdout)
#reset position
if self.len_output > 0:
self.len_output -=1
self.pos_output -= self.len_output
self.len_output = len(self.output)
self.done_reading_old_output = False
if self.len_output <= 1: #got no output
self.done_reading_old_output = True
else: #process output
while self.pos_output < self.len_output:
line = self.output[self.pos_output]
if line == self.stop_via:
done_readout = self._endReadOut()
break
file_dim = line.split(self.dim_seperator)
self.read_n_lines += 1
self.step_n_lines += 1
self._assignValues(file_dim)
self.pos_output += self._readoutEverNLine
if self.step_n_lines == self._infoEveryNLines:
print "readout %s lines" %self.read_n_lines
self.step_n_lines = 0
if readout_interrupt:
done_one_line = True
break
if self.pos_output+1 >= self.len_output:
break
if self.pos_output+1 >= self.len_output:
self.done_reading_old_output = True
except KeyboardInterrupt:
done_readout = self._endReadOut()
return done_readout