loading.py

SQL_update(df, model, index=None, columns=None)

Generate the SQL code required to update the database.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `df` | `pd.DataFrame` | DataFrame containing the new data to be uploaded to the database. The columns to be updated need to have the same headers between the df and the table in the database. | required |
| `model` | `models.Model` | The model that is being updated. | required |
| `index` | `Optional[str]` | Header of the column to join on; determines which rows in the different tables match. If None, the primary key column is used. | `None` |
| `columns` | `Optional[List[str]]` | The column headers of the columns to be updated. If None, updates all columns except the index column. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `str` | The SQL command to update the database. |

Source code in vast_pipeline/pipeline/loading.py, lines 362-417:
def SQL_update(
    df: pd.DataFrame, model: models.Model, index: Optional[str] = None,
    columns: Optional[List[str]] = None
) -> str:
    '''
    Generate the SQL code required to update the database.

    Args:
        df:
            DataFrame containing the new data to be uploaded to the database.
            The columns to be updated need to have the same headers between
            the df and the table in the database.
        model:
            The model that is being updated.
        index:
            Header of the column to join on, determines which rows in the
            different tables match. If None, then use the primary key column.
        columns:
            The column headers of the columns to be updated. If None, updates
            all columns except the index column.

    Returns:
        The SQL command to update the database.
    '''
    # set index and columns if None
    if index is None:
        index = model._meta.pk.name
    if columns is None:
        columns = df.columns.tolist()
        columns.remove(index)

    # get names
    table = model._meta.db_table
    new_columns = ', '.join('new_'+c for c in columns)
    set_columns = ', '.join(c+'=new_'+c for c in columns)

    # get index values and new values
    column_headers = [index]
    column_headers.extend(columns)
    data_arr = df[column_headers].to_numpy()
    values = []
    for row in data_arr:
        val_row = '(' + ', '.join(f'{val}' for val in row) + ')'
        values.append(val_row)
    values = ', '.join(values)

    # update database
    SQL_comm = f"""
        UPDATE {table}
        SET {set_columns}
        FROM (VALUES {values})
        AS new_values (index_col, {new_columns})
        WHERE {index}=index_col;
    """

    return SQL_comm
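
For illustration, the sketch below shows how `SQL_update` might be called and the shape of the statement it builds. It assumes a configured Django environment; the `Source` import path, the `avg_flux` column and the table name in the comment are assumptions made for the example, not taken from the code above.

```python
import pandas as pd

from vast_pipeline.models import Source  # import path assumed
from vast_pipeline.pipeline.loading import SQL_update

# two sources whose (illustrative) avg_flux values need updating
df = pd.DataFrame({"id": [1, 2], "avg_flux": [1.5, 2.25]})

sql = SQL_update(df, Source, index="id", columns=["avg_flux"])
# The generated command has roughly the form below (the table name assumes
# Django's default naming; value formatting follows the DataFrame dtypes):
#   UPDATE vast_pipeline_source
#   SET avg_flux=new_avg_flux
#   FROM (VALUES (1, 1.5), (2, 2.25))
#   AS new_values (index_col, new_avg_flux)
#   WHERE id=index_col;
```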

bulk_upload_model(djmodel, generator, batch_size=10000, return_ids=False)

Bulk upload a generator of Django model objects to the database.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `djmodel` | `models.Model` | The Django pipeline model to be uploaded. | required |
| `generator` | `Iterable[Generator[models.Model, None, None]]` | The generator objects of the model to upload. | required |
| `batch_size` | `int` | How many records to upload at once. | `10000` |
| `return_ids` | `bool` | When set to True, the database IDs of the uploaded objects are returned. | `False` |

Returns:

| Type | Description |
| --- | --- |
| `List[int]` | None or a list of the database IDs of the uploaded objects. |

Source code in vast_pipeline/pipeline/loading.py, lines 32-68:
@transaction.atomic
def bulk_upload_model(
    djmodel: models.Model,
    generator: Iterable[Generator[models.Model, None, None]],
    batch_size: int=10_000, return_ids: bool=False
) -> List[int]:
    '''
    Bulk upload a generator of Django model objects to the database.

    Args:
        djmodel:
            The Django pipeline model to be uploaded.
        generator:
            The generator objects of the model to upload.
        batch_size:
            How many records to upload at once.
        return_ids:
            When set to True, the database IDs of the uploaded objects are
            returned.

    Returns:
        None or a list of the database IDs of the uploaded objects.

    '''
    bulk_ids = []
    while True:
        items = list(islice(generator, batch_size))
        if not items:
            break
        out_bulk = djmodel.objects.bulk_create(items)
        logger.info('Bulk created #%i %s', len(out_bulk), djmodel.__name__)
        # save the DB ids to return
        if return_ids:
            bulk_ids.extend(list(map(lambda i: i.id, out_bulk)))

    if return_ids:
        return bulk_ids
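
A minimal usage sketch, assuming a configured Django environment; the `Measurement` import path and the field names used to build the instances are illustrative assumptions, not prescribed by the function above.

```python
import pandas as pd

from vast_pipeline.models import Measurement  # import path assumed

# a small DataFrame standing in for the pipeline's measurements
# (column/field names are illustrative, not the full model)
measurements_df = pd.DataFrame(
    {"ra": [12.5, 13.1], "dec": [-30.2, -29.8], "flux_peak": [1.1, 2.2]}
)

# lazily build unsaved model instances from the DataFrame rows
measurement_gen = (
    Measurement(ra=row.ra, dec=row.dec, flux_peak=row.flux_peak)
    for row in measurements_df.itertuples()
)

# upload in batches and collect the assigned database IDs
meas_ids = bulk_upload_model(
    Measurement, measurement_gen, batch_size=5_000, return_ids=True
)
```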

make_upload_associations(associations_df)

Uploads the associations from the supplied associations DataFrame.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `associations_df` | `pd.DataFrame` | DataFrame containing the associations information from the pipeline. | required |

Returns:

| Type | Description |
| --- | --- |
| `None` | None. |

Source code in vast_pipeline/pipeline/loading.py, lines 251-266:
def make_upload_associations(associations_df: pd.DataFrame) -> None:
    """
    Uploads the associations from the supplied associations DataFrame.

    Args:
        associations_df:
            DataFrame containing the associations information from the
            pipeline.

    Returns:
        None.
    """
    logger.info('Upload associations...')
    bulk_upload_model(
        Association, association_models_generator(associations_df)
    )

make_upload_images(paths, config, pipeline_run)

Carry out the first part of the pipeline by uploading all the images to the image table and populating the band and skyregion objects.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `paths` | `Dict[str, Dict[str, str]]` | Dictionary containing the image, noise and background paths of all the images in the pipeline run. The primary keys are 'selavy', 'noise' and 'background', with the secondary key being the image name. | required |
| `config` | `PipelineConfig` | The config object of the pipeline run. | required |
| `pipeline_run` | `Run` | The pipeline run object. | required |

Returns:

| Type | Description |
| --- | --- |
| `List[Image]` | A list of image objects that have been uploaded. |
| `pd.DataFrame` | A DataFrame containing the information of the sky regions associated with the run. |

Source code in vast_pipeline/pipeline/loading.py, lines 71-187:
def make_upload_images(
    paths: Dict[str, Dict[str, str]], config: PipelineConfig, pipeline_run: Run
) -> Tuple[List[Image], pd.DataFrame]:
    '''
    Carry out the first part of the pipeline by uploading all the images
    to the image table and populating the band and skyregion objects.

    Args:
        paths:
            Dictionary containing the image, noise and background paths of all
            the images in the pipeline run. The primary keys are 'selavy',
            'noise' and 'background', with the secondary key being the image
            name.
        config:
            The config object of the pipeline run.
        pipeline_run:
            The pipeline run object.

    Returns:
        A list of image objects that have been uploaded along with a DataFrame
        containing the information of the sky regions associated with the run.
    '''
    timer = StopWatch()
    images = []
    skyregions = []
    bands = []

    for path in paths['selavy']:
        # STEP #1: Load image and measurements
        image = SelavyImage(
            path,
            paths,
            config
        )
        logger.info('Reading image %s ...', image.name)

        # 1.1 get/create the frequency band
        with transaction.atomic():
            band = get_create_img_band(image)
        if band not in bands:
            bands.append(band)

        # 1.2 create image and skyregion entry in DB
        with transaction.atomic():
            img, skyreg, exists_f = get_create_img(
                pipeline_run, band.id, image
            )

        # add image and skyregion to respective lists
        images.append(img)
        if skyreg not in skyregions:
            skyregions.append(skyreg)
        if exists_f:
            logger.info(
                'Image %s already processed, grab measurements',
                img.name
            )
            # grab the measurements and skip to process next image
            measurements = (
                pd.Series(
                    Measurement.objects.filter(forced=False, image__id=img.id),
                    name='meas_dj'
                )
                .to_frame()
            )
            measurements['id'] = measurements['meas_dj'].apply(lambda x: x.id)
            continue

        # 1.3 get the image measurements and save them in DB
        measurements = image.read_selavy(img)
        logger.info(
            'Processed measurements dataframe of shape: (%i, %i)',
            measurements.shape[0], measurements.shape[1]
        )

        # upload measurements, a column with the db is added to the df
        measurements = make_upload_measurements(measurements)

        # save measurements to parquet file in pipeline run folder
        base_folder = os.path.dirname(img.measurements_path)
        if not os.path.exists(base_folder):
            os.makedirs(base_folder)

        measurements.to_parquet(
            img.measurements_path,
            index=False
        )
        del measurements, image, band, img

    # write images parquet file under pipeline run folder
    images_df = pd.DataFrame(map(lambda x: x.__dict__, images))
    images_df = images_df.drop('_state', axis=1)
    images_df.to_parquet(
        os.path.join(config["run"]["path"], 'images.parquet'),
        index=False
    )
    # write skyregions parquet file under pipeline run folder
    skyregs_df = pd.DataFrame(map(lambda x: x.__dict__, skyregions))
    skyregs_df = skyregs_df.drop('_state', axis=1)
    skyregs_df.to_parquet(
        os.path.join(config["run"]["path"], 'skyregions.parquet'),
        index=False
    )
    # write bands parquet file under pipeline run folder
    bands_df = pd.DataFrame(map(lambda x: x.__dict__, bands))
    bands_df = bands_df.drop('_state', axis=1)
    bands_df.to_parquet(
        os.path.join(config["run"]["path"], 'bands.parquet'),
        index=False
    )

    logger.info(
        'Total images upload/loading time: %.2f seconds',
        timer.reset_init()
    )

    return images, skyregs_df
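
The sketch below illustrates the expected shape of the `paths` argument described above; the file names and locations are purely illustrative, and `config` (the run's `PipelineConfig`) and `pipeline_run` (the `Run` object) are assumed to already exist in the calling context.

```python
# nested dict: primary key is the file type, secondary key the image name;
# all names and locations below are purely illustrative
paths = {
    "selavy": {"image1.fits": "/data/selavy-image1.components.xml"},
    "noise": {"image1.fits": "/data/noiseMap.image1.fits"},
    "background": {"image1.fits": "/data/meanMap.image1.fits"},
}

# `config` and `pipeline_run` are assumed to exist in the calling context
images, skyregs_df = make_upload_images(paths, config, pipeline_run)
```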

make_upload_measurement_pairs(measurement_pairs_df)

Uploads the measurement pairs from the supplied measurement pairs DataFrame.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `measurement_pairs_df` | `pd.DataFrame` | DataFrame containing the measurement pairs information from the pipeline. | required |

Returns:

| Type | Description |
| --- | --- |
| `pd.DataFrame` | Original DataFrame with the database ID attached to each row. |

Source code in vast_pipeline/pipeline/loading.py, lines 292-314:
def make_upload_measurement_pairs(
    measurement_pairs_df: pd.DataFrame
) -> pd.DataFrame:
    """
    Uploads the measurement pairs from the supplied measurement pairs
    DataFrame.

    Args:
        measurement_pairs_df:
            DataFrame containing the measurement pairs information from the
            pipeline.

    Returns:
        Original DataFrame with the database ID attached to each row.
    """
    meas_pair_dj_ids = bulk_upload_model(
        MeasurementPair,
        measurement_pair_models_generator(measurement_pairs_df),
        return_ids=True
    )
    measurement_pairs_df["id"] = meas_pair_dj_ids

    return measurement_pairs_df

make_upload_measurements(measurements_df)

Uploads the measurements from the supplied measurements DataFrame.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `measurements_df` | `pd.DataFrame` | DataFrame containing the measurements information from the pipeline. | required |

Returns:

| Type | Description |
| --- | --- |
| `pd.DataFrame` | Original DataFrame with the database ID attached to each row. |

Source code in vast_pipeline/pipeline/loading.py, lines 269-289:
def make_upload_measurements(measurements_df: pd.DataFrame) -> pd.DataFrame:
    """
    Uploads the measurements from the supplied measurements DataFrame.

    Args:
        measurements_df:
            DataFrame containing the measurements information from the
            pipeline.

    Returns:
        Original DataFrame with the database ID attached to each row.
    """
    meas_dj_ids = bulk_upload_model(
        Measurement,
        measurement_models_generator(measurements_df),
        return_ids=True
    )

    measurements_df['id'] = meas_dj_ids

    return measurements_df

make_upload_related_sources(related_df)

Uploads the related sources from the supplied related sources DataFrame.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `related_df` | `pd.DataFrame` | DataFrame containing the related sources information from the pipeline. | required |

Returns:

| Type | Description |
| --- | --- |
| `None` | None. |

Source code in vast_pipeline/pipeline/loading.py, lines 235-248:
def make_upload_related_sources(related_df: pd.DataFrame) -> None:
    """
    Uploads the related sources from the supplied related sources DataFrame.

    Args:
        related_df:
            DataFrame containing the related sources information from the
            pipeline.

    Returns:
        None.
    """
    logger.info('Populate "related" field of sources...')
    bulk_upload_model(RelatedSource, related_models_generator(related_df))

make_upload_sources(sources_df, pipeline_run, add_mode=False)

Delete previous sources for the given pipeline run and bulk upload the newly found sources as well as related sources.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `sources_df` | `pd.DataFrame` | Holds the measurements associated into sources. The output of the association step. | required |
| `pipeline_run` | `Run` | The pipeline Run object. | required |
| `add_mode` | `bool` | Whether the pipeline is running in add image mode. | `False` |

Returns:

| Type | Description |
| --- | --- |
| `pd.DataFrame` | The input dataframe with the 'id' column added. |

Source code in vast_pipeline/pipeline/loading.py, lines 190-232:
def make_upload_sources(
    sources_df: pd.DataFrame, pipeline_run: Run, add_mode: bool = False
) -> pd.DataFrame:
    '''
    Delete previous sources for the given pipeline run and bulk upload
    the newly found sources as well as related sources.

    Args:
        sources_df:
            Holds the measurements associated into sources. The output of
            the association step.
        pipeline_run:
            The pipeline Run object.
        add_mode:
            Whether the pipeline is running in add image mode.

    Returns:
        The input dataframe with the 'id' column added.
    '''
    # create sources in DB
    with transaction.atomic():
        if (add_mode is False and
                Source.objects.filter(run=pipeline_run).exists()):
            logger.info('Removing objects from previous pipeline run')
            n_del, detail_del = (
                Source.objects.filter(run=pipeline_run).delete()
            )
            logger.info(
                ('Deleting all sources and related objects for this run. '
                 'Total objects deleted: %i'),
                n_del,
            )
            logger.debug('(type, #deleted): %s', detail_del)

    src_dj_ids = bulk_upload_model(
        Source,
        source_models_generator(sources_df, pipeline_run=pipeline_run),
        return_ids=True
    )

    sources_df['id'] = src_dj_ids

    return sources_df

update_sources(sources_df, batch_size=10000)

Update the database using SQL code. This function opens one connection to the database and closes it after the update is done.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `sources_df` | `pd.DataFrame` | DataFrame containing the new data to be uploaded to the database. The columns to be updated need to have the same headers between the df and the table in the database. | required |
| `batch_size` | `int` | The df rows are broken into chunks; each chunk is executed in a separate SQL command, and batch_size determines the maximum size of a chunk. | `10000` |

Returns:

| Type | Description |
| --- | --- |
| `pd.DataFrame` | DataFrame containing the new data to be uploaded to the database. |

Source code in vast_pipeline/pipeline/loading.py, lines 317-359:
def update_sources(
    sources_df: pd.DataFrame, batch_size: int = 10_000
) -> pd.DataFrame:
    '''
    Update database using SQL code. This function opens one connection to the
    database, and closes it after the update is done.

    Args:
        sources_df:
            DataFrame containing the new data to be uploaded to the database.
            The columns to be updated need to have the same headers between
            the df and the table in the database.
        batch_size:
            The df rows are broken into chunks, each chunk is executed in a
            separate SQL command, batch_size determines the maximum size of the
            chunk.

    Returns:
        DataFrame containing the new data to be uploaded to the database.
    '''
    # Get all possible columns from the model
    all_source_table_cols = [
        fld.attname for fld in Source._meta.get_fields()
        if getattr(fld, 'attname', None) is not None
    ]

    # Filter to those present in sources_df
    columns = [
        col for col in all_source_table_cols if col in sources_df.columns
    ]

    sources_df['id'] = sources_df.index.values

    batches = np.ceil(len(sources_df)/batch_size)
    dfs = np.array_split(sources_df, batches)
    with connection.cursor() as cursor:
        for df_batch in dfs:
            SQL_comm = SQL_update(
                df_batch, Source, index='id', columns=columns
            )
            cursor.execute(SQL_comm)

    return sources_df
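
A minimal sketch of the intended call pattern, assuming `sources_df` is indexed by the `Source` database ID and that its column names match fields on the `Source` model (the columns and values shown are illustrative only).

```python
import pandas as pd

# recomputed per-source statistics, indexed by the Source database ID;
# column names must match Source model fields (illustrative values here)
sources_df = pd.DataFrame(
    {"avg_flux": [1.20, 3.45], "n_meas": [5, 7]},
    index=[101, 102],
)

sources_df = update_sources(sources_df, batch_size=10_000)
```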

Last update: March 2, 2022
Created: March 2, 2022