Heartbeat Code For Cluster Environment
Working in a cluster environment, I often need to check whether some of the nodes of my cluster are dead or alive. To do so, I have a class called Heartbeat in my Python toolbox. This simple heartbeat class pings the cluster node and returns True or False depending on the health of the targeted node. This class implements a stripped-down version of ping: it sends an ICMP_ECHO_REQUEST packet and waits for the answer.
To use it, I call the constructor with the node name, or IP address, followed by the number of seconds between heartbeats. Then every time I need to check if the node is still alive, I call the method is_alive(), which returns a Boolean.
Here is an example of how to use it, followed by the code.
How to use it
>>> import heartbeat >>> master = heartbeat.Heartbeat('172.16.2.1', 5) >>> master.is_alive() True >>> master.is_alive() False >>> master.is_alive() True >>>
#
# (c) 2009 - Fred Cirera http://0x9900.com/
#
"""Minimal ICMP-echo heartbeat for checking whether a cluster node is up.

``Heartbeat(node, probe_time).is_alive()`` sends a single ICMP echo request
to *node* and reports whether a matching echo reply came back.  The result is
cached for *probe_time* seconds.  Opening the raw ICMP socket normally
requires elevated privileges.
"""

import array
import os
import socket
import time
import select
from struct import pack, unpack, calcsize

ICMP_TYPE = 8            # echo request
ICMP_CODE = 0
ICMP_CHECKSUM = 0        # placeholder used while the real checksum is computed
ICMP_ID = 0
ICMP_SEQ_NR = 0
ICMP_ECHO_REPLY = 0      # type of the answer we are waiting for
PACKET_SIZE = 56         # payload bytes following the 8-byte ICMP header
HEARTBEAT_PROBE_TIME = 20
HEARTBEAT_SERVER = 'master.cluster'

# type, code, checksum, identifier, sequence number.  The sequence number is
# unsigned so that any value masked to 16 bits can be packed.
_ICMP_HEADER_FORMAT = 'BBHHH'
_ICMP_HEADER_SIZE = calcsize(_ICMP_HEADER_FORMAT)
_MIN_IP_HEADER_SIZE = 20


# This version of ping is the stripped down version of the ping.py by
# Lars Strand. In our case we only need to see if the network is up
# and running.

def _construct(id):
    """Build an ICMP echo request carrying *id* in its sequence field.

    *id* is added to ICMP_SEQ_NR and reduced modulo 2**16 so that values
    larger than 16 bits (e.g. a process id on a modern kernel) can be packed
    instead of raising ``struct.error``.

    Returns the packet as a byte string of 8 + PACKET_SIZE bytes.
    """
    seq = (ICMP_SEQ_NR + id) & 0xFFFF

    # Payload: the send time followed by filler up to PACKET_SIZE bytes.
    filler = PACKET_SIZE - calcsize("d")
    data = pack("d", time.time()) + b'X' * filler

    # The checksum is computed over the packet with a zeroed checksum field,
    # then the header is rebuilt with the real value.
    header = pack(_ICMP_HEADER_FORMAT, ICMP_TYPE, ICMP_CODE, ICMP_CHECKSUM,
                  ICMP_ID, seq)
    checksum = _in_cksum(header + data)
    header = pack(_ICMP_HEADER_FORMAT, ICMP_TYPE, ICMP_CODE, checksum,
                  ICMP_ID, seq)
    return header + data


def _in_cksum(packet):
    """Return the 16-bit one's complement checksum of *packet*.

    RFC 792 defines the ICMP checksum as the 16-bit one's complement of the
    one's complement sum of the message taken as 16-bit words.  Based on
    in_cksum found in ping.c on FreeBSD.  Words are read in native byte
    order, matching the native-order ``pack`` used to store the result.
    """
    # Pad with a zero byte if the length is odd.
    if len(packet) & 1:
        packet = packet + b'\0'

    total = 0
    for word in array.array('h', packet):
        total += word & 0xffff

    # Fold the carries back in until the sum fits in 16 bits.
    while total >> 16:
        total = (total >> 16) + (total & 0xffff)

    return ~total & 0xffff


def _is_echo_reply(datagram, seq):
    """Tell whether *datagram* (IP header included) is our echo reply.

    The ICMP header is located using the IP header length field rather than
    a fixed offset, and datagrams too short to hold an ICMP header are
    rejected instead of raising from ``unpack``.
    """
    if not datagram:
        return False
    # Low nibble of the first byte is the IP header length in 32-bit words.
    ip_header_size = (bytearray(datagram[:1])[0] & 0x0F) * 4
    if ip_header_size < _MIN_IP_HEADER_SIZE:
        return False
    icmp_header = datagram[ip_header_size:ip_header_size + _ICMP_HEADER_SIZE]
    if len(icmp_header) < _ICMP_HEADER_SIZE:
        return False
    icmp_type, _code, _cksum, _ident, reply_seq = unpack(_ICMP_HEADER_FORMAT,
                                                         icmp_header)
    return icmp_type == ICMP_ECHO_REPLY and reply_seq == seq


def pingNode(node, sock, timeout=1.0):
    """Ping *node* once through the raw ICMP socket *sock*.

    Returns True when an echo reply carrying our sequence number arrives
    within *timeout* seconds, and False when sending fails or no matching
    reply arrives in time.  Unrelated ICMP traffic seen on the socket is
    skipped rather than being treated as a failure.
    """
    seq = os.getpid() & 0xFFFF
    packet = _construct(seq)

    try:
        sock.sendto(packet, (node, 1))
    except socket.error:
        return False

    deadline = time.time() + timeout
    remaining = timeout
    while True:
        readable, _, _ = select.select([sock], [], [], max(remaining, 0))
        if not readable:
            return False            # timed out waiting for an answer

        # We only need the headers, but read the whole datagram.
        pong, _address = sock.recvfrom(PACKET_SIZE + 48)
        if _is_echo_reply(pong, seq):
            return True

        remaining = deadline - time.time()
        if remaining <= 0:
            return False


class Heartbeat(object):
    """Cached liveness check for one node.

    node       -- host name or IP address to probe.
    probe_time -- minimum number of seconds between two real probes; calls
                  made in between return the last known status.
    """

    last_check = 0      # time of the last real probe (epoch seconds)
    status = False      # result of the last real probe

    def __init__(self, node=HEARTBEAT_SERVER,
                 probe_time=HEARTBEAT_PROBE_TIME):
        self.node = node
        self.probe_time = probe_time

    def is_alive(self):
        """Return True if the node answered the most recent probe.

        A new probe is sent only when *probe_time* seconds have elapsed
        since the previous one.  A node name that cannot be resolved is
        reported as not alive.  Errors raised while creating the raw socket
        (for example insufficient privileges) propagate to the caller.
        """
        now = int(time.time())
        if self.last_check + self.probe_time > now:
            return self.status

        self.last_check = now
        self.status = False
        try:
            host = socket.gethostbyname(self.node)
        except socket.gaierror:
            return self.status

        sock = socket.socket(socket.AF_INET, socket.SOCK_RAW,
                             socket.getprotobyname("icmp"))
        try:
            self.status = pingNode(host, sock)
        finally:
            # Always release the raw socket, even if the probe raised.
            sock.close()
        return self.status