Source code for PyFunceble.cli.processes.workers.file_sorter_base

"""
The tool to check the availability or syntax of domain, IP or URL.

::


    ██████╗ ██╗   ██╗███████╗██╗   ██╗███╗   ██╗ ██████╗███████╗██████╗ ██╗     ███████╗
    ██╔══██╗╚██╗ ██╔╝██╔════╝██║   ██║████╗  ██║██╔════╝██╔════╝██╔══██╗██║     ██╔════╝
    ██████╔╝ ╚████╔╝ █████╗  ██║   ██║██╔██╗ ██║██║     █████╗  ██████╔╝██║     █████╗
    ██╔═══╝   ╚██╔╝  ██╔══╝  ██║   ██║██║╚██╗██║██║     ██╔══╝  ██╔══██╗██║     ██╔══╝
    ██║        ██║   ██║     ╚██████╔╝██║ ╚████║╚██████╗███████╗██████╔╝███████╗███████╗
    ╚═╝        ╚═╝   ╚═╝      ╚═════╝ ╚═╝  ╚═══╝ ╚═════╝╚══════╝╚═════╝ ╚══════╝╚══════╝

Provides our file sorter worker base. This is the base of all our file sorter.

Author:
    Nissar Chababy, @funilrys, contactTATAfunilrysTODTODcom

Special thanks:
    https://pyfunceble.github.io/#/special-thanks

Contributors:
    https://pyfunceble.github.io/#/contributors

Project link:
    https://github.com/funilrys/PyFunceble

Project documentation:
    https://pyfunceble.readthedocs.io/en/latest/

Project homepage:
    https://pyfunceble.github.io/

License:
::


    Copyright 2017, 2018, 2019, 2020, 2021 Nissar Chababy

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
"""

import heapq
import os
import secrets
import tempfile
from io import TextIOWrapper
from itertools import islice
from typing import Any, Generator, List, Tuple

import PyFunceble.cli.storage
import PyFunceble.facility
import PyFunceble.factory
import PyFunceble.storage
from PyFunceble.cli.filesystem.printer.file import FilePrinter
from PyFunceble.cli.processes.workers.base import WorkerBase
from PyFunceble.cli.utils.sort import get_best_sorting_key
from PyFunceble.helpers.file import FileHelper
from PyFunceble.helpers.list import ListHelper


[docs]class FileSorterWorkerBase(WorkerBase): """ Provides our the base of all our file sorters. """ MAX_LINES: int = 32_000 FILE_BUFFER_SIZE: int = 64 * 1024 def __post_init__(self) -> None: # We don't need to wait for anything here :-) self.accept_waiting_delay = False return super().__post_init__()
[docs] @classmethod def process_file_sorting( cls, file: str, remove_duplicates: bool = True, write_header: bool = True, sorting_key: Any = None, ) -> None: """ Process the sorting of the given file. The idea is to split the file piece by piece and at the end join all sorted files. For that job, we create a temporary directory which will store the temporary files. :param file: The file to sort. :param remove_duplicates: Activates the deletion of duplicates. :param write_header: Activates the writing of the PyFunceble related header. .. warning:: When this is set to :py:class:`True`, we assume that the header itself was already given. Meaning that the first 2 commented lines will be excluded from the sorting and regenerated. :param sorting_key: The sorting key to apply while sorting. This is the lambda/function that goes into the :code:`key` argument of the :py:class:`sorted` function. """ # pylint: disable=too-many-locals,too-many-statements def merge_files( files: List[TextIOWrapper], ) -> Generator[Tuple[List[TextIOWrapper]], str, None]: """ Merges the given files and yield each "lines" of the merged file. :param files: The files to merge. """ result = [] for index, file in enumerate(files): try: iterator = iter(file) value = next(iterator) heapq.heappush( result, ((sorting_key(value), index, value, iterator, file)) ) except StopIteration: file.close() previous = None comment_count = 0 max_comment_count = 2 while result: ignore = False _, index, value, iterator, file = heapq.heappop(result) if remove_duplicates and value == previous: ignore = True if ( write_header and comment_count < max_comment_count and value[0] == "#" ): ignore = True max_comment_count += 1 if not ignore: yield value previous = value try: value = next(iterator) heapq.heappush( result, ((sorting_key(value), index, value, iterator, file)) ) except StopIteration: file.close() temp_directory = tempfile.TemporaryDirectory() temporary_output_file = os.path.join(temp_directory.name, secrets.token_hex(6)) if not sorting_key: sorting_key = get_best_sorting_key() file_helper = FileHelper(file) sorted_files = [] PyFunceble.facility.Logger.info("Started sort of %r.", file) with file_helper.open( "r", encoding="utf-8", buffering=cls.FILE_BUFFER_SIZE ) as file_stream: while True: to_sort = list(islice(file_stream, cls.MAX_LINES)) if not to_sort: break new_file = open( os.path.join(temp_directory.name, secrets.token_hex(6)), "w+", encoding="utf-8", buffering=cls.FILE_BUFFER_SIZE, ) new_file.writelines( ListHelper(to_sort) .remove_duplicates() .custom_sort(key_method=sorting_key) .subject ) new_file.flush() new_file.seek(0) sorted_files.append(new_file) with open( temporary_output_file, "w", cls.FILE_BUFFER_SIZE, encoding="utf-8" ) as file_stream: if write_header: file_stream.write(FilePrinter.STD_FILE_GENERATION) file_stream.write(FilePrinter.get_generation_date_line()) file_stream.write("\n\n") file_stream.writelines(merge_files(sorted_files)) FileHelper(temporary_output_file).move(file) PyFunceble.facility.Logger.info("Finished sort of %r.", file) temp_directory.cleanup()