Source code for PyFunceble.lookup.dns

Provides the DNS lookup interface.

from socket import IPPROTO_TCP, gaierror, getaddrinfo, gethostbyaddr, herror

import dns.resolver
import dns.reversename
from dns.exception import DNSException

import PyFunceble

[docs]class DNSLookup: # pylint: disable=too-few-public-methods """ DNS lookup interface. :param str subject: The subject we are working with. :param dns_server: The DNS server we are working with. :type dns_server: list|tuple|str :param int lifetime: Set the lifetime of a query. """ def __init__(self, dns_server=None, lifetime=3, tcp=False): self.given_dns_server = dns_server self.resolver = self.__get_resolver(dns_server) self.update_nameserver(dns_server) self.update_lifetime(lifetime) self.tcp = tcp PyFunceble.LOGGER.debug( f"DNS Resolver Nameservers: {self.resolver.nameservers}" ) PyFunceble.LOGGER.debug( f"DNS Resolver (Nameservers) Port: {self.resolver.nameserver_ports}" ) PyFunceble.LOGGER.debug(f"DNS Resolver Port: {self.resolver.port}") PyFunceble.LOGGER.debug(f"DNS Resolver timeout: {self.resolver.timeout}") PyFunceble.LOGGER.debug(f"DNS Resolver lifetime: {self.resolver.lifetime}") PyFunceble.LOGGER.debug(f"DNS Resolver over TCP: {self.tcp}") PyFunceble.INTERN["dns_lookup"] = { "resolver": self.resolver, "given_dns_server": dns_server, } @classmethod def __get_resolver(cls, dns_server): """ Provides the configured dns resolver. """ if dns_server: # A dns server is given. "DNS Server explicitly given, generating a new configurable resolver." ) PyFunceble.LOGGER.debug(f"Given DNS server: {dns_server}") # We initiate and configure the resolver. resolver = dns.resolver.Resolver(configure=False) return resolver # A dns server is not given. "DNS Server not explicitly given, providing the systemwide one." ) # We configure everything with what the OS gives us. return dns.resolver.Resolver()
[docs] def update_lifetime(self, lifetime): """ Updates the lifetime of a query. """ self.resolver.timeout = 2.0 if isinstance(lifetime, (int, float)): if lifetime == self.resolver.timeout: self.resolver.lifetime = float(lifetime) + 2.0 else: self.resolver.lifetime = float(lifetime) else: self.resolver.lifetime = self.resolver.timeout + 2.0
[docs] def update_nameserver(self, nameserver): """ Updates the nameserver to query. """ if nameserver: nameserver = ( [nameserver] if not isinstance(nameserver, (list, tuple)) else nameserver ) self.resolver.nameserver_ports = { z: y for x, y in self.__get_server_and_port_from(nameserver) for z in self.__get_dns_servers_from(x) } self.resolver.nameservers = list(self.resolver.nameserver_ports.keys()) f"Switched Resolver nameserver to: {self.resolver.nameservers}" ) f"Switched Resolver nameserver port to: {self.resolver.nameserver_ports}" )
def __get_server_and_port_from(self, inputed_dns): # pragma: no cover """ Given a list or an input representing dns server, we split the server from the port. :param inputed_dns: The inputed DNS server(s). :type input: str, list, tuple :return: A tuple with: (The DNS, the port) :rtype: tuple """ if isinstance(inputed_dns, (list, tuple)): result = [] for dns_server in inputed_dns: result.append(self.__get_server_and_port_from(dns_server)) return result if ":" in inputed_dns: splited = inputed_dns.split(":") if len(splited) > 1 and splited[-1] and splited[-1].isdigit(): return ":".join(splited[:-1]), int(splited[-1]) if not splited[-1]: return ":".join(splited[:-1]), 53 return inputed_dns, 53 def __get_dns_servers_from(self, inputed_dns): # pragma: no cover """ Given a list or an input representing dns servers, we ensure that we have a list of a string which represent an IP. :param input: The inputed DNS server(s). :type input: str, list, tuple :return: The list of dns server to use. :rtype: list """ if isinstance(inputed_dns, (list, tuple)): result = [] for dns_server in inputed_dns: result.extend(self.__get_dns_servers_from(dns_server)) return result result = [] try: if PyFunceble.Check(inputed_dns).is_domain(): result.extend( [x.address for x in dns.resolver.Resolver().query(inputed_dns)] ) else: result.append(inputed_dns) except DNSException: result.extend(dns.resolver.get_default_resolver().nameservers) return result
[docs] def a_record(self, subject, tcp=None): """ Return the A record of the given subject (if found). :param str subject: The subject we are working with. :param bool tcp: Tell us to use TCP for query. :return: A list of A record(s). :rtype: list, None :raise ValueError: When a non string :code:`subject` is given. """ if not subject or not isinstance(subject, str): raise ValueError(f"<subject> must be of type {str} and not empty.") if tcp is None: tcp = self.tcp PyFunceble.LOGGER.debug(f"Query over TCP: {self.tcp}") try:"Getting A record of {repr(subject)}") # We get the A record of the given subject. result = [str(x) for x in self.resolver.query(subject, "A", tcp=tcp)]"Could get A record of {repr(subject)}: {result}") return result except DNSException: PyFunceble.LOGGER.error(f"Could not get A record of {repr(subject)}") return None
[docs] def aaaa_record(self, subject, tcp=None): """ Return the AAAA record of the given subject (if found). :param str subject: The subject we are working with. :param bool tcp: Tell us to use TCP for query. :return: A list of A record(s). :rtype: list, None :raise ValueError: When a non string :code:`subject` is given. """ if not subject or not isinstance(subject, str): raise ValueError(f"<subject> must be of type {str} and not empty.") if tcp is None: tcp = self.tcp PyFunceble.LOGGER.debug(f"Query over TCP: {self.tcp}") try:"Getting AAAA record of {repr(subject)}") # We get the A record of the given subject. result = [str(x) for x in self.resolver.query(subject, "AAAA", tcp=tcp)] f"Could get AAAA record of {repr(subject)}: {result}" ) return result except DNSException: PyFunceble.LOGGER.error(f"Could not get AAAA record of {repr(subject)}") return None
[docs] def cname_record(self, subject, tcp=None): """ Return the CNAME record of the given subject (if found). :param str subject: The subject we are working with. :param bool tcp: Tell us to use TCP for query. :return: A list of CNAME record(s). :rtype: list, None :raise ValueError: When a non string :code:`subject` is given. """ if not subject or not isinstance(subject, str): raise ValueError(f"<subject> must be of type {str} and not empty.") if tcp is None: tcp = self.tcp PyFunceble.LOGGER.debug(f"Query over TCP: {self.tcp}") try:"Getting CNAME record of {repr(subject)}") # We get the A record of the given subject. result = [str(x) for x in self.resolver.query(subject, "CNAME", tcp=tcp)] f"Could get CNAME record of {repr(subject)}: {result}" ) return result except DNSException: PyFunceble.LOGGER.error(f"Could not get CNAME record of {repr(subject)}") return None
[docs] def dname_record(self, subject, tcp=None): # pragma: no cover """ Return the DNAME record of the given subject (if found). :param str subject: The subject we are working with. :param bool tcp: Tell us to use TCP for query. :return: A list of DNAME record(s). :rtype: list, None :raise ValueError: When a non string :code:`subject` is given. """ if not subject or not isinstance(subject, str): raise ValueError(f"<subject> must be of type {str} and not empty.") if tcp is None: tcp = self.tcp PyFunceble.LOGGER.debug(f"Query over TCP: {self.tcp}") try:"Getting DNAME record of {repr(subject)}") # We get the A record of the given subject. result = [str(x) for x in self.resolver.query(subject, "DNAME", tcp=tcp)] f"Could get DNAME record of {repr(subject)}: {result}" ) return result except DNSException: PyFunceble.LOGGER.error(f"Could not get DNAME record of {repr(subject)}") return None
[docs] def mx_record(self, subject, tcp=None): """ Return the MX record of the given subject (if found). :param str subject: The subject we are working with. :return: A list of MX record(s). :rtype: list, None :raise ValueError: When a non string :code:`subject` is given. """ if not subject or not isinstance(subject, str): raise ValueError(f"<subject> must be of type {str} and not empty.") if tcp is None: tcp = self.tcp PyFunceble.LOGGER.debug(f"Query over TCP: {self.tcp}") try:"Getting MX record of {repr(subject)}") # We get the MX record of the given subject. result = [str(x) for x in self.resolver.query(subject, "MX", tcp=tcp)]"Could get MX record of {repr(subject)}: {result}") return result except DNSException: PyFunceble.LOGGER.error(f"Could not get MX record of {repr(subject)}") return None
[docs] def ns_record(self, subject, tcp=None): """ Return the NS record of the given subject (if found). :param str subject: The subject we are working with. :param bool tcp: Tell us to use TCP for query. :return: A list of NS record(s). :rtype: list, None :raise ValueError: When a non string :code:`subject` is given. """ if not subject or not isinstance(subject, str): raise ValueError(f"<subject> must be of type {str} and not empty.") if tcp is None: tcp = self.tcp PyFunceble.LOGGER.debug(f"Query over TCP: {self.tcp}") try:"Getting NS record of {repr(subject)}") # We get the NS record of the given subject. result = [str(x) for x in self.resolver.query(subject, "NS", tcp=tcp)]"Could get NS record of {repr(subject)}: {result}") return result except DNSException: PyFunceble.LOGGER.error(f"Could not get NS record of {repr(subject)}") return None
[docs] def txt_record(self, subject, tcp=None): """ Return the TXT record of the given subject (if found). :param str subject: The subject we are working with. :param bool tcp: Tell us to use TCP for query. :return: A list of TXT record(s). :rtype: list, None :raise ValueError: When a non string :code:`subject` is given. """ if not subject or not isinstance(subject, str): raise ValueError(f"<subject> must be of type {str} and not empty.") if tcp is None: tcp = self.tcp PyFunceble.LOGGER.debug(f"Query over TCP: {self.tcp}") try:"Getting TXT record of {repr(subject)}") # We get the TXT record of the given subject. result = [str(x) for x in self.resolver.query(subject, "TXT", tcp=tcp)]"Could get TXT record of {repr(subject)}: {result}") return result except DNSException: PyFunceble.LOGGER.error(f"Could not get TXT record of {repr(subject)}") return None
[docs] def ptr_record(self, subject, reverse_name=True, tcp=None): """ Return the PTR record of the given subject (if found). :param str subject: The subject we are working with. :param bool tcp: Tell us to use TCP for query. :return: A list of PTR record(s). :rtype: list, None :raise ValueError: When a non string :code:`subject` is given. """ if not subject or not isinstance(subject, str): raise ValueError(f"<subject> must be of type {str} and not empty.") if tcp is None: tcp = self.tcp PyFunceble.LOGGER.debug(f"Query over TCP: {self.tcp}") try:"Getting TXT record of {repr(subject)}") if reverse_name: # We get the reverse name we are going to request. to_request = dns.reversename.from_address(subject) else: to_request = subject # We get the PTR record of the currently read A record. result = [str(x) for x in self.resolver.query(to_request, "PTR", tcp=tcp)]"Could get PTR record of {repr(subject)}: {result}") return result except DNSException: PyFunceble.LOGGER.error(f"Could not get PTR record of {repr(subject)}") return None
[docs] @classmethod def get_addr_info(cls, subject): # pragma: no cover """ Get and return the information of the given subject (address). :param str subject: The subject we are working with. :return: A list of address. :rtype: list, None :raise ValueError: When a non string :code:`subject` is given. """ if not subject or not isinstance(subject, str): raise ValueError(f"<subject> must be of type {str} and not empty.") try: # pragma: no cover # We request the address information. req = getaddrinfo(subject, 80, proto=IPPROTO_TCP) # We format the addr infos. return [x[-1][0] for x in req] except (gaierror, OSError, herror, UnicodeError): pass return None
[docs] @classmethod def get_host_by_addr(cls, subject): # pragma: no cover """ Get and return the host of the given subject (address). :param str subject: The subject we are working with. :return: A dict in the following format or :code:`None`. :: { "hostname": "", "aliases": [], "ips": [] } :rtype: dict, None :raise ValueError: When a non string :code:`subject` is given. """ if not subject or not isinstance(subject, str): raise ValueError(f"<subject> must be of type {str} and not empty.") try: # We get the host by addr. req = gethostbyaddr(subject) # And we format the result. return {"hostname": req[0], "aliases": req[1], "ips": req[2]} except (gaierror, OSError, herror): pass return None
[docs] @classmethod def is_record_present_in_result(cls, to_check, result, allow_empty=False): """ Checks if the given record type is in the result. :param to_check: The record to check the presence. :type to_check: list, str, tuple :param dict result: The result to work with. :param bool allow_empty: Tell to if we allow and empty result as present. :rtype: bool """ if isinstance(to_check, (list, tuple)): return any( [ cls.is_record_present_in_result(x, result, allow_empty=allow_empty) for x in to_check ] ) if to_check in result: if not allow_empty and not result[to_check]: PyFunceble.LOGGER.debug( f"{to_check} record is not in result:\n{result}" ) return False PyFunceble.LOGGER.debug(f"{to_check} record is in result:\n{result}") return True PyFunceble.LOGGER.debug(f"{to_check} record is not in result:\n{result}") return False
def __request_complete_not_ip(self, subject, tcp=None): # pragma: no cover """ Requests and provides the complete DNS spectrum. :rtype: dict """ result = {} # We get the A record of the given subject. result["A"] = self.a_record(subject, tcp=tcp) # We get the AAAA record of the given subject. result["AAAA"] = self.aaaa_record(subject, tcp=tcp) # We get the CNAME record of the given subject. result["CNAME"] = self.cname_record(subject, tcp=tcp) # We get the DNAME record of the given subject. result["DNAME"] = self.dname_record(subject, tcp=tcp) # We get the MX record of the given subject. result["MX"] = self.mx_record(subject, tcp=tcp) # We get the TXT record of the given subject. result["TXT"] = self.txt_record(subject, tcp=tcp) if self.is_record_present_in_result("A", result): # We could get some A record(s). # We initiate the PTR. result["PTR"] = [] for a_result in result["A"]: # We loop through the list of A records. if "." not in a_result: # pragma: no cover # There is no "." in the currently # read A record. # We continue the loop. continue try: # We get the PTR record of the currently read A record. result["PTR"].extend(self.ptr_record(a_result, tcp=tcp)) except TypeError: # pragma: no cover pass if not all(result["PTR"]): # pragma: no cover # No PTR record was found. # We delete the PTR entry. del result["PTR"] return result def __request_not_ip(self, subject, complete=False, tcp=None): # pragma: no cover """ Handle the request for a subject which is not an IP. :rtype: dict :raise ValueError: When a non string :code:`subject` is given. """ if not subject or not isinstance(subject, str): raise ValueError(f"<subject> must be of type {str} and not empty.") if tcp is None: tcp = self.tcp PyFunceble.LOGGER.debug(f"{repr(subject)} is not IP. Requesting record.") result = {} # We get the NS record of the given subject. result["NS"] = self.ns_record(subject, tcp=tcp) if complete: # pragma: no cover result = PyFunceble.helpers.Merge( self.__request_complete_not_ip(subject, tcp=tcp) ).into(result) if not self.is_record_present_in_result(["CNAME", "NS"], result): # As sometime we may not have a NS but a CNAME, we handle that case. result["CNAME"] = self.cname_record(subject, tcp=tcp) if not self.is_record_present_in_result(["CNAME", "DNAME", "NS"], result): # Same but with DNAME. result["DNAME"] = self.dname_record(subject, tcp=tcp) if not self.is_record_present_in_result(["A", "CNAME", "DNAME", "NS"], result): # Same but with A. result["A"] = self.a_record(subject, tcp=tcp) if not self.is_record_present_in_result( ["A", "AAAA", "CNAME", "DNAME", "NS"], result ): # Same but with A. result["AAAA"] = self.aaaa_record(subject, tcp=tcp) # We get the list of index to delete. to_delete = [x for x in result if not result[x]] for index in to_delete: # We loop through the list of index to delete. # And we delete them. del result[index] if not result and complete: # pragma: no cover # We could not get DNS records about the given subject. # We get the addr infos. result["addr_info"] = self.get_addr_info(subject) if not result["addr_info"]: # The addr_info index is empty. # We delete it. del result["addr_info"] elif result: result["nameservers"] = self.resolver.nameservers PyFunceble.LOGGER.debug( f"Records for {repr(subject)} (with {self.resolver.nameservers}):\n{result}" ) return result def __request_ip(self, subject, tcp=None): # pragma: no cover """ Handle the request for a subject which is an IP. :param str subject: The subject we are working with. :param bool tcp: Tell us to use TCP for query. :rtype: dict :raise ValueError: When a non string :code:`subject` is given. """ if not subject or not isinstance(subject, str): raise ValueError(f"<subject> must be of type {str} and not empty.") if tcp is None: tcp = self.tcp"{repr(subject)} is an IP. Requesting PTR record.") result = {} # We get the PTR record of the given subject. result["PTR"] = self.ptr_record(subject, tcp=tcp) if "PTR" not in result or not result["PTR"]: del result["PTR"] PyFunceble.LOGGER.error(f"PTR record for {repr(subject)} not found.") elif result["PTR"]: result["nameservers"] = self.resolver.nameservers if not result: # We could not get DNS records about the given subject."Getting hosts by addr for {repr(subject)}") result = self.get_host_by_addr(subject) PyFunceble.LOGGER.debug(f"Request result: \n{result}") return result
[docs] def request(self, subject, complete=False, tcp=None): # pragma: no cover """ Perform a DNS lookup/requests. :param str subject: The subject we are working with. :param bool complete: Tell us to return as many result as possible. :param bool tcp: Tell us to use TCP for query. :return: A dict with following index if the given subject is not registered into the given DNS server. (More likely local subjects). :: { "hostname": "", "aliases": [], "ips": [] } A dict with following index for everything else (and if found). :: { "nameservers: [], "A": [], "AAAA": [], "CNAME": [], "DNAME": [], "MX": [], "NS": [], "TXT": [], "PTR": [] } :rtype: dict :raise ValueError: When a non string :code:`subject` is given. """ if not subject or not isinstance(subject, str): raise ValueError(f"<subject> must be of type {str} and not empty.") if tcp is None: tcp = self.tcp result = {} if not PyFunceble.Check(subject).is_ip(): # We are looking for something which is not an IPv4 or IPv6. temp_result = self.__request_not_ip(subject, complete=complete, tcp=tcp) if isinstance(temp_result, dict): result.update(temp_result) else: # We are working with something which is an IPv4 or IPv6. temp_result = self.__request_ip(subject, tcp=tcp) if isinstance(temp_result, dict): result.update(temp_result) return result