from threading import Thread, Timer
from requests import get, post
from requests.exceptions import ConnectionError, ConnectTimeout
from document.agent.catalog import AgentCatalogDocument
from document.agent.instance import AgentInstanceDocument
from document.exec_env import ExecEnvDocument
from lib.http import HTTP_Status
from lib.token import create_token
from reader.arg import ArgReader
from utils.log import Log
[docs]def heartbeat():
"""Heartbeat procedure with the LCPs."""
search = ExecEnvDocument.search()
res = search[: search.count()].execute()
threads = []
for exec_env in res:
if exec_env.lcp:
thread = Thread(target=heartbeat_exec_env, args=(exec_env,))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
thread = Timer(ArgReader.db.hb_period, heartbeat)
thread.daemon = True
thread.start()
def heartbeat_exec_env(exec_env):
log = Log.get("heartbeat")
try:
exec_env_id = exec_env.meta.id
lcp = exec_env.lcp
lbl = f"{exec_env_id} (LCP at {exec_env.hostname}:{lcp.port})"
if exec_env.enabled:
schema = "https" if lcp.https else "http"
endpoint_lcp = exec_env.lcp.endpoint
endpoint_lcp = "/" + endpoint_lcp if endpoint_lcp else ""
req_uri = f"{schema}://{exec_env.hostname}:{lcp.port}{endpoint_lcp}/status" # noqa F401
resp = post(
req_uri,
timeout=ArgReader.db.hb_timeout,
headers={"Authorization": create_token()},
json={"id": exec_env_id},
)
if resp.status_code == HTTP_Status.OK:
data = resp.json()
exec_env_id = data.pop("id", None)
lcp.started = data.get("started", None)
lcp.last_heartbeat = data.get("last_heartbeat", None)
log.success(f"Connection established with exec-env {lbl}")
else:
lcp.last_heartbeat = None
log.warning(f"Connection reset with exec-env {lbl}")
log.notice(f"Response: {resp.content}")
if not lcp.https:
lcp.https = False
resp = get(
f"{schema}://{exec_env.hostname}:{lcp.port}{endpoint_lcp}/poll", # noqa F401
timeout=ArgReader.db.hb_timeout,
headers={"Authorization": create_token()},
)
if resp.status_code == HTTP_Status.OK:
data = resp.json()
# ExecEnv
exec_env_data = data.get("exec_env", {})
exec_env.meta.id = exec_env_data.pop("id")
# LCP data
for field, lcp_data in exec_env_data.pop("lcp", {}).items():
setattr(exec_env.lcp, field, lcp_data)
# ExecEnv data
for field, ee_data in exec_env_data.items():
setattr(exec_env, field, ee_data)
log.success(f"Polling established with exec-env {lbl}")
else:
log.warning(f"Polling not possible with exec-env {lbl}")
exec_env.save()
if resp.status_code == HTTP_Status.OK:
# Agent Type (Catalog)
for agent_cat_data in data.get("agentType", []):
agent_cat_data_id = agent_cat_data.get("id", None)
AgentCatalogDocument.from_agent_type(agent_cat_data)
log.success(
f"Update agent catalog: {agent_cat_data_id} "
f"from {exec_env_id}"
)
# Agent Instances
for agent_inst_data in data.get("agentInstance", []):
agent_inst_data_id = agent_inst_data["id"]
agent_inst_data["agent_catalog_id"] = agent_inst_data[
"hasAgentType"
] # noqa E501
AgentInstanceDocument.from_agent_instance(
agent_inst_data, exec_env_id
)
log.success(
f"Update agent instance: {agent_inst_data_id} "
f"from {exec_env_id}"
) # noqa E501
# LCP Sons
for lcp_son in data.pop("lcpSons", []):
lcp_son_id = lcp_son.pop("id", None)
exec_env_son = ExecEnvDocument.get_or_new(lcp_son_id)
for field, ee_data in lcp_son.items():
setattr(exec_env_son, field, ee_data)
exec_env_son.discovered = True
exec_env_son.save()
log.success(f"Update exec-env/lcp: {lcp_son_id}")
else:
log.notice(f"Exec-env {lbl} not enabled")
except ConnectTimeout:
log.error(f"Connection timeout with exec-env {lbl}")
except ConnectionError:
log.error(f"Connection refused with exec-env {lbl}")
except Exception as exception:
log.exception(
f"Exception during connection with exec-env {lbl}", exception
)