"""
Provides the WHOIS database interface.
"""

from datetime import datetime

from sqlalchemy.orm.exc import NoResultFound

import PyFunceble
from PyFunceble.engine.database.loader import session
from PyFunceble.engine.database.schemas import WhoisRecord

class WhoisDB:
    """
    Provides the WHOIS database interface and logic.

    Depending on ``PyFunceble.CONFIGURATION.db_type``, the records are kept
    either in an in-memory dict mirrored to a JSON file (``"json"``) or in
    the ``WhoisRecord`` table (``"mariadb"`` / ``"mysql"``).

    Every public operation is a no-op (or returns an empty result) unless
    :attr:`authorized` is truthy.

    :param bool parent_process:
        Stored as ``self.parent``. :meth:`save` only writes the JSON file
        when it is truthy.
    """

    # NOTE(review): this block was recovered from a whitespace-collapsed
    # listing in which some statements were truncated. The
    # ``PyFunceble.LOGGER.info(...)`` calls and the ``self.save()`` calls at
    # the end of ``load`` and ``add`` were reconstructed from the surviving
    # message strings and comments -- confirm the log level against upstream.

    # Class-level dict: shared by every instance and mutated in place by
    # ``load`` and by the JSON setter.
    database = {}
    database_file = None

    def __init__(self, parent_process=False):
        self.database_file = ""

        if PyFunceble.CONFIGURATION.db_type == "json":
            # We set the location of the database file.
            self.database_file = "{0}{1}".format(
                PyFunceble.CONFIG_DIRECTORY, PyFunceble.OUTPUTS.default_files.whois_db
            )

        self.parent = parent_process

        PyFunceble.LOGGER.debug(f"DB (File): {self.database_file}")

        # We load the stored dataset (if any).
        self.load()

    def __contains__(self, index):
        """
        Checks if the given subject is known to the database.

        :param str index: The subject to look for.

        :rtype: bool
        """

        if not self.authorized:
            return False

        db_type = PyFunceble.CONFIGURATION.db_type

        if db_type == "json":
            if index in self.database:
                PyFunceble.LOGGER.info(f"{index} is present into the database.")
                return True

            PyFunceble.LOGGER.info(f"{index} is not present into the database.")
            return False

        if db_type in ["mariadb", "mysql"]:
            with session.Session() as db_session:
                try:
                    # pylint: disable=no-member
                    db_session.query(WhoisRecord).filter(
                        WhoisRecord.subject == index
                    ).one()
                except NoResultFound:
                    PyFunceble.LOGGER.info(
                        f"{index} is not present into the database."
                    )
                    return False

                PyFunceble.LOGGER.info(f"{index} is present into the database.")
                return True

        return False  # pragma: no cover

    def __getitem__(self, index):
        """
        Provides the dataset stored for the given subject.

        :param str index: The subject to look for.

        :return:
            The stored dataset, or :code:`None` when we are not authorized
            or when the subject is unknown.
        :rtype: dict, None
        """

        if not self.authorized:
            return None

        db_type = PyFunceble.CONFIGURATION.db_type

        if db_type == "json":
            return self.database.get(index)

        if db_type in ["mariadb", "mysql"]:
            with session.Session() as db_session:
                try:
                    # pylint: disable=no-member
                    fetched = (
                        db_session.query(WhoisRecord)
                        .filter(WhoisRecord.subject == index)
                        .one()
                    )
                except NoResultFound:
                    return None

                return {
                    "epoch": fetched.epoch,
                    "expiration_date": fetched.expiration_date,
                    "state": fetched.state,
                    "record": fetched.record,
                }

        return None  # pragma: no cover

    def __setitem_json(self, index, value):
        """
        Stores the given value under the given subject (JSON backend).

        An existing dict is updated, an existing list is extended/appended
        and anything else is overwritten.
        """

        actual_value = self[index]

        if (
            not PyFunceble.CONFIGURATION.store_whois_record
            and isinstance(value, dict)
            and "record" in value
        ):
            # We drop the record from a copy: the dict given by the caller
            # must not be altered behind its back.
            value = {key: val for key, val in value.items() if key != "record"}

        if isinstance(actual_value, dict) and isinstance(value, dict):
            self.database[index].update(value)
        elif isinstance(actual_value, list):
            if isinstance(value, list):  # pragma: no cover
                self.database[index].extend(value)
            else:  # pragma: no cover
                self.database[index].append(value)
        else:
            self.database[index] = value

        PyFunceble.LOGGER.info(
            f"Inserted {repr(value)} into the subset of {repr(index)}"
        )

    @classmethod
    def __setitem_mysql(cls, index, value):
        """
        Stores the given value under the given subject (MariaDB/MySQL
        backend). The row is created when the subject is not known yet.
        """

        with session.Session() as db_session:
            try:
                # pylint: disable=no-member
                record = (
                    db_session.query(WhoisRecord)
                    .filter(WhoisRecord.subject == index)
                    .one()
                )
            except NoResultFound:
                record = WhoisRecord(
                    subject=index,
                )

        store_record = PyFunceble.CONFIGURATION.store_whois_record

        for db_key, db_value in value.items():
            if not store_record and db_key == "record":
                # We are not allowed to store the WHOIS record.
                continue

            setattr(record, db_key, db_value)

        with session.Session() as db_session:
            # pylint: disable=no-member
            db_session.add(record)
            db_session.commit()

        PyFunceble.LOGGER.info(f"Inserted into the database: \n {value}")

    def __setitem__(self, index, value):
        """
        Stores the given value under the given subject using the configured
        backend. Does nothing when we are not authorized.
        """

        if not self.authorized:
            return

        db_type = PyFunceble.CONFIGURATION.db_type

        if db_type == "json":
            self.__setitem_json(index, value)
        elif db_type in ["mariadb", "mysql"]:
            self.__setitem_mysql(index, value)

    @property
    def authorized(self):
        """
        Provides the operation authorization.

        We are authorized when WHOIS lookups are not disabled and the WHOIS
        database is activated in the configuration.
        """

        return (
            not PyFunceble.CONFIGURATION.no_whois
            and PyFunceble.CONFIGURATION.whois_database
        )

    @classmethod
    def merge(cls, old):  # pragma: no cover
        """
        Merges the older version of the database into the new version.

        :param dict old: The old version of the database.

        :return: The database in the new format.
        :rtype: dict
        """

        # We initiate a local place to save our results.
        result = {}

        if PyFunceble.CONFIGURATION.db_type == "json":
            for index, data in old.items():
                if not isinstance(data, dict):
                    # Not a dataset we know how to handle.
                    continue

                if "epoch" in data:
                    # The epoch index is present: the dataset is already
                    # in the new format, we keep it as it is.
                    result[index] = data
                else:
                    # Older format: the datasets are nested one level
                    # deeper, we lift them to the upstream index.
                    result.update(data)

        return result

    def load(self):
        """
        Loads the database file into the database.
        """

        if (
            self.authorized
            and PyFunceble.helpers.File(self.database_file).exists()
            and PyFunceble.CONFIGURATION.db_type == "json"
        ):
            # * We are authorized to operate.
            # and
            # * The database file exists.

            # We merge the stored dataset into the already initiated one.
            self.database.update(
                self.merge(PyFunceble.helpers.Dict().from_json_file(self.database_file))
            )

            PyFunceble.LOGGER.info(
                "Database content loaded in memory. (DATASET WONT BE LOGGED)"
            )

            # As changes can happen because of the merging, we directly save
            # the loaded data.
            self.save()

    def save(self):
        """
        Saves the database into the database file.

        Only the parent process of a JSON backed database writes the file.
        """

        if (
            self.authorized
            and self.parent
            and PyFunceble.CONFIGURATION.db_type == "json"
        ):
            # We save the current state of the database.
            PyFunceble.helpers.Dict(self.database).to_json_file(self.database_file)

            PyFunceble.LOGGER.info(
                f"Saved database into {repr(self.database_file)}."
            )

    @staticmethod
    def _epoch_is_past(data):
        """
        Checks if the epoch of the given dataset is in the past.

        Epochs are compared with epochs: mixing a local naive datetime with
        a UTC naive one shifts the result by the UTC offset.
        """

        if not data or "epoch" not in data:
            return False

        return float(data["epoch"]) < datetime.now().timestamp()

    def is_time_older(self, subject):
        """
        Checks if the expiration time of the given subject is older.

        :param str subject: The subject we are working with.

        :return:
            :code:`True` when the stored epoch is in the past,
            :code:`False` otherwise (including when we are not authorized
            or when there is no epoch for the subject).
        :rtype: bool

        .. note::
            Of course, we imply that the subject is in the database.
        """

        if not self.authorized:
            return False

        return self._epoch_is_past(self[subject])

    def get_expiration_date(self, subject):
        """
        Gets the expiration date of the given subject.

        :param str subject: The subject we are working with.

        :return:
            The expiration date and the WHOIS record from the database,
            or :code:`(None, None)` when there is nothing (valid) to give.
        :rtype: tuple
        """

        if not self.authorized:
            return None, None

        # We fetch the dataset once: with the MariaDB/MySQL backend each
        # access is a query.
        data = self[subject]

        if not data or self._epoch_is_past(data):
            # * The subject is not in the database.
            # or
            # * The expiration date is in the past.
            return None, None

        try:
            return data["expiration_date"], data.get("record")
        except KeyError:  # pragma: no cover
            # We return None, there is no data to work with.
            return None, None

    def add(self, subject, expiration_date, server, record=None):
        """
        Adds the given subject and expiration date to the database.

        :param str subject: The subject we are working with.
        :param str expiration_date:
            The extracted expiration date (format: :code:`%d-%b-%Y`).
        :param str server: The server we got the record from.
        :param str record: The WHOIS record.
        """

        if not (self.authorized and expiration_date):
            # * We are not authorized to operate.
            # or
            # * The expiration date is empty or None.
            return

        epoch = datetime.strptime(expiration_date, "%d-%b-%Y").timestamp()

        # We initiate what we are going to save into the database.
        data = {
            "epoch": epoch,
            "expiration_date": expiration_date,
            "server": server,
            "record": str(record) if record else str(None),
            # We compare the epoch with the current one to set the state.
            "state": "past" if epoch < datetime.now().timestamp() else "future",
        }

        # We save everything into the database.
        self[subject] = data

        # We save everything.
        self.save()