Python MRE for OAK-FFC-3P + IMX577 Blocking Issue

Lincoln Wang

Severity of the issue

The issue is visible directly on screen. It happens rarely (about 1 run out of 25). For a setup where the issue occurs more often, head to the C++ MRE.
(Attachment: output, 122 KB)
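To catch the hang without watching the preview windows, here is a minimal host-side sketch (not part of the MRE; the StallDetector name and the 5-second threshold are my own assumptions). It assumes the blocking shows up as an output queue going silent on the host.

import time

class StallDetector:
    # Reports when a DepthAI output queue stops delivering messages.
    def __init__(self, name: str, timeout_s: float = 5.0):
        self.name = name
        self.timeout_s = timeout_s  # assumed threshold; tune as needed
        self.last_seen = time.monotonic()

    def poll(self, queue):
        # Same non-blocking tryGet() the MRE uses; track the last arrival time.
        msg = queue.tryGet()
        now = time.monotonic()
        if msg is not None:
            self.last_seen = now
        elif now - self.last_seen > self.timeout_s:
            print(f"[{self.name}] silent for {now - self.last_seen:.1f} s, looks blocked")
        return msg

It would be used in the main loop as, e.g., in_rgb_l = stall_rgb_l.poll(q_rgb_l) after constructing stall_rgb_l = StallDetector('rgb_l').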


Code and NN files

#!/usr/bin/env python3

from pathlib import Path
import cv2
import depthai as dai
import numpy as np
import argparse
import time
import sys
import json

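# Command-line options: NN blob path, JSON decoding config, and camera FPS.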
parser = argparse.ArgumentParser()
parser.add_argument('-nn', '--nn_model', help='select model path for inference',
                    default='/workspaces/yolov6n_openvino_2022.1_6shave.blob', type=str)
parser.add_argument('-c', '--config', help='Provide config path for inference',
                    default='/workspaces/yolov6n.json', type=str)
parser.add_argument('-fps', '--fps', type=float, help='Frame rate of camera capturing', default=15)

args = parser.parse_args()
nn_path = Path(args.nn_model)
config_path = Path(args.config)
config_fps = args.fps

if not nn_path.is_file():
    sys.exit('NN not found!')
if not config_path.is_file():
    sys.exit('Config not found!')

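# Load the YOLO decoding parameters and class labels from the JSON config.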
with config_path.open() as f:
    config = json.load(f)

nn_config = config.get("nn_config", {})
if "input_size" in nn_config:
    nn_width, nn_height = tuple(map(int, nn_config.get("input_size").split('x')))
metadata = nn_config.get("NN_specific_metadata", {})
classes = metadata.get("classes", {})
coordinates = metadata.get("coordinates", {})
anchors = metadata.get("anchors", {})
anchor_masks = metadata.get("anchor_masks", {})
iou_threshold = metadata.get("iou_threshold", {})
confidence_threshold = metadata.get("confidence_threshold", {})

print(metadata)
nn_mappings = config.get("mappings", {})
labels = nn_mappings.get("labels", {})

# pipeline: two identical branches (CAM_B and CAM_C), each ColorCamera preview -> YoloDetectionNetwork -> XLinkOut
pipeline = dai.Pipeline()

camera_l = pipeline.create(dai.node.ColorCamera)
detection_nn_l = pipeline.create(dai.node.YoloDetectionNetwork)
xout_rgb_l = pipeline.create(dai.node.XLinkOut)
xout_nn_l = pipeline.create(dai.node.XLinkOut)

camera_r = pipeline.create(dai.node.ColorCamera)
detection_nn_r = pipeline.create(dai.node.YoloDetectionNetwork)
xout_rgb_r = pipeline.create(dai.node.XLinkOut)
xout_nn_r = pipeline.create(dai.node.XLinkOut)

xout_rgb_l.setStreamName("rgb_l")
xout_nn_l.setStreamName("nn_l")
xout_rgb_r.setStreamName("rgb_r")
xout_nn_r.setStreamName("nn_r")

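# Both sensors: 1080p, planar BGR preview scaled (aspect ratio not kept) to the NN input size.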
camera_l.setPreviewSize(nn_width, nn_height)
camera_l.setBoardSocket(dai.CameraBoardSocket.CAM_B)
camera_l.setPreviewKeepAspectRatio(False)
camera_l.setResolution(dai.ColorCameraProperties.SensorResolution.THE_1080_P)
camera_l.setInterleaved(False)
camera_l.setColorOrder(dai.ColorCameraProperties.ColorOrder.BGR)
camera_l.setFps(config_fps)

camera_r.setPreviewSize(nn_width, nn_height)
camera_r.setBoardSocket(dai.CameraBoardSocket.CAM_C)
camera_r.setPreviewKeepAspectRatio(False)
camera_r.setResolution(dai.ColorCameraProperties.SensorResolution.THE_1080_P)
camera_r.setInterleaved(False)
camera_r.setColorOrder(dai.ColorCameraProperties.ColorOrder.BGR)
camera_r.setFps(config_fps)

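# Identical YOLO decoding settings on both branches; one inference thread and one NCE each, non-blocking input.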
detection_nn_l.setConfidenceThreshold(confidence_threshold)
detection_nn_l.setNumClasses(classes)
detection_nn_l.setCoordinateSize(coordinates)
detection_nn_l.setAnchors(anchors)
detection_nn_l.setAnchorMasks(anchor_masks)
detection_nn_l.setIouThreshold(iou_threshold)
detection_nn_l.setBlobPath(str(nn_path))
detection_nn_l.setNumInferenceThreads(1)
detection_nn_l.setNumNCEPerInferenceThread(1)
detection_nn_l.input.setBlocking(False)

detection_nn_r.setConfidenceThreshold(confidence_threshold)
detection_nn_r.setNumClasses(classes)
detection_nn_r.setCoordinateSize(coordinates)
detection_nn_r.setAnchors(anchors)
detection_nn_r.setAnchorMasks(anchor_masks)
detection_nn_r.setIouThreshold(iou_threshold)
detection_nn_r.setBlobPath(str(nn_path))
detection_nn_r.setNumInferenceThreads(1)
detection_nn_r.setNumNCEPerInferenceThread(1)
detection_nn_r.input.setBlocking(False)

# linking: each camera preview feeds both its XLinkOut (for display) and its detection network
camera_l.preview.link(xout_rgb_l.input)
camera_l.preview.link(detection_nn_l.input)
detection_nn_l.out.link(xout_nn_l.input)

camera_r.preview.link(xout_rgb_r.input)
camera_r.preview.link(detection_nn_r.input)
detection_nn_r.out.link(xout_nn_r.input)

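# Connect to the device, upload the pipeline, and print basic device info.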
with dai.Device(pipeline) as device:
    print('Device name:', device.getDeviceName())
    if device.getBootloaderVersion() is not None:
        print('Bootloader version:', device.getBootloaderVersion())
    print('Usb speed:', device.getUsbSpeed().name)
    print('Connected cameras:', device.getConnectedCameraFeatures())

    device_info = device.getDeviceInfo()
    print('Device MxId:', device_info.getMxId())

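    # Non-blocking host-side output queues (depth 4) for both previews and both NN outputs.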
    q_rgb_l = device.getOutputQueue(name="rgb_l", maxSize=4, blocking=False)
    q_nn_l = device.getOutputQueue(name="nn_l", maxSize=4, blocking=False)
    q_rgb_r = device.getOutputQueue(name="rgb_r", maxSize=4, blocking=False)
    q_nn_r = device.getOutputQueue(name="nn_r", maxSize=4, blocking=False)

    start_time = time.monotonic()

    counter_l = 0
    detections_l = []
    fps_l = 0
    frame_l = None

    counter_r = 0
    detections_r = []
    fps_r = 0
    frame_r = None

    text_color = (0, 0, 255)

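    # Host-side helpers (to_planar is unused here); frameNorm maps normalized bbox coordinates to pixels.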
    def to_planar(arr: np.ndarray, shape: tuple) -> np.ndarray:
        return cv2.resize(arr, shape).transpose(2, 0, 1).flatten()

    def frameNorm(frame, bbox):
        normVals = np.full(len(bbox), frame.shape[0])
        normVals[::2] = frame.shape[1]
        return (np.clip(np.array(bbox), 0, 1) * normVals).astype(int)

    def displayFrame(name, frame, detections):
        color = (0, 0, 255)
        for detection in detections:
            bbox = frameNorm(frame, (detection.xmin, detection.ymin, detection.xmax, detection.ymax))
            cv2.putText(frame, labels[detection.label], (bbox[0] + 10, bbox[1] + 20), cv2.FONT_HERSHEY_TRIPLEX, 0.5, color)
            cv2.putText(frame, f"{int(detection.confidence * 100)}%", (bbox[0] + 10, bbox[1] + 40), cv2.FONT_HERSHEY_TRIPLEX, 0.5, color)
            cv2.rectangle(frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), color, 2)
        cv2.imshow(name, frame)

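    # Main loop: poll all four queues without blocking. When the device-side blocking triggers,
    # tryGet() keeps returning None and the preview windows stop updating.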
    while True:
        in_nn_l = q_nn_l.tryGet()
        in_rgb_l = q_rgb_l.tryGet()
        in_nn_r = q_nn_r.tryGet()
        in_rgb_r = q_rgb_r.tryGet()

        if in_rgb_l is not None:
            frame_l = in_rgb_l.getCvFrame()
            fps_l = counter_l / (time.monotonic() - start_time)
            cv2.putText(frame_l, "NN fps: {:.2f}".format(fps_l),
                        (2, frame_l.shape[0] - 4), cv2.FONT_HERSHEY_TRIPLEX, 0.5, text_color)
        if in_rgb_r is not None:
            frame_r = in_rgb_r.getCvFrame()
            fps_r = counter_r / (time.monotonic() - start_time)
            cv2.putText(frame_r, "NN fps: {:.2f}".format(fps_r),
                        (2, frame_r.shape[0] - 4), cv2.FONT_HERSHEY_TRIPLEX, 0.5, text_color)

        if in_nn_l is not None:
            detections_l = in_nn_l.detections
            counter_l += 1

        if in_nn_r is not None:
            detections_r = in_nn_r.detections
            counter_r += 1

        if frame_l is not None:
            displayFrame("left_camera", frame_l, detections_l)

        if frame_r is not None:
            displayFrame("right_camera", frame_r, detections_r)

        if cv2.waitKey(1) == ord('q'):
            break
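
For reference, a typical invocation (the script filename mre.py is assumed; the paths match the argparse defaults above):

python3 mre.py --nn_model /workspaces/yolov6n_openvino_2022.1_6shave.blob --config /workspaces/yolov6n.json --fps 15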