Source code for tibiawikisql.models.abc

#  Copyright 2021 Allan Galarza
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
"""Module with base classes used by models."""

import abc
import sqlite3

from tibiawikisql import database
from tibiawikisql.api import Article
from tibiawikisql.utils import parse_templatates_data


[docs]class Parseable(Article, metaclass=abc.ABCMeta): """An abstract base class with the common parsing operations. This class is inherited by Models that are parsed directly from a TibiaWiki article. Classes implementing this must override :py:attr:`map` Attributes ---------- article_id: :class:`int` The id of the containing article. title: :class:`str` The title of the containing article. timestamp: :class:`int` The last time the containing article was edited. """ _map = None """map: :class:`dict`: A dictionary mapping the article's attributes to object attributes.""" _template = None """The name of the infobox template containing the data"""
[docs] @classmethod def from_article(cls, article): """Parse an article into a TibiaWiki model. Parameters ---------- article: :class:`Article` The article from where the model is parsed. Returns ------- :class:`abc.Parseable` An inherited model object for the current article. """ if cls._map is None: raise NotImplementedError("Inherited class must override map") if article is None: return None templates = parse_templatates_data(article.content) if cls._template not in templates: return None row = { "article_id": article.article_id, "timestamp": article.timestamp, "title": article.title, "attributes": {}, } attributes = templates[cls._template] row["_raw_attributes"] = {} for attribute, value in attributes.items(): if attribute not in cls._map: row["_raw_attributes"][attribute] = value continue column, func = cls._map[attribute] row[column] = func(value) return cls(**row)
@property def infobox_attributes(self): raise AttributeError
[docs]class Row(metaclass=abc.ABCMeta): """An abstract base class implemented to indicate that the Model represents a SQL row. Attributes ---------- table: :class:`database.Table` The SQL table where this model is stored. """ table = None def __init__(self, **kwargs): for c in self.table.columns: value = kwargs.get(c.name, c.default) # SQLite Booleans are actually stored as 0 or 1, so we convert to true boolean. if isinstance(c.column_type, database.Boolean) and value is not None: value = bool(value) setattr(self, c.name, value) if kwargs.get("_raw_attributes"): self._raw_attributes = kwargs.get("_raw_attributes") def __init_subclass__(cls, table=None): cls.table = table def __repr__(self): attributes = [] for attr in self.__slots__: try: v = getattr(self, attr) if isinstance(v, bytes): continue if v is None: continue attributes.append(f"{attr}={v!r}") except AttributeError: pass return f"<{self.__class__.__name__} {' '.join(attributes)}>" @classmethod def _is_column(cls, name): return name in [c.name for c in cls.table.columns] @classmethod def _get_base_query(cls): return f"SELECT * FROM {cls.table.__tablename__}"
[docs] def insert(self, c): """Insert the current model into its respective database table. Parameters ---------- c: :class:`sqlite3.Cursor`, :class:`sqlite3.Connection` A cursor or connection of the database. """ rows = {} for column in self.table.columns: try: value = getattr(self, column.name) if value == column.default: continue rows[column.name] = value except AttributeError: continue self.table.insert(c, **rows)
[docs] @classmethod def from_row(cls, row): """Return an instance of the model from a row or dictionary. Parameters ---------- row: :class:`dict`, :class:`sqlite3.Row` A dict representing a row or a Row object. Returns ------- :class:`cls` An instance of the class, based on the row. """ if isinstance(row, sqlite3.Row): row = dict(row) return cls(**row)
[docs] @classmethod def get_by_field(cls, c, field, value, use_like=False): """Get an element by a specific field's value. Parameters ---------- c: :class:`sqlite3.Connection`, :class:`sqlite3.Cursor` A connection or cursor of the database. field: :class:`str` The field to filter with. value: The value to look for. use_like: :class:`bool` Whether to use ``LIKE`` as a comparator instead of ``=``. Returns ------- :class:`cls` The object found, or ``None``. Raises ------ ValueError The specified field doesn't exist in the table. """ # This is used to protect the query from possible SQL Injection. if not cls._is_column(field): raise ValueError(f"Field '{field}' doesn't exist.") operator = "LIKE" if use_like else "=" query = f"SELECT * FROM {cls.table.__tablename__} WHERE {field} {operator} ? LIMIT 1" c = c.execute(query, (value,)) c.row_factory = sqlite3.Row row = c.fetchone() if row is None: return None return cls.from_row(row)
[docs] @classmethod def search(cls, c, field=None, value=None, use_like=False, sort_by=None, ascending=True): """Find elements matching the provided values. If no values are provided, it will return all elements. Note that this won't get values found in child tables. Parameters ---------- c: :class:`sqlite3.Connection`, :class:`sqlite3.Cursor` A connection or cursor of the database. field: :class:`str`, optional The field to filter by. value: optional The value to filter by. use_like: :class:`bool`, optional Whether to use ``LIKE`` as a comparator instead of ``=``. sort_by: :class:`str`, optional The column to sort by. ascending: :class:`bool`, optional Whether to sort ascending or descending. Returns ------- list of :class:`cls` A list containing all matching objects. Raises ------ ValueError The specified field doesn't exist in the table. """ if field is not None and not cls._is_column(field): raise ValueError(f"Field '{field}' doesn't exist.") if sort_by is not None and not cls._is_column(sort_by): raise ValueError(f"Field '{sort_by}' doesn't exist.") operator = "LIKE" if use_like else "=" query = cls._get_base_query() tup = () if field is not None: query += f"\nWHERE {field} {operator} ?" tup = (value,) if sort_by is not None: query += f"\nORDER BY {sort_by} {'ASC' if ascending else 'DESC'}" c = c.execute(query, tup) c.row_factory = sqlite3.Row results = [] for row in c.fetchall(): row = cls.from_row(row) if row is not None: results.append(row) return results