Version 11 (modified by Tim Harvey, 7 days ago)

added a video detection example

GW16168 NXP Ara240 DNPU AI Accelerator

The GW16168 NXP Ara240 DNPU AI Accelerator is an M.2 2280 M-Key card by Gateworks for use in Gateworks single board computers. For more product information see: https://www.gateworks.com/products/gw16168-m2-ai-accelerator-usa-made/

Terminology

The following terminology is used in the Kinara documentation:

  • Ara1 / Ara2 processor: An ultra low-power programmable Neural Network processor
  • Kinara SDK: Kinara Software Development Kit
  • DVNC: Kinara Network Compiler
  • DVSim: Kinara Simulator
  • DVConvert: Kinara Network Converter
  • NNApp: Neural Network Application
  • Development Platform: Machine to compile model and run simulator
  • Target Platform: Platform which Ara1 / Ara2 processor connects to
  • PPA: Power, Performance and Accuracy - metrics reported by the compiler
  • SOF: Schedule Optimization Factor - a measure reported by the compiler
  • CNN: Convolutional Neural Network - a deep learning model designed to analyze and process grid-like data such as images, videos and sometimes audio and text
  • LLM: Large Language Model - an AI model trained on massive amounts of text data to understand, summarize, and generate human-like language
  • VLM: Vision Language Model - a multimodal AI that bridges the gap between sight and language. It essentially gives an LLM the ability to "see" by integrating a vision encoder with a language model
  • sLLM: small Language Model - a lightweight version of an LLM designed to be more efficient, especially for "edge" devices with limited hardware resources

Public:

NXP Ara240 DNPU AI Accelerator Quick Start

Using NXP deb distribution packages

Currently NXP distributes the Ara2 runtime in binary form. The kernel driver has been released as open source, which resolves kernel compatibility issues and is a huge step, but the userspace apps and libraries remain dynamically linked binary objects.

The current deb packages have some shortcomings:

  • packages are not very consistent; some have a systemd service in the data, others create one via postinst
  • they were intended to install on top of the NXP Embedded Linux Firmware (version L6.12.34-2.1.0) and intended to support only NXP dev kit boards so the dependencies are incomplete and don't match what would be on other Linux based root filesystems (Ubuntu system for example)

If you extract the debs and examine the DEBIAN directory you can see how to install them on other boards and root filesystems.

It is fairly common for AI models to make use of Python and NXP is using that here. The rt-sdk-ara2 includes a couple of Python Wheels that are used in the examples. A Python Wheel is a standard built-package format for distributing Python libraries. It is essentially a ZIP-format archive with a .whl extension that contains all the files needed for a package to run immediately after being installed. It's also common when using Python to run into package version incompatibilities, which is why per-user Python virtual environments are used.
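
Because a wheel is a ZIP archive, its contents can be listed with Python's standard zipfile module. The following is a minimal sketch: the helper name list_wheel_files is made up for illustration, and the path in the usage comment is the fetch_models wheel mentioned on this page.

```python
import zipfile


def list_wheel_files(wheel_path):
    """Return the sorted member names of a wheel (a ZIP archive with a .whl suffix)."""
    with zipfile.ZipFile(wheel_path) as wheel:
        return sorted(wheel.namelist())


# Example usage (path taken from this page; adjust to your install):
# for name in list_wheel_files("/usr/share/python-wheels/fetch_models-1.0.0-py3-none-any.whl"):
#     print(name)
```

This only reads the archive index, so it does not install or execute anything from the wheel.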

Note the deb files require an NXP account to download (from NXP ARA SDK Landing page) so the instructions below assume you have them already in the current directory.

rt-sdk-ara2

The ara2 runtime should not really be considered an 'SDK' - it has nothing to do with software development, it's simply the set of utils and libs needed to use the Ara2.

The rt-sdk-ara2 provides a complete runtime environment for AI/ML acceleration using the Ara240 NPU on aarch64. This package includes:

  • Runtime libraries for Ara240 NPU integration
  • Python bindings (DVAPI) for custom inference applications
  • Optimum-Ara framework for LLMs and VLMs
  • GStreamer plugins for Real-Time Object Detection Applications
  • Helper scripts for monitoring, benchmarking, and model management
  • Systemd service for automatic hardware initialization

Installation on a Gateworks board with Ubuntu based OS:

  • extract the debian 'data' (do not install the package!)
    # extract data (but don't install)
    dpkg-deb --vextract rt-sdk-ara2_2.0.4.deb /
    
  • take care of postinst steps
    • miscellaneous
      # create app dirs (used for models)
      mkdir -pv /usr/share/{cnn,llm}
      # get rid of circular symlink
      rm /usr/share/rt-sdk-ara240_2.0.4/rt-sdk-ara240_2.0.4
      
    • install uv package manager for Python virtualization and packaging for local user (which is installed to ~/.local/bin so we create symlinks to /usr/bin)
      apt update && apt install -y curl
      curl -LsSf https://astral.sh/uv/install.sh | sh
      ln -s /root/.local/bin/uv /usr/bin/uv
      ln -s /root/.local/bin/uvx /usr/bin/uvx
      
    • build driver (the one in the deb is specific to the IMX BSP kernel)
      apt update && apt install -y build-essential git bc file flex bison
      git clone https://github.com/nxp-imx-support/uiodma-driver
      ( cd uiodma-driver/uiodma; make )
      # install it where the rt service expects to find it (over the top of the non-compatible one)
      cp uiodma-driver/uiodma/uiodma.ko /usr/share/rt-sdk-ara240/driver/
      
    • enable service:
      # enable service
      systemctl enable rt-sdk-ara2.service
      # start service now (unless you reboot)
      systemctl start rt-sdk-ara2.service
      
    • use the 'fetch_models' script to fetch pre-compiled models for testing from HuggingFace.
      # list models available for nxp/ara
      fetch_models --list
      # install YOLOv8
      fetch_models --repo-id nxp/YOLOv8 # 746MB (711MiB)
      
      • the script is a python wrapper that uses uvx and the fetch-models python wheel (/usr/share/python-wheels/fetch_models-1.0.0-py3-none-any.whl) to fetch and install models from HuggingFace HUB
      • the models will be installed in either /usr/share/cnn (Convolutional Neural Network) or /usr/share/llm (Large Language Model)
      • NXP has Ara2 optimized models at https://huggingface.co/nxp
      • the script has a hard coded list of models available and where to install them locally. You can use 'python -m zipfile -e /usr/share/python-wheels/fetch_models-1.0.0-py3-none-any.whl ./fetch_models' to see what it's doing

Notable Files:

  • /usr/lib/
    • libaraclient_aarch64.so - base library for interfacing with ara2
    • libara_vision_inference.so - inference lib that builds on libaraclient
  • /usr/lib/gstreamer-1.0
    • libgstdvPre.so
    • libgstdvInfo.so
    • libgstdvPost.so
  • /usr/share/rt-sdk-ara240 (symlink to a version-specific dir at the same location)
    • hw_utils/boot_img - firmware files
    • hw_utils/ddr_config - ddr binaries
    • hw_utils/bins/ - the hw utils for bringup/programming
    • optimum-ara/ - extension of the Hugging Face library that integrates with Ara240 DNPU
    • scripts - various wrappers around the tools etc
    • nnapp - tool for benchmarking models
    • config - various example yaml config files used for proxy/nnapp
    • include/dvapi.py - python bindings to dvapi
    • driver/uiodma.ko - driver (where the setup script expects to find it)
  • /usr/share/python-wheels - python wheels for fetch_models and optimum_ara
  • /usr/share/doc/rt-sdk-ara2 - license info
  • /usr/include/sdk_ara - headers for C libs
  • /usr/bin - various scripts
  • /etc/udev/rules.d/99-ara2.rules - udev rule which makes the PCI ID dependent on the systemd service
  • /etc/systemd/system/rt-sdk-ara2.service - systemd service that handles the various hw util config
  • /etc/rt-sdk-ara240/cnn_config.yaml - config for nnapp
  • /etc/rt-sdk-ara240/proxy_config.yaml - config for proxy

Notes:

  • This will not program flash - that is a manual step only required if there is an update
  • The 'uv' package manager is a fast all-in-one Python package and project manager written in Rust. It makes it easy to work with virtual environments, which is essential for avoiding Python package version clashes
  • on bootup make sure you wait for the console messages indicating the Proxy is launched before using it as it can take a couple of minutes
  • the binary tools and libs are all currently dynamically linked against stdlibc
  • the GStreamer libs require GStreamer 1.26 or newer

Verification steps:

  1. show chip_info
    chip_info.sh
    
  2. verify service
    # show service status
    systemctl status rt-sdk-ara2.service --no-pager -l 
    # view detailed service logs
    journalctl -u rt-sdk-ara2.service
    # verify proxy is running (critical)
    ps -eaf | grep proxy_ara240
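
Since the proxy can take a couple of minutes to launch, a script that depends on it may want to wait for it first. The following is a minimal sketch that polls for the proxy's UNIX socket. It assumes the proxy creates /var/run/proxy.sock (the path passed to dvInf via the sock property elsewhere on this page); the function names and the 180 second default timeout are made up for illustration. The presence of the socket file does not prove that the proxy is accepting connections.

```python
import os
import stat
import time


def proxy_socket_ready(sock_path="/var/run/proxy.sock"):
    """Return True if sock_path exists and is a UNIX domain socket."""
    try:
        return stat.S_ISSOCK(os.stat(sock_path).st_mode)
    except OSError:
        # Path is missing or not accessible
        return False


def wait_for_proxy(sock_path="/var/run/proxy.sock", timeout_s=180.0, poll_s=1.0):
    """Poll until the socket appears or timeout_s elapses; return True on success."""
    deadline = time.monotonic() + timeout_s
    while True:
        if proxy_socket_ready(sock_path):
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_s)
```

A caller could run wait_for_proxy() before building a GStreamer pipeline and exit with an error if it returns False.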
    

Examples:

  • Download pre-compiled models for testing:
    • The fetch_models script from the ara2-rt will fetch models from HuggingFace.
      # list models available for nxp/ara
      fetch_models --list
      # install YOLOv8
      fetch_models --repo-id nxp/YOLOv8 # 746MB (711MiB)
      
    • the 'fetch_models' script is a python wrapper that uses uvx and the fetch-models python wheel (/usr/share/python-wheels/fetch_models-1.0.0-py3-none-any.whl) to fetch and install models from HuggingFace HUB
    • the models will be installed in /usr/share/cnn (Convolutional Neural Network) and /usr/share/llm (Large Language Model)
    • NXP has Ara2 optimized models at https://huggingface.co/nxp
  • Run performance benchmark (uses nnapp)
    run_model_perf.sh
    
    • the 'run_model_perf.sh' script makes it easy to list and show model categories and models and is a wrapper around the nnapp app which has a lot of options and a config file
  • monitor real-time NPU metrics including utilization, temperature, DRAM usage and device state (interactively during benchmarking or model execution)
    ara2_metrics.sh
    

GStreamer plugins

The rt-sdk-ara2 provides a set of gstreamer plugins for inference:

  • dvPre
  • dvInf
  • dvPost

Without more documentation or source for these it's likely best to think of them as: dvPre prepares buffers, dvInf hands them off to the NPU and dvPost processes the response.

The dvPre element requires 32-bit pixel samples (i.e. format=BGRA: 4 bytes per pixel for blue, green, red and alpha, where the alpha byte is unused padding that acts as a structural spacer rather than transparency), not 24-bit format=RGB (3 bytes per pixel: red, green, blue).
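
To make the difference between the two layouts concrete, here is a minimal sketch that converts packed 24-bit RGB bytes to packed 32-bit BGRA bytes in pure Python. In a real pipeline videoconvert does this work; the function name rgb_to_bgra and the 0xFF padding value are illustrative assumptions, not something dvPre is documented to require.

```python
def rgb_to_bgra(rgb, pad=0xFF):
    """Convert packed 24-bit RGB bytes to packed 32-bit BGRA bytes."""
    if len(rgb) % 3 != 0:
        raise ValueError("RGB buffer length must be a multiple of 3")
    pixels = len(rgb) // 3
    out = bytearray(pixels * 4)
    out[0::4] = rgb[2::3]  # blue
    out[1::4] = rgb[1::3]  # green
    out[2::4] = rgb[0::3]  # red
    out[3::4] = bytes([pad]) * pixels  # padding byte in the alpha slot
    return bytes(out)
```

Each 3-byte pixel grows to 4 bytes, so a 640x640 frame goes from 1228800 bytes to 1638400 bytes.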

All three elements require the model to be specified via the 'model' property. If using yolov8x for example you would specify the path to the yolov8x.dvm.

For detection models the dvPost element frame data will contain a buffer with the number of detections (32-bit) followed by a series of detection structures containing the bounding box, confidence level, and COCO class ID of each object detected.
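
The buffer described above can be decoded with Python's struct module. The following is a minimal sketch, assuming the record layout used by the ctypes structure in the Python examples on this page: a little-endian 32-bit count, then packed 32-byte records of four float coordinates, a float confidence, an int32 class ID and an 8-byte pointer that is ignored. This layout is inferred from those examples rather than from NXP documentation, and the names Detection and parse_detections are made up for illustration.

```python
import struct
from typing import List, NamedTuple

# xmin, ymin, xmax, ymax, confidence (floats), class_id (int32), name pointer (8 bytes)
RECORD = struct.Struct("<5fiQ")


class Detection(NamedTuple):
    xmin: float
    ymin: float
    xmax: float
    ymax: float
    confidence: float
    class_id: int


def parse_detections(buf: bytes) -> List[Detection]:
    """Decode a detection buffer: 32-bit count followed by packed records."""
    if len(buf) < 4:
        return []
    (count,) = struct.unpack_from("<I", buf, 0)
    detections = []
    offset = 4
    for _ in range(count):
        if offset + RECORD.size > len(buf):
            break  # buffer is shorter than the count claims
        xmin, ymin, xmax, ymax, conf, class_id, _ptr = RECORD.unpack_from(buf, offset)
        detections.append(Detection(xmin, ymin, xmax, ymax, conf, class_id))
        offset += RECORD.size
    return detections
```

The pointer field is only meaningful inside the process that produced it, so the sketch discards it and leaves class-name lookup to a table keyed by class_id.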

The units for the bounding box are relative to the model's input size and will need to be scaled back to your original image size. For example the YOLO models operate on 640x640 pixel data. You can pass something larger in and it will essentially tile but it's unclear if there is an advantage to doing that.
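
Scaling a box back to the original image is a per-axis multiplication. The following is a minimal sketch, assuming the frame was stretched to the model size independently on each axis (no letterboxing); the function name scale_box is made up for illustration.

```python
def scale_box(box, model_size, image_size):
    """Scale (xmin, ymin, xmax, ymax) from model coordinates to image coordinates."""
    model_w, model_h = model_size
    image_w, image_h = image_size
    sx = image_w / model_w  # horizontal scale factor
    sy = image_h / model_h  # vertical scale factor
    xmin, ymin, xmax, ymax = box
    return (xmin * sx, ymin * sy, xmax * sx, ymax * sy)


# A box covering the lower-right quadrant of a 640x640 model frame,
# mapped onto a 1280x720 source image
print(scale_box((320, 320, 640, 640), (640, 640), (1280, 720)))
# → (640.0, 360.0, 1280.0, 720.0)
```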

The gstreamer plugins are currently provided as binary-only shared objects. They are linked against stdlibc (libc.so.6) and libgstreamer-1.0.so.0 and are compatible with GStreamer 1.26 or newer.

If you are using a rootfs that does not have GStreamer 1.26 you will need to build it or provide it via virtualization. For example Ubuntu 24.x Noble has GStreamer 1.24, Ubuntu 25.x has GStreamer 1.26 and Ubuntu 26.x Resolute has GStreamer 1.28. So if you were running Ubuntu Noble you could use distrobox/docker to install GStreamer 1.26 and its dependencies using Ubuntu 25.x.
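
A script can check the installed GStreamer version before trying to load the plugins. The following is a minimal sketch that parses a version string of the form "GStreamer 1.26.0", which is assumed here to be what gst-inspect-1.0 --version prints on its second line; the function names are made up for illustration.

```python
import re


def parse_gst_version(text):
    """Return (major, minor, micro) from text such as 'GStreamer 1.26.0', or None."""
    match = re.search(r"GStreamer\s+(\d+)\.(\d+)\.(\d+)", text)
    if match is None:
        return None
    return tuple(int(part) for part in match.groups())


def meets_minimum(version, minimum=(1, 26, 0)):
    """Return True if version is at least minimum (tuples compare element-wise)."""
    return version is not None and version >= minimum


# Example usage (requires gst-inspect-1.0 to be installed):
# import subprocess
# out = subprocess.run(["gst-inspect-1.0", "--version"],
#                      capture_output=True, text=True).stdout
# print(meets_minimum(parse_gst_version(out)))
```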

Examples:

  • Ubuntu noble (24.04):
    • Ubuntu noble has GStreamer 1.24 which is not compatible with the 1.26 plugins
    • one solution could be a GStreamer 1.26 PPA backport but we have not found any
    • another solution is an Ubuntu 25.04 container on an Ubuntu 24.04 rootfs:
      apt update && apt install -y distrobox docker.io
      # Create a 25.04 container that can see your hardware
      distrobox create --image ubuntu:25.04 --name gst126  --volume /usr/lib/gstreamer-1.0:/opt/ara2/plugins:ro \
        --volume /usr/lib:/opt/ara2/libs:ro \
        --volume /usr/share/cnn:/usr/share/cnn \
        --volume /usr/share/llm:/usr/share/llm \
        --volume /dev/bus/usb:/dev/bus/usb
      # enter the container to use it
      distrobox enter gst126
      # export vars via ~/.bashrc (exit and enter the distrobox to take effect)
      echo "export GST_PLUGIN_PATH=/opt/ara2/plugins" >> ~/.bashrc
      echo "export LD_LIBRARY_PATH=/opt/ara2/libs:\$LD_LIBRARY_PATH" >> ~/.bashrc
      
      • whenever using the ARA plugins you will need to make sure you do so in the gst126 environment
      • the volume param creates bind mounts between the host and the virtual target
      • you can also always access the host rootfs via /run/host
      • also make sure you install gstreamer and anything that uses it within that virtual environment
      • this uses virtualization, not emulation - there is no performance hit or latency added, it's just a different set of executables
      • disk space for the ubuntu 25.04 base above is about 1.54GB
  • Ubuntu 26.04 resolute
    • Ubuntu resolute (26.04) has GStreamer 1.28, which the 1.26 plugins are compatible with
    • with GStreamer 1.28, decodebin picks the hardware-accelerated v4l2jpegdec (on Venice) instead of the standard software decoder jpegdec, and v4l2jpegdec does not support YUV3 (typical for standard JPEG images), so you will need to disable it or prefer jpegdec over it. For example you can use GST_PLUGIN_FEATURE_RANK="v4l2jpegdec:NONE" or set the rank at runtime as is done in the detection examples below
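
When launching GStreamer tools from Python, the rank override can be merged into the child process environment. GST_PLUGIN_FEATURE_RANK takes a comma-separated list of feature:rank pairs. The following is a minimal sketch; the helper name add_rank_override is made up for illustration.

```python
def add_rank_override(env, feature, rank="NONE"):
    """Return a copy of env with feature:rank merged into GST_PLUGIN_FEATURE_RANK."""
    new_env = dict(env)
    current = new_env.get("GST_PLUGIN_FEATURE_RANK", "")
    entries = [entry for entry in current.split(",") if entry]
    # Drop any existing entry for the same feature so the new rank wins
    entries = [entry for entry in entries if entry.split(":", 1)[0] != feature]
    entries.append(f"{feature}:{rank}")
    new_env["GST_PLUGIN_FEATURE_RANK"] = ",".join(entries)
    return new_env


# Example usage:
# import os, subprocess
# env = add_rank_override(os.environ, "v4l2jpegdec")
# subprocess.run(["gst-inspect-1.0", "v4l2jpegdec"], env=env)
```

Returning a copy leaves the caller's environment untouched, so the override applies only to the processes that are given the new mapping.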

Install GStreamer:

apt-get update && apt install -y \
   gstreamer1.0-x \
   gstreamer1.0-tools \
   gstreamer1.0-plugins-base \
   gstreamer1.0-plugins-good \
   gstreamer1.0-plugins-bad \
   gstreamer1.0-plugins-ugly \
   gstreamer1.0-libav \
   v4l-utils
  • this uses about 500MiB of disk space

Specify Plugin path:

# export now to current shell
export GST_PLUGIN_PATH=/usr/lib/gstreamer-1.0/
# put in .bashrc so it happens for any new bash shell
echo "export GST_PLUGIN_PATH=/usr/lib/gstreamer-1.0/" >> ~/.bashrc
  • this tells GStreamer to look for plugins in the non-standard location of the ARA gstreamer plugins

At this point you should be able to inspect the dvPre, dvInf, and dvPost elements:

gst-inspect-1.0 dvPre
gst-inspect-1.0 dvInf
gst-inspect-1.0 dvPost

Detection Examples

Examples:

  • gst-launch pipeline prototyping:
    • enabling debug level 6 on dvPost will show the number of object detections in its debug output but if you want to do anything with that data you need to write an application that can decode frame buffers. Still this is useful for prototyping:
      • perform detection on a v4l2 video device like a webcam:
        DEV=/dev/video2
        MODEL=/usr/share/cnn/detection/yolov8n/model.dvm
        GST_DEBUG="dvPost:6" \
        gst-launch-1.0 -v \
          v4l2src device=$DEV ! \
          video/x-raw,width=640,height=480,framerate=30/1 ! \
          videoconvert ! video/x-raw,format=BGRA ! \
          dvPre model=$MODEL ! \
          dvInf model=$MODEL sock=/var/run/proxy.sock use-shm=false ! \
          dvPost model=$MODEL ! \
          fakesink sync=false | grep Detected
        
      • perform a detection on an image:
        URI=file://$PWD/traffic.png
        MODEL=/usr/share/cnn/detection/yolov8n/model.dvm
        GST_DEBUG="dvPost:6" \
        GST_PLUGIN_FEATURE_RANK="v4l2jpegdec:NONE" \
        gst-launch-1.0 -v \
          urisourcebin uri=$URI ! decodebin ! \
          videoconvert ! video/x-raw,format=BGRA ! \
          dvPre model=$MODEL ! \
          dvInf model=$MODEL sock=/var/run/proxy.sock use-shm=false ! \
          dvPost model=$MODEL ! \
          fakesink sync=false | grep Detected
        
        • GST_PLUGIN_FEATURE_RANK is set to disable the v4l2jpegdec hardware decoder on GStreamer 1.28, as it does not support a format compatible with dvPre (yet jpegdec does)
  • Image detection with boxing via Python
    • Python is incredibly useful for accessing GStreamer and handling the ARA detection frame data and imagemagick provides excellent tools for converting and drawing on images:
    • (optional) install lighttpd so that we can easily see our resulting images via a browser
      apt-get install -y lighttpd
      # add configuration for directory listing and mapping of /root to /
      cat << EOF >> /etc/lighttpd/lighttpd.conf
      dir-listing.encoding    = "utf-8"
      server.dir-listing      = "enable"
      
      # directory access
      alias.url += (
              "/root" => "/root",
      )
      EOF
      # make the dir executable
      chmod ugo+x .
      # restart the web server
      /etc/init.d/lighttpd restart
      
    • install imagemagick which we will use to draw named boxes for detections
      apt-get install -y imagemagick
      
    • create a dir for us to work in and create the script
      mkdir image-detect; cd image-detect
      # create python script
      cat <<\EOF > image_detect.py
      #!/usr/bin/env python3
      """
      Ara NPU Multi-Format Universal Image Decoder
      ============================================
      """
      
      import ctypes
      import os
      import sys
      import subprocess
      import gi
      
      gi.require_version('Gst', '1.0')
      from gi.repository import Gst
      
      Gst.init(None)
      
      # Standard COCO Class Mapping for printing human-readable labels
      COCO_CLASSES = {
          0: "person", 1: "bicycle", 2: "car", 3: "motorcycle", 4: "airplane", 5: "bus",
          6: "train", 7: "truck", 8: "boat", 9: "traffic light", 10: "fire hydrant",
          11: "stop sign", 12: "parking meter", 13: "bench", 14: "bird", 15: "cat",
          16: "dog", 17: "horse", 18: "sheep", 19: "cow", 20: "elephant", 21: "bear",
          22: "zebra", 23: "giraffe", 24: "backpack", 25: "umbrella", 26: "handbag",
          27: "tie", 28: "suitcase", 29: "frisbee", 30: "skis", 31: "snowboard",
          32: "sports ball", 33: "kite", 34: "baseball bat", 35: "baseball glove",
          36: "skateboard", 37: "surfboard", 38: "tennis racket", 39: "bottle",
          40: "wine glass", 41: "cup", 42: "fork", 43: "knife", 44: "spoon", 45: "bowl",
          46: "banana", 47: "apple", 48: "sandwich", 49: "orange", 50: "broccoli",
          51: "carrot", 52: "hot dog", 53: "pizza", 54: "donut", 55: "cake",
          56: "chair", 57: "couch", 58: "potted plant", 59: "bed", 60: "dining table",
          61: "toilet", 62: "tv", 63: "laptop", 64: "mouse", 65: "remote", 66: "keyboard",
          67: "cell phone", 68: "microwave", 69: "oven", 70: "toaster", 71: "sink",
          72: "refrigerator", 73: "book", 74: "clock", 75: "vase", 76: "scissors",
          77: "teddy bear", 78: "hair drier", 79: "toothbrush"
      }
      
      class AraDetection(ctypes.Structure):
          _layout_ = "ms"
          _pack_ = 1
          _fields_ = [
              ("xmin", ctypes.c_float), ("ymin", ctypes.c_float),
              ("xmax", ctypes.c_float), ("ymax", ctypes.c_float),
              ("confidence", ctypes.c_float), ("class_id", ctypes.c_int32),
              ("class_name_ptr", ctypes.c_void_p)
          ]
      
      def main():
          if len(sys.argv) < 3:
              print(f"Usage: {sys.argv[0]} <input_image> <output_image> [model]")
              sys.exit(1)
      
          input_image = sys.argv[1]
          output_image = sys.argv[2]
          model = "/usr/share/cnn/detection/yolov8n/model.dvm"
          if len(sys.argv) > 3:
              model = sys.argv[3]
      
          if not os.path.exists(input_image):
              print(f"ERROR: File '{input_image}' could not be located.")
              sys.exit(1)
      
          # Fetch native dimensions using ImageMagick
          try:
              dimensions = subprocess.check_output(["identify", "-format", "%w %h", input_image]).decode().split()
              w_native, h_native = int(dimensions[0]), int(dimensions[1])
          except Exception as e:
              print(f"ERROR: Failed to read image properties using ImageMagick: {e}")
              sys.exit(1)
         
          # Print target properties cleanly
          print(f"\nmodel: {model}")
          print(f"image: {os.path.basename(input_image)} {w_native}x{h_native}")
      
          MODEL_W, MODEL_H = 640, 640
      
          pipe_str = (
              f"multifilesrc location={input_image} loop=false num-buffers=2 ! decodebin name=d ! "
              f"videoconvert ! videoscale ! video/x-raw,width={MODEL_W},height={MODEL_H} ! "
              f"videoconvert ! video/x-raw,format=BGRA ! "
              f"dvPre model={model} ! "
              f"dvInf model={model} sock=/var/run/proxy.sock use-shm=true shm-path=/dev/shm/ara_inf_ ! "
              f"dvPost model={model} orig-width={MODEL_W} orig-height={MODEL_H} ! "
              f"appsink name=mysink sync=false async=false emit-signals=true"
          )
      
          # Before creating the launcher, adjust the system plugin registry ranking 
          # so GStreamer ignores v4l2jpegdec element (as it doesn't support BGRA output)
          registry = Gst.Registry.get()
          feature = registry.lookup_feature("v4l2jpegdec")
          if feature:
              # Lower its rank to ZERO so decodebin skips over it permanently
              feature.set_rank(0)
      
          pipeline = Gst.parse_launch(pipe_str)
          sink = pipeline.get_by_name("mysink")
          pipeline.set_state(Gst.State.PLAYING)
      
          last_valid_raw_bytes = None
      
          while True:
              sample = sink.emit("pull-sample")
              if not sample:
                  break
              buffer = sample.get_buffer()
              last_valid_raw_bytes = buffer.extract_dup(0, buffer.get_size())
      
          pipeline.set_state(Gst.State.NULL)
          
          processed_detections = []
      
          if last_valid_raw_bytes and len(last_valid_raw_bytes) >= 4:
              num_detections = int.from_bytes(last_valid_raw_bytes[:4], byteorder='little')
              
              if 0 < num_detections < 1000:
                  print(f"DETECTIONS LOGGED: FOUND {num_detections} ACTIVE OBJECTS")
                  print("-" * 70)
                  
                  offset = 4
                  ds = ctypes.sizeof(AraDetection)
                  
                  for i in range(num_detections):
                      if offset + ds > len(last_valid_raw_bytes): break
                      det = AraDetection.from_buffer_copy(last_valid_raw_bytes[offset:offset+ds])
                      offset += ds
                      
                      # Compute native image coordinate translation mapping
                      x1_mapped = det.xmin * (w_native / MODEL_W)
                      x2_mapped = det.xmax * (w_native / MODEL_W)
                      y1_mapped = det.ymin * (h_native / MODEL_H)
                      y2_mapped = det.ymax * (h_native / MODEL_H)
                      
                      coco_name = COCO_CLASSES.get(det.class_id, "unknown")
                      
                      print(f"Object {i+1}: ID={det.class_id} | Name={coco_name} | Confidence={det.confidence * 100:.1f}%")
                      print(f"          Bounding Box -> [{int(x1_mapped)}, {int(y1_mapped)}] to [{int(x2_mapped)}, {int(y2_mapped)}]")
                      print("-" * 70)
                      
                      processed_detections.append((coco_name, det.confidence, x1_mapped, y1_mapped, x2_mapped, y2_mapped))
      
          # Render final multi-object annotated canvas
          if processed_detections:
              cmd_args = [f"convert {input_image}"]
              for coco_name, conf, x1, y1, x2, y2 in processed_detections:
                  ix1, iy1, ix2, iy2 = int(x1), int(y1), int(x2), int(y2)
                  label = f"{coco_name} {conf*100:.1f}%"
                  cmd_args.append(f'-stroke green -strokewidth 2 -fill none -draw "rectangle {ix1},{iy1} {ix2},{iy2}"')
                  cmd_args.append(f'-stroke none -fill white -pointsize 16 -annotate +{ix1}+{iy1 - 6} "{label}"')
                  
              cmd_args.append(output_image)
              draw_cmd = " ".join(cmd_args)
              
              try:
                  subprocess.run(draw_cmd, shell=True, check=True)
                  print(f"SUCCESS: Mapped all boxes and text labels onto -> '{output_image}'\n")
              except subprocess.CalledProcessError:
                  print("ERROR: ImageMagick rendering execution failed.\n")
          else:
              print("INFO: No operational object targets were captured by the NPU context.\n")
      
      if __name__ == '__main__':
          main()
      EOF
      
    • The script uses PyGObject, which is a Python package that provides bindings for libraries based on GObject Introspection such as GTK, WebKit, and GStreamer. It allows you to use C-based frameworks in Python. We need to install the C libs for GStreamer for this:
      apt-get install -y \
        libcairo2-dev \
        libgirepository-2.0-dev \
        python3-dev \
        python3-gst-1.0 \
        cmake pkg-config
      # we are also going to need to install gstreamer and its dev packages
      apt-get install -y \
        libgstreamer1.0-dev \
        libgstreamer-plugins-base1.0-dev \
        libgstreamer-plugins-bad1.0-dev \
        gstreamer1.0-plugins-base \
        gstreamer1.0-plugins-good \
        gstreamer1.0-plugins-bad \
        gstreamer1.0-plugins-ugly \
        gstreamer1.0-libav \
        gstreamer1.0-tools
      
    • create a python virtual env (always a good idea to keep python dependencies isolated) and install python libs we need:
      # create a venv (.venv)
      uv venv
      # install our script's dependencies
      uv pip install pygobject
      
    • (optional) fetch some images for detection
      # fetch a coco validation image; it contains a dog on a bench and the dog is at 208,147 to 293,289
      wget http://images.cocodataset.org/val2017/000000546829.jpg -O dog.jpg
      # use ffmpeg to grab a frame from within an MP4
      apt install -y ffmpeg
      ffmpeg -i /usr/share/ara2-vision-examples/sample_videos/video_0.mp4 -f null - # shows how long it is (time=00:00:15.50)
      ffmpeg -i /usr/share/ara2-vision-examples/sample_videos/video_0.mp4 -ss 00:00:05 -frames:v 1 traffic.png
      
    • run the script (image_detect.py <source-image> <destination-image> [model-path])
      uv run image_detect.py dog.jpg coco_detections.jpg
      
      • Note that without shm the pipeline needs to copy the raw image bytes over a local network-style socket connection. By using a dedicated shared-memory path under /dev/shm (use-shm=true with shm-path) you can eliminate that transfer (zero-copy): dvPre writes the processed data directly into a designated block of system RAM and dvInf uses a pointer to it
      • you would expect that if your original image was 1080x1920 and you resized it to the model size of 640x640, telling dvPost orig-width=1080 orig-height=1920 would scale the bounding boxes properly. In practice it seems it does not unless your image has the same aspect ratio as the model. Mapping it as above (telling dvPost that the image is 640x640 and scaling ourselves) resolves this
  • Video detection with boxing via Python in a headless webapp
    • Python is incredibly useful for accessing GStreamer and handling the ARA detection frame data and building webapps
    • The script uses PyGObject, which is a Python package that provides bindings for libraries based on GObject Introspection such as GTK, WebKit, and GStreamer. It allows you to use C-based frameworks in Python. We need to install the C libs for GStreamer for this:
      apt-get install -y \
        libcairo2-dev \
        libgirepository-2.0-dev \
        python3-dev \
        python3-gst-1.0 \
        cmake pkg-config
      # we are also going to need to install gstreamer and its dev packages
      apt-get install -y \
        libgstreamer1.0-dev \
        libgstreamer-plugins-base1.0-dev \
        libgstreamer-plugins-bad1.0-dev \
        gstreamer1.0-plugins-base \
        gstreamer1.0-plugins-good \
        gstreamer1.0-plugins-bad \
        gstreamer1.0-plugins-ugly \
        gstreamer1.0-libav \
        gstreamer1.0-tools
      
    • create a python virtual env (always a good idea to keep python dependencies isolated) and install python libs we need:
      # create a venv (.venv)
      uv venv
      # install our script's dependencies (the script below imports flask)
      uv pip install pygobject opencv-python-headless flask
      # create python script (quoted heredoc so the shell does not expand anything in it)
      cat <<\EOF > vision-webapp.py
      #!/usr/bin/env python3
      """
      Ara NPU Basic Video Stream & Inference Hub
      ==========================================
      """
      
      import argparse
      import ctypes
      import glob
      import os
      import sys
      import threading
      import time
      import logging
      import cv2
      import numpy as np
      from flask import Flask, Response, jsonify, request, render_template_string
      import gi
      
      gi.require_version('Gst', '1.0')
      from gi.repository import Gst
      Gst.init(None)
      
      # Quiet down Werkzeug HTTP traffic logging to suppress 1Hz AJAX console pollution
      log = logging.getLogger('werkzeug')
      log.setLevel(logging.ERROR)
      
      app = Flask(__name__)
      lock = threading.Lock()
      
      class AraDetection(ctypes.Structure):
          _pack_ = 1
          _fields_ = [
              ("xmin", ctypes.c_float), ("ymin", ctypes.c_float),
              ("xmax", ctypes.c_float), ("ymax", ctypes.c_float),
              ("confidence", ctypes.c_float), ("class_id", ctypes.c_int32),
              ("class_name_ptr", ctypes.c_void_p)
          ]
      
      # --- STATE STORAGE ---
      STATE_REPO = {
          "frame": None,
          "detections": [],
          "active_source": None,
          "active_model_name": "yolov8n",
          "active_model_path": "/usr/share/cnn/detection/yolov8n/model.dvm",
          "restart_flag": False,
          "source_registry": [],
          "model_registry": ["yolov8n"],
          
          # Target Pipeline Resolutions
          "CANVAS_W": 640,
          "CANVAS_H": 360,
          "MODEL_W": 640,
          "MODEL_H": 640,
          
          # Live Telemetry Metrics
          "native_w": 0,
          "native_h": 0,
          "stream_w": 0,
          "stream_h": 0,
          "inference_fps": 0.0
      }
      
      # FPS Calculation variables bound directly to the Inference thread
      inference_timestamps = []
      
      COCO_LABELS = {
          0: 'person', 1: 'bicycle', 2: 'car', 3: 'motorcycle', 4: 'airplane', 5: 'bus',
          6: 'train', 7: 'truck', 8: 'boat', 9: 'traffic light', 10: 'fire hydrant',
          11: 'stop sign', 12: 'parking meter', 13: 'bench', 14: 'bird', 15: 'cat',
          16: 'dog', 17: 'horse', 18: 'sheep', 19: 'cow', 20: 'elephant', 21: 'bear',
          22: 'zebra', 23: 'giraffe', 24: 'backpack', 25: 'umbrella', 26: 'handbag',
          27: 'tie', 28: 'suitcase', 29: 'frisbee', 30: 'skis', 31: 'snowboard',
          32: 'sports ball', 33: 'kite', 34: 'baseball bat', 35: 'baseball glove',
          36: 'skateboard', 37: 'surfboard', 38: 'tennis racket', 39: 'bottle',
          40: 'wine glass', 41: 'cup', 42: 'fork', 43: 'knife', 44: 'spoon', 45: 'bowl',
          46: 'banana', 47: 'apple', 48: 'sandwich', 49: 'orange', 50: 'broccoli',
          51: 'carrot', 52: 'hot dog', 53: 'pizza', 54: 'donut', 55: 'cake',
          56: 'chair', 57: 'couch', 58: 'potted plant', 59: 'bed', 60: 'dining table',
          61: 'toilet', 62: 'tv', 63: 'laptop', 64: 'mouse', 65: 'remote', 66: 'keyboard',
          67: 'cell phone', 68: 'microwave', 69: 'oven', 70: 'toaster', 71: 'sink',
          72: 'refrigerator', 73: 'book', 74: 'clock', 75: 'vase', 76: 'scissors',
          77: 'teddy bear', 78: 'hair drier', 79: 'toothbrush'
      }
      
      def build_source_injection_string(source_path):
          if source_path.endswith(".mp4"):
              return f"filesrc location={source_path} ! decodebin ! videoconvert ! tee name=t "
          else:
              return f"v4l2src device={source_path} ! videoconvert ! tee name=t "
      
      def gstreamer_orchestration_loop():
          global inference_timestamps
          CANVAS_W = STATE_REPO["CANVAS_W"]
          CANVAS_H = STATE_REPO["CANVAS_H"]
          MODEL_W = STATE_REPO["MODEL_W"]
          MODEL_H = STATE_REPO["MODEL_H"]
      
          while True:
              while STATE_REPO["active_source"] is None:
                  time.sleep(0.2)
                  if STATE_REPO["restart_flag"]:
                      break
      
              current_target_source = STATE_REPO["active_source"]
              current_target_model = STATE_REPO["active_model_path"]
              STATE_REPO["restart_flag"] = False
              
              if current_target_source is None:
                  continue
                  
              source_segment = build_source_injection_string(current_target_source)
              
              pipe_str = (
                  f"{source_segment} "
                  f"t. ! queue max-size-buffers=2 leaky=downstream ! appsink name=nativesink sync=false async=false emit-signals=true "
                  f"t. ! queue max-size-buffers=2 leaky=downstream ! videoscale ! video/x-raw,width={CANVAS_W},height={CANVAS_H} ! videoconvert ! video/x-raw,format=BGR ! appsink name=framesink sync=false async=false emit-signals=true "
                  f"t. ! queue max-size-buffers=2 leaky=downstream ! "
                  f"videoscale ! video/x-raw,width={MODEL_W},height={MODEL_H} ! videoconvert ! video/x-raw,format=BGRA ! "
                  f"dvPre model={current_target_model} ! "
                  f"dvInf model={current_target_model} sock=/var/run/proxy.sock use-shm=true shm-path=/dev/shm/ara_inf_ ! "
                  f"dvPost model={current_target_model} orig-width={MODEL_W} orig-height={MODEL_H} ! "
                  f"appsink name=postsink sync=false async=false emit-signals=true"
              )
      
              print(f"[LAUNCH PIPELINE]\n   {pipe_str}\n")
              pipeline = Gst.parse_launch(pipe_str)
              
              native_sink = pipeline.get_by_name("nativesink")
              frame_sink = pipeline.get_by_name("framesink")
              post_sink = pipeline.get_by_name("postsink")
      
              def on_native_caps(sink):
                  sample = sink.emit("pull-sample")
                  if sample:
                      caps = sample.get_caps()
                      struct = caps.get_structure(0)
                      STATE_REPO["native_w"] = struct.get_value("width")
                      STATE_REPO["native_h"] = struct.get_value("height")
                  return Gst.FlowReturn.OK
      
              def on_new_detection(sink):
                  global inference_timestamps
                  sample = sink.emit("pull-sample")
                  if sample:
                      # Calculate FPS derived purely from the inference hardware return loop
                      now = time.time()
                      inference_timestamps.append(now)
                      if len(inference_timestamps) > 30:
                          inference_timestamps.pop(0)
                      if len(inference_timestamps) > 1:
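                          # N timestamps span N-1 frame intervals; guard against a zero time span
                          elapsed = inference_timestamps[-1] - inference_timestamps[0]
                          if elapsed > 0:
                              STATE_REPO["inference_fps"] = (len(inference_timestamps) - 1) / elapsed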
      
                      buffer = sample.get_buffer()
                      raw_bytes = buffer.extract_dup(0, buffer.get_size())
                      if raw_bytes and len(raw_bytes) >= 4:
                          num_detections = np.frombuffer(raw_bytes[:4], dtype=np.uint32)[0]
                          local_dets = []
                          offset = 4
                          ds = ctypes.sizeof(AraDetection)
                          for _ in range(num_detections):
                              if offset + ds > len(raw_bytes): break
                              det = AraDetection.from_buffer_copy(raw_bytes[offset:offset+ds])
                              offset += ds
                              local_dets.append((det.class_id, det.confidence, det.xmin, det.ymin, det.xmax, det.ymax))
                          STATE_REPO["detections"] = local_dets
                  return Gst.FlowReturn.OK
      
              def on_new_frame(sink):
                  sample = sink.emit("pull-sample")
                  if sample:
                      buffer = sample.get_buffer()
                      caps = sample.get_caps()
                      struct = caps.get_structure(0)
                      w = struct.get_value("width")
                      h = struct.get_value("height")
                      
                      STATE_REPO["stream_w"] = w
                      STATE_REPO["stream_h"] = h
                      
                      raw_bytes = buffer.extract_dup(0, buffer.get_size())
                      if raw_bytes:
                          try:
                              frame_flat = np.frombuffer(raw_bytes, dtype=np.uint8)
                              frame_arr = frame_flat.reshape((h, w, 3))
                              STATE_REPO["frame"] = frame_arr.copy()
                          except ValueError:
                              pass
                  return Gst.FlowReturn.OK
      
              native_sink.connect("new-sample", on_native_caps)
              post_sink.connect("new-sample", on_new_detection)
              frame_sink.connect("new-sample", on_new_frame)
              pipeline.set_state(Gst.State.PLAYING)
      
              bus = pipeline.get_bus()
              while True:
                  # timeout is an integer number of nanoseconds (50 ms)
                  msg = bus.timed_pop_filtered(50 * Gst.MSECOND, Gst.MessageType.ERROR | Gst.MessageType.EOS)
                  if msg:
                      if msg.type == Gst.MessageType.EOS and current_target_source.endswith(".mp4"):
                          pipeline.seek_simple(Gst.Format.TIME, Gst.SeekFlags.FLUSH | Gst.SeekFlags.KEY_UNIT, 0)
                          continue
                      break
                  
                  if STATE_REPO["restart_flag"]:
                      break
              
              pipeline.set_state(Gst.State.NULL)
              STATE_REPO["frame"] = None
              STATE_REPO["detections"] = []
              STATE_REPO["native_w"] = 0
              STATE_REPO["native_h"] = 0
              STATE_REPO["stream_w"] = 0
              STATE_REPO["stream_h"] = 0
              STATE_REPO["inference_fps"] = 0.0
              inference_timestamps = []
              time.sleep(1.0)
      
      @app.route('/')
      def index():
          src_active = STATE_REPO["active_source"]
          
          if not STATE_REPO["source_registry"]:
              src_html = '<option value="" disabled selected>-- NO VALID INPUT SOURCES AVAILABLE --</option>'
          else:
              src_html = '<option value="" disabled selected>-- SELECT TARGET SOURCE CHANNEL --</option>' if src_active is None else ""
              src_html += "".join(f'<option value="{s}" {"selected" if s == src_active else ""}>{s}</option>' for s in STATE_REPO["source_registry"])
          
          mdl_active = STATE_REPO["active_model_name"]
          mdl_html = "".join(f'<option value="{m}" {"selected" if m == mdl_active else ""}>{m}</option>' for m in STATE_REPO["model_registry"])
      
          html_template = """<!DOCTYPE html>
          <html>
          <head>
              <title>Ara Stream Client</title>
              <style>
                  body { font-family: sans-serif; background: #0c0c0e; color: #e1e1e6; margin: 0; padding: 20px; display: flex; flex-direction: column; align-items: center; }
                  .dashboard-layout { display: flex; flex-direction: column; gap: 15px; width: 660px; }
                  .panel { background: #121216; padding: 12px 15px; border-radius: 6px; border: 1px solid #1f1f24; display: flex; flex-direction: column; gap: 10px; }
                  .control-row { display: flex; align-items: center; justify-content: space-between; }
                  label { font-size: 12px; font-weight: bold; color: #8f8f9d; text-transform: uppercase; }
                  select { background: #0c0c0e; color: #fff; border: 1px solid #04d361; padding: 6px 10px; border-radius: 4px; width: 420px; outline: none; }
                  .stats-banner { display: flex; justify-content: space-between; background: #17171f; padding: 10px 15px; border: 1px solid #1f1f24; border-radius: 4px; font-family: monospace; font-size: 13px; color: #8f8f9d; }
                  .stats-banner span strong { color: #04d361; }
                  .media-container { background: #121216; padding: 8px; border-radius: 6px; border: 1px solid #1f1f24; position: relative; min-height: 480px; display: flex; align-items: center; justify-content: center; }
                  img { display: block; border-radius: 4px; width: 100%; height: auto; }
                  .overlay { position: absolute; top: 0; left: 0; width: 100%; height: 100%; background: rgba(12,12,14,0.9); display: flex; flex-direction: column; align-items: center; justify-content: center; border-radius: 6px; text-align: center; }
                  .prompt-text { color: #04d361; font-weight: bold; font-size: 16px; margin-bottom: 10px; }
              </style>
              <script>
                  let streamStarted = {% if active_src %}true{% else %}false{% endif %};
                  
                  async function switchConfig() {
                      const src = document.getElementById('source-picker').value;
                      const mdl = document.getElementById('model-picker').value;
                      if(!src) return;
                      
                      await fetch('/api/swap_config', {
                          method: 'POST',
                          headers: { 'Content-Type': 'application/json' },
                          body: JSON.stringify({ "source": src, "model": mdl })
                      });
                      
                      streamStarted = true;
                      document.getElementById('gatekeeper-overlay').style.display = 'none';
                      setTimeout(() => {
                          document.getElementById('stream-player').src = '/stream.mjpg';
                      }, 1000);
                  }
      
                  async function updateStreamMetrics() {
                      if (!streamStarted) return;
                      try {
                          const response = await fetch('/api/stream_info');
                          const data = await response.json();
                          
                          document.getElementById('metric-res').innerText = 'Source:' + data.native_w + 'x' + data.native_h + ' Canvas:' + data.width + 'x' + data.height;
                          document.getElementById('metric-fps').innerText = data.fps.toFixed(1);
                          document.getElementById('metric-dets').innerText = data.detections;
                      } catch (err) {}
                  }
                  setInterval(updateStreamMetrics, 1000);
              </script>
          </head>
          <body>
              <h2>Ara Vision Engine</h2>
              <div class="dashboard-layout">
                  <div class="panel">
                      <div class="control-row">
                          <label for="source-picker">Media Stream Target:</label>
                          <select id="source-picker" onchange="switchConfig()">""" + src_html + """</select>
                      </div>
                      <div class="control-row">
                          <label for="model-picker">NPU Pipeline Model:</label>
                          <select id="model-picker" onchange="switchConfig()">""" + mdl_html + """</select>
                      </div>
                  </div>
      
                  <div class="stats-banner">
                      <span id="metric-res">Source:0x0 Canvas:0x0</span>
                      <span>NPU Inference: <span id="metric-fps">0.0</span> FPS</span>
                      <span>Active Detections: <span id="metric-dets">0</span></span>
                  </div>
      
                  <div class="media-container">
                      {% if not active_src %}
                      <div class="overlay" id="gatekeeper-overlay">
                          <div class="prompt-text">Awaiting Source Context</div>
                          <div style="color: #8f8f9d; font-size: 13px; max-width: 400px;">Please select a media path and model from the drop-downs above to mount your pipeline.</div>
                      </div>
                      {% endif %}
                      <img id="stream-player" {% if active_src %}src="/stream.mjpg"{% endif %} style="max-width: """ + str(STATE_REPO["CANVAS_W"]) + """px;" />
                  </div>
              </div>
          </body>
          </html>"""
          return render_template_string(html_template, active_src=src_active)
      
      @app.route('/api/stream_info')
      def stream_info():
          with lock:
              return jsonify({
                  "native_w": STATE_REPO["native_w"],
                  "native_h": STATE_REPO["native_h"],
                  "width": STATE_REPO["stream_w"],
                  "height": STATE_REPO["stream_h"],
                  "fps": STATE_REPO["inference_fps"],
                  "detections": len(STATE_REPO["detections"])
              })
      
      @app.route('/api/swap_config', methods=['POST'])
      def swap_config():
          payload = request.get_json()
          src_selected = payload.get("source")
          mdl_selected = payload.get("model")
          
          with lock:
              trigger_restart = False
              if src_selected in STATE_REPO["source_registry"] and STATE_REPO["active_source"] != src_selected:
                  STATE_REPO["active_source"] = src_selected
                  trigger_restart = True
              if mdl_selected in STATE_REPO["model_registry"] and STATE_REPO["active_model_name"] != mdl_selected:
                  base_dir = app.config["MODEL_DIR"]
                  STATE_REPO["active_model_name"] = mdl_selected
                  STATE_REPO["active_model_path"] = os.path.join(base_dir, mdl_selected, "model.dvm")
                  trigger_restart = True
              if trigger_restart:
                  STATE_REPO["restart_flag"] = True
          return jsonify({"status": "success"})
      
      def generate_mjpeg_stream_generator():
          MODEL_W = float(STATE_REPO["MODEL_W"])
          MODEL_H = float(STATE_REPO["MODEL_H"])
          
          while True:
              time.sleep(0.04)
              frame_copy = STATE_REPO["frame"]
              local_dets = list(STATE_REPO["detections"])
              if frame_copy is not None:
                  frame = frame_copy.copy()
                  h_native, w_native, _ = frame_copy.shape
                  for class_id, confidence, rx1, ry1, rx2, ry2 in local_dets:
                      cx1 = int(rx1 * (float(w_native) / MODEL_W))
                      cx2 = int(rx2 * (float(w_native) / MODEL_W))
                      cy1 = int(ry1 * (float(h_native) / MODEL_H))
                      cy2 = int(ry2 * (float(h_native) / MODEL_H))
                      label = f"{COCO_LABELS.get(class_id, f'Class {class_id}')} ({confidence*100:.1f}%)"
                      cv2.rectangle(frame, (cx1, cy1), (cx2, cy2), (0, 255, 97), 2)
                      cv2.putText(frame, label, (cx1, max(15, cy1 - 5)), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 97), 2)
                  _, encoded_img = cv2.imencode(".jpg", frame)
                  yield (b'--frame\r\n'
                         b'Content-Type: image/jpeg\r\n\r\n' + encoded_img.tobytes() + b'\r\n')
              else:
                  waiting_canvas = np.zeros((480, 640, 3), dtype=np.uint8)
                  cv2.putText(waiting_canvas, "AWAITING MEDIA INPUT SELECTION...", (140, 240), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 97), 1)
                  _, encoded_img = cv2.imencode(".jpg", waiting_canvas)
                  yield (b'--frame\r\n'
                         b'Content-Type: image/jpeg\r\n\r\n' + encoded_img.tobytes() + b'\r\n')
      
      @app.route('/stream.mjpg')
      def video_feed_stream_route():
          return Response(generate_mjpeg_stream_generator(), mimetype='multipart/x-mixed-replace; boundary=frame')
      
      def main():
          parser = argparse.ArgumentParser(description="Wiki Template: Ara Flask Video Engine")
          parser.add_argument("--camera", default=None, help="Camera context device node path")
          parser.add_argument("--mp4", default=None, help="Directory containing target mp4 sample videos")
          parser.add_argument("--port", type=int, default=8080, help="Target port mapping")
          parser.add_argument("--model-dir", default="/usr/share/cnn/detection", help="Directory containing target models")
          parser.add_argument("--model", default="yolov8n", help="Initial model selection")
          args = parser.parse_args()
      
          app.config["MODEL_DIR"] = args.model_dir
          STATE_REPO["source_registry"] = []
      
          if args.camera and os.path.exists(args.camera):
              STATE_REPO["source_registry"].append(args.camera)
      
          if args.mp4 and os.path.exists(args.mp4):
              local_videos = glob.glob(os.path.join(args.mp4, "*.mp4"))
              for vid in sorted(local_videos):
                  STATE_REPO["source_registry"].append(vid)
      
          if os.path.exists(args.model_dir):
              discovered_models = []
              for entry in sorted(os.listdir(args.model_dir)):
                  full_subdir = os.path.join(args.model_dir, entry)
                  if os.path.isdir(full_subdir) and os.path.exists(os.path.join(full_subdir, "model.dvm")):
                      discovered_models.append(entry)
              if discovered_models:
                  STATE_REPO["model_registry"] = discovered_models
                  STATE_REPO["active_model_name"] = args.model if args.model in discovered_models else discovered_models[0]
                  STATE_REPO["active_model_path"] = os.path.join(args.model_dir, STATE_REPO["active_model_name"], "model.dvm")
      
          threading.Thread(target=gstreamer_orchestration_loop, daemon=True).start()
      
          print(f"Server serving on: http://localhost:{args.port}/")
          app.run(host='0.0.0.0', port=args.port, threaded=True, use_reloader=False, debug=False)
      
      if __name__ == '__main__':
          main()
      EOF
      
    • run the script (vision-webapp.py [--port <portno>] [--camera <camera-dev>] [--mp4 <mp4-dir>] [--model-dir <model-dir>] [--model <model-name>])
      uv run vision-webapp.py --camera /dev/video_webcam --mp4 /usr/share/ara2-vision-examples/sample_videos/
      
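    • you can provide a webcam device path (--camera) to stream from a webcam and/or a directory of mp4 files (--mp4) to process those. Open http://<serverip>:8080/ in a browser (8080 is the default port): dropdowns let you select the input source and the model, and the page shows the video with detections overlaid along with statistics (source and canvas resolution, inference FPS and detection count)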
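
The script draws boxes by rescaling detection coordinates from the model input size (640x640 by default) to the size of the displayed canvas frame (640x360 by default). A minimal standalone sketch of that step follows; the helper name scale_box is hypothetical (the script does this inline), and it assumes the frame was stretched to the model size independently in x and y, as the separate scale factors in the script imply.

```python
def scale_box(box, model_size, frame_size):
    """Map (xmin, ymin, xmax, ymax) from model-input pixels to frame pixels."""
    xmin, ymin, xmax, ymax = box
    model_w, model_h = model_size
    frame_w, frame_h = frame_size
    # Independent horizontal and vertical scale factors
    sx = frame_w / float(model_w)
    sy = frame_h / float(model_h)
    return (int(xmin * sx), int(ymin * sy), int(xmax * sx), int(ymax * sy))


if __name__ == "__main__":
    # Lower-right quadrant of a 640x640 model input mapped onto a 640x360 canvas
    print(scale_box((320, 320, 640, 640), (640, 640), (640, 360)))
```

Because the scale factors differ per axis, a box that is square in model coordinates becomes wider than it is tall on the 16:9 canvas.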

eIQ AAF Connector

The eIQ AAF Connector (edge Intelligence Ara Application Framework) is a REST-based server that enables LLM inference on NXP i.MX processors with the ARA-240 DNPU. The API implemented is the de-facto API standard created by OpenAI for ChatGPT. It provides a simple Chat Completions-based HTTP interface for serving models to client applications.

Requirements:

  • python 3.13 (we will install in a virtual env)
  • uv - used for the user-specific Python virtual environment
  • Optimum Ara framework for running Large Language Models (LLMs) and Vision-Language Models (VLMs) on Ara240 (part of rt-sdk)
  • OpenCV (dependency of the QwenVL engine)
  • Models

Installation on a Gateworks board with Ubuntu based OS:

  • extract the debian 'data' (do not install the package!)
    # extract data (but don't install)
    dpkg-deb --vextract eiq-aaf-connector_2.0.deb /
    
  • take care of postinst steps
    1. Create the /usr/share/eiq/aaf-connector/venv (used by /usr/share/eiq/aaf-connector/venv/bin/connector)
      # needs python 3.13 so we will install it in a virtual env for this user
      uv python install 3.13
      uv venv --python 3.13 "/usr/share/eiq/aaf-connector/venv"
      # activate venv
      source "/usr/share/eiq/aaf-connector/venv/bin/activate"
      # install Python dependencies in venv from the Optimum Ara wheel
      uv pip install --no-progress /usr/share/python-wheels/optimum_ara-2.0.0.2-py3-none-any.whl
      # install Python dependencies in venv from the eIQ wheel in this package
      uv pip install --no-progress /usr/share/python-wheels/eiq_aaf_connector-2.0.0-py3-none-any.whl
      # ditch the default opencv-python which depends on libgl1-mesa and install the headless version instead
      uv pip uninstall opencv-python
      uv pip install opencv-python-headless
      # deactivate venv
      deactivate
      
    2. Create systemd service file (not sure why this wasn't in the deb)
      cat > /etc/systemd/system/eiq-aaf-connector.service << EOF
      [Unit]
      Description=eIQ AAF Connector Service
      # No 'After' or 'Wants' for rt-sdk-ara2.service here
      # This prevents the 'Ordering Cycle' entirely
      After=network.target
      StartLimitIntervalSec=0
      
      [Service]
      Type=simple
      User=root
      WorkingDirectory=/usr/share/eiq/aaf-connector
      
      # This loop now handles the dependency logic internally.
      # It will spin until the proxy is actually alive, regardless of 
      # which service started it or when.
      ExecStartPre=/bin/bash -c 'until ss -Hltn | grep -E -q ":5000([[:space:]]|$)"; do echo "Waiting for ARA2 Proxy to initialize..." >&2; sleep 5; done'
      ExecStartPre=/bin/sleep 2
      
      ExecStart=/usr/share/eiq/aaf-connector/venv/bin/connector --host 0.0.0.0 --port 8000
      
      Restart=on-failure
      RestartSec=10s
      StartLimitBurst=0
      
      StandardOutput=journal
      StandardError=journal
      
      [Install]
      WantedBy=multi-user.target
      EOF
      
    • this one differs from the one in the deb's postinst script as I found that one to not work (it would not wait for the proxy to be alive)
    • The ExecStart line above already uses '--host 0.0.0.0' so the connector is accessible from the network. If your service file uses '--host 127.0.0.1' instead (local loopback only), switch it with:
      sed -i 's|--host 127.0.0.1|--host 0.0.0.0|g' /etc/systemd/system/eiq-aaf-connector.service
      
    3. add Ara2 optimized LLM models (these get installed to /usr/share/llm)
      fetch_models --repo-id nxp/Qwen2.5-7B-Instruct-Ara240 # 7.7GiB
      fetch_models --repo-id nxp/Qwen2.5-Coder-1.5B-Ara240 # 1.67GiB
      
    4. edit the config file to enable the two models we just downloaded (using jq):
      apt update && apt install -y jq
      jq '(.available_models[] | select(.name == "Qwen2.5-Coder-1.5B") |  .enabled) = true' /usr/share/eiq/aaf-connector/server_config.json > /tmp/config.json && \
        mv /tmp/config.json /usr/share/eiq/aaf-connector/server_config.json
      jq '(.available_models[] | select(.name == "Qwen2.5-7B-Instruct") |  .enabled) = true' /usr/share/eiq/aaf-connector/server_config.json > /tmp/config.json && \
        mv /tmp/config.json /usr/share/eiq/aaf-connector/server_config.json
      
    • you can just as easily edit the file manually if you want
    5. Enable and start service
      # Enable service on boot
      systemctl enable eiq-aaf-connector.service
      # Start the service now (or reboot)
      systemctl start eiq-aaf-connector.service
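
The config edit done with jq in the steps above can also be scripted in Python. This is only a sketch: it assumes the config keeps the 'available_models' list with 'name' and 'enabled' keys that the jq filters rely on, and the helper names are made up for illustration.

```python
import json


def enable_models(config, names):
    """Set enabled=True on every available_models entry whose name is in names."""
    wanted = set(names)
    found = []
    for entry in config.get("available_models", []):
        if entry.get("name") in wanted:
            entry["enabled"] = True
            found.append(entry["name"])
    return found


def enable_models_in_file(path, names):
    """Load the JSON config, enable the named models and write it back."""
    with open(path) as f:
        config = json.load(f)
    found = enable_models(config, names)
    with open(path, "w") as f:
        json.dump(config, f, indent=2)
    return found
```

For example, enable_models_in_file("/usr/share/eiq/aaf-connector/server_config.json", ["Qwen2.5-Coder-1.5B", "Qwen2.5-7B-Instruct"]) returns the names it actually found, which makes a misspelled model name easy to spot.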
      

Note that it takes several minutes for the service to be ready for connections as it must first process the models. Monitor progress with 'journalctl -u eiq-aaf-connector.service --no-pager -f' and check that it's listening with 'ss -tulpn | grep :8000'.
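
If a client script needs to wait for the connector rather than check by hand, the readiness test can be sketched as a simple TCP poll. The function name and the timeout values here are arbitrary choices, not part of the connector.

```python
import socket
import time


def wait_for_port(host, port, timeout=600.0, interval=5.0):
    """Return True once a TCP connect to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # A successful connect means something is listening on the port
            with socket.create_connection((host, port), timeout=2.0):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)

# Example: wait_for_port("127.0.0.1", 8000) blocks for up to 10 minutes
```

A successful connect only shows that the port is open; it does not by itself prove that a model has finished loading.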

The service file above starts the connector with '--host 0.0.0.0', so it listens on port 8000 on all interfaces and can serve requests from other devices. With '--host 127.0.0.1' it only listens on the local loopback interface.

Notable Files:

  • /usr/share/eiq/aaf-connector/server_config.json (server config file)
  • /usr/share/python-wheels/eiq_aaf_connector-2.0.0-py3-none-any.whl - Python wheel
  • /usr/bin/aaf-connector - shell script that activates the venv and executes the connector
  • /usr/share/eiq/aaf-connector/venv - Python virtual env used by connector
  • /etc/systemd/system/eiq-aaf-connector.service - systemd service

The connector self-hosts API documentation at http://<serverip>:8000/docs

Example Usage:

  • verify connector running
    # show service status
    systemctl status eiq-aaf-connector.service --no-pager -l
    # view detailed service logs
    journalctl -u eiq-aaf-connector.service
    # verify process exists
    ps -ef | grep aaf-connector
    # verify port open
    ss -tulpn | grep :8000 # show IP:PORT server is listening on
    
  • view API docs and interact with the server by opening http://<serverip>:8000/docs in a browser (from another device this requires the host to be '0.0.0.0' in the ExecStart line of /etc/systemd/system/eiq-aaf-connector.service)
  • use API via curl/jq
    # make sure curl and jq are installed (jq allows easy interaction with json data)
    apt install -y curl jq 
    # list of models
    curl -X 'GET' \
      'http://127.0.0.1:8000/v1/models' \
      -H 'accept: application/json' | jq
    # get info about a specific model (Qwen2.5-7B-Instruct)
    curl -X 'GET' \
      'http://127.0.0.1:8000/params/Qwen2.5-7B-Instruct' \
      -H 'accept: application/json' | jq
    # send an LLM query
    curl -X POST http://127.0.0.1:8000/v1/chat/completions -H "Content-Type: application/json" -d '{
      "model": "Qwen2.5-7B-Instruct",
      "messages": [
        {"role": "system", "content": "You are a helpful assistant running on NXP i.MX hardware."},
        {"role": "user", "content": "Explain what an NPU is in one sentence."}
      ],
      "max_tokens": 50
    }' | jq
    
  • run connector by hand (useful for troubleshooting or monitoring)
    systemctl stop eiq-aaf-connector.service
    source "/usr/share/eiq/aaf-connector/venv/bin/activate"
    connector --host 0.0.0.0 --port 8000 # will run until stopped
    deactivate
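
The curl query above can be sketched in Python using only the standard library. The response parsing assumes the connector returns the OpenAI Chat Completions shape (choices[0].message.content), which matches the API description above but has not been checked against every model; the function names are illustrative.

```python
import json
import urllib.request


def build_chat_request(base_url, model, user_text, max_tokens=50):
    """Build a POST request for the /v1/chat/completions endpoint."""
    payload = {
        "model": model,
        "messages": [{"role": "user", "content": user_text}],
        "max_tokens": max_tokens,
    }
    return urllib.request.Request(
        base_url.rstrip("/") + "/v1/chat/completions",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def extract_reply(response_body):
    """Pull the assistant text out of an OpenAI-style response body."""
    data = json.loads(response_body)
    return data["choices"][0]["message"]["content"]


def ask(base_url, model, user_text, timeout=300):
    """Send one non-streaming query and return the reply text."""
    req = build_chat_request(base_url, model, user_text)
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return extract_reply(resp.read().decode("utf-8"))
```

With the connector running, ask("http://127.0.0.1:8000", "Qwen2.5-7B-Instruct", "Explain what an NPU is in one sentence.") returns the reply as a string.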
    

Ara2 SDK examples

Here are some Ara2 SDK examples that were 'vibe coded' within minutes

dvapi stats

This is a C app that provides an example of using the dvapi to connect to the proxy and obtain NPU endpoint stats such as temperature, clocks and usage. Basically it's a re-implementation of the closed source /usr/share/rt-sdk-ara240/scripts/ara2_metrics_bin/hw_metrics.out.

ara_status.c:

#include <stdio.h>
#include <stdbool.h>
#include <stdlib.h>
#include "dvapi.h"

int main() {
    dv_session_t *session = NULL;
    dv_endpoint_t *ep_list = NULL;
    int ep_count = 0;
    dv_status_code_t status;
    const char *socket_path = "/run/proxy.sock"; 

    // 1. Establish session
    status = dv_session_create_via_unix_socket(socket_path, &session);
    if (status != DV_SUCCESS) {
        fprintf(stderr, "Failed to connect: %s\n", dv_stringify_status_code(status));
        return 1;
    }

    // 2. Get list of NPU endpoints
    dv_endpoint_get_list(session, &ep_list, &ep_count);

    for (int i = 0; i < ep_count; i++) {
        dv_endpoint_t *ep = &ep_list[i];
        dv_endpoint_statistics_t *stats = NULL;
        int s_count = 0;
        bool is_busy = false;

        // 3. Retrieve status and statistics
        dv_get_endpoint_busyness(session, ep, &is_busy);
        status = dv_endpoint_get_statistics(session, ep, &stats, &s_count);

        if (status == DV_SUCCESS && s_count > 0) {
            // DRAM Calculations (Bytes to GB)
            double used_gb = (double)stats->ep_dram_stats.ep_total_dram_occupancy_size / 1073741824.0;
            double total_gb = (double)stats->ep_dram_stats.ep_total_dram_size / 1073741824.0;
            double dram_pct = (total_gb > 0) ? (used_gb / total_gb) * 100.0 : 0.0;

            // NPU Utilization (Queue occupancy)
            double npu_load = 0.0;
            if (stats->ep_infq_stats && stats->ep_infq_stats->length > 0) {
                npu_load = ((double)stats->ep_infq_stats->occupancy_count / stats->ep_infq_stats->length) * 100.0;
            }

            printf("--- NPU Endpoint %d Statistics ---\n", i);
            printf("Busy State:       %s\n", is_busy ? "TRUE" : "FALSE");
            printf("NPU Utilization:  %.1f%%\n", npu_load);
            printf("Temperature:      %.1f C\n", stats->ep_temp);
            printf("NNP Clock:        %d MHz\n", stats->ep_nnp_clk);
            printf("SBP Clock:        %d MHz\n", stats->ep_sbp_clk);
            printf("DRAM Clock:       %d MHz\n", stats->ep_dram_clk);
            
            // Format: DRAM Usage: 8.2GB/16.0GB (51.3%)
            printf("DRAM Usage:       %.1fGB/%.1fGB (%.1f%%)\n", used_gb, total_gb, dram_pct);
            printf("\n");

            dv_endpoint_free_statistics(stats, s_count);
        }
    }

    // 4. Cleanup
    dv_endpoint_free_group(ep_list);
    dv_session_close(session);
    return 0;
}

Compile:

apt update && apt install -y build-essential
gcc ara_status.c -I/usr/include/sdk_ara/ -L/usr/lib/ -laraclient_aarch64 -o ara_status

Execution:

# ./ara_status
--- NPU Endpoint 0 Statistics ---
Busy State:       FALSE
NPU Utilization:  0.0%
Temperature:      56.0 C
NNP Clock:        900 MHz
SBP Clock:        355 MHz
DRAM Clock:       1066 MHz
DRAM Usage:       10.0GB/16.0GB (62.5%)
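
For logging or monitoring, the text printed by ara_status can be turned into key/value pairs. This is a sketch that relies only on the output format shown above; the parser name is made up for illustration.

```python
def parse_ara_status(text):
    """Parse ara_status output into a list of dicts, one per NPU endpoint."""
    endpoints = []
    for line in text.splitlines():
        line = line.strip()
        if line.startswith("--- NPU Endpoint"):
            # A header line starts a new endpoint record
            endpoints.append({})
        elif ":" in line and endpoints:
            key, value = line.split(":", 1)
            endpoints[-1][key.strip()] = value.strip()
    return endpoints
```

Values are kept as strings (for example "56.0 C"), so any unit handling is left to the caller.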

command-line python eIQ chatbot

This is a command-line chatbot written in Python using the eIQ AAF Connector.

chat.py:

import json
import requests
import time
import sys

API_URL = "http://127.0.0.1:8000/v1/chat/completions"
MODEL_NAME = "Qwen2.5-7B-Instruct" 

def chat():
    print(f"--- i.MX LLM Session (Model: {MODEL_NAME}) ---")
    print("Type 'exit' to stop.\n")
    
    history = [{"role": "system", "content": "You are a helpful AI assistant."}]

    while True:
        user_input = input("You: ")
        if user_input.lower() in ['exit', 'quit']:
            break

        history.append({"role": "user", "content": user_input})
        payload = {
            "model": MODEL_NAME,
            "messages": history,
            "temperature": 0.7,
            "stream": True 
        }

        print("AI: ", end="", flush=True)
        
        # Start timing
        start_time = time.time()
        full_reply = ""
        token_count = 0

        try:
            response = requests.post(API_URL, json=payload, stream=True)
            response.raise_for_status()

            for line in response.iter_lines():
                if line:
                    decoded_line = line.decode('utf-8')
                    if decoded_line.startswith("data: "):
                        content = decoded_line[6:]
                        if content.strip() == "[DONE]":
                            break
                        
                        chunk = json.loads(content)
                        if "choices" in chunk and chunk["choices"][0]["delta"].get("content"):
                            text = chunk["choices"][0]["delta"]["content"]
                            print(text, end="", flush=True)
                            full_reply += text
                            token_count += 1 # Rough estimate of tokens
            
            # End timing
            end_time = time.time()
            duration = end_time - start_time
            tps = token_count / duration if duration > 0 else 0

            print(f"\n\n--- Stats ---")
            print(f"Time taken: {duration:.2f} seconds")
            print(f"Throughput: {tps:.2f} tokens/sec")
            print(f"-------------\n")
            
            history.append({"role": "assistant", "content": full_reply})

        except Exception as e:
            print(f"\nError: {e}")

if __name__ == "__main__":
    chat()

Execution:

$ uv venv # create virtual python env in current dir
$ uv pip install requests # install python deps
$ uv run chat.py # run in venv
--- i.MX LLM Session (Model: Qwen2.5-7B-Instruct) ---
Type 'exit' to stop.

You: Why is the sky blue
AI: The sky appears blue because of a phenomenon called Rayleigh scattering. When sunlight enters the Earth's atmosphere, it collides with molecules and small particles in the air. Sunlight is made up of different colors, each of which has a different wavelength. Blue light has a shorter wavelength and is scattered more than other colors by the gases and particles in the atmosphere. This scattering makes the sky appear blue to our eyes.

During sunrise and sunset, the sky can appear red or orange because the light has to travel through more of the Earth's atmosphere. This longer path means that more blue and green light is scattered out of the beam, leaving the red and orange wavelengths to dominate the light that reaches our eyes.

So, the blue color of the sky is primarily due to the way shorter wavelength light is scattered by the Earth's atmosphere.

--- Stats ---
Time taken: 29.18 seconds
Throughput: 5.04 tokens/sec
-------------

You: exit
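
The streaming loop in chat.py reads OpenAI-style server-sent-event lines (`data: {...}` chunks terminated by `data: [DONE]`). The following is a minimal sketch of that per-line parsing pulled out into a function that can be tested without a running connector. The name `extract_delta_text` is hypothetical and not part of the eIQ or Kinara software; it assumes the same chunk layout that chat.py already relies on.

```python
import json
from typing import Optional


def extract_delta_text(raw_line: bytes) -> Optional[str]:
    """Return the text carried by one streamed line, or None if it has none."""
    line = raw_line.decode("utf-8").strip()
    if not line.startswith("data:"):
        return None  # blank keep-alives, comments, other SSE fields
    payload = line[len("data:"):].strip()
    if payload == "[DONE]":
        return None  # end-of-stream sentinel
    try:
        chunk = json.loads(payload)
    except json.JSONDecodeError:
        return None  # ignore anything that is not valid JSON
    if not isinstance(chunk, dict):
        return None
    choices = chunk.get("choices") or []
    if not choices:
        return None
    # Some chunks carry only a role or a finish reason and no content
    return choices[0].get("delta", {}).get("content") or None


if __name__ == "__main__":
    sample = b'data: {"choices":[{"delta":{"content":"Hello"}}]}'
    print(extract_delta_text(sample))
```

Keeping the parsing separate from the network call makes it easy to check the handling of `[DONE]` and of chunks without content before pointing the client at real hardware.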

Web based python eIQ chatbot

This is a web-based chatbot written in Python with Streamlit. It sends prompts to the eIQ AAF Connector and shows NPU and system telemetry in a sidebar.

webchat.py:

import sys
import os
from datetime import datetime

# --- KINARA SDK PATH INJECTION ---
DVAPI_DIR = "/usr/share/rt-sdk-ara240_2.0.4/include"
if os.path.exists(DVAPI_DIR):
    sys.path.append(DVAPI_DIR)

import streamlit as st
import requests
import json
import time
import psutil
import threading
import argparse

# Attempt to import the Kinara Python APIs
try:
    from dvapi import DVSession, dv_endpoint_get_statistics, dv_endpoint_free_statistics
except ImportError:
    st.error(f"Critical: dvapi.py not found at {DVAPI_DIR}")
    st.stop()

# --- ARGUMENT PARSING ---
parser = argparse.ArgumentParser()
parser.add_argument("--host", type=str, default="127.0.0.1", help="AAF Connector Host")
parser.add_argument("--port", type=str, default="8000", help="AAF Connector Port")
parser.add_argument("--proxy-sock", type=str, default="/var/run/proxy.sock", help="Kinara Proxy socket")
args, _ = parser.parse_known_args()

# --- CONFIGURATION ---
MODEL_NAME = "Qwen2.5-7B-Instruct"
API_URL = f"http://{args.host}:{args.port}/v1/chat/completions"
LOGO_URL = "/root/gateworks_logo.png"

# --- HARDWARE TELEMETRY HELPERS ---
def get_dvapi_npu_stats():
    try:
        ret, session = DVSession.create_via_unix_socket(args.proxy_sock)
        if ret != 0: return None
        with session:
            ret, ep_list = session.get_endpoint_list()
            if ret != 0 or not ep_list: return None
            ret, stats_ptr, count = dv_endpoint_get_statistics(session._session, ep_list[0]._endpoint)
            if ret == 0 and count.value > 0:
                s = stats_ptr[0]
                TOTAL_CAPACITY_GB = 16.0
                free_gb = s.ep_dram_stats.ep_total_free_size / 1073741824
                used_gb = max(0, TOTAL_CAPACITY_GB - free_gb)
                dram_pct = (used_gb / TOTAL_CAPACITY_GB) * 100
                # "util" is inferred from the app's own lock (busy/idle), not measured on the NPU
                is_busy = st._npu_lock.locked()
                data = {"temp": s.ep_temp, "util": 100 if is_busy else 0, "ram_pct": dram_pct}
                dv_endpoint_free_statistics(stats_ptr, count)
                return data
    except: return None

def get_system_thermals():
    zones = []
    try:
        for zone in sorted(os.listdir("/sys/class/thermal/")):
            if zone.startswith("thermal_zone"):
                with open(f"/sys/class/thermal/{zone}/temp", "r") as f:
                    z_temp = int(f.read().strip()) / 1000.0
                zones.append(z_temp)
    except: pass
    return zones

def build_sidebar_html():
    n_stats = get_dvapi_npu_stats()
    cpu_usage = psutil.cpu_percent()
    sys_ram = psutil.virtual_memory().percent
    thermals = get_system_thermals()
    
    npu_html = f"<div style='border-top:1px solid #444; padding-top:5px; font-size:0.82rem;'><b>🔥 Ara2 NPU</b><br>"
    if n_stats:
        npu_html += f"NPU: {n_stats['util']}% {n_stats['temp']:.1f}C | RAM: {n_stats['ram_pct']:.1f}%"
    else:
        npu_html += "NPU Telemetry Unavailable"
    npu_html += "</div>"

    sys_html = f"<div style='border-top:1px solid #444; margin-top:8px; padding-top:5px; font-size:0.82rem;'><b>💻 System</b><br>"
    temp_str = "/".join([f"{t:.1f}C" for t in thermals])
    sys_html += f"CPU: {cpu_usage:.1f}% {temp_str} | RAM: {sys_ram:.1f}%</div>"

    perf_val = st.session_state.get('last_perf', 'N/A')
    perf_html = f"<div style='border-top:1px solid #444; margin-top:8px; padding-top:5px; font-size:0.82rem;'><b>⚡ Last Result</b><br>{perf_val}</div>"
    return npu_html + sys_html + perf_html

# --- GLOBAL STATE ---
if not hasattr(st, '_npu_lock'): st._npu_lock = threading.Lock()
if not hasattr(st, '_active_user'): st._active_user = "None"

st.set_page_config(page_title="Gateworks Venice AI", layout="wide")

# --- SIDEBAR ---
with st.sidebar:
    try: st.image(LOGO_URL, width=220)
    except: st.write("### Gateworks Venice")
    
    status_slot = st.empty()
    # Simplified to just show the IP address
    user_id = st.context.ip_address or "127.0.0.1"

    if st._npu_lock.locked():
        status_slot.warning(f"⚠️ BUSY: {st._active_user}")
    else:
        status_slot.success("🟢 READY")
    
    st.caption(f"User: {user_id}")
    
    stats_slot = st.empty()
    stats_slot.markdown(build_sidebar_html(), unsafe_allow_html=True)

# --- MAIN INTERFACE ---
st.title("🤖 i.MX Edge LLM")

if "messages" not in st.session_state: st.session_state.messages = []
for msg in st.session_state.messages:
    with st.chat_message(msg["role"]): st.markdown(msg["content"])

if prompt := st.chat_input("Ask the NPU..."):
    st.chat_message("user").markdown(prompt)
    st.session_state.messages.append({"role": "user", "content": prompt})

    # Console: Log the Incoming Request / Queue status
    ts_in = datetime.now().strftime("%H:%M:%S")
    print(f"[{ts_in}] QUEUED: Request from {user_id} -> '{prompt[:40]}...'")

    with st.chat_message("assistant"):
        response_placeholder = st.empty()
        
        # This lock handles the "Queued" logic—it will block here if someone else is talking
        with st._npu_lock:
            st._active_user = user_id
            status_slot.warning(f"⚠️ BUSY: {user_id}")
            
            ts_start = datetime.now().strftime("%H:%M:%S")
            print(f"[{ts_start}] PROCESSING: Active inference for {user_id}")
            
            full_response, token_count, start_time = "", 0, time.time()

            try:
                payload = {"model": MODEL_NAME, "messages": st.session_state.messages, "stream": True}
                r = requests.post(API_URL, json=payload, stream=True, timeout=120)
                r.raise_for_status()  # surface HTTP errors instead of parsing an error body
                
                for line in r.iter_lines():
                    if line:
                        decoded = line.decode('utf-8').removeprefix('data: ')  # strip only the leading SSE prefix (Python 3.9+)
                        if decoded.strip() == "[DONE]": break
                        try:
                            chunk = json.loads(decoded)
                            content = chunk["choices"][0]["delta"].get("content", "")
                            if content:
                                full_response += content
                                token_count += 1
                                response_placeholder.markdown(full_response + "▌")
                                
                                if token_count % 12 == 0:
                                    stats_slot.markdown(build_sidebar_html(), unsafe_allow_html=True)
                        except: continue

                duration = time.time() - start_time
                tps = token_count / duration if duration > 0 else 0
                st.session_state.last_perf = f"{token_count} tokens @ {tps:.1f} t/s"
                
                response_placeholder.markdown(full_response)
                st.session_state.messages.append({"role": "assistant", "content": full_response})

                # Console: Log Completion
                ts_out = datetime.now().strftime("%H:%M:%S")
                print(f"[{ts_out}] COMPLETE: {user_id} | {token_count} tokens | {tps:.1f} t/s")

            except Exception as e:
                st.error(f"Error: {e}")
                print(f"[{datetime.now().strftime('%H:%M:%S')}] ERROR: {e}")
            finally:
                st._active_user = "None"
                stats_slot.markdown(build_sidebar_html(), unsafe_allow_html=True)
                status_slot.success("🟢 READY")
                st.rerun()
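
The RAM percentage shown for the NPU in the sidebar comes from simple arithmetic in get_dvapi_npu_stats(): the free byte count reported by the endpoint is converted to GiB and subtracted from a fixed total. The following is a minimal standalone sketch of that calculation. The function name `dram_used_percent` is hypothetical, and the 16.0 default mirrors the TOTAL_CAPACITY_GB constant hard-coded in webchat.py rather than a value read from the device.

```python
GIB = 1024 ** 3  # 1073741824 bytes, the divisor used in webchat.py


def dram_used_percent(free_bytes: int, total_gib: float = 16.0) -> float:
    """Convert a free-bytes reading into a used-DRAM percentage."""
    free_gib = free_bytes / GIB
    # Clamp at zero in case the device reports more free memory than the assumed total
    used_gib = max(0.0, total_gib - free_gib)
    return used_gib / total_gib * 100.0


if __name__ == "__main__":
    # Half of the assumed capacity free
    print(f"{dram_used_percent(8 * GIB):.1f}%")
```

If the assumed total does not match the memory actually fitted to the card, the percentage will be off by the same ratio, so the constant is worth checking against the hardware in use.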

Execution:

$ mkdir /root/webapp
$ cd /root/webapp
$ uv venv # create virtual python env in current dir
$ uv pip install streamlit requests psutil # install python deps (argparse is part of the standard library)
$ uv run streamlit run webchat.py --server.address 0.0.0.0 --server.port 8501 -- --host 127.0.0.1 --port 8000

Service:

  • To run this as a service:
    cat << 'EOF' > /etc/systemd/system/eiq-webapp.service
    [Unit]
    Description=Streamlit Webapp for eIQ AAF
    # Start after network is up
    After=network.target
    # We don't use 'After=eiq-aaf-connector.service' to avoid potential boot cycles
    StartLimitIntervalSec=0
    
    [Service]
    Type=simple
    User=root
    # Ensure we are in the directory where webchat.py lives
    WorkingDirectory=/root/webapp
    
    # 1. Wait until the Connector is actually listening on Port 8000
    ExecStartPre=/bin/bash -c 'until ss -Hltn | grep -E -q ":8000([[:space:]]|$)"; do echo "Waiting for eIQ Connector on Port 8000..." >&2; sleep 5; done'
    
    # 2. Launch the app using uv
    # Note: Using absolute path for uv is safer in systemd
    ExecStart=/usr/local/bin/uv run streamlit run webchat.py \
        --server.address 0.0.0.0 \
        --server.port 8501 \
        -- \
        --host 127.0.0.1 \
        --port 8000
    
    # Restart logic
    Restart=on-failure
    RestartSec=10s
    StartLimitBurst=0
    
    # Standard Logging
    StandardOutput=journal
    StandardError=journal
    
    [Install]
    WantedBy=multi-user.target
    EOF
    systemctl daemon-reload
    systemctl enable eiq-webapp.service
    systemctl start eiq-webapp.service
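
The ExecStartPre step in the unit above polls until something is listening on port 8000 before launching the web app. The same readiness check can be sketched in Python, for example to call at the top of a client script. The helper name `wait_for_port` and its default timings are hypothetical. Unlike the `ss` check, which only inspects the list of listening sockets, this version opens and immediately closes a real TCP connection.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout_s: float = 60.0,
                  interval_s: float = 1.0) -> bool:
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout_s
    while True:
        try:
            # create_connection raises OSError while nothing is listening
            with socket.create_connection((host, port), timeout=interval_s):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval_s)


if __name__ == "__main__":
    if wait_for_port("127.0.0.1", 8000, timeout_s=10.0):
        print("Connector is accepting connections")
    else:
        print("Timed out waiting for the connector")
```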
    

Troubleshooting

Software support should be routed through NXP, which produces the Ara240 DNPU chip and its software SDK:

https://community.nxp.com
