Source code for dpp.helper_bestprof

import logging
import numpy as np
from glob import glob
from os.path import join

from dpp.helper_config import from_yaml, dump_to_yaml
from dpp.helper_files import glob_pfds
from vcstools.prof_utils import subprocess_pdv, get_from_ascii
from vcstools.gfit import gfit


logger = logging.getLogger(__name__)


[docs]class NoUsableFoldsError(Exception): """Raise when no usable folds are found in a pipe""" pass
[docs]def bestprof_info(filename): """ Finds various information on a .bestprof file Parameters ---------- filename: string The path of the bestprof file Returns ------- info_dict: dictionary A dictionary consisting of the following: obsid: int The ID of the observation puslar: string The J name of the pulsar nbins: int The number of bins used to fold this profile chi: float The reduced Chi squared value of the fold sn: float The signal to noise ratio of the fold dm: float The pulsar's dispersion measure period: float The pulsar's period period_error: float The error in the pulsar's period measurement """ #open the file and read the info into a dictionary info_dict = {} f = open(filename, "r") lines = f.read() f.close() lines = lines.split("\n") #info: #info_dict["obsid"] = int(lines[0].split()[4].split("_")[0]) #info_dict["pulsar"] = lines[1].split()[3].split("_")[1] info_dict["nbins"] = int(lines[9].split()[4]) info_dict["chi"] = float(lines[12].split()[4]) info_dict["sn"] = float(lines[13].split()[4][2:]) info_dict["dm"] = float(lines[14].split()[4]) info_dict["period"] = float(lines[15].split()[4])/1e3 #in seconds info_dict["period_error"] = float(lines[15].split()[6])/1e3 info_dict["pdot"] = float(lines[16].split()[4])/1e3 info_dict["pdot_error"] = float(lines[16].split()[6])/1e3 info_dict["profile"] = list(np.genfromtxt(filename)[:,1]) f.close() return info_dict
[docs]def bestprof_fit(cfg, cliptype="verbose"): """Fits a profile to the best bestprof and adds it to cfg. Cliptype options found in prof_utils.py""" # Get the profile bins = str(cfg["source"]["my_bins"]) pointing = cfg["source"]["my_pointing"] profile = cfg["folds"][pointing]["post"][bins]["profile"] # Gaussian fit g_fitter = gfit(profile, plot_name=cfg["files"]["gfit_plot"]) g_fitter.auto_gfit() fit = g_fitter.fit_dict() g_fitter.plot_fit() # Find the longest component longest_comp = 0 for comp_name in fit["comp_idx"].keys(): comp = fit["comp_idx"][comp_name] if len(comp) > longest_comp: longest_comp = len(comp) cfg["source"]["my_component"] = comp_name # Add the fit to cfg cfg["source"]["gfit"] = fit
[docs]def find_best_pointing(cfg): """Decides the best folding solution from bestprofs and updates cfg""" # Populate cfg with initial fold info for pointing in cfg["folds"].keys(): nbins = list(cfg["folds"][pointing]["init"].keys())[0] try: bestprof_name = glob_pfds(cfg, pointing, nbins, pfd_type="pfd.bestprof")[0] except IndexError as e: raise IndexError(f"No .bestprofs found: {cfg['files']['psr_dir']}") cfg["folds"][pointing]["init"][nbins] = bestprof_info(bestprof_name) # Search for pointings with positive classifications (>=3 out of 5 is positive classification) positive_pointings = [pointing for pointing in cfg["folds"].keys() if cfg["folds"][pointing]["classifier"]>=3] # DM > 0 check: pointings_to_remove = [] for pointing in positive_pointings: if cfg["folds"][pointing]["init"][nbins]["dm"] < 0.1: pointings_to_remove.append(pointing) [positive_pointings.remove(i) for i in pointings_to_remove] # Throw exception if there aren't any positive detections if len(positive_pointings) == 0: raise NoUsableFoldsError(f"No positive classifications found in pulsar directory {cfg['files']['psr_dir']}") else: cfg["run_ops"]["detection"] = True # A pulsar has been detected best_chi = 0 for pointing in positive_pointings: nbins = list(cfg["folds"][pointing]["init"].keys())[0] if cfg["folds"][pointing]["init"][nbins]["chi"] > best_chi: cfg["source"]["my_pointing"] = pointing # Check if there are header files for vdif processing cfg["run_ops"]["vdif"] = bool(glob(join(cfg["files"]["psr_dir"], pointing, "*.hdr"))) logger.info(f"Best pointing found with chi value of {best_chi}: {cfg['source']['my_pointing']}") # Update config file period and DM my_pointing = cfg["source"]["my_pointing"] nbins = list(cfg["folds"][my_pointing]["init"].keys())[0] my_P = cfg["folds"][my_pointing]["init"][nbins]["period"] my_DM = cfg["folds"][my_pointing]["init"][nbins]["dm"] cfg["source"]["my_P"] = my_P logger.info(f"Updated source period: {my_P}") cfg["source"]["my_DM"] = my_DM logger.info(f"Updated source DM: {my_DM}")
[docs]def populate_post_folds(cfg): """Fills the cfg with info on all of the post folds""" my_pointing = cfg["source"]["my_pointing"] for bins in cfg["folds"][my_pointing]["post"].keys(): try: bestprof_name = glob_pfds(cfg, my_pointing, bins, pfd_type="pfd.bestprof")[0] except IndexError as _: raise IndexError(f"No .bestprofs found: {cfg['files']['psr_dir']}") cfg["folds"][my_pointing]["post"][bins] = bestprof_info(bestprof_name)
[docs]def best_post_fold(cfg): """Finds the best fold to use in the cfg""" pointing = cfg["source"]["my_pointing"] min_chi = cfg["run_ops"]["thresh_chi"] min_sn = cfg["run_ops"]["thresh_sn"] good_chi = cfg["run_ops"]["good_chi"] good_sn = cfg["run_ops"]["good_sn"] post_folds = [int(i) for i in cfg["folds"][pointing]["post"].keys()] post_folds = sorted(post_folds, reverse=True) # Sort from highest to lowest bins # "good" loop for bin_count in post_folds: info = cfg["folds"][pointing]["post"][str(bin_count)] if info["sn"] >= good_sn and info["chi"] >= good_chi: cfg["source"]["my_bins"] = bin_count cfg["source"]["my_DM"] = info["dm"] cfg["source"]["my_P"] = info["period"] cfg["source"]["my_Pdot"] = info["pdot"] break # "minimum requirements" loop if cfg["source"]["my_bins"] == None: logger.info("No folds meet 'good' criteria") for bin_count in post_folds: info = cfg["folds"][pointing]["post"][str(bin_count)] if info["sn"] >= min_sn and info["chi"] >= min_chi: cfg["source"]["my_bins"] = bin_count cfg["source"]["my_DM"] = info["dm"] cfg["source"]["my_P"] = info["period"] cfg["source"]["my_Pdot"] = info["pdot"] break if cfg["source"]["my_bins"] == None: # The classifier still gave a positive detection somewhere. Continue with lowest bin post fold logger.warn("No folds meet minimum requirements. Will continue with lowest bin fold") bin_count = post_folds[-1] info = cfg["folds"][pointing]["post"][str(bin_count)] cfg["source"]["my_bins"] = bin_count cfg["source"]["my_DM"] = info["dm"] cfg["source"]["my_P"] = info["period"] cfg["source"]["my_Pdot"] = info["pdot"] else: logger.info(f"Continuing with bin count: {cfg['source']['my_bins']}")
[docs]def classify_init_bestprof(cfg): """Determines whether an iniital fold is a detection based on its PRESTO output""" for pointing in cfg["folds"].keys(): # Get the fold info bins = list(cfg["folds"][pointing]["init"].keys())[0] bprof = glob_pfds(cfg, pointing, bins, pfd_type=".pfd.bestprof")[0] cfg["folds"][pointing]["init"][bins] = bestprof_info(bprof) # Evaluate if cfg["folds"][pointing]["init"][bins]["sn"]>0: # Sometimes sigma is zero for some reason psr_eval = True if (cfg["folds"][pointing]["init"][bins]["chi"]>=4 and cfg["folds"][pointing]["init"][bins]["sn"]>=8) else False else: psr_eval = True if cfg["folds"][pointing]["init"][bins]["chi"]>=4 else False cfg["folds"][pointing]["classifier"] = 5 if psr_eval else 0 # classifier >=3 means this is a detection cfg["completed"]["classify"] = True