Source code for arvpyf.mgmt

import requests
import json


[docs]class ArchiverConfig(object): """ Interface to REST API commands supported by the Archiver Appliance MGMT service """
[docs] def __init__(self, url): """ Constructor Parameters ---------- url : str url of the Archiver Appliance MGMT service e.g., http://xf04id-ca1.cs.nsls2.local:17665 """ self.url = url
[docs] def get_all_pvs(self, regex='', limit=500): """ Return all the PVs (Note: this call can return millions of PVs) Parameters ---------- regex : str, optional Java regex wildcard limit : number, optional number of matched PV's (default: 500) Returns ------- list : list of PV names """ params = {} if len(regex) != 0: params['pv'] = regex if limit != 500: params['limit'] = str(limit) else: if limit != 500: params['limit'] = str(limit) request_url = self.url + '/getAllPVs' req = requests.get(request_url, params=params, stream=True) result = req.json() return result
def _get_pv_status(self, pvs): result = [] if len(pvs) == 0: return result pv = pvs[0] if len(pvs) > 1: for pvname in pvs[1:]: pv += ',' + pvname params = {'pv': pv} request_url = self.url + '/getPVStatus' req = requests.get(request_url, params=params, stream=True) result = req.json() return result
[docs] def get_pv_status(self, pvs): """ Return the PV status Parameters ---------- pvs : list list of PV names """ result = [] for i in range(0, len(pvs), 100): i1 = i i2 = min(i1+100, len(pvs)) r = self._get_pv_status(pvs[i1: i2]) result += r return result
[docs] def get_pv_type_info(self, pv): """ Return the type info for the specified PV Parameters ---------- pv : str PV name """ params = {'pv': pv} request_url = self.url + '/getPVTypeInfo' req = requests.get(request_url, params=params, stream=True) result = req.json() return result
[docs] def get_pv_details(self, pv): """ Return the detailed statistics for the specified PV Parameters ---------- pv : str PV name """ params = {'pv': pv} request_url = self.url + '/getPVDetails' req = requests.get(request_url, params=params, stream=True) result = req.json() return result
[docs] def get_recently_added_pvs(self): """ Return a list of PVs sorted by descending PVTypeInfo creation timestamp """ request_url = self.url + '/getRecentlyAddedPVs' req = requests.get(request_url, stream=True) result = req.json() return result
[docs] def get_paused_pvs_report(self): """ Return a list of PVs that are currently paused """ request_url = self.url + '/getPausedPVsReport' req = requests.get(request_url, stream=True) result = req.json() return result
[docs] def get_never_connected_pvs(self): """ Return a list of PVs that never connected """ request_url = self.url + '/getNeverConnectedPVs' req = requests.get(request_url, stream=True) result = req.json() pvs = [pv['pvName'] for pv in result] return pvs
[docs] def archive_pvs(self, pvnames, period=1.0, method='MONITOR', policy='Default'): """ Archive one or more PVs Parameters ---------- pvnames : list list of PV names period : float, optional sampling period to be used (default value is 1.0 seconds) method : str, optional sampling method to be used, MONITOR(default) or SCAN policy : str, optional policy name (default: 'Default') """ request_url = self.url + '/archivePV' headers = {"Content-type": "application/json", "Accept": "text/plain"} pvs = [] for pv in pvnames: pvdict = {} pvdict['pv'] = pv pvdict['samplingperiod'] = str(period) pvdict['samplingmethod'] = method pvdict['policy'] = policy pvs.append(pvdict) data = json.dumps(pvs) req = requests.post(request_url, data=data, headers=headers) result = req.json() return result
def _get_archiving_status(self, pvs): archived = [] others = [] if len(pvs) == 0: return archived, others pv = pvs[0] if len(pvs) > 1: for pvname in pvs[1:]: pv += ',' + pvname params = {'pv': pv} request_url = self.url + '/getPVStatus' req = requests.get(request_url, params=params, stream=True) result = req.json() for record in result: if record['status'] == 'Being archived': archived.append(record['pvName']) else: others.append(record['pvName']) return archived, others
[docs] def get_archiving_status(self, pvs): """ Return the PV archiving status Parameters ---------- pvs : list list of PV names """ archived = [] others = [] for i in range(0, len(pvs), 100): i1 = i i2 = min(i1+100, len(pvs)) a, o = self._get_archiving_status(pvs[i1: i2]) archived += a others += o return archived, others
[docs] def change_archival_parameters(self, pv, period, method='MONITOR'): """ Change the archival parameters for the specified PV Parameters ---------- pv : str PV name period : float new sampling period in seconds method : str new sampling method, 'SCAN' or 'MONITOR' """ params = {} params['pv'] = pv params['samplingperiod'] = str(period) params['samplingmethod'] = method request_url = self.url + '/changeArchivalParameters' req = requests.get(request_url, params=params, stream=True) result = req.json() return result
[docs] def abort_archiving_pv(self, pv): """ Abort any pending requests for archiving the specified PV Parameters ---------- pv : str PV name """ params = {'pv': pv} request_url = self.url + '/abortArchivingPV' req = requests.get(request_url, params=params, stream=True) result = req.json() return result
def _pause_archiving_pvs(self, pvs): result = [] if len(pvs) == 0: return result pv = pvs[0] if len(pvs) > 1: for pvname in pvs[1:]: pv += ',' + pvname params = {'pv': pv} request_url = self.url + '/pauseArchivingPV' req = requests.get(request_url, params=params, stream=True) result = req.json() return result
[docs] def pause_archiving_pvs(self, pvs): """ Pause archiving the specified PVs Parameters ---------- pvs : list list of PV names """ pause_pvs = [] for i in range(0, len(pvs), 100): i1 = i i2 = min(i1+100, len(pvs)) p_pvs = self._pause_archiving_pvs(pvs[i1:i2]) pause_pvs += p_pvs return pause_pvs
def _resume_archiving_pvs(self, pvs): result = [] if len(pvs) == 0: return result pv = pvs[0] if len(pvs) > 1: for pvname in pvs[1:]: pv += ',' + pvname params = {'pv': pv} request_url = self.url + '/resumeArchivingPV' req = requests.get(request_url, params=params, stream=True) result = req.json() return result
[docs] def resume_archiving_pvs(self, pvs): """ Resume archiving the specified PVs Parameters ---------- pvs : list list of PV names """ arch_pvs = [] for i in range(0, len(pvs), 100): i1 = i i2 = min(i1+100, len(pvs)) p_pvs = self._resume_archiving_pvs(pvs[i1:i2]) arch_pvs += p_pvs return arch_pvs
[docs] def delete_pv(self, pv, deleteData=False): """ Stop archiving the specified PV (Note: the PV needs to be paused first) Parameters ---------- pv : str PV name deleteData : bool, optional delete the data that has already been recorded (default: False) """ params = {'pv': pv} if deleteData is False: params['deleteData'] = 'false' else: params['deleteData'] = 'true' request_url = self.url + '/deletePV' req = requests.get(request_url, params=params, stream=True) result = req.json() return result