Source code for lib.heartbeat

from datetime import datetime, timedelta, timezone
from threading import Thread, Timer

from requests import ConnectTimeout, get, post

from document.agent.catalog import AgentCatalogDocument
from document.agent.instance import AgentInstanceDocument
from document.connection import ConnectionDocument
from document.exec_env import ExecEnvDocument
from document.network_link import NetworkLinkDocument
from lib.http import HTTP_Status
from lib.token import create_token
from reader.arg import ArgReader
from utils.datetime import datetime_from_str, datetime_to_str
from utils.log import Log


def heartbeat():
    """Run one heartbeat round with the LCPs and schedule the next one.

    Every execution environment returned by ``ExecEnvDocument.search()``
    whose ``lcp`` is set is handled by ``heartbeat_exec_env`` in its own
    thread. After all those threads have been joined, a daemon ``Timer`` is
    started that calls this function again after ``ArgReader.db.hb_period``.
    """
    try:
        search = ExecEnvDocument.search()
        # Slice up to count() so that all the documents are retrieved.
        res = search[: search.count()].execute()
        threads = []
        for exec_env in res:
            if exec_env.lcp:
                thread = Thread(target=heartbeat_exec_env, args=(exec_env,))
                threads.append(thread)
                thread.start()
        for thread in threads:
            thread.join()
    finally:
        # Re-arm the timer even when this round failed: otherwise a single
        # error while searching would stop the periodic heartbeat for good.
        # The exception, if any, still propagates after the timer is started.
        timer = Timer(ArgReader.db.hb_period, heartbeat)
        timer.daemon = True
        timer.start()
def heartbeat_exec_env(exec_env):
    """Exchange a heartbeat with the LCP of one execution environment.

    When the exec-env is enabled the function:

    1. POSTs to the LCP ``/status`` endpoint and records ``started`` and
       ``last_heartbeat`` on the LCP data (``last_heartbeat`` is cleared on a
       non-OK answer);
    2. GETs the LCP ``/poll`` endpoint and copies the returned exec-env and
       LCP fields onto the document; when the id reported by the LCP differs
       from the current one, the current document is saved with ``root`` set
       to the new id and a new document is created under the new id;
    3. saves the document and, if the poll succeeded, stores the discovered
       network links, connections, agent catalogs, agent instances and child
       LCPs (a new heartbeat thread is started for every child LCP).

    Any exception raised inside the procedure is logged, not propagated.

    :param exec_env: the ``ExecEnvDocument`` to contact.
    """
    # The builtin ConnectionError is NOT a base class of the one raised by
    # requests, so the requests exception has to be named explicitly.
    from requests.exceptions import ConnectionError as RequestsConnectionError

    log = Log.get("heartbeat")
    # Fallback label: the except handlers format ``lbl`` and must not fail
    # when the error happened before the full label could be built.
    lbl = "<unknown>"
    try:
        exec_env_id = exec_env.meta.id
        lbl = str(exec_env_id)
        lcp = exec_env.lcp
        lbl = f"{exec_env_id} (LCP at {exec_env.hostname}:{lcp.port})"

        if not exec_env.enabled:
            log.notice(f"Exec-env {lbl} not enabled")
            return

        schema = "https" if lcp.https else "http"
        endpoint_lcp = exec_env.lcp.endpoint
        endpoint_lcp = f"/{endpoint_lcp}" if endpoint_lcp else ""
        base_uri = f"{schema}://{exec_env.hostname}:{lcp.port}{endpoint_lcp}"

        # --- Status ---------------------------------------------------
        resp = post(
            f"{base_uri}/status",
            timeout=ArgReader.db.hb_timeout,
            headers={"Authorization": create_token()},
            json={"id": exec_env_id},
        )
        if resp.status_code == HTTP_Status.OK:
            data = resp.json()
            exec_env_id = data.pop("id", None)
            lcp.started = data.get("started", None)
            lcp.last_heartbeat = data.get("last_heartbeat", None)
            log.success(f"Connection established with exec-env {lbl}")
        else:
            lcp.last_heartbeat = None
            log.warning(f"Connection reset with exec-env {lbl}")
            log.notice(f"Response: {resp.content}")
        # Normalise a falsy value to an explicit False.
        if not lcp.https:
            lcp.https = False

        # --- Discovery (poll) -----------------------------------------
        # NOTE(review): the guard here used to be ``True or <check on
        # exec_env.last_discovery and ArgReader.db.discovery_period>``, i.e.
        # always true, so the discovery runs at every heartbeat. That
        # behaviour is kept; confirm whether the throttling has to come back.
        now = datetime.now(timezone.utc)
        exec_env.last_discovery = datetime_to_str(now)
        resp = get(
            f"{base_uri}/poll",
            timeout=ArgReader.db.hb_timeout,
            headers={"Authorization": create_token()},
        )
        poll_ok = resp.status_code == HTTP_Status.OK
        if poll_ok:
            data = resp.json()
            exec_env_data = data.get("exec_env", {})
            if exec_env_id != exec_env_data.get("id", None):
                # The LCP reports another id: keep the current document
                # pointing to it through ``root`` and continue with a copy
                # stored under the reported id.
                exec_env_id = exec_env_data.get("id", None)
                exec_env.root = exec_env_id
                exec_env.save()
                # Use a dedicated name: ``data`` still has to hold the poll
                # response for the persistence steps after the save.
                doc_fields = exec_env.to_dict()
                doc_fields.pop("root", None)
                exec_env = ExecEnvDocument(
                    meta={"id": exec_env_id}, **doc_fields
                )

            # LCP data
            for field, lcp_data in exec_env_data.pop("lcp", {}).items():
                setattr(exec_env.lcp, field, lcp_data)
            # ExecEnv data
            for field, ee_data in exec_env_data.items():
                setattr(exec_env, field, ee_data)
            log.success(f"Polling established with exec-env {lbl}")
        else:
            log.warning(f"Polling not possible with exec-env {lbl}")

        exec_env.save()

        if poll_ok:
            _store_network_links(exec_env_data, exec_env_id, log)
            _store_agents(data, exec_env_id, log)
            _store_lcp_sons(data, log)
    except ConnectTimeout:
        log.error(f"Connection timeout with exec-env {lbl}")
    except (RequestsConnectionError, ConnectionError):
        log.error(f"Connection refused with exec-env {lbl}")
    except Exception as exception:
        log.exception(
            f"Exception during connection with exec-env {lbl}", exception
        )


def _store_network_links(exec_env_data, exec_env_id, log):
    """Save the polled network links and their connection to the exec-env."""
    for net_link in exec_env_data.get("network_links", []):
        net_link_id = net_link.pop("id")
        net_link_doc = NetworkLinkDocument.get_or_new(net_link_id)
        for field, value in net_link.items():
            setattr(net_link_doc, field, value)
        net_link_doc.save()

        conn_id = f"{exec_env_id}@{net_link_id}"
        conn_doc = ConnectionDocument.get_or_new(conn_id)
        conn_doc.exec_env_id = exec_env_id
        conn_doc.network_link_id = net_link_id
        conn_doc.save()
        log.success(
            f"Update network {net_link_id} and "
            f"connection with {exec_env_id} "
            f"from {net_link_id}"
        )


def _store_agents(data, exec_env_id, log):
    """Save the agent catalogs and agent instances found in the poll data."""
    # Agent Type (Catalog)
    for agent_cat_data in data.get("agentType", []):
        agent_cat_data_id = agent_cat_data.get("id", None)
        AgentCatalogDocument.from_agent_type(agent_cat_data)
        log.success(
            f"Update agent catalog: {agent_cat_data_id} "
            f"from {exec_env_id}"
        )

    # Agent Instances
    for agent_inst_data in data.get("agentInstance", []):
        agent_inst_data_id = agent_inst_data["id"]
        agent_inst_data["agent_catalog_id"] = agent_inst_data["hasAgentType"]
        AgentInstanceDocument.from_agent_instance(agent_inst_data, exec_env_id)
        log.success(
            f"Update agent instance: {agent_inst_data_id} "
            f"from {exec_env_id}"
        )


def _store_lcp_sons(data, log):
    """Save every child LCP as exec-env and start a heartbeat thread for it."""
    for lcp_son in data.pop("lcpSons", []):
        lcp_son_id = lcp_son.pop("id", None)
        exec_env_son = ExecEnvDocument.get_or_new(lcp_son_id)
        for field, ee_data in lcp_son.items():
            setattr(exec_env_son, field, ee_data)
        exec_env_son.discovered = True
        exec_env_son.save()
        log.success(f"Update exec-env/lcp: {lcp_son_id}")
        # Not joined: the child heartbeat runs on its own.
        thread = Thread(target=heartbeat_exec_env, args=(exec_env_son,))
        thread.start()