
runpipeline.py

The main command to launch the processing of a pipeline run.

Usage: ./manage.py runpipeline pipeline_run_name
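
A full re-run, which completely removes and replaces any previous results, can be requested with the --full-rerun flag:

Usage: ./manage.py runpipeline pipeline_run_name --full-rerun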

Command

Bases: BaseCommand

This script is used to process images with the ASKAP transient pipeline. Use --help for usage, and refer to the README.

Source code in vast_pipeline/management/commands/runpipeline.py
class Command(BaseCommand):
    """
    This script is used to process images with the ASKAP transient pipeline.
    Use --help for usage, and refer to the README.
    """
    help = 'Process the pipeline for a list of images and Selavy catalogs'

    def add_arguments(self, parser: ArgumentParser) -> None:
        """
        Enables arguments for the command.

        Args:
            parser (ArgumentParser): The parser object of the command.

        Returns:
            None
        """
        # positional arguments
        parser.add_argument(
            'piperun',
            type=str,
            help='Path or name of the pipeline run.'
        )

        parser.add_argument(
            '--full-rerun',
            required=False,
            default=False,
            action='store_true',
            help=(
                'Flag to signify that a full re-run is requested.'
                ' Old data is completely removed and replaced.')
        )

    def handle(self, *args, **options) -> None:
        """
        Handle function of the command.

        Args:
            *args: Variable length argument list.
            **options: Variable length options.

        Returns:
            None
        """
        p_run_name, run_folder = get_p_run_name(
            options['piperun'],
            return_folder=True
        )
        # configure logging
        root_logger = logging.getLogger('')
        f_handler = logging.FileHandler(
            os.path.join(run_folder, 'log.txt'),
            mode='w'
        )
        f_handler.setFormatter(root_logger.handlers[0].formatter)
        root_logger.addHandler(f_handler)

        if options['verbosity'] > 1:
            # set root logger to use the DEBUG level
            root_logger.setLevel(logging.DEBUG)
            # set the traceback on
            options['traceback'] = True

        # p_run_name = p_run_path
        # remove ending / if present
        if p_run_name[-1] == '/':
            p_run_name = p_run_name[:-1]
        # grab only the name from the path
        p_run_name = p_run_name.split(os.path.sep)[-1]

        # True when verbosity 2 or higher was requested on the command line
        debug_flag = options['verbosity'] > 1

        _ = run_pipe(
            p_run_name,
            path_name=run_folder,
            debug=debug_flag,
            full_rerun=options["full_rerun"],
        )

        self.stdout.write(self.style.SUCCESS('Finished'))
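
Like any Django management command, runpipeline can also be invoked programmatically through Django's standard call_command API. A minimal sketch, assuming a configured Django environment; the run path below is hypothetical:

from django.core.management import call_command

# Equivalent to: ./manage.py runpipeline pipeline-runs/my-run --full-rerun -v 2
# 'pipeline-runs/my-run' is a hypothetical run path; verbosity=2 triggers the
# options['verbosity'] > 1 branch in handle() and enables DEBUG logging.
call_command('runpipeline', 'pipeline-runs/my-run', full_rerun=True, verbosity=2)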

add_arguments(self, parser)

Enables arguments for the command.

Parameters:

| Name   | Type           | Description                       | Default  |
| ------ | -------------- | --------------------------------- | -------- |
| parser | ArgumentParser | The parser object of the command. | required |

Returns:

| Type | Description |
| ---- | ----------- |
| None | None        |

Source code in vast_pipeline/management/commands/runpipeline.py
def add_arguments(self, parser: ArgumentParser) -> None:
    """
    Enables arguments for the command.

    Args:
        parser (ArgumentParser): The parser object of the command.

    Returns:
        None
    """
    # positional arguments
    parser.add_argument(
        'piperun',
        type=str,
        help='Path or name of the pipeline run.'
    )

    parser.add_argument(
        '--full-rerun',
        required=False,
        default=False,
        action='store_true',
        help=(
            'Flag to signify that a full re-run is requested.'
            ' Old data is completely removed and replaced.')
    )
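
Note that argparse replaces the hyphen in --full-rerun with an underscore when it stores the value, which is why handle() later reads options['full_rerun']. A standalone sketch of that behaviour:

import argparse

parser = argparse.ArgumentParser()
parser.add_argument('piperun', type=str)
parser.add_argument('--full-rerun', action='store_true', default=False)

# The hyphen in the flag name becomes an underscore in the parsed namespace.
args = parser.parse_args(['my-run', '--full-rerun'])
assert args.piperun == 'my-run'
assert args.full_rerun is True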

handle(self, *args, **options)

Handle function of the command.

Parameters:

| Name      | Type | Description                    | Default |
| --------- | ---- | ------------------------------ | ------- |
| *args     |      | Variable length argument list. | ()      |
| **options |      | Variable length options.       | {}      |

Returns:

| Type | Description |
| ---- | ----------- |
| None | None        |

Source code in vast_pipeline/management/commands/runpipeline.py
def handle(self, *args, **options) -> None:
    """
    Handle function of the command.

    Args:
        *args: Variable length argument list.
        **options: Variable length options.

    Returns:
        None
    """
    p_run_name, run_folder = get_p_run_name(
        options['piperun'],
        return_folder=True
    )
    # configure logging
    root_logger = logging.getLogger('')
    f_handler = logging.FileHandler(
        os.path.join(run_folder, 'log.txt'),
        mode='w'
    )
    f_handler.setFormatter(root_logger.handlers[0].formatter)
    root_logger.addHandler(f_handler)

    if options['verbosity'] > 1:
        # set root logger to use the DEBUG level
        root_logger.setLevel(logging.DEBUG)
        # set the traceback on
        options['traceback'] = True

    # p_run_name = p_run_path
    # remove ending / if present
    if p_run_name[-1] == '/':
        p_run_name = p_run_name[:-1]
    # grab only the name from the path
    p_run_name = p_run_name.split(os.path.sep)[-1]

    # True when verbosity 2 or higher was requested on the command line
    debug_flag = options['verbosity'] > 1

    _ = run_pipe(
        p_run_name,
        path_name=run_folder,
        debug=debug_flag,
        full_rerun=options["full_rerun"],
    )

    self.stdout.write(self.style.SUCCESS('Finished'))
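
The trailing-slash check and os.path.sep split above reduce a path such as 'pipeline-runs/my-run/' to the bare run name. On POSIX paths the same result can be sketched with os.path.basename; the input path here is hypothetical:

import os

p_run_name = 'pipeline-runs/my-run/'  # hypothetical run path
# Strip any trailing separator, then keep only the final path component.
p_run_name = os.path.basename(p_run_name.rstrip('/'))
assert p_run_name == 'my-run'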

run_pipe(name, path_name=None, run_dj_obj=None, cli=True, debug=False, user=None, full_rerun=False, prev_ui_status='END')

Main function to run the pipeline.

Parameters:

| Name           | Type           | Description                                                                                              | Default  |
| -------------- | -------------- | -------------------------------------------------------------------------------------------------------- | -------- |
| name           | str            | The name of the pipeline run (p_run.name).                                                                 | required |
| path_name      | Optional[str]  | The path of the directory of the pipeline run (p_run.path).                                                | None     |
| run_dj_obj     | Optional[Run]  | The Run object of the pipeline run.                                                                        | None     |
| cli            | bool           | Whether the pipeline run was launched from the command line (True) or the UI (False).                      | True     |
| debug          | bool           | Whether to enable debug verbosity in the logging output.                                                   | False    |
| user           | Optional[User] | The User of the request if made through the UI.                                                            | None     |
| full_rerun     | bool           | If the run already exists, a complete re-run is performed, removing and replacing all previous results.    | False    |
| prev_ui_status | str            | The previous status of the run through the UI.                                                             | 'END'    |

Returns:

| Type | Description                                                                   |
| ---- | ----------------------------------------------------------------------------- |
| bool | True on successful completion; in cases of failure a CommandError is raised. |

Source code in vast_pipeline/management/commands/runpipeline.py
def run_pipe(
    name: str, path_name: Optional[str] = None,
    run_dj_obj: Optional[Run] = None, cli: bool = True,
    debug: bool = False, user: Optional[User] = None, full_rerun: bool = False,
    prev_ui_status: str = 'END'
) -> bool:
    '''
    Main function to run the pipeline.

    Args:
        name:
            The name of the pipeline run (p_run.name).
        path_name:
            The path of the directory of the pipeline run (p_run.path),
            defaults to None.
        run_dj_obj:
            The Run object of the pipeline run, defaults to None.
        cli:
            Flag to signify whether the pipeline run has been run via the UI
            (False), or the command line (True). Defaults to True.
        debug:
            Flag to signify whether to enable debug verbosity to the logging
            output. Defaults to False.
        user:
            The User of the request if made through the UI. Defaults to None.
        full_rerun:
            If the run already exists, a complete rerun will be performed which
            will remove and replace all the previous results.
        prev_ui_status:
            The previous status through the UI. Defaults to 'END'.

    Returns:
        Boolean equal to `True` on a successful completion; in cases of
        failure a CommandError is raised.
    '''
    path = run_dj_obj.path if run_dj_obj else path_name
    # set up logging for running pipeline from UI
    if not cli:
        # set up the logger for the UI job
        root_logger = logging.getLogger('')
        if debug:
            root_logger.setLevel(logging.DEBUG)
        f_handler = logging.FileHandler(
            os.path.join(path, 'log.txt'),
            mode='w'
        )
        f_handler.setFormatter(root_logger.handlers[0].formatter)
        root_logger.addHandler(f_handler)

    pipeline = Pipeline(
        name=run_dj_obj.name if run_dj_obj else name,
        config_path=os.path.join(path, 'config.yaml'),
        validate_config=False,  # delay validation
    )

    # Create the pipeline run in DB
    p_run, flag_exist = get_create_p_run(
        pipeline.name,
        pipeline.config["run"]["path"],
    )

    # copy across config file at the start
    logger.debug("Copying temp config file.")
    shutil.copyfile(
        os.path.join(p_run.path, 'config.yaml'),
        os.path.join(p_run.path, 'config_temp.yaml')
    )

    # validate run configuration
    try:
        pipeline.config.validate(user=user)
    except PipelineConfigError as e:
        if debug:
            traceback.print_exc()
        logger.exception('Config error:\n%s', e)
        msg = f'Config error:\n{e}'
        # If the run is already created (e.g. through UI) then set status to
        # error
        pipeline.set_status(p_run, 'ERR')
        raise CommandError(msg) if cli else PipelineConfigError(msg)

    # clean up pipeline images and forced measurements for re-runs
    # Scenarios:
    # A. Complete Re-run: If the job is marked as successful then backup
    # old parquets and proceed to remove parquets along with forced
    # extractions from the database.
    # B. Additional Run on successful run: Backup parquets, remove current
    # parquets and proceed.
    # C. Additional Run on errored run: Do not backup parquets, just delete
    # current.

    # Flag on the pipeline object on whether the addition mode is on or off.
    pipeline.add_mode = False
    pipeline.previous_parquets = {}

    if not flag_exist:
        # check for and remove any present .parquet (and .arrow) files
        parquets = (
            glob.glob(os.path.join(p_run.path, "*.parquet"))
            # TODO Remove arrow when vaex support is dropped.
            + glob.glob(os.path.join(p_run.path, "*.arrow"))
            + glob.glob(os.path.join(p_run.path, "*.bak"))
        )
        for parquet in parquets:
            os.remove(parquet)
    else:
        # Check if the status is already running or queued. Exit if this is the
        # case.
        if p_run.status in ['RUN', 'RES']:
            logger.error(
                "The pipeline run requested to process already has a running"
                " or restoring status! Performing no actions. Exiting."
            )
            return True

        # Check for an error status and whether any previous config file
        # exists - if it doesn't exist it means the run has failed during
        # the first run. In this case we want to clear anything that has gone
        # on before so to do that `complete-rerun` mode is activated.
        if p_run.status == "ERR" and not os.path.isfile(
            os.path.join(p_run.path, "config_prev.yaml")
        ):
            full_rerun = True

        # Backup the previous run config
        if os.path.isfile(
            os.path.join(p_run.path, 'config_prev.yaml')
        ):
            shutil.copy(
                os.path.join(p_run.path, 'config_prev.yaml'),
                os.path.join(p_run.path, 'config.yaml.bak')
            )

        # Check if the run has only been initialised, if so we don't want to do
        # any previous run checks or cleaning.
        if p_run.status == 'INI':
            initial_run = True
        # check if coming from UI
        elif cli is False and prev_ui_status == 'INI':
            initial_run = True
        else:
            initial_run = False

        if initial_run is False:
            parquets = (
                glob.glob(os.path.join(p_run.path, "*.parquet"))
                # TODO Remove arrow when arrow files are no longer needed.
                + glob.glob(os.path.join(p_run.path, "*.arrow"))
            )

            if full_rerun:
                if p_run.status == 'END':
                    backup_parquets(p_run.path)
                logger.info(
                    'Cleaning up pipeline run before re-process data'
                )
                p_run.image_set.clear()

                logger.info(
                    'Cleaning up forced measurements before re-process data'
                )
                remove_forced_meas(p_run.path)

                for parquet in parquets:
                    os.remove(parquet)

                # remove bak files
                bak_files = glob.glob(os.path.join(p_run.path, "*.bak"))
                if bak_files:
                    for bf in bak_files:
                        os.remove(bf)

                # remove previous config if it exists
                if os.path.isfile(os.path.join(p_run.path, 'config_prev.yaml')):
                    os.remove(os.path.join(p_run.path, 'config_prev.yaml'))

                # reset epoch_based flag
                with transaction.atomic():
                    p_run.epoch_based = False
                    p_run.save()
            else:
                # Before parquets are started to be copied and backed up, a
                # check is run to see if anything has actually changed in
                # the config
                config_diff = pipeline.config.check_prev_config_diff()
                if config_diff:
                    logger.info(
                        "The config file has either not changed since the"
                        " previous run or other settings have changed such"
                        " that a new or complete re-run should be performed"
                        " instead. Performing no actions. Exiting."
                    )
                    os.remove(os.path.join(p_run.path, 'config_temp.yaml'))
                    pipeline.set_status(p_run, 'END')

                    return True

                if pipeline.config.epoch_based != p_run.epoch_based:
                    logger.info(
                        "The 'epoch based' setting has changed since the"
                        " previous run. A complete re-run is required if"
                        " changing to epoch based mode or vice versa."
                    )
                    os.remove(os.path.join(p_run.path, 'config_temp.yaml'))
                    pipeline.set_status(p_run, 'END')
                    return True

                if cli and p_run.status == 'END':
                    backup_parquets(p_run.path)
                elif not cli and prev_ui_status == 'END':
                    backup_parquets(p_run.path)

                pipeline.add_mode = True
                for i in [
                    'images', 'associations', 'sources', 'relations',
                    'measurement_pairs'
                ]:
                    pipeline.previous_parquets[i] = os.path.join(
                        p_run.path, f'{i}.parquet.bak')

    if pipeline.config["run"]["suppress_astropy_warnings"]:
        warnings.simplefilter("ignore", category=AstropyWarning)

    logger.info("VAST Pipeline version: %s", pipeline_version)
    logger.info("Source finder: %s", pipeline.config["measurements"]["source_finder"])
    logger.info("Using pipeline run '%s'", pipeline.name)
    logger.info("Source monitoring: %s", pipeline.config["source_monitoring"]["monitor"])

    # log the list of input data files for posterity
    input_image_list = [
        image
        for image_list in pipeline.config["inputs"]["image"].values()
        for image in image_list
    ]
    input_selavy_list = [
        selavy
        for selavy_list in pipeline.config["inputs"]["selavy"].values()
        for selavy in selavy_list
    ]
    input_noise_list = [
        noise
        for noise_list in pipeline.config["inputs"]["noise"].values()
        for noise in noise_list
    ]
    if "background" in pipeline.config["inputs"].keys():
        input_background_list = [
            background
            for background_list in pipeline.config["inputs"]["background"].values()
            for background in background_list
        ]
    else:
        input_background_list = ["N/A", ] * len(input_image_list)
    for image, selavy, noise, background in zip(
        input_image_list, input_selavy_list, input_noise_list, input_background_list
    ):
        logger.info(
            "Matched inputs - image: %s, selavy: %s, noise: %s, background: %s",
            image,
            selavy,
            noise,
            background,
        )

    stopwatch = StopWatch()

    # run the pipeline operations
    try:
        # check if max runs number is reached
        pipeline.check_current_runs()
        # run the pipeline
        pipeline.set_status(p_run, 'RUN')
        pipeline.process_pipeline(p_run)
        # Create arrow file after success if selected.
        if pipeline.config["measurements"]["write_arrow_files"]:
            create_measurements_arrow_file(p_run)
            create_measurement_pairs_arrow_file(p_run)
    except Exception as e:
        # set the pipeline status as error
        pipeline.set_status(p_run, 'ERR')
        logger.exception('Processing error:\n%s', e)
        raise CommandError(f'Processing error:\n{e}')

    # copy across config file now that it is successful
    logger.debug("Copying and cleaning temp config file.")
    shutil.copyfile(
        os.path.join(p_run.path, 'config_temp.yaml'),
        os.path.join(p_run.path, 'config_prev.yaml'))
    os.remove(os.path.join(p_run.path, 'config_temp.yaml'))

    # set the pipeline status as completed
    pipeline.set_status(p_run, 'END')

    logger.info(
        'Total pipeline processing time %.2f sec',
        stopwatch.reset()
    )

    return True
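
run_pipe is the same entry point the web UI uses (with cli=False). A minimal sketch of calling it directly, assuming an initialised Django environment and a hypothetical run directory containing a config.yaml:

from vast_pipeline.management.commands.runpipeline import run_pipe

# The run name and path below are hypothetical; path_name must point at the
# run directory holding config.yaml. With cli=True a failure raises
# CommandError rather than PipelineConfigError.
successful = run_pipe(
    'my-run',
    path_name='pipeline-runs/my-run',
    debug=False,
    full_rerun=False,
)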
