# @Email: jmaggio14@gmail.com
# @Website: https://www.imagepypelines.org/
# @License: https://github.com/jmaggio14/imagepypelines/blob/master/LICENSE
# @github: https://github.com/jmaggio14/imagepypelines
#
# Copyright (c) 2018 - 2020 Jeff Maggio, Jai Mehra, Ryan Hartzell
from ..Logger import get_logger, get_master_logger
import inspect
import collections
import time
from termcolor import colored
import numpy as np
import socket
import threading
from heapq import heappush, heappop
from struct import pack, unpack
TIMER_LOGGER = get_logger('TIMER')
MAX_UNACCEPTED_CONNECTIONS = 10
################################################################################
# Socket Helpers
################################################################################
class BaseTCP(object):
def __init__(self):
self._s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) #<-- TCP SOCKET
# --------------------------------------------------------------------------
def disconnect(self):
self._s.close()
# --------------------------------------------------------------------------
@staticmethod
def recvall(c, length):
'''Convenience function to read large amounts of data (>4096 bytes)'''
data = b''
while len(data) < length:
remaining = length - len(data)
data += c.recv(min(remaining, 4096))
return data
# --------------------------------------------------------------------------
def write(self, msg):
msg_b = msg.encode()
length = pack('>Q', len(msg_b))
self._s.sendall(length) # send length of the message as 64bit integer
self._s.sendall(msg_b) # send the message itself
# --------------------------------------------------------------------------
def read(self):
"""
"""
line = self._s.recv(8) # 8 bytes for 64bit integer
if line == b'':
return None
length = unpack('>Q', line)[0]
return self.recvall(self._s, length)
# --------------------------------------------------------------------------
@property
def sock(self):
""":obj:`socket.Socket`: socket for this TCP connection"""
return self._s
# --------------------------------------------------------------------------
@property
def host(self):
"""str: ip address for this socket, or None if not connected"""
# if the socket isn't connected, just return None
try:
return self._s.getsockname()[0]
except OSError:
return None
# --------------------------------------------------------------------------
@property
def port(self):
"""int: port for this socket, or None if not connected"""
# if the socket isn't connected, just return None
try:
return self._s.getsockname()[1]
except OSError:
return None
################################################################################
class TCPClient(BaseTCP):
# --------------------------------------------------------------------------
def connect(self, host, port):
"""connects and binds the socket to the given host and port
Args:
host(str): ip address to connect to
port(int): port to connect to
"""
self._s.connect( (host,port) ) # <-- connect socket server to host & port
self._s.setblocking(0)
return self
################################################################################
class TCPServer(BaseTCP):
"""
NOTE:
TCP Server has no explicit implementation of accepting/closing client
connections. That is up to the implementation as needed. Socket object
can be retrieved via:
self.sock
"""
def __init__(self):
super().__init__()
self._s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# --------------------------------------------------------------------------
def connect(self, host, port=0):
"""connects and binds the socket to the given host and port
Args:
host(str): ip address to host on
port(int): port to host on, leave as 0 for the OS to choose
a port for you
"""
print(host, port)
self._s.bind( (host,port) ) # <-- bind socket server to host & port
self._s.setblocking(0)
self._s.listen(MAX_UNACCEPTED_CONNECTIONS) # <-- max of 10 unaccepted connections before not accepting anymore
return self
################################################################################
# Thread Helpers
################################################################################
class BaseCommThread(threading.Thread):
'''
Parent Class to all thread manager classes.
'''
def __init__(self):
super().__init__(name=self.__class__.__name__)
self.logger = get_master_logger()
self.daemon = True
def __enter__(self):
'''
Starts the thread in its own context manager block.
Note: If the running thread is meant to be run indefinitely it is not
recommended to use it as a context manager as once you exit the
context manager, the thread will safely shut down via the
__exit__() method.
'''
self.run()
def __exit__(self, exc_type, exc_value, traceback):
'''
Is called once the context leaves the manager, safely signals the
running thread to shutdown.
'''
self.stop_thread()
# ____ Run Function ______________________________________________________
def run(self):
'''
This function is to be overloaded in the child class. If the thread is
to be run indefinitely (as in not for a fixed duration), you MUST
structure this function as follows:
--[START]--------------------------------------------------------------
self.t = threading.current_thread() # Grab current threading context
...
while getattr(self.t, 'running', True):
...
...
--[END]--------------------------------------------------------------
This is necessary as the classes stop_thread() method can safely shut
down the running thread by changing self.running to False, thus
invalidating the while loop's condition.
'''
pass
# ____ Thread Killer _____________________________(Kills with kindness)___
def stop_thread(self):
'''
This is a convenience function used to safely end the running thread.
Note: This will only end the running thread if the run() function
checks for the classes 'running' attribute (as demonstrated in
the docstring of the run() function above).
This only works if the running thread is not hanging, this will
prevent the while loop from re-evaluating its condition
'''
self.logger.warning("Closing Thread " + self.name)
self.running = False
self.join()
self.logger.warning(f"{self.name} has stopped")
################################################################################
# Event Helpers
################################################################################
class EventQueue:
'''
This Class is meant to be a simple task scheduler that runs tasks in any
of the following ways:
* Immediately
* After a delay (seconds)
* After a delay & repeatedly every specified interval of time (seconds)
'''
ScheduledEvent = collections.namedtuple('ScheduleEvent',
['event_time', 'task'])
def __init__(self):
self.events = []
@staticmethod
def funcify(obj):
if callable(obj):
return obj
else:
return lambda: obj
def run_scheduled_tasks(self):
''' Runs all tasks that are scheduled to run at the current time '''
t = time.monotonic()
task_returns = []
while self.events and self.events[0].event_time <= time.monotonic():
event = heappop(self.events)
task_return = event.task()
task_returns.append(task_return)
return [t for t in task_returns if t is not None]
def add_task(self, task, event_time=None):
print(f"task = {task}")
task = self.funcify(task)
'Helper function to schedule one-time tasks at specific time'
if event_time is None:
event_time = time.monotonic()
heappush(self.events, EventQueue.ScheduledEvent(event_time, task))
def call_later(self, task, delay):
task = self.funcify(task)
'Helper function to schedule one-time tasks after a given delay'
self.add_task(task, time.monotonic() + delay)
def call_periodic(self, task, delay, interval):
task = self.funcify(task)
'Helper function to schedule recurring tasks'
def inner():
self.call_later(inner, interval)
return task()
self.call_later(inner, delay)
################################################################################
# Decorators
################################################################################
# def print_args(func):
# """Decorator to print out the arguments that a function is running with,
# this includes: arguments passed in, default values that are unspecified,
# varargs ``(*args)``, and varkwargs ``(**kwargs)``
#
# Args:
# func (callable): function or callable to print input arguments of
# """
# def _print_args(*args,**kwargs):
# """
# prints the arguments passed into the target
# """
# POSITIONAL = 'positional |'
# KEYWORD = 'keyword |'
# VARPOSITIONAL = 'var-positional|'
# VARKEYWORD = 'var-keyword |'
# DEFAULT = 'default |'
#
# arg_dict = collections.OrderedDict()
# vtypes = {}
# def __add_to_arg_dict(key,val,vtype):
# if isinstance(val, np.ndarray):
# val = str( Summarizer(val) )
# arg_dict[key] = val
# vtypes[key] = vtype
#
#
# spec = inspect.getfullargspec(func)
# specdefaults = [] if spec.defaults is None else spec.defaults
# specargs = [] if spec.args is None else spec.args
# speckwonlyargs = [] if spec.kwonlyargs is None else spec.kwonlyargs
# speckwonlydefaults = {} if spec.kwonlydefaults is None else spec.kwonlydefaults
#
# num_positional_passed_in = len(args)
# num_required = len(specargs) - len(specdefaults)
#
# # adding default positional args values to the dictionary
# for i,var_name in enumerate(specargs):
# if i < num_required:
# var = colored("No argument was passed in!",attrs=['bold'])
# else:
# var = specdefaults[i - num_required]
#
# vtype = DEFAULT
# __add_to_arg_dict(var_name,var,vtype)
#
# # positional arguments passed in and varargs passed in
# for i in range(num_positional_passed_in):
# if i < num_required:
# var_name = specargs[i]
# vtype = POSITIONAL
# else:
# var_name = 'arg{}'.format(i)
# vtype = VARPOSITIONAL
# var = args[i]
# __add_to_arg_dict(var_name,var,vtype)
#
# # adding keyword only args to the dict
# for var_name in speckwonlyargs:
# var = color.red("No argument was passed in!",bold=True)
# vtype = KEYWORD
# __add_to_arg_dict(var_name,var,vtype)
# for var_name,var in speckwonlydefaults.items():
# vtype = DEFAULT
# __add_to_arg_dict(var_name,var,vtype)
#
# # keyword arguments passed in
# for var_name in kwargs:
# if var_name in specargs:
# vtype = KEYWORD
# else:
# vtype = VARKEYWORD
# var = kwargs[var_name]
# __add_to_arg_dict(var_name,var,vtype)
#
# # formatting the actual string to be printed out
# MASTER_LOGGER.info("running '{}' with the following args:".format(func.__name__))
# if len(arg_dict) == 0:
# __add_to_arg_dict('None','','')
# longest_arg_name = max(len(k) for k in arg_dict)
# arg_string = ''.join(["\t{} {} : {}\n".format(vtypes[k], k+(' ' * (longest_arg_name-len(k))), v) for k,v in arg_dict.items()])
# print( arg_string )
#
# ret = func(*args,**kwargs)
# return ret
# return _print_args
################################################################################
# SUMMARY
################################################################################
[docs]def arrsummary(arr):
"""returns a Summarizer object for the given array"""
return Summarizer(arr)
class Summarizer(dict):
"""
Summarization object for numpy array. The primary job of this
object is to streamline and simply printing out numpy array objects
which normally appear as a stream of barely comprehendable data
This dictionary subclass will return the following when printed out
or otherwise stringified
Args:
input_array (np.ndarray): input array to summarize
Attributes:
input_array: original numpy array this object is summarizing
last_summary: last calculated summary dictionary
contains the following: shape, size, max, min, mean, dtype
last_string: last representation string calculated for this array
Example:
>>> import imagepypelines as ip
>>> import numpy as np
>>> np.random.seed(0)
>>> a = np.random.rand(512,512)
>>> a = ip.util.Summarizer(a)
>>> print(a)
[ARRAY SUMMARY | shape: (512, 512) | size: 262144 | max: 1.0 | min: 0.0 | mean: 0.5 | dtype: float64]
"""
def __init__(self, input_array):
"""Instantiation function
Args:
input_array (np.ndarray): input array to summarize
"""
# ERROR CHECKING
if not isinstance(input_array, np.ndarray):
error_msg = "'input_array' input must be a np.ndarray"
raise TypeError(error_msg)
# END ERROR CHECKING
self.input_array = input_array
self.last_summary = None
self.last_string = None
self.__update()
def __str__(self):
"""returns a stringified summary"""
self.__update()
return self.last_string
def summarize(self):
"""returns an output dictionary of array attributes
Args:
None
Returns:
summary (dict): dict containing the following [shape, size, max,
min, mean, dtype]
"""
self.__update()
return self.last_summary
def __update(self):
"""
updates the last_summary and last_string internal attributes
"""
summary = {
'shape': self.input_array.shape,
'size': self.input_array.size,
'max': round(self.input_array.max(), 3),
'min': round(self.input_array.min(), 3),
'mean': round(self.input_array.mean(), 3),
'dtype': self.input_array.dtype,
}
string = "[ARRAY SUMMARY | " \
+ "shape: {shape} | " \
+ "size: {size} | " \
+ "max: {max} | " \
+ "min: {min} | " \
+ "mean: {mean} | " \
+ "dtype: {dtype}]"
string = string.format( **summary )
self.last_summary = summary
self.last_string = string
self.update(self.last_summary)
def __repr__(self):
return str(self)
################################################################################
# TIMING
################################################################################
################################################################################
[docs]def timer(func):
"""Decorator to time how long a func takes to run in milliseconds
Example:
>>> import imagepypelines as ip
>>> import time
>>>
>>> @ip.timer
... def sleep_for_one_sec():
... time.sleep(1) #sleep for 1 second
>>>
>>> sleep_for_one_sec() # doctest: +ELLIPSIS
...
"""
def _timer(*args,**kwargs):
t = Timer()
ret = func(*args,**kwargs)
run_time = t.time()
msg = "ran function '{name}' in {t}sec".format(name=func.__name__,
t=run_time)
TIMER_LOGGER.info(msg)
return ret
return _timer
################################################################################
[docs]def timer_ms(func):
"""Decorator to time how long a func takes to run in milliseconds
Example:
>>> import imagepypelines as ip
>>> import time
>>>
>>> @ip.timer_ms
... def sleep_for_one_sec():
... time.sleep(1) #sleep for 1 second
>>>
>>> sleep_for_one_sec() # doctest: +ELLIPSIS
...
"""
def _timer_ms(*args,**kwargs):
t = Timer()
ret = func(*args,**kwargs)
run_time_ms = t.time_ms()
msg = "ran function '{name}' in {t}ms".format(name=func.__name__,
t=run_time_ms)
TIMER_LOGGER.info(msg)
return ret
return _timer_ms
################################################################################
[docs]class Timer(object):
"""
Timer which can be used to time processes
Attributes:
_start (float): start time in seconds since the epoch
_last (float): last time the lap timer was called
_countdown (float): countdown time if set (recalculated with
the countdown property)
_last_countdown (float): last countdown time check
Example:
#we need to do an action for 10 seconds
timer = Timer()
timer.countdown = 10
while timer.countdown:
do_action()
#this action will run for (about) 10 seconds
# maybe we want to record how long a part of our code runs
timer.reset()
do_action()
print( timer.lap() )
do_action2()
print( timer.lap() )
"""
def __init__(self):
self._start = time.time()
self._last = self._start
self._countdown_timer = None
self._countdown_start = None
[docs] def reset(self):
""" resets the timer start time """
self.__init__()
[docs] def time(self):
"""returns the time in seconds since the timer started or since it was
last reset"""
return round(self.raw_time(),3)
[docs] def raw_time(self):
"""returns the unrounded time in seconds since the timer started"""
return time.time() - self._start
[docs] def lap(self):
"""returns time in seconds since last time the lap was called"""
now = time.time()
lap = now - self._last
self._last = now
return round(lap,3)
[docs] def time_ms(self):
"""returns the time in milliseonds since the timer started or since it
was last reset"""
return round(self.raw_time()*1000,3)
[docs] def lap_ms(self):
"""returns time in milliseconds since last time the lap was called"""
return round(self.lap()*1000,3)
@property
def countdown(self):
if self._countdown_timer is None:
return 0
countdown = self._countdown_start - self._countdown_timer.raw_time()
countdown = max(countdown,0)
return countdown
@countdown.setter
def countdown(self,value):
"""sets the countdown timer"""
if not isinstance(value,(int,float)):
error_msg = "countdown must be set using a float \
or an int, current type is {0}".format(type(value))
TIMER_LOGGER.error(error_msg)
raise TypeError(error_msg)
self._countdown_timer = Timer()
self._countdown_start = float(value)
@property
def start(self):
return self._start
def __str__(self):
return "Timer @{}sec".format( self.time() )
def __repr__(self):
return str(self)