Source code for PyFunceble.query.dns.query_tool

The tool to check the availability or syntax of domain, IP or URL.


    ██████╗ ██╗   ██╗███████╗██╗   ██╗███╗   ██╗ ██████╗███████╗██████╗ ██╗     ███████╗
    ██╔══██╗╚██╗ ██╔╝██╔════╝██║   ██║████╗  ██║██╔════╝██╔════╝██╔══██╗██║     ██╔════╝
    ██████╔╝ ╚████╔╝ █████╗  ██║   ██║██╔██╗ ██║██║     █████╗  ██████╔╝██║     █████╗
    ██╔═══╝   ╚██╔╝  ██╔══╝  ██║   ██║██║╚██╗██║██║     ██╔══╝  ██╔══██╗██║     ██╔══╝
    ██║        ██║   ██║     ╚██████╔╝██║ ╚████║╚██████╗███████╗██████╔╝███████╗███████╗
    ╚═╝        ╚═╝   ╚═╝      ╚═════╝ ╚═╝  ╚═══╝ ╚═════╝╚══════╝╚═════╝ ╚══════╝╚══════╝

Provides an interface for the query.

    Nissar Chababy, @funilrys, contactTATAfunilrysTODTODcom

Special thanks:


Project link:

Project documentation:

Project homepage:


    Copyright 2017, 2018, 2019, 2020 Nissar Chababy

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.

import copy
import functools
import ipaddress
import random
import socket
import time
from typing import Dict, List, Optional, Union

import dns.exception
import dns.message
import dns.query
import dns.rdataclass
import dns.rdatatype

import PyFunceble.facility
from PyFunceble.helpers.list import ListHelper
from PyFunceble.query.dns.nameserver import Namseservers
from PyFunceble.query.record.dns import DNSQueryToolRecord

[docs]class DNSQueryTool: """ Provides our query tool. """ # pylint: disable=too-many-public-methods STD_PROTOCOL: str = "UDP" STD_TIMEOUT: float = 5.0 SUPPORTED_PROTOCOL: List[str] = ["TCP", "UDP", "HTTPS", "TLS"] BREAKOFF: float = 0.2 value2rdata_type: Dict[int, str] = { x.value: for x in dns.rdatatype.RdataType } rdata_type2value: Dict[str, int] = { x.value for x in dns.rdatatype.RdataType } nameservers: Namseservers = Namseservers() _query_record_type: int = dns.rdatatype.RdataType.ANY _subject: Optional[str] = None _follow_nameserver_order: bool = True _prefered_protocol: str = "UDP" _query_timeout: float = 3.0 dns_name: Optional[str] = None query_message: Optional[dns.message.QueryMessage] = None lookup_record: Optional[DNSQueryToolRecord] = None def __init__( self, *, nameservers: Optional[List[str]] = None, follow_nameserver_order: bool = True, prefered_protocol: Optional[str] = None, ) -> None: if nameservers is not None: self.set_nameservers(nameservers) else: # pragma: no cover ## I'm not playing with system resolver. self.nameservers.guess_and_set_nameservers() if prefered_protocol is not None: self.set_prefered_protocol(prefered_protocol) else: self.guess_and_set_prefered_protocol() self.set_follow_nameserver_order(follow_nameserver_order)
[docs] def prepare_query(func): # pylint: disable=no-self-argument """ Prepare the query after running the decorated method. """ @functools.wraps(func) def wrapper(self, *args, **kwargs): result = func(self, *args, **kwargs) # pylint: disable=not-callable if self.subject and self.query_record_type: self.dns_name = self.get_dns_name_from_subject_and_query_type() if self.dns_name: self.query_message = dns.message.make_query( self.dns_name, self.query_record_type ) else: self.query_message = None return result return wrapper
[docs] def update_lookup_record(func): # pylint: disable=no-self-argument """ Ensures that a clean record is generated after the execution of the decorated method. """ @functools.wraps(func) def wrapper(self, *args, **kwargs): result = func(self, *args, **kwargs) # pylint: disable=not-callable if self.lookup_record is None or self.subject != self.lookup_record.subject: self.lookup_record = DNSQueryToolRecord() self.lookup_record.subject = self.subject if self.dns_name != self.lookup_record.dns_name: self.lookup_record.dns_name = self.dns_name if ( self.get_human_query_record_type() != self.lookup_record.query_record_type ): self.lookup_record.query_record_type = ( self.get_human_query_record_type() ) if ( self.follow_nameserver_order != self.lookup_record.follow_nameserver_order ): self.lookup_record.follow_nameserver_order = ( self.follow_nameserver_order ) if self.query_timeout != self.lookup_record.query_timeout: self.lookup_record.query_timeout = self.query_timeout if self.prefered_protocol != self.lookup_record.prefered_protocol: self.lookup_record.prefered_protocol = self.prefered_protocol return result return wrapper
[docs] def ensure_subject_is_given(func): # pylint: disable=no-self-argument """ Ensures that the subject to work with is given before running the decorated method. :raise TypeError: If :code:`self.subject` is not a :py:class:`str`. :raise ValueError: If :code:`self.subject` is empty. """ @functools.wraps(func) def wrapper(self, *args, **kwargs): # pragma: no cover ## Safety! if not isinstance(self.subject, str): raise TypeError( f"<self.subject> should be {str}, {type(self.subject)} given." ) if not self.subject: raise ValueError("<self.subject> should not be empty.") return func(self, *args, **kwargs) # pylint: disable=not-callable return wrapper
[docs] def update_lookup_record_response(func): # pylint: disable=no-self-argument """ Ensures that the response of the decorated method is set as response in our record. """ @functools.wraps(func) def wrapper(self, *args, **kwargs): # pragma: no cover ## Just common sense result = func(self, *args, **kwargs) # pylint: disable=not-callable if result != self.lookup_record.response: self.lookup_record.response = result return result return wrapper
[docs] def ignore_if_query_message_is_missing(func): # pylint: disable=no-self-argument """ Ignores the call to the decorated method if the query message is missing. Otherwise, return an empty list. """ @functools.wraps(func) def wrapper(self, *args, **kwargs): # pragma: no cover ## Just common sense if self.query_message: return func(self, *args, **kwargs) # pylint: disable=not-callable return [] return wrapper
[docs] @ensure_subject_is_given def get_dns_name_from_subject_and_query_type(self): """ Provides the dns name based on the current subject and query type. """ try: if self.get_human_query_record_type().lower() == "ptr": try: return ipaddress.ip_address(self.subject).reverse_pointer ) except ValueError: return return except return None
@property def subject(self) -> Optional[str]: """ Provides the current state of the :code:`_subject` attribute. """ return self._subject @subject.setter @prepare_query @update_lookup_record def subject(self, value: str) -> None: """ Sets the subject to work with. :param value: The subject to set. :raise TypeError: When the given :code:`value` is not a :py:class:`str`. :raise ValueError: When the given :code:`value` is empty. """ if not isinstance(value, str): raise TypeError(f"<value> should be {str}, {type(value)} given.") if not value: raise ValueError("<value> should not be empty.") self._subject = value
[docs] def set_subject(self, value: str) -> "DNSQueryTool": """ Sets the subject to work with. :param value: The subject to set. """ self.subject = value return self
[docs] def set_nameservers(self, value: List[str]) -> "DNSQueryTool": """ Sets the nameservers to work with. :raise TypeError: When the given :code:`value` is not a list of :py:class:`str`. :raise ValueError: When the given :code:`value` is empty. """ self.nameservers.set_nameservers(value)
@property def follow_nameserver_order(self) -> bool: """ Provides the current state of the :code:`_follow_nameserver_order` attribute. """ return self._follow_nameserver_order @follow_nameserver_order.setter @update_lookup_record def follow_nameserver_order(self, value: bool) -> None: """ Updates the :code:`follow_nameserver_order` variable. :param value: The value to set. :raise TypeError: When the given :code:`value` is not a :py:class:`bool`. """ if not isinstance(value, bool): raise TypeError(f"<value> should be {bool}, {type(value)} given.") self._follow_nameserver_order = value
[docs] def set_follow_nameserver_order(self, value: bool) -> "DNSQueryTool": """ Updates the :code:`follow_nameserver_order` variable. :param value: The value to set. """ self.follow_nameserver_order = value return self
@property def query_record_type(self) -> int: """ Provides the current state of the :code:`_query_record_type` attribute. """ return self._query_record_type @query_record_type.setter @prepare_query @update_lookup_record def query_record_type(self, value: Union[str, int]) -> None: """ Sets the DNS record type to query. :param value: The value to set. It can be the human version (e.g AAAA) or an integer as registered in the :code:`value2rdata_type` attribute. :raise TypeError: When the given :code:`value` is not a :py:class:`str` nor :py:class:`int`. :raise ValueError: When the given :code:`value` is unknown or unsupported. """ if not isinstance(value, (str, int)): raise TypeError(f"<value> should be {int} or {str}, {type(value)} given.") if value in self.rdata_type2value: self._query_record_type = self.rdata_type2value[value] elif value in self.value2rdata_type: self._query_record_type = value else: raise ValueError(f"<value> ({value!r}) is unknown or unsupported.")
[docs] def set_query_record_type(self, value: Union[str, int]) -> "DNSQueryTool": """ Sets the DNS record type to query. :param value: The value to set. It can be the human version (e.g AAAA) or an integer as registered in the :code:`value2rdata_type` attribute. """ self.query_record_type = value return self
[docs] def get_human_query_record_type(self) -> str: """ Provides the currently set record type. """ return self.value2rdata_type[self.query_record_type]
@property def query_timeout(self) -> float: """ Provides the current state of the :code:`_query_timeout` attribute. """ return self._query_timeout @query_timeout.setter @update_lookup_record def query_timeout(self, value: Union[int, float]) -> None: """ Sets the timeout to apply. :param value: The timeout to apply. :raise TypeError: When the given :code:`value` is not a :py:class:`float` nor :py:class.`int`. """ if not isinstance(value, (float, int)): raise TypeError(f"<value> should be {float} or {int}, {type(value)} given.") self._query_timeout = float(value)
[docs] def set_timeout(self, value: Union[int, float]) -> "DNSQueryTool": """ Sets the timeout to apply. :param value: The timeout to apply. """ self.query_timeout = value return self
[docs] def guess_and_set_timeout(self) -> "DNSQueryTool": """ Try to guess and set the timeout. """ if PyFunceble.facility.ConfigLoader.is_already_loaded(): if self.query_timeout = else: self.query_timeout = self.STD_TIMEOUT else: self.query_timeout = self.STD_TIMEOUT return self
@property def prefered_protocol(self) -> Optional[str]: """ Provides the current state of the :code:`_prefered_protocol` attribute. """ return self._prefered_protocol @prefered_protocol.setter def prefered_protocol(self, value: str) -> None: """ Sets the prefered protocol. :param value: The protocol to use. :raise TypeError: When the given :code:`value` is not a :py:class:`str`. :raise ValueError: Whent the given :code:`value` is unkown or unsupported. """ if not isinstance(value, str): raise TypeError(f"<value> should be {str}, {type(value)} given.") value = value.upper() if value not in self.SUPPORTED_PROTOCOL: raise ValueError( f"<value> {value!r} is unknown or unsupported " f"(supported: {self.SUPPORTED_PROTOCOL!r})." ) self._prefered_protocol = self.nameservers.protocol = value
[docs] def set_prefered_protocol(self, value: str) -> "DNSQueryTool": """ Sets the prefered protocol. :param value: The protocol to use. """ self.prefered_protocol = value return self
[docs] def guess_and_set_prefered_protocol(self) -> "DNSQueryTool": """ Try to guess and set the prefered procol. """ if PyFunceble.facility.ConfigLoader.is_already_loaded(): if self.prefered_protocol = else: self.prefered_protocol = self.STD_PROTOCOL else: self.prefered_protocol = self.STD_PROTOCOL return self
[docs] def guess_all_settings(self) -> "DNSQueryTool": """ Try to guess all settings. """ to_ignore = ["guess_all_settings"] for method in dir(self): if method in to_ignore or not method.startswith("guess_"): continue getattr(self, method)() return self
[docs] def get_prefered_protocol(self) -> Optional[str]: """ Provides the currently set prefered protocol. """ return self.prefered_protocol
[docs] def get_lookup_record( self, ) -> Optional[DNSQueryToolRecord]: # pragma: no cover ## It's just a dataclass """ Provides the current query record. """ return self.lookup_record
[docs] def _get_result_from_response( self, response: dns.message.Message ) -> List[str]: # pragma: no cover ## Let's trust upstream tests. """ Given a response, we return the best possible result. """ result = [] rrset = response.get_rrset( response.answer, self.dns_name, dns.rdataclass.RdataClass.IN, self.query_record_type, ) if rrset: result.extend([x.to_text() for x in rrset]) PyFunceble.facility.Logger.debug("Result from response:\r%r", result) return result
[docs] def _mix_order( self, data: Union[dict, List[str]] ) -> Union[dict, List[str]]: # pragma: no cover ## Just a shuffle :-) """ Given a dataset, we mix its order. """ dataset = copy.deepcopy(data) if not self.follow_nameserver_order: if isinstance(dataset, list): random.shuffle(dataset) return dataset if isinstance(dataset, dict): temp = list(dataset.items()) random.shuffle(temp) return dict(temp) PyFunceble.facility.Logger.debug("Mixed data:\n%r", dataset) return dataset
[docs] @ensure_subject_is_given @ignore_if_query_message_is_missing @update_lookup_record_response def tcp( self, ) -> Optional[List[str]]: # pragma: no cover ## Let's trust upstream tests. """ Request the chosen record through the TCP protocol. """ self.lookup_record.used_protocol = "TCP" result = [] for nameserver, port in self._mix_order( self.nameservers.get_nameserver_ports() ).items(): PyFunceble.facility.Logger.debug( "Started to query information of %r from %r", self.subject, nameserver ) try: response = dns.query.tcp( self.query_message, nameserver, port=port, timeout=self.query_timeout, ) local_result = self._get_result_from_response(response) if local_result: result.extend(local_result) self.lookup_record.nameserver = nameserver self.lookup_record.port = port PyFunceble.facility.Logger.debug( "Successfully queried information of %r from %r.", self.subject, nameserver, ) break except dns.exception.Timeout: pass PyFunceble.facility.Logger.debug( "Unsuccessfully queried information of %r from %r. Sleeping %fs.", self.subject, nameserver, self.BREAKOFF, ) time.sleep(self.BREAKOFF) return ListHelper(result).remove_duplicates().subject
[docs] @ensure_subject_is_given @ignore_if_query_message_is_missing @update_lookup_record_response def udp( self, ) -> Optional[List[str]]: # pragma: no cover ## Let's trust upstream tests. """ Request the chosen record through the UTP protocol. """ self.lookup_record.used_protocol = "UDP" result = [] for nameserver, port in self._mix_order( self.nameservers.get_nameserver_ports() ).items(): PyFunceble.facility.Logger.debug( "Started to query information of %r from %r", self.subject, nameserver ) try: response = dns.query.udp( self.query_message, nameserver, port=port, timeout=self.query_timeout, ) local_result = self._get_result_from_response(response) if local_result: result.extend(local_result) self.lookup_record.nameserver = nameserver self.lookup_record.port = port PyFunceble.facility.Logger.debug( "Successfully queried information of %r from %r.", self.subject, nameserver, ) break except (dns.exception.Timeout, socket.gaierror): pass PyFunceble.facility.Logger.debug( "Unsuccessfully queried information of %r from %r. Sleeping %fs.", self.subject, nameserver, self.BREAKOFF, ) time.sleep(self.BREAKOFF) return ListHelper(result).remove_duplicates().subject
[docs] @ensure_subject_is_given @ignore_if_query_message_is_missing @update_lookup_record_response def https( self, ) -> Optional[List[str]]: # pragma: no cover ## Let's trust upstream tests. """ Request the chosen record through the https protocol. """ self.lookup_record.used_protocol = "HTTPS" result = [] for nameserver in self._mix_order(self.nameservers.get_nameservers()): PyFunceble.facility.Logger.debug( "Started to query information of %r from %r", self.subject, nameserver ) try: response = dns.query.https( self.query_message, nameserver, timeout=self.query_timeout ) local_result = self._get_result_from_response(response) if local_result: result.extend(local_result) self.lookup_record.nameserver = nameserver PyFunceble.facility.Logger.debug( "Successfully queried information of %r from %r.", self.subject, nameserver, ) break except dns.exception.Timeout: pass PyFunceble.facility.Logger.debug( "Unsuccessfully queried information of %r from %r. Sleeping %fs.", self.subject, nameserver, self.BREAKOFF, ) time.sleep(self.BREAKOFF) return ListHelper(result).remove_duplicates().subject
[docs] @ensure_subject_is_given @ignore_if_query_message_is_missing @update_lookup_record_response def tls( self, ) -> Optional[List[str]]: # pragma: no cover ## Let's trust upstream tests. """ Request the chosen record through the TLS protocol. """ self.lookup_record.used_protocol = "TLS" result = [] for nameserver, port in self._mix_order( self.nameservers.get_nameserver_ports() ).items(): PyFunceble.facility.Logger.debug( "Started to query information of %r from %r", self.subject, nameserver ) if port == 53: # Default port for nameserver class is 53. So we ensure we # overwrite with our own default. port = 853 try: response = dns.query.tls( self.query_message, nameserver, port=port, timeout=self.query_timeout, ) local_result = self._get_result_from_response(response) if local_result: result.extend(local_result) self.lookup_record.nameserver = nameserver self.lookup_record.port = port PyFunceble.facility.Logger.debug( "Successfully queried information of %r from %r.", self.subject, nameserver, ) break except dns.exception.Timeout: pass PyFunceble.facility.Logger.debug( "Unsuccessfully queried information of %r from %r. Sleeping %fs.", self.subject, nameserver, self.BREAKOFF, ) time.sleep(self.BREAKOFF) return ListHelper(result).remove_duplicates().subject
[docs] def query( self, ) -> Optional[List[str]]: # pragma: no cover ## Let's trust upstream tests. """ Process the query based on the prefered protocol. """ return getattr(self, self.prefered_protocol.lower())()