Source code for PyFunceble.inactive_db

# pylint:disable=line-too-long
"""
The tool to check the availability or syntax of domains, IPv4 or URL.

::


    ██████╗ ██╗   ██╗███████╗██╗   ██╗███╗   ██╗ ██████╗███████╗██████╗ ██╗     ███████╗
    ██╔══██╗╚██╗ ██╔╝██╔════╝██║   ██║████╗  ██║██╔════╝██╔════╝██╔══██╗██║     ██╔════╝
    ██████╔╝ ╚████╔╝ █████╗  ██║   ██║██╔██╗ ██║██║     █████╗  ██████╔╝██║     █████╗
    ██╔═══╝   ╚██╔╝  ██╔══╝  ██║   ██║██║╚██╗██║██║     ██╔══╝  ██╔══██╗██║     ██╔══╝
    ██║        ██║   ██║     ╚██████╔╝██║ ╚████║╚██████╗███████╗██████╔╝███████╗███████╗
    ╚═╝        ╚═╝   ╚═╝      ╚═════╝ ╚═╝  ╚═══╝ ╚═════╝╚══════╝╚═════╝ ╚══════╝╚══════╝

This submodule will provide the inactive database logic and interface.

Author:
    Nissar Chababy, @funilrys, contactTATAfunilrysTODTODcom

Special thanks:
    https://pyfunceble.github.io/special-thanks.html

Contributors:
    https://pyfunceble.github.io/contributors.html

Project link:
    https://github.com/funilrys/PyFunceble

Project documentation:
    https://pyfunceble.readthedocs.io/en/master/

Project homepage:
    https://pyfunceble.github.io/

License:
::


    MIT License

    Copyright (c) 2017, 2018, 2019 Nissar Chababy

    Permission is hereby granted, free of charge, to any person obtaining a copy
    of this software and associated documentation files (the "Software"), to deal
    in the Software without restriction, including without limitation the rights
    to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
    copies of the Software, and to permit persons to whom the Software is
    furnished to do so, subject to the following conditions:

    The above copyright notice and this permission notice shall be included in all
    copies or substantial portions of the Software.

    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
    AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
    OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
    SOFTWARE.
"""
# pylint: enable=line-too-long

from hashlib import sha256

import PyFunceble
from PyFunceble.helpers import Dict, File, List


class InactiveDB:  # pylint: disable=too-many-instance-attributes
    """
    Provide the inactive database logic and interface.

    Depending on ``PyFunceble.CONFIGURATION["db_type"]`` the subjects are
    stored into a JSON file (``json``), an SQLite database (``sqlite``) or a
    MariaDB/MySQL database (``mariadb``/``mysql``).

    With the JSON backend, the in-memory layout is::

        {filename: {timestamp: {subject: status}}}

    :param str filename: The name of the file we are processing.
    :param sqlite_db:
        The SQLite interface to use. This class reads its ``cursor``,
        ``connection``, ``tables``, ``errors`` and ``locked_errors``
        attributes.
    :param mysql_db:
        The MariaDB/MySQL interface to use. This class reads its ``tables``
        and ``errors`` attributes and calls its ``get_connection()`` method.
    """

    # We initiate a variable which will save the
    # cache about who are present or not into the database.
    # (Class-level default only: ``__init__`` gives each instance its own.)
    is_present_cache = {}

    # Saves the whole database content.
    # NOTE(review): this dict lives on the class and is mutated in place by
    # ``_merge``/``__setitem__``, so it is shared by every instance unless
    # ``load`` rebinds it - confirm that this sharing is intended.
    database = {}

    # Save the operation authorization.
    authorized = False

    one_day_in_seconds = 1 * 24 * 3600

    # Save the filename we are operating.
    filename = None

    def __init__(self, filename, sqlite_db=None, mysql_db=None):
        # We get the authorization status.
        self.authorized = self.authorization()

        # We give each instance its own presence cache. The cache is indexed
        # by subject only, so sharing it between instances which work on
        # different files would leak the answers of one file into another.
        self.is_present_cache = {}

        # We convert the number of days between the database retest
        # to seconds.
        self.days_in_seconds = (
            PyFunceble.CONFIGURATION["days_between_db_retest"] * 24 * 3600
        )

        # We set the path to the inactive database file.
        self.database_file = "{0}{1}".format(
            PyFunceble.CONFIG_DIRECTORY,
            PyFunceble.OUTPUTS["default_files"]["inactive_db"],
        )

        # We share the filename.
        self.filename = filename

        # We get the db instance.
        self.sqlite_db = sqlite_db
        self.mysql_db = mysql_db

        self.table_name = self.get_table_name()

        # We initiate the database.
        self.initiate()

    def __contains__(self, subject):
        """
        Check if the given subject is into the database of the current file.

        :param str subject: The subject we are looking for.

        :rtype: bool
        """

        if not self.authorized:
            # We are not authorized to operate.
            return False

        db_type = PyFunceble.CONFIGURATION["db_type"]

        if db_type == "json":
            if subject not in self.is_present_cache:
                # We look for the subject into every timestamp index
                # (digit only keys) of the current file. A missing file
                # namespace is handled as an empty one.
                self.is_present_cache[subject] = any(
                    subject in self[timestamp]
                    for timestamp in self.database.get(self.filename, {})
                    if timestamp.isdigit()
                )

            return self.is_present_cache[subject]

        if db_type == "sqlite":
            query = (
                "SELECT COUNT(*) "
                "FROM {0} "
                "WHERE subject = :subject AND file_path = :file"
            ).format(self.table_name)
            parameters = {"subject": subject, "file": self.filename}

            try:
                output = self.sqlite_db.cursor.execute(query, parameters)
            except self.sqlite_db.locked_errors:
                # The database is locked, we wait a bit and try once more.
                PyFunceble.sleep(0.3)
                output = self.sqlite_db.cursor.execute(query, parameters)

            fetched = output.fetchone()

            return fetched[0] != 0

        if db_type in ["mariadb", "mysql"]:
            query = (
                "SELECT COUNT(*) "
                "FROM {0} "
                "WHERE subject = %(subject)s AND file_path = %(file)s"
            ).format(self.table_name)

            with self.mysql_db.get_connection() as cursor:
                cursor.execute(query, {"subject": subject, "file": self.filename})

                fetched = cursor.fetchone()

                # We return a boolean like the other backends do
                # (and not the raw count).
                return fetched["COUNT(*)"] != 0

        return False  # pragma: no cover

    def __getitem__(self, index):
        """
        Provide the data saved under the given index of the current file.

        :param str index: The index (JSON backend: a second level key).

        :return:
            The saved data or an empty list if there is nothing to return
            (not authorized, not the JSON backend, unknown file or index).
        """

        if self._json_enabled():
            try:
                return self.database[self.filename][index]
            except KeyError:
                pass

        return []

    def __setitem__(self, index, value):
        """
        Save the given value under the given index of the current file.

        Nothing is done if the JSON backend is not the one in use.

        :param str index: The index to write into.
        :param value:
            The value to save. If something (not empty) is already saved,
            dicts are merged into dicts, lists are merged into lists
            (other values are appended to lists), and everything else
            overwrites the saved data.
        """

        if PyFunceble.CONFIGURATION["db_type"] != "json":
            return

        actual_state = self[index]

        if not actual_state:
            # Nothing (or something empty) was saved: we (over)write,
            # and create the file namespace if needed.
            self.database.setdefault(self.filename, {})[index] = value
            return

        file_database = self.database[self.filename]

        if isinstance(actual_state, dict):
            if isinstance(value, dict):  # pragma: no cover
                file_database[index] = Dict(actual_state).merge(value, strict=True)
            else:  # pragma: no cover
                file_database[index] = value
        elif isinstance(actual_state, list):  # pragma: no cover
            if isinstance(value, list):
                file_database[index] = List(actual_state).merge(value, strict=False)
            else:
                actual_state.append(value)

            file_database[index] = List(file_database[index]).format()
        else:  # pragma: no cover
            file_database[index] = value

    def _json_enabled(self):
        """
        Tell if we are authorized to operate with the JSON backend.

        :rtype: bool
        """

        return bool(self.authorized) and PyFunceble.CONFIGURATION["db_type"] == "json"

    @classmethod
    def authorization(cls):
        """
        Provide the execution authorization.

        :return: The value of the :code:`inactive_database` configuration index.
        """

        return PyFunceble.CONFIGURATION["inactive_database"]

    def get_table_name(self):
        """
        Return the name of the table to use.

        :return:
            The :code:`inactive` table of the SQLite or MariaDB/MySQL
            interface, or :code:`"inactive"` for every other backend.
        """

        db_type = PyFunceble.CONFIGURATION["db_type"]

        if db_type == "sqlite":
            return self.sqlite_db.tables["inactive"]

        if db_type in ["mariadb", "mysql"]:
            return self.mysql_db.tables["inactive"]

        return "inactive"

    def _merge(self):
        """
        Merge the database with the older one which
        has already been set into the database.

        Only does something with the JSON backend: the content of the
        database file is read and merged into the in-memory database.
        """

        if not self._json_enabled():
            return

        # We get the content of the database.
        database_content = Dict().from_json(File(self.database_file).read())

        for top_key, entries in database_content.items():
            # We loop through the database top keys (the file names).

            if not entries:
                # There is nothing to merge for this one.
                continue

            for low_key, stored in entries.items():
                # We loop through the lower keys.

                if isinstance(stored, list):  # pragma: no cover
                    # A list of subjects is converted to a
                    # subject -> (empty) status dict.
                    to_set = {subject: "" for subject in stored}
                else:
                    to_set = stored

                if top_key not in self.database:
                    self.database[top_key] = {low_key: to_set}
                elif low_key in self.database[top_key]:
                    self.database[top_key][low_key] = Dict(
                        self.database[top_key][low_key]
                    ).merge(to_set, strict=False)
                else:  # pragma: no cover
                    self.database[top_key][low_key] = to_set

    def load(self):
        """
        Load the content of the database file.

        Only does something with the JSON backend. If the database file does
        not exist, an empty database is initiated for the current file.
        """

        if not self._json_enabled():
            return

        if not PyFunceble.path.isfile(self.database_file):
            # The database file do not exists.

            # We initiate an empty database.
            self.database = {self.filename: {}}
            return

        # The database file exists.
        self._merge()

        file_database = self.database.get(self.filename, {})

        if "to_test" in file_database:
            # We move the content of the `to_test` index under a timestamp
            # which is a bit more than one day in the past.
            new_time = str(int(PyFunceble.time()) - self.one_day_in_seconds - 100)

            file_database[new_time] = file_database.pop("to_test")

    def save(self):
        """
        Save the current database into the database file.

        Only does something with the JSON backend.
        """

        if self._json_enabled():
            # We save the current database state into the database file.
            Dict(self.database).to_json(self.database_file)

    def initiate(self):
        """
        Initiate the database.

        Loads the database, ensures (JSON backend) that the namespace of the
        current file exists and saves the result.
        """

        if not self.authorized:
            return

        # We load the database.
        self.load()

        if (
            PyFunceble.CONFIGURATION["db_type"] == "json"
            and self.filename not in self.database
        ):  # pragma: no cover
            # We create the current file namespace.
            self.database[self.filename] = {}

        self.save()

    def _timestamp(self):
        """
        Get the timestamp where we are going to save our current list.

        :return: The timestamp to append with the currently tested element.
        :rtype: int
        """

        now = int(PyFunceble.time())

        if not self._json_enabled() or not self.database.get(self.filename):
            # * The JSON database subsystem is not activated.
            # or
            # * The currently tested file is not (or is empty) in the database.

            # We return the current time.
            return now

        # We get the indexes of the current file (in the database).
        database_keys = [x for x in self.database[self.filename] if x.isdigit()]

        if not database_keys:  # pragma: no cover
            # The list of keys is empty.

            # We return the current time.
            return now

        # We get the most recent date. The keys are strings, so we compare
        # them as integers: a plain string comparison is lexicographic and
        # would, for example, consider "999" as greater than "1000".
        recent_date = int(max(database_keys, key=int))

        if now > recent_date + self.one_day_in_seconds:
            # The most recent time was in more than one day.

            # We return the current time.
            return now

        if now < recent_date + self.days_in_seconds:
            # The most recent time was in less than the expected number of
            # day for retesting.

            # We return the most recent date.
            return recent_date

        return now

    def add(self, subject, status):
        """
        Add the given subject into the database.

        :param str subject: The subject we are working with.
        :param str status: The status of the given subject.
        """

        if not self.authorized:
            return

        db_type = PyFunceble.CONFIGURATION["db_type"]

        if db_type == "json":
            # We get the timestamp to use as index.
            timestamp = str(self._timestamp())

            if self.filename in self.database:
                # The file path is already into the database: we first
                # remove any older occurrence of the subject.
                self.remove(subject)

            # We save the subject (the file path is initiated if needed).
            self[timestamp] = {subject: status}

            # What we knew about the presence of the subject is now outdated.
            self.is_present_cache.pop(subject, None)

            # And we save the database.
            self.save()
        elif db_type == "sqlite":
            parameters = {"file": self.filename, "subject": subject, "status": status}

            query = (
                "INSERT INTO {0} "
                "(file_path, subject, status) "
                "VALUES (:file, :subject, :status)"
            ).format(self.table_name)

            try:
                # We execute the query.
                self.sqlite_db.cursor.execute(query, parameters)
            except self.sqlite_db.errors:
                # The insertion failed, we update instead.
                query = (
                    "UPDATE {0} "
                    "SET subject = :subject, status = :status "
                    "WHERE file_path = :file AND subject = :subject"
                ).format(self.table_name)

                # We execute the query.
                self.sqlite_db.cursor.execute(query, parameters)

            # And we commit the changes.
            self.sqlite_db.connection.commit()
        elif db_type in ["mariadb", "mysql"]:
            parameters = {
                "file": self.filename,
                "subject": subject,
                "status": status,
                "digest": sha256(bytes(self.filename + subject, "utf-8")).hexdigest(),
            }

            query = (
                "INSERT INTO {0} "
                "(file_path, subject, status, digest) "
                "VALUES (%(file)s, %(subject)s, %(status)s, %(digest)s)"
            ).format(self.table_name)

            with self.mysql_db.get_connection() as cursor:
                try:
                    cursor.execute(query, parameters)
                except self.mysql_db.errors:
                    # The insertion failed, we update instead.
                    # We compare the `subject` column (and not the
                    # parameter with itself) with the given subject.
                    query = (
                        "UPDATE {0} "
                        "SET subject = %(subject)s, status = %(status)s "
                        "WHERE file_path = %(file)s "
                        "AND subject = %(subject)s "
                        "AND digest = %(digest)s"
                    ).format(self.table_name)

                    cursor.execute(query, parameters)

    def remove(self, subject):
        """
        Remove all occurrence of the given subject from the database.

        :param str subject: The subject we are working with.
        """

        if not self.authorized:
            return

        db_type = PyFunceble.CONFIGURATION["db_type"]

        if db_type == "json":
            for data in self.database.get(self.filename, {}):
                # We loop through the index of the file database.

                if subject in self[data]:
                    # The currently tested element is into the currently
                    # read index.
                    self[data] = Dict(self[data]).remove_key(subject)

            # What we knew about the presence of the subject is now outdated.
            self.is_present_cache.pop(subject, None)

            # And we save the data into the database.
            self.save()
        elif db_type == "sqlite":
            # We construct the query we are going to execute.
            query = (
                "DELETE FROM {0} "
                "WHERE file_path = :file "
                "AND subject = :subject"
            ).format(self.table_name)

            # We execute it.
            self.sqlite_db.cursor.execute(
                query, {"file": self.filename, "subject": subject}
            )

            # We commit everything.
            self.sqlite_db.connection.commit()
        elif db_type in ["mariadb", "mysql"]:
            # We construct the query we are going to execute.
            query = (
                "DELETE FROM {0} "
                "WHERE file_path = %(file)s "
                "AND subject = %(subject)s"
            ).format(self.table_name)

            with self.mysql_db.get_connection() as cursor:
                cursor.execute(query, {"file": self.filename, "subject": subject})

    def get_to_retest(self):  # pragma: no cover
        """
        Return a set of subject to retest.

        A subject has to be retested when it was saved more than
        :code:`days_in_seconds` seconds ago.

        :rtype: set
        """

        if not self.authorized:
            return set()

        db_type = PyFunceble.CONFIGURATION["db_type"]

        if db_type == "json":
            now = int(PyFunceble.time())

            try:
                return {
                    subject
                    for timestamp, subjects in self.database[self.filename].items()
                    if timestamp.isdigit()
                    and now > int(timestamp) + self.days_in_seconds
                    for subject in subjects
                }
            except KeyError:
                # The current file is not into the database.
                return set()

        if db_type == "sqlite":
            query = (
                "SELECT * FROM {0} WHERE file_path = :file "
                "AND CAST(strftime('%s', 'now') AS INTEGER) "
                "> (CAST(strftime('%s', modified) AS INTEGER) + CAST(:days AS INTEGER))"
            ).format(self.table_name)

            output = self.sqlite_db.cursor.execute(
                query, {"file": self.filename, "days": self.days_in_seconds}
            )
            fetched = output.fetchall()

            if fetched:
                return {row["subject"] for row in fetched}

        if db_type in ["mariadb", "mysql"]:
            # The integer type name to CAST to differs between both engines.
            cast_type = "INTEGER" if db_type == "mariadb" else "UNSIGNED"

            query = (
                "SELECT * FROM {0} WHERE file_path = %(file)s "
                "AND CAST(UNIX_TIMESTAMP() AS {1}) "
                "> (CAST(UNIX_TIMESTAMP(modified) AS {1}) + CAST(%(days)s AS {1}))"
            ).format(self.table_name, cast_type)

            with self.mysql_db.get_connection() as cursor:
                cursor.execute(
                    query, {"file": self.filename, "days": self.days_in_seconds}
                )
                fetched = cursor.fetchall()

                if fetched:
                    return {row["subject"] for row in fetched}

        return set()

    def get_already_tested(self):  # pragma: no cover
        """
        Return a set of already tested subjects.

        A subject is considered as already tested when it was saved less
        than :code:`days_in_seconds` seconds ago.

        :rtype: set
        """

        if not self.authorized:
            return set()

        db_type = PyFunceble.CONFIGURATION["db_type"]

        if db_type == "json":
            now = int(PyFunceble.time())

            try:
                return {
                    subject
                    for timestamp, subjects in self.database[self.filename].items()
                    if timestamp.isdigit()
                    and now < int(timestamp) + self.days_in_seconds
                    for subject in subjects
                }
            except KeyError:
                # The current file is not into the database.
                return set()

        if db_type == "sqlite":
            query = (
                "SELECT * FROM {0} WHERE file_path = :file "
                "AND CAST(strftime('%s', 'now') AS INTEGER) "
                "< (CAST(strftime('%s', modified) AS INTEGER) + CAST(:days AS INTEGER))"
            ).format(self.table_name)

            output = self.sqlite_db.cursor.execute(
                query, {"file": self.filename, "days": self.days_in_seconds}
            )
            fetched = output.fetchall()

            if fetched:
                return {row["subject"] for row in fetched}

        if db_type in ["mariadb", "mysql"]:
            # The integer type name to CAST to differs between both engines.
            cast_type = "INTEGER" if db_type == "mariadb" else "UNSIGNED"

            query = (
                "SELECT * FROM {0} WHERE file_path= %(file)s "
                "AND CAST(UNIX_TIMESTAMP() AS {1}) "
                "< (CAST(UNIX_TIMESTAMP(modified) AS {1}) + CAST(%(days)s AS {1}))"
            ).format(self.table_name, cast_type)

            with self.mysql_db.get_connection() as cursor:
                cursor.execute(
                    query, {"file": self.filename, "days": self.days_in_seconds}
                )
                fetched = cursor.fetchall()

                if fetched:
                    return {row["subject"] for row in fetched}

        return set()