"""
The tool to check the availability or syntax of domain, IP or URL.
::
██████╗ ██╗ ██╗███████╗██╗ ██╗███╗ ██╗ ██████╗███████╗██████╗ ██╗ ███████╗
██╔══██╗╚██╗ ██╔╝██╔════╝██║ ██║████╗ ██║██╔════╝██╔════╝██╔══██╗██║ ██╔════╝
██████╔╝ ╚████╔╝ █████╗ ██║ ██║██╔██╗ ██║██║ █████╗ ██████╔╝██║ █████╗
██╔═══╝ ╚██╔╝ ██╔══╝ ██║ ██║██║╚██╗██║██║ ██╔══╝ ██╔══██╗██║ ██╔══╝
██║ ██║ ██║ ╚██████╔╝██║ ╚████║╚██████╗███████╗██████╔╝███████╗███████╗
╚═╝ ╚═╝ ╚═╝ ╚═════╝ ╚═╝ ╚═══╝ ╚═════╝╚══════╝╚═════╝ ╚══════╝╚══════╝
Provides the base of all reputation checker classes.
Author:
Nissar Chababy, @funilrys, contactTATAfunilrysTODTODcom
Special thanks:
https://pyfunceble.github.io/#/special-thanks
Contributors:
https://pyfunceble.github.io/#/contributors
Project link:
https://github.com/funilrys/PyFunceble
Project documentation:
https://pyfunceble.readthedocs.io/en/latest/
Project homepage:
https://pyfunceble.github.io/
License:
::
Copyright 2017, 2018, 2019, 2020, 2022, 2023 Nissar Chababy
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from typing import List, Optional
from sqlalchemy.orm import Session
import PyFunceble.facility
import PyFunceble.factory
import PyFunceble.storage
from PyFunceble.checker.base import CheckerBase
from PyFunceble.checker.reputation.params import ReputationCheckerParams
from PyFunceble.checker.reputation.status import ReputationCheckerStatus
from PyFunceble.checker.syntax.domain import DomainSyntaxChecker
from PyFunceble.checker.syntax.ip import IPSyntaxChecker
from PyFunceble.checker.syntax.url import URLSyntaxChecker
from PyFunceble.dataset.ipv4_reputation import IPV4ReputationDataset
from PyFunceble.query.dns.query_tool import DNSQueryTool
[docs]class ReputationCheckerBase(CheckerBase):
"""
Provides the base of all our reputation checker classes.
:param str subject:
Optional, The subject to work with.
:param bool do_syntax_check_first:
Optional, Activates/Disables the check of the status before the actual
status gathering.
"""
dns_query_tool: Optional[DNSQueryTool] = None
ipv4_reputation_query_tool: Optional[IPV4ReputationDataset] = None
domain_syntax_checker: Optional[DomainSyntaxChecker] = None
ip_syntax_checker: Optional[IPSyntaxChecker] = None
url_syntax_checker: Optional[URLSyntaxChecker] = None
status: Optional[ReputationCheckerStatus] = None
params: Optional[ReputationCheckerParams] = None
def __init__(
self,
subject: Optional[str] = None,
do_syntax_check_first: Optional[bool] = None,
db_session: Optional[Session] = None,
use_collection: Optional[bool] = None,
) -> None:
self.dns_query_tool = DNSQueryTool()
self.ipv4_reputation_query_tool = IPV4ReputationDataset()
self.domain_syntax_checker = DomainSyntaxChecker()
self.ip_syntax_checker = IPSyntaxChecker()
self.url_syntax_checker = URLSyntaxChecker()
self.params = ReputationCheckerParams()
self.status = ReputationCheckerStatus()
self.status.params = self.params
self.status.dns_lookup_record = self.dns_query_tool.lookup_record
super().__init__(
subject,
do_syntax_check_first=do_syntax_check_first,
db_session=db_session,
use_collection=use_collection,
)
[docs] @staticmethod
def is_valid() -> bool: # pylint: disable=arguments-differ
raise NotImplementedError()
[docs] def subject_propagator(self) -> "CheckerBase":
"""
Propagate the currently set subject.
.. warning::
You are not invited to run this method directly.
"""
self.dns_query_tool.set_subject(self.idna_subject)
self.domain_syntax_checker.subject = self.idna_subject
self.ip_syntax_checker.subject = self.idna_subject
self.url_syntax_checker.subject = self.idna_subject
self.status = ReputationCheckerStatus()
self.status.params = self.params
self.status.dns_lookup_record = self.dns_query_tool.lookup_record
return super().subject_propagator()
[docs] def should_we_continue_test(self, status_post_syntax_checker: str) -> bool:
"""
Checks if we are allowed to continue a standard testing.
"""
return bool(
not self.status.status
or status_post_syntax_checker == PyFunceble.storage.STATUS.invalid
)
[docs] def query_common_checker(self) -> "ReputationCheckerBase":
"""
Queries the common checkers.
"""
self.status.second_level_domain_syntax = (
self.domain_syntax_checker.is_valid_second_level()
)
self.status.subdomain_syntax = self.domain_syntax_checker.is_valid_subdomain()
self.status.domain_syntax = bool(self.status.subdomain_syntax) or bool(
self.status.second_level_domain_syntax
)
self.status.ipv4_syntax = self.ip_syntax_checker.is_valid_v4()
self.status.ipv6_syntax = self.ip_syntax_checker.is_valid_v6()
self.status.ipv4_range_syntax = self.ip_syntax_checker.is_valid_v4_range()
self.status.ipv6_range_syntax = self.ip_syntax_checker.is_valid_v6_range()
self.status.ip_syntax = bool(self.status.ipv4_syntax or self.status.ipv6_syntax)
self.status.url_syntax = self.url_syntax_checker.is_valid()
return super().query_common_checker()
[docs] def query_a_record(self) -> Optional[List[str]]:
"""
Queries all the A record.
"""
raise NotImplementedError()
[docs] def try_to_query_status_from_dns_lookup(self) -> "ReputationCheckerBase":
"""
Tries to query the status from the DNS lookup.
"""
PyFunceble.facility.Logger.info(
"Started to try to query the status of %r from: DNS Lookup",
self.status.idna_subject,
)
if not self.status.ipv4_syntax:
lookup_result = self.query_a_record()
self.status.dns_lookup = lookup_result
else:
lookup_result = [self.status.subject]
if lookup_result:
for subject in lookup_result:
if subject in self.ipv4_reputation_query_tool:
self.status.status = PyFunceble.storage.STATUS.malicious
self.status.status_source = "REPUTATION"
PyFunceble.facility.Logger.info(
"Could define the status of %r from: DNS Lookup",
self.status.idna_subject,
)
break
PyFunceble.facility.Logger.info(
"Finished to try to query the status of %r from: DNS Lookup",
self.status.idna_subject,
)
return self
[docs] def try_to_query_status_from_syntax_lookup(self) -> "ReputationCheckerBase":
"""
Tries to query the status from the syntax.
"""
PyFunceble.facility.Logger.info(
"Started to try to query the status of %r from: Syntax Lookup",
self.status.idna_subject,
)
if (
not self.status.domain_syntax
and not self.status.ip_syntax
and not self.status.url_syntax
):
self.status.status = PyFunceble.storage.STATUS.invalid
self.status.status_source = "SYNTAX"
PyFunceble.facility.Logger.info(
"Could define the status of %r from: Syntax Lookup",
self.status.idna_subject,
)
PyFunceble.facility.Logger.info(
"Finished to try to query the status of %r from: Syntax Lookup",
self.status.idna_subject,
)
return self
[docs] def try_to_query_status_from_collection(self) -> "ReputationCheckerBase":
"""
Tries to get and set the status from the Collection API.
"""
PyFunceble.facility.Logger.info(
"Started to try to query the status of %r from: Collection Lookup",
self.status.idna_subject,
)
data = self.collection_query_tool[self.idna_subject]
if data and "status" in data:
if (
self.collection_query_tool.preferred_status_origin == "frequent"
and data["status"]["reputation"]["frequent"]
):
self.status.status = data["status"]["reputation"]["frequent"]
self.status.status_source = "COLLECTION"
elif (
self.collection_query_tool.preferred_status_origin == "latest"
and data["status"]["reputation"]["latest"]
):
self.status.status = data["status"]["reputation"]["latest"]["status"]
self.status.status_source = "COLLECTION"
elif (
self.collection_query_tool.preferred_status_origin == "recommended"
and data["status"]["reputation"]["recommended"]
):
self.status.status = data["status"]["reputation"]["recommended"]
self.status.status_source = "COLLECTION"
PyFunceble.facility.Logger.info(
"Could define the status of %r from: Collection Lookup",
self.status.idna_subject,
)
PyFunceble.facility.Logger.info(
"Finished to try to query the status of %r from: Collection Lookup",
self.status.idna_subject,
)
[docs] @CheckerBase.ensure_subject_is_given
@CheckerBase.update_status_date_after_query
def query_status(self) -> "ReputationCheckerBase":
"""
Queries the status and for for more action.
"""
status_post_syntax_checker = None
if (
not self.status.status and self.use_collection
): # pragma: no cover ## Underlying tested
self.try_to_query_status_from_collection()
if not self.status.status and self.do_syntax_check_first:
self.try_to_query_status_from_syntax_lookup()
if self.status.status:
status_post_syntax_checker = self.status.status
if self.should_we_continue_test(status_post_syntax_checker):
self.try_to_query_status_from_dns_lookup()
if not self.status.status:
self.status.status = PyFunceble.storage.STATUS.sane
self.status.status_source = "REPUTATION"
PyFunceble.facility.Logger.info(
"Could not define the status of %r. Setting to %r",
self.status.idna_subject,
self.status.status,
)
return self
# pylint: disable=useless-super-delegation
[docs] def get_status(self) -> Optional[ReputationCheckerStatus]:
return super().get_status()