Zookeeper Crawlers

class smoothcrawler_cluster.crawler.crawlers.ZookeeperCrawler(runner: int, backup: int, name: str = '', group: str = '', index_sep: List[str] = ['-', '_'], initial: bool = True, ensure_initial: bool = False, ensure_timeout: int = 3, ensure_wait: float = 0.5, heartbeat_update: float = 0.5, heartbeat_update_timeout: float = 2, heartbeat_dead_threshold: int = 3, zk_hosts: str | None = None, zk_converter: Type[BaseConverter] | None = None, attribute: BaseCrawlerAttribute | None = None, election_strategy: Generic[BaseElectionType] = None, factory: Type[BaseFactory] | None = None)

Cluster crawler with Zookeeper

This crawler implements the cluster feature through a third-party application, Zookeeper. It lets Zookeeper manage all processing of meta-data objects, e.g., GroupState, Heartbeat, etc.

Parameters:
  • runner (int) – The number of crawlers that run tasks. This value is equal to the attribute GroupState.total_runner.

  • backup (int) – The number of crawlers that check whether every runner is alive and stand by to take over as a runner if any of them is dead. This value is equal to the attribute GroupState.total_backup.

  • name (str) – The name of the crawler instance. Default value is sc-crawler_1.

  • group (str) – The name of the group this crawler instance belongs to. Default value is sc-crawler-cluster.

  • index_sep (list of str) – The separators that may precede the index in name. Only 2 separators are accepted: dash (‘-’) or underscore (‘_’).

  • initial (bool) –

    If True, the crawler runs the initial processing, which proceeds as follows:

    register meta-data to Zookeeper -> start a new thread that keeps updating heartbeat info -> wait and check whether it is ready to run the election -> run the election of task runners -> update role

  • ensure_initial (bool) – If True, it guarantees that meta-data registration has succeeded: the size of GroupState.current_crawler equals the total of runner and backup, and this crawler’s name is in it.

  • ensure_timeout (int) – The number of timeouts tolerated while ensuring that meta-data registration finishes. Default value is 3.

  • ensure_wait (float) – How long to wait between checks, in seconds. Default value is 0.5.

  • heartbeat_update (float) – How often to update heartbeat info, in seconds, e.g., if the value is ‘2’, heartbeat info is updated every 2 seconds. Default value is 0.5.

  • heartbeat_update_timeout (float) – The timeout of a heartbeat update, in seconds, e.g., if the value is ‘3’, the update times out when heartbeat info has not been updated for more than 3 seconds. Default value is 2.

  • heartbeat_dead_threshold (int) – The number of timeouts after which a crawler is judged dead, e.g., if the value is ‘3’ and the number of update timeouts exceeds 3, the crawler is marked as ‘Dead_<Role>’ (like ‘Dead_Runner’ or ‘Dead_Backup’). Default value is 3.

  • zk_hosts (str) – The Zookeeper hosts. Use commas to separate the hosts if there are several. Default value is localhost:2181.

  • zk_converter (Type[BaseConverter]) – The converter that parses data content into an object. It must be a subclass of BaseConverter. Default value is JsonStrConverter.

  • attribute (BaseCrawlerAttribute) – The attribute type of the crawler. Default value is SerialCrawlerAttribute.

  • election_strategy (BaseElection) – The election strategy. Default strategy is IndexElection.

  • factory (Type[BaseFactory]) – The factory that holds the SmoothCrawler components.
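The three heartbeat options work together, so a small sketch may help. The helper below is hypothetical and not part of smoothcrawler_cluster: HeartbeatJudge, its fields and its check method are illustrative names. It assumes a strict comparison, following the wording "exceeds" in the description of heartbeat_dead_threshold.

```python
from dataclasses import dataclass


@dataclass
class HeartbeatJudge:
    """Hypothetical helper; not part of smoothcrawler_cluster."""

    update_timeout: float = 2.0  # mirrors heartbeat_update_timeout (seconds)
    dead_threshold: int = 3      # mirrors heartbeat_dead_threshold
    timeout_count: int = 0       # timed-out checks recorded so far

    def check(self, seconds_since_update: float, role: str) -> str:
        """Return the role, prefixed with 'Dead_' once it is judged dead."""
        if seconds_since_update > self.update_timeout:
            self.timeout_count += 1
        # Assumption: "exceeds" means strictly more timeouts than the threshold.
        if self.timeout_count > self.dead_threshold:
            return f"Dead_{role}"
        return role
```

With the default values, a crawler whose heartbeat is stale on four consecutive checks would be reported as Dead_Runner on the fourth check in this sketch.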

property name: CrawlerName

Property with both a getter and a setter. The name of the crawler instance. It MUST be unique within the cluster (the same group) so that the cluster can distinguish every crawler; for example, the properties current_crawler, current_runner and current_backup in the meta-data GroupState record crawlers by name. This value can be modified through the Zookeeper object option name.

Type:

CrawlerName
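As an illustration of how a name such as sc-crawler_1 relates to the index_sep option, the sketch below splits a name at its right-most separator. The function split_crawler_name is hypothetical; the library's own parsing is not shown on this page and may differ.

```python
from typing import List, Tuple


def split_crawler_name(name: str, index_sep: List[str]) -> Tuple[str, int]:
    """Split a crawler name into (prefix, index) at the right-most separator."""
    # Find the right-most position of any allowed separator.
    position = max(name.rfind(sep) for sep in index_sep)
    if position < 0:
        raise ValueError(f"No separator from {index_sep} found in {name!r}")
    prefix, index = name[:position], name[position + 1:]
    if not index.isdigit():
        raise ValueError(f"The part after the separator is not an index: {index!r}")
    return prefix, int(index)
```

Using the right-most separator lets a prefix such as sc-crawler contain a dash of its own without confusing the index lookup.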

property group: str

Property with both a getter and a setter. The group name of this crawler instance. It also serves as the cluster name, which lets the system distinguish which crawler instance belongs to which cluster. This value can be modified through the Zookeeper object option group.

Type:

str

property role: CrawlerRole

Property with both a getter and a setter. The role of the crawler instance. Please refer to CrawlerStateRole for more detail.

Type:

CrawlerRole

property zookeeper_hosts: str

Property with both a getter and a setter. The Zookeeper hosts the current crawler instance connects to. The value should be in IP:Port format (for multiple hosts: IP:Port,IP:Port).

Type:

str
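The IP:Port format can be checked before it is passed to the crawler. The helper below is a hypothetical sketch (parse_zk_hosts is not a library function) that splits a hosts string into (host, port) pairs.

```python
from typing import List, Tuple


def parse_zk_hosts(hosts: str) -> List[Tuple[str, int]]:
    """Split 'IP:Port,IP:Port' into (host, port) pairs (hypothetical helper)."""
    pairs = []
    for item in hosts.split(","):
        # rpartition keeps everything before the last colon as the host part.
        host, sep, port = item.strip().rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"Expected 'IP:Port', got {item!r}")
        pairs.append((host, int(port)))
    return pairs
```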

property ensure_register: bool

Property with both a getter and a setter for the option ensure_initial.

Type:

bool

property ensure_timeout: int

Property with both a getter and a setter for the option ensure_timeout.

Type:

int

property ensure_wait: float

Property with both a getter and a setter for the option ensure_wait.

Type:

float

property register: Register

Property with both a getter and a setter for the Register object, which processes meta-data registrations.

Type:

Register

property dispatcher: WorkflowDispatcher

Property with both a getter and a setter for the workflow dispatcher WorkflowDispatcher, which dispatches one specific workflow to run according to the current crawler role.

Type:

WorkflowDispatcher

initial() -> None

Initial processing of the entire cluster. The procedure is as follows:

<Has an image of workflow>

  • Register

Initialize each needed meta-data object (GroupState, NodeState, Task and Heartbeat) and set it to Zookeeper, creating its node if it doesn’t exist in Zookeeper. This process of setting meta-data to Zookeeper is called register.

  • Activate and run new thread to keep updating heartbeat info

If the signal _Updating_Stop_Signal is False, a new thread is created here that keeps doing one thing again and again: updating the crawler’s own heartbeat info.

  • Check it’s ready for running election

There are 2 conditions to check whether it is ready or not:
  1. The size of GroupState.current_crawler is equal to the sum of runner and backup.

  2. The current crawler instance’s name is included in the list GroupState.current_crawler.

The check runs forever (option timeout is -1) and waits 0.5 seconds between checks.

  • Run election for runner

Elect which crawlers are runners; the rest are backups. The election strategy can be swapped so that the election has different win conditions.

  • Update the role of crawler instance

After the election, every crawler must know its role, and it updates this info to Zookeeper to prepare for doing its own job.

Returns:

None
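The order of the steps in initial() can be pictured with a stand-in class that only records what is called. This is a sketch, not the library's implementation: InitialFlowSketch, the "Winner" placeholder and the role strings are assumptions used for illustration, while the method names mirror the ones documented on this page.

```python
from typing import List


class InitialFlowSketch:
    """Stand-in that only records the order of the documented steps."""

    def __init__(self) -> None:
        self.steps: List[str] = []

    def register_metadata(self) -> None:
        self.steps.append("register")

    def _run_updating_heartbeat_thread(self) -> None:
        self.steps.append("heartbeat")

    def is_ready_for_election(self, interval: float = 0.5, timeout: float = -1) -> bool:
        self.steps.append("ready-check")
        return True

    def elect(self) -> str:
        self.steps.append("elect")
        return "Winner"  # placeholder for an ElectionResult value

    def _update_crawler_role(self, role: str) -> None:
        self.steps.append(f"update-role:{role}")

    def initial(self) -> None:
        self.register_metadata()
        self._run_updating_heartbeat_thread()
        if self.is_ready_for_election(interval=0.5, timeout=-1):
            result = self.elect()
            # Assumption: a winner becomes a runner, everyone else a backup.
            role = "Runner" if result == "Winner" else "Backup_Runner"
            self._update_crawler_role(role)
```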

register_metadata() -> None

Register all needed meta-data (GroupState, NodeState, Task and Heartbeat) to Zookeeper.

Returns:

None

stop_update_heartbeat() -> None

Set the flag _Updating_Stop_Signal to True.

Returns:

None

is_ready_for_election(interval: float = 0.5, timeout: float = -1) -> bool

Check whether it is ready to run the next step, the function elect.

Parameters:
  • interval (float) – How long to wait between checks, in seconds. Default value is 0.5.

  • timeout (float) – The waiting timeout. A value of -1 means it never times out. Default value is -1.

Returns:

True if it is ready to run the next step.

Return type:

bool
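The two readiness conditions listed under initial() (the group is full, and this crawler is part of it) can be written as a single predicate. The sketch below uses a stand-in dataclass, GroupStateSketch, with only the fields needed here; it is not the library's GroupState class, and ready_for_election is a hypothetical name.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class GroupStateSketch:
    """Stand-in for the GroupState meta-data; only the fields used here."""

    total_runner: int
    total_backup: int
    current_crawler: List[str] = field(default_factory=list)


def ready_for_election(state: GroupStateSketch, name: str) -> bool:
    """Return True when both documented conditions hold."""
    # Condition 1: every expected crawler (runners + backups) has registered.
    group_is_full = len(state.current_crawler) == state.total_runner + state.total_backup
    # Condition 2: this crawler itself is among the registered ones.
    return group_is_full and name in state.current_crawler
```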

is_ready_for_run(interval: float = 0.5, timeout: float = -1) -> bool

Check whether it is ready to run the next step, the function run.

Parameters:
  • interval (float) – How long to wait between checks, in seconds. Default value is 0.5.

  • timeout (float) – The waiting timeout. A value of -1 means it never times out. Default value is -1.

Returns:

True if it is ready to run the next step.

Return type:

bool

_is_ready_by_groupstate(condition_callback: Callable, interval: float = 0.5, timeout: float = -1) -> bool

Use the condition_callback option to judge whether it is ready for the next run. The callback function must take one argument, a GroupState.

In cluster running, some scenarios need to wait until data has been synced successfully. These scenarios in SmoothCrawler-Cluster depend heavily on the meta-data GroupState.

Parameters:
  • condition_callback (Callable) – The callback function that checks and judges whether it is ready to run the next step.

  • interval (float) – How long to wait between checks, in seconds. Default value is 0.5.

  • timeout (float) – The waiting timeout. A value of -1 means it never times out. Default value is -1.

Returns:

True if it is ready to run the next step.

Return type:

bool
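The polling pattern described here can be sketched as a generic wait loop. The function wait_until and its get_state argument are hypothetical names. The sketch assumes timeout is measured in seconds and returns False when it elapses; this page does not state how the library itself reacts to a timeout.

```python
import time
from typing import Callable


def wait_until(get_state: Callable[[], object],
               condition_callback: Callable[[object], bool],
               interval: float = 0.5,
               timeout: float = -1) -> bool:
    """Poll until condition_callback(state) is True or the timeout elapses."""
    started = time.monotonic()
    while True:
        if condition_callback(get_state()):
            return True
        # A timeout of -1 means the loop never gives up.
        if timeout != -1 and time.monotonic() - started >= timeout:
            return False
        time.sleep(interval)
```

Passing the condition as a callback keeps the loop reusable: the election check and the run check can share it and differ only in the predicate they supply.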

elect() -> ElectionResult

Run the election to choose which crawler instances are runners; the rest are backups.

Returns:

An ElectionResult value.
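To make the idea of an election concrete, the sketch below ranks crawlers by the trailing index in their names and gives the lowest indexes the runner role. This rule is an assumption chosen for illustration; it is not a description of how IndexElection decides winners, and elect_by_index is a hypothetical function.

```python
from typing import Dict, List


def elect_by_index(crawler_names: List[str], total_runner: int) -> Dict[str, str]:
    """Assign 'Runner' to the lowest-indexed names and 'Backup_Runner' to the rest."""

    def index_of(name: str) -> int:
        # Assumption: the index is the trailing number after '-' or '_'.
        return int(name.replace("-", "_").rsplit("_", 1)[-1])

    ranked = sorted(crawler_names, key=index_of)
    return {
        name: ("Runner" if position < total_runner else "Backup_Runner")
        for position, name in enumerate(ranked)
    }
```

Because every crawler can compute the same ranking from the same list of names, a deterministic rule like this one lets all instances agree on the result without exchanging votes.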

run(interval: float = 0.5, timeout: int = -1, wait_task_time: float = 2, standby_wait_time: float = 0.5, wait_to_be_standby_time: float = 2, reset_timeout_threshold: int = 10, unlimited: bool = True) -> None

The main function of the cluster. It runs a simple workflow cycle:

<has an image of the workflow>

Parameters:
  • interval (float) – How long to wait between checks, in seconds. Default value is 0.5.

  • timeout (int) – The waiting timeout. A value of -1 means it never times out. Default value is -1.

  • wait_task_time (float) – For a Runner, how long the crawler instance waits for the next task. The unit is seconds and the default value is 2.

  • standby_wait_time (float) – For a Backup, how long the crawler instance waits before the next heartbeat check. The unit is seconds and the default value is 0.5.

  • wait_to_be_standby_time (float) – For a Backup that isn’t the primary one, how long the crawler instance waits before the next check of GroupState.standby_id. The unit is seconds and the default value is 2.

  • reset_timeout_threshold (int) – The number of consecutive times a timeout does not occur before the timeout record is reset. Default value is 10.

  • unlimited (bool) – If True, this function keeps running in an unlimited loop; otherwise it runs only once.

Returns:

None

Raises:

CrawlerIsDeadError – The current crawler instance is dead.
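One way to picture how run relates to the hooks pre_running, running_as_role and before_dead is the stand-in below. It is a sketch under assumptions: the class RunLoopSketch, the max_cycles guard and the exact placement of the try block are illustrative and not taken from the library source.

```python
class RunLoopSketch:
    """Stand-in showing how the documented hooks could fit together."""

    def __init__(self) -> None:
        self.calls = []

    def pre_running(self) -> None:
        self.calls.append("pre_running")

    def running_as_role(self, role: str) -> None:
        self.calls.append(f"running_as_role:{role}")

    def before_dead(self, exception: Exception) -> None:
        self.calls.append("before_dead")
        raise exception  # documented default: raise the exception to the caller

    def run(self, role: str, unlimited: bool = True, max_cycles: int = 3) -> None:
        # max_cycles is a hypothetical guard so this sketch always terminates.
        try:
            self.pre_running()
            cycles = 0
            while True:
                self.running_as_role(role)
                cycles += 1
                if not unlimited or cycles >= max_cycles:
                    break
        except Exception as error:
            self.before_dead(error)
```

Overriding before_dead in a subclass would be the natural place for cleanup or alerting before the exception leaves the crawler.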

pre_running() -> None

Pre-processing that runs before anything else. The default implementation does nothing.

Returns:

None

running_as_role(role: str | CrawlerRole, wait_task_time: float = 2, standby_wait_time: float = 0.5, wait_to_be_standby_time: float = 2, reset_timeout_threshold: int = 10) -> bool | None

Run the crawler instance’s own job according to its role.

  • CrawlerStateRole.Runner ->

    wait for tasks and run each one it gets.

  • CrawlerStateRole.Backup_Runner whose index is standby_id ->

    keep checking every runner’s heartbeat and stand by to become a runner itself if it discovers that one is dead.

  • CrawlerStateRole.Backup_Runner whose index is not standby_id ->

    keep checking the standby ID, and become the primary backup if the standby ID is equal to its index.

Parameters:
  • role (CrawlerRole) – The role of the crawler instance.

  • wait_task_time (float) – For a Runner, how long the crawler instance waits for the next task. The unit is seconds and the default value is 2.

  • standby_wait_time (float) – For a Backup, how long the crawler instance waits before the next heartbeat check. The unit is seconds and the default value is 0.5.

  • wait_to_be_standby_time (float) – For a Backup that isn’t the primary one, how long the crawler instance waits before the next check of GroupState.standby_id. The unit is seconds and the default value is 2.

  • reset_timeout_threshold (int) – The number of consecutive times a timeout does not occur before the timeout record is reset. Default value is 10.

Returns:

None

Raises:

CrawlerIsDeadError – The current crawler instance is dead.
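The three cases listed for running_as_role amount to a small dispatch on the role and the standby ID. The function below is a hypothetical sketch: choose_job, the job names it returns and the plain-string roles are illustrative, whereas the library works with CrawlerStateRole values.

```python
def choose_job(role: str, index: int, standby_id: int) -> str:
    """Return a label for the job a crawler would do in each documented case."""
    if role == "Runner":
        # Runners wait for tasks and run each one they get.
        return "wait_for_task"
    if role == "Backup_Runner":
        if index == standby_id:
            # The primary backup watches every runner's heartbeat.
            return "monitor_runner_heartbeats"
        # Other backups keep checking standby_id until it matches their index.
        return "wait_to_be_standby"
    raise ValueError(f"Unsupported role: {role!r}")
```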

before_dead(exception: Exception) -> None

Do something when the crawler gets an exception. The default implementation raises the exception to the caller.

Parameters:

exception (Exception) – The exception it got.

Returns:

None

processing_crawling_task(content: RunningContent) -> Any

The core of the web spider implementation. All the functions it runs are ones developers are already familiar with: the SmoothCrawler components.

Parameters:

content (RunningContent) – A RunningContent type object which provides clear attributes to run crawling task.

Returns:

The running result of crawling.

Return type:

Any

_update_crawler_role(role: CrawlerRole) -> None

Update the role that the current crawler instance has in the crawler cluster.

Parameters:

role (CrawlerRole) – The role of crawler instance.

Returns:

None

_run_updating_heartbeat_thread() -> None

Start a new thread that keeps updating Heartbeat info.

Returns:

None
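A background updater with a stop flag, the pattern that _run_updating_heartbeat_thread and stop_update_heartbeat describe, can be sketched with the standard threading module. The class HeartbeatUpdater is hypothetical: it uses a threading.Event in place of the _Updating_Stop_Signal flag and counts beats instead of writing to Zookeeper.

```python
import threading
import time


class HeartbeatUpdater:
    """Hypothetical sketch of a heartbeat thread with a stop signal."""

    def __init__(self, heartbeat_update: float = 0.5) -> None:
        self.heartbeat_update = heartbeat_update
        self.beats = 0
        # Plays the role of _Updating_Stop_Signal in this sketch.
        self._stop_signal = threading.Event()
        self._thread = threading.Thread(target=self._loop, daemon=True)

    def _loop(self) -> None:
        while not self._stop_signal.is_set():
            self.beats += 1  # a real crawler would update its Heartbeat here
            # Sleep for one period, but wake up at once if stop() is called.
            self._stop_signal.wait(self.heartbeat_update)

    def start(self) -> None:
        self._thread.start()

    def stop(self) -> None:
        self._stop_signal.set()
        self._thread.join()


updater = HeartbeatUpdater(heartbeat_update=0.01)
updater.start()
time.sleep(0.05)
updater.stop()
```

Waiting on the event rather than calling time.sleep means a stop request takes effect without waiting out the rest of the period.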

_run_crawling_processing(content: RunningContent) -> Any

The wrapper of the core crawling function processing_crawling_task. Its purpose is to check whether the SmoothCrawler components have been registered.

Parameters:

content (RunningContent) – A RunningContent type object which provides clear attributes to run crawling task.

Returns:

The running result of crawling.

Return type:

Any

_chk_register(http_sender: bool = True, http_resp_parser: bool = True, data_hdler: bool = True, persist: bool = True) -> bool

Check whether the SmoothCrawler components have been registered.

Parameters:
  • http_sender – If True, check the component CrawlerFactory.http_factory.

  • http_resp_parser – If True, check the component CrawlerFactory.parser_factory.

  • data_hdler – If True, check the component CrawlerFactory.data_handling_factory.

  • persist – If True, check the component CrawlerFactory.data_handling_factory.

Returns:

True if the SmoothCrawler components have been registered.

Return type:

bool
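The flag-driven check can be sketched with a stand-in factory. FactorySketch and components_registered are hypothetical names, and the attribute names are taken from the parameter descriptions above. The persist flag is left out because this page does not name a separate attribute for it.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FactorySketch:
    """Stand-in for CrawlerFactory with the attributes named in the docs."""

    http_factory: Optional[object] = None
    parser_factory: Optional[object] = None
    data_handling_factory: Optional[object] = None


def components_registered(factory: FactorySketch,
                          http_sender: bool = True,
                          http_resp_parser: bool = True,
                          data_hdler: bool = True) -> bool:
    """Return True when every component selected by a flag is set."""
    selected = []
    if http_sender:
        selected.append(factory.http_factory)
    if http_resp_parser:
        selected.append(factory.parser_factory)
    if data_hdler:
        selected.append(factory.data_handling_factory)
    # Assumption: an unregistered component is represented by None.
    return all(component is not None for component in selected)
```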