Source code for dpp.helper_config

import psrqpy
import logging
from os.path import join
import yaml
import subprocess
import numpy as np

from dpp.helper_source_info import bin_sampling_limit, is_binary, required_bin_folds
from dpp.helper_obs_info import find_fold_times
from dpp.helper_files import file_precursor
from vcstools.metadb_utils import get_common_obs_metadata
from vcstools.progress_bar import progress_bar
from vcstools import data_load
from vcstools.config import load_config_file


# Module-level logger, named after this module
logger = logging.getLogger(__name__)


def initiate_cfg(kwargs, psr, pointings, enter, leave, power, query=None, metadata=None):
    """
    Adds all available keys to the cfg dictionary and figures out some useful constants.
    Takes kwargs from observation_processing_pipeline.

    Parameters
    ----------
    kwargs : dict
        Pipeline options. The keys read directly here are "obsid", "calid", "beg",
        "end", "loglvl", "mwa_search", "vcstools" and "label". The whole dict is
        also handed to file_precursor().
    psr : str
        Pulsar name; must appear in the "JNAME" column of `query`.
    pointings : iterable of str
        Pointings to create an entry for under cfg["folds"].
    enter, leave : float or None
        Stored as "enter_frac" and "exit_frac"; they are multiplied by (end - beg)
        to get the "seek" and "total" values.
    power : float or None
        Stored as cfg["source"]["power"].
    query : optional
        ATNF catalogue table. Loaded through psrqpy from data_load.ATNF_LOC if None.
    metadata : optional
        Common observation metadata. Fetched for kwargs["obsid"] if None.
        Elements 1, 2, 3 and 5 are stored as "ra", "dec", "dur" and "freq".

    Returns
    -------
    cfg : dict
        Nested dictionary with the sections "obs", "source", "completed", "folds",
        "run_ops", "pol" and "files".

    Raises
    ------
    TypeError
        If enter, leave or power cannot be converted by float() because it is None,
        which is taken to mean the pulsar is not in the beam for the given times.
    ValueError
        If psr is not in query["JNAME"].
    """
    comp_config = load_config_file()
    cfg = {"obs": {}, "source": {}, "completed": {}, "folds": {}, "run_ops": {}, "pol": {}, "files": {}}
    if query is None:
        query = psrqpy.QueryATNF(loadfromdb=data_load.ATNF_LOC).pandas
    if metadata is None:
        metadata = get_common_obs_metadata(kwargs["obsid"])
    query_index = list(query["JNAME"]).index(psr)

    obs = cfg["obs"]
    obs["ra"] = metadata[1]
    obs["dec"] = metadata[2]
    obs["dur"] = metadata[3]
    obs["freq"] = metadata[5]
    obs["id"] = kwargs["obsid"]
    obs["cal"] = kwargs["calid"]
    obs["beg"] = kwargs["beg"]
    obs["end"] = kwargs["end"]

    run_ops = cfg["run_ops"]
    run_ops["loglvl"] = kwargs["loglvl"]
    run_ops["mwa_search"] = kwargs["mwa_search"]
    run_ops["vcstools"] = kwargs["vcstools"]
    run_ops["label"] = kwargs["label"]
    run_ops["thresh_chi"] = 3.5
    run_ops["thresh_sn"] = 8.0
    run_ops["good_chi"] = 4.0
    run_ops["good_sn"] = 20.0
    run_ops["vdif"] = None
    run_ops["mask"] = None
    run_ops["detection"] = False
    # Should be set appropriately on pipeline termination. 300 means pipeline hasn't begun
    run_ops["exit_status"] = "300"

    files = cfg["files"]
    precursor = file_precursor(kwargs, psr)
    files["file_precursor"] = precursor
    # The obs ID is stringified for both directories: join() raises TypeError on an
    # integer component, and callers interpret a TypeError as "pulsar not in beam"
    obsid_str = str(obs["id"])
    psr_dir = join(comp_config["base_data_dir"], obsid_str, "dpp", precursor)
    files["psr_dir"] = psr_dir
    files["batch_dir"] = join(comp_config["base_data_dir"], obsid_str, "batch")
    files["classify_dir"] = join(psr_dir, "classifier_ppp")

    def psr_file(suffix):
        """Path inside psr_dir made of the file precursor followed by suffix"""
        return join(psr_dir, f"{precursor}{suffix}")

    files["my_name"] = psr_file("_cfg.yaml")
    files["logfile"] = psr_file(".log")
    files["archive"] = psr_file("_archive.ar")
    files["archive_ascii"] = psr_file("_archive.txt")
    files["gfit_plot"] = psr_file("_gfit.png")
    files["converted_fits"] = psr_file("_archive.fits")
    # Debased fits file needs to be the same as archive except for the extension.
    # Only the trailing ".ar" is removed so a ".ar" earlier in the path is left intact
    arch = files["archive"][:-len(".ar")]
    files["debased_fits"] = f"{arch}.debase.gg"
    # paswing file needs to be the same as debase except for the .gg extension
    files["paswing"] = f"{arch}.debase.paswing"
    files["chigrid_initial_ps"] = psr_file("_chigrid_initial.ps")
    files["paswing_initial_ps"] = psr_file("_paswing_initial.ps")
    files["RVM_fit_initial"] = psr_file("_RVM_fit_initial.out")
    files["chigrid_final_ps"] = psr_file("_chigrid_final.ps")
    files["paswing_final_ps"] = psr_file("_paswing_final.ps")
    files["RVM_fit_final"] = psr_file("_RVM_fit_final.out")
    files["ppol_profile_ps"] = psr_file("_profile.ps")
    files["ppol_polar_profile_ps"] = psr_file("_pol.ps")

    source = cfg["source"]
    source["enter_frac"] = None
    source["exit_frac"] = None
    source["power"] = None
    source["name"] = psr
    try:
        source["enter_frac"] = float(enter)
        source["exit_frac"] = float(leave)
        source["power"] = float(power)
    except TypeError as err:
        # If any of these are None, the pulsar isn't in the beam for given beg, end
        raise TypeError(f"{psr} not in beam for given times") from err

    source["sampling_limit"] = int(bin_sampling_limit(psr, query=query))
    source["ATNF_P"] = float(query["P0"][query_index])
    source["ATNF_DM"] = float(query["DM"][query_index])
    # The scattering value is read from the catalogue's TAU_SC column (this used to
    # read the DM column). It is None when the column is missing or the entry is NaN
    tau_sc = query["TAU_SC"][query_index] if "TAU_SC" in query else None
    if tau_sc is None or bool(np.isnan(tau_sc)):
        source["scattering"] = None
    else:
        source["scattering"] = float(tau_sc)
    source["my_DM"] = None
    source["my_P"] = None
    source["my_Pdot"] = None
    source["my_bins"] = None
    source["my_pointing"] = None
    source["my_component"] = None
    source["gfit"] = None
    source["binary"] = is_binary(psr, query=query)
    obs_span = obs["end"] - obs["beg"]
    source["seek"] = source["enter_frac"] * obs_span
    source["total"] = (source["exit_frac"] - source["enter_frac"]) * obs_span
    if source["binary"]:
        source["edited_eph_name"] = psr_file(".eph")
        source["edited_eph"] = create_edited_eph(psr)
    else:
        source["edited_eph"] = None
        source["edited_eph_name"] = None

    init, post = required_bin_folds(psr, query=query)
    for pointing in pointings:
        # Add one entry per pointing; rebinding cfg["folds"] here would discard
        # every pointing except the last
        cfg["folds"][pointing] = {
            "init": {str(i): {} for i in init},
            "post": {str(i): {} for i in post},
            "classifier": 0,
            "dir": join(psr_dir, pointing),
        }

    for key in ("RM", "RM_e", "alpha", "beta", "l0", "pa0", "chi"):
        cfg["pol"][key] = None

    cfg["completed"] = {
        "init_folds": False,
        "classify": False,
        "post_folds": False,
        "upload": False,
        "debase": False,
        "RM": False,
        "RVM_initial": False,
        "RVM_final": False,
    }

    return cfg
def create_edited_eph(pulsar_name):
    """
    Runs 'psrcat -e2' for the given pulsar and returns its output as a string,
    with the final two newline-separated fields removed (for output that ends
    in a newline these are the last line and the empty string after it).
    """
    raw_output = subprocess.check_output(["psrcat", "-e2", pulsar_name])
    fields = raw_output.decode("utf-8").split("\n")
    return "\n".join(fields[:-2])
def reset_cfg(cfg):
    """
    Marks every pipeline stage in cfg["completed"] as not done, clears the
    detection flag and restores the initial exit status, forcing a rerun.
    The dictionary is modified in place and nothing is returned.
    """
    stages = (
        "init_folds",
        "classify",
        "post_folds",
        "upload",
        "debase",
        "RM",
        "RVM_initial",
        "RVM_final",
    )
    progress = cfg["completed"]
    for stage in stages:
        progress[stage] = False
    cfg["run_ops"].update(
        detection=False,
        exit_status="300",  # Pipeline hasn't begun
    )
def from_yaml(filepath):
    """Reads the yaml file at filepath and returns the loaded object"""
    # NOTE(review): yaml.Loader can construct arbitrary Python objects, so this
    # should only be pointed at trusted files (e.g. ones written by dump_to_yaml)
    with open(filepath) as yaml_file:
        return yaml.load(yaml_file, Loader=yaml.Loader)


def dump_to_yaml(cfg):
    """
    Writes cfg in block-style yaml to the path held in cfg["files"]["my_name"]
    and returns that path.
    """
    out_path = cfg["files"]["my_name"]
    with open(out_path, 'w') as yaml_file:
        yaml.dump(cfg, yaml_file, default_flow_style=False)
    return cfg["files"]["my_name"]
def create_cfgs_main(kwargs, psrs_pointing_dict):
    """
    Builds one cfg dictionary per pulsar in psrs_pointing_dict, which maps each
    pulsar name to its list of pointings. Uses kwargs from
    observation_processing_pipeline.py.

    Pulsars for which initiate_cfg raises a TypeError are logged and left out.
    Returns the list of cfg dictionaries that were created.
    """
    obsid = kwargs["obsid"]
    metadata, full_meta = get_common_obs_metadata(obsid, return_all=True)
    query = psrqpy.QueryATNF(loadfromdb=data_load.ATNF_LOC).pandas

    # Make the fold times dictionary (it's done for all pulsars simultaneously for speed)
    psr_list = list(psrs_pointing_dict)
    fold_times_dict = find_fold_times(
        psr_list, obsid, kwargs["beg"], kwargs["end"],
        metadata=metadata, full_meta=full_meta, query=query)

    cfgs = []
    for psr in progress_bar(psr_list, "Initiating pulsar configs: "):
        logger.info(psr)
        pointing_list = psrs_pointing_dict[psr]
        times = fold_times_dict[psr]
        enter, leave, power = times["enter"], times["leave"], times["power"]
        try:
            new_cfg = initiate_cfg(
                kwargs, psr, pointing_list, enter, leave, power,
                query=query, metadata=metadata)
        except TypeError as e:
            logger.info(e)
        else:
            cfgs.append(new_cfg)
    return cfgs