Source code for PyFunceble.core.file

"""
The tool to check the availability or syntax of domains, IPv4, IPv6 or URL.

::


    ██████╗ ██╗   ██╗███████╗██╗   ██╗███╗   ██╗ ██████╗███████╗██████╗ ██╗     ███████╗
    ██╔══██╗╚██╗ ██╔╝██╔════╝██║   ██║████╗  ██║██╔════╝██╔════╝██╔══██╗██║     ██╔════╝
    ██████╔╝ ╚████╔╝ █████╗  ██║   ██║██╔██╗ ██║██║     █████╗  ██████╔╝██║     █████╗
    ██╔═══╝   ╚██╔╝  ██╔══╝  ██║   ██║██║╚██╗██║██║     ██╔══╝  ██╔══██╗██║     ██╔══╝
    ██║        ██║   ██║     ╚██████╔╝██║ ╚████║╚██████╗███████╗██████╔╝███████╗███████╗
    ╚═╝        ╚═╝   ╚═╝      ╚═════╝ ╚═╝  ╚═══╝ ╚═════╝╚══════╝╚═════╝ ╚══════╝╚══════╝

Provides the file testing core interface.

Author:
    Nissar Chababy, @funilrys, contactTATAfunilrysTODTODcom

Special thanks:
    https://pyfunceble.github.io/special-thanks.html

Contributors:
    https://pyfunceble.github.io/contributors.html

Project link:
    https://github.com/funilrys/PyFunceble

Project documentation:
    https://pyfunceble.readthedocs.io/en/master/

Project homepage:
    https://pyfunceble.github.io/

License:
::


    MIT License

    Copyright (c) 2017, 2018, 2019, 2020 Nissar Chababy

    Permission is hereby granted, free of charge, to any person obtaining a copy
    of this software and associated documentation files (the "Software"), to deal
    in the Software without restriction, including without limitation the rights
    to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
    copies of the Software, and to permit persons to whom the Software is
    furnished to do so, subject to the following conditions:

    The above copyright notice and this permission notice shall be included in all
    copies or substantial portions of the Software.

    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
    AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
    OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
    SOFTWARE.
"""

from tempfile import NamedTemporaryFile

from domain2idna import get as domain2idna

import PyFunceble

from .cli import CLICore


class FileCore(CLICore):  # pylint: disable=too-many-instance-attributes
    """
    Brain of PyFunceble for file testing.

    :param str file:
        The file we are testing.
    :param str file_content_type:
        The type of content of the file.
        Should be one of the following (case insensitive).

            - :code:`domain`

            - :code:`url`

    :raise PyFunceble.exceptions.WrongParameterValue:
        When the given :code:`file_content_type` is not one of
        :py:attr:`accepted_file_content_types`.

    .. note::
        This class reads several attributes and methods it does not define
        itself (e.g. :code:`whois_db`, :code:`autosave`, :code:`preset`,
        :code:`list_of_up_statuses`, :code:`complements_test_started`,
        :code:`download_link`, :code:`print_header`,
        :code:`save_into_database`, :code:`sort_generated_files`).
        They are presumably provided by :code:`CLICore` -- to be confirmed
        against the parent class.
    """

    # We set a regex of element to delete.
    # Understand with this variable that we don't want to test those.
    regex_ignore = r"localhost$|localdomain$|local$|broadcasthost$|0\.0\.0\.0$|allhosts$|allnodes$|allrouters$|localnet$|loopback$|mcastprefix$|ip6-mcastprefix$|ip6-localhost$|ip6-loopback$|ip6-allnodes$|ip6-allrouters$|ip6-localnet$"  # pylint: disable=line-too-long

    # The values accepted (lowercase) as file content type.
    accepted_file_content_types = ["url", "domain"]

    def __init__(self, file, file_content_type="domain"):
        super().__init__()

        content_type = file_content_type.lower()

        if content_type not in self.accepted_file_content_types:
            # Both literals have to be f-strings, otherwise the list of
            # accepted values is printed as a raw placeholder.
            raise PyFunceble.exceptions.WrongParameterValue(
                f"<file_content_type> ({file_content_type}) "
                f"not in {self.accepted_file_content_types}."
            )

        # The given file goes through download_link() first; we then work
        # with whatever that method returns.
        self.file = self.download_link(file)
        self.file_type = content_type

        self.inactive_db = PyFunceble.database.Inactive(self.file, parent_process=True)
        self.mining = PyFunceble.engine.Mining(self.file)
        self.autocontinue = PyFunceble.engine.AutoContinue(
            self.file, parent_process=True
        )

    def generate_complement_status_file(self, subject, status):
        """
        Generate the complements status files.

        Nothing is generated unless the test of the complements is
        currently running (:code:`self.complements_test_started`).

        :param str subject: The tested subject.
        :param str status: The status of the tested subject.
        """

        if self.complements_test_started:
            PyFunceble.output.Generate(
                subject, f"file_{self.file_type}", status
            ).complements_file()

    @classmethod
    def get_complements(cls, auto_continue_db):
        """
        Generate a list of complements to test.

        :param auto_continue_db:
            The autocontinue database interface. May be falsy, in which
            case nothing is yielded.

        :return:
            A generator over the complements given by
            :code:`auto_continue_db.get_or_generate_complements()`.
        """

        if (
            PyFunceble.CONFIGURATION.generate_complements
            and auto_continue_db
            and auto_continue_db.authorized
        ):
            # * The user want us to generate and test the list
            # of all complements.
            # and
            # * The autocontinue subsystem is activated.

            # We get/generate the complements.
            yield from auto_continue_db.get_or_generate_complements()

    def generate_files_of_status(
        self, status, include_entries_without_changes=False
    ):  # pragma: no cover
        """
        Generates the status file of all subjects of the given status.

        Only does something when the configured database type is
        :code:`mariadb` or :code:`mysql`.

        :param str status:
            A status to filter.
        :param bool include_entries_without_changes:
            When :code:`True`, the status file of every fetched entry is
            generated. Otherwise, the file generation is excluded for
            entries which are not up and are still listed as "to retest"
            by an authorized inactive database.
        """

        if PyFunceble.CONFIGURATION.db_type not in ["mariadb", "mysql"]:
            return

        # The table name comes from an internal constant; the values are
        # given as query parameters.
        to_select = (
            "SELECT tested as subject, status, status_source, expiration_date, "
            "http_status_code, whois_server, file_path "
            "FROM {0} WHERE status = %(official_status)s "
            "AND file_path = %(file_path)s ORDER BY subject ASC"
        ).format(PyFunceble.engine.MySQL.tables["tested"])

        with PyFunceble.engine.MySQL() as connection, connection.cursor() as cursor:
            cursor.execute(
                to_select, {"official_status": status, "file_path": self.file}
            )
            fetched = cursor.fetchall()

            if fetched:
                for data in fetched:
                    generate = PyFunceble.output.Generate(
                        data["subject"],
                        f"file_{self.file_type}",
                        data["status"],
                        source=data["status_source"],
                        expiration_date=data["expiration_date"],
                        http_status_code=data["http_status_code"],
                        whois_server=data["whois_server"],
                        filename=self.file,
                        end=True,
                    )

                    if include_entries_without_changes:
                        generate.status_file(exclude_file_generation=False)
                    else:
                        generate.status_file(
                            exclude_file_generation=self.inactive_db.authorized
                            and data["status"] not in self.list_of_up_statuses
                            and data["subject"] in self.inactive_db.to_retest
                        )

    def generate_files(self, include_entries_without_changes=False):  # pragma: no cover
        """
        Generates all needed files.

        Only does something when the configured database type is
        :code:`mariadb` or :code:`mysql`.

        :param bool include_entries_without_changes:
            Forwarded as is to :py:meth:`generate_files_of_status`.
        """

        if PyFunceble.CONFIGURATION.db_type not in ["mariadb", "mysql"]:
            return

        self.preset.reset_counters()

        official = PyFunceble.STATUS.official

        # The "positive" status depends on the kind of test we ran.
        if PyFunceble.CONFIGURATION.syntax:
            positive_status = official.valid
        elif PyFunceble.CONFIGURATION.reputation:
            positive_status = official.sane
        else:
            positive_status = official.up

        self.generate_files_of_status(
            positive_status,
            include_entries_without_changes=include_entries_without_changes,
        )

        # And so does the "negative" one.
        if PyFunceble.CONFIGURATION.reputation:
            negative_status = official.malicious
        else:
            negative_status = official.down

        self.generate_files_of_status(
            negative_status,
            include_entries_without_changes=include_entries_without_changes,
        )

        self.generate_files_of_status(
            official.invalid,
            include_entries_without_changes=include_entries_without_changes,
        )

    def test(self, subject):
        """
        Tests the given subject and return its results.

        The checker is chosen from the configuration (syntax, then
        reputation, then availability) and from the file content type
        (URL or domain/IP).

        :param str subject: The subject to test.

        :return:
            The output of the :code:`get()` method of the chosen checker.
            This method reads its :code:`tested` and :code:`status` keys.
        """

        if PyFunceble.CONFIGURATION.idna_conversion:
            subject = domain2idna(subject)

        if isinstance(PyFunceble.CONFIGURATION.cooldown_time, (float, int)):
            PyFunceble.sleep(PyFunceble.CONFIGURATION.cooldown_time)

        is_url = "url" in self.file_type

        if PyFunceble.CONFIGURATION.syntax:
            checker = (
                PyFunceble.status.UrlSyntax
                if is_url
                else PyFunceble.status.DomainAndIpSyntax
            )
        elif PyFunceble.CONFIGURATION.reputation:
            checker = (
                PyFunceble.status.UrlReputation
                if is_url
                else PyFunceble.status.DomainAndIPReputation
            )
        else:
            checker = (
                PyFunceble.status.UrlAvailability
                if is_url
                else PyFunceble.status.DomainAndIpAvailability
            )

        # All checkers share the same calling convention.
        result = checker(
            subject,
            whois_db=self.whois_db,
            inactive_db=self.inactive_db,
            filename=self.file,
        ).get()

        self.generate_complement_status_file(result["tested"], result["status"])
        self.save_into_database(result, self.file)

        return result

    # pylint: disable=too-many-return-statements
    @classmethod
    def should_be_ignored(
        cls, subject, auto_continue_db, inactive_db, ignore_inactive_db_check=False
    ):
        """
        Given a subject which is supposed to be tested, we check if we should
        ignore it.

        A subject is ignored as soon as one of the following is true:

            - it is empty (falsy);
            - it starts with the comment sign;
            - the autocontinue database already tested it;
            - the inactive database already tested it (unless
              :code:`ignore_inactive_db_check` is set);
            - it matches :py:attr:`regex_ignore`;
            - it is a reserved IP and we are not in a local test;
            - a filter is configured and the subject does not match it.

        :param str subject: The subject to check.
        :param auto_continue_db: The autocontinue database interface (may be falsy).
        :param inactive_db: The inactive database interface (may be falsy).
        :param bool ignore_inactive_db_check:
            Skip the lookup into the inactive database.

        :rtype: bool
        """

        if not subject:
            PyFunceble.LOGGER.debug(f"Ignored {subject} because not true.")
            return True

        if subject.startswith(PyFunceble.converter.File.comment_sign):
            PyFunceble.LOGGER.debug(
                f"Ignored {subject} because it starts with comment sign."
            )
            return True

        if auto_continue_db and subject in auto_continue_db.get_already_tested():
            PyFunceble.LOGGER.debug(
                f"Ignored {subject} because it is into the list of already tested (autocontinue)."
            )
            return True

        if (
            not ignore_inactive_db_check
            and inactive_db
            and subject in inactive_db.get_already_tested()
        ):
            PyFunceble.LOGGER.debug(
                f"Ignored {subject} because it is into the list of already tested (inactive_db)."
            )
            return True

        if PyFunceble.helpers.Regex(cls.regex_ignore).match(
            subject, return_match=False
        ):
            PyFunceble.LOGGER.debug(f"Ignored {subject} because it match our regex.")
            return True

        if (
            not PyFunceble.CONFIGURATION.local
            and PyFunceble.Check(subject).is_reserved_ip()
        ):
            PyFunceble.LOGGER.debug(
                f"Ignored {subject} because it is a reserved IP and we are not in local test."
            )
            return True

        if PyFunceble.CONFIGURATION.filter and not PyFunceble.helpers.Regex(
            PyFunceble.CONFIGURATION.filter
        ).match(subject, return_match=False):
            # The f-string is evaluated even when debug logging is off, so
            # the attribute name must be the real one (filter).
            PyFunceble.LOGGER.debug(
                f"Ignored {subject} because it does not "
                f"match the given filter ({PyFunceble.CONFIGURATION.filter})"
            )
            return True

        return False

    def post_test_treatment(
        self,
        test_output,
        file_content_type,
        complements_test_started=False,
        auto_continue_db=None,
        inactive_db=None,
        mining=None,
        whois_db=None,
    ):
        """
        Do the post test treatment.

        :param dict test_output:
            The output of the test. The following keys are read:
            :code:`tested`, :code:`status`, :code:`expiration_date`,
            :code:`whois_record` and, in JSON + multiprocess mode,
            :code:`status_source`, :code:`http_status_code` and
            :code:`whois_server`.
        :param str file_content_type: The type of content of the tested file.
        :param bool complements_test_started:
            Tell us if the test of the complements is running.
        :param auto_continue_db: The autocontinue database interface (optional).
        :param inactive_db: The inactive database interface (optional).
        :param mining: The mining interface (optional).
        :param whois_db: The WHOIS database interface (optional).
        """

        tested = test_output["tested"]
        status = test_output["status"]

        if auto_continue_db:
            auto_continue_db.add(tested, status)

        if status.lower() in self.list_of_up_statuses:
            if mining:
                mining.mine(tested, file_content_type)

            if inactive_db and tested in inactive_db:
                # It was previously known by the inactive database and is
                # now up: we report it into the "suspicious" analytic file
                # and forget about it.
                PyFunceble.output.Generate(
                    tested,
                    f"file_{file_content_type}",
                    PyFunceble.STATUS.official.up,
                ).analytic_file("suspicious")

                inactive_db.remove(tested)
        elif inactive_db:
            inactive_db.add(tested, status)

        if (
            auto_continue_db
            and complements_test_started
            and PyFunceble.CONFIGURATION.db_type == "json"
        ):
            if "complements" in auto_continue_db.database:
                # We remove every occurrence of the tested subject from
                # the stored complements.
                while tested in auto_continue_db.database["complements"]:
                    auto_continue_db.database["complements"].remove(tested)

                auto_continue_db.save()

        if (
            whois_db
            and test_output["expiration_date"]
            and test_output["whois_record"]
            and tested
        ):
            whois_db.add(
                tested, test_output["expiration_date"], test_output["whois_record"],
            )

        if (
            PyFunceble.CONFIGURATION.db_type == "json"
            and PyFunceble.CONFIGURATION.multiprocess
        ):
            generate = PyFunceble.output.Generate(
                tested,
                f"file_{self.file_type}",
                status,
                source=test_output["status_source"],
                expiration_date=test_output["expiration_date"],
                http_status_code=test_output["http_status_code"],
                whois_server=test_output["whois_server"],
                filename=self.file,
                end=True,
            )

            generate.prints_status_file()
            generate.unified_file()

    def cleanup(self, auto_continue_db, auto_save, test_completed=False):
        """
        Runs the logic to run at the end of all test.

        The counters are updated and the files (re)generated and sorted
        when the test is completed or when the autosave time is exceeded.
        Otherwise, nothing happens.

        :param auto_continue_db: The autocontinue database interface.
        :param auto_save: The autosave interface.
        :param bool test_completed: Tell us if the whole test is completed.
        """

        # is_time_exceed() is only consulted while the test is not completed.
        if not test_completed and not auto_save.is_time_exceed():
            return

        auto_continue_db.update_counters()

        self.generate_files()
        self.sort_generated_files()

        if test_completed:
            auto_continue_db.clean()

        auto_save.process(test_completed=test_completed)

    def __run_single_test(self, subject, ignore_inactive_db_check=False):
        """
        Run a test for a single subject.

        :param str subject: The subject to test.
        :param bool ignore_inactive_db_check:
            Forwarded to :py:meth:`should_be_ignored`.
        """

        self.print_header()

        if not self.should_be_ignored(
            subject,
            auto_continue_db=self.autocontinue,
            inactive_db=self.inactive_db,
            ignore_inactive_db_check=ignore_inactive_db_check,
        ):
            result = self.test(subject)

            self.post_test_treatment(
                result,
                self.file_type,
                complements_test_started=self.complements_test_started,
                auto_continue_db=self.autocontinue,
                inactive_db=self.inactive_db,
                mining=self.mining,
                whois_db=self.whois_db,
            )
        elif self.autosave.authorized or PyFunceble.CONFIGURATION.print_dots:
            # We are under a CI/CD environment.

            PyFunceble.LOGGER.info(f"Skipped {subject!r}.")

            # We print a dot.
            print(".", end="")

        self.cleanup(self.autocontinue, self.autosave, test_completed=False)

    @classmethod
    def get_subjects(cls, line):
        """
        Given a line, we give the list of subjects to test.

        :param str line: The line to convert.

        :return:
            The output of the AdBlock converter when the adblock mode is
            activated, the output of the file converter otherwise.
            Callers handle both a list and a single value.
        """

        if PyFunceble.CONFIGURATION.adblock:
            return PyFunceble.converter.AdBlock(
                line, aggressive=PyFunceble.CONFIGURATION.aggressive
            ).get_converted()

        return PyFunceble.converter.File(line).get_converted()

    def __test_line(self, line, ignore_inactive_db_check=False):
        """
        Tests a given line.

        :param str line: The line to test.
        :param bool ignore_inactive_db_check:
            Forwarded to the test of each subject of the line.
        """

        subjects = self.get_subjects(line)

        if not isinstance(subjects, list):
            subjects = [subjects]

        for subject in subjects:
            self.__run_single_test(
                subject, ignore_inactive_db_check=ignore_inactive_db_check
            )

    def construct_and_get_shadow_file(
        self, file_stream, ignore_inactive_db_check=False
    ):
        """
        Provides a path to a file which contain the list to test.

        The idea is to do a comparison between what we already tested
        and what we still have to test.

        A line is written into the shadow file unless all its subjects
        should be ignored. The shadow file is not deleted automatically:
        its deletion is up to the caller.

        :param file_stream: An iterable of lines (e.g. an opened text file).
        :param bool ignore_inactive_db_check:
            Forwarded to :py:meth:`should_be_ignored`.

        :return: The path of the shadow file.
        :rtype: str
        """

        # Progress markers are printed under the same conditions for the
        # whole construction.
        print_markers = (
            self.autosave.authorized or PyFunceble.CONFIGURATION.print_dots
        )

        with NamedTemporaryFile(delete=False) as temp_file:
            if print_markers:
                print("")

            for line in file_stream:
                subjects = self.get_subjects(line)

                if not isinstance(subjects, list):
                    subjects = [subjects]

                comparison = [
                    self.should_be_ignored(
                        x,
                        auto_continue_db=self.autocontinue,
                        inactive_db=self.inactive_db,
                        ignore_inactive_db_check=ignore_inactive_db_check,
                    )
                    for x in subjects
                ]

                if all(comparison):
                    # I: ignored.
                    if print_markers:
                        print("I", end="")
                    continue

                # S: saved into the shadow file.
                if print_markers:
                    print("S", end="")

                temp_file.write(line.encode("utf-8"))

            if print_markers:
                print("")

            return temp_file.name

    def __test_file_content(self, ignore_inactive_db_check=False):
        """
        Builds the shadow file of the file to test, tests each of its lines
        and deletes it afterwards.

        :param bool ignore_inactive_db_check:
            Forwarded to the construction of the shadow file and to the
            test of each line.
        """

        with open(self.file, "r", encoding="utf-8") as file_stream:
            shadow_file_name = self.construct_and_get_shadow_file(
                file_stream, ignore_inactive_db_check=ignore_inactive_db_check
            )

        try:
            # The shadow file is written as UTF-8, so we read it as such
            # instead of relying on the platform default encoding.
            with open(shadow_file_name, "r", encoding="utf-8") as shadow_file:
                for line in shadow_file:
                    self.__test_line(
                        line, ignore_inactive_db_check=ignore_inactive_db_check
                    )
        finally:
            # The shadow file is created with delete=False: we are the
            # ones who have to delete it, even if a test stops us.
            PyFunceble.helpers.File(shadow_file_name).delete()

    def run_test(self):
        """
        Run the test of the content of the given file.

        The following is tested, in order:

            1. the content of the file;
            2. the content of the file again, without the inactive
               database check, if the autocontinue database is (still)
               empty;
            3. the subjects the inactive database wants us to retest;
            4. the complements;
            5. the mined subjects.
        """

        self.__test_file_content()

        if self.autocontinue.is_empty():
            self.__test_file_content(ignore_inactive_db_check=True)

        for subject in self.inactive_db.get_to_retest():
            self.__test_line(subject)

        self.complements_test_started = True

        for subject in self.get_complements(self.autocontinue):
            self.__test_line(subject)

        self.complements_test_started = False

        for index, subject in self.mining.list_of_mined():
            self.__test_line(subject)
            self.mining.remove(index, subject)

        self.cleanup(self.autocontinue, self.autosave, test_completed=True)