"""
The tool to check the availability or syntax of domain, IP or URL.
::
██████╗ ██╗ ██╗███████╗██╗ ██╗███╗ ██╗ ██████╗███████╗██████╗ ██╗ ███████╗
██╔══██╗╚██╗ ██╔╝██╔════╝██║ ██║████╗ ██║██╔════╝██╔════╝██╔══██╗██║ ██╔════╝
██████╔╝ ╚████╔╝ █████╗ ██║ ██║██╔██╗ ██║██║ █████╗ ██████╔╝██║ █████╗
██╔═══╝ ╚██╔╝ ██╔══╝ ██║ ██║██║╚██╗██║██║ ██╔══╝ ██╔══██╗██║ ██╔══╝
██║ ██║ ██║ ╚██████╔╝██║ ╚████║╚██████╗███████╗██████╔╝███████╗███████╗
╚═╝ ╚═╝ ╚═╝ ╚═════╝ ╚═╝ ╚═══╝ ╚═════╝╚══════╝╚═════╝ ╚══════╝╚══════╝
Provides the base of all our workers.
Author:
Nissar Chababy, @funilrys, contactTATAfunilrysTODTODcom
Special thanks:
https://pyfunceble.github.io/#/special-thanks
Contributors:
https://pyfunceble.github.io/#/contributors
Project link:
https://github.com/funilrys/PyFunceble
Project documentation:
https://pyfunceble.readthedocs.io/en/dev/
Project homepage:
https://pyfunceble.github.io/
License:
::
Copyright 2017, 2018, 2019, 2020 Nissar Chababy
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import multiprocessing
import multiprocessing.connection
import queue
import time
import traceback
from datetime import datetime, timedelta
from typing import Any, Optional, Tuple
import PyFunceble.facility
from PyFunceble.cli.continuous_integration.base import ContinuousIntegrationBase
[docs]class WorkerBase(multiprocessing.Process):
"""
Provides the base of all our workers.
:param input_queue:
The input queue to read.
:param output_queue:
The output queue to write.
"""
STD_NAME: str = "pyfunceble_base_worker"
MINING_WAIT_TIME: int = 10
BREAKOFF: float = 0.2
input_queue: Optional[queue.Queue] = None
output_queue: Optional[queue.Queue] = None
continuous_integration: ContinuousIntegrationBase = None
global_exit_event: Optional[multiprocessing.Event] = None
exit_it: Optional[multiprocessing.Event] = None
send_feeding_message: Optional[bool] = None
accept_waiting_delay: Optional[bool] = None
_parent_connection: Optional[multiprocessing.connection.Connection] = None
_child_connection: Optional[multiprocessing.connection.Connection] = None
_exception: Optional[multiprocessing.Pipe] = None
def __init__(
self,
input_queue: Optional[queue.Queue],
output_queue: Optional[queue.Queue] = None,
global_exit_event: Optional[multiprocessing.Event] = None,
*,
name: Optional[str] = None,
daemon: Optional[bool] = None,
continuous_integration: Optional[ContinuousIntegrationBase] = None,
configuration: Optional[dict] = None,
) -> None:
self.configuration = configuration
self.input_queue = input_queue
self.output_queue = output_queue
self.continuous_integration = continuous_integration
self.global_exit_event = global_exit_event
self.exit_it = multiprocessing.Event()
self._parent_connection, self._child_connection = multiprocessing.Pipe()
self._exception = None
self.send_feeding_message = True
self.accept_waiting_delay = True
super().__init__(name=name, daemon=daemon)
self.__post_init__()
def __post_init__(self) -> None:
"""
A method which will be executed after the :code:`__init__` method.
"""
@property
def exception(self):
"""
Provides the exception of the current worker.
"""
if self._parent_connection.poll():
self._exception = self._parent_connection.recv()
return self._exception
[docs] def add_to_output_queue(
self, data: Any, *, worker_name: Optional[str] = None
) -> "WorkerBase":
"""
Adds the given data to the output queue queue.
:param data:
The data to add into the queue.
"""
if worker_name:
to_send = (worker_name, data)
else:
to_send = (self.name, data)
self.output_queue.put(to_send)
PyFunceble.facility.Logger.debug("Added to the (output) queue: %r", data)
return self
[docs] def target(self, consumed: Any) -> Optional[Tuple[Any, ...]]:
"""
This the target that is run to process something.
This method should return a result which will pu send to the output
queue.
"""
raise NotImplementedError()
[docs] def run(self) -> None: # pylint: disable=too-many-statements
def break_from_feeder(feeding_worker_events: dict) -> bool:
"""
Checks if we should from based on the current state of the
feeding worker events.
:param feeding_worker_events:
The events which were caught so far.
"""
if not feeding_worker_events:
return False
return all(not x for x in feeding_worker_events.values())
def break_now() -> bool:
"""
Checks if it is time to make a break.
"""
if not wait_for_stop or not self.accept_waiting_delay:
return True
return datetime.utcnow() > break_time
if self.configuration is not None:
PyFunceble.facility.ConfigLoader.set_custom_config(self.configuration)
PyFunceble.facility.ConfigLoader.start()
feeding_worker = dict()
wait_for_stop = (
bool(PyFunceble.storage.CONFIGURATION.cli_testing.mining) is True
)
break_time = datetime.utcnow() + timedelta(seconds=self.MINING_WAIT_TIME)
try:
while True:
if self.global_exit_event.is_set():
PyFunceble.facility.Logger.info(
"Got global exit event. Stopping runner."
)
self.add_to_input_queue("stop")
break
if self.exit_it.is_set():
PyFunceble.facility.Logger.info("Got exit event. Stopping runner.")
self.add_to_output_queue("stop")
break
if (
self.continuous_integration
and self.continuous_integration.is_time_exceeded()
):
PyFunceble.facility.Logger.info(
"CI time exceeded. Stopping runner."
)
if break_now():
self.add_to_output_queue("stop")
break
self.add_to_input_queue("wait")
continue
try:
worker_name, consumed = self.input_queue.get()
except queue.Empty:
PyFunceble.facility.Logger.debug("Queue is empty. Continue.")
continue
except EOFError:
PyFunceble.facility.Logger.info("Got EOFError. Stopping runner.")
self.global_exit_event.set()
break
PyFunceble.facility.Logger.info(
"Got (from %r): %r", worker_name, consumed
)
PyFunceble.facility.Logger.debug("Feeding workers: %r", feeding_worker)
if consumed == "stop":
if feeding_worker and not worker_name.startswith(
self.name[: self.name.rfind("_")]
):
feeding_worker[worker_name] = False
if break_from_feeder(feeding_worker) and break_now():
PyFunceble.facility.Logger.info(
"Got stop message from %r (all feeders). Applying.",
worker_name,
)
self.add_to_output_queue("stop")
break
PyFunceble.facility.Logger.info(
"Got stop message from %r. Keeping track of it.",
worker_name,
)
self.add_to_input_queue("wait")
continue
if break_now():
PyFunceble.facility.Logger.info(
"Got stop message from %r (all feeders?). Applying.",
worker_name,
)
self.add_to_output_queue("stop")
break
self.add_to_input_queue("wait")
continue
if consumed == "feeding":
PyFunceble.facility.Logger.info(
"%r is feeding this worker. "
"Keeping track of that information.",
worker_name,
)
if (
worker_name not in feeding_worker
and not worker_name.startswith(
self.name[: self.name.rfind("_")]
)
):
feeding_worker[worker_name] = True
if self.send_feeding_message:
self.add_to_output_queue("feeding", worker_name=self.name)
continue
if consumed == "wait":
if not wait_for_stop:
continue
if break_now():
PyFunceble.facility.Logger.debug(
"Waited sufficiently. Stopping current worker."
)
self.add_to_output_queue("stop")
break
PyFunceble.facility.Logger.debug(
"We need to wait a bit more. Continue waiting."
)
self.add_to_input_queue("wait")
time.sleep(self.BREAKOFF)
continue
if break_from_feeder(feeding_worker) and break_now():
PyFunceble.facility.Logger.info(
"Got stop message from all feeders. Stopping current worker."
)
self.add_to_output_queue("stop")
break
result = self.target(consumed)
if result is not None:
self.add_to_output_queue(result)
break_time = datetime.utcnow() + timedelta(
seconds=self.MINING_WAIT_TIME
)
except Exception as exception: # pylint: disable=broad-except
PyFunceble.facility.Logger.critical(
"Error while running target", exc_info=True
)
trace = traceback.format_exc()
self._child_connection.send((exception, trace))
raise exception
[docs] def terminate(self) -> None:
"""
Terminate our runner.
"""
self.exit_it.set()