Source code for populse_db.database

import json
from datetime import date, datetime, time

import dateutil

populse_db_table = "populse_db"


def check_value_type(value, field_type):
    """
    Checks the type of a value

    :param value: value to check

    :param field_type: type that the value is supposed to have

    :return: true if the value is ``None`` or if the value is of that type,
        ``False`` otherwise
    """

    if field_type is None:
        return False
    if value is None:
        return True
    origin = getattr(field_type, "__origin__", None)
    if origin:
        # field_type is a parameterized type such as list[str]
        # origin is the parent type (e.g. list)
        if isinstance(value, origin):
            # The following code works only on list[...]
            # because other parameterized types are not
            # supported.
            item_type = field_type.__args__[0]
            for v in value:
                if not check_value_type(v, item_type):
                    return False
            return True
    elif field_type is float:
        return isinstance(value, int | float)
    else:
        return isinstance(value, field_type)
    return False


[docs] def type_to_str(type): """Convert a Python type to a string. Examples: - ``type_to_str(str) == 'str'`` - ``type_to_str(list[str]) == 'list[str]'`` """ args = getattr(type, "__args__", None) if args: return f"{type.__name__}[{','.join(type_to_str(i) for i in args)}]" else: return type.__name__
_type_to_sqlite = { str: "text", } def type_to_sqlite(type): """ Like type_to_str(type) but for internal use in SQLite column type definitions in order to avoid conversion problems due to SQlite type affinity. See https://www.sqlite.org/datatype3.html """ result = _type_to_sqlite.get(type) if result is None: args = getattr(type, "__args__", None) if args: result = f"{type.__name__}[{','.join(type_to_sqlite(i) for i in args)}]" else: result = type.__name__ return result _str_to_type = { type_to_sqlite(i): i for i in (str, int, float, bool, date, datetime, time, dict, list) } _str_to_type.update( { type_to_str(i): i for i in (str, int, float, bool, date, datetime, time, dict, list) } )
[docs] def str_to_type(str): """Convert a string to a Python type. Examples: - ``str_to_type('str') == str`` - ``str_to_type('list[str]') == list[str]`` """ global _str_to_type if not str: return None s = str.split("[", 1) if len(s) == 1: result = _str_to_type.get(s[0]) else: args = tuple(str_to_type(i) for i in s[1][:-1].split(",")) result = _str_to_type.get(s[0]) if result: result = result[args] if not result: raise ValueError(f'invalid type: "{str}"') return result
def python_value_type(value): """ Returns the Python type corresponding to a Python value. This type can be used in add_field(s) method. For list values, only the first item is considered to get the subtype of the list. Examples: - ``python_value_type('a value') == str`` - ``python_value_type([]) == list`` - ``python_value_type([1, 2, 3]) == list[int]`` - ``python_value_type(['string', 2, {}]) == list[str]`` - ``python_value_type({'one': 1, 'two': 2}) == dict`` """ if isinstance(value, list) and value: return list[type(value[0])] type(value) class DatabaseSession: """ Base class methods: *Database related methods:* - :py:meth:`execute` - :py:meth:`commit` - :py:meth:`rollback` *Database configuration methods:* - :any:`settings` - :py:meth:`set_settings` *Collections related methods* - :py:meth:`add_collection` - :py:meth:`remove_collection` - :py:meth:`has_collection` - :py:meth:`__getitem__` - :py:meth:`collections` *Obsolete methods kept for backward compatibility* - :py:meth:`get_collection` - :py:meth:`get_collections` - :py:meth:`get_collections_names` - :py:meth:`add_field` - :py:meth:`remove_field` - :py:meth:`get_field` - :py:meth:`get_fields_names` - :py:meth:`get_fields` - :py:meth:`set_values` - :py:meth:`has_document` - :py:meth:`get_document` - :py:meth:`get_documents_ids` - :py:meth:`get_documents` - :py:meth:`remove_document` - :py:meth:`add_document` - :py:meth:`filter_documents` .. automethod:: __getitem__ """ default_primary_key = "primary_key" def execute(self, *args, **kwargs): raise NotImplementedError() def commit(self): raise NotImplementedError() def rollback(self): raise NotImplementedError() def settings(self, category, key, default=None): raise NotImplementedError() def set_settings(self, category, key, value): raise NotImplementedError() def add_collection(self, name, primary_key=default_primary_key): """ Adds a collection :param name: New collection name (str, must not be existing) :param primary_key: New collection primary_key column (str) => "index" by default :raise ValueError: - If the collection is already existing - If the collection name is invalid - If the primary_key is invalid """ raise NotImplementedError() def remove_collection(self, name): """ Removes a collection :param name: Collection to remove (str, must be existing) :raise ValueError: If the collection does not exist """ raise NotImplementedError() def has_collection(self, name): """ Check if a collection with the given name exists. """ raise NotImplementedError() def __getitem__(self, collection_name): """Return a collection object given its name.""" raise NotImplementedError() def collections(self): """ Iterates over collections :return: generator """ yield from self def get_collection(self, name): """ .. deprecated:: 3.0 Use ``db_session[name]`` instead """ try: return self[name] except ValueError: return None def get_collections(self): """ .. deprecated:: 3.0 Use :py:meth:`collections()` instead """ return self.collections() def get_collections_names(self): """ .. deprecated:: 3.0 Use ``(i.name for i in db_session)`` instead """ return (i.name for i in self) def add_field(self, collection, name, field_type, description=None, index=False): """ .. deprecated:: 3.0 Use ``db_session[collection].add_field(...)`` instead. See :py:meth:`DatabaseCollection.add_field`. """ self[collection].add_field( name, field_type, description=description, index=index ) def remove_field(self, collection, field): """ .. deprecated:: 3.0 Use ``db_session[collection].remove_field(...)`` instead. See :py:meth:`DatabaseCollection.remove_field`. """ self[collection].remove_field(field) def get_field(self, collection, name): """ .. deprecated:: 3.0 Use ``db_session[collection].fields.get(name)`` instead. See :py:attr:`DatabaseCollection.fields`. """ try: return self[collection].fields.get(name) except ValueError: return None def get_fields_names(self, collection): """ .. deprecated:: 3.0 Use ``db_session[collection].fields.keys()`` instead. See :py:attr:`DatabaseCollection.fields`. """ try: return self[collection].fields.keys() except ValueError: return () def get_fields(self, collection): """ .. deprecated:: 3.0 Use ``db_session[collection].fields.values()`` instead. See :py:attr:`DatabaseCollection.fields`. """ try: return self[collection].fields.values() except ValueError: return () def set_values(self, collection, document_id, values): """ .. deprecated:: 3.0 Use ``db_session[collection].update_document(...)`` instead. See :py:meth:`DatabaseCollection.update_document`. """ self[collection].update_document(document_id, values) def has_document(self, collection, document_id): """ .. deprecated:: 3.0 Use ``db_session[collection].has_document(...)`` instead. See :py:meth:`DatabaseCollection.has_document`. """ return self[collection].has_document(document_id) def get_document(self, collection, document_id, fields=None, as_list=False): """ .. deprecated:: 3.0 Use ``db_session[collection].document(...)`` instead. See :py:meth:`DatabaseCollection.document`. """ try: collection = self[collection] except ValueError: return None return collection.document(document_id, fields, as_list) def get_documents_ids(self, collection): """ .. deprecated:: 3.0 Use ``db_session[collection].documents_ids(...)`` instead. See :py:meth:`DatabaseCollection.documents_ids`. """ try: c = self[collection] except ValueError: return yield from c.documents_ids() def get_documents(self, collection, fields=None, as_list=False, document_ids=None): """ .. deprecated:: 3.0 Use ``db_session[collection].documents_ids(...)`` instead. See :py:meth:`DatabaseCollection.documents_ids`. """ try: c = self[collection] except ValueError: return if document_ids is None: yield from c.documents(fields=fields, as_list=as_list) else: for document_id in document_ids: document = c.get(document_id) if document is not None: yield document def remove_document(self, collection, document_id): """ .. deprecated:: 3.0 Use ``del db_session[collection][document_id]`` instead. See :py:meth:`DatabaseCollection.__delitem__`. """ del self[collection][document_id] def add_document(self, collection, document): """ .. deprecated:: 3.0 Use ``del db_session[collection].add(document)`` instead. See :py:meth:`DatabaseCollection.add`. """ self[collection].add(document) def filter_documents(self, collection, filter_query, fields=None, as_list=False): """ .. deprecated:: 3.0 Use ``del db_session[collection].filter(...)`` instead. See :py:meth:`DatabaseCollection.filter`. """ yield from self[collection].filter(filter_query, fields=fields, as_list=as_list) class DatabaseCollection: def __init__(self, session, name): self.session = session self.name = name self.catchall_column = self.settings().get("catchall_column", "_catchall") self.primary_key = {} self.bad_json_fields = set() self.fields = {} def settings(self): return self.session.settings("collection", self.name, {}) def set_settings(self, settings): self.session.set_settings("collection", self.name, settings) def document_id(self, document_id): if not isinstance(document_id, tuple | list): document_id = (document_id,) if len(document_id) != len(self.primary_key): raise KeyError( f"key for table {self.name} requires {len(self.primary_key)} value(s), {len(document_id)} given" ) return document_id def update_settings(self, **kwargs): settings = self.settings() settings.update(kwargs) self.set_settings(settings) def add_field( self, name, field_type, description=None, index=False, bad_json=False ): """ Adds a field to the database :param collection: Field collection (str, must be existing) :param name: Field name (str, must not be existing) :param field_type: Field type, in ('string', 'int', 'float', 'boolean', 'date', 'datetime', 'time', 'json', 'list_string', 'list_int', 'list_float', 'list_boolean', 'list_date', 'list_datetime', 'list_time', 'list_json') :param description: Field description (str or None) => None by default :param index: Bool to know if indexing must be done => False by default :raise ValueError: - If the collection does not exist - If the field already exists - If the field name is invalid - If the field type is invalid - If the field description is invalid """ raise NotImplementedError() def remove_field(self, name): """ Removes a field in the collection :param collection: Field collection (str, must be existing) :param field: Field name (str, must be existing)) :raise ValueError: - If the collection does not exist - If the field does not exist """ raise NotImplementedError() def update_document(self, document_id, partial_document): raise NotImplementedError() def has_document(self, document_id): raise NotImplementedError() def document(self, document_id, fields=None, as_list=False): raise NotImplementedError() def documents(self, fields=None, as_list=False): raise NotImplementedError() def documents_ids(self): yield from ( i for i in self.documents(fields=tuple(self.primary_key), as_list=True) ) def __iter__(self): return self.documents() def add(self, document, replace=False): raise NotImplementedError() def __setitem__(self, document_id, document): raise NotImplementedError() def _encode_column_value(self, field, value): encoding = self.fields.get(field, {}).get("encoding") if encoding: encode, decode = encoding try: column_value = encode(value) except TypeError: # Error with JSON encoding column_value = ... if column_value is ...: column_value = encode(json_encode(value)) self.bad_json_fields.add(field) settings = self.settings() settings.setdefault("fields", {}).setdefault(field, {})["bad_json"] = ( True ) self.set_settings(settings) return column_value return value def __getitem__(self, document_id): return self.document(document_id) def __delitem__(self, document_id): raise NotImplementedError() def parse_filter(self, filter): raise NotImplementedError() def filter(self, filter, fields=None, as_list=False): """ Iterates over the collection documents selected by filter_query Each item yield is a row of the collection table returned filter_query can be the result of self.filter_query() or a string containing a filter (in this case self.fliter_query() is called to get the actual query) :param filter_query: Filter query (str) - A filter row must be written this way: {<field>} <operator> "<value>" - The operator must be in ('==', '!=', '<=', '>=', '<', '>', 'IN', 'ILIKE', 'LIKE') - The filter rows can be linked with ' AND ' or ' OR ' - Example: "((({BandWidth} == "50000")) AND (({FileName} LIKE "%G1%")))" :param fields: List of fields to retrieve in the document :param as_list: If True, document values are returned in a list using fields order """ raise NotImplementedError() def delete(self, filter): """ Delete documents corresponding to the given filter :param filter_query: Filter query (str) """ raise NotImplementedError() def json_dumps(value): return json.dumps(value, separators=(",", ":")) _json_encodings = { datetime: lambda d: f"{d.isoformat()}ℹdatetimeℹ", date: lambda d: f"{d.isoformat()}ℹdateℹ", time: lambda d: f"{d.isoformat()}ℹtimeℹ", list: lambda l: [json_encode(i) for i in l], # noqa: E741 dict: lambda d: {k: json_encode(v) for k, v in d.items()}, } _json_decodings = { "datetime": lambda s: dateutil.parser.parse(s), "date": lambda s: dateutil.parser.parse(s).date(), "time": lambda s: dateutil.parser.parse(s).time(), } def json_encode(value): global _json_encodings type_ = type(value) encode = _json_encodings.get(type_) if encode is not None: return encode(value) return value def json_decode(value): global _json_decodings if isinstance(value, list): return [json_decode(i) for i in value] elif isinstance(value, dict): return {k: json_decode(v) for k, v in value.items()} elif isinstance(value, str): if value.endswith("ℹ"): split_value = value[:-1].rsplit("ℹ", 1) if len(split_value) == 2: encoded_value, decoding_name = split_value decode = _json_decodings.get(decoding_name) if decode is None: raise ValueError(f'Invalid JSON encoding type for value "{value}"') return decode(encoded_value) return value # Obsolete constants kept for backward compatibility with API v2 FIELD_TYPE_STRING = str FIELD_TYPE_INTEGER = int FIELD_TYPE_FLOAT = float FIELD_TYPE_BOOLEAN = bool FIELD_TYPE_DATE = date FIELD_TYPE_DATETIME = datetime FIELD_TYPE_TIME = time FIELD_TYPE_JSON = dict FIELD_TYPE_LIST_STRING = list[str] FIELD_TYPE_LIST_INTEGER = list[int] FIELD_TYPE_LIST_FLOAT = list[float] FIELD_TYPE_LIST_BOOLEAN = list[bool] FIELD_TYPE_LIST_DATE = list[date] FIELD_TYPE_LIST_DATETIME = list[datetime] FIELD_TYPE_LIST_TIME = list[time] FIELD_TYPE_LIST_JSON = list[dict] ALL_TYPES = { FIELD_TYPE_LIST_STRING, FIELD_TYPE_LIST_INTEGER, FIELD_TYPE_LIST_FLOAT, FIELD_TYPE_LIST_BOOLEAN, FIELD_TYPE_LIST_DATE, FIELD_TYPE_LIST_DATETIME, FIELD_TYPE_LIST_TIME, FIELD_TYPE_LIST_JSON, FIELD_TYPE_STRING, FIELD_TYPE_INTEGER, FIELD_TYPE_FLOAT, FIELD_TYPE_BOOLEAN, FIELD_TYPE_DATE, FIELD_TYPE_DATETIME, FIELD_TYPE_TIME, FIELD_TYPE_JSON, }