Skip to content

Latest commit

 

History

History
109 lines (74 loc) · 1.87 KB

File metadata and controls

109 lines (74 loc) · 1.87 KB

Monitoring

Heart Beat Producer

def start_heart_beat():

    thread = threading.Thread(target=heart_beat)
    thread.start()

Topic Name

  • Node

    node_ip = get('https://api.ipify.org').text
    
    topic_name = "{}-{}_{}".format("node",node_ip, "8004")
  • Application

    topic_name = "{}-{}".format("application", App_instance_id)
  • Service

    topic_name = "{}-{}".format("service", service_name)
heart_beat = "1"
producer.send(topic_name, heart_beat)

Heart Beat Consumer

threading.Thread(target=receive_reg_dereg_request).start()

Register & Deregister

  • Service, Node & Application will register themselves dynamically on event.
def receive_reg_dereg_request():

	reg_dereg_topic = 'register_deregister'
	for message in consumer:
		
		if request_type == 'register':
			register(request_msg)
		else:	
			deregister(request_msg)
  • Register

    • Indirectly Start Listening heart-beat coming from That node/ Application/ Service on new thread.
    def register(request_msg):
    
    	# request_msg --> Topic Name (Described in Heart Beat Producer)
    	
    	threading.Thread(
    		target=heart_beat_consumer, 
    		args=(topic_name)
    	).start()
  • Deregister

    # Applicable Only to Applications
    # Remove from Database

Heart Beat Consumer

def heart_beat_consumer(topic_name):
	consumer = KafkaConsumer(
					topic_name, 
					['{}:{}'.format(kafka_ip, kafka_port)], 
					consumer_timeout_ms = 16000
				)
	
	for message in consumer:
		pass

	inform_fault_tolerance(topic_name)
def	inform_fault_tolerance(topic_name):

	service_name = "Node/ Service/ App_instance_ID"
	topic_name = "monitoring"
	
	producer.send(topic_name, service_name)