Skip to content

new_sources.py

check_primary_image(row)

Checks if the primary image is in the image list.

Parameters:

Name Type Description Default
row pd.Series

Row of the missing_sources_df, need the keys 'primary' and 'img_list'.

required

Returns:

Type Description
bool

True if the primary image is in the image list.

Source code in vast_pipeline/pipeline/new_sources.py
24
25
26
27
28
29
30
31
32
33
34
35
36
def check_primary_image(row: pd.Series) -> bool:
    """
    Checks if the primary image is in the image list.

    Args:
        row:
            Row of the missing_sources_df, need the keys 'primary' and
            'img_list'.

    Returns:
        True if the primary image is in the image list.
    """
    return row['primary'] in row['img_list']

gen_array_coords_from_wcs(coords, wcs)

Converts SkyCoord coordinates to array coordinates given a wcs.

Parameters:

Name Type Description Default
coords SkyCoord

The coordinates to convert.

required
wcs WCS

The WCS to use for the conversion.

required

Returns:

Type Description
np.ndarray

Array containing the x and y array coordinates of the input sky

np.ndarray

coordinates, e.g.:

np.ndarray

np.array([[x1, x2, x3], [y1, y2, y3]])

Source code in vast_pipeline/pipeline/new_sources.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def gen_array_coords_from_wcs(coords: SkyCoord, wcs: WCS) -> np.ndarray:
    """
    Converts SkyCoord coordinates to array coordinates given a wcs.

    Args:
        coords:
            The coordinates to convert.
        wcs:
            The WCS to use for the conversion.

    Returns:
        Array containing the x and y array coordinates of the input sky
        coordinates, e.g.:
        np.array([[x1, x2, x3], [y1, y2, y3]])
    """
    array_coords = wcs.world_to_array_index(coords)
    array_coords = np.array([
        np.array(array_coords[0]),
        np.array(array_coords[1]),
    ])

    return array_coords

get_image_rms_measurements(group, nbeam=3, edge_buffer=1.0)

Take the coordinates provided from the group and measure the array cell value in the provided image.

Parameters:

Name Type Description Default
group pd.DataFrame

The group of sources to measure in the image, requiring the columns: 'source', 'wavg_ra', 'wavg_dec' and 'img_diff_rms_path'.

required
nbeam int

The number of half beamwidths (BMAJ) away from the edge of the image or a NaN value that is acceptable.

3
edge_buffer float

Multiplicative factor applied to nbeam to act as a buffer.

1.0

Returns:

Type Description
pd.DataFrame

The group dataframe with the 'img_diff_true_rms' column added. The

pd.DataFrame

column will contain 'NaN' entires for sources that fail.

Source code in vast_pipeline/pipeline/new_sources.py
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
def get_image_rms_measurements(
    group: pd.DataFrame, nbeam: int = 3, edge_buffer: float = 1.0
) -> pd.DataFrame:
    """
    Take the coordinates provided from the group
    and measure the array cell value in the provided image.

    Args:
        group:
            The group of sources to measure in the image, requiring the
            columns: 'source', 'wavg_ra', 'wavg_dec' and 'img_diff_rms_path'.
        nbeam:
            The number of half beamwidths (BMAJ) away from the edge of the
            image or a NaN value that is acceptable.
        edge_buffer:
            Multiplicative factor applied to nbeam to act as a buffer.

    Returns:
        The group dataframe with the 'img_diff_true_rms' column added. The
        column will contain 'NaN' entires for sources that fail.
    """
    image = group.iloc[0]['img_diff_rms_path']

    with fits.open(image) as hdul:
        header = hdul[0].header
        wcs = WCS(header, naxis=2)
        data = hdul[0].data.squeeze()

    # Here we mimic the forced fits behaviour,
    # sources within 3 half BMAJ widths of the image
    # edges are ignored. The user buffer is also
    # applied for consistency.
    pixelscale = (
        proj_plane_pixel_scales(wcs)[1] * u.deg
    ).to(u.arcsec)

    bmaj = header["BMAJ"] * u.deg

    npix = round(
        (nbeam / 2. * bmaj.to('arcsec') /
        pixelscale).value
    )

    npix = int(round(npix * edge_buffer))

    coords = SkyCoord(
        group.wavg_ra, group.wavg_dec, unit=(u.deg, u.deg)
    )

    array_coords = gen_array_coords_from_wcs(coords, wcs)

    # check for pixel wrapping
    x_valid = np.logical_or(
        array_coords[0] >= (data.shape[0] - npix),
        array_coords[0] < npix
    )

    y_valid = np.logical_or(
        array_coords[1] >= (data.shape[1] - npix),
        array_coords[1] < npix
    )

    valid = ~np.logical_or(
        x_valid, y_valid
    )

    valid_indexes = group[valid].index.values

    group = group.loc[valid_indexes]

    if group.empty:
        # early return if all sources failed range check
        logger.debug(
            'All sources out of range in new source rms measurement'
            f' for image {image}.'
        )
        group['img_diff_true_rms'] = np.nan
        return group

    # Now we also need to check proximity to NaN values
    # as forced fits may also drop these values
    coords = SkyCoord(
        group.wavg_ra, group.wavg_dec, unit=(u.deg, u.deg)
    )

    array_coords = gen_array_coords_from_wcs(coords, wcs)

    acceptable_no_nan_dist = int(
        round(bmaj.to('arcsec').value / 2. / pixelscale.value)
    )

    nan_valid = []

    # Get slices of each source and check NaN is not included.
    for i,j in zip(array_coords[0], array_coords[1]):
        sl = tuple((
            slice(i - acceptable_no_nan_dist, i + acceptable_no_nan_dist),
            slice(j - acceptable_no_nan_dist, j + acceptable_no_nan_dist)
        ))
        if np.any(np.isnan(data[sl])):
            nan_valid.append(False)
        else:
            nan_valid.append(True)

    valid_indexes = group[nan_valid].index.values

    if np.any(nan_valid):
        # only run if there are actual values to measure
        rms_values = data[
            array_coords[0][nan_valid],
            array_coords[1][nan_valid]
        ]

        # not matched ones will be NaN.
        group.loc[
            valid_indexes, 'img_diff_true_rms'
        ] = rms_values.astype(np.float64) * 1.e3

    else:
        group['img_diff_true_rms'] = np.nan

    return group

new_sources(sources_df, missing_sources_df, min_sigma, edge_buffer, p_run)

Processes the new sources detected to check that they are valid new sources. This involves checking to see that the source should be seen at all in the images where it is not detected. For valid new sources the snr value the source would have in non-detected images is also calculated.

Parameters:

Name Type Description Default
sources_df pd.DataFrame

The sources found from the association step.

required
missing_sources_df pd.DataFrame

The dataframe containing the 'missing detections' for each source. +----------+----------------------------------+-----------+------------+ | source | img_list | wavg_ra | wavg_dec | |----------+----------------------------------+-----------+------------+ | 278 | ['VAST_0127-73A.EPOCH01.I.fits'] | 22.2929 | -71.8717 | | 702 | ['VAST_0127-73A.EPOCH01.I.fits'] | 28.8125 | -69.3547 | | 776 | ['VAST_0127-73A.EPOCH01.I.fits'] | 31.8223 | -70.4674 | | 844 | ['VAST_0127-73A.EPOCH01.I.fits'] | 17.3152 | -72.346 | | 934 | ['VAST_0127-73A.EPOCH01.I.fits'] | 9.75754 | -72.9629 | +----------+----------------------------------+-----------+------------+ ------------------------------------------------------------------+ skyreg_img_list | ------------------------------------------------------------------+ ['VAST_0127-73A.EPOCH01.I.fits', 'VAST_0127-73A.EPOCH08.I.fits'] | ['VAST_0127-73A.EPOCH01.I.fits', 'VAST_0127-73A.EPOCH08.I.fits'] | ['VAST_0127-73A.EPOCH01.I.fits', 'VAST_0127-73A.EPOCH08.I.fits'] | ['VAST_0127-73A.EPOCH01.I.fits', 'VAST_0127-73A.EPOCH08.I.fits'] | ['VAST_0127-73A.EPOCH01.I.fits', 'VAST_0127-73A.EPOCH08.I.fits'] | ------------------------------------------------------------------+ ----------------------------------+ img_diff | ----------------------------------| ['VAST_0127-73A.EPOCH08.I.fits'] | ['VAST_0127-73A.EPOCH08.I.fits'] | ['VAST_0127-73A.EPOCH08.I.fits'] | ['VAST_0127-73A.EPOCH08.I.fits'] | ['VAST_0127-73A.EPOCH08.I.fits'] | ----------------------------------+

required
min_sigma float

The minimum sigma value acceptable when compared to the minimum rms of the respective image.

required
edge_buffer float

Multiplicative factor to be passed to the 'get_image_rms_measurements' function.

required
p_run Run

The pipeline run.

required

Returns:

Name Type Description
pd.DataFrame

The original input dataframe with the 'img_diff_true_rms' column

pd.DataFrame

added. The column will contain 'NaN' entires for sources that fail.

Columns pd.DataFrame

source - source id, int. img_list - list of images, List. wavg_ra - weighted average RA, float. wavg_dec - weighted average Dec, float. skyreg_img_list - list of sky regions of images in img_list, List. img_diff - The images missing from coverage, List. primary - What should be the first image, str. detection - The first detection image, str. detection_time - Datetime of detection, datetime.datetime. img_diff_time - Datetime of img_diff list, datetime.datetime. img_diff_rms_min - Minimum rms of diff images, float. img_diff_rms_median - Median rms of diff images, float. img_diff_rms_path - rms path of diff images, str. flux_peak - Flux peak of source (detection), float. diff_sigma - SNR in differnce images (compared to minimum), float. img_diff_true_rms - The true rms value from the diff images, float. new_high_sigma - peak flux / true rms value, float.

Source code in vast_pipeline/pipeline/new_sources.py
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
def new_sources(
    sources_df: pd.DataFrame, missing_sources_df: pd.DataFrame,
    min_sigma: float, edge_buffer: float, p_run: Run
) -> pd.DataFrame:
    """
    Processes the new sources detected to check that they are valid new sources.
    This involves checking to see that the source *should* be seen at all in
    the images where it is not detected. For valid new sources the snr
    value the source would have in non-detected images is also calculated.

    Args:
        sources_df:
            The sources found from the association step.
        missing_sources_df:
            The dataframe containing the 'missing detections' for each source.
            +----------+----------------------------------+-----------+------------+
            |   source | img_list                         |   wavg_ra |   wavg_dec |
            |----------+----------------------------------+-----------+------------+
            |      278 | ['VAST_0127-73A.EPOCH01.I.fits'] |  22.2929  |   -71.8717 |
            |      702 | ['VAST_0127-73A.EPOCH01.I.fits'] |  28.8125  |   -69.3547 |
            |      776 | ['VAST_0127-73A.EPOCH01.I.fits'] |  31.8223  |   -70.4674 |
            |      844 | ['VAST_0127-73A.EPOCH01.I.fits'] |  17.3152  |   -72.346  |
            |      934 | ['VAST_0127-73A.EPOCH01.I.fits'] |   9.75754 |   -72.9629 |
            +----------+----------------------------------+-----------+------------+
            ------------------------------------------------------------------+
             skyreg_img_list                                                  |
            ------------------------------------------------------------------+
             ['VAST_0127-73A.EPOCH01.I.fits', 'VAST_0127-73A.EPOCH08.I.fits'] |
             ['VAST_0127-73A.EPOCH01.I.fits', 'VAST_0127-73A.EPOCH08.I.fits'] |
             ['VAST_0127-73A.EPOCH01.I.fits', 'VAST_0127-73A.EPOCH08.I.fits'] |
             ['VAST_0127-73A.EPOCH01.I.fits', 'VAST_0127-73A.EPOCH08.I.fits'] |
             ['VAST_0127-73A.EPOCH01.I.fits', 'VAST_0127-73A.EPOCH08.I.fits'] |
            ------------------------------------------------------------------+
            ----------------------------------+
             img_diff                         |
            ----------------------------------|
             ['VAST_0127-73A.EPOCH08.I.fits'] |
             ['VAST_0127-73A.EPOCH08.I.fits'] |
             ['VAST_0127-73A.EPOCH08.I.fits'] |
             ['VAST_0127-73A.EPOCH08.I.fits'] |
             ['VAST_0127-73A.EPOCH08.I.fits'] |
            ----------------------------------+
        min_sigma:
            The minimum sigma value acceptable when compared to the minimum
            rms of the respective image.
        edge_buffer:
            Multiplicative factor to be passed to the
            'get_image_rms_measurements' function.
        p_run:
            The pipeline run.

    Returns:
        The original input dataframe with the 'img_diff_true_rms' column
        added. The column will contain 'NaN' entires for sources that fail.
        Columns:
            source - source id, int.
            img_list - list of images, List.
            wavg_ra - weighted average RA, float.
            wavg_dec - weighted average Dec, float.
            skyreg_img_list - list of sky regions of images in img_list, List.
            img_diff - The images missing from coverage, List.
            primary - What should be the first image, str.
            detection - The first detection image, str.
            detection_time - Datetime of detection, datetime.datetime.
            img_diff_time - Datetime of img_diff list, datetime.datetime.
            img_diff_rms_min - Minimum rms of diff images, float.
            img_diff_rms_median - Median rms of diff images, float.
            img_diff_rms_path - rms path of diff images, str.
            flux_peak - Flux peak of source (detection), float.
            diff_sigma - SNR in differnce images (compared to minimum), float.
            img_diff_true_rms - The true rms value from the diff images, float.
            new_high_sigma - peak flux / true rms value, float.
    """
    timer = StopWatch()

    logger.info("Starting new source analysis.")

    cols = [
        'id', 'name', 'noise_path', 'datetime',
        'rms_median', 'rms_min', 'rms_max',
    ]

    images_df = pd.DataFrame(list(
        Image.objects.filter(
            run=p_run
        ).values(*tuple(cols))
    )).set_index('name')

    # Get rid of sources that are not 'new', i.e. sources which the
    # first sky region image is not in the image list
    new_sources_df = missing_sources_df[
        missing_sources_df['in_primary'] == False
    ].drop(
        columns=['in_primary']
    )

    # Check if the previous sources would have actually been seen
    # i.e. are the previous images sensitive enough

    # save the index before exploding
    new_sources_df = new_sources_df.reset_index()

    # Explode now to avoid two loops below
    new_sources_df = new_sources_df.explode('img_diff')

    # Merge the respective image information to the df
    new_sources_df = new_sources_df.merge(
        images_df[['datetime']],
        left_on='detection',
        right_on='name',
        how='left'
    ).rename(columns={'datetime':'detection_time'})

    new_sources_df = new_sources_df.merge(
        images_df[[
            'datetime', 'rms_min', 'rms_median',
            'noise_path'
        ]],
        left_on='img_diff',
        right_on='name',
        how='left'
    ).rename(columns={
        'datetime':'img_diff_time',
        'rms_min': 'img_diff_rms_min',
        'rms_median': 'img_diff_rms_median',
        'noise_path': 'img_diff_rms_path'
    })

    # Select only those images that come before the detection image
    # in time.
    new_sources_df = new_sources_df[
        new_sources_df.img_diff_time < new_sources_df.detection_time
    ]

    # merge the detection fluxes in
    new_sources_df = pd.merge(
        new_sources_df, sources_df[['source', 'image', 'flux_peak']],
        left_on=['source', 'detection'], right_on=['source', 'image'],
        how='left'
    ).drop(columns=['image'])

    # calculate the sigma of the source if it was placed in the
    # minimum rms region of the previous images
    new_sources_df['diff_sigma'] = (
        new_sources_df['flux_peak'].values
        / new_sources_df['img_diff_rms_min'].values
    )

    # keep those that are above the user specified threshold
    new_sources_df = new_sources_df.loc[
        new_sources_df['diff_sigma'] >= min_sigma
    ]

    # Now have list of sources that should have been seen before given
    # previous images minimum rms values.

    # Current inaccurate sky regions may mean that the source
    # was in a previous 'NaN' area of the image. This needs to be
    # checked. Currently the check is done by filtering out of range
    # pixels once the values have been obtained (below).
    # This could be done using MOCpy however this is reasonably
    # fast and the io of a MOC fits may take more time.

    # So these sources will be flagged as new sources, but we can also
    # make a guess of how signficant they are. For this the next step is
    # to measure the true rms at the source location.

    # measure the actual rms in the previous images at
    # the source location.
    new_sources_df = parallel_get_rms_measurements(
        new_sources_df, edge_buffer=edge_buffer
    )

    # this removes those that are out of range
    new_sources_df['img_diff_true_rms'] = (
        new_sources_df['img_diff_true_rms'].fillna(0.)
    )
    new_sources_df = new_sources_df[
        new_sources_df['img_diff_true_rms'] != 0
    ]

    # calculate the true sigma
    new_sources_df['true_sigma'] = (
        new_sources_df['flux_peak'].values
        / new_sources_df['img_diff_true_rms'].values
    )

    # We only care about the highest true sigma
    new_sources_df = new_sources_df.sort_values(
        by=['source', 'true_sigma']
    )

    # keep only the highest for each source, rename for the daatabase
    new_sources_df = (
        new_sources_df
        .drop_duplicates('source')
        .set_index('source')
        .rename(columns={'true_sigma':'new_high_sigma'})
    )

    # moving forward only the new_high_sigma columns is needed, drop all others.
    new_sources_df = new_sources_df[['new_high_sigma']]

    logger.info(
        'Total new source analysis time: %.2f seconds', timer.reset_init()
    )

    return new_sources_df

parallel_get_rms_measurements(df, edge_buffer=1.0)

Wrapper function to use 'get_image_rms_measurements' in parallel with Dask. nbeam is not an option here as that parameter is fixed in forced extraction and so is made sure to be fixed here to. This may change in the future.

Parameters:

Name Type Description Default
df pd.DataFrame

The group of sources to measure in the images.

required
edge_buffer float

Multiplicative factor to be passed to the 'get_image_rms_measurements' function.

1.0

Returns:

Type Description
pd.DataFrame

The original input dataframe with the 'img_diff_true_rms' column

pd.DataFrame

added. The column will contain 'NaN' entires for sources that fail.

Source code in vast_pipeline/pipeline/new_sources.py
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
def parallel_get_rms_measurements(
    df: pd.DataFrame, edge_buffer: float = 1.0
) -> pd.DataFrame:
    """
    Wrapper function to use 'get_image_rms_measurements'
    in parallel with Dask. nbeam is not an option here as that parameter
    is fixed in forced extraction and so is made sure to be fixed here to. This
    may change in the future.

    Args:
        df:
            The group of sources to measure in the images.
        edge_buffer:
            Multiplicative factor to be passed to the
            'get_image_rms_measurements' function.

    Returns:
        The original input dataframe with the 'img_diff_true_rms' column
        added. The column will contain 'NaN' entires for sources that fail.
    """
    out = df[[
        'source', 'wavg_ra', 'wavg_dec',
        'img_diff_rms_path'
    ]]

    col_dtype = {
        'source': 'i',
        'wavg_ra': 'f',
        'wavg_dec': 'f',
        'img_diff_rms_path': 'U',
        'img_diff_true_rms': 'f',
    }

    n_cpu = cpu_count() - 1

    out = (
        dd.from_pandas(out, n_cpu)
        .groupby('img_diff_rms_path')
        .apply(
            get_image_rms_measurements,
            edge_buffer=edge_buffer,
            meta=col_dtype
        ).compute(num_workers=n_cpu, scheduler='processes')
    )

    df = df.merge(
        out[['source', 'img_diff_true_rms']],
        left_on='source', right_on='source',
        how='left'
    )

    return df

Last update: March 2, 2022
Created: March 2, 2022