"""*The different independent processing for different role as the role's own workflow*
*SmoothCrawler-Cluster* let the implementation details of each role's processing as each role's individual workflow. And
the workflow would be dispatched to do the process which it should work at that time in crawler by the dispatcher in
*dispatcher* module. So it won't take care who use them, it only considers that what things it should do in the cluster
by the role.
*New in version 0.2.0.*
"""
import re
import time
from abc import ABCMeta, abstractmethod
from datetime import datetime
from typing import Any, Callable, Dict, Optional, Type
from .._utils import parse_timer
from .._utils.converter import TaskContentDataUtils
from ..exceptions import StopUpdateHeartbeat
from ..model import (
CrawlerRole,
GroupState,
Heartbeat,
HeartState,
NodeState,
ResultDetail,
RunningResult,
Task,
TaskState,
Update,
)
from ..model._data import CrawlerName, CrawlerTimer, MetaDataOpt, MetaDataPath
from .adapter import DistributedLock
class BaseWorkflow(metaclass=ABCMeta):
    """*The base running workflow*

    Every workflow agrees on two things: the arguments it is constructed with, and its entry point *run*.

    * Initial arguments

        A workflow carries out one role's processing in the cluster, so it needs the basic crawler info (*name* and
        *index separation*). It also talks to the other members to learn the brief state of the cluster, so it needs
        the *paths of the meta-data* and the *callback functions for getting and setting meta-data*.

    * Major function *run*

        The body of whatever the workflow does. Whether a workflow is called directly or dispatched, the caller only
        needs `<BaseWorkflow object>.run(*args, **kwargs)` to run it.
    """

    def __init__(self, name: CrawlerName, path: Type[MetaDataPath], metadata_opts_callback: MetaDataOpt):
        """Keep the crawler's name data, the meta-data paths and the meta-data callbacks on the instance.

        Args:
            name (CrawlerName): Data object **CrawlerName**, which carries attributes of the crawler instance such
                as its name or ID.
            path (Type[MetaDataPath]): Object that exposes the path property of every meta-data object.
            metadata_opts_callback (MetaDataOpt): Data object *MetaDataOpt*, which carries the callback functions
                for getting and setting meta-data.
        """
        self._crawler_name_data, self._path = name, path
        # Callback that reads meta-data and returns it as an object (callers pass ``as_obj=``).
        self._get_metadata = metadata_opts_callback.get_callback
        # Callback that writes an object back as meta-data (callers pass ``metadata=``).
        self._set_metadata = metadata_opts_callback.set_callback

    @abstractmethod
    def run(self, *args, **kwargs) -> Any:
        """Run this workflow.

        The signature is intentionally ``*args`` / ``**kwargs`` so that each concrete workflow can declare whatever
        arguments it needs, or none at all.

        Args:
            *args (tuple): Positional arguments for the concrete workflow.
            **kwargs (dict): Keyword arguments for the concrete workflow.

        Returns:
            Any: Decided by each concrete workflow.
        """
        pass
class BaseRoleWorkflow(BaseWorkflow):
    """*Base of each role's workflow*

    The base class of every role's workflow. Each subclass must expose which *role* it serves, and its major function
    *run* must accept the argument *timer*.
    """

    def __init__(
        self,
        name: CrawlerName,
        path: Type[MetaDataPath],
        metadata_opts_callback: MetaDataOpt,
        lock: DistributedLock,
        crawler_process_callback: Callable,
    ):
        """
        Args:
            name (CrawlerName): The data object **CrawlerName** which provides attributes of the crawler instance
                such as its name or ID.
            path (Type[MetaDataPath]): The object which has the path property of every meta-data object.
            metadata_opts_callback (MetaDataOpt): The data object *MetaDataOpt* which provides the callback
                functions for getting and setting meta-data.
            lock (DistributedLock): The adapter of the distributed lock.
            crawler_process_callback (Callable): The callback function which runs the crawler core processes.
        """
        super().__init__(
            name=name,
            path=path,
            metadata_opts_callback=metadata_opts_callback,
        )
        self._lock = lock
        self._crawler_process_callback = crawler_process_callback

    @property
    @abstractmethod
    def role(self) -> CrawlerRole:
        """:obj:`CrawlerRole`: Read-only property giving the crawler role this workflow is for."""
        pass

    @abstractmethod
    def run(self, timer: CrawlerTimer) -> Any:
        """Run the processes of the role's workflow.

        Args:
            timer (CrawlerTimer): The object which has some timer attributes.

        Returns:
            No rule. Each role's workflow may return whatever it needs.
        """
        pass
class RunnerWorkflow(BaseRoleWorkflow):
    """*The processing of role **RUNNER***

    The crawler role **RUNNER** runs tasks, so it keeps monitoring the meta-data object **Task** to check whether it
    has tasks and runs them when it does.
    """

    @property
    def role(self) -> CrawlerRole:
        return CrawlerRole.RUNNER

    def run(self, timer: CrawlerTimer) -> None:
        """Keep waiting for tasks and run them when they come.

        Args:
            timer (CrawlerTimer): The object which has some timer attributes. This workflow only uses one attribute:

                * *CrawlerTimer.time_interval.check_task* (`float`_)
                    How long the crawler instance waits before checking for tasks again. The unit is seconds and the
                    default value is 2.

        Returns:
            None

        Raises:
            StopUpdateHeartbeat: When this crawler's meta-data **NodeState** role is *DEAD_RUNNER*.

        .. _float: https://docs.python.org/3/library/functions.html#float
        """
        # 1. Try to get data from the target node path of Zookeeper
        # 2. if (step 1 has data and it's valid) {
        #        Start to run tasks from the data
        #    } else {
        #        Keep waiting for tasks
        #    }
        while True:
            node_state = self._get_metadata(path=self._path.node_state, as_obj=NodeState, must_has_data=False)
            task = self._get_metadata(path=self._path.task, as_obj=Task, must_has_data=False)
            if node_state.role == CrawlerRole.DEAD_RUNNER.value:
                # Another member has marked this crawler as dead, so stop working.
                raise StopUpdateHeartbeat
            if task.running_content:
                # Start to run tasks ...
                self.run_task(task=task)
            else:
                # Keep waiting
                time.sleep(timer.time_interval.check_task)

    def run_task(self, task: Task) -> None:
        """Run the given task.

        It runs every item of meta-data *Task.running_content*, starting at the index *Task.in_progressing_id*. The
        result of each item is recorded back to meta-data *Task.running_result* and *Task.result_detail*. Once all
        items are done, the content is cleared, *in_progressing_id* is reset to ``"-1"`` and the status is set to
        *DONE*.

        The web-spider work of each item is delegated to the crawler process callback given at construction time.

        Args:
            task (Task): The task to run.

        Returns:
            None

        Raises:
            AssertionError: If *Task.in_progressing_id* is not a non-negative integer string.
            NotImplementedError: Re-raised unchanged from the crawler process callback. Any other exception from the
                callback is recorded as a failed item instead of being raised.
        """
        running_content = task.running_content
        current_task: Task = task
        start_task_id = task.in_progressing_id
        # fullmatch (not search): an unanchored search would also accept values like "1a" or "-1".
        assert re.fullmatch(r"[0-9]{1,32}", start_task_id) is not None, "The task index must be integer format value."
        for index, content in enumerate(running_content[int(start_task_id) :]):
            content = TaskContentDataUtils.convert_to_running_content(content)
            # Update the ongoing task ID (and mark the whole task as processing on the first item).
            original_task = self._get_metadata(path=self._path.task, as_obj=Task)
            if index == 0:
                current_task = Update.task(
                    task=original_task, in_progressing_id=content.task_id, running_status=TaskState.PROCESSING
                )
            else:
                current_task = Update.task(task=original_task, in_progressing_id=content.task_id)
            self._set_metadata(path=self._path.task, metadata=current_task)

            # Run the task and update the meta data Task
            try:
                # TODO: Consider of here usage about how to implement to let it be more clear and convenience in usage
                #  in client site
                data = self._crawler_process_callback(content)
            except NotImplementedError:
                raise
            except Exception as e:  # pylint: disable=broad-except
                # TODO: If it gets fail, how to record the result?
                succeeded = False
                detail = ResultDetail(
                    task_id=content.task_id,
                    state=TaskState.ERROR.value,
                    status_code=500,
                    response=None,
                    error_msg=f"{e}",
                )
            else:
                # TODO: Some information like HTTP status code of response should be get from response object.
                succeeded = True
                detail = ResultDetail(
                    task_id=content.task_id,
                    state=TaskState.DONE.value,
                    status_code=200,
                    response=data,
                    error_msg=None,
                )

            # Build on the task that was just written, so the in-progress ID and status set above are kept.
            current_task = self._record_result(
                task=current_task, task_id=content.task_id, detail=detail, succeeded=succeeded
            )
            self._set_metadata(path=self._path.task, metadata=current_task)

        # Finish all tasks, record the running result and reset the content ...
        current_task = Update.task(
            task=current_task, running_content=[], in_progressing_id="-1", running_status=TaskState.DONE
        )
        self._set_metadata(path=self._path.task, metadata=current_task)

    @staticmethod
    def _record_result(task: Task, task_id: Any, detail: ResultDetail, succeeded: bool) -> Task:
        """Count one more success (or failure) on *task*, append *detail* to its result details and return it."""
        previous = TaskContentDataUtils.convert_to_running_result(task.running_result)
        if succeeded:
            running_result = RunningResult(
                success_count=previous.success_count + 1, fail_count=previous.fail_count
            )
        else:
            running_result = RunningResult(
                success_count=previous.success_count, fail_count=previous.fail_count + 1
            )
        result_detail = task.result_detail
        result_detail.append(detail)
        return Update.task(
            task=task,
            in_progressing_id=task_id,
            running_result=running_result,
            result_detail=result_detail,
        )
class PrimaryBackupRunnerWorkflow(BaseRoleWorkflow):
    """*The processing of role **BACKUP RUNNER***

    Literally, the crawler role **BACKUP RUNNER** is the backup of each **RUNNER**. In this workflow the backup
    instance is the primary one, which means the index of its crawler name is the same as the value of meta-data
    *GroupState.standby_id*.

    The primary backup runner keeps monitoring the heartbeat of each **RUNNER**. As soon as it discovers that one of
    them is dead, it activates itself as a **RUNNER** and tries to hand over the dead one's task if that task isn't
    finished yet.
    """

    @property
    def role(self) -> CrawlerRole:
        return CrawlerRole.BACKUP_RUNNER

    def run(self, timer: CrawlerTimer) -> None:
        """Keep checking every current runner's heartbeat, and take over when one of them is discovered dead.

        Two kinds of check result are recorded per runner, as dicts:

            * timeout records: {<crawler_name>: <timeout times>}
            * not timeout records: {<crawler_name>: <not timeout times>}

        A check counts as a timeout when the time since the runner's *heart_rhythm_time* is at least its
        *update_timeout*. A runner is treated as dead once its timeout count reaches its *heart_rhythm_timeout*. Both
        thresholds come from the runner's meta-data **Heartbeat**.

        Timeouts are not accumulated forever. This keeps an unstable network from eventually getting an instance
        marked as *Dead*. Once a runner has had *CrawlerTimer.threshold.reset_timeout* consecutive non-timeout checks,
        its timeout count is reset to 0.

        Only the first dead runner found in a round is handled. After taking over, this method returns.

        Args:
            timer (CrawlerTimer): The object which has some timer attributes. This workflow uses 2 attributes:

                * *CrawlerTimer.time_interval.check_crawler_state* (`float`_)
                    How long the crawler instance waits before checking the heartbeats again. The unit is seconds
                    and the default value is 0.5.
                * *CrawlerTimer.threshold.reset_timeout* (`int`_)
                    How many consecutive non-timeout checks reset a runner's timeout record.

        Returns:
            None

        .. _float: https://docs.python.org/3/library/functions.html#float
        .. _int: https://docs.python.org/3/library/functions.html#int
        """
        timeout_records: Dict[str, int] = {}
        no_timeout_records: Dict[str, int] = {}

        def _one_current_runner_is_dead(runner_name: str) -> Optional[str]:
            # Return *runner_name* if it is considered dead in this round, otherwise None.
            current_runner_heartbeat = self._get_metadata(
                path=f"{self._path.generate_parent_node(runner_name)}/{self._path.heartbeat_node_str}",
                as_obj=Heartbeat,
            )
            heart_rhythm_time = current_runner_heartbeat.heart_rhythm_time
            time_format = current_runner_heartbeat.time_format
            update_timeout = current_runner_heartbeat.update_timeout
            heart_rhythm_timeout = current_runner_heartbeat.heart_rhythm_timeout

            diff_datetime = datetime.now() - datetime.strptime(heart_rhythm_time, time_format)
            if diff_datetime.total_seconds() >= parse_timer(update_timeout):
                # It should start to pay attention on it
                timeout_records[runner_name] = timeout_records.get(runner_name, 0) + 1
                no_timeout_records[runner_name] = 0
                if timeout_records[runner_name] >= int(heart_rhythm_timeout):
                    return runner_name
            else:
                no_timeout_records[runner_name] = no_timeout_records.get(runner_name, 0) + 1
                if no_timeout_records[runner_name] >= timer.threshold.reset_timeout:
                    timeout_records[runner_name] = 0
            return None

        while True:
            group_state = self._get_metadata(path=self._path.group_state, as_obj=GroupState)
            # Check every runner (not only until the first dead one) so all counters advance each round.
            check_results = [_one_current_runner_is_dead(runner) for runner in group_state.current_runner]
            dead_current_runner = [runner for runner in check_results if runner is not None]
            if dead_current_runner:
                # Only handle the first one of the dead crawlers (if there are several)
                runner_name = dead_current_runner[0]
                heartbeat = self._get_metadata(
                    path=f"{self._path.generate_parent_node(runner_name)}/{self._path.heartbeat_node_str}",
                    as_obj=Heartbeat,
                )
                task_of_dead_crawler = self.discover(dead_crawler_name=runner_name, heartbeat=heartbeat)
                self.activate(dead_crawler_name=runner_name)
                self.hand_over_task(task=task_of_dead_crawler)
                break
            time.sleep(timer.time_interval.check_crawler_state)

    def discover(self, dead_crawler_name: str, heartbeat: Heartbeat) -> Task:
        """Mark the dead crawler and fetch its task.

        The dead crawler's meta-data **NodeState** role is set to *DEAD_RUNNER*. Its meta-data **Heartbeat** is
        updated to *HeartState.ASYSTOLE*, together with the running status of its **Task**. The dead crawler's
        **Task** is then returned so it can be taken over.

        Args:
            dead_crawler_name (str): The name of the crawler considered dead.
            heartbeat (Heartbeat): The meta-data **Heartbeat** of that crawler.

        Returns:
            Task: Meta-data **Task** of the dead crawler instance.
        """
        node_state_path = f"{self._path.generate_parent_node(dead_crawler_name)}/{self._path.state_node_str}"
        node_state = self._get_metadata(path=node_state_path, as_obj=NodeState)
        node_state.role = CrawlerRole.DEAD_RUNNER
        self._set_metadata(path=node_state_path, metadata=node_state)

        task = self._get_metadata(
            path=f"{self._path.generate_parent_node(dead_crawler_name)}/{self._path.task_node_str}",
            as_obj=Task,
        )
        heartbeat.healthy_state = HeartState.ASYSTOLE
        heartbeat.task_state = task.running_status
        self._set_metadata(
            path=f"{self._path.generate_parent_node(dead_crawler_name)}/{self._path.heartbeat_node_str}",
            metadata=heartbeat,
        )
        return task

    def activate(self, dead_crawler_name: str) -> None:
        """Turn this backup instance into a runner that replaces the dead one.

        Under the distributed lock, meta-data **GroupState** is updated as follows:

            * the dead crawler moves from the current crawler and runner lists to the fail lists;
            * this crawler moves from the backup list to the runner list;
            * *total_backup* is decremented and *standby_id* is incremented.

        This crawler's own **NodeState** role is also set to *RUNNER*.

        Args:
            dead_crawler_name (str): The name of the crawler considered dead.

        Returns:
            None
        """

        def _update_group_state() -> None:
            state = self._get_metadata(path=self._path.group_state, as_obj=GroupState)
            state.total_backup = state.total_backup - 1
            state.current_crawler.remove(dead_crawler_name)
            state.current_runner.remove(dead_crawler_name)
            state.current_runner.append(str(self._crawler_name_data))
            state.current_backup.remove(str(self._crawler_name_data))
            state.fail_crawler.append(dead_crawler_name)
            state.fail_runner.append(dead_crawler_name)
            # The next backup in line becomes the primary one.
            state.standby_id = str(int(state.standby_id) + 1)
            self._set_metadata(path=self._path.group_state, metadata=state)

            node_state = self._get_metadata(path=self._path.node_state, as_obj=NodeState)
            node_state.role = CrawlerRole.RUNNER
            self._set_metadata(path=self._path.node_state, metadata=node_state)

        self._lock.strongly_run(function=_update_group_state)

    def hand_over_task(self, task: Task) -> None:
        """Write the dead crawler's task into this crawler's meta-data **Task** when it still needs work.

        * *PROCESSING*: the task is copied as-is, so it resumes from its in-progress index.
        * *ERROR*: the index, running result and result detail are reset, so every item runs again.
        * Any other state (nothing or done): nothing is written.

        Args:
            task (Task): The meta-data **Task** of the dead crawler.

        Returns:
            None
        """
        if task.running_status == TaskState.PROCESSING.value:
            # Run the tasks from the start index
            self._set_metadata(path=self._path.task, metadata=task)
        elif task.running_status == TaskState.ERROR.value:
            # Reset some specific attributes, then rerun all tasks
            updated_task = Update.task(
                task,
                in_progressing_id="0",
                running_result=RunningResult(success_count=0, fail_count=0),
                result_detail=[],
            )
            self._set_metadata(path=self._path.task, metadata=updated_task)
        # Any other state (nothing or done) needs no hand-over.
class SecondaryBackupRunnerWorkflow(BaseRoleWorkflow):
    """*The processing of role **BACKUP RUNNER***

    Literally, the crawler role **BACKUP RUNNER** is the backup of each **RUNNER**. A backup member that is not the
    primary one is a secondary backup runner. It does only one thing: it keeps checking whether
    *GroupState.standby_id* equals the index of the current crawler. Once it does, this crawler is due to act as the
    primary backup runner.
    """

    @property
    def role(self) -> CrawlerRole:
        return CrawlerRole.BACKUP_RUNNER

    def run(self, timer: CrawlerTimer) -> bool:
        """Wait until this crawler becomes the primary backup crawler instance.

        Args:
            timer (CrawlerTimer): The object which has some timer attributes. This workflow uses only one attribute:

                * *CrawlerTimer.time_interval.check_standby_id* (`float`_)
                    How long the crawler instance waits before checking *GroupState.standby_id* again. The unit is
                    seconds and the default value is 2.

        Returns:
            bool: Always True. It is returned once the standby ID equals this crawler's index.

        .. _float: https://docs.python.org/3/library/functions.html#float
        """

        def _is_standby_turn() -> bool:
            # Re-read GroupState on every check; another member may have advanced standby_id.
            latest_state = self._get_metadata(path=self._path.group_state, as_obj=GroupState)
            return latest_state.standby_id == str(self._crawler_name_data.id)

        while not _is_standby_turn():
            time.sleep(timer.time_interval.check_standby_id)
        # Start to do wait_and_standby
        return True
class HeartbeatUpdatingWorkflow(BaseWorkflow):
    """*The processing of every role which needs to stay alive by updating its heartbeat*

    This is a general workflow, so its super class is **BaseWorkflow** rather than **BaseRoleWorkflow**. It focuses on
    one thing only: keep updating the current crawler instance's heartbeat.
    """

    _stop_heartbeat_signal: bool = False
    # The exception raised while updating the heartbeat, if any. It is stored on the instance before run() re-raises it.
    _updating_exception: Optional[Exception] = None

    @property
    def stop_heartbeat(self) -> bool:
        """:obj:`bool`: Property with getter and setter for stopping the update of the current crawler instance's
        heartbeat info.
        """
        return self._stop_heartbeat_signal

    @stop_heartbeat.setter
    def stop_heartbeat(self, signal: bool) -> None:
        self._stop_heartbeat_signal = signal

    def run(self) -> None:
        """The main function of updating **Heartbeat** info.

        While the stop flag is off, it keeps writing a *HEALTHY* heartbeat and then sleeps for the heartbeat's
        *update_time*. Once the flag is on, it writes one final *APPARENT_DEATH* heartbeat and returns.

        .. note::

            The flag is *_stop_heartbeat_signal* (exposed as the property *stop_heartbeat*). If it's True, updating
            **Heartbeat** stops.

        Returns:
            None

        Raises:
            Exception: Any exception raised while updating the healthy heartbeat. It is stored in
                *_updating_exception* before being re-raised, and no final *APPARENT_DEATH* heartbeat is written.
        """
        while not self._stop_heartbeat_signal:
            try:
                heartbeat = self._write_heartbeat(healthy_state=HeartState.HEALTHY)
                # Sleep ...
                time.sleep(parse_timer(heartbeat.update_time))
            except Exception as e:
                # Re-raise this run's own exception (never a stale one left over from an earlier run).
                self._updating_exception = e
                raise
        self._write_heartbeat(healthy_state=HeartState.APPARENT_DEATH)

    def _write_heartbeat(self, healthy_state: HeartState) -> Heartbeat:
        """Write this crawler's **Heartbeat** with the current time, *healthy_state* and its **Task** status."""
        # Get *Task* and *Heartbeat* info
        task = self._get_metadata(path=self._path.task, as_obj=Task)
        heartbeat = self._get_metadata(path=self._path.heartbeat, as_obj=Heartbeat)
        # Update the values
        heartbeat = Update.heartbeat(
            heartbeat,
            heart_rhythm_time=datetime.now(),
            healthy_state=healthy_state,
            task_state=task.running_status,
        )
        self._set_metadata(path=self._path.heartbeat, metadata=heartbeat)
        return heartbeat