Skip to content

createmeasarrow.py

This module defines the command for creating an arrow output file for a previously completed pipeline run.

Command

Bases: BaseCommand

This command creates measurements and measurement_pairs arrow files for a completed pipeline run.

Source code in vast_pipeline/management/commands/createmeasarrow.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
class Command(BaseCommand):
    """
    This command creates measurements and measurement_pairs arrow files for a
    completed pipeline run.
    """
    help = (
        'Create `measurements.arrow` and `measurement_pairs.arrow` files for a'
        ' completed pipeline run.'
    )

    def add_arguments(self, parser: ArgumentParser) -> None:
        """
        Enables arguments for the command.

        Args:
            parser (ArgumentParser): The parser object of the command.

        Returns:
            None
        """
        # positional arguments
        parser.add_argument(
            'piperun',
            type=str,
            help='Path or name of the pipeline run.'
        )

        parser.add_argument(
            '--overwrite',
            action='store_true',
            required=False,
            default=False,
            help="Overwrite previous 'measurements.arrow' file.",
        )

    def handle(self, *args, **options) -> None:
        """
        Handle function of the command.

        Args:
            *args: Variable length argument list.
            **options: Variable length options.

        Returns:
            None
        """
        # configure logging
        if options['verbosity'] > 1:
            # set root logger to use the DEBUG level
            root_logger = logging.getLogger('')
            root_logger.setLevel(logging.DEBUG)
            # set the traceback on
            options['traceback'] = True

        piperun = options['piperun']

        p_run_name, run_folder = get_p_run_name(
            piperun,
            return_folder=True
        )
        try:
            p_run = Run.objects.get(name=p_run_name)
        except Run.DoesNotExist:
            raise CommandError(f'Pipeline run {p_run_name} does not exist')

        if p_run.status != 'END':
            raise CommandError(f'Pipeline run {p_run_name} has not completed.')

        measurements_arrow = os.path.join(run_folder, 'measurements.arrow')
        measurement_pairs_arrow = os.path.join(
            run_folder, 'measurement_pairs.arrow'
        )

        if os.path.isfile(measurements_arrow):
            if options['overwrite']:
                logger.info("Removing previous 'measurements.arrow' file.")
                os.remove(measurements_arrow)
            else:
                raise CommandError(
                    f'Measurements arrow file already exists for {p_run_name}'
                    ' and `--overwrite` has not been selected.'
                )

        if os.path.isfile(measurement_pairs_arrow):
            if options['overwrite']:
                logger.info(
                    "Removing previous 'measurement_pairs.arrow' file."
                )
                os.remove(measurement_pairs_arrow)
            else:
                raise CommandError(
                    'Measurement pairs arrow file already exists for'
                    f' {p_run_name} and `--overwrite` has not been selected.'
                )

        logger.info("Creating measurements arrow file for '%s'.", p_run_name)

        create_measurements_arrow_file(p_run)

        logger.info(
            "Creating measurement pairs arrow file for '%s'.", p_run_name
        )

        create_measurement_pairs_arrow_file(p_run)

add_arguments(self, parser)

Enables arguments for the command.

Parameters:

Name Type Description Default
parser ArgumentParser

The parser object of the command.

required

Returns:

Type Description
None

None

Source code in vast_pipeline/management/commands/createmeasarrow.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
def add_arguments(self, parser: ArgumentParser) -> None:
    """
    Enables arguments for the command.

    Args:
        parser (ArgumentParser): The parser object of the command.

    Returns:
        None
    """
    # positional arguments
    parser.add_argument(
        'piperun',
        type=str,
        help='Path or name of the pipeline run.'
    )

    parser.add_argument(
        '--overwrite',
        action='store_true',
        required=False,
        default=False,
        help="Overwrite previous 'measurements.arrow' file.",
    )

handle(self, *args, **options)

Handle function of the command.

Parameters:

Name Type Description Default
*args

Variable length argument list.

()
**options

Variable length options.

{}

Returns:

Type Description
None

None

Source code in vast_pipeline/management/commands/createmeasarrow.py
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
def handle(self, *args, **options) -> None:
    """
    Handle function of the command.

    Args:
        *args: Variable length argument list.
        **options: Variable length options.

    Returns:
        None
    """
    # configure logging
    if options['verbosity'] > 1:
        # set root logger to use the DEBUG level
        root_logger = logging.getLogger('')
        root_logger.setLevel(logging.DEBUG)
        # set the traceback on
        options['traceback'] = True

    piperun = options['piperun']

    p_run_name, run_folder = get_p_run_name(
        piperun,
        return_folder=True
    )
    try:
        p_run = Run.objects.get(name=p_run_name)
    except Run.DoesNotExist:
        raise CommandError(f'Pipeline run {p_run_name} does not exist')

    if p_run.status != 'END':
        raise CommandError(f'Pipeline run {p_run_name} has not completed.')

    measurements_arrow = os.path.join(run_folder, 'measurements.arrow')
    measurement_pairs_arrow = os.path.join(
        run_folder, 'measurement_pairs.arrow'
    )

    if os.path.isfile(measurements_arrow):
        if options['overwrite']:
            logger.info("Removing previous 'measurements.arrow' file.")
            os.remove(measurements_arrow)
        else:
            raise CommandError(
                f'Measurements arrow file already exists for {p_run_name}'
                ' and `--overwrite` has not been selected.'
            )

    if os.path.isfile(measurement_pairs_arrow):
        if options['overwrite']:
            logger.info(
                "Removing previous 'measurement_pairs.arrow' file."
            )
            os.remove(measurement_pairs_arrow)
        else:
            raise CommandError(
                'Measurement pairs arrow file already exists for'
                f' {p_run_name} and `--overwrite` has not been selected.'
            )

    logger.info("Creating measurements arrow file for '%s'.", p_run_name)

    create_measurements_arrow_file(p_run)

    logger.info(
        "Creating measurement pairs arrow file for '%s'.", p_run_name
    )

    create_measurement_pairs_arrow_file(p_run)

Last update: March 2, 2022
Created: March 2, 2022