# Source code for smoothcrawler_cluster.model

"""*Model: data classes or enum objects for package*

In a crawler cluster (a decentralized or distributed system), instances need to exchange many meta-data objects with
each other so that every instance knows the state of the entire system: what is happening and what it should do to
handle it. Here are the data classes and enum objects for recording, serializing, deserializing, etc. this
information.
"""

from abc import ABCMeta, abstractmethod
from datetime import datetime
from typing import List, Union

from .metadata import (
    GroupState,
    Heartbeat,
    NodeState,
    ResultDetail,
    RunningContent,
    RunningResult,
    Task,
)
from .metadata_enum import CrawlerRole, HeartState, TaskState


class _BaseDataObjectUtils(metaclass=ABCMeta):
    """*Common interface of the meta-data object helper classes*

    Meta-data objects can be produced by several kinds of operation: *empty*, *initial* and *update*. Each helper class
    is named after the operation it performs, and every helper must offer one static function per meta-data object
    type: *GroupState*, *NodeState*, *Task* and *Heartbeat*.
    """

    @staticmethod
    @abstractmethod
    def group_state(*args, **kwargs) -> GroupState:
        """Produce a **GroupState** meta-data object according to the helper's operation.

        Args:
            *args (tuple): Positional parameters passed through as a tuple.
            **kwargs (dict): Keyword parameters passed through as a dict.

        Returns:
            GroupState: The resulting **GroupState** meta-data object.

        """

    @staticmethod
    @abstractmethod
    def node_state(*args, **kwargs) -> NodeState:
        """Produce a **NodeState** meta-data object according to the helper's operation.

        Args:
            *args (tuple): Positional parameters passed through as a tuple.
            **kwargs (dict): Keyword parameters passed through as a dict.

        Returns:
            NodeState: The resulting **NodeState** meta-data object.

        """

    @staticmethod
    @abstractmethod
    def task(*args, **kwargs) -> Task:
        """Produce a **Task** meta-data object according to the helper's operation.

        Args:
            *args (tuple): Positional parameters passed through as a tuple.
            **kwargs (dict): Keyword parameters passed through as a dict.

        Returns:
            Task: The resulting **Task** meta-data object.

        """

    @staticmethod
    @abstractmethod
    def heartbeat(*args, **kwargs) -> Heartbeat:
        """Produce a **Heartbeat** meta-data object according to the helper's operation.

        Args:
            *args (tuple): Positional parameters passed through as a tuple.
            **kwargs (dict): Keyword parameters passed through as a dict.

        Returns:
            Heartbeat: The resulting **Heartbeat** meta-data object.

        """


class Empty(_BaseDataObjectUtils):
    """*Empty meta-data objects*

    Build meta-data objects that hold only blank placeholder values (zero counters, empty containers and the
    default enum states).
    """

    @staticmethod
    def group_state() -> GroupState:
        """Build a blank **GroupState** meta-data object.

        All counters are ``0``, all crawler-name lists are new empty lists and the standby ID is ``"0"``.

        Returns:
            GroupState: A blank **GroupState** meta-data object.

        """
        state = GroupState()
        for counter_name in ("total_crawler", "total_runner", "total_backup"):
            setattr(state, counter_name, 0)
        for list_name in ("current_crawler", "current_runner", "current_backup"):
            setattr(state, list_name, [])  # a new list on every iteration, never shared
        state.standby_id = "0"
        for list_name in ("fail_crawler", "fail_runner", "fail_backup"):
            setattr(state, list_name, [])
        return state

    @staticmethod
    def node_state() -> NodeState:
        """Build a blank **NodeState** meta-data object.

        The group name is an empty string and the role is ``CrawlerRole.INITIAL``.

        Returns:
            NodeState: A blank **NodeState** meta-data object.

        """
        state = NodeState()
        state.group, state.role = "", CrawlerRole.INITIAL
        return state

    @staticmethod
    def task() -> Task:
        """Build a blank **Task** meta-data object.

        No content, cookie or authorization; the in-progress ID is ``"-1"``, the result counters are zero and the
        running status is ``TaskState.NOTHING``.

        Returns:
            Task: A blank **Task** meta-data object.

        """
        blank_task = Task()
        blank_task.running_content = []
        blank_task.cookie, blank_task.authorization = {}, {}
        blank_task.in_progressing_id = "-1"
        blank_task.running_result = RunningResult(success_count=0, fail_count=0)
        blank_task.running_status = TaskState.NOTHING
        blank_task.result_detail = []
        return blank_task

    @staticmethod
    def heartbeat() -> Heartbeat:
        """Build a blank **Heartbeat** meta-data object.

        The heart rhythm time is stamped with the current local time, formatted with ``"%Y-%m-%d %H:%M:%S"``.

        Returns:
            Heartbeat: A blank **Heartbeat** meta-data object.

        """
        beat = Heartbeat()
        beat.time_format = "%Y-%m-%d %H:%M:%S"
        # Format with the value read back from the object, exactly as stored.
        beat.heart_rhythm_time = datetime.now().strftime(beat.time_format)
        beat.update_time, beat.update_timeout = "2s", "4s"
        beat.heart_rhythm_timeout = "3"
        beat.healthy_state = HeartState.NEWBORN
        beat.task_state = TaskState.NOTHING
        return beat
class Initial(_BaseDataObjectUtils):
    """*Initialize a meta-data object with values*

    Build a meta-data object pre-filled with the given option values; options that are left out fall back to
    default values.

    .. note::

        List/dict options default to ``None`` and are replaced with a brand-new empty container on every call, so
        objects returned by different calls never share a mutable default.
    """

    @staticmethod
    def group_state(
        crawler_name: str,
        total_crawler: int,
        total_runner: int,
        total_backup: int,
        standby_id: str = "0",
        current_crawler: List[str] = None,
        current_runner: List[str] = None,
        current_backup: List[str] = None,
        fail_crawler: List[str] = None,
        fail_runner: List[str] = None,
        fail_backup: List[str] = None,
    ) -> GroupState:
        """Initialize a meta-data object **GroupState** with values.

        The result's ``current_crawler`` contains no duplicates and always includes *crawler_name*. Every list
        option is copied, so the returned object never aliases a list owned by the caller.

        Args:
            crawler_name (str): Crawler instance's name.
            total_crawler (int): Total amount of crawlers of every role.
            total_runner (int): Total amount of crawlers which are *Runner*.
            total_backup (int): Total amount of crawlers which are *Backup_Runner*.
            standby_id (str): The standby ID. It should be the index of the crawler name.
            current_crawler (list of str): Names of all current crawler instances of every role.
            current_runner (list of str): Names of the current crawler instances which are *Runner*.
            current_backup (list of str): Names of the current crawler instances which are *Backup_Runner*.
            fail_crawler (list of str): Names of the crawler instances which are in dead state.
            fail_runner (list of str): Names of the crawler instances which are *Dead_Runner*.
            fail_backup (list of str): Names of the crawler instances which are *Dead_Backup_Runner*.

        Returns:
            GroupState: A **GroupState** meta-data object with value(s).

        """
        group_state = GroupState()
        group_state.total_crawler = total_crawler
        group_state.total_runner = total_runner
        group_state.total_backup = total_backup
        # De-duplicate while keeping first-seen order (a ``set`` would make the order hash-dependent), then make
        # sure the current crawler itself is registered.
        crawlers = list(dict.fromkeys(current_crawler or []))
        if crawler_name not in crawlers:
            crawlers.append(crawler_name)
        group_state.current_crawler = crawlers
        # Copy each list so later in-place changes on the state cannot leak into the caller's list (or vice versa).
        group_state.current_runner = list(current_runner or [])
        group_state.current_backup = list(current_backup or [])
        group_state.standby_id = standby_id
        group_state.fail_crawler = list(fail_crawler or [])
        group_state.fail_runner = list(fail_runner or [])
        group_state.fail_backup = list(fail_backup or [])
        return group_state

    @staticmethod
    def node_state(group: str = None, role: CrawlerRole = None) -> NodeState:
        """Initialize a meta-data object **NodeState** with values.

        Args:
            group (str): The name of the group which the current crawler instance belongs to. It is only set when
                truthy.
            role (CrawlerRole): The role of the current crawler instance. Defaults to ``CrawlerRole.INITIAL``.

        Returns:
            NodeState: A **NodeState** meta-data object with value(s).

        """
        node_state = NodeState()
        if group:
            node_state.group = group
        if not role:
            role = CrawlerRole.INITIAL
        node_state.role = role
        return node_state

    @staticmethod
    def task(
        running_content: List[Union[dict, RunningContent]] = None,
        cookie: dict = None,
        authorization: dict = None,
        in_progressing_id: str = "-1",
        running_result: Union[dict, RunningResult] = None,
        running_state: TaskState = None,
        result_detail: List[Union[dict, ResultDetail]] = None,
    ) -> Task:
        """Initialize a meta-data object **Task** with values.

        Args:
            running_content (List[Union[dict, RunningContent]]): The details of the task content. Defaults to a
                new empty list.
            cookie (dict): Cookie. Defaults to a new empty dict.
            authorization (dict): Authorization settings of the HTTP request. Defaults to a new empty dict.
            in_progressing_id (str): The ID of the task which is in processing state.
            running_result (Union[dict, RunningResult]): The running result statistics: amount of successfully and
                unsuccessfully done tasks. A falsy value is replaced by zero counters.
            running_state (TaskState): The status of the task running. A falsy value is replaced by
                ``TaskState.NOTHING``.
            result_detail (List[Union[dict, ResultDetail]]): The details of the running result. Defaults to a new
                empty list.

        Returns:
            Task: A **Task** meta-data object with value(s).

        """
        task = Task()
        # Fresh containers per call: a literal ``[]``/``{}`` default would be one object shared by every Task.
        task.running_content = [] if running_content is None else running_content
        task.cookie = {} if cookie is None else cookie
        task.authorization = {} if authorization is None else authorization
        task.in_progressing_id = in_progressing_id
        if not running_result:
            running_result = RunningResult(success_count=0, fail_count=0)
        task.running_result = running_result
        if not running_state:
            running_state = TaskState.NOTHING
        task.running_status = running_state
        task.result_detail = [] if result_detail is None else result_detail
        return task

    @staticmethod
    def heartbeat(
        time_format: str = None,
        update_time: str = None,
        update_timeout: str = None,
        heart_rhythm_timeout: str = None,
        healthy_state: HeartState = None,
        task_state: TaskState = None,
    ) -> Heartbeat:
        """Initialize a meta-data object **Heartbeat** with values.

        Any falsy option is replaced by its default value (shown in parentheses below).

        Args:
            time_format (str): The format of the datetime value (``"%Y-%m-%d %H:%M:%S"``).
            update_time (str): The timer for updating the heartbeat (``"1s"``).
            update_timeout (str): The timeout threshold of updating (``"2s"``).
            heart_rhythm_timeout (str): The timeout threshold of the entire updating process (``"3"``).
            healthy_state (HeartState): Heartbeat status (``HeartState.NEWBORN``).
            task_state (TaskState): Task running status (``TaskState.NOTHING``).

        Returns:
            Heartbeat: A **Heartbeat** meta-data object with value(s).

        """
        heartbeat = Heartbeat()
        if not time_format:
            time_format = "%Y-%m-%d %H:%M:%S"
        heartbeat.time_format = time_format
        # Stamp the heartbeat with the current local time in the chosen format.
        heartbeat.heart_rhythm_time = datetime.now().strftime(time_format)
        if not update_time:
            update_time = "1s"
        heartbeat.update_time = update_time
        if not update_timeout:
            update_timeout = "2s"
        heartbeat.update_timeout = update_timeout
        if not heart_rhythm_timeout:
            heart_rhythm_timeout = "3"
        heartbeat.heart_rhythm_timeout = heart_rhythm_timeout
        if not healthy_state:
            healthy_state = HeartState.NEWBORN
        heartbeat.healthy_state = healthy_state
        if not task_state:
            task_state = TaskState.NOTHING
        heartbeat.task_state = task_state
        return heartbeat
class Update(_BaseDataObjectUtils):
    """*Updating a meta-data object with values*

    Update an existing meta-data object in place with one or more options. Options left as ``None`` are not
    touched; the updated object is also returned for convenience.
    """

    @staticmethod
    def group_state(
        state: GroupState,
        total_crawler: int = None,
        total_runner: int = None,
        total_backup: int = None,
        standby_id: str = None,
        append_current_crawler: List[str] = [],
        append_current_runner: List[str] = [],
        append_current_backup: List[str] = [],
        append_fail_crawler: List[str] = [],
        append_fail_runner: List[str] = [],
        append_fail_backup: List[str] = [],
    ) -> GroupState:
        """Updating a meta-data object **GroupState** with values.

        .. note::

            List options are not replaced: their (de-duplicated) elements are appended to the current list value
            of *state* (presumably the value last read from Zookeeper), and the combined list is assigned back to
            the option. The ``[]`` defaults are only read, never mutated.

        Args:
            state (GroupState): Current *GroupState* meta-data object.
            total_crawler (int): Total amount of crawlers of every role.
            total_runner (int): Total amount of crawlers which are *Runner*.
            total_backup (int): Total amount of crawlers which are *Backup_Runner*.
            standby_id (str): The standby ID. It should be the index of the crawler name.
            append_current_crawler (list of str): Crawler names (every role) to append to ``current_crawler``.
            append_current_runner (list of str): *Runner* crawler names to append to ``current_runner``.
            append_current_backup (list of str): *Backup_Runner* crawler names to append to ``current_backup``.
            append_fail_crawler (list of str): Dead crawler names to append to ``fail_crawler``.
            append_fail_runner (list of str): *Dead_Runner* crawler names to append to ``fail_runner``.
            append_fail_backup (list of str): *Dead_Backup_Runner* crawler names to append to ``fail_backup``.

        Returns:
            GroupState: The updated **GroupState** meta-data object.

        """
        Update._update_ele_if_not_none(data_obj=state, prop="total_crawler", new_val=total_crawler)
        Update._update_ele_if_not_none(data_obj=state, prop="total_runner", new_val=total_runner)
        Update._update_ele_if_not_none(data_obj=state, prop="total_backup", new_val=total_backup)
        Update._append_ele_if_not_none(data_obj=state, prop="current_crawler", new_val=append_current_crawler)
        Update._append_ele_if_not_none(data_obj=state, prop="current_runner", new_val=append_current_runner)
        Update._append_ele_if_not_none(data_obj=state, prop="current_backup", new_val=append_current_backup)
        Update._update_ele_if_not_none(data_obj=state, prop="standby_id", new_val=standby_id)
        Update._append_ele_if_not_none(data_obj=state, prop="fail_crawler", new_val=append_fail_crawler)
        Update._append_ele_if_not_none(data_obj=state, prop="fail_runner", new_val=append_fail_runner)
        Update._append_ele_if_not_none(data_obj=state, prop="fail_backup", new_val=append_fail_backup)
        return state

    @staticmethod
    def node_state(node_state: NodeState, group: str = None, role: CrawlerRole = None) -> NodeState:
        """Updating a meta-data object **NodeState** with values.

        Args:
            node_state (NodeState): Current *NodeState* meta-data object.
            group (str): The name of the group which the current crawler instance belongs to.
            role (CrawlerRole): The role of the current crawler instance.

        Returns:
            NodeState: The updated **NodeState** meta-data object.

        """
        Update._update_ele_if_not_none(data_obj=node_state, prop="group", new_val=group)
        Update._update_ele_if_not_none(data_obj=node_state, prop="role", new_val=role)
        return node_state

    @staticmethod
    def task(
        task: Task,
        running_content: List[Union[dict, RunningContent]] = None,
        cookie: dict = None,
        authorization: dict = None,
        in_progressing_id: str = None,
        running_result: Union[dict, RunningResult] = None,
        running_status: TaskState = None,
        result_detail: List[Union[dict, ResultDetail]] = None,
    ) -> Task:
        """Updating a meta-data object **Task** with values.

        Args:
            task (Task): Current *Task* meta-data object.
            running_content (List[Union[dict, RunningContent]]): The details of the task content.
            cookie (dict): Cookie.
            authorization (dict): Authorization settings of the HTTP request.
            in_progressing_id (str): The ID of the task which is in processing state.
            running_result (Union[dict, RunningResult]): The running result statistics: amount of successfully and
                unsuccessfully done tasks.
            running_status (TaskState): The status of the task running.
            result_detail (List[Union[dict, ResultDetail]]): The details of the running result.

        Returns:
            Task: The updated **Task** meta-data object.

        """
        Update._update_ele_if_not_none(data_obj=task, prop="running_content", new_val=running_content)
        Update._update_ele_if_not_none(data_obj=task, prop="cookie", new_val=cookie)
        Update._update_ele_if_not_none(data_obj=task, prop="authorization", new_val=authorization)
        Update._update_ele_if_not_none(data_obj=task, prop="in_progressing_id", new_val=in_progressing_id)
        Update._update_ele_if_not_none(data_obj=task, prop="running_result", new_val=running_result)
        Update._update_ele_if_not_none(data_obj=task, prop="running_status", new_val=running_status)
        Update._update_ele_if_not_none(data_obj=task, prop="result_detail", new_val=result_detail)
        return task

    @staticmethod
    def heartbeat(
        heartbeat: Heartbeat,
        heart_rhythm_time: datetime = None,
        time_format: str = None,
        update_time: str = None,
        update_timeout: str = None,
        heart_rhythm_timeout: str = None,
        healthy_state: HeartState = None,
        task_state: Union[str, TaskState] = None,
    ) -> Heartbeat:
        """Updating a meta-data object **Heartbeat** with values.

        Args:
            heartbeat (Heartbeat): Current *Heartbeat* meta-data object.
            heart_rhythm_time (datetime): It should be a *datetime.datetime* type object.
            time_format (str): The format of the datetime value.
            update_time (str): The timer for updating the heartbeat.
            update_timeout (str): The timeout threshold of updating.
            heart_rhythm_timeout (str): The timeout threshold of the entire updating process.
            healthy_state (HeartState): Heartbeat status.
            task_state (TaskState): Task running status.

        Returns:
            Heartbeat: The updated **Heartbeat** meta-data object.

        """
        Update._update_ele_if_not_none(data_obj=heartbeat, prop="heart_rhythm_time", new_val=heart_rhythm_time)
        Update._update_ele_if_not_none(data_obj=heartbeat, prop="time_format", new_val=time_format)
        Update._update_ele_if_not_none(data_obj=heartbeat, prop="update_time", new_val=update_time)
        Update._update_ele_if_not_none(data_obj=heartbeat, prop="update_timeout", new_val=update_timeout)
        Update._update_ele_if_not_none(data_obj=heartbeat, prop="heart_rhythm_timeout", new_val=heart_rhythm_timeout)
        Update._update_ele_if_not_none(data_obj=heartbeat, prop="healthy_state", new_val=healthy_state)
        Update._update_ele_if_not_none(data_obj=heartbeat, prop="task_state", new_val=task_state)
        return heartbeat

    @staticmethod
    def _update_ele_if_not_none(
        data_obj,
        prop: str,
        new_val: Union[int, str, list, dict, datetime, CrawlerRole, TaskState, HeartState],
    ) -> None:
        """Assign *new_val* to attribute *prop* of *data_obj*, unless *new_val* is ``None``."""
        if new_val is not None:
            setattr(data_obj, prop, new_val)

    @staticmethod
    def _append_ele_if_not_none(data_obj, prop: str, new_val: List[str]) -> None:
        """Append the de-duplicated elements of *new_val* to list attribute *prop* of *data_obj*.

        Nothing happens when *new_val* is ``None``. A ``None`` current value is treated as an empty list. A new list
        is always assigned; the current list object is never extended in place, because it may be owned by the
        caller or be a shared default value.
        """
        if new_val is None:
            return
        current = getattr(data_obj, prop)
        merged = [] if current is None else list(current)
        # ``dict.fromkeys`` de-duplicates while keeping first-seen order, unlike ``set``.
        merged.extend(dict.fromkeys(new_val))
        setattr(data_obj, prop, merged)