telegraf_to_vu1/main.py
2024-12-02 18:28:01 +01:00

61 lines
1.8 KiB
Python
Executable file

#!/usr/bin/env python3
import vu1
import sys
import pdb
import json
import yaml
import requests
import argparse
from dial_configs import *
parser = argparse.ArgumentParser()
parser.add_argument('-c', '--config')
args = parser.parse_args()
def get_config(config_file_path="config.yaml"):
config = {}
with open(config_file_path, "r") as fd:
config = yaml.safe_load(fd.read())
return config
def find_dial(config, id_or_alias):
dials = config.get("dials", {})
if id_or_alias in dials:
return id_or_alias
else:
for k, dial in dials.items():
if id_or_alias in dial.get("aliases", []):
return k
return None
config = get_config(args.config)
print(config, flush=True)
api_config = config.get("vuserver", {}).get("api", {})
API = vu1.DialServer(api_config.get("url", "http://localhost:5340"), api_config.get("password", ""))
# generate objects with according dial objects based on user config
config_set = []
for metric, dial in config.get("config_sets", {}).get("notebook", []).items():
d = API.dials[API.dials.index(find_dial(config, dial))]
m = getattr(eval(metric), metric)(d, config.get("assets", "./assets")) # build instance of a matching dial_config class
config_set.append(m)
for line in sys.stdin:
metric = json.loads(line)
fields = metric.get("fields", {})
if metric in config_set:
dial_config = config_set[config_set.index(metric)]
#print("config for {metric_name} matched {conf}".format(metric_name=metric.get("name", "<noname>"), conf=type(dial_config)), flush=True)
value = dial_config.transform_value(metric)
image = dial_config.background_image
color = dial_config.transform_color(value)
dial = dial_config.dial
try:
dial.image = image
dial.value = value
dial.color = color
except Exception:
pass
continue