# @Email: jmaggio14@gmail.com
# @Website: https://www.imagepypelines.org/
# @License: https://github.com/jmaggio14/imagepypelines/blob/master/LICENSE
# @github: https://github.com/jmaggio14/imagepypelines
#
# Copyright (c) 2018 - 2020 Jeff Maggio, Jai Mehra, Ryan Hartzell
#
from .util import TCPClient
from ..Logger import get_master_logger
from .Exceptions import DashboardWarning
import logging
import json
def connect_to_dash(name, host, port):
    """Connects every pipeline in this session to a dashboard

    Args:
        name(str): name under which the dashboard connection is registered
        host(str): ip address for the dashboard
        port(int): port on host for the dashboard
    """
    is_first_dashboard = (n_dashboards() == 0)
    if is_first_dashboard:
        # placeholder: the first connection is where a logging handler would
        # be installed to support log messages, i.e.
        # initialize_dash_logging()
        pass

    DashboardComm.connect(name, host, port)
def n_dashboards():
    """Returns how many dashboards are currently connected

    Returns:
        int: the number of entries in ``DashboardComm.clients``
    """
    connected = DashboardComm.clients
    return len(connected)
# TODO: update the logging handler to send logging messages to the dashboard
class DashboardLoggingHandler(logging.Handler):
    """Logging handler that forwards log messages to the dashboard(s)

    All positional and keyword arguments are passed straight through to
    :obj:`logging.Handler`.
    """
    def __init__(self, *args, **kwargs):
        # super() already binds `self`; passing it explicitly as well makes
        # logging.Handler receive the handler itself as its `level` argument
        # and fail during construction
        super().__init__(*args, **kwargs)
        self.dashcomm = DashboardComm()

    def emit(self, record):
        """Sends the message of the given record to the dashboard(s)

        Args:
            record(logging.LogRecord): the record being logged
        """
        # NOTE(review): only the raw message is sent - a formatter set on this
        # handler is not applied here. Confirm whether that is intended.
        self.dashcomm.write_log(record.getMessage())
def initialize_dash_logging():
    """Attaches a :obj:`DashboardLoggingHandler` to the master logger

    The handler is given a formatter whose format string is a JSON template
    describing a 'log' message.
    """
    # fields nested under the 'payload' key of the message template
    payload_template = {
        'level': '%(levelname)8s',  # INFO, WARNING, ERROR, etc
        'time': '%(asctime)s',  # datetime as YYYY-MM-DD HH:MM:SS,msecs
        'logger_name': '%(name)s',  # name of the logger, often identical to 'id'
        'message': '%(message)s',  # Logging message
    }

    # key order matters: it determines the layout of the dumped JSON string
    message_template = {
        'type': 'log',
        'id': '%(pipeline_id)s',  # {name}.{last 6 chars of uuid}
        'uuid': '%(pipeline_uuid)s',  # universal unique id for obj
        'name': '%(pipeline_name)s',  # obj name
        'source_type': 'logger',
        'payload': payload_template,
    }

    dash_handler = DashboardLoggingHandler()
    dash_handler.setFormatter(logging.Formatter(json.dumps(message_template)))

    master = get_master_logger()
    master.logger.addHandler(dash_handler)
################################################################################
# TODO: This dashboard system currently relies on connect_to_dash() being called
# before the Pipeline has been instantiated. This needs to be corrected.
# TODO: add method to disconnect from a specific dashboard
# TODO: better docstring
class DashboardComm(object):
    """Object to send messages from the pipelines to dashboard(s)

    Connections and cached graph messages are stored in class level
    dictionaries, so they are shared by every instance of this class.
    """
    clients = {}
    """dict: class level variable mapping each dashboard name to the client
    object that is connected to it"""

    graphs_msg_cache = {}
    """dict: cache of pipeline graph messages keyed by pipeline id, these are
    messages that the dashboard needs to interpret the pipeline status
    messages"""

    # --------------------------------------------------------------------------
    @classmethod
    def connect(cls, name, host, port):
        """establishes a connection with the Dashboard Chatroom at the given
        host and port

        If the connection is refused, the failure is logged as an error and no
        client is registered.

        Args:
            name(str): human readable name of the dashboard, used as the key
                for this connection in :attr:`clients`
            host(str): ip address for the dashboard
            port(int): port on host for the dashboard
        """
        try:
            new_client = TCPClient().connect(host, port)
            cls.clients[name] = new_client

            # send the pipeline graph messages to new clients so they can
            # interpret new status and reset messages
            # NOTE(review): (@Jai) confirm these work when sent one after
            # another like this
            for rep in cls.graphs_msg_cache.values():
                new_client.write(rep)

        except ConnectionRefusedError:
            # best effort: an unreachable dashboard is reported through the
            # logger (raising a DashboardWarning here is currently disabled)
            msg = f"unable to connect to Dashboard at {host}:{port}"
            get_master_logger().error(msg)

    # --------------------------------------------------------------------------
    @classmethod
    def disconnect_all(cls):
        """disconnects from all dashboard servers"""
        for cli in cls.clients.values():
            cli.disconnect()

        # clear in place so every reference to the shared dict stays valid
        cls.clients.clear()

    # --------------------------------------------------------------------------
    @classmethod
    def disconnect(cls, name):
        """disconnects from an individual dashboard server

        Args:
            name(str): name the dashboard was connected under

        Raises:
            KeyError: if no dashboard is connected under the given name
        """
        cls.clients[name].disconnect()
        cls.clients.pop(name)

    # --------------------------------------------------------------------------
    @property
    def total(self):
        """returns total number of connected dashboards"""
        return len(self.clients)

    # --------------------------------------------------------------------------
    def write(self, msg, names=None):
        """sends the given message to all connected dashboard servers

        Args:
            msg(str): a string to send
            names(tuple(str)): iterable of names specifying a whitelist of
                dashboards to send to, defaults to every connected dashboard

        Raises:
            KeyError: if a given name is not a connected dashboard
        """
        if names is None:
            names = list(self.clients)

        for name in names:
            self.clients[name].write(msg)

    # --------------------------------------------------------------------------
    def read(self, names=None):
        """reads a message from the dashboard servers

        Args:
            names(tuple(str)): iterable of names specifying a whitelist of
                dashboards to read from, defaults to every connected dashboard

        Returns:
            the message read from the last dashboard that was polled, or None
            if that dashboard had no data ready or no dashboard was polled

        Raises:
            KeyError: if a given name is not a connected dashboard
        """
        if names is None:
            names = list(self.clients)

        # start from None so polling zero dashboards returns None instead of
        # failing on an unbound variable
        msg = None
        for name in names:
            try:
                msg = self.clients[name].read()
            except BlockingIOError:  # Case for no data ready
                msg = None

        # NOTE(review): when several dashboards are polled only the last
        # result is returned - confirm this is the intended behavior
        return msg

    # --------------------------------------------------------------------------
    def write_graph(self, pipeline_id, graph_msg):
        """send pipeline graph or task changes to the Dashboard

        Args:
            pipeline_id: key under which the graph message is cached
            graph_msg(str): the graph message to send
        """
        # update internal variable tracking pipeline graph messages, these are
        # replayed to dashboards that connect later
        self.graphs_msg_cache[pipeline_id] = graph_msg

        # send messages to all servers
        self.write(graph_msg)

    # --------------------------------------------------------------------------
    def write_status(self, status_msg):
        """send status changes to all Dashboards"""
        self.write(status_msg)

    # --------------------------------------------------------------------------
    def write_reset(self, reset_msg):
        """send reset messages to all dashboard servers"""
        self.write(reset_msg)

    # --------------------------------------------------------------------------
    def write_error(self, error_msg):
        """send error messages to all dashboard servers"""
        self.write(error_msg)

    # --------------------------------------------------------------------------
    def write_delete(self, delete_msg):
        """send delete messages to all dashboard servers"""
        self.write(delete_msg)

    # --------------------------------------------------------------------------
    def write_log(self, log_msg):
        """send log messages to all dashboard servers"""
        self.write(log_msg)
# IN THE FUTURE
# def read()