Source code for ophyd.signal

# vi: ts=4 sw=4
import logging
import time

import numpy as np
import epics

from .utils import (ReadOnlyError, LimitError)
from .utils.epics_pvs import (pv_form, waveform_to_string,
                              raise_if_disconnected, data_type, data_shape)
from .ophydobj import OphydObject
from .status import Status
from .utils import set_and_wait


logger = logging.getLogger(__name__)


[docs]class Signal(OphydObject): '''A signal, which can have a read-write or read-only value. Parameters ---------- value : any, optional The initial value timestamp : float, optional The timestamp associated with the initial value. Defaults to the current local time. tolerance : any, optional The absolute tolerance associated with the value rtolerance : any, optional The relative tolerance associated with the value, used in set_and_wait as follows: absolute(setpoint - readback) <= (tolerance + rtolerance * absolute(readback)) Attributes ---------- rtolerance : any, optional The relative tolerance associated with the value ''' SUB_VALUE = 'value' _default_sub = SUB_VALUE def __init__(self, *, value=None, timestamp=None, name=None, parent=None, tolerance=None, rtolerance=None): super().__init__(name=name, parent=parent) self._readback = value if timestamp is None: timestamp = time.time() self._timestamp = timestamp self._set_thread = None self._tolerance = tolerance # self.tolerance is a property self.rtolerance = rtolerance
[docs] def trigger(self): '''Call that is used by bluesky prior to read()''' # NOTE: this is a no-op that exists here for bluesky purposes # it may need to be moved in the future d = Status(self) d._finished() return d
[docs] def wait_for_connection(self, timeout=0.0): '''Wait for the underlying signals to initialize or connect''' pass
@property def timestamp(self): '''Timestamp of the readback value''' return self._timestamp @property def tolerance(self): '''The absolute tolerance associated with the value.''' return self._tolerance @tolerance.setter def tolerance(self, tolerance): self._tolerance = tolerance def _repr_info(self): yield from super()._repr_info() value = self.value if value is not None: yield ('value', value) if self.timestamp is not None: yield ('timestamp', self.timestamp) if self.tolerance is not None: yield ('tolerance', self.tolerance) if self.rtolerance is not None: yield ('rtolerance', self.rtolerance)
[docs] def get(self, **kwargs): '''The readback value''' return self._readback
[docs] def put(self, value, *, timestamp=None, force=False, **kwargs): '''Put updates the internal readback value The value is optionally checked first, depending on the value of force. In addition, VALUE subscriptions are run. Extra kwargs are ignored (for API compatibility with EpicsSignal kwargs pass through). Parameters ---------- value : any Value to set timestamp : float, optional The timestamp associated with the value, defaults to time.time() force : bool, optional Check the value prior to setting it, defaults to False ''' # TODO: consider adding set_and_wait here as a kwarg if not force: self.check_value(value) old_value = self._readback self._readback = value if timestamp is None: timestamp = time.time() self._timestamp = timestamp self._run_subs(sub_type=self.SUB_VALUE, old_value=old_value, value=value, timestamp=self._timestamp)
[docs] def set(self, value, *, timeout=None, settle_time=None): '''Set is like `put`, but is here for bluesky compatibility Returns ------- st : Status This status object will be finished upon return in the case of basic soft Signals ''' def set_thread(): nonlocal timeout if timeout is None: timeout = 10 # TODO set_and_wait does not support a timeout of None # and 10 is its default timeout try: set_and_wait(self, value, timeout=timeout, atol=self.tolerance, rtol=self.rtolerance) except TimeoutError: logger.debug('set_and_wait(%r, %s) timed out', self.name, value) success = False except Exception as ex: logger.debug('set_and_wait(%r, %s) failed', self.name, value, exc_info=ex) success = False else: logger.debug('set_and_wait(%r, %s) succeeded => %s', self.name, value, self.value) success = True if settle_time is not None: time.sleep(settle_time) finally: st._finished(success=success) self._set_thread = None if self._set_thread is not None: raise RuntimeError('Another set() call is still in progress') st = Status(self) self._status = st self._set_thread = epics.ca.CAThread(target=set_thread) self._set_thread.daemon = True self._set_thread.start() return self._status
@property def value(self): '''The signal's value''' return self.get() @value.setter def value(self, value): self.put(value)
[docs] def read(self): '''Put the status of the signal into a simple dictionary format for data acquisition Returns ------- dict ''' return {self.name: {'value': self.get(), 'timestamp': self.timestamp}}
[docs] def describe(self): """Return the description as a dictionary""" return {self.name: {'source': 'SIM:{}'.format(self.name), 'dtype': 'number', 'shape': []}}
[docs] def read_configuration(self): "Subclasses may customize this." return self.read()
[docs] def describe_configuration(self): "Subclasses may customize this." return self.describe()
@property def limits(self): # Always override, never extend this return (0, 0) @property def low_limit(self): return self.limits[0] @property def high_limit(self): return self.limits[1]
class DerivedSignal(Signal): def __init__(self, derived_from, *, name=None, parent=None, **kwargs): '''A signal which is derived from another one Parameters ---------- derived_from : Signal The signal from which this one is derived name : str, optional The signal name parent : Device, optional The parent device ''' super().__init__(name=name, parent=parent, **kwargs) self._derived_from = derived_from if self._derived_from.connected: # set up the initial timestamp reporting, if connected self._timestamp = self._derived_from.timestamp @property def derived_from(self): '''Signal that this one is derived from''' return self._derived_from def describe(self): '''Description based on the original signal description''' desc = self._derived_from.describe()[self._derived_from.name] desc['derived_from'] = self._derived_from.name return {self.name: desc} def get(self, **kwargs): '''Get the value from the original signal''' value = self._derived_from.get(**kwargs) self._timestamp = self._derived_from.timestamp return value def put(self, value, **kwargs): '''Put the value to the original signal''' res = self._derived_from.put(value, **kwargs) self._timestamp = self._derived_from.timestamp return res def wait_for_connection(self, timeout=0.0): '''Wait for the original signal to connect''' return self._derived_from.wait_for_connection(timeout=timeout) @property def connected(self): '''Mirrors the connection state of the original signal''' return self._derived_from.connected @property def limits(self): '''Limits from the original signal''' return self._derived_from.limits def _repr_info(self): yield from super()._repr_info() yield ('derived_from', self._derived_from)
[docs]class EpicsSignalBase(Signal): '''A read-only EpicsSignal -- that is, one with no `write_pv` Keyword arguments are passed on to the base class (Signal) initializer Parameters ---------- read_pv : str The PV to read from pv_kw : dict, optional Keyword arguments for epics.PV(**pv_kw) auto_monitor : bool, optional Use automonitor with epics.PV name : str, optional Name of signal. If not given defaults to read_pv string : bool, optional Attempt to cast the EPICS PV value to a string by default ''' def __init__(self, read_pv, *, pv_kw=None, string=False, auto_monitor=False, name=None, **kwargs): if 'rw' in kwargs: if kwargs['rw']: new_class = EpicsSignal else: new_class = EpicsSignalRO raise RuntimeError('rw is no longer an option for EpicsSignal. ' 'Based on your setting of `rw`, you should be ' 'using this class: {}' ''.format(new_class.__name__)) if pv_kw is None: pv_kw = dict() self._read_pv = None self._string = bool(string) self._pv_kw = pv_kw self._auto_monitor = auto_monitor if name is None: name = read_pv super().__init__(name=name, **kwargs) self._read_pv = epics.PV(read_pv, form=pv_form, auto_monitor=auto_monitor, **pv_kw) self._read_pv.add_callback(self._read_changed, run_now=self._read_pv.connected) @property def as_string(self): '''Attempt to cast the EPICS PV value to a string by default''' return self._string @property @raise_if_disconnected def precision(self): '''The precision of the read PV, as reported by EPICS''' return self._read_pv.precision @property @raise_if_disconnected def enum_strs(self): """List of strings if PV is an enum type""" return self._read_pv.enum_strs def _reinitialize_pv(self, old_instance, **pv_kw): '''Reinitialize a PV instance Takes care of clearing callbacks, setting PV form, and ensuring connectivity status remains the same Parameters ---------- old_instance : epics.PV The old PV instance pv_kw : kwargs The parameters to pass to the initializer ''' old_instance.clear_callbacks() was_connected = old_instance.connected new_instance = epics.PV(old_instance.pvname, form=old_instance.form, **pv_kw) if was_connected: new_instance.wait_for_connection() return new_instance def subscribe(self, callback, event_type=None, run=True): if event_type is None: event_type = self._default_sub # check if this is a setpoint subscription, and we are not explicitly # auto monitoring obj_mon = (event_type == self.SUB_VALUE and self._auto_monitor is not True) # but if the epics.PV has already connected and determined that it # should automonitor (based on the maximum automonitor length), then we # don't need to reinitialize it if obj_mon and not self._read_pv.auto_monitor: self._read_pv = self._reinitialize_pv(self._read_pv, auto_monitor=True, **self._pv_kw) self._read_pv.add_callback(self._read_changed, run_now=self._read_pv.connected) return super().subscribe(callback, event_type=event_type, run=run) def wait_for_connection(self, timeout=1.0): if not self._read_pv.connected: if not self._read_pv.wait_for_connection(timeout=timeout): raise TimeoutError('Failed to connect to %s' % self._read_pv.pvname) @property @raise_if_disconnected def timestamp(self): '''Timestamp of readback PV, according to EPICS''' if not self._read_pv.auto_monitor: # force updating the timestamp when not using auto monitoring self._read_pv.get_timevars() return self._read_pv.timestamp @property def pvname(self): '''The readback PV name''' return self._read_pv.pvname def _repr_info(self): yield ('read_pv', self._read_pv.pvname) yield from super()._repr_info() yield ('pv_kw', self._pv_kw) yield ('auto_monitor', self._auto_monitor) yield ('string', self._string) @property def connected(self): return self._read_pv.connected @property @raise_if_disconnected def limits(self): '''The read PV limits''' # This overrides the base limits pv = self._read_pv pv.get_ctrlvars() return (pv.lower_ctrl_limit, pv.upper_ctrl_limit)
[docs] def get(self, *, as_string=None, **kwargs): '''Get the readback value through an explicit call to EPICS Parameters ---------- count : int, optional Explicitly limit count for array data as_string : bool, optional Get a string representation of the value, defaults to as_string from this signal, optional as_numpy : bool Use numpy array as the return type for array data. timeout : float, optional maximum time to wait for value to be received. (default = 0.5 + log10(count) seconds) use_monitor : bool, optional to use value from latest monitor callback or to make an explicit CA call for the value. (default: True) ''' # NOTE: in the future this should be improved to grab self._readback # instead, when all of the kwargs match up if as_string is None: as_string = self._string if not self._read_pv.connected: if not self._read_pv.wait_for_connection(): raise TimeoutError('Failed to connect to %s' % self._read_pv.pvname) ret = self._read_pv.get(as_string=as_string, **kwargs) if as_string: return waveform_to_string(ret) return ret
def _fix_type(self, value): if self._string: value = waveform_to_string(value) return value def _read_changed(self, value=None, timestamp=None, **kwargs): '''A callback indicating that the read value has changed''' if timestamp is None: timestamp = time.time() value = self._fix_type(value) super().put(value, timestamp=timestamp, force=True)
[docs] def describe(self): """Return the description as a dictionary Returns ------- dict Dictionary of name and formatted description string """ desc = {'source': 'PV:{}'.format(self._read_pv.pvname), } val = self.value desc['dtype'] = data_type(val) desc['shape'] = data_shape(val) try: desc['precision'] = int(self.precision) except (ValueError, TypeError): pass desc['units'] = self._read_pv.units if hasattr(self, '_write_pv'): desc['lower_ctrl_limit'] = self._write_pv.lower_ctrl_limit desc['upper_ctrl_limit'] = self._write_pv.upper_ctrl_limit if self.enum_strs: desc['enum_strs'] = list(self.enum_strs) return {self.name: desc}
@raise_if_disconnected
[docs] def read(self): """Read the signal and format for data collection Returns ------- dict Dictionary of value timestamp pairs """ return {self.name: {'value': self.value, 'timestamp': self.timestamp}}
[docs]class EpicsSignalRO(EpicsSignalBase): '''A read-only EpicsSignal -- that is, one with no `write_pv` Keyword arguments are passed on to the base class (Signal) initializer Parameters ---------- read_pv : str The PV to read from pv_kw : dict, optional Keyword arguments for epics.PV(**pv_kw) limits : bool, optional Check limits prior to writing value auto_monitor : bool, optional Use automonitor with epics.PV name : str, optional Name of signal. If not given defaults to read_pv ''' def put(self, *args, **kwargs): raise ReadOnlyError('Read-only signals cannot be put to') def set(self, *args, **kwargs): raise ReadOnlyError('Read-only signals cannot be set')
[docs]class EpicsSignal(EpicsSignalBase): '''An EPICS signal, comprised of either one or two EPICS PVs Keyword arguments are passed on to the base class (Signal) initializer Parameters ---------- read_pv : str The PV to read from write_pv : str, optional The PV to write to if different from the read PV pv_kw : dict, optional Keyword arguments for epics.PV(**pv_kw) limits : bool, optional Check limits prior to writing value auto_monitor : bool, optional Use automonitor with epics.PV name : str, optional Name of signal. If not given defaults to read_pv put_complete : bool, optional Use put completion when writing the value tolerance : any, optional The absolute tolerance associated with the value. If specified, this overrides any precision information calculated from the write PV rtolerance : any, optional The relative tolerance associated with the value ''' SUB_SETPOINT = 'setpoint' def __init__(self, read_pv, write_pv=None, *, pv_kw=None, put_complete=False, string=False, limits=False, auto_monitor=False, name=None, **kwargs): self._write_pv = None self._use_limits = bool(limits) self._put_complete = put_complete self._setpoint = None self._setpoint_ts = None if write_pv == read_pv: write_pv = None super().__init__(read_pv, pv_kw=pv_kw, string=string, auto_monitor=auto_monitor, name=name, **kwargs) if write_pv is not None: self._write_pv = epics.PV(write_pv, form=pv_form, auto_monitor=self._auto_monitor, **self._pv_kw) self._write_pv.add_callback(self._write_changed, run_now=self._write_pv.connected) else: self._write_pv = self._read_pv def subscribe(self, callback, event_type=None, run=True): if event_type is None: event_type = self._default_sub # check if this is a setpoint subscription, and we are not explicitly # auto monitoring obj_mon = (event_type == self.SUB_SETPOINT and self._auto_monitor is not True) # but if the epics.PV has already connected and determined that it # should automonitor (based on the maximum automonitor length), then we # don't need to reinitialize it if obj_mon and not self._write_pv.auto_monitor: self._write_pv = self._reinitialize_pv(self._write_pv, auto_monitor=True, **self._pv_kw) self._write_pv.add_callback(self._write_changed, run_now=self._write_pv.connected) return super().subscribe(callback, event_type=event_type, run=run) def wait_for_connection(self, timeout=1.0): super().wait_for_connection(timeout=timeout) if self._write_pv is not None and not self._write_pv.connected: if not self._write_pv.wait_for_connection(timeout=timeout): raise TimeoutError('Failed to connect to %s' % self._write_pv.pvname) @property @raise_if_disconnected def tolerance(self): '''The tolerance of the write PV, as reported by EPICS Can be overidden by the user at the EpicsSignal level. Returns ------- tolerance : float or None Using the write PV's precision: If precision == 0, tolerance will be None If precision > 0, calculated to be 10**(-precision) ''' # NOTE: overrides Signal.tolerance property if self._tolerance is not None: return self._tolerance precision = self.precision if precision == 0 or precision is None: return None return 10. ** (-precision) @tolerance.setter def tolerance(self, tolerance): self._tolerance = tolerance @property @raise_if_disconnected def setpoint_ts(self): '''Timestamp of setpoint PV, according to EPICS''' if not self._write_pv.auto_monitor: # force updating the timestamp when not using auto monitoring self._write_pv.get_timevars() return self._write_pv.timestamp @property def setpoint_pvname(self): '''The setpoint PV name''' return self._write_pv.pvname def _repr_info(self): yield from super()._repr_info() if self._write_pv is not None: yield ('write_pv', self._write_pv.pvname) yield ('limits', self._use_limits) yield ('put_complete', self._put_complete) @property def connected(self): return self._read_pv.connected and self._write_pv.connected @property @raise_if_disconnected def limits(self): '''The write PV limits''' # read_pv_limits = super().limits pv = self._write_pv pv.get_ctrlvars() return (pv.lower_ctrl_limit, pv.upper_ctrl_limit)
[docs] def check_value(self, value): '''Check if the value is within the setpoint PV's control limits Raises ------ ValueError ''' super().check_value(value) if value is None: raise ValueError('Cannot write None to epics PVs') if not self._use_limits: return low_limit, high_limit = self.limits if low_limit >= high_limit: return if not (low_limit <= value <= high_limit): raise LimitError('Value {} outside of range: [{}, {}]' .format(value, low_limit, high_limit))
@raise_if_disconnected
[docs] def get_setpoint(self, **kwargs): '''Get the setpoint value (use only if the setpoint PV and the readback PV differ) Keyword arguments are passed on to epics.PV.get() ''' setpoint = self._write_pv.get(**kwargs) return self._fix_type(setpoint)
def _write_changed(self, value=None, timestamp=None, **kwargs): '''A callback indicating that the write value has changed''' if timestamp is None: timestamp = time.time() value = self._fix_type(value) old_value = self._setpoint self._setpoint = value self._setpoint_ts = timestamp if self._read_pv is not self._write_pv: self._run_subs(sub_type=self.SUB_SETPOINT, old_value=old_value, value=value, timestamp=self._setpoint_ts, **kwargs)
[docs] def put(self, value, force=False, **kwargs): '''Using channel access, set the write PV to `value`. Keyword arguments are passed on to callbacks Parameters ---------- value : any The value to set force : bool, optional Skip checking the value in Python first ''' if not force: self.check_value(value) if not self._write_pv.connected: if not self._write_pv.wait_for_connection(): raise TimeoutError('Failed to connect to %s' % self._write_pv.pvname) use_complete = kwargs.pop('use_complete', self._put_complete) self._write_pv.put(value, use_complete=use_complete, **kwargs) old_value = self._setpoint self._setpoint = value if self._read_pv is self._write_pv: # readback and setpoint PV are one in the same, so update the # readback as well super().put(value, timestamp=time.time(), force=True) self._run_subs(sub_type=self.SUB_SETPOINT, old_value=old_value, value=value, timestamp=self.timestamp, **kwargs)
[docs] def set(self, value, *, timeout=None, settle_time=None): '''Set is like `EpicsSignal.put`, but is here for bluesky compatibility If put completion is used for this EpicsSignal, the status object will complete once EPICS reports the put has completed. Otherwise, set_and_wait will be used (as in `Signal.set`) Parameters ---------- value : any timeout : float, optional Maximum time to wait. Note that set_and_wait does not support an infinite timeout. settle_time: float, optional Delay after the set() has completed to indicate completion to the caller Returns ------- st : Status See Also -------- `Signal.set` ''' if not self._put_complete: return super().set(value, timeout=timeout, settle_time=settle_time) # using put completion: # timeout and settle time is handled by the status object. st = Status(self, timeout=timeout, settle_time=settle_time) def put_callback(**kwargs): st._finished(success=True) self.put(value, use_complete=True, callback=put_callback) return st
@property def setpoint(self): '''The setpoint PV value''' return self.get_setpoint() @setpoint.setter def setpoint(self, value): self.put(value)
[docs]class AttributeSignal(Signal): '''Signal derived from a Python object instance's attribute Parameters ---------- attr : str The dotted attribute name, relative to this signal's parent. name : str, optional The signal name parent : Device, optional The parent device instance ''' def __init__(self, attr, *, name=None, parent=None, **kwargs): super().__init__(name=name, parent=parent, **kwargs) if '.' in attr: self.attr_base, self.attr = attr.rsplit('.', 1) else: self.attr_base, self.attr = None, attr @property def full_attr(self): '''The full attribute name''' if not self.attr_base: return self.attr else: return '.'.join((self.attr_base, self.attr)) @property def base(self): '''The parent instance which has the final attribute''' if self.attr_base is None: return self.parent obj = self.parent for i, part in enumerate(self.attr_base.split('.')): try: obj = getattr(obj, part) except AttributeError as ex: attr = '.'.join(self.parent_attr[:i + 1]) raise AttributeError('{} ({})'.format(attr, ex)) return obj def get(self, **kwargs): return getattr(self.base, self.attr) def put(self, value, **kwargs): return setattr(self.base, self.attr, value) def describe(self): value = self.value desc = {'source': 'PY:{}.{}'.format(self.parent.name, self.full_attr), 'dtype': data_type(value), 'shape': data_shape(value), } return {self.name: desc}
[docs]class ArrayAttributeSignal(AttributeSignal): '''An AttributeSignal which is cast to an ndarray on get This is used where data_type and data_shape may otherwise fail to determine how to store the data into metadatastore. ''' def get(self, **kwargs): return np.asarray(super().get(**kwargs))