Source code for pydent.models.data_associations

"""Models related to data associations."""
import json
import os
import shutil
from typing import Union

import requests

from pydent.base import ModelBase
from pydent.marshaller import add_schema
from pydent.models.crud_mixin import JSONDeleteMixin
from pydent.models.crud_mixin import JSONSaveMixin
from pydent.relationships import HasOne
from pydent.relationships import JSON
from pydent.utils import make_async


# TODO: changing the value of a association (and saving it) shouldn't be difficult
[docs]class DataAssociatorMixin: """Mixin for handling data associations.""" def _make_association(self, key, value, parent, upload=None): association = self.session.DataAssociation.new( object=json.dumps({key: value}), upload=upload, key=key, parent_id=parent.id, parent_class=parent.__class__.__name__, ) association.parent = parent return association
[docs] def associate(self, key, value, upload=None, create_new=False, save=None): """Adds a data association with the key and value to this object. :param key: Key of the association :type key: str :param value: a json serializable object :type value: dict | str | int | float :param upload: optional file to upload :type upload: File :param create_new: if True (default) will create a new association instead of updating the existing association. :tuype create_new: bool :return: newly created or updated data association :rtype: DataAssociation """ if save is None: if self.id is None: save = False else: save = True if create_new: association = None else: association = self.get_data_association(key) if not association: if save: return self.session.utils.create_data_association( self, key, value, upload=upload ) else: association = self._make_association(key, value, self, upload) self.append_to_many("data_associations", association) return association else: association.value = value association.upload = upload association.parent_class = self.__class__.__name__ association.parent_id = self.id association.parent = self if save: self._try_update_data_association(association) return association
def delete_data_associations(self, key): associations = self.get_data_associations(key) for a in associations: if a.id: a.delete() self.data_associations.remove(a) def _try_update_data_association(self, association): upload = association.upload if upload and upload.id: association.upload_id = upload.id if self.id: if association.upload and not association.upload.id: association.upload.save() association.upload_id = association.upload.id association.save() return association # return self.session.utils.create_data_association( # self, key, value, upload=upload # ) # if unique_key: # existing_associations = self.get_data_associations(key) # for association in existing_associations: # association.value = value # association.upload = upload # association.upload_id = upload.id # association.save() # else: # self.session.utils.create_data_association( # self, key, value, upload=upload # ) # def save_data_associations(self): # if self.is_deserialized('data_associations'): #
[docs] def associate_file(self, key, value, file, job_id=None): """Associate a file. :param key: association key :type key: str or json :param value: association value :type value: str or json :param file: file to create :class:`Upload` :type file: file object :param job_id: optional job_id to associate the :class:`Upload` :type job_id: int :return: new data association :rtype: :class:`DataAssociation` """ u = self.session.Upload.new(job_id=job_id, file=file) return self.associate(key, value, upload=u)
[docs] def associate_file_from_path(self, key, value, filepath, job_id=None): """Associate a file from a filepath. :param key: association key :type key: str or json :param value: association value :type value: str or json :param filepath: path to file to create :class:`Upload` :type filepath: str :param job_id: optional job_id to associate the :class:`Upload` :type job_id: int :return: new data association :rtype: :class:`DataAssociation` """ with open(filepath, "rb") as f: return self.associate_file(key, value, f, job_id=job_id)
def get_data_association(self, key: Union[None, str]): das = self.get_data_associations(key) if das: return das[0] def get_data_associations(self, key: Union[None, str]): das = [] if self.data_associations: for da in self.data_associations: if da.key == key: das.append(da) return das # TODO: DataAssociation - do we really want to have this return either a list or # single value? How are people using this? def get(self, key): val = [] for da in self.get_data_associations(key): val.append(da.value) if len(val) == 1: return val[0] elif len(val) == 0: return None return val
class DataAssociationSaveContext: def __init__(self, model: DataAssociatorMixin): self.model = model self.data_associations = [] def __enter__(self): if self.model.is_deserialized("data_associations"): self.data_associations = self.model.data_associations def __exit__(self, exc_type, exc_val, exc_tb): if exc_type: return for da in self.data_associations: da.parent_id = self.model.id self.model._try_update_data_association(da) assert da.id self.model.data_associations = self.data_associations
[docs]@add_schema class DataAssociation(JSONDeleteMixin, JSONSaveMixin, ModelBase): """A DataAssociation model.""" fields = dict(object=JSON(), upload=HasOne("Upload")) @property def value(self): return self.object.get(self.key, None) @value.setter def value(self, new_value): self.object = {self.key: new_value} # def save(self): # if self.upload and not self.upload_id: # self.upload.save() # self.upload_id = self.upload_id # super().save() # # data_association = self.parent.session.DataAssociation.find(self.id) # if data_association.id not in [da.id for da in self.parent.data_associations]: # self.parent.data_associations.append(data_association) # return data_association # else: # for da in self.parent.data_associations: # if da.id == data_association.id: # return da def save(self): if self.parent_id is None: raise ValueError("Cannot save DataAssociation. `parent_id` cannot be None") if self.parent_class is None: raise ValueError( "Cannot save DataAssociation. `parent_class` cannot be None" ) super().save(do_reload=True) def __str__(self): return self._to_str("id", "object")
[docs]@add_schema class Upload(ModelBase): """An Upload model.""" fields = dict(job=HasOne("Job")) def __init__(self, job_id=None, file=None): """Create a new upload. :param job_id: job id to associate the upload to :type job_id: int :param file: file to upload :type file: file object """ super().__init__(job_id=job_id) self.file = file query_hook = dict(methods=["size", "name", "job"]) # def _get_uploads_from_job_id(self, job_id): def _get_uploads_from_job(self): http = self.session._http return http.get("krill/uploads?job={}".format(self.job_id))["uploads"] def temp_url(self): data = self.session.Upload.where({"id": self.id}, methods=["expiring_url"])[ 0 ].raw return data["expiring_url"]
[docs] @staticmethod def _download_file_from_url(url, outpath): """Downloads a file from a url. :param url: url of file :type url: str :param outpath: filepath of out file :type outpath: str :return: http response :rtype: str """ response = requests.get(url, stream=True) with open(outpath, "wb") as out_file: shutil.copyfileobj(response.raw, out_file) return response.raw
[docs] @staticmethod @make_async(1) def async_download(uploads, outdir=None, overwrite=True): """Asynchronously downloads from list of :class:`Upload` models. :param uploads: list of Uploads :type uploads: list :param outdir: path to output directory to save downloaded files :type outdir: str :param overwrite: if True, will overwrite existing files :type overwrite: bool :return: list of filepaths :rtype: list """ return Upload._download_files(uploads, outdir, overwrite)
[docs] @staticmethod def _download_files(uploads, outdir, overwrite): """Downloads uploaded file from list of :class:`Upload` models. :param uploads: list of Uploads :type uploads: list :param outdir: path to output directory to save downloaded files (defaults to current directory) :type outdir: str :param overwrite: if True, will overwrite existing files (default: True) :type overwrite: bool :return: list of filepaths :rtype: list """ filepaths = [] for upload in uploads: filepath = upload.download(outdir=outdir, overwrite=overwrite) filepaths.append(filepath) return filepaths
[docs] def fetch(self, outdir: str = None, filename: str = None, overwrite: bool = True): """Alias for `download` :param outdir: path of directory of output file (default is current directory) :param outfile: filename of output file (defaults to upload_filename) :param overwrite: whether to overwrite file if it already exists :return: filepath of the downloaded file """ return self.download(outdir=outdir, filename=filename, overwrite=overwrite)
[docs] def download( self, outdir: str = None, filename: str = None, overwrite: bool = True ): """Downloads the uploaded file to the specified output directory. If no output directory is specified, the file will be downloaded to the current directory. :param outdir: path of directory of output file (default is current directory) :param outfile: filename of output file (defaults to upload_filename) :param overwrite: whether to overwrite file if it already exists :return: filepath of the downloaded file """ if outdir is None: outdir = "." if filename is None: filename = "{}_{}".format(self.id, self.upload_file_name) filepath = os.path.join(outdir, filename) if not os.path.exists(filepath) or overwrite: self._download_file_from_url(self.temp_url(), filepath) return filepath
@property def data(self): """Return the data associated with the upload.""" result = requests.get(self.temp_url()) return result.content
[docs] def create(self): """Save the upload to the server.""" return self.session.utils.create_upload(self)
def save(self): return self.create()