Source code for th260.th260sorter

# This file is part of Pals3D
#
# TH260Sorter is meant to sort data from a PicoQuant TimeHarp 260 Pico
# via TH260LIB.DLL v 3.1. for applications to positron annihilation
# lifetime spectroscopy.
#
# ---------------------------------------------
#
# Copyright (c) 2018-2019 Aurelie Vancraeyenest 
# ---------------------------------------------
#
# Pals3D is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Pals3D is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Pals3D.  If not, see <http://www.gnu.org/licenses/>.
#
# Based on demo code from:
# Keno Goertz, PicoQuant GmbH, February 2018
#

from collections import deque
import time

from PyQt5 import QtCore
import numpy as np

from toolbox import utils as ut


[docs]class SortingWorker(QtCore.QObject): """ Data sorting class for data received from a TH260 PicoQuant card Inherit from QObect to use Qt signal and slot logic for receiving and sorting data. Can then be easily used in a GUI application Parameters ---------- kwargs : kwargs Keyword arguments passed from the main thread Attributes ---------- dataArray : dict dataDeck : deque islastEvent : bool globRes : double resultArray_01 : list resultArray_02 : list resultArray_12 : list Keyword Args ------------ sortingType : str '2C' or '3C' timeGate : int Long time gate for positron lifetime (in ps) timeRes : int Short time gate for 511 MeV photons (in ps) filename : str Filename base for output file CFDset : dict Dictionnary of all the CFD settings for all channels acqTime : int Acquisition time (in min) nftot : int Total number of files during current measurement """ COINCRATE = QtCore.pyqtSignal(int) #: :obj:pyqtSignal(int) NEW_OUTPUT = QtCore.pyqtSignal(str) #: :obj:pyqtSignal(str) T2WRAPAROUND_V1 = 33552000 #: int : Wraparound for version 1 T2WRAPAROUND_V2 = 33554432 #: int : Wraparound for version 2 VERSION = 2 #: int: Version ==> remove? def __init__(self, **kwargs): """Constructor method of the TH260sorter class""" super(SortingWorker, self).__init__() self.kwargs = kwargs self.globRes = 25.0e-12 self.resultArray_01 = list() self.resultArray_02 = list() self.resultArray_12 = list() self.dataArray = dict([('01', self.resultArray_01), ('02', self.resultArray_02), ('12', self.resultArray_12)]) self.dataDeck = deque(maxlen=500) self.islastEvent = False
[docs] def newMeasurement(self, noFile): """ Initialise class attributes for new measurement Parameters ---------- noFile : int File numero in case of multiple file acquisition """ self.dataDeck.clear() self.dataArray['01'] = list() self.dataArray['02'] = list() self.dataArray['12'] = list() self.sortingType = self.kwargs["sortingType"] self.timeGate = self.kwargs["timeGate"] self.timeRes = self.kwargs["timeRes"] self.file = self.kwargs["filename"] self.cfd = self.kwargs["CFDset"] self.oflcorrection = 0 self.islastEvent = False
[docs] def saveData(self, noFile, **kwargs): """ Do histogramming of the whole data set and save to files Parameters ---------- noFile : int Numero of the current acquisition file to be appended to the filename base. """ self.NEW_OUTPUT.emit("Saving data...") try: filebase, extension = self.file.rsplit(sep=".", maxsplit=1) except ValueError: filebase = self.file outputFileName = "".join((filebase, "_", str(noFile).zfill(3))) # Saving raw dT for each channel for triple coinc mode if self.sortingType == '3C': evtl = [(self.dataArray['01'][i], self.dataArray['02'][i], self.dataArray['12'][i]) for i in range(len(self.dataArray['01']))] np.save(outputFileName, evtl) # Do histogramming rmax = int(self.timeGate) # We want bins centered on multiple of 25ps bins_sync = np.arange(-12.5, rmax+13, 25) bins_chn = np.arange(-(rmax//2+12.5), rmax//2+13, 25) histo01, bin_edges01 = np.histogram(np.array(self.dataArray['01']), bins=bins_sync, range=[-12.5, rmax+13]) histo02, bin_edges02 = np.histogram(np.array(self.dataArray['02']), bins=bins_sync, range=[0, rmax]) histo12, bin_edges12 = np.histogram(np.array(self.dataArray['12']), bins=bins_chn, range=[(rmax//2+12.5), rmax//2+12.5]) # Saving all 3 hist at once # bincenters01/02 histo01 histo02 bincenters12 histo12 histos = np.array([0.5*(bin_edges01[1:]+bin_edges01[:-1]), histo01, histo02, 0.5*(bin_edges12[1:]+bin_edges12[:-1]), histo12]) np.savetxt(outputFileName+'.hst', histos.T, fmt='%10i', header=("Measurement date : {0}" "\nCFD settings:" "\nChannel |\tCFD ZeroCross |\tCFD level |\tOffset" "\nSync \t {z0} mV \t {l0} mV \t{o0} ps" "\nChn1 \t {z1} mV \t {l1} mV \t{o1} ps" "\nChn2 \t {z2} mV \t {l2} mV \t{o2} ps" "\nAcquisition settings:" "\nMode: {m} |\t long gate: {lg} ps \t" "|\t short gate: {sg} ps" "\nAcquisition time: {at:.0f} min \t" "|\t file #{nf} out of {nftot}" "\n\ntime\tsync-1 \t sync-2 \t time \t chn1-chn2") .format(time.asctime(), z0=self.cfd['zero0'], l0=self.cfd['lev0'], o0=self.cfd['off0'], z1=self.cfd['zero1'], l1=self.cfd['lev1'], o1=self.cfd['off1'], z2=self.cfd['zero2'], l2=self.cfd['lev2'], o2=self.cfd['off2'], m=self.sortingType, lg=self.timeGate, sg=self.timeRes, at=self.kwargs['acqTime'], nf=noFile+1, nftot=self.kwargs['nftot']), comments='#', delimiter='\t')
def _gotPhoton(self, recNum, timeTag, channel, dtime): """ Append real photon events to dataDeck parameters ---------- recNum : int Record numero timeTag: int Timetag corrected by the overflow tags with respect to the overall measurment start (tick resolution 25 ps) channel: int Channel number: 0 (sync), 1-2 (channel) dtime: int Real time : timeTag * self.globRes * 1e12 """ self.dataDeck.append((recNum, channel, timeTag, timeTag * self.globRes * 1e12)) self.sortDeck()
[docs] @QtCore.pyqtSlot() def processLastEvents(self): """Force the processing of the last events to empty dataDeck""" self.islastEvent = True self.sortDeck()
[docs] def sortDeck(self): """ Check if the data deque is full to start the sorting according to the sortingType ('2C' or '3C') """ if ((len(self.dataDeck) == self.dataDeck.maxlen) or (self.islastEvent is True)): if self.sortingType is '2C': n2Dcoinc = self._2Cfiltering() self.COINCRATE.emit(n2Dcoinc) elif self.sortingType is '3C': n3Dcoinc = self._3Cfiltering() self.COINCRATE.emit(n3Dcoinc)
def _2Cfiltering(self): """ Process the events into double coincidence events Use the pairwise function of the utils module to look at each pair of successive photons and determine if they are recorded in the timeGate time interval, and if they are issued from different channels. If so, store the time difference into the corresponding list of the dataArray dict. Warnings -------- Note that we are looking at paires only, so in case of true triple coinc, we might miss the coinc between first and 3rd event. It doesn't matter for resolution with Co measurements, but can have a slight impact for true experiments """ # !!! Note that we are looking at paires only, so in case of # true triple coinc, we might miss the coinc between first and 3rd evt. # Doesn't matter for resolution with Co, but can have a slight impact # for true experiments keeplast = False ncoinc = 0 for pair in ut.pairwise(self.dataDeck): is2D = (pair[0][1] != pair[1][1]) if not is2D: keeplast = True continue chnPair = str(int(pair[0][1])) + str(int(pair[1][1])) dtime = pair[1][3] - pair[0][3] isInGate = (dtime < self.timeGate) if not isInGate: keeplast = True continue if chnPair in {'01', '02', '12'}: self.dataArray[chnPair].append(dtime) else: self.dataArray[chnPair[::-1]].append(-dtime) keeplast = False ncoinc += 1 if keeplast is True: lastEvent = self.dataDeck.pop() self.dataDeck.clear() self.dataDeck.append(lastEvent) else: self.dataDeck.clear() return ncoinc def _3Cfiltering(self): """ Process the events into triple coincidence events Use the tripletwise function of the utils module to look at each triplet of successive photons and determine if they are real triple coincidence events. A triple coicidence event is defined as follow: three successive events are recorded from three different channels and the first event should be recorded in the sync channel (chan 0). Further requirements are that the time difference between records in channel 1 and 2 should be less than timeRes and that the three events are recorded within the timeGate time interval. If so, store the time difference into the corresponding list of the dataArray dict. """ keeplast = False ncoinc = 0 for triplet in ut.tripletwise(self.dataDeck): is3D = (triplet[0][1] == 0 and triplet[1][1] != 0 and triplet[2][1] != 0 and triplet[1][1] != triplet[2][1]) if not is3D: keeplast = True continue isEv1inChn1 = triplet[1][1] == 1 dtimeS1 = triplet[1][3] - triplet[0][3] dtimeS2 = triplet[2][3] - triplet[0][3] dtime12 = triplet[2][3] - triplet[1][3] isInGate = (dtimeS1 < self.timeGate and dtimeS2 < self.timeGate and dtime12 < self.timeRes) if not isInGate: keeplast = True continue if isEv1inChn1: self.dataArray['01'].append(dtimeS1) self.dataArray['02'].append(dtimeS2) self.dataArray['12'].append(dtime12) else: self.dataArray['01'].append(dtimeS2) self.dataArray['02'].append(dtimeS1) self.dataArray['12'].append(-dtime12) keeplast = False ncoinc += 1 # Clear the deque and keep the last 2 events if keeplast is True if keeplast is True: lastEvents = [self.dataDeck.pop() for _ in range(2) if keeplast is True] self.dataDeck.clear() self.dataDeck.append(lastEvents[-1]) self.dataDeck.append(lastEvents[-2]) else: self.dataDeck.clear() return ncoinc
[docs] def sortBuffer(self, buffer, nrecords): """ Decode buffer individual events to produce a time tagged event Parameters ---------- buffer : object - ctype array buffer Raw data buffer received over a signal from the acquisition thread. nrecords : int Total number of records in the buffer """ # data received over a signal self.dataToSort = buffer # received from a signal ctype array self.numRecords = nrecords # received from a signal for recNum in range(0, self.numRecords): try: recordData = "{0:0{1}b}".format(self.dataToSort[recNum], 32) except: # TODO: deal with this bare exception print("The file ended earlier than expected, at record %d/%d." % (recNum, self.numRecords)) special = int(recordData[0:1], base=2) channel = int(recordData[1:7], base=2) timetag = int(recordData[7:32], base=2) if special == 1: if channel == 0x3F: # Overflow # Number of overflows in nsync. If old version, it's an # old style single overflow if self.VERSION == 1: self.oflcorrection += self.T2WRAPAROUND_V1 else: if timetag == 0: # old style overflow shouldn't happen self.oflcorrection += self.T2WRAPAROUND_V2 else: self.oflcorrection += self.T2WRAPAROUND_V2*timetag if channel >= 1 and channel <= 15: # markers truetime = self.oflcorrection + timetag if channel == 0: # sync truetime = self.oflcorrection + timetag self._gotPhoton(recNum, truetime, 0, 0) else: # regular input channel truetime = self.oflcorrection + timetag self._gotPhoton(recNum, truetime, channel+1, 0)