Source code for stixpy.calibration.compression

"""
STIX integer compression is a quasi-exponential 1 byte compression of integer data, both positive
or negative depending on the parameterization.  For the decompression it returns the mid-point value
of the possible compressed values that could have given the input.

The compression algorithm is applied to an integer value, V, to yield a compressed value, C, as
follows:

1. If  V < 0, set s=1 and set V = -V
2. If  V < 2^(M+1), set C = V and skip to step 6
3. If  V >= 2^(M+1), shift V right until 1s (if any) appear only in LS M+1 bits
4. Exponent, e = number of shifts+ 1
5. Mantissa, m = LS M bits of the shifted value.
6. Set C = m + 2^M * e
7. If S=1, set msb of C = s

The algorithm family has the following properties:

* The compressed value is exact for input absolute values up to 2^(M+1)
* RMS fractional error is in range: 1./(sqrt(12)*2^M) to 1/(sqrt(1/12)*s^(M+1))
* The maximum absolute value that can be compressed is given by: 2^(2^K-2) x (2^(M+1)-1)
  The number of bits required to hold the compressed value is S+K+M

References
----------
STIX-TN-0117-FHNW
"""

import numpy as np

__all__ = [
    "compress",
    "decompress",
    "CompressionRangeError",
    "CompressionSchemeParameterError",
    "NonIntegerCompressionError",
]


[docs] def compress(values, *, s, k, m): """ Compress values according to parameters. Parameters ---------- values : array-like (int) Values to be compressed s : int Number of sign bits, 0 for positive only values 1 for signed k : int Number of bits for exponent m : int Number of bit for mantissa Returns ------- array The compressed values Examples -------- >>> comp = compress(12345, s=0, k=5, m=3) >>> int(comp) 92 >>> comp = compress(-1984, s=1, k=3, m=4) >>> int(comp) 255 """ values = np.atleast_1d(np.array(values)) if not np.issubdtype(values.dtype, np.integer): raise NonIntegerCompressionError(f"Input must be an integer type not {values.dtype}") total = s + k + m if total > 8: raise CompressionSchemeParameterError(f"Invalid s={s}, k={k}, m={m} must sum to less than 8 not {total}") max_value = 2 ** (2**k - 2) * (2 ** (m + 1)) - 1 abs_range = [0, max_value] if s == 0 else [-1 * max_value, 1 * max_value] if values.min() < abs_range[0] or values.max() > abs_range[1]: raise CompressionRangeError( f"Valid input range exceeded for s={s}, k={k}, m={m} " f"input (min/max) {values.min()}/{values.max()} " f"exceeds {abs_range}" ) negative_indices = np.nonzero(values < 0) out = np.abs(values).astype(np.uint64) compress_indices = np.nonzero(out >= 2 ** (m + 1)) if values[compress_indices].size != 0: kv = np.floor(np.log(np.abs(values[compress_indices])) / np.log(2.0) - m).astype(np.int64) mv = (np.abs(values[compress_indices], dtype=np.int64) // 2**kv) & (2**m - 1) out[compress_indices] = kv + 1 << m | mv if s != 0 and values[negative_indices].size >= 1: out[negative_indices] += 128 return out
[docs] def decompress(values, *, s, k, m, return_variance=False): """ Compress values according to parameters. Parameters ---------- values : array-like (int) Values to decompressed s : int (0, 1) Number of sign bits k : int Number of bits for exponent m : int Number of bits for mantissa return_variance : boolean (optional) Returns ------- array The decompressed values Examples -------- >>> decomp = decompress(92, s=0, k=5, m=3) >>> int(decomp) 12799 >>> decomp = decompress(255, s=1, k=3, m=4) >>> int(decomp) -2015 """ values = np.atleast_1d(np.array(values)) if not np.issubdtype(values.dtype, np.integer): raise NonIntegerCompressionError(f"Input must be an integer type not {values.dtype}") values = values.astype(np.uint64) total = s + k + m if total > 8: raise CompressionSchemeParameterError(f"Invalid s={s}, k={k}, m={m} must sum to less than 8 not {total}") if values.min() < 0 or values.max() > 255: raise CompressionRangeError("Compressed values must be in the range 0 to 255") max_value = 2 ** (2**k - 2) * (2 ** (m + 1)) - 1 uint64_info = np.iinfo(np.uint64) if max_value > uint64_info.max: raise CompressionRangeError("Decompressed value too large to fit into uint64") abs_values = values if s == 0 else values & 127 out = abs_values[:].astype(np.uint64) negative_indices = np.nonzero(values >= 128) if s != 0 else ([]) decompress_indices = np.nonzero(abs_values >= 2 ** (m + 1)) if return_variance: variance = np.zeros_like(values, dtype=np.float64) if abs_values[decompress_indices].size >= 1: kv = (abs_values[decompress_indices] >> m) - 1 mv = 2**m + (abs_values[decompress_indices] & 2**m - 1) out[decompress_indices] = (mv << kv) + (1 << (kv - 1)) - 1 if return_variance: variance[decompress_indices] = ((1 << kv) ** 2 + 2) / 12 if abs_values[negative_indices].size >= 1: out = out.astype(np.int64) out[negative_indices] = -1 * out[negative_indices] if return_variance: return out, variance else: return out
[docs] class NonIntegerCompressionError(Exception): """ Exception raised when the input to the compression algorithm is not an integer. """
[docs] class CompressionSchemeParameterError(Exception): """ Exception raised for invalid compression scheme parameters. """
[docs] class CompressionRangeError(Exception): """ Exception raised due to invalid range in the input or output. """