Source code for PyFunceble.cli.threads.tester

"""
The tool to check the availability or syntax of domain, IP or URL.

::


    ██████╗ ██╗   ██╗███████╗██╗   ██╗███╗   ██╗ ██████╗███████╗██████╗ ██╗     ███████╗
    ██╔══██╗╚██╗ ██╔╝██╔════╝██║   ██║████╗  ██║██╔════╝██╔════╝██╔══██╗██║     ██╔════╝
    ██████╔╝ ╚████╔╝ █████╗  ██║   ██║██╔██╗ ██║██║     █████╗  ██████╔╝██║     █████╗
    ██╔═══╝   ╚██╔╝  ██╔══╝  ██║   ██║██║╚██╗██║██║     ██╔══╝  ██╔══██╗██║     ██╔══╝
    ██║        ██║   ██║     ╚██████╔╝██║ ╚████║╚██████╗███████╗██████╔╝███████╗███████╗
    ╚═╝        ╚═╝   ╚═╝      ╚═════╝ ╚═╝  ╚═══╝ ╚═════╝╚══════╝╚═════╝ ╚══════╝╚══════╝

Provides the logic behind the threads which is supposed to test the given subject.

Author:
    Nissar Chababy, @funilrys, contactTATAfunilrysTODTODcom

Special thanks:
    https://pyfunceble.github.io/#/special-thanks

Contributors:
    https://pyfunceble.github.io/#/contributors

Project link:
    https://github.com/funilrys/PyFunceble

Project documentation:
    https://pyfunceble.readthedocs.io/en/dev/

Project homepage:
    https://pyfunceble.github.io/

License:
::


    Copyright 2017, 2018, 2019, 2020 Nissar Chababy

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
"""

import concurrent.futures
import time
from typing import List, Optional

import PyFunceble.cli.utils.stdout
import PyFunceble.cli.utils.testing
import PyFunceble.facility
import PyFunceble.storage
from PyFunceble.checker.availability.domain_and_ip import DomainAndIPAvailabilityChecker
from PyFunceble.checker.availability.url import URLAvailabilityChecker
from PyFunceble.checker.base import CheckerBase
from PyFunceble.checker.reputation.domain_and_ip import DomainAndIPReputationChecker
from PyFunceble.checker.reputation.url import URLReputationChecker
from PyFunceble.checker.status_base import CheckerStatusBase
from PyFunceble.checker.syntax.domain import DomainSyntaxChecker
from PyFunceble.checker.syntax.url import URLSyntaxChecker
from PyFunceble.cli.continuous_integration.base import ContinuousIntegrationBase
from PyFunceble.cli.threads.base import ThreadsBase
from PyFunceble.cli.threads.utils import wait_until_completion


[docs]class TesterThread(ThreadsBase): """ Provides our tester thread logic. The thread behind this object, will read :code:`the_queue`, test all entries until a `stop` is given and store the result into :code:`output_queue`. """ thread_name: str = "pyfunceble_tester" continuous_integration: ContinuousIntegrationBase = None
[docs] @staticmethod def done_callback(func: concurrent.futures.Future) -> None: """ This method will be executed after each task run. :raise Exception: The the task has some exception. """ if func.done() and func.exception(): PyFunceble.facility.Logger.critical( "Fatal error while testing.", exc_info=True ) raise func.exception()
[docs] @staticmethod def get_testing_object(subject_type: str, checker_type: str) -> CheckerBase: """ Provides the object to use for testing. :raise ValueError: When the given subject type is unkown. """ known = { "SYNTAX": {"domain": DomainSyntaxChecker, "url": URLSyntaxChecker}, "AVAILABILITY": { "domain": DomainAndIPAvailabilityChecker, "url": URLAvailabilityChecker, }, "REPUTATION": { "domain": DomainAndIPReputationChecker, "url": URLReputationChecker, }, } if checker_type in known: if subject_type in known[checker_type]: # Yes, we initialize before returning! return known[checker_type][subject_type]() raise ValueError(f"<subject_type> ({subject_type!r}) is unknwon.") raise ValueError(f"<testing_mode> ({checker_type!r}) is unknwon.")
[docs] def get_status(self, test_dataset: dict) -> Optional[CheckerStatusBase]: """ This is our part our our target. Meaning that you shouldn't be playing with this outside the CLI. This method reads the given dataset, starts the test and return the result of it after putting it into the declared output queue. Please note a slighly side effect of this method. Indeed, :py:class:`None` will be returned if the subject to test should be ignored. """ if self.should_be_ignored(test_dataset["idna_subject"]): # X means that it was ignored because of our core ignore procedure. PyFunceble.cli.utils.stdout.print_single_line("X") return None if test_dataset["type"] != "single": continue_object = ( PyFunceble.cli.utils.testing.get_continue_databaset_object() ) if test_dataset["output_dir"]: try: continue_object.set_base_directory(test_dataset["output_dir"]) except NotImplementedError: pass if continue_object.exists(test_dataset): # A means that it was ignored because of the continue # logic. PyFunceble.facility.Logger.info( "Ignoring %r because it was already tested previously " "(continue).", test_dataset["idna_subject"], ) PyFunceble.cli.utils.stdout.print_single_line("A") return None if "from_inactive" not in test_dataset: inactive_object = ( PyFunceble.cli.utils.testing.get_inactive_dataset_object() ) if inactive_object.exists(test_dataset): # I means that it was ignored because of the inactive (db) # logic. PyFunceble.facility.Logger.info( "Ignoring %r because it was already tested previously " "(inactive).", test_dataset["idna_subject"], ) PyFunceble.cli.utils.stdout.print_single_line("I") self.add_to_output_queue((test_dataset, "ignored_inactive")) return None testing_object = self.get_testing_object( test_dataset["subject_type"], test_dataset["checker_type"] ) # We want to always check the syntax first (ONLY UNDER THE CLI) testing_object.set_do_syntax_check_first( not bool(PyFunceble.storage.CONFIGURATION.cli_testing.local_network) ) result = ( testing_object.set_subject(test_dataset["idna_subject"]) .query_status() .get_status() ) PyFunceble.facility.Logger.debug("Got status:\n%r.", result) self.add_to_output_queue((test_dataset, result)) return result
[docs] @ThreadsBase.ensure_output_queue_is_given def target(self) -> None: """ This is our core logic. Everything starts here! """ with concurrent.futures.ThreadPoolExecutor( max_workers=PyFunceble.storage.CONFIGURATION.cli_testing.max_workers, thread_name_prefix=self.thread_name, ) as executor: submitted_list: List[concurrent.futures.Future] = [] stop_message_caught = False # Don't worry, if we are not under a CI engine, the # is_time_exceeded will automatically return false (cf: decorators) while ( self.continuous_integration and not self.continuous_integration.is_time_exceeded() ) or True: if self.the_queue.empty(): continue consumed = self.the_queue.get() PyFunceble.facility.Logger.debug("Got: %r", consumed) if consumed == "stop": PyFunceble.facility.Logger.info( "Got stop message. Stopping reading from the queue." ) stop_message_caught = True break if not isinstance(consumed, dict): PyFunceble.facility.Logger.debug( "Waiting for new dataset " "because inputed data was not a dictionnary." ) continue PyFunceble.facility.Logger.debug("Submitting: %r", consumed) submitted = executor.submit(self.get_status, consumed) PyFunceble.facility.Logger.debug("Submitted: %r", consumed) PyFunceble.facility.Logger.debug( "Adding callback into submitted: %r", submitted ) # submitted.add_done_callback(self.done_callback) PyFunceble.facility.Logger.debug( "Added callback into submitted: %r", submitted ) submitted_list.append(submitted) if PyFunceble.storage.CONFIGURATION.cli_testing.cooldown_time > 0: PyFunceble.facility.Logger.info( "Sleeping: %rs for our own safety :-)", PyFunceble.storage.CONFIGURATION.cli_testing.cooldown_time, ) # Apply cooldowntime. time.sleep( PyFunceble.storage.CONFIGURATION.cli_testing.cooldown_time ) PyFunceble.facility.Logger.info( "Slept: %rs for our own safety :-)", PyFunceble.storage.CONFIGURATION.cli_testing.cooldown_time, ) wait_until_completion(submitted_list, raise_exc=True) if stop_message_caught: self.add_to_output_queue("stop")