Source code for PyFunceble.dns_lookup

# pylint:disable=line-too-long
"""
The tool to check the availability or syntax of domains, IPv4 or URL.

::


    ██████╗ ██╗   ██╗███████╗██╗   ██╗███╗   ██╗ ██████╗███████╗██████╗ ██╗     ███████╗
    ██╔══██╗╚██╗ ██╔╝██╔════╝██║   ██║████╗  ██║██╔════╝██╔════╝██╔══██╗██║     ██╔════╝
    ██████╔╝ ╚████╔╝ █████╗  ██║   ██║██╔██╗ ██║██║     █████╗  ██████╔╝██║     █████╗
    ██╔═══╝   ╚██╔╝  ██╔══╝  ██║   ██║██║╚██╗██║██║     ██╔══╝  ██╔══██╗██║     ██╔══╝
    ██║        ██║   ██║     ╚██████╔╝██║ ╚████║╚██████╗███████╗██████╔╝███████╗███████╗
    ╚═╝        ╚═╝   ╚═╝      ╚═════╝ ╚═╝  ╚═══╝ ╚═════╝╚══════╝╚═════╝ ╚══════╝╚══════╝

This submodule will provide the DNS lookup interface.

Author:
    Nissar Chababy, @funilrys, contactTATAfunilrysTODTODcom

Special thanks:
    https://pyfunceble.github.io/special-thanks.html

Contributors:
    https://pyfunceble.github.io/contributors.html

Project link:
    https://github.com/funilrys/PyFunceble

Project documentation:
    https://pyfunceble.readthedocs.io/en/master/

Project homepage:
    https://pyfunceble.github.io/

License:
::


    MIT License

    Copyright (c) 2017, 2018, 2019 Nissar Chababy

    Permission is hereby granted, free of charge, to any person obtaining a copy
    of this software and associated documentation files (the "Software"), to deal
    in the Software without restriction, including without limitation the rights
    to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
    copies of the Software, and to permit persons to whom the Software is
    furnished to do so, subject to the following conditions:

    The above copyright notice and this permission notice shall be included in all
    copies or substantial portions of the Software.

    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
    AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
    OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
    SOFTWARE.
"""
# pylint: enable=line-too-long

from socket import IPPROTO_TCP, gaierror, getaddrinfo, gethostbyaddr, herror

import dns.resolver
import dns.reversename
from dns.exception import DNSException

from PyFunceble.check import Check


[docs]class DNSLookup: # pylint: disable=too-few-public-methods """ DNS lookup interface. :param str subject: The subject we are working with. :param dns_server: The DNS server we are working with. :type dns_server: list|tuple|str """ def __init__(self, subject, dns_server=None, complete=False): if subject: if isinstance(subject, str): self.subject = subject else: raise ValueError("{0} expected".format(type(subject))) if dns_server: # A dns server is given. # We initiate the default resolver. dns.resolver.default_resolver = dns.resolver.Resolver(configure=False) if isinstance(dns_server, (list, tuple)): # We got a list of dns server. # We parse them. dns.resolver.default_resolver.nameservers = dns_server else: # We got a dns server. # We parse it. dns.resolver.default_resolver.nameservers = [dns_server] else: # A dns server is not given. # We configure everything with what the OS gives us. dns.resolver.default_resolver = dns.resolver.Resolver() self.dns_resolver = dns.resolver self.complete = complete
[docs] def a_record(self, subject=None, lifetime=3.0): # pragma: no cover """ Return the A record of the given subject (if found). :param str subject: The subject we are working with. :param float lifetime: The number of second before timeout. :return: A list of A record(s). :rtype: list """ if not subject: subject = self.subject try: # We get the A record of the given subject. return [ str(x) for x in self.dns_resolver.query(subject, "A", lifetime=lifetime) ] except DNSException: pass return None
[docs] def aaaa_record(self, subject=None, lifetime=3.0): # pragma: no cover """ Return the AAAA record of the given subject (if found). :param str subject: The subject we are working with. :param float lifetime: The number of second before timeout. :return: A list of A record(s). :rtype: list """ if not subject: subject = self.subject try: # We get the A record of the given subject. return [ str(x) for x in self.dns_resolver.query(subject, "AAAA", lifetime=lifetime) ] except DNSException: pass return None
[docs] def cname_record(self, subject=None, lifetime=3.0): # pragma: no cover """ Return the CNAME record of the given subject (if found). :param str subject: The subject we are working with. :param float lifetime: The number of second before timeout. :return: A list of CNAME record(s). :rtype: list """ if not subject: subject = self.subject try: # We get the A record of the given subject. return [ str(x) for x in self.dns_resolver.query(subject, "CNAME", lifetime=lifetime) ] except DNSException: pass return None
[docs] def mx_record(self, subject=None, lifetime=3.0): # pragma: no cover """ Return the MX record of the given subject (if found). :param str subject: The subject we are working with. :param float lifetime: The number of second before timeout. :return: A list of MX record(s). :rtype: list """ if not subject: subject = self.subject try: # We get the MX record of the given subject. return [ str(x) for x in self.dns_resolver.query(subject, "MX", lifetime=lifetime) ] except DNSException: pass return None
[docs] def ns_record(self, subject=None, lifetime=3.0): """ Return the NS record of the given subject (if found). :param str subject: The subject we are working with. :param float lifetime: The number of second before timeout. :return: A list of NS record(s). :rtype: list """ if not subject: subject = self.subject try: # We get the NS record of the given subject. return [ str(x) for x in self.dns_resolver.query(subject, "NS", lifetime=lifetime) ] except DNSException: pass return None
[docs] def txt_record(self, subject=None, lifetime=3.0): # pragma: no cover """ Return the TXT record of the given subject (if found). :param str subject: The subject we are working with. :param float lifetime: The number of second before timeout. :return: A list of TXT record(s). :rtype: list """ if not subject: subject = self.subject try: # We get the TXT record of the given subject. return [ str(x) for x in self.dns_resolver.query(subject, "TXT", lifetime=lifetime) ] except DNSException: pass return None
[docs] def ptr_record(self, subject=None, reverse_name=True, lifetime=3.0): """ Return the PTR record of the given subject (if found). :param str subject: The subject we are working with. :param float lifetime: The number of second before timeout. :return: A list of PTR record(s). :rtype: list """ if not subject: subject = self.subject try: if reverse_name: # We get the reverse name we are going to request. to_request = dns.reversename.from_address(subject) else: # pragma: no cover to_request = subject # We get the PTR record of the currently read A record. return [ str(x) for x in dns.resolver.query(to_request, "PTR", lifetime=lifetime) ] except DNSException: # pragma: no cover pass return None # pragma: no cover
[docs] def get_addr_info(self, subject=None): """ Get and return the information of the given subject (address). :param str subject: The subject we are working with. :return: A list of address. :rtype: list """ if not subject: subject = self.subject try: # pragma: no cover # We request the address information. req = getaddrinfo(subject, 80, proto=IPPROTO_TCP) # We format the addr infos. return [x[-1][0] for x in req] except (gaierror, OSError, herror, UnicodeError): pass return None
[docs] def get_host_by_addr(self, subject=None): # pragma: no cover """ Get and return the host of the given subject (address). :param str subject: The subject we are working with. :return: A dict in the following format or :code:`None`. :: { "hostname": "", "aliases": [], "ips": [] } :rtype: list """ if not subject: subject = self.subject try: # We get the host by addr. req = gethostbyaddr(subject) # And we format the result. return {"hostname": req[0], "aliases": req[1], "ips": req[2]} except (gaierror, OSError, herror): pass return None
def __request_not_ipv4(self): """ Handle the request for a subject which is not an IPv4. """ result = {} # We get the NS record of the given subject. result["NS"] = self.ns_record() if self.complete: # pragma: no cover # We get the A record of the given subject. result["A"] = self.a_record() # We get the AAAA record of the given subject. result["AAAA"] = self.aaaa_record() # We get the CNAME record of the given subject. result["CNAME"] = self.cname_record() # We get the MX record of the given subject. result["MX"] = self.mx_record() # We get the TXT record of the given subject. result["TXT"] = self.txt_record() if "A" in result and result["A"]: # We could get some A record(s). # We initiate the PTR. result["PTR"] = [] for a_result in result["A"]: # We loop through the list of A records. if "." not in a_result: # pragma: no cover # There is no "." in the currently # read A record. # We continue the loop. continue try: # We get the PTR record of the currently read A record. result["PTR"].extend(self.ptr_record(a_result)) except TypeError: # pragma: no cover pass if not all(result["PTR"]): # pragma: no cover # No PTR record was found. # We delete the PTR entry. del result["PTR"] # We get the list of index to delete. to_delete = [x for x in result if not result[x]] for index in to_delete: # We loop through the list of index to delete. # And we delete them. del result[index] if not result: # We could not get DNS records about the given subject. # We get the addr infos. result["addr_info"] = self.get_addr_info() if not result["addr_info"]: # The addr_info index is empty. # We delete it. del result["addr_info"] return result def __request_ipv4(self): """ Handle the request for a subject which is IPv4. """ result = {} # We get the PTR record of the given subject. result["PTR"] = self.ptr_record() if "PTR" not in result or not result["PTR"]: # pragma: no cover del result["PTR"] if not result: # pragma: no cover # We could not get DNS records about the given subject. result = self.get_host_by_addr() return result
[docs] def request(self): """ Perform the NS request. :return: A dict with following index if the given subject is not registered into the given DNS server. (More likely local subjects). :: { "hostname": "", "aliases": [], "ips": [] } A dict with following index for everything else (and if found). :: { "A": [], "AAAA": [], "CNAME": [], "MX": [], "NS": [], "TXT": [], "PTR": [] } :rtype: dict """ result = {} if not Check(self.subject).is_ipv4(): # We are looking for something which is not an IPv4. temp_result = self.__request_not_ipv4() if isinstance(temp_result, dict): result.update(temp_result) else: # We are working with something which is an IPv4. temp_result = self.__request_ipv4() if isinstance(temp_result, dict): result.update(temp_result) return result