"""
The tool to check the availability or syntax of domain, IP or URL.
::
██████╗ ██╗ ██╗███████╗██╗ ██╗███╗ ██╗ ██████╗███████╗██████╗ ██╗ ███████╗
██╔══██╗╚██╗ ██╔╝██╔════╝██║ ██║████╗ ██║██╔════╝██╔════╝██╔══██╗██║ ██╔════╝
██████╔╝ ╚████╔╝ █████╗ ██║ ██║██╔██╗ ██║██║ █████╗ ██████╔╝██║ █████╗
██╔═══╝ ╚██╔╝ ██╔══╝ ██║ ██║██║╚██╗██║██║ ██╔══╝ ██╔══██╗██║ ██╔══╝
██║ ██║ ██║ ╚██████╔╝██║ ╚████║╚██████╗███████╗██████╔╝███████╗███████╗
╚═╝ ╚═╝ ╚═╝ ╚═════╝ ╚═╝ ╚═══╝ ╚═════╝╚══════╝╚═════╝ ╚══════╝╚══════╝
Provides the inactive database interface.
Author:
Nissar Chababy, @funilrys, contactTATAfunilrysTODTODcom
Special thanks:
https://pyfunceble.github.io/special-thanks.html
Contributors:
https://pyfunceble.github.io/contributors.html
Project link:
https://github.com/funilrys/PyFunceble
Project documentation:
https://pyfunceble.readthedocs.io/en/master/
Project homepage:
https://pyfunceble.github.io/
License:
::
Copyright 2017, 2018, 2019, 2020 Nissar Chababy
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from datetime import datetime, timedelta
from multiprocessing import get_start_method
from sqlalchemy.orm.exc import NoResultFound
import PyFunceble
from PyFunceble.engine.database.loader import session
from PyFunceble.engine.database.schemas import File, Status
[docs]class InactiveDB: # pylint: disable=too-many-instance-attributes
"""
Provides the inactive database logic and interface.
:param str filename: The name of the file we are processing.
"""
is_present_cache = {}
database = {}
filename = None
def __init__(self, filename, parent_process=False):
self.one_day = timedelta(days=1)
self.database_file = ""
self.parent = parent_process
self.authorized = self.authorization()
PyFunceble.LOGGER.debug(f"Authorization: {self.authorized}")
if self.authorized:
self.days = timedelta(days=PyFunceble.CONFIGURATION.days_between_db_retest)
self.days_between_clean = timedelta(
days=PyFunceble.CONFIGURATION.days_between_inactive_db_clean
)
if PyFunceble.CONFIGURATION.db_type == "json":
self.database_file = "{0}{1}".format(
PyFunceble.CONFIG_DIRECTORY,
PyFunceble.OUTPUTS.default_files.inactive_db,
)
self.filename = filename
self.to_retest = self.get_to_retest()
PyFunceble.LOGGER.debug(f"DB (File): {self.database_file}")
self.initiate()
def __contains__(self, subject):
if self.authorized:
if PyFunceble.CONFIGURATION.db_type == "json":
if subject not in self.is_present_cache:
self.is_present_cache[subject] = False
if self[subject]:
self.is_present_cache[subject] = True
return self.is_present_cache[subject]
if PyFunceble.CONFIGURATION.db_type in ["mariadb", "mysql"]:
with session.Session() as db_session:
try:
# pylint: disable=no-member
data = (
db_session.query(Status)
.join(File)
.filter(File.path == self.filename)
.filter(Status.tested == subject)
.filter(
Status.status.notin_(
PyFunceble.core.CLI.get_up_statuses()
)
)
.one()
)
except NoResultFound:
data = []
if data:
return True
return False # pragma: no cover
def __getitem__(self, subject):
if (
self.authorized
and PyFunceble.CONFIGURATION.db_type == "json"
and self.filename in self.database
and subject in self.database[self.filename]
):
return self.database[self.filename][subject]
return {}
def __setitem__(self, subject, data):
if PyFunceble.CONFIGURATION.db_type == "json":
actual_state = self[subject]
if actual_state:
if isinstance(actual_state, dict): # pragma: no cover
if isinstance(data, dict):
self.database[self.filename][
subject
] = PyFunceble.helpers.Merge(data).into(
self.database[self.filename][subject]
)
else: # pragma: no cover
self.database[self.filename][subject] = data
elif isinstance(actual_state, list): # pragma: no cover
if isinstance(data, list):
PyFunceble.helpers.Merge(data).into(
self.database[self.filename][subject], strict=False
)
else: # pragma: no cover
self.database[self.filename][subject].append(data)
self.database[self.filename][subject] = PyFunceble.helpers.List(
self.database[self.filename][subject]
).format()
else: # pragma: no cover
self.database[self.filename][subject] = data
else:
if self.filename not in self.database:
self.database[self.filename] = {subject: data}
else:
self.database[self.filename][subject] = data
def __delitem__(self, subject):
if PyFunceble.CONFIGURATION.db_type == "json" and self[subject]:
del self.database[self.filename][subject]
[docs] @classmethod
def authorization(cls):
"""
Provides the execution authorization.
"""
return PyFunceble.CONFIGURATION.inactive_database
[docs] def _merge(self):
"""
Merges the database with the older one which
has already been set into the database.
"""
if self.authorized and PyFunceble.CONFIGURATION.db_type == "json":
database_content = PyFunceble.helpers.Dict().from_json_file(
self.database_file
)
for database_top_key in [
x for x in database_content.keys() if database_content[x]
]:
for database_low_key, data in database_content[
database_top_key
].items():
to_set = {}
if database_low_key.isdigit():
last_test_date = datetime.fromtimestamp(float(database_low_key))
for subject, status in data.items():
to_set[subject] = {
"included_at_epoch": last_test_date.timestamp(),
"included_at_iso": last_test_date.isoformat(),
"last_retested_at_epoch": last_test_date.timestamp(),
"last_retested_at_iso": last_test_date.isoformat(),
"status": status,
}
else:
to_set[database_low_key] = data
if database_top_key not in self.database:
self.database[database_top_key] = to_set
else:
self.database[database_top_key].update(to_set)
PyFunceble.LOGGER.info("Merged possible old to the new format")
[docs] def load(self):
"""
Loads the content of the database file.
"""
if self.authorized and PyFunceble.CONFIGURATION.db_type == "json":
if PyFunceble.helpers.File(self.database_file).exists():
self._merge()
else:
self.database = {self.filename: {}}
if self.filename not in self.database: # pragma: no cover
self.database[self.filename] = {}
PyFunceble.LOGGER.info(
"Database content loaded in memory. (DATASET WONT BE LOGGED)"
)
[docs] def save(self):
"""
Saves the current database into the database file.
"""
if (
self.authorized
and self.parent
and PyFunceble.CONFIGURATION.db_type == "json"
):
PyFunceble.helpers.Dict(self.database).to_json_file(self.database_file)
PyFunceble.LOGGER.info(f"Saved database into {repr(self.database_file)}.")
[docs] def clean(self):
"""
Cleans everything which is not needed anymore.
"""
if self.authorized and self.parent:
PyFunceble.LOGGER.info("Started to clean old entry from the database.")
for subject in self.get_to_clean():
self.remove(subject)
PyFunceble.LOGGER.info("Finished to clean old entry from the database.")
[docs] def initiate(self):
"""
Initiates the databsse.
"""
if self.authorized:
self.load()
self.clean()
self.save()
[docs] @classmethod
def datetime(cls):
"""
Gets the timestamp where we are going to save our current list.
:return: The timestamp to append with the currently tested element.
:rtype: int|str
"""
return datetime.utcnow()
[docs] def add(self, subject, status):
"""
Adds the given subject into the database.
:param str subject: The subject we are working with.
:param str status: The status of the given subject.
"""
if self.authorized:
if PyFunceble.CONFIGURATION.db_type == "json":
current_datetime = self.datetime()
if self[subject]:
self[subject] = {
"status": status,
"last_retested_at_iso": current_datetime.isoformat(),
"last_retested_at_epoch": current_datetime.timestamp(),
}
else:
self[subject] = {
"status": status,
"included_at_iso": current_datetime.isoformat(),
"last_retested_at_iso": current_datetime.isoformat(),
"included_at_epoch": current_datetime.timestamp(),
"last_retested_at_epoch": current_datetime.timestamp(),
}
PyFunceble.LOGGER.info(
f"Indexed {repr(subject)} with the status "
f"{repr(status)} into {repr(self.filename)} database's."
)
self.save()
# We ignore the mariadb/mysql case because
# we are sure that the data will be added in the database
# by the system.
[docs] def remove(self, subject):
"""
Removes all occurrences of the given subject from the database.
:param str subject: The subject we are working with.
"""
if self.authorized:
if PyFunceble.CONFIGURATION.db_type == "json":
if self[subject]:
del self[subject]
PyFunceble.LOGGER.info(
"Cleaned the data related to " f"{repr(subject)}."
)
self.save()
# We don't implement the mysql/mariadb because,
# instead of removing, the system will update the status
# automatically.
[docs] def get_to_retest(self):
"""
Returns a set of subject to restest.
"""
PyFunceble.LOGGER.info(
"Getting the list of subjects to retest (DATASET WONT BE LOGGED)"
)
if (
self.authorized
and PyFunceble.CONFIGURATION.days_between_db_retest >= 0
and self.filename in self.database
):
if PyFunceble.CONFIGURATION.db_type == "json":
result = set()
if (
PyFunceble.CONFIGURATION.multiprocess
and get_start_method() == "spawn"
): # pragma: no cover
self.load()
for subject, info in self.database[self.filename].items():
if (
"last_retested_at_epoch" in info
and info["last_retested_at_epoch"]
):
if (
datetime.utcnow()
> datetime.fromtimestamp(info["last_retested_at_epoch"])
+ self.days
):
result.add(subject)
else:
result.add(subject)
return result
if PyFunceble.CONFIGURATION.db_type in ["mariadb", "mysql"]:
with session.Session() as db_session:
try:
# pylint: disable=no-member
result = (
db_session.query(Status)
.join(File)
.filter(File.path == self.filename)
.filter(
Status.status.notin_(
PyFunceble.core.CLI.get_up_statuses()
)
)
.filter(datetime.utcnow() > Status.tested_at + self.days)
.all()
)
except NoResultFound:
result = []
if result:
return {x.tested for x in result}
return set()
[docs] def get_already_tested(self):
"""
Returns a set of already tested subjects.
"""
PyFunceble.LOGGER.info(
"Getting the list of already tested (DATASET WONT BE LOGGED)"
)
if self.authorized and PyFunceble.CONFIGURATION.days_between_db_retest >= 0:
if PyFunceble.CONFIGURATION.db_type == "json":
result = set()
if (
PyFunceble.CONFIGURATION.multiprocess
and get_start_method() == "spawn"
): # pragma: no cover
self.load()
for subject, info in self.database[self.filename].items():
if (
"last_retested_at_epoch" in info
and info["last_retested_at_epoch"]
and datetime.utcnow()
< datetime.fromtimestamp(info["last_retested_at_epoch"])
+ self.days
):
result.add(subject)
else:
result.add(subject)
return result
if PyFunceble.CONFIGURATION.db_type in [
"mariadb",
"mysql",
]: # pragma: no cover
# pylint: disable=no-member
with session.Session() as db_session:
result = (
db_session.query(Status)
.join(File)
.filter(File.path == self.filename)
.filter(File.id == Status.file_id)
.filter(
Status.status.notin_(PyFunceble.core.CLI.get_up_statuses())
)
.filter(datetime.utcnow() < Status.tested_at + self.days)
.all()
)
if result:
return {x.tested for x in result}
return set()
[docs] def get_to_clean(self):
"""
Returns a set of subject to clean from the database.
"""
PyFunceble.LOGGER.info(
"Getting the list of subject to clean (DATASET WONT BE LOGGED)"
)
if (
self.authorized
and PyFunceble.CONFIGURATION.days_between_inactive_db_clean >= 0
and self.filename in self.database
):
if PyFunceble.CONFIGURATION.db_type == "json":
result = set()
for subject, info in self.database[self.filename].items():
if "included_at_epoch" in info and info["included_at_epoch"]:
if (
datetime.utcnow()
> datetime.fromtimestamp(info["included_at_epoch"])
+ self.days_between_clean
):
result.add(subject)
else:
result.add(subject)
return result
# We ignore the mariadb/mysql case because
# it's not necessary anymore.
return set()