Workflow

Each role’s independent processing is implemented as that role’s own workflow

SmoothCrawler-Cluster implements the processing details of each role as that role’s individual workflow. The dispatcher in the dispatcher module dispatches the workflow that the crawler should run at a given time. As a result, a workflow does not care who uses it; it only considers what it should do in the cluster for its role.
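A minimal sketch of that separation of concerns, assuming a simple role-to-workflow lookup. `Role`, `RunnerFlow`, `BackupFlow` and `Dispatcher` are hypothetical stand-ins written for illustration, not the library’s dispatcher API.

```python
from abc import ABC, abstractmethod
from enum import Enum


class Role(Enum):
    # Hypothetical stand-in for the library's CrawlerRole values
    RUNNER = "runner"
    BACKUP_RUNNER = "backup_runner"


class Workflow(ABC):
    @abstractmethod
    def run(self, *args, **kwargs):
        """Each workflow only knows what its own role must do."""


class RunnerFlow(Workflow):
    def run(self, *args, **kwargs):
        return "running tasks"


class BackupFlow(Workflow):
    def run(self, *args, **kwargs):
        return "watching heartbeats"


class Dispatcher:
    """Hypothetical dispatcher: maps a role to the workflow that role should run."""

    def __init__(self) -> None:
        self._workflows = {Role.RUNNER: RunnerFlow(), Role.BACKUP_RUNNER: BackupFlow()}

    def dispatch(self, role: Role) -> Workflow:
        # The workflow never needs to know who calls it; the dispatcher
        # decides which one applies to the crawler's current role.
        return self._workflows[role]


dispatcher = Dispatcher()
print(dispatcher.dispatch(Role.RUNNER).run())  # running tasks
```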

New in version 0.2.0.

Base Workflow

class smoothcrawler_cluster.crawler.workflow.BaseWorkflow(name: CrawlerName, path: Type[MetaDataPath], metadata_opts_callback: MetaDataOpt)

The base running workflow

It defines two things: the initial arguments of a workflow object and the major function run.

  • Initial arguments

    A workflow is responsible for a role’s processing in the cluster, so it needs the basic crawler information: the name and the index separator. It also needs to communicate with other crawler instances to learn the brief state of the cluster, so it needs the meta-data paths and the callback functions for getting and setting meta-data.

  • Major function run

    The main body of what the workflow does. Whether a workflow is called directly or dispatched, the caller only needs <BaseWorkflow object>.run(*args, **kwargs) to run it.
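The two rules can be sketched in plain Python as follows. `CrawlerNameStub`, `MetaDataOptStub`, `BaseWorkflowSketch` and `EchoWorkflow` are hypothetical stand-ins for CrawlerName, MetaDataOpt and BaseWorkflow; the real classes may hold more attributes. The subclass shows why run takes *args and **kwargs in the base: each workflow narrows it to the arguments it actually needs.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class CrawlerNameStub:
    # Hypothetical stand-in for CrawlerName: group name, index separator, index
    group: str
    index_sep: str
    index: int

    def __str__(self) -> str:
        return f"{self.group}{self.index_sep}{self.index}"


@dataclass
class MetaDataOptStub:
    # Hypothetical stand-in for MetaDataOpt: getter and setter callbacks
    get_callback: Callable[[str], Any]
    set_callback: Callable[[str, Any], None]


class BaseWorkflowSketch(ABC):
    # Rule 1: every workflow receives the same initial arguments
    def __init__(self, name: CrawlerNameStub, path: Any, metadata_opts_callback: MetaDataOptStub) -> None:
        self._name = name
        self._path = path
        self._opts = metadata_opts_callback

    # Rule 2: one entry point; subclasses choose their own arguments
    @abstractmethod
    def run(self, *args, **kwargs) -> Any:
        """Run the workflow."""


class EchoWorkflow(BaseWorkflowSketch):
    # A subclass that takes one custom keyword argument
    def run(self, *, greeting: str = "hello") -> str:
        return f"{greeting} from {self._name}"


store: dict = {}
opts = MetaDataOptStub(get_callback=store.get, set_callback=store.__setitem__)
wf = EchoWorkflow(CrawlerNameStub("sc-crawler", "_", 1), path=None, metadata_opts_callback=opts)
print(wf.run(greeting="hi"))  # hi from sc-crawler_1
```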

Parameters:
  • name (CrawlerName) – The data object CrawlerName, which provides attributes such as the crawler instance’s name or ID.

  • path (Type[MetaDataPath]) – The object that holds the path properties of all meta-data objects.

  • metadata_opts_callback (MetaDataOpt) – The data object MetaDataOpt, which provides the callback functions for getting and setting meta-data.

abstract run(*args, **kwargs) → Any

The major function for running this workflow. This abstract function takes *args and **kwargs so that every workflow can define its own customized arguments; a workflow may also take no arguments at all if it needs none.

Parameters:
  • *args (tuple) – Positional arguments for the workflow, used as *args.

  • **kwargs (dict) – Keyword arguments for the workflow, used as **kwargs.

Returns:

No rule; each workflow can return whatever it needs.

Base Workflow for crawler role

class smoothcrawler_cluster.crawler.workflow.BaseRoleWorkflow(name: CrawlerName, path: Type[MetaDataPath], metadata_opts_callback: MetaDataOpt, lock: DistributedLock, crawler_process_callback: Callable)

Base of each role’s workflow

The base class of each role’s workflow. It must have a property indicating which role it serves, and its major function run must take the argument timer.

Parameters:
  • name (CrawlerName) – The data object CrawlerName, which provides attributes such as the crawler instance’s name or ID.

  • path (Type[MetaDataPath]) – The object that holds the path properties of all meta-data objects.

  • metadata_opts_callback (MetaDataOpt) – The data object MetaDataOpt, which provides the callback functions for getting and setting meta-data.

  • lock (DistributedLock) – The adapter for the distributed lock.

  • crawler_process_callback (Callable) – The callback function that runs the crawler’s core processes.

abstract property role: CrawlerRole

Read-only property (getter only) for the crawler role that the current workflow serves.

Type:

str

abstract run(timer: CrawlerTimer) → Any

Run the processes of the role’s workflow.

Parameters:

timer (CrawlerTimer) – The object that holds the timer attributes.

Returns:

No rule; each role’s workflow can return whatever it needs.
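A sketch of the role-workflow contract, assuming an `abc`-based abstract property. `Role`, `TimeInterval`, `TimerStub` and `RunnerSketch` are hypothetical; only the shape (a read-only role plus run(timer)) follows the description above.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum


class Role(Enum):
    # Hypothetical stand-in for CrawlerRole
    RUNNER = "runner"


@dataclass
class TimeInterval:
    check_task: float = 2.0


@dataclass
class TimerStub:
    # Hypothetical stand-in for CrawlerTimer
    time_interval: TimeInterval = field(default_factory=TimeInterval)


class BaseRoleWorkflowSketch(ABC):
    @property
    @abstractmethod
    def role(self) -> Role:
        """Read-only: which role this workflow serves."""

    @abstractmethod
    def run(self, timer: TimerStub):
        """Every role workflow receives the timer object."""


class RunnerSketch(BaseRoleWorkflowSketch):
    @property
    def role(self) -> Role:
        return Role.RUNNER

    def run(self, timer: TimerStub) -> float:
        # Return the polling interval it would use, just to show the timer is passed in
        return timer.time_interval.check_task


flow = RunnerSketch()
print(flow.role.value, flow.run(TimerStub()))  # runner 2.0
```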

Runner Workflow

class smoothcrawler_cluster.crawler.workflow.RunnerWorkflow(name: CrawlerName, path: Type[MetaDataPath], metadata_opts_callback: MetaDataOpt, lock: DistributedLock, crawler_process_callback: Callable)

The processing of role RUNNER

The crawler role RUNNER runs tasks, so it monitors the meta-data object Task to check whether there are tasks and runs them if there are.

Parameters:
  • name (CrawlerName) – The data object CrawlerName, which provides attributes such as the crawler instance’s name or ID.

  • path (Type[MetaDataPath]) – The object that holds the path properties of all meta-data objects.

  • metadata_opts_callback (MetaDataOpt) – The data object MetaDataOpt, which provides the callback functions for getting and setting meta-data.

  • lock (DistributedLock) – The adapter for the distributed lock.

  • crawler_process_callback (Callable) – The callback function that runs the crawler’s core processes.

property role: CrawlerRole

Read-only property (getter only) for the crawler role that the current workflow serves.

Type:

str

run(timer: CrawlerTimer) → None

Keep waiting for incoming tasks and run them.

Parameters:

timer (CrawlerTimer) –

The object that holds the timer attributes. This workflow uses only one attribute, CrawlerTimer.time_interval.check_task (float), which sets how often it checks for tasks.

  • CrawlerTimer.time_interval.check_task (float)

    How long the crawler instance waits before checking for the next task. The unit is seconds and the default value is 2.

Returns:

None
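The polling loop can be sketched as below. `TaskStub`, `runner_loop` and its `max_rounds` parameter are hypothetical: `max_rounds` exists only so the example terminates, whereas the documented run keeps waiting. The two callbacks stand in for reading the Task meta-data and for run_task.

```python
import time
from dataclasses import dataclass, field
from typing import Callable, List, Optional


@dataclass
class TaskStub:
    # Hypothetical stand-in for meta-data Task
    running_content: List[str] = field(default_factory=list)


def runner_loop(get_task: Callable[[], TaskStub], run_task: Callable[[TaskStub], None],
                check_task: float = 2.0, max_rounds: Optional[int] = None) -> None:
    """Poll the Task meta-data and run it whenever it has content."""
    rounds = 0
    while max_rounds is None or rounds < max_rounds:
        task = get_task()
        if task.running_content:  # only run when there is something to do
            run_task(task)
        time.sleep(check_task)    # CrawlerTimer.time_interval.check_task
        rounds += 1


pending = [TaskStub(["https://example.com"]), TaskStub()]
done: List[TaskStub] = []
runner_loop(lambda: pending.pop(0) if pending else TaskStub(), done.append,
            check_task=0.01, max_rounds=3)
print(len(done))  # 1
```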

run_task(task: Task) → None

Run the given task. It runs the task according to meta-data Task.running_content and records the results back to meta-data Task.running_result and Task.result_detail.

The core implementation of the web-spider job lives in the protected function _run_crawling_processing (and processing_crawling_task).

Parameters:

task (Task) – The task to run.

Returns:

None
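A sketch of how results might be recorded per task item. The shapes of running_content, running_result and result_detail here (dicts with `task_id`, `url`, `success_count`, `fail_count` and `state`) are assumptions for illustration and may differ from the real Task meta-data; `fake_crawl` stands in for the crawler core processes.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List


@dataclass
class TaskStub:
    # Hypothetical shape: the real Task meta-data may store these differently
    running_content: List[Dict[str, Any]] = field(default_factory=list)
    running_result: Dict[str, int] = field(default_factory=dict)
    result_detail: List[Dict[str, Any]] = field(default_factory=list)


def run_task_sketch(task: TaskStub, crawl: Callable[[Dict[str, Any]], Any]) -> None:
    success = fail = 0
    for content in task.running_content:
        try:
            data = crawl(content)
            task.result_detail.append({"task_id": content["task_id"], "state": "done", "response": data})
            success += 1
        except Exception as exc:  # record the failure instead of aborting the whole task
            task.result_detail.append({"task_id": content["task_id"], "state": "error", "error": str(exc)})
            fail += 1
    task.running_result = {"success_count": success, "fail_count": fail}


def fake_crawl(content: Dict[str, Any]) -> str:
    if content["url"].startswith("bad"):
        raise ValueError("unreachable")
    return f"<html>{content['url']}</html>"


task = TaskStub(running_content=[{"task_id": 0, "url": "https://example.com"},
                                 {"task_id": 1, "url": "bad://x"}])
run_task_sketch(task, fake_crawl)
print(task.running_result)  # {'success_count': 1, 'fail_count': 1}
```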

Primary Backup Runner Workflow

class smoothcrawler_cluster.crawler.workflow.PrimaryBackupRunnerWorkflow(name: CrawlerName, path: Type[MetaDataPath], metadata_opts_callback: MetaDataOpt, lock: DistributedLock, crawler_process_callback: Callable)

The processing of role BACKUP RUNNER

As its name suggests, the crawler role BACKUP RUNNER is the backup of every RUNNER. This workflow, however, is for the primary backup only: the backup instance whose crawler-name index equals the value of meta-data GroupState.standby_id.

The primary backup runner keeps monitoring the heartbeat status of each RUNNER. If it discovers that any RUNNER is dead, it immediately activates itself as a RUNNER and, if the dead one has not finished its task yet, tries to take that task over.
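The primary check reduces to comparing the index in the crawler name with GroupState.standby_id. A minimal sketch, assuming names shaped like `sc-crawler_3` with `_` as the index separator; `crawler_index` and `is_primary_backup` are hypothetical helpers.

```python
def crawler_index(name: str, index_sep: str = "_") -> str:
    # Assumes the index is the last piece after the separator, e.g. "sc-crawler_3" -> "3"
    return name.rsplit(index_sep, 1)[-1]


def is_primary_backup(name: str, standby_id: str, index_sep: str = "_") -> bool:
    # Primary means: this crawler's index equals GroupState.standby_id
    return crawler_index(name, index_sep) == str(standby_id)


print(is_primary_backup("sc-crawler_3", "3"))  # True
print(is_primary_backup("sc-crawler_4", "3"))  # False
```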

Parameters:
  • name (CrawlerName) – The data object CrawlerName, which provides attributes such as the crawler instance’s name or ID.

  • path (Type[MetaDataPath]) – The object that holds the path properties of all meta-data objects.

  • metadata_opts_callback (MetaDataOpt) – The data object MetaDataOpt, which provides the callback functions for getting and setting meta-data.

  • lock (DistributedLock) – The adapter for the distributed lock.

  • crawler_process_callback (Callable) – The callback function that runs the crawler’s core processes.

property role: CrawlerRole

Read-only property (getter only) for the crawler role that the current workflow serves.

Type:

str

run(timer: CrawlerTimer) → None

Keep checking every crawler’s heartbeat info, and stand by to activate itself as a runner if it discovers that any crawler is dead.

It records two types of checking results as dict values, structured as follows:
  • timeout records: {<crawler_name>: <timeout times>}

  • not timeout records: {<crawler_name>: <not timeout times>}

Each value is the number of times that case has been counted for the crawler. The timeout thresholds are read from meta-data Heartbeat, and the checking process compares the counts against them.

To prevent an unstable network from accumulating timeout counts and causing a crawler instance to be wrongly marked as Dead, the counts do not accumulate indefinitely: a timeout record is reset if it was counted long ago. Currently, the timeout value is reset if the timeout was counted 10 checks ago.

Parameters:

timer (CrawlerTimer) –

The object that holds the timer attributes. This workflow uses two attributes:

  • CrawlerTimer.time_interval.check_crawler_state (float)

    How long the crawler instance waits before the next heartbeat check. The unit is seconds and the default value is 0.5.

  • CrawlerTimer.threshold.reset_timeout (int)

    The number of consecutive checks without a timeout after which the timeout record is reset.

Returns:

None
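A sketch of the two-record bookkeeping described above. `HeartbeatChecker` and `timeout_threshold` are hypothetical; the sketch assumes a crawler counts as dead once its timeout count reaches the threshold, and that reset_timeout consecutive healthy checks clear both records.

```python
from collections import defaultdict
from typing import Dict


class HeartbeatChecker:
    """Sketch of the timeout bookkeeping; thresholds are hypothetical parameters."""

    def __init__(self, timeout_threshold: int, reset_timeout: int = 10) -> None:
        self.timeout_threshold = timeout_threshold  # assumed to come from meta-data Heartbeat
        self.reset_timeout = reset_timeout          # CrawlerTimer.threshold.reset_timeout
        self.timeout_records: Dict[str, int] = defaultdict(int)
        self.not_timeout_records: Dict[str, int] = defaultdict(int)

    def check(self, crawler_name: str, timed_out: bool) -> bool:
        """Record one check; return True when the crawler should be treated as dead."""
        if timed_out:
            self.timeout_records[crawler_name] += 1
            self.not_timeout_records[crawler_name] = 0
        else:
            self.not_timeout_records[crawler_name] += 1
            # A timeout seen long ago (reset_timeout healthy checks in a row) is forgotten,
            # so a flaky network cannot slowly accumulate a crawler into Dead.
            if self.not_timeout_records[crawler_name] >= self.reset_timeout:
                self.timeout_records[crawler_name] = 0
                self.not_timeout_records[crawler_name] = 0
        return self.timeout_records[crawler_name] >= self.timeout_threshold


checker = HeartbeatChecker(timeout_threshold=3, reset_timeout=10)
# Two timeouts, ten healthy checks (which reset the count), then two more timeouts
results = [checker.check("sc-crawler_1", t) for t in [True, True] + [False] * 10 + [True, True]]
print(any(results))  # False
print(checker.check("sc-crawler_1", True))  # True
```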

discover(dead_crawler_name: str, heartbeat: Heartbeat) → Task

When the backup crawler instance discovers that another crawler is dead, it marks the target as Dead (HeartState.Asystole) and updates the target’s meta-data Heartbeat. At the same time, it tries to get the target’s Task so that it can take it over.

Parameters:
  • dead_crawler_name (str) – The name of the crawler under checking.

  • heartbeat (Heartbeat) – The meta-data Heartbeat of the crawler under checking.

Returns:

The meta-data Task of the dead crawler instance.

Return type:

Task
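A minimal sketch of discover, assuming in-memory dicts in place of the meta-data store. `HeartbeatStub`, its `state` field, `TaskStub` and `discover_sketch` are hypothetical; the string "Asystole" stands in for HeartState.Asystole. The sketch does not model the distributed lock the workflow is constructed with.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class HeartbeatStub:
    state: str = "Healthy"  # hypothetical field name for the heart state


@dataclass
class TaskStub:
    running_content: List[dict] = field(default_factory=list)


def discover_sketch(dead_name: str, heartbeat: HeartbeatStub,
                    heartbeats: Dict[str, HeartbeatStub], tasks: Dict[str, TaskStub]) -> TaskStub:
    heartbeat.state = "Asystole"      # mark the target as Dead
    heartbeats[dead_name] = heartbeat  # write the updated Heartbeat back
    return tasks[dead_name]            # fetch the dead crawler's Task to take over


heartbeats = {"sc-crawler_2": HeartbeatStub()}
tasks = {"sc-crawler_2": TaskStub(running_content=[{"task_id": 0, "url": "https://example.com"}])}
taken = discover_sketch("sc-crawler_2", heartbeats["sc-crawler_2"], heartbeats, tasks)
print(heartbeats["sc-crawler_2"].state, len(taken.running_content))  # Asystole 1
```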

activate(dead_crawler_name: str) → None

After the backup crawler instance marks the target as Dead, it activates itself as a runner and runs the runner’s job.

Parameters:

dead_crawler_name (str) – The name of the crawler under checking.

Returns:

None

hand_over_task(task: Task) → None

Hand over the task of the dead crawler instance to this crawler. It gets the meta-data Task of the dead crawler and writes it to this crawler’s meta-data Task.

Parameters:

task (Task) – The meta-data Task of the crawler under checking.

Returns:

None
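activate and hand_over_task can be sketched together. `BackupSketch`, its `role` labels and the `tasks` dict are hypothetical; the deep copy reflects the description that the dead crawler’s Task is written into this crawler’s own Task rather than shared with it.

```python
import copy
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class TaskStub:
    running_content: List[dict] = field(default_factory=list)


@dataclass
class BackupSketch:
    name: str
    role: str = "backup_runner"  # hypothetical role labels

    def activate(self) -> None:
        # Switch this crawler from backup to runner
        self.role = "runner"

    def hand_over_task(self, task: TaskStub, tasks: Dict[str, TaskStub]) -> None:
        # Write a copy of the dead crawler's Task into this crawler's own Task
        tasks[self.name] = copy.deepcopy(task)


tasks = {"sc-crawler_2": TaskStub([{"task_id": 0}]), "sc-crawler_3": TaskStub()}
backup = BackupSketch("sc-crawler_3")
backup.activate()
backup.hand_over_task(tasks["sc-crawler_2"], tasks)
print(backup.role, tasks["sc-crawler_3"].running_content)  # runner [{'task_id': 0}]
```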

Secondary Backup Runner Workflow

class smoothcrawler_cluster.crawler.workflow.SecondaryBackupRunnerWorkflow(name: CrawlerName, path: Type[MetaDataPath], metadata_opts_callback: MetaDataOpt, lock: DistributedLock, crawler_process_callback: Callable)

The processing of role BACKUP RUNNER

As its name suggests, the crawler role BACKUP RUNNER is the backup of every RUNNER. A backup crawler that is not the primary one is a secondary backup runner. The secondary backup runner does only one thing: it keeps checking whether GroupState.standby_id equals the index of the current crawler. Once it does, the crawler activates itself as the primary backup runner.

Parameters:
  • name (CrawlerName) – The data object CrawlerName, which provides attributes such as the crawler instance’s name or ID.

  • path (Type[MetaDataPath]) – The object that holds the path properties of all meta-data objects.

  • metadata_opts_callback (MetaDataOpt) – The data object MetaDataOpt, which provides the callback functions for getting and setting meta-data.

  • lock (DistributedLock) – The adapter for the distributed lock.

  • crawler_process_callback (Callable) – The callback function that runs the crawler’s core processes.

property role: CrawlerRole

Read-only property (getter only) for the crawler role that the current workflow serves.

Type:

str

run(timer: CrawlerTimer) → bool

Keep waiting to be the primary backup crawler instance.

Parameters:

timer (CrawlerTimer) –

The object that holds the timer attributes. This workflow uses only one attribute:

  • CrawlerTimer.time_interval.check_standby_id (float)

    How long the crawler instance waits before checking GroupState.standby_id again. The unit is seconds and the default value is 2.

Returns:

True once the standby ID value equals the index in this crawler’s name.

Return type:

bool
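The waiting loop can be sketched as follows. `wait_for_standby` and its `max_checks` parameter are hypothetical; `max_checks` only lets the example finish, and `get_standby_id` stands in for reading GroupState.standby_id.

```python
import time
from typing import Callable, Optional


def wait_for_standby(my_index: str, get_standby_id: Callable[[], str],
                     check_standby_id: float = 2.0, max_checks: Optional[int] = None) -> bool:
    """Return True as soon as the standby ID matches this crawler's index."""
    checks = 0
    while max_checks is None or checks < max_checks:
        if str(get_standby_id()) == my_index:
            return True  # time to become the primary backup runner
        time.sleep(check_standby_id)  # CrawlerTimer.time_interval.check_standby_id
        checks += 1
    return False


standby_ids = iter(["2", "2", "3"])
result = wait_for_standby("3", lambda: next(standby_ids), check_standby_id=0.01, max_checks=5)
print(result)  # True
```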

Heartbeat Updating Workflow

class smoothcrawler_cluster.crawler.workflow.HeartbeatUpdatingWorkflow(name: CrawlerName, path: Type[MetaDataPath], metadata_opts_callback: MetaDataOpt)

The processing for every role that needs to stay alive by updating its heartbeat

This is a general workflow, so its superclass is BaseWorkflow rather than BaseRoleWorkflow. It focuses on only one thing: keeping the current crawler instance’s heartbeat updated.

Parameters:
  • name (CrawlerName) – The data object CrawlerName, which provides attributes such as the crawler instance’s name or ID.

  • path (Type[MetaDataPath]) – The object that holds the path properties of all meta-data objects.

  • metadata_opts_callback (MetaDataOpt) – The data object MetaDataOpt, which provides the callback functions for getting and setting meta-data.

property stop_heartbeat: bool

Property with getter and setter that controls whether to stop updating the current crawler instance’s heartbeat info.

Type:

bool

run() → None

The main function of updating Heartbeat info.

Note

It uses the flag _stop_heartbeat_signal; when the flag is True, it stops updating Heartbeat.

Returns:

None
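A sketch of the stop-flag pattern, assuming the workflow runs in a background thread. `HeartbeatUpdater`, its `interval` and the `beats` list (standing in for Heartbeat meta-data writes) are hypothetical; only the `_stop_heartbeat_signal` flag and the stop_heartbeat property mirror the names above. A `threading.Event` would be an equally valid choice for the flag.

```python
import threading
import time
from datetime import datetime
from typing import List


class HeartbeatUpdater:
    """Sketch: keep writing a heartbeat timestamp until told to stop."""

    def __init__(self, interval: float = 0.5) -> None:
        self._interval = interval          # hypothetical update interval in seconds
        self._stop_heartbeat_signal = False
        self.beats: List[str] = []         # stands in for Heartbeat meta-data writes

    @property
    def stop_heartbeat(self) -> bool:
        return self._stop_heartbeat_signal

    @stop_heartbeat.setter
    def stop_heartbeat(self, value: bool) -> None:
        self._stop_heartbeat_signal = value

    def run(self) -> None:
        # Update the heartbeat on every tick until the flag is set
        while not self._stop_heartbeat_signal:
            self.beats.append(datetime.now().isoformat())
            time.sleep(self._interval)


updater = HeartbeatUpdater(interval=0.01)
worker = threading.Thread(target=updater.run, daemon=True)
worker.start()
time.sleep(0.05)
updater.stop_heartbeat = True  # the setter flips _stop_heartbeat_signal
worker.join(timeout=1)
print(worker.is_alive(), len(updater.beats) > 0)
```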