Source code for PyFunceble.engine.auto_continue

"""
The tool to check the availability or syntax of domain, IP or URL.

::


    ██████╗ ██╗   ██╗███████╗██╗   ██╗███╗   ██╗ ██████╗███████╗██████╗ ██╗     ███████╗
    ██╔══██╗╚██╗ ██╔╝██╔════╝██║   ██║████╗  ██║██╔════╝██╔════╝██╔══██╗██║     ██╔════╝
    ██████╔╝ ╚████╔╝ █████╗  ██║   ██║██╔██╗ ██║██║     █████╗  ██████╔╝██║     █████╗
    ██╔═══╝   ╚██╔╝  ██╔══╝  ██║   ██║██║╚██╗██║██║     ██╔══╝  ██╔══██╗██║     ██╔══╝
    ██║        ██║   ██║     ╚██████╔╝██║ ╚████║╚██████╗███████╗██████╔╝███████╗███████╗
    ╚═╝        ╚═╝   ╚═╝      ╚═════╝ ╚═╝  ╚═══╝ ╚═════╝╚══════╝╚═════╝ ╚══════╝╚══════╝

Provides the auto-continue engine.

Author:
    Nissar Chababy, @funilrys, contactTATAfunilrysTODTODcom

Special thanks:
    https://pyfunceble.github.io/special-thanks.html

Contributors:
    https://pyfunceble.github.io/contributors.html

Project link:
    https://github.com/funilrys/PyFunceble

Project documentation:
    https://pyfunceble.readthedocs.io/en/master/

Project homepage:
    https://pyfunceble.github.io/

License:
::


    Copyright 2017, 2018, 2019, 2020 Nissar Chababy

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
"""

from datetime import datetime
from multiprocessing import get_start_method

from sqlalchemy.orm.exc import NoResultFound

import PyFunceble
from PyFunceble.engine.database.loader import session
from PyFunceble.engine.database.schemas import File, Status


class AutoContinue:  # pylint: disable=too-many-instance-attributes
    """
    Provides the auto-continue subsystem.

    The subsystem remembers which subjects of a given file were already
    tested, so that an interrupted test can be continued. Depending on
    :code:`PyFunceble.CONFIGURATION.db_type`, the index lives either in a
    JSON file or in a MySQL/MariaDB database.

    :param str filename:
        The file we are working with. It is used as the key/namespace of
        everything which is stored.
    :param bool parent_process:
        Tells us if we are running under the parent process. Only the
        parent process cleans the output directory, writes the JSON file
        through :code:`save` and updates the counters.
    """

    # Class-level default of the database content.
    # Each instance binds its own dictionary from __init__ (or load).
    database = {}

    # Save the database file
    database_file = None

    # Save the filename we are working with.
    filename = None

    def __init__(self, filename, parent_process=False):
        self.database_file = ""

        # We share the filename.
        self.filename = filename

        # We preset the filename namespace.
        #
        # We bind a fresh dictionary to the instance instead of writing into
        # the class-level one: the class-level dictionary is shared by every
        # instance of the process.
        self.database = {self.filename: {}}

        # We share if we are under the parent process.
        self.parent = parent_process

        # We get the operation authorization.
        self.authorized = self.authorization()

        PyFunceble.LOGGER.debug(f"Authorization: {self.authorized}")

        if self.authorized:
            # We are authorized to operate.
            PyFunceble.LOGGER.info("Process authorized.")

            if PyFunceble.CONFIGURATION.db_type == "json":
                # We set the location of the database file.
                self.database_file = (
                    PyFunceble.OUTPUT_DIRECTORY
                    + PyFunceble.OUTPUTS.parent_directory
                    + PyFunceble.OUTPUTS.logs.filenames.auto_continue
                )

                PyFunceble.LOGGER.debug(f"DB (File): {self.database_file}")

            # We load the backup (if existent).
            self.load()

            if self.parent and self.is_empty():
                # The database of the file we are
                # currently testing is empty.
                PyFunceble.LOGGER.info(
                    "Process authorized, is the parent process and the file "
                    "to test not indexed. Cleaning directory structure."
                )

                # We clean the output directory.
                PyFunceble.output.Clean(file_path=self.filename)
        elif self.parent:
            # We are not authorized to operate.
            PyFunceble.LOGGER.info(
                "Process not authorized but is the parent process. "
                "Cleaning directory structure."
            )

            # We clean the output directory.
            PyFunceble.output.Clean(file_path=self.filename)

    def __contains__(self, index):  # pragma: no cover
        """
        Checks if the given subject is indexed for the file we are
        working with.

        :param str index: The subject to look for.

        :return:
            :code:`True` if the subject is indexed, :code:`False` if it is
            not or if we are not authorized to operate.
        :rtype: bool
        """

        if self.authorized:
            if PyFunceble.CONFIGURATION.db_type == "json":
                # The "complements" entry lists subjects which are still to
                # be tested, it is therefore ignored.
                indexed = any(
                    index in subjects
                    for status, subjects in self.database.get(
                        self.filename, {}
                    ).items()
                    if status != "complements"
                )

                if indexed:
                    PyFunceble.LOGGER.info(f"{index} is present into the database.")
                    return True

                PyFunceble.LOGGER.info(f"{index} is not present into the database.")
                return False

            if PyFunceble.CONFIGURATION.db_type in ["mariadb", "mysql"]:
                with session.Session() as db_session:
                    try:
                        # pylint: disable=no-member, singleton-comparison
                        _ = (
                            db_session.query(Status)
                            .join(File)
                            .filter(Status.tested == index)
                            .filter(File.path == self.filename)
                            .one()
                        )
                    except NoResultFound:
                        PyFunceble.LOGGER.info(
                            f"{index} is not present into the database."
                        )
                        return False

                PyFunceble.LOGGER.info(f"{index} is present into the database.")
                return True

        PyFunceble.LOGGER.info(
            f"Could not check if {index} is present into the database. "
            "Unauthorized action."
        )
        return False

    @classmethod
    def authorization(cls):
        """
        Provides the execution authorization.

        :return:
            A truthy value when the auto-continue is activated and the
            generation of files is not disabled.
        """

        return (
            PyFunceble.CONFIGURATION.auto_continue
            and not PyFunceble.CONFIGURATION.no_files
        )

    def is_empty(self):
        """
        Checks if the database related to the currently tested file is empty.

        With the JSON format, the database is empty when nothing is indexed
        for the file. With MySQL/MariaDB we don't really check if it is
        empty: we return the :code:`test_completed` flag of the file.

        :return:
            The state described above, or :code:`False` when it could not be
            checked.
        :rtype: bool
        """

        if self.authorized:
            if PyFunceble.CONFIGURATION.db_type == "json":
                if not self.database.get(self.filename):
                    PyFunceble.LOGGER.info("File to test was not previously indexed.")
                    return True

                PyFunceble.LOGGER.info("File to test was previously indexed.")
                return False

            if PyFunceble.CONFIGURATION.db_type in ["mariadb", "mysql"]:
                # Here we don't really check if it is empty.
                # What we do, is that we check that everything was
                # tested.
                with session.Session() as db_session:
                    # pylint: disable=no-member, singleton-comparison
                    try:
                        result = (
                            db_session.query(File)
                            .filter(File.path == self.filename)
                            .one()
                        )

                        # We log the flag we return (it used to be logged
                        # negated, which contradicted the returned value).
                        PyFunceble.LOGGER.info(
                            f"File was completely tested: {result.test_completed}"
                        )
                        return result.test_completed
                    except NoResultFound:
                        # The file is unknown to the database. We
                        # deliberately fall through to the default answer.
                        pass

        PyFunceble.LOGGER.info(  # pragma: no cover
            "Could not check if the file to test "
            "was previously indexed. Unauthorized action."
        )
        return False  # pragma: no cover

    def add(self, subject, status):
        """
        Adds the given subject into the database.

        Nothing is done if we are not authorized to operate or if the
        database type is not JSON.

        :param str subject: The subject to index.
        :param str status: The status to index the subject under.
        """

        if not self.authorized:
            return

        if PyFunceble.CONFIGURATION.db_type == "json":
            # We create the file and status namespaces if they don't exist
            # yet, then we register the subject.
            self.database.setdefault(self.filename, {}).setdefault(
                status, []
            ).append(subject)

            PyFunceble.LOGGER.info(
                f"Indexed {repr(subject)} with the status "
                f"{repr(status)} into {repr(self.filename)} database's."
            )

            # We save everything.
            self.save()

    def save(self):
        """
        Saves the current state of the database.

        The JSON file is only written by the authorized parent process.
        """

        if (
            self.authorized
            and self.parent
            and PyFunceble.CONFIGURATION.db_type == "json"
        ):
            # We are authorized to operate.

            # We save the current database state.
            PyFunceble.helpers.Dict(self.database).to_json_file(self.database_file)

            PyFunceble.LOGGER.info(f"Saved database into {repr(self.database_file)}.")

    def load(self):
        """
        Loads previously saved database.

        If the database file does not exist, an empty database is initiated
        for the file we are working with.
        """

        if self.authorized and PyFunceble.CONFIGURATION.db_type == "json":
            # We are authorized to operate.

            if PyFunceble.helpers.File(self.database_file).exists():
                # The database file exists.

                # We get its content and keep it in memory.
                self.database = PyFunceble.helpers.Dict().from_json_file(
                    self.database_file
                )
            else:
                # The database file does not exist.

                # We initiate an empty database.
                self.database = {self.filename: {}}

            PyFunceble.LOGGER.info(f"Loaded {repr(self.database_file)} in memory.")

    def clean(self):
        """
        Cleans the database.

        With the JSON format, everything related to the file we are working
        with is dropped and the file is written. With MySQL/MariaDB, the file
        is flagged as completely tested and the :code:`test_completed` flag of
        every subject whose status is not an "up" one is reset.
        """

        if not self.authorized:
            return

        # We are authorized to operate.

        if PyFunceble.CONFIGURATION.db_type == "json":
            # We empty the database.
            self.database[self.filename] = {}

            # And we save the current database state.
            PyFunceble.helpers.Dict(self.database).to_json_file(self.database_file)

            PyFunceble.LOGGER.info(
                "Cleaned the data related to "
                f"{repr(self.filename)} from {repr(self.database_file)}."
            )
        elif PyFunceble.CONFIGURATION.db_type in [
            "mysql",
            "mariadb",
        ]:  # pragma: no cover
            with session.Session() as db_session:
                # pylint: disable=no-member, singleton-comparison
                try:
                    file_object = (
                        db_session.query(File)
                        .filter(File.path == self.filename)
                        .one()
                    )
                except NoResultFound:
                    # Nothing to flag: the file is unknown to the database.
                    pass
                else:
                    file_object.test_completed = True

                    db_session.add(file_object)
                    db_session.commit()

            with session.Session() as db_session:
                # pylint: disable=no-member, singleton-comparison

                # Now we only replace the test_completed
                # flag for the inactive/invalid one.
                to_update = (
                    db_session.query(Status)
                    .join(File)
                    .filter(File.path == self.filename)
                    .filter(
                        Status.status.notin_(PyFunceble.core.CLI.get_up_statuses())
                    )
                    .filter(Status.test_completed == True)
                    .all()
                )

                # The objects are updated and committed through the session
                # which loaded them, instead of handing each of them over
                # to a brand new session.
                for status_object in to_update:
                    status_object.test_completed = False

                if to_update:
                    db_session.commit()

    def update_counters(self):  # pragma: no cover
        """
        Updates the counters.

        For each official status, the number of already tested subjects is
        written into :code:`PyFunceble.INTERN["counter"]["number"]`, along
        with the total under the :code:`tested` key. Only the authorized
        parent process does it.
        """

        if not (self.authorized and self.parent):
            return

        # We are authorized to operate.

        db_type = PyFunceble.CONFIGURATION.db_type

        # We preset the number of tested.
        tested = 0

        for status in PyFunceble.STATUS.official.keys():
            # We loop through the list of status.

            official_status = PyFunceble.STATUS.official[status]

            if db_type == "json":
                try:
                    # We get the number of tested of the currently read
                    # status.
                    tested_for_status = len(
                        self.database[self.filename][official_status]
                    )
                except KeyError:
                    # Nothing is indexed under the currently read status.
                    PyFunceble.INTERN["counter"]["number"][status] = 0
                    continue

                # We then update/transfer it to its global place.
                PyFunceble.INTERN["counter"]["number"][status] = tested_for_status

                PyFunceble.LOGGER.debug(
                    f"Counter of {repr(status)} set to {tested_for_status}."
                )

                # We finally increase the number of tested.
                tested += tested_for_status
            elif db_type in ["mariadb", "mysql"]:
                with session.Session() as db_session:
                    # pylint: disable=no-member, singleton-comparison
                    result = (
                        db_session.query(Status)
                        .join(File)
                        .filter(File.path == self.filename)
                        .filter(Status.status == official_status)
                        .filter(Status.test_completed == True)
                        .all()
                    )

                fetched = len(result)

                # We update/transfer it to its global place.
                PyFunceble.INTERN["counter"]["number"][status] = fetched

                PyFunceble.LOGGER.debug(
                    f"Counter of {repr(status)} set to {fetched}."
                )

                tested += fetched

        # We update/transfer the number of tested globally.
        PyFunceble.INTERN["counter"]["number"]["tested"] = tested

        PyFunceble.LOGGER.debug(f"Totally tested set to {repr(tested)}.")

    def get_already_tested(self):
        """
        Returns the list of subjects which were already tested as a set.

        :return:
            The already tested subjects. An empty set if nothing is indexed
            or if we are not authorized to operate.
        :rtype: set
        """

        PyFunceble.LOGGER.info(
            "Getting the list of already tested (DATASET WONT BE LOGGED)"
        )

        if self.authorized:
            if PyFunceble.CONFIGURATION.db_type == "json":
                if (
                    PyFunceble.CONFIGURATION.multiprocess
                    and get_start_method() == "spawn"
                ):  # pragma: no cover
                    # We reload the database from the file before reading it.
                    self.load()

                try:
                    # The "complements" entry lists subjects which are still
                    # to be tested, it is therefore ignored.
                    return {
                        subject
                        for state, subjects in self.database[self.filename].items()
                        if state not in ["complements"]
                        for subject in subjects
                    }
                except KeyError:  # pragma: no cover
                    # Nothing is indexed for the file we are working with.
                    pass
            elif PyFunceble.CONFIGURATION.db_type in ["mariadb", "mysql"]:
                with session.Session() as db_session:
                    # pylint: disable=no-member, singleton-comparison
                    result = (
                        db_session.query(Status)
                        .join(File)
                        .filter(File.path == self.filename)
                        .filter(Status.test_completed == True)
                        .all()
                    )

                    return {x.tested for x in result}

        return set()  # pragma: no cover

    def __generate_complements(self):  # pragma: no cover
        """
        Generates the complements from the given list of tested.

        :return: The complements, without the already tested subjects.
        :rtype: set
        """

        # We get the list of domains we are going to work with.
        already_tested = self.get_already_tested()

        result = PyFunceble.get_complements(already_tested)

        # We remove the already tested subjects.
        return set(PyFunceble.helpers.List(result).format()) - already_tested

    def __get_or_generate_complements_json(self):  # pragma: no cover
        """
        Gets or generates the complements while working with as
        JSON formatted database.

        :return:
            The saved complements (:code:`list`) if there are any, the
            freshly generated ones (:code:`set`) otherwise.
        """

        # The loaded database does not necessarily know the file we are
        # working with, we therefore never index it blindly.
        if "complements" in self.database.get(self.filename, {}):
            # We get the complements we still have to test.
            return self.database[self.filename]["complements"]

        # The complements are not saved, we generate them.
        result = self.__generate_complements()

        # We save the constructed list of complements.
        #
        # The namespace is fetched after the generation because the
        # generation may reload the database.
        self.database.setdefault(self.filename, {})["complements"] = list(result)
        self.save()

        return result

    def __get_or_generate_complements_mysql(self):  # pragma: no cover
        """
        Gets or generates the complements while working with as
        MySQL/MariaDB formatted database.

        :return:
            The saved complements (:code:`list`) if there are any, the
            freshly generated ones (:code:`set`) otherwise.
        """

        with session.Session() as db_session:
            # pylint: disable=singleton-comparison,no-member
            saved = (
                db_session.query(Status)
                .join(File)
                .filter(File.path == self.filename)
                .filter(Status.is_complement == True)
                .all()
            )

            result = [x.tested for x in saved]

        if result:
            return result

        result = self.__generate_complements()

        if not result:
            # Nothing to save.
            return result

        # The file entry is the same for every complement: we get (or
        # create) it once, and we read its identifier while the session
        # is still open.
        with session.Session() as db_session:
            try:
                file = (
                    db_session.query(File).filter(File.path == self.filename).one()
                )
            except NoResultFound:
                file = File(path=self.filename)

                db_session.add(file)
                db_session.commit()
                db_session.refresh(file)

            file_id = file.id

        for subject in result:
            with session.Session() as db_session:
                # pylint: disable=no-member, singleton-comparison
                try:
                    db_session.query(Status).filter(Status.tested == subject).one()
                except NoResultFound:
                    # The subject is unknown: we save it as a complement
                    # which is still to be tested.
                    db_session.add(
                        Status(
                            file_id=file_id,
                            is_complement=True,
                            tested=subject,
                            tested_at=datetime.fromtimestamp(10),
                            test_completed=False,
                        )
                    )
                    db_session.commit()

        return result

    def get_or_generate_complements(self):  # pragma: no cover
        """
        Gets or generates the complements.

        :return:
            The complements, or an empty list if we are not authorized to
            operate, if the generation of complements is disabled or if the
            database type is not supported.
        """

        PyFunceble.LOGGER.info("Generate/Get complements (DATASET WONT BE LOGGED)")

        if self.authorized and PyFunceble.CONFIGURATION.generate_complements:
            # We are authorized to operate.

            if PyFunceble.CONFIGURATION.db_type == "json":
                return self.__get_or_generate_complements_json()

            if PyFunceble.CONFIGURATION.db_type in ["mysql", "mariadb"]:
                return self.__get_or_generate_complements_mysql()

        return []