Skip to content

API reference

ctsgen3.spi.spi

FPS = 3.75 module-attribute

Current FPS of this firmware version

PIXEL_HEIGHT = 15 module-attribute

Thermal image pixel height.

PIXEL_WIDTH = 20 module-attribute

Thermal image pixel width.

MAX_NUM_DETECTIONS = 21 module-attribute

Maximum number of detections

CvDetection

CV algorithm outputs for each detected heat blob.

Source code in src/ctsgen3/spi/spi.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
class CvDetection(ctypes.LittleEndianStructure):
    """
    One heat blob as reported by the CV algorithm.

    The record is little-endian and byte-packed; the fields appear in memory
    in the order they are declared below.
    """

    # Byte packing: no padding is inserted between fields.
    _pack_ = 1
    _fields_ = [
        ("id", ctypes.c_uint8),
        ("label", ctypes.c_uint8),
        ("temperature_centre_location_x", ctypes.c_uint8),
        ("temperature_centre_location_y", ctypes.c_uint8),
        ("frames_since_motion", ctypes.c_uint32),
        ("peak_temperature", ctypes.c_float),
        ("foot_position_estimate_x", ctypes.c_float),
        ("foot_position_estimate_y", ctypes.c_float),
        ("displacement", ctypes.c_float),
        ("speed", ctypes.c_float),
        ("angle", ctypes.c_float),
    ]

    def __repr__(self) -> str:
        """Return ``ClassName(field=value, ...)`` covering every field in declaration order."""
        rendered = ", ".join(f"{name}={getattr(self, name)!r}" for name, _ctype in self._fields_)
        return f"{type(self).__name__}({rendered})"

SpiThermalPacket

SPI bulk data thermal frame packet.

Source code in src/ctsgen3/spi/spi.py
59
60
61
62
63
64
65
66
67
68
class SpiThermalPacket(ctypes.LittleEndianStructure):
    """
    Thermal frame section of the SPI bulk data.

    Holds one int16 value per pixel (PIXEL_HEIGHT * PIXEL_WIDTH values in a
    flat array) followed by a 16-bit CRC field.
    """

    # Byte packing: no padding between the frame and its CRC.
    _pack_ = 1
    _fields_ = [
        ("thermal_frame", ctypes.c_int16 * (PIXEL_HEIGHT * PIXEL_WIDTH)),
        ("crc", ctypes.c_uint16),
    ]

SpiMetadataPacket

SPI bulk data metadata packet.

Source code in src/ctsgen3/spi/spi.py
71
72
73
74
75
76
77
78
79
80
class SpiMetadataPacket(ctypes.LittleEndianStructure):
    """
    Metadata section of the SPI bulk data.

    Holds a `RegisterMap` structure followed by a 16-bit CRC field.
    """

    # Byte packing: no padding between the register map and its CRC.
    _pack_ = 1
    _fields_ = [
        ("metadata", RegisterMap),
        ("crc", ctypes.c_uint16),
    ]

SpiCvForegroundPacket

SPI bulk data CV foreground packet.

Source code in src/ctsgen3/spi/spi.py
83
84
85
86
87
88
89
90
91
92
class SpiCvForegroundPacket(ctypes.LittleEndianStructure):
    """
    CV foreground section of the SPI bulk data.

    Holds one int16 value per pixel (PIXEL_HEIGHT * PIXEL_WIDTH values in a
    flat array) followed by a 16-bit CRC field.
    """

    # Byte packing: no padding between the foreground data and its CRC.
    _pack_ = 1
    _fields_ = [
        ("cv_foreground", ctypes.c_int16 * (PIXEL_HEIGHT * PIXEL_WIDTH)),
        ("crc", ctypes.c_uint16),
    ]

SpiCvDetectionsPacket

SPI bulk data CV detections packet.

Source code in src/ctsgen3/spi/spi.py
 95
 96
 97
 98
 99
100
101
102
103
104
class SpiCvDetectionsPacket(ctypes.LittleEndianStructure):
    """
    CV detections section of the SPI bulk data.

    Holds a fixed-size array of MAX_NUM_DETECTIONS `CvDetection` records
    followed by a 16-bit CRC field.
    """

    # Byte packing: no padding between the detection array and its CRC.
    _pack_ = 1
    _fields_ = [
        ("cv_detections", CvDetection * MAX_NUM_DETECTIONS),
        ("crc", ctypes.c_uint16),
    ]

SpiPacket

Full SpiPacket when all sections are configured via command and control I2C interface.

Source code in src/ctsgen3/spi/spi.py
107
108
109
110
111
112
113
114
115
116
117
118
119
class SpiPacket(ctypes.LittleEndianStructure):
    """
    Complete SPI packet, as sent when every section is enabled through the
    command and control I2C interface.

    Layout: a 32-bit version word, then the thermal frame, metadata,
    CV foreground and CV detections sections in that order.
    """

    # Byte packing: sections follow each other with no padding.
    _pack_ = 1
    _fields_ = [
        ("version", ctypes.c_uint32),
        ("thermal_frame", SpiThermalPacket),
        ("metadata", SpiMetadataPacket),
        ("cv_foreground", SpiCvForegroundPacket),
        ("cv_detections", SpiCvDetectionsPacket),
    ]

SpiInterface dataclass

SPI interface class for reading bulk data from CTS Gen3 EVK via FTDI FT4222 I2C/SPI to USB chip.

Source code in src/ctsgen3/spi/spi.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
@dataclass
class SpiInterface:
    """
    SPI interface class for reading bulk data from CTS Gen3 EVK via FTDI FT4222 I2C/SPI to USB chip.

    Device A of the FT4222 pair is used as the SPI master; device B provides
    the GPIO (port 2) whose falling edge is polled before a packet is read.
    """

    _ft4222_pair: FT4222Pair
    _crc_calculator: anycrc.CRC
    _clock: ft4222.SysClock = ft4222.SysClock.CLK_80
    _clock_divider: ft4222.SPIMaster.Clock = ft4222.SPIMaster.Clock.DIV_8
    # The three settings below are fixed: `init_spi` passes these same
    # constants directly rather than reading the attributes.
    _cpol: ft4222.SPI.Cpol = ft4222.SPI.Cpol.IDLE_LOW
    _cpha: ft4222.SPI.Cpha = ft4222.SPI.Cpha.CLK_LEADING
    _chip_select_pin: ft4222.SPIMaster.SlaveSelect = ft4222.SPIMaster.SlaveSelect.SS0

    def __init__(
        self,
        ft4222_pair: FT4222Pair,
        clock: ft4222.SysClock = ft4222.SysClock.CLK_80,
        clock_divider: ft4222.SPIMaster.Clock = ft4222.SPIMaster.Clock.DIV_8,
    ):
        """
        Initialises SpiInterface object.

        Configure clock and divider to achieve goal frequency - maximum frequency tested is 10MHz = CLK_80MHz / divider 8

        Polarity, phase and slave select pins are locked.

        Args:
            ft4222_pair: the pair of FT4222 devices (A: SPI master, B: GPIO).
            clock: FT4222 system clock selection.
            clock_divider: SPI master clock divider applied to `clock`.
        """
        self._ft4222_pair = ft4222_pair
        self._clock = clock
        self._clock_divider = clock_divider
        self._crc_calculator = anycrc.CRC(
            width=CRC_WIDTH,
            poly=CRC_POLYNOMIAL,
            init=CRC_INITIAL_VALUE,
            xorout=CRC_FINAL_XOR_VALUE,
            refin=CRC_REVERSE_INPUT,
            refout=CRC_REVERSE_OUTPUT,
        )

    class CRCError(Exception):
        """Raised when CRC validation fails in SPI transfer."""

    def init_spi(self) -> None:
        """
        Initialises FT4222 as a SPI master. Ensure you call this again if you reconfigure the FT4222 in another mode before using other methods.

        Device A is set up as a single-line SPI master; device B's GPIO 2 is
        set as an input with a falling-edge trigger.

        Raises `ft4222.FT4222DeviceError` on error.
        """
        self._ft4222_pair._ft4222_A.setClock(self._clock)
        self._ft4222_pair._ft4222_A.spiMaster_Init(
            ft4222.SPIMaster.Mode.SINGLE,
            self._clock_divider,
            ft4222.SPI.Cpol.IDLE_LOW,
            ft4222.SPI.Cpha.CLK_LEADING,
            ft4222.SPIMaster.SlaveSelect.SS0,
        )
        self._ft4222_pair._ft4222_B.gpio_Init(gpio2=ft4222.GPIO.Dir.INPUT)  # type: ignore
        self._ft4222_pair._ft4222_B.setSuspendOut(False)
        self._ft4222_pair._ft4222_B.gpio_SetInputTrigger(ft4222.GPIO.Port.P2, ft4222.GPIO.Trigger.FALLING)

    def close_spi(self) -> None:
        """
        Close all FT4222 devices in SPI Interface.

        Device B is closed even when closing device A raises; the error from
        device A still propagates to the caller.
        """
        try:
            self._ft4222_pair._ft4222_A.close()
        finally:
            self._ft4222_pair._ft4222_B.close()

    def read_packet(self) -> SpiPacket:
        """
        Reads a SpiPacket from SPI and validates CRCs. Does not wait for GPIO trigger.

        The sections are validated in the order thermal frame, CV foreground,
        CV detections, metadata; the first mismatch aborts the read.

        Raises `ft4222.FT4222DeviceError` or `CRCError` on error. The
        `CRCError` message names the failing section and both CRC values.

        Returns:
            SpiPacket: populated packet on success
        """
        # Clock out zeros for the full packet length to clock the packet in.
        rx_bytes = self._ft4222_pair._ft4222_A.spiMaster_SingleReadWrite(
            data=bytes(ctypes.sizeof(SpiPacket)), isEndTransaction=True
        )
        spi_packet = SpiPacket.from_buffer_copy(rx_bytes)
        # (label used in the message, payload covered by the CRC, CRC received)
        sections = (
            ("Thermal", spi_packet.thermal_frame.thermal_frame, spi_packet.thermal_frame.crc),
            ("Foreground", spi_packet.cv_foreground.cv_foreground, spi_packet.cv_foreground.crc),
            ("Detections", spi_packet.cv_detections.cv_detections, spi_packet.cv_detections.crc),
            ("Metadata", spi_packet.metadata.metadata, spi_packet.metadata.crc),
        )
        for label, payload, received_crc in sections:
            crc_data = ctypes.string_at(ctypes.addressof(payload), ctypes.sizeof(payload))
            crc_computed = self._crc_calculator.calc(crc_data)
            if crc_computed != received_crc:
                message = f"⚠️ {label} CRC fail.\nGot: {received_crc}\nExpected: {crc_computed}"
                # Console output is kept as before; the same text now also
                # travels with the exception so callers can log it.
                print(message)
                raise SpiInterface.CRCError(message)
        return spi_packet

    def read_bulk_data(self, timeout_ms: int = 0) -> SpiPacket | None:
        """
        Attempts to read a `SpiPacket` within `timeout_ms` and checks for CRC errors.

        Polls the GPIO trigger status on device B until at least one trigger
        is queued, removes one entry from the trigger queue and then reads
        the packet. If `timeout_ms` is 0, which is default, it waits
        indefinitely.

        Raises `ft4222.FT4222DeviceError` or `CRCError` on error.

        Args:
            timeout_ms (int): sets the read timeout in milliseconds. Value of 0 means there is no timeout.

        Returns:
            SpiPacket | None: populated packet on success, None if no trigger was seen before the timeout
        """
        trigger_device = self._ft4222_pair._ft4222_B
        wait_forever = timeout_ms == 0
        # Monotonic clock: adjustments to the wall clock must neither stretch
        # nor cut short the timeout.
        deadline = time.monotonic() + timeout_ms / 1000
        # NOTE(review): this loop polls without sleeping - confirm that is acceptable.
        while wait_forever or time.monotonic() < deadline:
            if trigger_device.gpio_GetTriggerStatus(ft4222.GPIO.Port.P2) > 0:
                trigger_device.gpio_ReadTriggerQueue(ft4222.GPIO.Port.P2, 1)
                return self.read_packet()
        return None

CRCError

Raised when CRC validation fails in SPI transfer.

Source code in src/ctsgen3/spi/spi.py
168
169
class CRCError(Exception):
    """Raised when CRC validation of a section fails in an SPI transfer."""

__init__(ft4222_pair, clock=ft4222.SysClock.CLK_80, clock_divider=ft4222.SPIMaster.Clock.DIV_8)

Initialises SpiInterface object.

Configure the clock and divider to achieve the goal frequency - the maximum frequency tested is 10 MHz = CLK_80 (80 MHz) / divider 8

Polarity, phase and slave select pins are locked.

Source code in src/ctsgen3/spi/spi.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
def __init__(
    self,
    ft4222_pair: FT4222Pair,
    clock: ft4222.SysClock = ft4222.SysClock.CLK_80,
    clock_divider: ft4222.SPIMaster.Clock = ft4222.SPIMaster.Clock.DIV_8,
):
    """
    Set up a SpiInterface.

    Stores the FT4222 pair and the clock settings, and builds the CRC
    calculator from the module's CRC_* constants. Choose clock and divider
    to reach the target SPI frequency - the maximum frequency tested is
    10MHz = CLK_80MHz / divider 8.

    Polarity, phase and slave select pins are locked.
    """
    self._ft4222_pair = ft4222_pair
    self._clock = clock
    self._clock_divider = clock_divider

    crc_settings = {
        "width": CRC_WIDTH,
        "poly": CRC_POLYNOMIAL,
        "init": CRC_INITIAL_VALUE,
        "xorout": CRC_FINAL_XOR_VALUE,
        "refin": CRC_REVERSE_INPUT,
        "refout": CRC_REVERSE_OUTPUT,
    }
    self._crc_calculator = anycrc.CRC(**crc_settings)

init_spi()

Initialises FT4222 as a SPI master. Ensure you call this again if you reconfigure the FT4222 in another mode before using other methods.

Raises ft4222.FT4222DeviceError on error.

Source code in src/ctsgen3/spi/spi.py
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
def init_spi(self) -> None:
    """
    Configure the FT4222 pair for SPI reads.

    Device A gets the stored system clock and is initialised as a
    single-line SPI master with the stored divider and the fixed
    polarity/phase/slave-select settings. Device B's GPIO 2 is set as an
    input with a falling-edge trigger. Call this again if the FT4222 has
    been reconfigured in another mode before using other methods.

    Raises `ft4222.FT4222DeviceError` on error.
    """
    spi_master = self._ft4222_pair._ft4222_A
    spi_master.setClock(self._clock)
    spi_master.spiMaster_Init(
        ft4222.SPIMaster.Mode.SINGLE,
        self._clock_divider,
        ft4222.SPI.Cpol.IDLE_LOW,
        ft4222.SPI.Cpha.CLK_LEADING,
        ft4222.SPIMaster.SlaveSelect.SS0,
    )

    trigger_gpio = self._ft4222_pair._ft4222_B
    trigger_gpio.gpio_Init(gpio2=ft4222.GPIO.Dir.INPUT)  # type: ignore
    # NOTE(review): presumably turns off the suspend-out function so the pin
    # can serve as GPIO 2 - confirm against the FT4222 documentation.
    trigger_gpio.setSuspendOut(False)
    trigger_gpio.gpio_SetInputTrigger(ft4222.GPIO.Port.P2, ft4222.GPIO.Trigger.FALLING)

close_spi()

Close all FT4222 devices in SPI Interface

Source code in src/ctsgen3/spi/spi.py
189
190
191
192
193
194
def close_spi(self) -> None:
    """
    Close all FT4222 devices in SPI Interface.

    Device B is closed even when closing device A raises; the error from
    device A still propagates to the caller.
    """
    try:
        self._ft4222_pair._ft4222_A.close()
    finally:
        self._ft4222_pair._ft4222_B.close()

read_packet()

Reads a SpiPacket from SPI and validates CRCs. Does not wait for GPIO trigger.

Raises ft4222.FT4222DeviceError or CRCError on error.

Returns:

Name Type Description
SpiPacket SpiPacket

populated packet on success

Source code in src/ctsgen3/spi/spi.py
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
def read_packet(self) -> SpiPacket:
    """
    Reads a SpiPacket from SPI and validates CRCs. Does not wait for GPIO trigger.

    The sections are validated in the order thermal frame, CV foreground,
    CV detections, metadata; the first mismatch aborts the read.

    Raises `ft4222.FT4222DeviceError` or `CRCError` on error. The
    `CRCError` message names the failing section and both CRC values.

    Returns:
        SpiPacket: populated packet on success
    """
    # Clock out zeros for the full packet length to clock the packet in.
    rx_bytes = self._ft4222_pair._ft4222_A.spiMaster_SingleReadWrite(
        data=bytes(ctypes.sizeof(SpiPacket)), isEndTransaction=True
    )
    spi_packet = SpiPacket.from_buffer_copy(rx_bytes)
    # (label used in the message, payload covered by the CRC, CRC received)
    sections = (
        ("Thermal", spi_packet.thermal_frame.thermal_frame, spi_packet.thermal_frame.crc),
        ("Foreground", spi_packet.cv_foreground.cv_foreground, spi_packet.cv_foreground.crc),
        ("Detections", spi_packet.cv_detections.cv_detections, spi_packet.cv_detections.crc),
        ("Metadata", spi_packet.metadata.metadata, spi_packet.metadata.crc),
    )
    for label, payload, received_crc in sections:
        crc_data = ctypes.string_at(ctypes.addressof(payload), ctypes.sizeof(payload))
        crc_computed = self._crc_calculator.calc(crc_data)
        if crc_computed != received_crc:
            message = f"⚠️ {label} CRC fail.\nGot: {received_crc}\nExpected: {crc_computed}"
            # Console output is kept as before; the same text now also
            # travels with the exception so callers can log it.
            print(message)
            raise SpiInterface.CRCError(message)
    return spi_packet

read_bulk_data(timeout_ms=0)

Attempts to read a SpiPacket within timeout_ms and checks it for CRC errors.

If timeout_ms is 0 (the default), there is no timeout: the call blocks until a trigger is seen.

Raises ft4222.FT4222DeviceError or CRCError on error.

Parameters:

Name Type Description Default
timeout_ms int

sets the read timeout in milliseconds. Value of 0 means there is no timeout.

0

Returns:

Type Description
SpiPacket | None

SpiPacket | None: populated packet on success, else none

Source code in src/ctsgen3/spi/spi.py
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
def read_bulk_data(self, timeout_ms: int = 0) -> SpiPacket | None:
    """
    Attempts to read a `SpiPacket` within `timeout_ms` and checks for CRC errors.

    Polls the GPIO trigger status on device B until at least one trigger is
    queued, removes one entry from the trigger queue and then reads the
    packet. If `timeout_ms` is 0, which is default, it waits indefinitely.

    Raises `ft4222.FT4222DeviceError` or `CRCError` on error.

    Args:
        timeout_ms (int): sets the read timeout in milliseconds. Value of 0 means there is no timeout.

    Returns:
        SpiPacket | None: populated packet on success, None if no trigger was seen before the timeout
    """
    trigger_device = self._ft4222_pair._ft4222_B
    wait_forever = timeout_ms == 0
    # Monotonic clock: adjustments to the wall clock must neither stretch
    # nor cut short the timeout.
    deadline = time.monotonic() + timeout_ms / 1000
    # NOTE(review): this loop polls without sleeping - confirm that is acceptable.
    while wait_forever or time.monotonic() < deadline:
        if trigger_device.gpio_GetTriggerStatus(ft4222.GPIO.Port.P2) > 0:
            trigger_device.gpio_ReadTriggerQueue(ft4222.GPIO.Port.P2, 1)
            return self.read_packet()
    return None