"""
The tool to check the availability or syntax of domains, IPv4, IPv6 or URL.
::
██████╗ ██╗ ██╗███████╗██╗ ██╗███╗ ██╗ ██████╗███████╗██████╗ ██╗ ███████╗
██╔══██╗╚██╗ ██╔╝██╔════╝██║ ██║████╗ ██║██╔════╝██╔════╝██╔══██╗██║ ██╔════╝
██████╔╝ ╚████╔╝ █████╗ ██║ ██║██╔██╗ ██║██║ █████╗ ██████╔╝██║ █████╗
██╔═══╝ ╚██╔╝ ██╔══╝ ██║ ██║██║╚██╗██║██║ ██╔══╝ ██╔══██╗██║ ██╔══╝
██║ ██║ ██║ ╚██████╔╝██║ ╚████║╚██████╗███████╗██████╔╝███████╗███████╗
╚═╝ ╚═╝ ╚═╝ ╚═════╝ ╚═╝ ╚═══╝ ╚═════╝╚══════╝╚═════╝ ╚══════╝╚══════╝
Provides the inactive database interface.
Author:
Nissar Chababy, @funilrys, contactTATAfunilrysTODTODcom
Special thanks:
https://pyfunceble.github.io/special-thanks.html
Contributors:
https://pyfunceble.github.io/contributors.html
Project link:
https://github.com/funilrys/PyFunceble
Project documentation:
https://pyfunceble.readthedocs.io/en/master/
Project homepage:
https://pyfunceble.github.io/
License:
::
MIT License
Copyright (c) 2017, 2018, 2019, 2020 Nissar Chababy
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
from datetime import datetime, timedelta
import PyFunceble
class InactiveDB:  # pylint: disable=too-many-instance-attributes
    """
    Provides the inactive database logic and interface.

    Two backends are handled, selected through
    ``PyFunceble.CONFIGURATION.db_type``:

    * ``"json"``: everything lives in :attr:`database`, a dict shaped as
      ``{filename: {subject: info}}`` which is (de)serialized to
      :attr:`database_file`.
    * ``"mariadb"`` / ``"mysql"``: every operation is a SQL statement run
      against the table returned by :meth:`get_table_name`.

    :param str filename: The name of the file we are processing.
    :param bool parent_process:
        Only an instance created with a truthy value writes the JSON file
        (see :meth:`save`) and cleans old entries (see :meth:`clean`).
    """

    # Class-level defaults. NOTE: ``database`` is a mutable class attribute,
    # so instances share it until one of them rebinds ``self.database``
    # (as :meth:`load` does when the JSON file does not exist yet).
    is_present_cache = {}
    database = {}
    authorized = False
    filename = None

    def __init__(self, filename, parent_process=False):
        self.one_day = timedelta(days=1)
        self.database_file = ""
        self.authorized = self.authorization()
        self.parent = parent_process

        # The membership cache is keyed by subject only. Giving every
        # instance its own dict prevents an answer computed for one file
        # from leaking to an instance bound to another file.
        self.is_present_cache = {}

        PyFunceble.LOGGER.debug(f"Authorization: {self.authorized}")

        if self.authorized:
            self.days = timedelta(days=PyFunceble.CONFIGURATION.days_between_db_retest)
            self.days_between_clean = timedelta(
                days=PyFunceble.CONFIGURATION.days_between_inactive_db_clean
            )

            if PyFunceble.CONFIGURATION.db_type == "json":
                self.database_file = "{0}{1}".format(
                    PyFunceble.CONFIG_DIRECTORY,
                    PyFunceble.OUTPUTS.default_files.inactive_db,
                )

        self.filename = filename
        self.table_name = self.get_table_name()

        # NOTE(review): this snapshot is taken *before* initiate() loads the
        # JSON file, so with the JSON backend it only reflects whatever is
        # already in memory. The order is kept as-is because callers may
        # rely on it -- confirm whether it should move after initiate().
        self.to_retest = self.get_to_retest()

        PyFunceble.LOGGER.debug(f"Table Name: {self.table_name}")
        PyFunceble.LOGGER.debug(f"DB (File): {self.database_file}")

        self.initiate()

    def __contains__(self, subject):
        """
        Checks if the given subject is indexed for the current file.

        :param str subject: The subject to look for.
        :rtype: bool
        """

        if not self.authorized:
            return False

        db_type = PyFunceble.CONFIGURATION.db_type

        if db_type == "json":
            if subject not in self.is_present_cache:
                self.is_present_cache[subject] = bool(self[subject])

            return self.is_present_cache[subject]

        if db_type in ("mariadb", "mysql"):
            query = (
                "SELECT COUNT(*) "
                "FROM {0} "
                "WHERE subject = %(subject)s AND file_path = %(file)s"
            ).format(self.table_name)

            with PyFunceble.engine.MySQL() as connection, connection.cursor() as cursor:
                cursor.execute(query, {"subject": subject, "file": self.filename})

                fetched = cursor.fetchone()

            # A membership test is a yes/no answer, not a row count.
            return fetched["COUNT(*)"] > 0

        return False  # pragma: no cover

    def __getitem__(self, subject):
        """
        Provides the stored information of the given subject.

        :return:
            The stored entry, or an empty dict when we are not authorized,
            when the backend is not JSON, or when the subject is unknown.
        """

        if (
            self.authorized
            and PyFunceble.CONFIGURATION.db_type == "json"
            and self.filename in self.database
            and subject in self.database[self.filename]
        ):
            return self.database[self.filename][subject]

        return {}

    def __setitem__(self, subject, data):
        """
        Stores (JSON backend only) the given data for the given subject.

        An existing dict entry is merged with a dict ``data``; an existing
        list entry is extended and then normalized; anything else is simply
        overwritten.
        """

        if PyFunceble.CONFIGURATION.db_type != "json":
            return

        # Whatever was cached for this subject is about to become stale.
        self.is_present_cache.pop(subject, None)

        actual_state = self[subject]

        if not actual_state:
            self.database.setdefault(self.filename, {})[subject] = data
            return

        entries = self.database[self.filename]

        if isinstance(actual_state, dict):  # pragma: no cover
            if isinstance(data, dict):
                entries[subject] = PyFunceble.helpers.Merge(data).into(
                    entries[subject]
                )
            else:  # pragma: no cover
                entries[subject] = data
        elif isinstance(actual_state, list):  # pragma: no cover
            if isinstance(data, list):
                # The return value was never used by this code, so the
                # merge is presumably applied in place -- left untouched.
                PyFunceble.helpers.Merge(data).into(entries[subject], strict=False)
            else:  # pragma: no cover
                entries[subject].append(data)

            entries[subject] = PyFunceble.helpers.List(entries[subject]).format()
        else:  # pragma: no cover
            entries[subject] = data

    def __delitem__(self, subject):
        """
        Deletes (JSON backend only) the entry of the given subject, if any.
        """

        if PyFunceble.CONFIGURATION.db_type == "json" and self[subject]:
            del self.database[self.filename][subject]

            # Keep the membership cache consistent with the dataset.
            self.is_present_cache.pop(subject, None)

    @classmethod
    def authorization(cls):
        """
        Provides the execution authorization.

        :return: The value of ``PyFunceble.CONFIGURATION.inactive_database``.
        """

        return PyFunceble.CONFIGURATION.inactive_database

    @classmethod
    def get_table_name(cls):
        """
        Returns the name of the table to use.

        :rtype: str
        """

        if PyFunceble.CONFIGURATION.db_type in ("mariadb", "mysql"):
            return PyFunceble.engine.MySQL.tables["inactive"]

        return "inactive"

    @staticmethod
    def _to_seconds(delay):
        """
        Converts the given :class:`~datetime.timedelta` into whole seconds.

        :param datetime.timedelta delay: The delay to convert.
        :rtype: int
        """

        return int(delay.total_seconds())

    def _select_subjects(self, epoch_key, delay, *, expired):
        """
        Scans the in-memory (JSON) dataset of the current file.

        A subject is selected when ``info[epoch_key] + delay`` is in the
        past (``expired=True``) or in the future (``expired=False``).
        Subjects without a usable ``epoch_key`` are always selected.

        :param str epoch_key: The key holding the epoch to compare.
        :param datetime.timedelta delay: The delay added to the stored epoch.
        :param bool expired: The side of the comparison to select.
        :rtype: set
        """

        if self.filename not in self.database:
            return set()

        # Read the clock once so every entry is compared to the same instant.
        now = datetime.now()
        selected = set()

        for subject, info in self.database[self.filename].items():
            if epoch_key in info and info[epoch_key]:
                limit = datetime.fromtimestamp(info[epoch_key]) + delay

                if (now > limit) if expired else (now < limit):
                    selected.add(subject)
            else:
                selected.add(subject)

        return selected

    def _merge(self):
        """
        Merges the content of the database file into the in-memory database.

        Entries stored under an all-digit key (an epoch) are converted into
        the per-subject format; every other entry is copied as-is.
        """

        if not (self.authorized and PyFunceble.CONFIGURATION.db_type == "json"):
            return

        database_content = PyFunceble.helpers.Dict().from_json_file(
            self.database_file
        )

        for database_top_key, entries in database_content.items():
            if not entries:
                continue

            for database_low_key, data in entries.items():
                if database_low_key.isdigit():
                    last_test_date = datetime.fromtimestamp(float(database_low_key))
                    epoch = last_test_date.timestamp()
                    iso = last_test_date.isoformat()

                    to_set = {
                        subject: {
                            "included_at_epoch": epoch,
                            "included_at_iso": iso,
                            "last_retested_at_epoch": epoch,
                            "last_retested_at_iso": iso,
                            "status": status,
                        }
                        for subject, status in data.items()
                    }
                else:
                    to_set = {database_low_key: data}

                self.database.setdefault(database_top_key, {}).update(to_set)

        PyFunceble.LOGGER.info("Merged possible old to the new format")

    def load(self):
        """
        Loads the content of the database file.
        """

        if self.authorized and PyFunceble.CONFIGURATION.db_type == "json":
            if PyFunceble.helpers.File(self.database_file).exists():
                self._merge()
            else:
                self.database = {self.filename: {}}

            if self.filename not in self.database:  # pragma: no cover
                self.database[self.filename] = {}

            PyFunceble.LOGGER.info(
                "Database content loaded in memory. (DATASET WONT BE LOGGED)"
            )

    def save(self):
        """
        Saves the current database into the database file.

        Nothing is written unless we are authorized, we are the parent
        process and the JSON backend is in use.
        """

        if (
            self.authorized
            and self.parent
            and PyFunceble.CONFIGURATION.db_type == "json"
        ):
            PyFunceble.helpers.Dict(self.database).to_json_file(self.database_file)

            PyFunceble.LOGGER.info(f"Saved database into {repr(self.database_file)}.")

    def clean(self):
        """
        Cleans everything which is not needed anymore.
        """

        if self.authorized and self.parent:
            PyFunceble.LOGGER.info("Started to clean old entry from the database.")

            for subject in self.get_to_clean():
                self.remove(subject)

            PyFunceble.LOGGER.info("Finished to clean old entry from the database.")

    def initiate(self):
        """
        Initiates the database: load, clean, then save.
        """

        if self.authorized:
            self.load()
            self.clean()
            self.save()

    @classmethod
    def datetime(cls):
        """
        Gets the moment to associate with the currently tested element.

        :return: The current local date and time.
        :rtype: datetime.datetime
        """

        return datetime.now()

    def add(self, subject, status):
        """
        Adds the given subject into the database.

        :param str subject: The subject we are working with.
        :param str status: The status of the given subject.
        """

        if not self.authorized:
            return

        db_type = PyFunceble.CONFIGURATION.db_type

        if db_type == "json":
            current_datetime = self.datetime()

            if self[subject]:
                # Known subject: only refresh the status and retest dates,
                # the inclusion dates are kept by the merge in __setitem__.
                self[subject] = {
                    "status": status,
                    "last_retested_at_iso": current_datetime.isoformat(),
                    "last_retested_at_epoch": current_datetime.timestamp(),
                }
            else:
                self[subject] = {
                    "status": status,
                    "included_at_iso": current_datetime.isoformat(),
                    "last_retested_at_iso": current_datetime.isoformat(),
                    "included_at_epoch": current_datetime.timestamp(),
                    "last_retested_at_epoch": current_datetime.timestamp(),
                }

            PyFunceble.LOGGER.info(
                f"Indexed {repr(subject)} with the status "
                f"{repr(status)} into {repr(self.filename)} database's."
            )

            self.save()
        elif db_type in ("mariadb", "mysql"):
            digest = PyFunceble.helpers.Hash(algo="sha256").data(
                bytes(self.filename + subject, "utf-8")
            )

            query = (
                "INSERT INTO {0} "
                "(file_path, subject, status, digest) "
                "VALUES (%(file)s, %(subject)s, %(status)s, %(digest)s)"
            ).format(self.table_name)

            with PyFunceble.engine.MySQL() as connection, connection.cursor() as cursor:
                payload = {
                    "file": self.filename,
                    "subject": subject,
                    "status": status,
                    "digest": digest,
                }

                try:
                    cursor.execute(query, payload)

                    PyFunceble.LOGGER.info(
                        f"Inserted into the database: \n {payload}"
                    )
                except PyFunceble.engine.MySQL.errors:
                    # The insertion failed: fall back to updating the row
                    # identified by the same digest.
                    query = (
                        "UPDATE {0} "
                        "SET subject = %(subject)s, status = %(status)s "
                        "WHERE digest = %(digest)s"
                    ).format(self.table_name)

                    cursor.execute(
                        query,
                        {"subject": subject, "status": status, "digest": digest},
                    )

                    PyFunceble.LOGGER.info(
                        "Data already indexed, updated the modified "
                        f"column of the row related to {repr(subject)}."
                    )

    def remove(self, subject):
        """
        Removes all occurrences of the given subject from the database.

        :param str subject: The subject we are working with.
        """

        if not self.authorized:
            return

        db_type = PyFunceble.CONFIGURATION.db_type

        if db_type == "json":
            if self[subject]:
                del self[subject]

                PyFunceble.LOGGER.info(
                    "Cleaned the data related to " f"{repr(subject)}."
                )

            self.save()
        elif db_type in ("mariadb", "mysql"):
            query = (
                "DELETE FROM {0} "
                "WHERE file_path = %(file)s "
                "AND subject = %(subject)s"
            ).format(self.table_name)

            with PyFunceble.engine.MySQL() as connection, connection.cursor() as cursor:
                cursor.execute(query, {"file": self.filename, "subject": subject})

                PyFunceble.LOGGER.info(
                    "Cleaned the data related to "
                    f"{repr(subject)} and {repr(self.filename)} from "
                    f"the {repr(self.table_name)} table."
                )

    def __execute_query(self, query):  # pragma: no cover
        """
        Executes the query to get the list to retest, already tested or
        to clean.

        :param str query: The query to execute.
        :return: The subjects of the fetched rows.
        :rtype: set
        """

        with PyFunceble.engine.MySQL() as connection, connection.cursor() as cursor:
            cursor.execute(
                query,
                {
                    "file": self.filename,
                    # The queries add these values to UNIX_TIMESTAMP(), which
                    # counts seconds. A raw timedelta would be rendered by the
                    # driver as a time literal, not as a number of seconds.
                    "days": self._to_seconds(self.days),
                    "days_between_clean": self._to_seconds(self.days_between_clean),
                },
            )

            fetched = cursor.fetchall()

        if fetched:
            return {x["subject"] for x in fetched}

        return set()

    def get_to_retest(self):
        """
        Returns a set of subject to retest.

        :rtype: set
        """

        PyFunceble.LOGGER.info(
            "Getting the list of subjects to retest (DATASET WONT BE LOGGED)"
        )

        if not self.authorized or PyFunceble.CONFIGURATION.days_between_db_retest < 0:
            return set()

        db_type = PyFunceble.CONFIGURATION.db_type

        if db_type == "json":
            return self._select_subjects(
                "last_retested_at_epoch", self.days, expired=True
            )

        if db_type in ("mariadb", "mysql"):
            # NOTE(review): the SQL path no longer requires the file to be
            # present in the in-memory (JSON) dataset, which nothing in this
            # class fills for the SQL backends.
            query = (
                "SELECT * FROM {0} WHERE file_path = %(file)s "
                "AND CAST(UNIX_TIMESTAMP() AS {1}) "
                "> (CAST(UNIX_TIMESTAMP(modified) AS {1}) + CAST(%(days)s AS {1}))"
            ).format(self.table_name, PyFunceble.engine.MySQL.get_int_cast_type())

            return self.__execute_query(query)

        return set()

    def get_already_tested(self):
        """
        Returns a set of already tested subjects.

        :rtype: set
        """

        PyFunceble.LOGGER.info(
            "Getting the list of already tested (DATASET WONT BE LOGGED)"
        )

        if not self.authorized or PyFunceble.CONFIGURATION.days_between_db_retest < 0:
            return set()

        db_type = PyFunceble.CONFIGURATION.db_type

        if db_type == "json":
            return self._select_subjects(
                "last_retested_at_epoch", self.days, expired=False
            )

        if db_type in ("mariadb", "mysql"):  # pragma: no cover
            # NOTE(review): see get_to_retest(); the SQL path does not depend
            # on the in-memory (JSON) dataset anymore.
            query = (
                "SELECT * FROM {0} WHERE file_path= %(file)s "
                "AND CAST(UNIX_TIMESTAMP() AS {1}) "
                "< (CAST(UNIX_TIMESTAMP(modified) AS {1}) + CAST(%(days)s AS {1}))"
            ).format(self.table_name, PyFunceble.engine.MySQL.get_int_cast_type())

            return self.__execute_query(query)

        return set()

    def get_to_clean(self):
        """
        Returns a set of subject to clean from the database.

        :rtype: set
        """

        PyFunceble.LOGGER.info(
            "Getting the list of subject to clean (DATASET WONT BE LOGGED)"
        )

        if (
            not self.authorized
            or PyFunceble.CONFIGURATION.days_between_inactive_db_clean < 0
        ):
            return set()

        db_type = PyFunceble.CONFIGURATION.db_type

        if db_type == "json":
            return self._select_subjects(
                "included_at_epoch", self.days_between_clean, expired=True
            )

        if db_type in ("mariadb", "mysql"):  # pragma: no cover
            # NOTE(review): see get_to_retest(); the SQL path does not depend
            # on the in-memory (JSON) dataset anymore.
            query = (
                "SELECT * FROM {0} WHERE file_path= %(file)s "
                "AND CAST(UNIX_TIMESTAMP() AS {1}) "
                "> (CAST(UNIX_TIMESTAMP(created) AS {1}) "
                "+ CAST(%(days_between_clean)s AS {1}))"
            ).format(self.table_name, PyFunceble.engine.MySQL.get_int_cast_type())

            return self.__execute_query(query)

        return set()