Source code for fancytools.fcollections.FIFObuffer

# -*- coding: utf-8 -*-
import numpy as np


[docs]class FIFObuffer(object): """ A circular FIFO buffer implemented on top of numpy. >>> a = FIFObuffer(shape=3) >>> a.add(2) >>> a.add(33) >>> a.add(124) >>> print a [ 2. 33. 124.] >>> a.add(21) >>> a.add(2456) >>> print a [ 124. 21. 2456.] """ def __init__(self, shape, dtype=np.float32, filled=False): self._cache = np.zeros(shape, dtype) self._values = np.zeros(shape, dtype) if not isinstance(shape,int): raise Exception('%s can only handle 1darrays at the moment, shape has to be int' %self.__class__.__name__) self.shape = shape self.size = shape self._splitPos = 0 if filled: self._ind = self.shape else: self._ind = 0 self._cached = False self._splitValue = 1
[docs] def setNextLineEveryNValues(self, n): self._splitValue = 1/float(n) if int(self._splitValue) == self._splitValue: self._splitValue = int(self._splitValue)
[docs] def add(self, value): """ Add a value to the buffer. """ ind = int(self._ind % self.shape) self._pos = self._ind % self.shape self._values[ind] = value if self._ind < self.shape: self._ind += 1 #fast fill else: self._ind += self._splitValue self._splitPos += self._splitValue self._cached = False
[docs] def array(self): """ Returns a numpy array containing the last stored values. """ if self._ind < self.shape: return self._values[:self._ind] if not self._cached: ind = int(self._ind % self.shape) self._cache[:self.shape - ind] = self._values[ind:] self._cache[self.shape - ind:] = self._values[:ind] self._cached = True return self._cache
def __len__(self): return min(int(self._ind), self.shape) @property def position(self): return self.__len__()-1
[docs] def splitPos(self): '''return the position of where to split the array to get the values in the right order''' if self._ind < self.shape: return 0 v = int(self._splitPos) if v >= 1: self._splitPos = 0 return v
def __repr__(self): return str(self.array()) def __getitem__(self, key): return self.array()[key]
if __name__ == "__main__": import doctest doctest.testmod()