#!@PYTHON@
# -*- coding: utf-8 -*-

#   Copyright (C) 2008 David Goncalves
#
#   This program is free software: you can redistribute it and/or modify
#   it under the terms of the GNU General Public License as published by
#   the Free Software Foundation; either version 3 of the License, or
#   (at your option) any later version.
#
#   This program is distributed in the hope that it will be useful,
#   but WITHOUT ANY WARRANTY; without even the implied warranty of
#   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#   GNU General Public License for more details.
#
#   You should have received a copy of the GNU General Public License
#   along with this program.  If not, see <https://www.gnu.org/licenses/>.

# 2008-01-14 David Goncalves
#            PyNUT is an abstraction class to access NUT (Network UPS Tools) server.
#
# 2008-06-09 David Goncalves
#            Added 'GetRWVars' and 'SetRWVar' commands.
#
# 2009-02-19 David Goncalves
#            Changed class PyNUT to PyNUTClient
#
# 2010-07-23 David Goncalves - Version 1.2
#            Changed GetRWVars function that fails if the UPS is not
#            providing such vars.
#
# 2011-07-05 René Martín Rodríguez - Version 1.2.1
#            Added support for FSD, HELP and VER commands
#
# 2012-02-07 René Martín Rodríguez - Version 1.2.2
#            Added support for LIST CLIENTS command
#
# 2014-06-03 george2 - Version 1.3.0
#            Added custom exception class, fixed minor bug, added Python 3 support.
#
# 2021-09-27 Jim Klimov - Version 1.4.0
#            Revise strings used to be byte sequences as required by telnetlib
#            in Python 3.9, by spelling out b"STR" or str.encode('ascii');
#            the change was also tested to work with Python 2.7, 3.4, 3.5 and
#            3.7 (to the extent of accompanying test_nutclient.py at least).

try:
    import telnetlib
except ImportError:
    # telnetlib was removed from the standard library in Python 3.13
    # (PEP 594).  Keep this module importable; __connect() reports the
    # problem with a PyNUTError when a connection is actually attempted.
    telnetlib = None


class PyNUTError(Exception):
    """ Base class for custom exceptions """


class PyNUTClient:
    """ Abstraction class to access NUT (Network UPS Tools) server

    All data read from the server is handled as byte strings: names,
    values and descriptions returned by the Get*/List* methods are bytes
    exactly as received from the connection handler.
    """

    __debug = None        # Set class to debug mode (prints everything useful for debugging...)
    __host = None         # Host name or address of the NUT server
    __port = None         # TCP port of the NUT server
    __login = None        # Login sent with USERNAME (None: no authentication)
    __password = None     # Password sent with PASSWORD (None: not sent)
    __timeout = None      # Timeout (seconds) used while waiting for login replies
    __srv_handler = None  # Connection handler (telnetlib.Telnet instance)

    __version = "1.4.0"
    __release = "2021-09-27"

    def __init__(self, host="127.0.0.1", port=3493, login=None, password=None, debug=False, timeout=5):
        """ Class initialization method

        host     : Host to connect (default to localhost)
        port     : Port where NUT listens for connections (default to 3493)
        login    : Login used to connect to NUT server (default to None for no authentication)
        password : Password used when using authentication (default to None)
        debug    : Boolean, put class in debug mode (prints everything on console, default to False)
        timeout  : Timeout used to wait for network response

        The connection is opened immediately; PyNUTError is raised if the
        server refuses the login or the password.
        """
        self.__debug = debug

        if self.__debug:
            print("[DEBUG] Class initialization...")
            print("[DEBUG] -> Host = %s (port %s)" % (host, port))
            print("[DEBUG] -> Login = '%s' / '%s'" % (login, password))

        self.__host = host
        self.__port = port
        self.__login = login
        self.__password = password
        # Honour the caller supplied value (it used to be forced to 5).
        self.__timeout = timeout

        self.__connect()

    # Try to disconnect cleanly when class is deleted ;)
    def __del__(self):
        """ Class destructor method """
        try:
            self.__srv_handler.write(b"LOGOUT\n")
        except Exception:
            # Best effort only: the handler may never have been created
            # (failed connection) or the connection may already be gone.
            pass

    def __connect(self):
        """ Connects to the defined server

        If login/pass was specified, the class tries to authenticate.
        An error is raised if something goes wrong.
        """
        if self.__debug:
            print("[DEBUG] Connecting to host")

        if telnetlib is None:
            raise PyNUTError("telnetlib module is not available "
                             "(it was removed from the Python standard library in 3.13)")

        self.__srv_handler = telnetlib.Telnet(self.__host, self.__port)

        if self.__login is not None:
            self.__srv_handler.write(("USERNAME %s\n" % self.__login).encode('ascii'))
            result = self.__srv_handler.read_until(b"\n", self.__timeout)
            if result[:2] != b"OK":
                raise PyNUTError(result.replace(b"\n", b""))

        if self.__password is not None:
            self.__srv_handler.write(("PASSWORD %s\n" % self.__password).encode('ascii'))
            result = self.__srv_handler.read_until(b"\n", self.__timeout)
            if result[:2] != b"OK":
                raise PyNUTError(result.replace(b"\n", b""))

    def __fetch_list(self, query):
        """ Run a 'LIST <query>' request and return its payload lines

        query : Text following the LIST keyword (for example 'VAR myups')

        Returns the non-empty lines (bytes, without the trailing newline)
        read before the 'END LIST <query>' marker.
        Raises PyNUTError carrying the server reply when the first reply
        line is not 'BEGIN LIST <query>'.
        """
        handler = self.__srv_handler
        handler.write(("LIST %s\n" % query).encode('ascii'))

        result = handler.read_until(b"\n")
        if result != ("BEGIN LIST %s\n" % query).encode('ascii'):
            raise PyNUTError(result.replace(b"\n", b""))

        terminator = ("END LIST %s\n" % query).encode('ascii')
        result = handler.read_until(terminator)
        if result.endswith(terminator):
            result = result[:-len(terminator)]

        # Dropping blank lines makes an empty list yield [] instead of [b""].
        return [line for line in result.split(b"\n") if line]

    @staticmethod
    def __parse_quoted(lines, offset):
        """ Build a dict from lines shaped like '<prefix><name> "<value>"'

        lines  : Iterable of byte strings
        offset : Number of leading bytes (the prefix) to skip on each line

        Lines that do not contain a quoted value are skipped.
        """
        parsed = {}
        for current in lines:
            fields = current[offset:].split(b'"')
            if len(fields) < 2:
                continue
            parsed[fields[0].replace(b" ", b"")] = fields[1]
        return parsed

    def GetUPSList(self):
        """ Returns the list of available UPS from the NUT server

        The result is a dictionary containing 'key->val' pairs of
        'UPSName' and 'UPS Description' (both as bytes).
        Raises PyNUTError if the server does not start the list.
        """
        if self.__debug:
            print("[DEBUG] GetUPSList from server")

        ups_list = {}
        for line in self.__fetch_list("UPS"):
            if line[:3] == b"UPS":
                # line is: UPS <name> "<description>"
                # [4:-1] drops the 'UPS ' prefix and the closing quote;
                # partition() never raises, whatever the description holds.
                name, _, desc = line[4:-1].partition(b'"')
                ups_list[name.replace(b" ", b"")] = desc

        return ups_list

    def GetUPSVars(self, ups=""):
        """ Get all available vars from the specified UPS

        The result is a dictionary containing 'key->val' pairs of all
        available vars (bytes); it is empty when the UPS reports no vars.
        Raises PyNUTError if the server does not start the list.
        """
        if self.__debug:
            print("[DEBUG] GetUPSVars called...")

        lines = self.__fetch_list("VAR %s" % ups)
        offset = len(("VAR %s " % ups).encode('ascii'))
        return self.__parse_quoted(lines, offset)

    def GetUPSCommands(self, ups=""):
        """ Get all available commands for the specified UPS

        The result is a dict object with command name as key and a
        description of the command as value (both bytes).  When the
        description cannot be obtained, the command name is used instead.
        Raises PyNUTError if the server does not start the list.
        """
        if self.__debug:
            print("[DEBUG] GetUPSCommands called...")

        offset = len(("CMD %s " % ups).encode('ascii'))
        ups_cmds = {}

        for current in self.__fetch_list("CMD %s" % ups):
            var = current[offset:].split(b'"')[0].replace(b" ", b"")
            if not var:
                continue

            # For each var we try to get the available description
            try:
                # The name is bytes: decode it before %-formatting, otherwise
                # Python 3 would send the literal text b'<name>' to the server.
                name = var.decode('ascii')
                self.__srv_handler.write(("GET CMDDESC %s %s\n" % (ups, name)).encode('ascii'))
                temp = self.__srv_handler.read_until(b"\n")
                if temp[:7] != b"CMDDESC":
                    raise PyNUTError(temp.replace(b"\n", b""))
                off = len(("CMDDESC %s %s " % (ups, name)).encode('ascii'))
                desc = temp[off:-1].split(b'"')[1]
            except Exception:
                # Descriptions are optional: fall back to the command name.
                desc = var

            ups_cmds[var] = desc

        return ups_cmds

    def GetRWVars(self, ups=""):
        """ Get a list of all writable vars from the selected UPS

        The result is presented as a dictionary containing 'key->val'
        pairs (bytes); it is empty when the UPS provides no such vars.
        Raises PyNUTError if the server does not start the list.
        """
        if self.__debug:
            print("[DEBUG] GetRWVars from '%s'..." % ups)

        lines = self.__fetch_list("RW %s" % ups)
        # Same length as the "VAR %s" prefix (no trailing space) used before.
        offset = len(("RW %s " % ups).encode('ascii'))
        return self.__parse_quoted(lines, offset)

    def SetRWVar(self, ups="", var="", value=""):
        """ Set a variable to the specified value on selected UPS

        The variable must be a writable value (cf GetRWVars) and you must have the proper
        rights to set it (maybe login/password).

        Returns "OK" on success, raises PyNUTError with the server reply otherwise.
        """
        self.__srv_handler.write(("SET VAR %s %s %s\n" % (ups, var, value)).encode('ascii'))
        result = self.__srv_handler.read_until(b"\n")
        if result == b"OK\n":
            return "OK"
        raise PyNUTError(result)

    def RunUPSCommand(self, ups="", command=""):
        """ Send a command to the specified UPS

        Returns OK on success or raises an error
        """
        if self.__debug:
            print("[DEBUG] RunUPSCommand called...")

        self.__srv_handler.write(("INSTCMD %s %s\n" % (ups, command)).encode('ascii'))
        result = self.__srv_handler.read_until(b"\n")
        if result == b"OK\n":
            return "OK"
        raise PyNUTError(result.replace(b"\n", b""))

    def FSD(self, ups=""):
        """ Send FSD command

        Returns OK on success or raises an error

        NOTE: API changed since NUT 2.8.0 to replace MASTER with PRIMARY
        (and backwards-compatible alias handling)
        """
        if self.__debug:
            print("[DEBUG] PRIMARY called...")

        self.__srv_handler.write(("PRIMARY %s\n" % ups).encode('ascii'))
        result = self.__srv_handler.read_until(b"\n")
        if result != b"OK PRIMARY-GRANTED\n":
            # Fall back to the older keyword when PRIMARY is not granted.
            if self.__debug:
                print("[DEBUG] Retrying: MASTER called...")
            self.__srv_handler.write(("MASTER %s\n" % ups).encode('ascii'))
            result = self.__srv_handler.read_until(b"\n")
            if result != b"OK MASTER-GRANTED\n":
                raise PyNUTError(("Primary level functions are not available", ""))

        if self.__debug:
            print("[DEBUG] FSD called...")
        self.__srv_handler.write(("FSD %s\n" % ups).encode('ascii'))
        result = self.__srv_handler.read_until(b"\n")
        if result == b"OK FSD-SET\n":
            return "OK"
        raise PyNUTError(result.replace(b"\n", b""))

    def help(self):
        """ Send HELP command

        Returns the first reply line (bytes, newline included).
        """
        if self.__debug:
            print("[DEBUG] HELP called...")

        self.__srv_handler.write(b"HELP\n")
        return self.__srv_handler.read_until(b"\n")

    def ver(self):
        """ Send VER command

        Returns the first reply line (bytes, newline included).
        """
        if self.__debug:
            print("[DEBUG] VER called...")

        self.__srv_handler.write(b"VER\n")
        return self.__srv_handler.read_until(b"\n")

    def ListClients(self, ups=None):
        """ Returns the list of connected clients from the NUT server

        ups : Optional UPS name; when given it must be one of the names
              returned by GetUPSList, otherwise PyNUTError is raised.

        The result is a dictionary containing 'key->val' pairs of
        'UPSName' and a list of clients (bytes).
        """
        if self.__debug:
            print("[DEBUG] ListClients from server")

        if ups:
            # GetUPSList() keys are bytes: compare like with like, otherwise
            # a str name is never found on Python 3.
            ups_key = ups if isinstance(ups, bytes) else ups.encode('ascii')
            if ups_key not in self.GetUPSList():
                raise PyNUTError("%s is not a valid UPS" % ups)

        # NOTE(review): the command keyword and the field order of the reply
        # are kept exactly as they were - confirm both against the NUT
        # network protocol documentation ("LIST CLIENT").
        if ups:
            self.__srv_handler.write(("LIST CLIENTS %s\n" % ups).encode('ascii'))
        else:
            self.__srv_handler.write(b"LIST CLIENTS\n")
        result = self.__srv_handler.read_until(b"\n")
        if result != b"BEGIN LIST CLIENTS\n":
            raise PyNUTError(result.replace(b"\n", b""))

        result = self.__srv_handler.read_until(b"END LIST CLIENTS\n")
        ups_list = {}

        for line in result.split(b"\n"):
            if line[:6] == b"CLIENT":
                host, client_ups = line[7:].split(b' ')
                ups_list.setdefault(client_ups, []).append(host)

        return ups_list