Heartbeat Code For Cluster Environment

Working in a cluster environment, I often need to check if some of the nodes of my cluster are dead or live. To do so, I have a class called Heartbeat in my Python toolbox. This simple heartbeat class does a ping on the cluster node, and returns True, or false depending on the health of the targeted node. This class implements a stripped down version of ping. It sends a ICMP_ECHO_REQUEST packet and waits for the answer.

To use it, I call the constructor with the node name, or IP address, followed by the number of seconds between heartbeats. Then every time I need to check if the node is still alive, I call the method is_alive(), which returns a Boolean.

Here an example of how to use it, followed by the code.

How to use it

>>> import heartbeat
>>> master = heartbeat.Heartbeat('172.16.2.1', 5)
>>> master.is_alive()
True
>>> master.is_alive()
False
>>> master.is_alive()
True
>>>

The code

#
# (c) 2009 - Fred Cirera http://0x9900.com/
#

import array
import os
import socket
import time
import select
from struct import pack, unpack, calcsize

ICMP_TYPE = 8
ICMP_CODE = 0
ICMP_CHECKSUM = 0
ICMP_ID = 0
ICMP_SEQ_NR = 0

PACKET_SIZE = 56
HEARTBEAT_PROBE_TIME = 20
HEARTBEAT_SERVER = 'master.cluster'

# This version of ping is the stripped down version of the ping.py by
# Lars Strand. in our case  we only need to see if the network is up
# and running.

def _construct(id):
    """Constructs a ICMP echo packet of variable size"""
    # construct header
    header = pack('bbHHh', ICMP_TYPE, ICMP_CODE, ICMP_CHECKSUM, ICMP_ID,
                  ICMP_SEQ_NR+id)
    # space for time
    size = PACKET_SIZE - calcsize("d")
    data = pack("d", time.time()) + 'X' * size
    packet = header + data          # ping packet without checksum
    checksum = _in_cksum(packet)    # make checksum

    # construct header with correct checksum
    header = pack('bbHHh', ICMP_TYPE, ICMP_CODE, checksum, ICMP_ID, \
                         ICMP_SEQ_NR+id)

    # ping packet *with* checksum
    packet = header + data

    # a perfectly formatted ICMP echo packet
    return packet

def _in_cksum(packet):
    """THE RFC792 states: 'The 16 bit one's complement of
    the one's complement sum of all 16 bit words in the header.'

    Generates a checksum of a (ICMP) packet. Based on in_chksum found
    in ping.c on FreeBSD.
    """

    # add byte if not divisible by 2
    if len(packet) & 1:
        packet = packet + '\0'

    # split into 16-bit word and insert into a binary array
    words = array.array('h', packet)
    sum = 0

    # perform ones complement arithmetic on 16-bit words
    for word in words:
        sum += (word & 0xffff)

    hi = sum >> 16
    lo = sum & 0xffff
    sum = hi + lo
    sum = sum + (sum >> 16)

    return (~sum) & 0xffff # return ones complement


def pingNode(node, sock, timeout=1.0):
    """Pings a node based on input given to the function.
    return False for dead, and True when alive.
    """
    pid = os.getpid()
    packet = _construct(pid) # make a ping packet

    # send the ping
    try:
        sock.sendto(packet,(node,1))
    except socket.error, e:
        return False

    # reset values
    pong = ""; iwtd = []

    # wait until there is data in the socket
    while 1:
        # input, output, exceptional conditions
        iwtd, owtd, ewtd = select.select([sock], [], [], timeout)
        break # no data and timout occurred

    # data on socket - this means we have an answer
    if iwtd:  # ok, data on socket
        # read data (we only need the header)
        pong, address = sock.recvfrom(PACKET_SIZE+48)

        # fetch pong header
        pongHeader = pong[20:28]
        pongType, pongCode, pongChksum, pongID, pongSeqnr = unpack("bbHHh",
                                                                   pongHeader)

        # valid ping packet received?
        if not pongSeqnr == pid:
            pong = None

        # NO data on socket - timeout waiting for answer
        if not pong:
            return False

        return True

    return False

class Heartbeat:
    last_check = 0
    status = 0
    def __init__(self, node=HEARTBEAT_SERVER,
             probe_time=HEARTBEAT_PROBE_TIME):
        self.node = node
        self.probe_time = probe_time
        return
    def is_alive(self):
        now = int(time.time())
        if self.last_check + self.probe_time > now:
            return self.status

        self.last_check = now
        self.status = False
        try:
            host = socket.gethostbyname(self.node)
            sock = socket.socket(socket.AF_INET, socket.SOCK_RAW,
                                 socket.getprotobyname("icmp"))
            self.status = pingNode(self.node, sock)
            sock.close()
        except socket.gaierror:
            pass

        return self.status


Comments !