start:utils:cockpit:p1:cpu:persistent
PersistentData.py
Tous les accès à la base de données sont gérés via ce fichier.
- init_db - Création de la base de données et des tables
- get_config(key_name) - Recherche la valeur d'un paramètre
- save_config(key_name, value_text) - Sauvegarde d'un paramètre
- get_active_organizations - Renvoie la liste des organisations sélectionnées
- get_ticket_statuses - Renvoie la liste des status de ticket
- get_ticket_priorities - Renvoie la liste des priorités de ticket
- get_ticket_priorities_with_criticity - Liste des priorités sélectionnées pour édition des criticités
- store_history - Sauvegarde de l'historique des valeurs
- PersistentData.py
# PersistentData.py from arduino.app_bricks.dbstorage_sqlstore import SQLStore from arduino.app_bricks.dbstorage_tsstore import TimeSeriesStore # --- INITIALIZATION --- db = SQLStore("cockpit_config.db") tsdb = TimeSeriesStore(retention_days=90) def init_db(): tsdb.start() """ Initializes the database schema using raw SQL. 'IF NOT EXISTS' ensures data persistence across restarts. """ # Configuration table db.execute_sql(""" CREATE TABLE IF NOT EXISTS config ( key TEXT PRIMARY KEY, value TEXT ) """) # Organizations db.execute_sql(""" CREATE TABLE IF NOT EXISTS organizations ( id INTEGER PRIMARY KEY, name TEXT, selected INTEGER DEFAULT 0, enabled INTEGER DEFAULT 1, ticketingModuleEnabled INTEGER DEFAULT 0 ) """) # Objects table db.execute_sql(""" CREATE TABLE IF NOT EXISTS objects ( id INTEGER, module TEXT, type TEXT, subType TEXT, reference TEXT, selected INTEGER DEFAULT 0, criticity INTEGER DEFAULT 0, manual INTEGER DEFAULT 0, PRIMARY KEY (id, type, subType) ) """) # History table — time series of metrics (tickets, sensors, etc.) db.execute_sql(""" CREATE TABLE IF NOT EXISTS history ( timestamp TEXT, kind TEXT, value REAL, PRIMARY KEY (timestamp, kind) ) """) # Tickets table db.execute_sql(""" CREATE TABLE IF NOT EXISTS tickets ( id INTEGER PRIMARY KEY, type TEXT, title TEXT, organization_id INTEGER, organization_name TEXT, status_id INTEGER, status_reference TEXT, priority_id INTEGER, priority_reference TEXT, assignedTeam_id INTEGER, assignedTeam_name TEXT, assignedUser_id INTEGER, assignedUser_name TEXT ) """) # Teams table db.execute_sql(""" CREATE TABLE IF NOT EXISTS teams ( id INTEGER PRIMARY KEY, org_id INTEGER, name TEXT, selected INTEGER DEFAULT 0 ) """) # View: active tickets filtered by selected status and priority db.execute_sql("DROP VIEW IF EXISTS v_active_tickets") db.execute_sql(""" CREATE VIEW v_active_tickets AS SELECT t.organization_name, t.type, t.id, t.title, COALESCE(p.criticity, 0) AS criticity FROM tickets t LEFT JOIN objects p ON (p.type = 'TICKET_PRIORITY' AND p.subType = t.type AND p.id = t.priority_id) LEFT JOIN objects s ON (s.type = 'TICKET_STATUS' AND s.subType = t.type AND s.id = t.status_id) WHERE (priority_id > 0) AND (assignedUser_id = 0) AND (s.selected = 1) AND (p.selected = 1) ORDER BY t.organization_name, t.type, t.id """) print("DEBUG: Database initialized with raw SQL (Persistence active).") def get_config(key_name): try: result = db.read("config", condition=f"key = '{key_name}'") return result[0]['value'] if result else "" except: return "" def save_config(key_name, value_text): existing = db.read("config", condition=f"key = '{key_name}'") data = {"key": key_name, "value": str(value_text)} if existing: db.update("config", data, condition=f"key = '{key_name}'") else: db.store("config", data) def get_active_organizations(): """ Reads active organizations from the DB and returns them sorted alphabetically. Call CockpitITSM.update_organizations() first to sync with the API. """ rows = db.read("organizations", condition="enabled = 1 AND ticketingModuleEnabled = 1") orgs = [ { "selected": row["selected"], "id": row["id"], "name": row["name"] } for row in (rows or []) ] orgs.sort(key=lambda o: o["name"].lower()) return orgs def get_teams(): """Returns all teams from DB sorted by org then name.""" rows = db.read("teams") or [] teams = [ { "id": row["id"], "org_id": row["org_id"], "name": row["name"], "selected": row["selected"] } for row in rows ] teams.sort(key=lambda t: t["name"].lower()) return teams def get_ticket_statuses(): """ Reads ticket statuses from the DB, sorted by subType then reference. Call CockpitITSM.update_ticket_status() first to sync with the API. """ rows = db.read("objects", condition="type = 'TICKET_STATUS'") statuses = [ { "id": row["id"], "subType": row["subType"], "reference": row["reference"], "selected": row["selected"] } for row in (rows or []) ] statuses.sort(key=lambda s: (s["subType"], s["reference"])) return statuses def get_ticket_priorities(): """ Reads ticket priorities from the DB, sorted by subType then reference. Call CockpitITSM.update_ticket_priorities() first to sync with the API. """ rows = db.read("objects", condition="type = 'TICKET_PRIORITY'") priorities = [ { "id": row["id"], "subType": row["subType"], "reference": row["reference"], "selected": row["selected"] } for row in (rows or []) ] priorities.sort(key=lambda p: (p["subType"] or "", p["reference"] or "")) return priorities def get_ticket_priorities_with_criticity(): """ Reads ticket priorities from the DB including criticity, sorted by subType then reference. Only returns priorities with selected = 1. Call CockpitITSM.update_ticket_priorities() first to sync with the API. Criticity values: 1 = VERY_HIGH 2 = HIGH 3 = MEDIUM (default) 4 = LOW 5 = INFORMATION """ rows = db.read("objects", condition="type = 'TICKET_PRIORITY' AND selected = 1") priorities = [ { "id": row["id"], "subType": row["subType"], "reference": row["reference"], "selected": row["selected"], "criticity": row.get("criticity", 0) if row.get("criticity") is not None else 0 } for row in (rows or []) ] priorities.sort(key=lambda p: (p["subType"] or "", p["reference"] or "")) return priorities def store_history(kind, value, timestamp=None): """ Stores a metric snapshot in the history table. kind : string identifier (e.g. 'TICKETS_TOTAL_3_REQUEST') value : numeric value, stored as REAL, rounded to 2 decimal places timestamp: optional, pass a shared timestamp so all entries in a cycle share the same one """ from datetime import datetime, timezone if timestamp is None: timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") value = round(float(value), 2) data = {"timestamp": timestamp, "kind": kind, "value": value} cond = f"timestamp = '{timestamp}' AND kind = '{kind}'" existing = db.read("history", condition=cond) if existing: db.update("history", data, condition=cond) else: db.store("history", data) print(f"DEBUG: History stored — {kind}: {value}")
start/utils/cockpit/p1/cpu/persistent.txt · Last modified: by admin_wiki
