Version 14 (modified by Tim Harvey, 22 hours ago)

GW16168 NXP Ara240 DNPU AI Accelerator

The GW16168 NXP Ara240 DNPU AI Accelerator is an M.2 2280 M-Key card by Gateworks for use in Gateworks single board computers. For more product information see: https://www.gateworks.com/products/gw16168-m2-ai-accelerator-usa-made/

Terminology

The following terminology is used in the Kinara documentation:

  • Ara1 / Ara2 processor: An ultra low-power programmable Neural Network processor
  • Kinara SDK: Kinara Software Development Kit
  • DVNC: Kinara Network Compiler
  • DVSim: Kinara Simulator
  • DVConvert: Kinara Network Converter
  • NNApp: Neural Network Application
  • Development Platform: Machine to compile model and run simulator
  • Target Platform: Platform which Ara1 / Ara2 processor connects to
  • PPA: Power, Performance and Accuracy - metrics reported by the compiler
  • SOF: Schedule Optimization Factor - a measure reported by the compiler
  • CNN: Convolutional Neural Network - a deep learning model designed to analyze and process grid-like data such as images, videos and sometimes audio and text
  • LLM: Large Language Model - an AI model trained on massive amounts of text data to understand, summarize, and generate human-like language
  • VLM: Vision Language Model - a multimodal AI that bridges the gap between sight and language. It essentially gives an LLM the ability to "see" by integrating a vision encoder with a language model
  • sLLM: small Language Model - a lightweight version of an LLM designed to be more efficient, especially for "edge" devices with limited hardware resources

Public:

NXP Ara240 DNPU AI Accelerator Quick Start

Using NXP deb distribution packages

Currently NXP distributes the Ara2 runtime in binary form. The kernel driver has been released as open source, which resolves kernel compatibility issues and is a huge step, but the userspace apps and libraries remain dynamically linked binary objects.

The current deb packages have some shortcomings:

  • packages are not very consistent; some have a systemd service in the data, others create one via postinst
  • they were intended to be installed on top of the NXP Embedded Linux Firmware (version L6.12.34-2.1.0) and to support only NXP dev kit boards, so the dependencies are incomplete and don't match what is available on other Linux root filesystems (an Ubuntu system, for example)

If you extract the debs and examine the DEBIAN directory you can see how to install them on other boards and root filesystems.
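
As a rough aid when examining an extracted DEBIAN/control file (for example after running dpkg-deb -e on a package), the sketch below pulls the package names out of the Depends field so they can be compared against what your root filesystem provides. The function name parse_depends and the sample control text are made up for illustration; the NXP packages' real dependency lists are not reproduced here.

```python
import re

def parse_depends(control_text: str) -> list[list[str]]:
    """Return the Depends field as a list of groups of alternative package names."""
    # Continuation lines in a control file start with a space or tab: unfold them.
    unfolded = re.sub(r"\n[ \t]+", " ", control_text)
    for line in unfolded.splitlines():
        name, _, value = line.partition(":")
        if name.strip().lower() != "depends":
            continue
        groups = []
        for clause in value.split(","):
            # "a | b" means either package satisfies the dependency.
            alternatives = [
                re.sub(r"\(.*?\)", "", alt).strip()  # drop version constraints
                for alt in clause.split("|")
            ]
            alternatives = [alt for alt in alternatives if alt]
            if alternatives:
                groups.append(alternatives)
        return groups
    return []

# Hypothetical control file content, for illustration only.
SAMPLE_CONTROL = """\
Package: example-runtime
Version: 1.0
Depends: libc6 (>= 2.38), python3,
 curl | wget
Description: made-up package used to exercise the parser
"""

print(parse_depends(SAMPLE_CONTROL))
# prints [['libc6'], ['python3'], ['curl', 'wget']]
```

Each inner list is one dependency; more than one name in a list means any of them will do.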

It is fairly common for AI models to make use of Python and NXP does so here. The rt-sdk-ara2 includes a couple of Python Wheels that are used in the examples. A Python Wheel is a standard built-package format for distributing Python libraries. It is essentially a ZIP-format archive with a .whl extension that contains all the files needed for a package to run immediately after being installed. It is also common when using Python to run into package version incompatibilities, which is why per-user Python virtual environments are used.
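
Because a wheel is just a ZIP archive, its contents can be listed with Python's standard zipfile module. The sketch below builds a tiny stand-in wheel (demo_pkg, a made-up name) so that it runs without the NXP packages; on a target you would pass the path of a real .whl file instead.

```python
import tempfile
import zipfile
from pathlib import Path

def list_wheel_contents(wheel_path):
    """Return the sorted member names of a wheel, which is a plain ZIP archive."""
    with zipfile.ZipFile(wheel_path) as wheel:
        return sorted(wheel.namelist())

def make_demo_wheel(directory):
    """Build a tiny stand-in wheel (hypothetical package) for the demonstration."""
    path = Path(directory) / "demo_pkg-0.1.0-py3-none-any.whl"
    with zipfile.ZipFile(path, "w") as wheel:
        wheel.writestr("demo_pkg/__init__.py", "VERSION = '0.1.0'\n")
        wheel.writestr("demo_pkg-0.1.0.dist-info/METADATA",
                       "Name: demo-pkg\nVersion: 0.1.0\n")
    return path

with tempfile.TemporaryDirectory() as tmp:
    for name in list_wheel_contents(make_demo_wheel(tmp)):
        print(name)
```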

Note the deb files require an NXP account to download (from NXP ARA SDK Landing page) so the instructions below assume you have them already in the current directory.

rt-sdk-ara2

The ara2 runtime should not really be considered an 'SDK' - it has nothing to do with software development; it's simply the set of utils and libs needed to use the Ara2.

The rt-sdk-ara2 provides a complete runtime environment for AI/ML acceleration using the Ara240 NPU on aarch64. This package includes:

  • Runtime libraries for Ara240 NPU integration
  • Python bindings (DVAPI) for custom inference applications
  • Optimum-Ara framework for LLMs and VLMs
  • GStreamer plugins for Real-Time Object Detection Applications
  • Helper scripts for monitoring, benchmarking, and model management
  • Systemd service for automatic hardware initialization

Installation on a Gateworks board with Ubuntu based OS:

  • extract the debian 'data' (do not install the package!)
    # extract data (but don't install)
    dpkg-deb --vextract rt-sdk-ara2_2.0.4.deb /
    
  • take care of postinst steps
    • miscellaneous
      # create app dirs (used for models)
      mkdir -pv /usr/share/{cnn,llm}
      # get rid of circular symlink
      rm /usr/share/rt-sdk-ara240_2.0.4/rt-sdk-ara240_2.0.4
      
    • install uv package manager for Python virtualization and packaging for local user (which is installed to ~/.local/bin so we create symlinks to /usr/bin)
      apt update && apt install -y curl
      curl -LsSf https://astral.sh/uv/install.sh | sh
      ln -s /root/.local/bin/uv /usr/bin/uv
      ln -s /root/.local/bin/uvx /usr/bin/uvx
      
    • build driver (the one in the deb is specific to the IMX BSP kernel)
      apt update && apt install -y build-essential git bc file flex bison
      git clone https://github.com/nxp-imx-support/uiodma-driver
      ( cd uiodma-driver/uiodma; make )
      # install it where the rt service expects to find it (over the top of the non-compatible one)
      cp uiodma-driver/uiodma/uiodma.ko /usr/share/rt-sdk-ara240/driver/
      
    • enable service:
      # enable service
      systemctl enable rt-sdk-ara2.service
      # start service now (unless you reboot)
      systemctl start rt-sdk-ara2.service
      
    • use the 'fetch_models' script to download pre-compiled models for testing from HuggingFace.
      # list models available for nxp/ara
      fetch_models --list
      # install YOLOv8
      fetch_models --repo-id nxp/YOLOv8 # 746MB (711MiB)
      
      • the script is a python wrapper that uses uvx and the fetch-models python wheel (/usr/share/python-wheels/fetch_models-1.0.0-py3-none-any.whl) to fetch and install models from HuggingFace HUB
      • the models will be installed in either /usr/share/cnn (Convolutional Neural Network) or /usr/share/llm (Large Language Model)
      • NXP has Ara2 optimized models at https://huggingface.co/nxp
      • the script has a hard coded list of models available and where to install them locally. You can use 'python -m zipfile -e /usr/share/python-wheels/fetch_models-1.0.0-py3-none-any.whl ./fetch_models' to see what it's doing
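
To see what fetch_models has installed, a small sketch like the following can walk the model directories. It assumes models are stored as .dvm files (as in /usr/share/cnn/detection/yolov8n/model.dvm used later on this page); whether every model category uses that extension is not confirmed. The demo runs against a throwaway directory tree so that it works anywhere.

```python
import tempfile
from pathlib import Path

def find_models(*roots):
    """Return every *.dvm file found under the given directories, sorted."""
    found = []
    for root in roots:
        root = Path(root)
        if root.is_dir():  # silently skip directories that do not exist yet
            found.extend(root.rglob("*.dvm"))
    return sorted(found)

# Demo on a temporary tree that mimics the layout described above.
with tempfile.TemporaryDirectory() as tmp:
    demo = Path(tmp) / "cnn" / "detection" / "yolov8n"
    demo.mkdir(parents=True)
    (demo / "model.dvm").write_bytes(b"")
    for model in find_models(Path(tmp) / "cnn", Path(tmp) / "llm"):
        print(model.relative_to(tmp))
```

On a target you would call find_models("/usr/share/cnn", "/usr/share/llm").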

Notable Files:

  • /usr/lib/
    • libaraclient_aarch64.so - base library for interfacing with ara2
    • libara_vision_inference.so - inference lib that builds on libaraclient
  • /usr/lib/gstreamer-1.0
    • libgstdvPre.so
    • libgstdvInfo.so
    • libgstdvPost.so
  • /usr/share/rt-sdk-ara240 (version-independent symlink to the versioned dir at the same location)
    • hw_utils/boot_img - firmware files
    • hw_utils/ddr_config - ddr binaries
    • hw_utils/bins/ - the hw utils for bringup/programming
    • optimum-ara/ - extension of the Hugging Face library that integrates with Ara240 DNPU
    • scripts - various wrappers around the tools etc
    • nnapp - tool for benchmarking models
    • config - various example yaml config files used for proxy/nnapp
    • include/dvapi.py - python bindings to dvapi
    • driver/uiodma.ko - driver (where the setup script expects to find it)
  • /usr/share/python-wheels - python wheels for fetch_models and optimum_ara
  • /usr/share/doc/rt-sdk-ara2 - license info
  • /usr/include/sdk_ara - headers for C libs
  • /usr/bin - various scripts
  • /etc/udev/rules.d/99-ara2.rules - udev rule which makes the PCI ID dependent on the systemd service
  • /etc/systemd/system/rt-sdk-ara2.service - systemd service that handles the various hw util config
  • /etc/rt-sdk-ara240/cnn_config.yaml - config for nnapp
  • /etc/rt-sdk-ara240/proxy_config.yaml - config for proxy

Notes:

  • This will not program flash - that is a manual step only required if there is an update
  • The 'uv' package manager is a fast, all-in-one Python package and project manager written in Rust. It makes it easy to work with virtual environments, which is essential for avoiding Python package version clashes
  • on bootup make sure you wait for the console messages indicating the Proxy is launched before using it as it can take a couple of minutes
  • the binary tools and libs are all currently dynamically linked against the standard C library (libc.so.6)
  • the GStreamer libs require GStreamer 1.26 or newer
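
Since the Proxy can take a couple of minutes to launch after boot, a script that depends on it may want to wait rather than fail. The sketch below polls for a filesystem path with a timeout. Treating the presence of /var/run/proxy.sock (the socket passed to dvInf in the pipelines on this page) as a readiness signal is an assumption, not something the NXP documentation is known to guarantee.

```python
import os
import time

def wait_for_path(path, timeout_s=180.0, poll_s=1.0):
    """Poll until `path` exists; return True if it appeared before the timeout."""
    deadline = time.monotonic() + timeout_s
    while True:
        if os.path.exists(path):
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_s)

# timeout_s=0 performs a single check, so this demo returns immediately.
if wait_for_path("/var/run/proxy.sock", timeout_s=0):
    print("proxy socket present")
else:
    print("proxy socket not found yet")
```

In a startup script you would use a larger timeout, for example wait_for_path("/var/run/proxy.sock", timeout_s=300).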

Verification steps:

  1. show chip_info
    chip_info.sh
    
  2. verify service
    # show service status
    systemctl status rt-sdk-ara2.service --no-pager -l 
    # view detailed service logs
    journalctl -u rt-sdk-ara2.service
    # verify proxy is running (critical)
    ps -eaf | grep proxy_ara240
    

Examples:

  • Download pre-compiled models for testing:
    • The fetch_models script from the ara2-rt will fetch models from HuggingFace.
      # list models available for nxp/ara
      fetch_models --list
      # install YOLOv8
      fetch_models --repo-id nxp/YOLOv8 # 746MB (711MiB)
      
    • the 'fetch_models' script is a python wrapper that uses uvx and the fetch-models python wheel (/usr/share/python-wheels/fetch_models-1.0.0-py3-none-any.whl) to fetch and install models from HuggingFace HUB
    • the models will be installed in /usr/share/cnn (Convolutional Neural Network) and /usr/share/llm (Large Language Model)
    • NXP has Ara2 optimized models at https://huggingface.co/nxp
  • Run performance benchmark (uses nnapp)
    run_model_perf.sh
    
    • the 'run_model_perf.sh' script is a wrapper around nnapp, which has many options and a config file; the wrapper makes it easy to list model categories and models
  • monitor real-time NPU metrics including utilization, temperature, DRAM usage and device state (interactively during benchmarking or model execution)
    ara2_metrics.sh
    

GStreamer plugins

The rt-sdk-ara2 provides a set of gstreamer plugins for inference:

  • dvPre
  • dvInf
  • dvPost

Without more documentation or source for these it's likely best to think of them as: dvPre prepares buffers, dvInf hands them off to the NPU and dvPost processes the response.

The dvPre element must be given 32-bit pixel samples (i.e. format=BGRA, using 4 bytes per pixel: blue, green, red, alpha), not 24-bit format=RGB (3 bytes per pixel: red, green, blue). The alpha byte is not used for transparency; it is empty padding that acts as a structural spacer.
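
To make the difference between the two layouts concrete, here is a small pure-Python sketch that repacks RGB bytes as BGRA. In a real pipeline videoconvert does this for you; the function name rgb_to_bgra and the choice of 0xFF for the unused fourth byte are illustrative assumptions.

```python
def rgb_to_bgra(rgb: bytes, pad: int = 0xFF) -> bytes:
    """Repack tightly packed 24-bit RGB pixels as 32-bit BGRA with a padding byte."""
    if len(rgb) % 3:
        raise ValueError("RGB buffer length must be a multiple of 3")
    pixels = len(rgb) // 3
    out = bytearray(pixels * 4)
    out[0::4] = rgb[2::3]              # blue
    out[1::4] = rgb[1::3]              # green
    out[2::4] = rgb[0::3]              # red
    out[3::4] = bytes([pad]) * pixels  # fourth byte: padding, not transparency
    return bytes(out)

# One red pixel followed by one green pixel.
print(rgb_to_bgra(bytes([255, 0, 0, 0, 255, 0])).hex(" "))
# prints 00 00 ff ff 00 ff 00 ff

# A 640x640 frame grows from 3 to 4 bytes per pixel.
print(640 * 640 * 3, 640 * 640 * 4)
# prints 1228800 1638400
```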

All three elements require the model to be specified via the 'model' property. If using yolov8x, for example, you would specify the path to the yolov8x.dvm file.

For detection models, each buffer produced by dvPost begins with a 32-bit detection count followed by a series of detection structures, each containing the bounding box, confidence level, and COCO class ID of a detected object.
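
A minimal sketch of decoding such a buffer with Python's struct module follows. The record layout (five little-endian floats, a 32-bit class ID and a 64-bit pointer that is ignored, packed without padding) mirrors the ctypes structure used by the scripts later on this page; it was inferred by observation rather than taken from vendor documentation, so treat it as an assumption. The names Detection and parse_detections are illustrative.

```python
import struct
from typing import NamedTuple

class Detection(NamedTuple):
    xmin: float
    ymin: float
    xmax: float
    ymax: float
    confidence: float
    class_id: int

COUNT = struct.Struct("<I")      # 32-bit detection count at the start of the buffer
RECORD = struct.Struct("<5fiQ")  # assumed 32-byte record; trailing pointer is ignored

def parse_detections(buf: bytes) -> list[Detection]:
    """Decode a count-prefixed detection buffer into Detection tuples."""
    if len(buf) < COUNT.size:
        return []
    (count,) = COUNT.unpack_from(buf, 0)
    detections = []
    offset = COUNT.size
    for _ in range(count):
        if offset + RECORD.size > len(buf):
            break  # truncated buffer: keep what was parsed so far
        *fields, _pointer = RECORD.unpack_from(buf, offset)
        detections.append(Detection(*fields))
        offset += RECORD.size
    return detections

# Synthetic buffer holding one detection (class 16 is "dog" in COCO).
demo = COUNT.pack(1) + RECORD.pack(10.0, 20.0, 110.0, 220.0, 0.5, 16, 0)
print(parse_detections(demo))
```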

The bounding box units are relative to the model's input size and must be scaled back to your original image size. For example, the YOLO models operate on 640x640 pixel data. You can pass in something larger and it will essentially tile, but it's unclear whether there is any advantage in doing that.
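
The scaling itself is a per-axis multiplication. The sketch below assumes the source image was stretched to the model size (not letterboxed), which is the approach the Python scripts on this page take; scale_box is a hypothetical helper name.

```python
def scale_box(box, model_size=(640, 640), image_size=(640, 640)):
    """Map (xmin, ymin, xmax, ymax) from model coordinates to image pixels."""
    model_w, model_h = model_size
    image_w, image_h = image_size
    sx, sy = image_w / model_w, image_h / model_h
    xmin, ymin, xmax, ymax = box
    return (round(xmin * sx), round(ymin * sy), round(xmax * sx), round(ymax * sy))

# A box reported on the 640x640 model grid, mapped onto a 1920x1080 frame.
print(scale_box((320, 320, 480, 640), image_size=(1920, 1080)))
# prints (960, 540, 1440, 1080)
```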

The GStreamer plugins are currently provided as binary-only shared objects. They are linked against the standard C library (libc.so.6) and libgstreamer-1.0.so.0 and are compatible with GStreamer 1.26 or newer.

If you are using a rootfs that does not have GStreamer 1.26 you will need to build it or provide it via a container. For example Ubuntu 24.x Noble has GStreamer 1.24, Ubuntu 25.x has GStreamer 1.26 and Ubuntu 26.04 Resolute has GStreamer 1.28. So if you were running Ubuntu Noble you could use distrobox/docker to install GStreamer 1.26 and its dependencies using Ubuntu 25.x.
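
A quick way to check whether a given rootfs meets the 1.26 requirement is to parse the version banner printed by gst-inspect-1.0 --version. The sketch below works on a string so it can run anywhere; the sample text is illustrative, and the exact banner format is assumed to contain a line of the form "GStreamer X.Y.Z".

```python
import re

MIN_VERSION = (1, 26)  # minimum GStreamer version stated above

def parse_gst_version(text):
    """Extract (major, minor, micro) from text containing 'GStreamer X.Y.Z'."""
    match = re.search(r"GStreamer (\d+)\.(\d+)\.(\d+)", text)
    if match is None:
        return None
    return tuple(int(part) for part in match.groups())

def plugins_supported(text, minimum=MIN_VERSION):
    """Return True if the reported version is at least `minimum` (major, minor)."""
    version = parse_gst_version(text)
    return version is not None and version[:2] >= minimum

# Illustrative banner, similar to what an Ubuntu 24.04 system might print.
SAMPLE = "gst-inspect-1.0 version 1.24.2\nGStreamer 1.24.2\n"
print(parse_gst_version(SAMPLE), plugins_supported(SAMPLE))
# prints (1, 24, 2) False
```

On a target the text could come from subprocess.run(["gst-inspect-1.0", "--version"], capture_output=True, text=True).stdout.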

Examples:

  • Ubuntu noble (24.04):
    • Ubuntu noble has GStreamer 1.24 which is not compatible with the 1.26 plugins
    • one solution could be a GStreamer 1.26 PPA backport but we have not found any
    • another solution is an Ubuntu 25.04 container on an Ubuntu 24.04 rootfs:
      apt update && apt install -y distrobox docker.io
      # Create a 25.04 container that can see your hardware
      distrobox create --image ubuntu:25.04 --name gst126  --volume /usr/lib/gstreamer-1.0:/opt/ara2/plugins:ro \
        --volume /usr/lib:/opt/ara2/libs:ro \
        --volume /usr/share/cnn:/usr/share/cnn \
        --volume /usr/share/llm:/usr/share/llm \
        --volume /dev/bus/usb:/dev/bus/usb
      # enter the container to use it
      distrobox enter gst126
      # export vars via ~/.bashrc (exit and enter the distrobox to take effect)
      echo "export GST_PLUGIN_PATH=/opt/ara2/plugins" >> ~/.bashrc
      echo "export LD_LIBRARY_PATH=/opt/ara2/libs:\$LD_LIBRARY_PATH" >> ~/.bashrc
      
      • whenever using the ARA plugins you will need to make sure you do so in the gst126 environment
      • the volume param creates bind mounts between the host and the virtual target
      • you can also always access the host rootfs via /run/host
      • also make sure you install gstreamer and anything that uses it within that virtual environment
      • this uses containerization, not emulation - there is no performance hit or added latency, it's just a different set of executables
      • disk space for the ubuntu 25.04 base above is about 1.54GB
  • Ubuntu 26.04 resolute
    • Ubuntu resolute (26.04) has GStreamer 1.28, which is backward compatible with plugins built against 1.26
    • GStreamer 1.28's decodebin picks the hardware-accelerated v4l2jpegdec (on Venice) instead of the standard software decoder jpegdec, and v4l2jpegdec does not support YUV3 (typical for standard JPEG images), so you will need to disable it or prefer jpegdec over it. For example you can use GST_PLUGIN_FEATURE_RANK="v4l2jpegdec:NONE" or set the rank at runtime as is done in the detection examples below
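
When launching a pipeline from Python via a subprocess, the rank override can be merged into the child's environment without clobbering any overrides that are already set. GST_PLUGIN_FEATURE_RANK takes a comma-separated list of feature:rank pairs; the helper name with_rank_override is a made-up example.

```python
def with_rank_override(env, feature, rank="NONE"):
    """Return a copy of env with feature:rank merged into GST_PLUGIN_FEATURE_RANK."""
    merged = dict(env)
    entries = {}
    # Keep any overrides that are already present in the variable.
    for item in merged.get("GST_PLUGIN_FEATURE_RANK", "").split(","):
        name, sep, value = item.strip().partition(":")
        if sep and name:
            entries[name] = value
    entries[feature] = rank  # add or replace the requested override
    merged["GST_PLUGIN_FEATURE_RANK"] = ",".join(
        f"{name}:{value}" for name, value in entries.items()
    )
    return merged

env = with_rank_override({"GST_PLUGIN_FEATURE_RANK": "jpegdec:PRIMARY"}, "v4l2jpegdec")
print(env["GST_PLUGIN_FEATURE_RANK"])
# prints jpegdec:PRIMARY,v4l2jpegdec:NONE
```

The result can be passed as the env argument of subprocess.run, starting from os.environ rather than the small dictionary used here.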

Install GStreamer:

apt-get update && apt install -y \
   gstreamer1.0-x \
   gstreamer1.0-tools \
   gstreamer1.0-plugins-base \
   gstreamer1.0-plugins-good \
   gstreamer1.0-plugins-bad \
   gstreamer1.0-plugins-ugly \
   gstreamer1.0-libav \
   v4l-utils
  • this adds about 500MiB of disk space

Specify Plugin path:

# export now to current shell
export GST_PLUGIN_PATH=/usr/lib/gstreamer-1.0/
# put in .bashrc so it happens for any new bash shell
echo "export GST_PLUGIN_PATH=/usr/lib/gstreamer-1.0/" >> ~/.bashrc
  • this tells GStreamer to look for plugins in the non-standard location of the ARA gstreamer plugins

At this point you should be able to inspect the dvPre, dvInf, and dvPost elements:

gst-inspect-1.0 dvPre
gst-inspect-1.0 dvInf
gst-inspect-1.0 dvPost

Detection Examples

Examples:

  • gst-launch pipeline prototyping:
    • enabling debug level 6 on dvPost will show the number of object detections in its debug output but if you want to do anything with that data you need to write an application that can decode frame buffers. Still this is useful for prototyping:
      • perform detection on a v4l2 video device like a webcam:
        DEV=/dev/video2
        MODEL=/usr/share/cnn/detection/yolov8n/model.dvm
        GST_DEBUG="dvPost:6" \
        gst-launch-1.0 -v \
          v4l2src device=$DEV ! \
          video/x-raw,width=640,height=480,framerate=30/1 ! \
          videoconvert ! video/x-raw,format=BGRA ! \
          dvPre model=$MODEL ! \
          dvInf model=$MODEL sock=/var/run/proxy.sock use-shm=false ! \
          dvPost model=$MODEL ! \
          fakesink sync=false | grep Detected
        
      • perform a detection on an image:
        URI=file://$PWD/traffic.png
        MODEL=/usr/share/cnn/detection/yolov8n/model.dvm
        GST_DEBUG="dvPost:6" \
        GST_PLUGIN_FEATURE_RANK="v4l2jpegdec:NONE" \
        gst-launch-1.0 -v \
          urisourcebin uri=$URI ! decodebin ! \
          videoconvert ! video/x-raw,format=BGRA ! \
          dvPre model=$MODEL ! \
          dvInf model=$MODEL sock=/var/run/proxy.sock use-shm=false ! \
          dvPost model=$MODEL ! \
          fakesink sync=false | grep Detected
        
        • the GST_PLUGIN_FEATURE_RANK setting disables the v4l2jpegdec hardware decoder on GStreamer 1.28, as it does not support a format compatible with dvPre (yet jpegdec does)
  • Image detection with boxing via Python
    • Python is incredibly useful for accessing GStreamer and handling the ARA detection frame data and imagemagick provides excellent tools for converting and drawing on images:
    • (optional) install lighttpd so that we can easily see our resulting images via a browser
      apt-get install -y lighttpd
      # add configuration for directory listing and mapping of /root to /
      cat << EOF >> /etc/lighttpd/lighttpd.conf
      dir-listing.encoding    = "utf-8"
      server.dir-listing      = "enable"
      
      # directory access
      alias.url += (
              "/root" => "/root",
      )
      EOF
      # make the dir executable
      chmod ugo+x .
      # restart the web server
      /etc/init.d/lighttpd restart
      
    • install imagemagick which we will use to draw named boxes for detections
      apt-get install -y imagemagick
      
    • create a dir for us to work in and create the script
      mkdir image-detect; cd image-detect
      # create python script
      cat <<\EOF > image_detect.py
      #!/usr/bin/env python3
      """
      Ara NPU Multi-Format Universal Image Decoder
      ============================================
      """
      
      import ctypes
      import os
      import sys
      import subprocess
      import gi
      
      gi.require_version('Gst', '1.0')
      from gi.repository import Gst
      
      Gst.init(None)
      
      # Standard COCO Class Mapping for printing human-readable labels
      COCO_CLASSES = {
          0: "person", 1: "bicycle", 2: "car", 3: "motorcycle", 4: "airplane", 5: "bus",
          6: "train", 7: "truck", 8: "boat", 9: "traffic light", 10: "fire hydrant",
          11: "stop sign", 12: "parking meter", 13: "bench", 14: "bird", 15: "cat",
          16: "dog", 17: "horse", 18: "sheep", 19: "cow", 20: "elephant", 21: "bear",
          22: "zebra", 23: "giraffe", 24: "backpack", 25: "umbrella", 26: "handbag",
          27: "tie", 28: "suitcase", 29: "frisbee", 30: "skis", 31: "snowboard",
          32: "sports ball", 33: "kite", 34: "baseball bat", 35: "baseball glove",
          36: "skateboard", 37: "surfboard", 38: "tennis racket", 39: "bottle",
          40: "wine glass", 41: "cup", 42: "fork", 43: "knife", 44: "spoon", 45: "bowl",
          46: "banana", 47: "apple", 48: "sandwich", 49: "orange", 50: "broccoli",
          51: "carrot", 52: "hot dog", 53: "pizza", 54: "donut", 55: "cake",
          56: "chair", 57: "couch", 58: "potted plant", 59: "bed", 60: "dining table",
          61: "toilet", 62: "tv", 63: "laptop", 64: "mouse", 65: "remote", 66: "keyboard",
          67: "cell phone", 68: "microwave", 69: "oven", 70: "toaster", 71: "sink",
          72: "refrigerator", 73: "book", 74: "clock", 75: "vase", 76: "scissors",
          77: "teddy bear", 78: "hair drier", 79: "toothbrush"
      }
      
      class AraDetection(ctypes.Structure):
          _layout_ = "ms"
          _pack_ = 1
          _fields_ = [
              ("xmin", ctypes.c_float), ("ymin", ctypes.c_float),
              ("xmax", ctypes.c_float), ("ymax", ctypes.c_float),
              ("confidence", ctypes.c_float), ("class_id", ctypes.c_int32),
              ("class_name_ptr", ctypes.c_void_p)
          ]
      
      def main():
          if len(sys.argv) < 3:
              print(f"Usage: {sys.argv[0]} <input_image> <output_image> [model]")
              sys.exit(1)
      
          input_image = sys.argv[1]
          output_image = sys.argv[2]
          model = "/usr/share/cnn/detection/yolov8n/model.dvm"
          if len(sys.argv) > 3:
              model = sys.argv[3]
      
          if not os.path.exists(input_image):
              print(f"ERROR: File '{input_image}' could not be located.")
              sys.exit(1)
      
          # Fetch native dimensions using ImageMagick
          try:
              dimensions = subprocess.check_output(["identify", "-format", "%w %h", input_image]).decode().split()
              w_native, h_native = int(dimensions[0]), int(dimensions[1])
          except Exception as e:
              print(f"ERROR: Failed to read image properties using ImageMagick: {e}")
              sys.exit(1)
         
          # Print target properties cleanly
          print(f"\nmodel: {model}")
          print(f"image: {os.path.basename(input_image)} {w_native}x{h_native}")
      
          MODEL_W, MODEL_H = 640, 640
      
          pipe_str = (
              f"multifilesrc location={input_image} loop=false num-buffers=2 ! decodebin name=d ! "
              f"videoconvert ! videoscale ! video/x-raw,width={MODEL_W},height={MODEL_H} ! "
              f"videoconvert ! video/x-raw,format=BGRA ! "
              f"dvPre model={model} ! "
              f"dvInf model={model} sock=/var/run/proxy.sock use-shm=true shm-path=/dev/shm/ara_inf_ ! "
              f"dvPost model={model} orig-width={MODEL_W} orig-height={MODEL_H} ! "
              f"appsink name=mysink sync=false async=false emit-signals=true"
          )
      
          # Before creating the launcher, adjust the system plugin registry ranking 
          # so GStreamer ignores v4l2jpegdec element (as it doesn't support BGRA output)
          registry = Gst.Registry.get()
          feature = registry.lookup_feature("v4l2jpegdec")
          if feature:
              # Lower its rank to ZERO so decodebin skips over it permanently
              feature.set_rank(0)
      
          pipeline = Gst.parse_launch(pipe_str)
          sink = pipeline.get_by_name("mysink")
          pipeline.set_state(Gst.State.PLAYING)
      
          last_valid_raw_bytes = None
      
          while True:
              sample = sink.emit("pull-sample")
              if not sample:
                  break
              buffer = sample.get_buffer()
              last_valid_raw_bytes = buffer.extract_dup(0, buffer.get_size())
      
          pipeline.set_state(Gst.State.NULL)
          
          processed_detections = []
      
          if last_valid_raw_bytes and len(last_valid_raw_bytes) >= 4:
              num_detections = int.from_bytes(last_valid_raw_bytes[:4], byteorder='little')
              
              if 0 < num_detections < 1000:
                  print(f"DETECTIONS LOGGED: FOUND {num_detections} ACTIVE OBJECTS")
                  print("-" * 70)
                  
                  offset = 4
                  ds = ctypes.sizeof(AraDetection)
                  
                  for i in range(num_detections):
                      if offset + ds > len(last_valid_raw_bytes): break
                      det = AraDetection.from_buffer_copy(last_valid_raw_bytes[offset:offset+ds])
                      offset += ds
                      
                      # Compute native image coordinate translation mapping
                      x1_mapped = det.xmin * (w_native / MODEL_W)
                      x2_mapped = det.xmax * (w_native / MODEL_W)
                      y1_mapped = det.ymin * (h_native / MODEL_H)
                      y2_mapped = det.ymax * (h_native / MODEL_H)
                      
                      coco_name = COCO_CLASSES.get(det.class_id, "unknown")
                      
                      print(f"Object {i+1}: ID={det.class_id} | Name={coco_name} | Confidence={det.confidence * 100:.1f}%")
                      print(f"          Bounding Box -> [{int(x1_mapped)}, {int(y1_mapped)}] to [{int(x2_mapped)}, {int(y2_mapped)}]")
                      print("-" * 70)
                      
                      processed_detections.append((coco_name, det.confidence, x1_mapped, y1_mapped, x2_mapped, y2_mapped))
      
          # Render final multi-object annotated canvas
          if processed_detections:
              cmd_args = [f"convert {input_image}"]
              for coco_name, conf, x1, y1, x2, y2 in processed_detections:
                  ix1, iy1, ix2, iy2 = int(x1), int(y1), int(x2), int(y2)
                  label = f"{coco_name} {conf*100:.1f}%"
                  cmd_args.append(f'-stroke green -strokewidth 2 -fill none -draw "rectangle {ix1},{iy1} {ix2},{iy2}"')
                  cmd_args.append(f'-stroke none -fill white -pointsize 16 -annotate +{ix1}+{iy1 - 6} "{label}"')
                  
              cmd_args.append(output_image)
              draw_cmd = " ".join(cmd_args)
              
              try:
                  subprocess.run(draw_cmd, shell=True, check=True)
                  print(f"SUCCESS: Mapped all boxes and text labels onto -> '{output_image}'\n")
              except subprocess.CalledProcessError:
                  print("ERROR: ImageMagick rendering execution failed.\n")
          else:
              print("INFO: No operational object targets were captured by the NPU context.\n")
      
      if __name__ == '__main__':
          main()
      EOF
      
    • The script uses PyGObject, a Python package that provides bindings for libraries based on GObject Introspection such as GTK, WebKit, and GStreamer. It allows you to use C-based frameworks in Python. We need to install the C libs for GStreamer for this:
      apt-get install -y \
        libcairo2-dev \
        libgirepository-2.0-dev \
        python3-dev \
        python3-gst-1.0 \
        cmake pkg-config
      # we are also going to need to install gstreamer and its dev packages
      apt-get install -y \
        libgstreamer1.0-dev \
        libgstreamer-plugins-base1.0-dev \
        libgstreamer-plugins-bad1.0-dev \
        gstreamer1.0-plugins-base \
        gstreamer1.0-plugins-good \
        gstreamer1.0-plugins-bad \
        gstreamer1.0-plugins-ugly \
        gstreamer1.0-libav \
        gstreamer1.0-tools
      
    • create a python virtual env (always a good idea to keep python dependencies containerized) and install python libs we need:
      # create a venv (.venv)
      uv venv
      # install our script's dependencies
      uv pip install pygobject
      
    • (optional) fetch some images for detection
      # fetch a coco validation image; it contains a dog on a bench and the dog is at 208,147 to 293,289
      wget http://images.cocodataset.org/val2017/000000546829.jpg -O dog.jpg
      # use ffmpeg to grab a frame from within an MP4
      apt install -y ffmpeg
      ffmpeg -i /usr/share/ara2-vision-examples/sample_videos/video_0.mp4 -f null - # shows how long it is (time=00:00:15.50)
      ffmpeg -i /usr/share/ara2-vision-examples/sample_videos/video_0.mp4 -ss 00:00:5 -frames:v 1 traffic.png
      
    • run the script (image_detect.py <source-image> <destination-image> [model-path])
      uv run image_detect.py dog.jpg coco_detections.jpg
      
      • Note that without shm the pipeline needs to copy the raw image bytes over a local network-style socket connection. With use-shm=true and shm-path pointing into /dev/shm you can eliminate that transfer (zero-copy): dvPre writes the processed data directly into a designated block of system RAM and dvInf uses a pointer to it
      • you might expect that if your original image was 1080x1920 and you resized it to the model size of 640x640, telling dvPost orig-width=1080 orig-height=1920 would scale the bounding boxes properly. In practice it does not seem to, unless your image has the same aspect ratio as the model. Mapping it as above (telling dvPost that the image is 640x640 and scaling the boxes ourselves) resolves this
      • images:

[Image: COCO validation image with a dog on a bench]
[Image: COCO dog image with detections]

[Image: Traffic image]
[Image: Traffic image with detections via yolo8n]
[Image: Traffic image with detections via yolo8x]

  • Video detection with boxing via Python in a headless webapp
    • Python is incredibly useful for accessing GStreamer and handling the ARA detection frame data and building webapps
    • The script uses PyGObject, a Python package that provides bindings for libraries based on GObject Introspection such as GTK, WebKit, and GStreamer. It allows you to use C-based frameworks in Python. We need to install the C libs for GStreamer for this:
      apt-get install -y \
        libcairo2-dev \
        libgirepository-2.0-dev \
        python3-dev \
        python3-gst-1.0 \
        cmake pkg-config
      # we are also going to need to install gstreamer and its dev packages
      apt-get install -y \
        libgstreamer1.0-dev \
        libgstreamer-plugins-base1.0-dev \
        libgstreamer-plugins-bad1.0-dev \
        gstreamer1.0-plugins-base \
        gstreamer1.0-plugins-good \
        gstreamer1.0-plugins-bad \
        gstreamer1.0-plugins-ugly \
        gstreamer1.0-libav \
        gstreamer1.0-tools
      
    • create a python virtual env (always a good idea to keep python dependencies containerized) and install python libs we need:
      # create a venv (.venv)
      uv venv
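      # install our script's dependencies (flask is imported by the script below)
      uv pip install pygobject opencv-python-headless flask
      # create python script (quoted heredoc delimiter so the shell does not expand anything inside it)
      cat <<\EOF > vision-webapp.py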
      #!/usr/bin/env python3
      """
      Ara NPU Basic Video Stream & Inference Hub
      ==========================================
      """
      
      import argparse
      import ctypes
      import glob
      import os
      import sys
      import threading
      import time
      import logging
      import cv2
      import numpy as np
      from flask import Flask, Response, jsonify, request, render_template_string
      import gi
      
      gi.require_version('Gst', '1.0')
      from gi.repository import Gst
      Gst.init(None)
      
      # Quiet down Werkzeug HTTP traffic logging to suppress 1Hz AJAX console pollution
      log = logging.getLogger('werkzeug')
      log.setLevel(logging.ERROR)
      
      app = Flask(__name__)
      lock = threading.Lock()
      
      class AraDetection(ctypes.Structure):
          _pack_ = 1
          _fields_ = [
              ("xmin", ctypes.c_float), ("ymin", ctypes.c_float),
              ("xmax", ctypes.c_float), ("ymax", ctypes.c_float),
              ("confidence", ctypes.c_float), ("class_id", ctypes.c_int32),
              ("class_name_ptr", ctypes.c_void_p)
          ]
      
      # --- STATE STORAGE ---
      STATE_REPO = {
          "frame": None,
          "detections": [],
          "active_source": None,
          "active_model_name": "yolov8n",
          "active_model_path": "/usr/share/cnn/detection/yolov8n/model.dvm",
          "restart_flag": False,
          "source_registry": [],
          "model_registry": ["yolov8n"],
          
          # Target Pipeline Resolutions
          "CANVAS_W": 640,
          "CANVAS_H": 360,
          "MODEL_W": 640,
          "MODEL_H": 640,
          
          # Live Telemetry Metrics
          "native_w": 0,
          "native_h": 0,
          "stream_w": 0,
          "stream_h": 0,
          "inference_fps": 0.0
      }
      
      # FPS Calculation variables bound directly to the Inference thread
      inference_timestamps = []
      
      COCO_LABELS = {
          0: 'person', 1: 'bicycle', 2: 'car', 3: 'motorcycle', 4: 'airplane', 5: 'bus',
          6: 'train', 7: 'truck', 8: 'boat', 9: 'traffic light', 10: 'fire hydrant',
          11: 'stop sign', 12: 'parking meter', 13: 'bench', 14: 'bird', 15: 'cat',
          16: 'dog', 17: 'horse', 18: 'sheep', 19: 'cow', 20: 'elephant', 21: 'bear',
          22: 'zebra', 23: 'giraffe', 24: 'backpack', 25: 'umbrella', 26: 'handbag',
          27: 'tie', 28: 'suitcase', 29: 'frisbee', 30: 'skis', 31: 'snowboard',
          32: 'sports ball', 33: 'kite', 34: 'baseball bat', 35: 'baseball glove',
          36: 'skateboard', 37: 'surfboard', 38: 'tennis racket', 39: 'bottle',
          40: 'wine glass', 41: 'cup', 42: 'fork', 43: 'knife', 44: 'spoon', 45: 'bowl',
          46: 'banana', 47: 'apple', 48: 'sandwich', 49: 'orange', 50: 'broccoli',
          51: 'carrot', 52: 'hot dog', 53: 'pizza', 54: 'donut', 55: 'cake',
          56: 'chair', 57: 'couch', 58: 'potted plant', 59: 'bed', 60: 'dining table',
          61: 'toilet', 62: 'tv', 63: 'laptop', 64: 'mouse', 65: 'remote', 66: 'keyboard',
          67: 'cell phone', 68: 'microwave', 69: 'oven', 70: 'toaster', 71: 'sink',
          72: 'refrigerator', 73: 'book', 74: 'clock', 75: 'vase', 76: 'scissors',
          77: 'teddy bear', 78: 'hair drier', 79: 'toothbrush'
      }
      
      def build_source_injection_string(source_path):
          if source_path.endswith(".mp4"):
              return f"filesrc location={source_path} ! decodebin ! videoconvert ! tee name=t "
          else:
              return f"v4l2src device={source_path} ! videoconvert ! tee name=t "
      
      def gstreamer_orchestration_loop():
          global inference_timestamps
          CANVAS_W = STATE_REPO["CANVAS_W"]
          CANVAS_H = STATE_REPO["CANVAS_H"]
          MODEL_W = STATE_REPO["MODEL_W"]
          MODEL_H = STATE_REPO["MODEL_H"]
      
          while True:
              while STATE_REPO["active_source"] is None:
                  time.sleep(0.2)
                  if STATE_REPO["restart_flag"]:
                      break
      
              current_target_source = STATE_REPO["active_source"]
              current_target_model = STATE_REPO["active_model_path"]
              STATE_REPO["restart_flag"] = False
              
              if current_target_source is None:
                  continue
                  
              source_segment = build_source_injection_string(current_target_source)
              
              pipe_str = (
                  f"{source_segment} "
                  f"t. ! queue max-size-buffers=2 leaky=downstream ! appsink name=nativesink sync=false async=false emit-signals=true "
                  f"t. ! queue max-size-buffers=2 leaky=downstream ! videoscale ! video/x-raw,width={CANVAS_W},height={CANVAS_H} ! videoconvert ! video/x-raw,format=BGR ! appsink name=framesink sync=false async=false emit-signals=true "
                  f"t. ! queue max-size-buffers=2 leaky=downstream ! "
                  f"videoscale ! video/x-raw,width={MODEL_W},height={MODEL_H} ! videoconvert ! video/x-raw,format=BGRA ! "
                  f"dvPre model={current_target_model} ! "
                  f"dvInf model={current_target_model} sock=/var/run/proxy.sock use-shm=true shm-path=/dev/shm/ara_inf_ ! "
                  f"dvPost model={current_target_model} orig-width={MODEL_W} orig-height={MODEL_H} ! "
                  f"appsink name=postsink sync=false async=false emit-signals=true"
              )
      
              print(f"[LAUNCH PIPELINE]\n   {pipe_str}\n")
              pipeline = Gst.parse_launch(pipe_str)
              
              native_sink = pipeline.get_by_name("nativesink")
              frame_sink = pipeline.get_by_name("framesink")
              post_sink = pipeline.get_by_name("postsink")
      
              def on_native_caps(sink):
                  sample = sink.emit("pull-sample")
                  if sample:
                      caps = sample.get_caps()
                      struct = caps.get_structure(0)
                      STATE_REPO["native_w"] = struct.get_value("width")
                      STATE_REPO["native_h"] = struct.get_value("height")
                  return Gst.FlowReturn.OK
      
              def on_new_detection(sink):
                  global inference_timestamps
                  sample = sink.emit("pull-sample")
                  if sample:
                      # Calculate FPS derived purely from the inference hardware return loop
                      now = time.time()
                      inference_timestamps.append(now)
                      if len(inference_timestamps) > 30:
                          inference_timestamps.pop(0)
                      if len(inference_timestamps) > 1:
                          span = inference_timestamps[-1] - inference_timestamps[0]
                          if span > 0:
                              # N timestamps bound N-1 frame intervals
                              STATE_REPO["inference_fps"] = (len(inference_timestamps) - 1) / span
      
                      buffer = sample.get_buffer()
                      raw_bytes = buffer.extract_dup(0, buffer.get_size())
                      if raw_bytes and len(raw_bytes) >= 4:
                          num_detections = np.frombuffer(raw_bytes[:4], dtype=np.uint32)[0]
                          local_dets = []
                          offset = 4
                          ds = ctypes.sizeof(AraDetection)
                          for _ in range(num_detections):
                              if offset + ds > len(raw_bytes): break
                              det = AraDetection.from_buffer_copy(raw_bytes[offset:offset+ds])
                              offset += ds
                              local_dets.append((det.class_id, det.confidence, det.xmin, det.ymin, det.xmax, det.ymax))
                          STATE_REPO["detections"] = local_dets
                  return Gst.FlowReturn.OK
      
              def on_new_frame(sink):
                  sample = sink.emit("pull-sample")
                  if sample:
                      buffer = sample.get_buffer()
                      caps = sample.get_caps()
                      struct = caps.get_structure(0)
                      w = struct.get_value("width")
                      h = struct.get_value("height")
                      
                      STATE_REPO["stream_w"] = w
                      STATE_REPO["stream_h"] = h
                      
                      raw_bytes = buffer.extract_dup(0, buffer.get_size())
                      if raw_bytes:
                          try:
                              frame_flat = np.frombuffer(raw_bytes, dtype=np.uint8)
                              frame_arr = frame_flat.reshape((h, w, 3))
                              STATE_REPO["frame"] = frame_arr.copy()
                          except ValueError:
                              pass
                  return Gst.FlowReturn.OK
      
              native_sink.connect("new-sample", on_native_caps)
              post_sink.connect("new-sample", on_new_detection)
              frame_sink.connect("new-sample", on_new_frame)
              pipeline.set_state(Gst.State.PLAYING)
      
              bus = pipeline.get_bus()
              while True:
                  msg = bus.timed_pop_filtered(Gst.SECOND * 0.05, Gst.MessageType.ERROR | Gst.MessageType.EOS)
                  if msg:
                      if msg.type == Gst.MessageType.EOS and current_target_source.endswith(".mp4"):
                          pipeline.seek_simple(Gst.Format.TIME, Gst.SeekFlags.FLUSH | Gst.SeekFlags.KEY_UNIT, 0)
                          continue
                      break
                  
                  if STATE_REPO["restart_flag"]:
                      break
              
              pipeline.set_state(Gst.State.NULL)
              STATE_REPO["frame"] = None
              STATE_REPO["detections"] = []
              STATE_REPO["native_w"] = 0
              STATE_REPO["native_h"] = 0
              STATE_REPO["stream_w"] = 0
              STATE_REPO["stream_h"] = 0
              STATE_REPO["inference_fps"] = 0.0
              inference_timestamps = []
              time.sleep(1.0)
      
      @app.route('/')
      def index():
          src_active = STATE_REPO["active_source"]
          
          if not STATE_REPO["source_registry"]:
              src_html = '<option value="" disabled selected>-- NO VALID INPUT SOURCES AVAILABLE --</option>'
          else:
              src_html = '<option value="" disabled selected>-- SELECT TARGET SOURCE CHANNEL --</option>' if src_active is None else ""
              src_html += "".join(f'<option value="{s}" {"selected" if s == src_active else ""}>{s}</option>' for s in STATE_REPO["source_registry"])
          
          mdl_active = STATE_REPO["active_model_name"]
          mdl_html = "".join(f'<option value="{m}" {"selected" if m == mdl_active else ""}>{m}</option>' for m in STATE_REPO["model_registry"])
      
          html_template = """<!DOCTYPE html>
          <html>
          <head>
              <title>Ara Stream Client</title>
              <style>
                  body { font-family: sans-serif; background: #0c0c0e; color: #e1e1e6; margin: 0; padding: 20px; display: flex; flex-direction: column; align-items: center; }
                  .dashboard-layout { display: flex; flex-direction: column; gap: 15px; width: 660px; }
                  .panel { background: #121216; padding: 12px 15px; border-radius: 6px; border: 1px solid #1f1f24; display: flex; flex-direction: column; gap: 10px; }
                  .control-row { display: flex; align-items: center; justify-content: space-between; }
                  label { font-size: 12px; font-weight: bold; color: #8f8f9d; text-transform: uppercase; }
                  select { background: #0c0c0e; color: #fff; border: 1px solid #04d361; padding: 6px 10px; border-radius: 4px; width: 420px; outline: none; }
                  .stats-banner { display: flex; justify-content: space-between; background: #17171f; padding: 10px 15px; border: 1px solid #1f1f24; border-radius: 4px; font-family: monospace; font-size: 13px; color: #8f8f9d; }
                  .stats-banner span strong { color: #04d361; }
                  .media-container { background: #121216; padding: 8px; border-radius: 6px; border: 1px solid #1f1f24; position: relative; min-height: 480px; display: flex; align-items: center; justify-content: center; }
                  img { display: block; border-radius: 4px; width: 100%; height: auto; }
                  .overlay { position: absolute; top: 0; left: 0; width: 100%; height: 100%; background: rgba(12,12,14,0.9); display: flex; flex-direction: column; align-items: center; justify-content: center; border-radius: 6px; text-align: center; }
                  .prompt-text { color: #04d361; font-weight: bold; font-size: 16px; margin-bottom: 10px; }
              </style>
              <script>
                  let streamStarted = {% if active_src %}true{% else %}false{% endif %};
                  
                  async function switchConfig() {
                      const src = document.getElementById('source-picker').value;
                      const mdl = document.getElementById('model-picker').value;
                      if(!src) return;
                      
                      await fetch('/api/swap_config', {
                          method: 'POST',
                          headers: { 'Content-Type': 'application/json' },
                          body: JSON.stringify({ "source": src, "model": mdl })
                      });
                      
                      streamStarted = true;
                      document.getElementById('gatekeeper-overlay').style.display = 'none';
                      setTimeout(() => {
                          document.getElementById('stream-player').src = '/stream.mjpg';
                      }, 1000);
                  }
      
                  async function updateStreamMetrics() {
                      if (!streamStarted) return;
                      try {
                          const response = await fetch('/api/stream_info');
                          const data = await response.json();
                          
                          document.getElementById('metric-res').innerText = 'Source:' + data.native_w + 'x' + data.native_h + ' Canvas:' + data.width + 'x' + data.height;
                          document.getElementById('metric-fps').innerText = data.fps.toFixed(1);
                          document.getElementById('metric-dets').innerText = data.detections;
                      } catch (err) {}
                  }
                  setInterval(updateStreamMetrics, 1000);
              </script>
          </head>
          <body>
              <h2>Ara Vision Engine</h2>
              <div class="dashboard-layout">
                  <div class="panel">
                      <div class="control-row">
                          <label for="source-picker">Media Stream Target:</label>
                          <select id="source-picker" onchange="switchConfig()">""" + src_html + """</select>
                      </div>
                      <div class="control-row">
                          <label for="model-picker">NPU Pipeline Model:</label>
                          <select id="model-picker" onchange="switchConfig()">""" + mdl_html + """</select>
                      </div>
                  </div>
      
                  <div class="stats-banner">
                      <span id="metric-res">Source:0x0 Canvas:0x0</span>
                      <span>NPU Inference: <span id="metric-fps">0.0</span> FPS</span>
                      <span>Active Detections: <span id="metric-dets">0</span></span>
                  </div>
      
                  <div class="media-container">
                      {% if not active_src %}
                      <div class="overlay" id="gatekeeper-overlay">
                          <div class="prompt-text">Awaiting Source Context</div>
                          <div style="color: #8f8f9d; font-size: 13px; max-width: 400px;">Please select a media path and model from the drop-downs above to mount your pipeline.</div>
                      </div>
                      {% endif %}
                      <img id="stream-player" {% if active_src %}src="/stream.mjpg"{% endif %} style="max-width: """ + str(STATE_REPO["CANVAS_W"]) + """px;" />
                  </div>
              </div>
          </body>
          </html>"""
          return render_template_string(html_template, active_src=src_active)
      
      @app.route('/api/stream_info')
      def stream_info():
          with lock:
              return jsonify({
                  "native_w": STATE_REPO["native_w"],
                  "native_h": STATE_REPO["native_h"],
                  "width": STATE_REPO["stream_w"],
                  "height": STATE_REPO["stream_h"],
                  "fps": STATE_REPO["inference_fps"],
                  "detections": len(STATE_REPO["detections"])
              })
      
      @app.route('/api/swap_config', methods=['POST'])
      def swap_config():
          # tolerate a missing or malformed JSON body
          payload = request.get_json(silent=True) or {}
          src_selected = payload.get("source")
          mdl_selected = payload.get("model")
          
          with lock:
              trigger_restart = False
              if src_selected in STATE_REPO["source_registry"] and STATE_REPO["active_source"] != src_selected:
                  STATE_REPO["active_source"] = src_selected
                  trigger_restart = True
              if mdl_selected in STATE_REPO["model_registry"] and STATE_REPO["active_model_name"] != mdl_selected:
                  base_dir = app.config["MODEL_DIR"]
                  STATE_REPO["active_model_name"] = mdl_selected
                  STATE_REPO["active_model_path"] = os.path.join(base_dir, mdl_selected, "model.dvm")
                  trigger_restart = True
              if trigger_restart:
                  STATE_REPO["restart_flag"] = True
          return jsonify({"status": "success"})
      
      def generate_mjpeg_stream_generator():
          MODEL_W = float(STATE_REPO["MODEL_W"])
          MODEL_H = float(STATE_REPO["MODEL_H"])
          
          while True:
              time.sleep(0.04)
              frame_copy = STATE_REPO["frame"]
              local_dets = list(STATE_REPO["detections"])
              if frame_copy is not None:
                  frame = frame_copy.copy()
                  h_native, w_native, _ = frame_copy.shape
                  for class_id, confidence, rx1, ry1, rx2, ry2 in local_dets:
                      cx1 = int(rx1 * (float(w_native) / MODEL_W))
                      cx2 = int(rx2 * (float(w_native) / MODEL_W))
                      cy1 = int(ry1 * (float(h_native) / MODEL_H))
                      cy2 = int(ry2 * (float(h_native) / MODEL_H))
                      label = f"{COCO_LABELS.get(class_id, f'Class {class_id}')} ({confidence*100:.1f}%)"
                      cv2.rectangle(frame, (cx1, cy1), (cx2, cy2), (0, 255, 97), 2)
                      cv2.putText(frame, label, (cx1, max(15, cy1 - 5)), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 97), 2)
                  _, encoded_img = cv2.imencode(".jpg", frame)
                  yield (b'--frame\r\n'
                         b'Content-Type: image/jpeg\r\n\r\n' + encoded_img.tobytes() + b'\r\n')
              else:
                  waiting_canvas = np.zeros((480, 640, 3), dtype=np.uint8)
                  cv2.putText(waiting_canvas, "AWAITING MEDIA INPUT SELECTION...", (140, 240), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 97), 1)
                  _, encoded_img = cv2.imencode(".jpg", waiting_canvas)
                  yield (b'--frame\r\n'
                         b'Content-Type: image/jpeg\r\n\r\n' + encoded_img.tobytes() + b'\r\n')
      
      @app.route('/stream.mjpg')
      def video_feed_stream_route():
          return Response(generate_mjpeg_stream_generator(), mimetype='multipart/x-mixed-replace; boundary=frame')
      
      def main():
          parser = argparse.ArgumentParser(description="Wiki Template: Ara Flask Video Engine")
          parser.add_argument("--camera", default=None, help="Camera context device node path")
          parser.add_argument("--mp4", default=None, help="Directory containing target mp4 sample videos")
          parser.add_argument("--port", type=int, default=8080, help="Target port mapping")
          parser.add_argument("--model-dir", default="/usr/share/cnn/detection", help="Directory containing target models")
          parser.add_argument("--model", default="yolov8n", help="Initial model selection")
          args = parser.parse_args()
      
          app.config["MODEL_DIR"] = args.model_dir
          STATE_REPO["source_registry"] = []
      
          if args.camera and os.path.exists(args.camera):
              STATE_REPO["source_registry"].append(args.camera)
      
          if args.mp4 and os.path.exists(args.mp4):
              local_videos = glob.glob(os.path.join(args.mp4, "*.mp4"))
              for vid in sorted(local_videos):
                  STATE_REPO["source_registry"].append(vid)
      
          if os.path.exists(args.model_dir):
              discovered_models = []
              for entry in sorted(os.listdir(args.model_dir)):
                  full_subdir = os.path.join(args.model_dir, entry)
                  if os.path.isdir(full_subdir) and os.path.exists(os.path.join(full_subdir, "model.dvm")):
                      discovered_models.append(entry)
              if discovered_models:
                  STATE_REPO["model_registry"] = discovered_models
                  STATE_REPO["active_model_name"] = args.model if args.model in discovered_models else discovered_models[0]
                  STATE_REPO["active_model_path"] = os.path.join(args.model_dir, STATE_REPO["active_model_name"], "model.dvm")
      
          threading.Thread(target=gstreamer_orchestration_loop, daemon=True).start()
      
          print(f"Server serving on: http://localhost:{args.port}/")
          app.run(host='0.0.0.0', port=args.port, threaded=True, use_reloader=False, debug=False)
      
      if __name__ == '__main__':
          main()
      EOF
      
    • run the script (usage: vision-webapp.py [--port <portno>] [--camera <camera-dev>] [--mp4 <mp4-dir>] [--model-dir <model-dir>] [--model <name>]):
      uv run vision-webapp.py --camera /dev/video_webcam --mp4 /usr/share/ara2-vision-examples/sample_videos/
      
    • Pass a webcam device path (--camera) to stream from a webcam and/or a directory of mp4 files (--mp4) to process recorded video. Drop-downs in the browser select the input source and the model, and the page shows the detections and statistics. The server listens on port 8080 unless --port is given.

screenshot of vision webapp performing labelling on an mp4 of traffic
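
The overlay step in the script maps each detection box from the model input space (640x640 by default) onto the display canvas (640x360 by default) by scaling x and y independently. A minimal standalone sketch of that arithmetic follows; the function name scale_box is hypothetical and is not part of the script or the SDK, and it assumes the frame was stretched rather than letterboxed, which matches the scaling used in the script:

```python
def scale_box(box, model_size=(640, 640), frame_size=(640, 360)):
    """Map (xmin, ymin, xmax, ymax) from model input pixels to frame pixels.

    Assumes the frame was stretched (not letterboxed) to the model input
    size, so x and y use independent scale factors.
    """
    model_w, model_h = model_size
    frame_w, frame_h = frame_size
    sx = frame_w / model_w  # horizontal scale factor
    sy = frame_h / model_h  # vertical scale factor
    xmin, ymin, xmax, ymax = box
    return (int(xmin * sx), int(ymin * sy), int(xmax * sx), int(ymax * sy))
```

With the default sizes, a box covering the lower-right quadrant of the model input, (320, 320, 640, 640), lands on (320, 180, 640, 360) in the canvas.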

eIQ AAF Connector

The eIQ AAF Connector (edge Intelligence Ara Application Framework) is a REST-based server that enables LLM inference on NXP i.MX processors with the Ara240 DNPU. It implements the de facto standard API created by OpenAI for ChatGPT and provides a simple Chat Completions-based HTTP interface for serving models to client applications.

Requirements:

  • python 3.13 (we will install in a virtual env)
  • uv - used for the user-specific Python virtual environment
  • Optimum Ara framework for running Large Language Models (LLMs) and Vision-Language Models (VLMs) on Ara240 (part of rt-sdk)
  • OpenCV (dependency of the QwenVL engine)
  • Models

Installation on a Gateworks board with Ubuntu based OS:

  • extract the debian 'data' (do not install the package!)
    # extract data (but don't install)
    dpkg-deb --vextract eiq-aaf-connector_2.0.deb /
    
  • take care of postinst steps
    1. Create the /usr/share/eiq/aaf-connector/venv (used by /usr/share/eiq/aaf-connector/venv/bin/connector)
      # needs python 3.13 so we will install it in a virtual env for this user
      uv python install 3.13
      uv venv --python 3.13 "/usr/share/eiq/aaf-connector/venv"
      # activate venv
      source "/usr/share/eiq/aaf-connector/venv/bin/activate"
      # install Python dependencies in venv from the Optimum Ara wheel
      uv pip install --no-progress /usr/share/python-wheels/optimum_ara-2.0.0.2-py3-none-any.whl
      # install Python dependencies in venv from the eIQ wheel in this package
      uv pip install --no-progress /usr/share/python-wheels/eiq_aaf_connector-2.0.0-py3-none-any.whl
      # ditch the default opencv-python which depends on libgl1-mesa and install the headless version instead
      uv pip uninstall opencv-python
      uv pip install opencv-python-headless
      # deactivate venv
      deactivate
      
    2. Create systemd service file (not sure why this wasn't in the deb)
      cat > /etc/systemd/system/eiq-aaf-connector.service << EOF
      [Unit]
      Description=eIQ AAF Connector Service
      # No 'After' or 'Wants' for rt-sdk-ara2.service here
      # This prevents the 'Ordering Cycle' entirely
      After=network.target
      StartLimitIntervalSec=0
      
      [Service]
      Type=simple
      User=root
      WorkingDirectory=/usr/share/eiq/aaf-connector
      
      # This loop now handles the dependency logic internally.
      # It will spin until the proxy is actually alive, regardless of 
      # which service started it or when.
      ExecStartPre=/bin/bash -c 'until ss -Hltn | grep -E -q ":5000([[:space:]]|$)"; do echo "Waiting for ARA2 Proxy to initialize..." >&2; sleep 5; done'
      ExecStartPre=/bin/sleep 2
      
      ExecStart=/usr/share/eiq/aaf-connector/venv/bin/connector --host 127.0.0.1 --port 8000
      
      Restart=on-failure
      RestartSec=10s
      StartLimitBurst=0
      
      StandardOutput=journal
      StandardError=journal
      
      [Install]
      WantedBy=multi-user.target
      EOF
      
    • this one differs from the one in the deb's postinst script, which I found did not work (it would not wait for the proxy to be alive)
    • If you want the connector to be accessible from the network, set the host to '0.0.0.0' instead of '127.0.0.1':
      sed -i 's|--host 127.0.0.1|--host 0.0.0.0|g' /etc/systemd/system/eiq-aaf-connector.service
      
    3. add Ara2 optimized LLM models (these get installed to /usr/share/llm)
      fetch_models --repo-id nxp/Qwen2.5-7B-Instruct-Ara240 # 7.7GiB
      fetch_models --repo-id nxp/Qwen2.5-Coder-1.5B-Ara240 # 1.67GiB
      
    4. edit the config file to enable the two models we just downloaded (using jq):
      apt update && apt install -y jq
      jq '(.available_models[] | select(.name == "Qwen2.5-Coder-1.5B") |  .enabled) = true' /usr/share/eiq/aaf-connector/server_config.json > /tmp/config.json && \
        mv /tmp/config.json /usr/share/eiq/aaf-connector/server_config.json
      jq '(.available_models[] | select(.name == "Qwen2.5-7B-Instruct") |  .enabled) = true' /usr/share/eiq/aaf-connector/server_config.json > /tmp/config.json && \
        mv /tmp/config.json /usr/share/eiq/aaf-connector/server_config.json
      
    • you can just as easily edit the file manually if you want
    5. Enable and start service
      # Enable service on boot
      systemctl enable eiq-aaf-connector.service
      # Start the service now (or reboot)
      systemctl start eiq-aaf-connector.service
      

Note that it takes several minutes for the service to be ready for connections as it must process the models. Monitor progress with 'journalctl -u eiq-aaf-connector.service --no-pager -f' and check that it is listening with 'ss -tulpn | grep :8000'.
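
The readiness check above can also be scripted, in the same spirit as the ExecStartPre loop in the service file that polls for the proxy on port 5000. The following is a minimal sketch using only the Python standard library; the function name wait_for_port and its timeout and interval defaults are assumptions chosen for illustration:

```python
import socket
import time


def wait_for_port(host, port, timeout=600.0, interval=5.0):
    """Return True once a TCP connection to host:port succeeds.

    Retries every `interval` seconds and returns False if `timeout`
    seconds pass without a successful connection.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            # A successful connect means something is listening on the port
            with socket.create_connection((host, port), timeout=2.0):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

For example, wait_for_port("127.0.0.1", 8000) blocks until the connector accepts connections or ten minutes have passed. A successful connect only shows that the port is open, not that every model has finished loading.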

By default, the connector configured above will start on 127.0.0.1:8000 which is the local loopback interface. To be able to run requests from another device, you can change the host to '0.0.0.0' in the service file.

Notable Files:

  • /usr/share/eiq/aaf-connector/server_config.json - server config file
  • /usr/share/python-wheels/eiq_aaf_connector-2.0.0-py3-none-any.whl - Python wheel
  • /usr/bin/aaf-connector - shell script that activates the venv and executes the connector
  • /usr/share/eiq/aaf-connector/venv - Python virtual env used by connector
  • /etc/systemd/system/eiq-aaf-connector.service - systemd service
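
The model-enable edits made with jq during installation can also be done from Python. This is a sketch only: it assumes the layout implied by the jq filters (a top-level available_models list whose entries carry name and enabled keys), and the helper names are hypothetical:

```python
import json


def enable_models(config, names):
    """Set enabled=True on each available_models entry whose name is in
    `names`; return the names that were actually found."""
    found = []
    for entry in config.get("available_models", []):
        if entry.get("name") in names:
            entry["enabled"] = True
            found.append(entry["name"])
    return found


def enable_models_in_file(path, names):
    """Load the JSON config at `path`, enable the named models, write it back."""
    with open(path) as f:
        config = json.load(f)
    found = enable_models(config, names)
    with open(path, "w") as f:
        json.dump(config, f, indent=2)
        f.write("\n")
    return found
```

Calling enable_models_in_file("/usr/share/eiq/aaf-connector/server_config.json", {"Qwen2.5-Coder-1.5B", "Qwen2.5-7B-Instruct"}) mirrors the two jq commands. The returned list makes it easy to spot a misspelled model name.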

The connector self-hosts API documentation at http://<serverip>:8000/docs

Example Usage:

  • verify connector running
    # show service status
    systemctl status eiq-aaf-connector.service --no-pager -l
    # view detailed service logs
    journalctl -u eiq-aaf-connector.service
    # verify process exists
    ps -ef | grep aaf-connector
    # verify port open
    ss -tulpn | grep :8000 # show IP:PORT server is listening on
    
  • view API docs and interact with the server by opening http://<serverip>:8000/docs (requires the host to be set to '0.0.0.0' in the ExecStart line of /etc/systemd/system/eiq-aaf-connector.service)
  • use API via curl/jq
    # make sure curl and jq are installed (jq allows easy interaction with json data)
    apt install -y curl jq 
    # list of models
    curl -X 'GET' \
      'http://127.0.0.1:8000/v1/models' \
      -H 'accept: application/json' | jq
    # get info about a specific model (Qwen2.5-7B-Instruct)
    curl -X 'GET' \
      'http://127.0.0.1:8000/params/Qwen2.5-7B-Instruct' \
      -H 'accept: application/json' | jq
    # send an LLM query
    curl -X POST http://127.0.0.1:8000/v1/chat/completions -H "Content-Type: application/json" -d '{
      "model": "Qwen2.5-7B-Instruct",
      "messages": [
        {"role": "system", "content": "You are a helpful assistant running on NXP i.MX hardware."},
        {"role": "user", "content": "Explain what an NPU is in one sentence."}
      ],
      "max_tokens": 50
    }' | jq
    
  • run connector by hand (useful for troubleshooting or monitoring)
    systemctl stop eiq-aaf-connector.service
    source "/usr/share/eiq/aaf-connector/venv/bin/activate"
    connector --host 0.0.0.0 --port 8000 # will run until stopped
    deactivate
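
The curl chat request above can be issued from Python with only the standard library. This is a sketch, not the connector's official client: the helper names are hypothetical, and the response parsing assumes the OpenAI-style layout (choices[0].message.content) for non-streaming replies:

```python
import json
import urllib.request

BASE_URL = "http://127.0.0.1:8000"  # connector address used in the curl examples


def build_chat_request(model, user_text, system_text=None, max_tokens=50):
    """Build the JSON body for POST /v1/chat/completions."""
    messages = []
    if system_text:
        messages.append({"role": "system", "content": system_text})
    messages.append({"role": "user", "content": user_text})
    return {"model": model, "messages": messages, "max_tokens": max_tokens}


def chat_once(model, user_text, base_url=BASE_URL, timeout=300):
    """Send one non-streaming request and return the assistant's reply text."""
    body = json.dumps(build_chat_request(model, user_text)).encode("utf-8")
    req = urllib.request.Request(
        base_url + "/v1/chat/completions",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        reply = json.load(resp)
    # Assumption: OpenAI-style response layout
    return reply["choices"][0]["message"]["content"]
```

For example, chat_once("Qwen2.5-7B-Instruct", "Explain what an NPU is in one sentence.") returns the reply as a plain string once the connector is up.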
    

Ara2 SDK examples

Here are some Ara2 SDK examples that were 'vibe coded' within minutes

dvapi stats

This is a C (C99) app that shows how to use the dvapi to connect to the proxy and read NPU endpoint stats such as temperature, clocks and usage. It is essentially a re-implementation of the closed source /usr/share/rt-sdk-ara240/scripts/ara2_metrics_bin/hw_metrics.out.

ara_status.c:

#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include "dvapi.h"

int main() {
    dv_session_t *session = NULL;
    dv_endpoint_t *ep_list = NULL;
    int ep_count = 0;
    dv_status_code_t status;
    const char *socket_path = "/run/proxy.sock"; 

    // 1. Establish session
    status = dv_session_create_via_unix_socket(socket_path, &session);
    if (status != DV_SUCCESS) {
        fprintf(stderr, "Failed to connect: %s\n", dv_stringify_status_code(status));
        return 1;
    }

    // 2. Get list of NPU endpoints
    dv_endpoint_get_list(session, &ep_list, &ep_count);

    for (int i = 0; i < ep_count; i++) {
        dv_endpoint_t *ep = &ep_list[i];
        dv_endpoint_statistics_t *stats = NULL;
        int s_count = 0;
        bool is_busy = false;

        // 3. Retrieve status and statistics
        dv_get_endpoint_busyness(session, ep, &is_busy);
        status = dv_endpoint_get_statistics(session, ep, &stats, &s_count);

        if (status == DV_SUCCESS && s_count > 0) {
            // DRAM Calculations (Bytes to GB)
            double used_gb = (double)stats->ep_dram_stats.ep_total_dram_occupancy_size / 1073741824.0;
            double total_gb = (double)stats->ep_dram_stats.ep_total_dram_size / 1073741824.0;
            double dram_pct = (total_gb > 0) ? (used_gb / total_gb) * 100.0 : 0.0;

            // NPU Utilization (Queue occupancy)
            double npu_load = 0.0;
            if (stats->ep_infq_stats && stats->ep_infq_stats->length > 0) {
                npu_load = ((double)stats->ep_infq_stats->occupancy_count / stats->ep_infq_stats->length) * 100.0;
            }

            printf("--- NPU Endpoint %d Statistics ---\n", i);
            printf("Busy State:       %s\n", is_busy ? "TRUE" : "FALSE");
            printf("NPU Utilization:  %.1f%%\n", npu_load);
            printf("Temperature:      %.1f C\n", stats->ep_temp);
            printf("NNP Clock:        %d MHz\n", stats->ep_nnp_clk);
            printf("SBP Clock:        %d MHz\n", stats->ep_sbp_clk);
            printf("DRAM Clock:       %d MHz\n", stats->ep_dram_clk);
            
            // Format: DRAM Usage: 8.2GB/16.0GB (51.3%)
            printf("DRAM Usage:       %.1fGB/%.1fGB (%.1f%%)\n", used_gb, total_gb, dram_pct);
            printf("\n");

            dv_endpoint_free_statistics(stats, s_count);
        }
    }

    // 4. Cleanup
    dv_endpoint_free_group(ep_list);
    dv_session_close(session);
    return 0;
}

Compile:

apt update && apt install -y build-essential
gcc ara_status.c -I/usr/include/sdk_ara/ -L/usr/lib/ -laraclient_aarch64 -o ara_status

Execution:

# ./ara_status
--- NPU Endpoint 0 Statistics ---
Busy State:       FALSE
NPU Utilization:  0.0%
Temperature:      56.0 C
NNP Clock:        900 MHz
SBP Clock:        355 MHz
DRAM Clock:       1066 MHz
DRAM Usage:       10.0GB/16.0GB (62.5%)
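
For logging or monitoring, the text output above can be parsed into dictionaries. The sketch below relies only on the output format shown here; the function names are hypothetical and all values are kept as strings apart from the endpoint index:

```python
import re


def parse_ara_status(text):
    """Parse ara_status output into one dict per NPU endpoint."""
    endpoints = []
    current = None
    for line in text.splitlines():
        header = re.match(r"--- NPU Endpoint (\d+) Statistics ---", line)
        if header:
            # Start a new record at each endpoint banner
            current = {"endpoint": int(header.group(1))}
            endpoints.append(current)
        elif current is not None and ":" in line:
            key, value = line.split(":", 1)
            current[key.strip()] = value.strip()
    return endpoints


def dram_usage_percent(entry):
    """Recompute DRAM usage from a field such as '10.0GB/16.0GB (62.5%)'."""
    m = re.match(r"([\d.]+)GB/([\d.]+)GB", entry["DRAM Usage"])
    used, total = float(m.group(1)), float(m.group(2))
    return 100.0 * used / total if total > 0 else 0.0
```

The input text could come from subprocess.run(["./ara_status"], capture_output=True, text=True).stdout. For the sample above, the recomputed DRAM usage is 10.0 / 16.0 = 62.5%, matching the value the program prints.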

command-line python eIQ chatbot

This is a command-line chatbot written in Python that uses the eIQ AAF Connector.

chat.py:

import json
import requests
import time
import sys

API_URL = "http://127.0.0.1:8000/v1/chat/completions"
MODEL_NAME = "Qwen2.5-7B-Instruct" 

def chat():
    print(f"--- i.MX LLM Session (Model: {MODEL_NAME}) ---")
    print("Type 'exit' to stop.\n")
    
    history = [{"role": "system", "content": "You are a helpful AI assistant."}]

    while True:
        user_input = input("You: ")
        if user_input.lower() in ['exit', 'quit']:
            break

        history.append({"role": "user", "content": user_input})
        payload = {
            "model": MODEL_NAME,
            "messages": history,
            "temperature": 0.7,
            "stream": True 
        }

        print("AI: ", end="", flush=True)
        
        # Start timing
        start_time = time.time()
        full_reply = ""
        token_count = 0

        try:
            response = requests.post(API_URL, json=payload, stream=True)
            response.raise_for_status()

            for line in response.iter_lines():
                if line:
                    decoded_line = line.decode('utf-8')
                    if decoded_line.startswith("data: "):
                        content = decoded_line[6:]
                        if content.strip() == "[DONE]":
                            break
                        
                        chunk = json.loads(content)
                        if "choices" in chunk and chunk["choices"][0]["delta"].get("content"):
                            text = chunk["choices"][0]["delta"]["content"]
                            print(text, end="", flush=True)
                            full_reply += text
                            token_count += 1 # Rough estimate of tokens
            
            # End timing
            end_time = time.time()
            duration = end_time - start_time
            tps = token_count / duration if duration > 0 else 0

            print(f"\n\n--- Stats ---")
            print(f"Time taken: {duration:.2f} seconds")
            print(f"Throughput: {tps:.2f} tokens/sec")
            print(f"-------------\n")
            
            history.append({"role": "assistant", "content": full_reply})

        except Exception as e:
            print(f"\nError: {e}")

if __name__ == "__main__":
    chat()

Execution:

$ uv venv # create virtual python env in current dir
$ uv pip install requests # install python deps
$ uv run chat.py # run in venv
--- i.MX LLM Session (Model: Qwen2.5-7B-Instruct) ---
Type 'exit' to stop.

You: Why is the sky blue
AI: The sky appears blue because of a phenomenon called Rayleigh scattering. When sunlight enters the Earth's atmosphere, it collides with molecules and small particles in the air. Sunlight is made up of different colors, each of which has a different wavelength. Blue light has a shorter wavelength and is scattered more than other colors by the gases and particles in the atmosphere. This scattering makes the sky appear blue to our eyes.

During sunrise and sunset, the sky can appear red or orange because the light has to travel through more of the Earth's atmosphere. This longer path means that more blue and green light is scattered out of the beam, leaving the red and orange wavelengths to dominate the light that reaches our eyes.

So, the blue color of the sky is primarily due to the way shorter wavelength light is scattered by the Earth's atmosphere.

--- Stats ---
Time taken: 29.18 seconds
Throughput: 5.04 tokens/sec
-------------

You: exit
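
The streaming loop in chat.py can be isolated into a small generator, which makes the line format easier to see: each non-empty line starts with "data: ", the payload is either a JSON chunk or the literal [DONE], and the text lives under choices[0].delta.content. The sketch below assumes only that format as used by chat.py. The helper name iter_sse_content and the sample lines are made up for illustration and are not captured connector output.

```python
import json
from typing import Iterable, Iterator


def iter_sse_content(lines: Iterable[bytes]) -> Iterator[str]:
    """Yield the text deltas carried by streaming 'data: ' lines."""
    for raw in lines:
        if not raw:
            continue  # blank separator line between events
        line = raw.decode("utf-8")
        if not line.startswith("data: "):
            continue  # ignore anything that is not a data field
        body = line[len("data: "):].strip()
        if body == "[DONE]":
            break  # end-of-stream sentinel
        chunk = json.loads(body)
        choices = chunk.get("choices") or []
        if not choices:
            continue
        text = choices[0].get("delta", {}).get("content")
        if text:
            yield text


# Hypothetical sample lines in the shape chat.py expects.
sample = [
    b'data: {"choices": [{"delta": {"role": "assistant"}}]}',
    b"",
    b'data: {"choices": [{"delta": {"content": "Hello"}}]}',
    b'data: {"choices": [{"delta": {"content": ", world"}}]}',
    b"data: [DONE]",
    b'data: {"choices": [{"delta": {"content": "ignored"}}]}',
]
pieces = list(iter_sse_content(sample))
print("".join(pieces))  # Hello, world
print(len(pieces))      # 2
```

With requests, the same generator could be fed from response.iter_lines(). Note that the count is the number of content chunks, which is only a rough proxy for tokens, as the comment in chat.py also says.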

Web based python eIQ chatbot

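This is a web-based chatbot written in Python with Streamlit. It uses the eIQ AAF Connector for inference, serializes requests from multiple browsers with a lock, and shows NPU and system telemetry in the sidebar.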

webchat.py:

import sys
import os
from datetime import datetime

# --- KINARA SDK PATH INJECTION ---
DVAPI_DIR = "/usr/share/rt-sdk-ara240_2.0.4/include"
if os.path.exists(DVAPI_DIR):
    sys.path.append(DVAPI_DIR)

import streamlit as st
import requests
import json
import time
import psutil
import threading
import argparse

# Attempt to import the Kinara Python APIs
try:
    from dvapi import DVSession, dv_endpoint_get_statistics, dv_endpoint_free_statistics
except ImportError:
    st.error(f"Critical: dvapi.py not found at {DVAPI_DIR}")
    st.stop()

# --- ARGUMENT PARSING ---
parser = argparse.ArgumentParser()
parser.add_argument("--host", type=str, default="127.0.0.1", help="AAF Connector Host")
parser.add_argument("--port", type=str, default="8000", help="AAF Connector Port")
parser.add_argument("--proxy-sock", type=str, default="/var/run/proxy.sock", help="Kinara Proxy socket")
args, _ = parser.parse_known_args()

# --- CONFIGURATION ---
MODEL_NAME = "Qwen2.5-7B-Instruct"
API_URL = f"http://{args.host}:{args.port}/v1/chat/completions"
LOGO_URL = "/root/gateworks_logo.png"

# --- HARDWARE TELEMETRY HELPERS ---
def get_dvapi_npu_stats():
    try:
        ret, session = DVSession.create_via_unix_socket(args.proxy_sock)
        if ret != 0: return None
        with session:
            ret, ep_list = session.get_endpoint_list()
            if ret != 0 or not ep_list: return None
            ret, stats_ptr, count = dv_endpoint_get_statistics(session._session, ep_list[0]._endpoint)
            if ret == 0 and count.value > 0:
                s = stats_ptr[0]
                TOTAL_CAPACITY_GB = 16.0
                free_gb = s.ep_dram_stats.ep_total_free_size / 1073741824
                used_gb = max(0, TOTAL_CAPACITY_GB - free_gb)
                dram_pct = (used_gb / TOTAL_CAPACITY_GB) * 100
                is_busy = st._npu_lock.locked()
                data = {"temp": s.ep_temp, "util": 100 if is_busy else 0, "ram_pct": dram_pct}
                dv_endpoint_free_statistics(stats_ptr, count)
                return data
    except: return None

def get_system_thermals():
    zones = []
    try:
        for zone in sorted(os.listdir("/sys/class/thermal/")):
            if zone.startswith("thermal_zone"):
                with open(f"/sys/class/thermal/{zone}/temp", "r") as f:
                    z_temp = int(f.read().strip()) / 1000.0
                zones.append(z_temp)
    except: pass
    return zones

def build_sidebar_html():
    n_stats = get_dvapi_npu_stats()
    cpu_usage = psutil.cpu_percent()
    sys_ram = psutil.virtual_memory().percent
    thermals = get_system_thermals()
    
    npu_html = f"<div style='border-top:1px solid #444; padding-top:5px; font-size:0.82rem;'><b>🔥 Ara2 NPU</b><br>"
    if n_stats:
        npu_html += f"NPU: {n_stats['util']}% {n_stats['temp']:.1f}C | RAM: {n_stats['ram_pct']:.1f}%"
    else:
        npu_html += "NPU Telemetry Unavailable"
    npu_html += "</div>"

    sys_html = f"<div style='border-top:1px solid #444; margin-top:8px; padding-top:5px; font-size:0.82rem;'><b>💻 System</b><br>"
    temp_str = "/".join([f"{t:.1f}C" for t in thermals])
    sys_html += f"CPU: {cpu_usage:.1f}% {temp_str} | RAM: {sys_ram:.1f}%</div>"

    perf_val = st.session_state.get('last_perf', 'N/A')
    perf_html = f"<div style='border-top:1px solid #444; margin-top:8px; padding-top:5px; font-size:0.82rem;'><b>⚡ Last Result</b><br>{perf_val}</div>"
    return npu_html + sys_html + perf_html

# --- GLOBAL STATE ---
if not hasattr(st, '_npu_lock'): st._npu_lock = threading.Lock()
if not hasattr(st, '_active_user'): st._active_user = "None"

st.set_page_config(page_title="Gateworks Venice AI", layout="wide")

# --- SIDEBAR ---
with st.sidebar:
    try: st.image(LOGO_URL, width=220)
    except: st.write("### Gateworks Venice")
    
    status_slot = st.empty()
    # Simplified to just show the IP address
    user_id = st.context.ip_address or "127.0.0.1"

    if st._npu_lock.locked():
        status_slot.warning(f"⚠️ BUSY: {st._active_user}")
    else:
        status_slot.success("🟢 READY")
    
    st.caption(f"User: {user_id}")
    
    stats_slot = st.empty()
    stats_slot.markdown(build_sidebar_html(), unsafe_allow_html=True)

# --- MAIN INTERFACE ---
st.title("🤖 i.MX Edge LLM")

if "messages" not in st.session_state: st.session_state.messages = []
for msg in st.session_state.messages:
    with st.chat_message(msg["role"]): st.markdown(msg["content"])

if prompt := st.chat_input("Ask the NPU..."):
    st.chat_message("user").markdown(prompt)
    st.session_state.messages.append({"role": "user", "content": prompt})

    # Console: Log the Incoming Request / Queue status
    ts_in = datetime.now().strftime("%H:%M:%S")
    print(f"[{ts_in}] QUEUED: Request from {user_id} -> '{prompt[:40]}...'")

    with st.chat_message("assistant"):
        response_placeholder = st.empty()
        
        # This lock handles the "Queued" logic—it will block here if someone else is talking
        with st._npu_lock:
            st._active_user = user_id
            status_slot.warning(f"⚠️ BUSY: {user_id}")
            
            ts_start = datetime.now().strftime("%H:%M:%S")
            print(f"[{ts_start}] PROCESSING: Active inference for {user_id}")
            
            full_response, token_count, start_time = "", 0, time.time()

            try:
                payload = {"model": MODEL_NAME, "messages": st.session_state.messages, "stream": True}
                r = requests.post(API_URL, json=payload, stream=True, timeout=120)
                
                for line in r.iter_lines():
                    if line:
                        decoded = line.decode('utf-8').replace('data: ', '')
                        if decoded.strip() == "[DONE]": break
                        try:
                            chunk = json.loads(decoded)
                            content = chunk["choices"][0]["delta"].get("content", "")
                            if content:
                                full_response += content
                                token_count += 1
                                response_placeholder.markdown(full_response + "▌")
                                
                                if token_count % 12 == 0:
                                    stats_slot.markdown(build_sidebar_html(), unsafe_allow_html=True)
                        except: continue

                duration = time.time() - start_time
                tps = token_count / duration if duration > 0 else 0
                st.session_state.last_perf = f"{token_count} tokens @ {tps:.1f} t/s"
                
                response_placeholder.markdown(full_response)
                st.session_state.messages.append({"role": "assistant", "content": full_response})

                # Console: Log Completion
                ts_out = datetime.now().strftime("%H:%M:%S")
                print(f"[{ts_out}] COMPLETE: {user_id} | {token_count} tokens | {tps:.1f} t/s")

            except Exception as e:
                st.error(f"Error: {e}")
                print(f"[{datetime.now().strftime('%H:%M:%S')}] ERROR: {e}")
            finally:
                st._active_user = "None"
                stats_slot.markdown(build_sidebar_html(), unsafe_allow_html=True)
                status_slot.success("🟢 READY")
                st.rerun()

Execution:

$ mkdir /root/webapp
$ cd /root/webapp
$ uv venv # create virtual python env in current dir
$ uv pip install streamlit requests psutil # install python deps (argparse is part of the standard library)
$ uv run streamlit run webchat.py --server.address 0.0.0.0 --server.port 8501 -- --host 127.0.0.1 --port 8000
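
The NPU RAM percentage shown in the webchat.py sidebar is plain arithmetic on the free-byte count returned by the endpoint statistics: convert bytes to GiB, subtract from the hard-coded 16.0 GB capacity, and express the result as a percentage. A minimal sketch of that calculation follows. The function name dram_usage and the 6 GiB input are illustrative assumptions, chosen so the result is easy to check by hand.

```python
from typing import Tuple

GIB = 1024 ** 3  # 1073741824, the divisor used in webchat.py


def dram_usage(free_bytes: int, total_gb: float = 16.0) -> Tuple[float, float]:
    """Return (used_gb, used_percent) given the free DRAM in bytes."""
    free_gb = free_bytes / GIB
    # Clamp at zero in case the reported free size exceeds the assumed capacity.
    used_gb = max(0.0, total_gb - free_gb)
    return used_gb, used_gb / total_gb * 100


used_gb, pct = dram_usage(6 * GIB)  # hypothetical reading: 6 GiB free
print(f"{used_gb:.1f}GB/16.0GB ({pct:.1f}%)")  # 10.0GB/16.0GB (62.5%)
```

Because the capacity is a constant in the script rather than a value read from the device, the percentage is only as accurate as that constant.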

Service:

  • if you want this to run as a service:
    cat << \EOF > /etc/systemd/system/eiq-webapp.service
    [Unit]
    Description=Streamlit Webapp for eIQ AAF
    # Start after network is up
    After=network.target
    # We don't use 'After=eiq-aaf-connector.service' to avoid potential boot cycles
    StartLimitIntervalSec=0
    
    [Service]
    Type=simple
    User=root
    # Ensure we are in the directory where webchat.py lives
    WorkingDirectory=/root/webapp
    
    # 1. Wait until the Connector is actually listening on Port 8000
    ExecStartPre=/bin/bash -c 'until ss -Hltn | grep -E -q ":8000([[:space:]]|$)"; do echo "Waiting for eIQ Connector on Port 8000..." >&2; sleep 5; done'
    
    # 2. Launch the app using uv
    # Note: Using absolute path for uv is safer in systemd
    ExecStart=/usr/local/bin/uv run streamlit run webchat.py \
        --server.address 0.0.0.0 \
        --server.port 8501 \
        -- \
        --host 127.0.0.1 \
        --port 8000
    
    # Restart logic
    Restart=on-failure
    RestartSec=10s
    StartLimitBurst=0
    
    # Standard Logging
    StandardOutput=journal
    StandardError=journal
    
    [Install]
    WantedBy=multi-user.target
    EOF
    systemctl daemon-reload
    systemctl enable eiq-webapp.service
    systemctl start eiq-webapp.service
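
The ExecStartPre line in the unit above polls with ss until something is listening on port 8000. If a Python client needs the same readiness check, it can be sketched as below. wait_for_port is a hypothetical helper, not part of the eIQ packages, and it differs from the ss approach in that it opens a real TCP connection instead of inspecting the listen table. The demo uses a throwaway local listener so it runs without the connector.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 600.0, interval: float = 1.0) -> bool:
    """Return True once a TCP connect to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            # Connection refused or timed out: retry until the deadline passes.
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)


# Demo against a temporary listener on an ephemeral loopback port.
server = socket.socket()
server.bind(("127.0.0.1", 0))
server.listen(1)
demo_port = server.getsockname()[1]
print(wait_for_port("127.0.0.1", demo_port, timeout=2.0, interval=0.1))  # True
server.close()
```

Against the connector itself the call would be wait_for_port("127.0.0.1", 8000) with a generous timeout, since loading a large model can take several minutes.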
    

Web based python VLM eIQ example

The eIQ AAF Connector can be used to analyze video and images.

Here is an example of a headless web app based on NXP's vlm-edge-studio example, using:

  • Qwen2.5-VL-7B-Instruct-Ara240
  • eIQ AAF Connector
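
As a rough sketch of what a video request looks like, the body below puts one text part and one video_url part into a single user message, which is the shape the vlm.py script further down sends to /v1/chat/completions. Earlier turns are folded into the text part so that only one message is sent. flatten_history and build_vlm_request are hypothetical helper names, the video path is a placeholder, and the code only builds the JSON; it does not contact the connector.

```python
import json
from typing import Dict, List


def flatten_history(history: List[Dict[str, str]]) -> str:
    """Collapse a multi-turn chat into one text prompt, newest question last."""
    if not history:
        raise ValueError("history must contain at least one message")
    if len(history) == 1:
        return history[0]["content"]
    lines = ["Conversation so far:"]
    for msg in history[:-1]:
        label = "User" if msg["role"] == "user" else "Assistant"
        lines.append(f"[{label}]: {msg['content']}")
    lines.append(f"[New question]: {history[-1]['content']}")
    return "\n".join(lines)


def build_vlm_request(model: str, history: List[Dict[str, str]], video_path: str) -> dict:
    """Build a chat-completions body with one text part and one video part."""
    return {
        "model": model,
        "stream": True,
        "messages": [{
            "role": "user",
            "content": [
                {"type": "text", "text": flatten_history(history)},
                {"type": "video_url", "video_url": {"url": video_path}},
            ],
        }],
    }


# The filename is a placeholder, not a file known to ship in any package.
body = build_vlm_request(
    "Qwen2.5-VL-7B-Instruct",
    [{"role": "user", "content": "what is happening in this video?"}],
    "/usr/share/vlm-edge-studio/assets/videos/example.mp4",
)
print(json.dumps(body, indent=2))
```

The video is referenced by a filesystem path, so the file has to be readable on the machine where the connector runs.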

Example:

  • if you want some video examples you can download NXP's vlm-edge-studio_1.0.0.deb and extract its data:
    # extract data (but don't install the deb)
    dpkg-deb --vextract vlm-edge-studio_1.0.0.deb /
    
    • this installs a number of videos to /usr/share/vlm-edge-studio/assets/videos
  • The AAF connector requires a lot of DRAM when loading large models (e.g. the 12GB Qwen2.5-VL-7B-Instruct model), so create a swap file to avoid running out of memory while the model loads:
    swapon --show # shows nothing as not enabled
    # pre-allocate space for swap file
    fallocate -l 4G /swapfile
    # make sure it is accessible by root only
    chmod 600 /swapfile
    # format the file as swap
    mkswap /swapfile
    # activate the swapfile
    swapon /swapfile
    # add it to /etc/fstab so that it mounts on boot
    echo '/swapfile none swap sw 0 0' >> /etc/fstab
    
  • install Qwen2.5-VL-7B-Instruct-Ara240 model
    fetch_models --repo-id nxp/Qwen2.5-VL-7B-Instruct-Ara240 # 12GB
    
  • To avoid loading unused models into the ARA and running into memory issues, disable all models except Qwen2.5-VL-7B-Instruct in the AAF connector's config file:
    python3 -c 'import json; p="/usr/share/eiq/aaf-connector/server_config.json"; f=open(p,"r+"); d=json.load(f); [m.update({"enabled": (m.get("name") == "Qwen2.5-VL-7B-Instruct")}) for m in d.get("available_models", [])]; f.seek(0); json.dump(d, f, indent=4); f.truncate()'
    # restart AAF connector
    systemctl restart eiq-aaf-connector.service
    # wait for it to be up and running (it will take several minutes to load the 12GB Qwen2.5-VL-7B-Instruct to the ARA)
    until (echo > /dev/tcp/127.0.0.1/8000) >/dev/null 2>&1; do echo -n .; sleep 1; done
    
  • create a dir for us to work in and create the python script
    mkdir vlm-webapp; cd vlm-webapp
    cat << \EOF > vlm.py
    import argparse
    import os
    import httpx
    import uvicorn
    import json
    import urllib.error
    import urllib.request
    import time
    from datetime import datetime
    from fastapi import FastAPI, HTTPException
    from fastapi.responses import HTMLResponse, StreamingResponse
    from fastapi.staticfiles import StaticFiles
    from pydantic import BaseModel
    from typing import List, Dict
    
    # ═══════════════════════════════════════════════════════════════
    # Command Line Arguments & Global Constants Configuration
    # ═══════════════════════════════════════════════════════════════
    parser = argparse.ArgumentParser(description="VLM Edge Studio WebApp Bridge")
    parser.add_argument("--video-dir", required=True, help="Directory path where video MP4 files are hosted")
    parser.add_argument("--aaf-server", default="http://127.0.0.1:8000", help="AAF Server backend Base URL")
    parser.add_argument("--host", default="0.0.0.0", help="Host binding address for this web application")
    parser.add_argument("--port", type=int, default=8080, help="Port binding for this web application")
    parser.add_argument("--verbose", action="store_true", default=False, help="Enable verbose raw JSON payload terminal dumping")
    
    args, _ = parser.parse_known_args()
    
    TARGET_MODEL = "Qwen2.5-VL-7B-Instruct"
    
    app = FastAPI(title="VLM Video Web Analyzer")
    
    if not os.path.isdir(args.video_dir):
        raise RuntimeError(f"Provided video directory target does not exist: {args.video_dir}")
    
    # Mount local streaming static location directly from the primary video-dir configuration
    app.mount("/stream/videos", StaticFiles(directory=args.video_dir), name="videos")
    
    class ChatMessage(BaseModel):
        role: str
        content: str
    
    class MultiTurnPayload(BaseModel):
        video_name: str
        history: List[ChatMessage]
    
    def get_timestamp():
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
    
    # ═══════════════════════════════════════════════════════════════
    # API Engine Intercept Proxy Routes
    # ═══════════════════════════════════════════════════════════════
    
    @app.get("/api/videos", tags=["Media"])
    async def get_available_videos():
        try:
            if not os.path.exists(args.video_dir):
                return []
            files = os.listdir(args.video_dir)
            valid_extensions = (".mp4", ".mov", ".mkv", ".avi")
            return [f for f in files if f.lower().endswith(valid_extensions)]
        except Exception as e:
            raise HTTPException(status_code=500, detail=str(e))
    
    @app.get("/api/metrics")
    async def proxy_metrics():
        headers = {"Accept": "application/json", "User-Agent": "AAFConnector/1.0"}
        async with httpx.AsyncClient() as client:
            try:
                url = f"{args.aaf_server}/metrics/"
                response = await client.get(url, params={"model_name": TARGET_MODEL}, headers=headers, timeout=3.0)
                return response.json()
            except Exception as e:
                return {
                    TARGET_MODEL: {
                        "llm_average_token_per_second": 0.0,
                        "llm_first_infer_duration": 0.0,
                        "generated_token_num": 0
                    }
                }
    
    @app.post("/api/analyze")
    async def proxy_analysis_stream(payload: MultiTurnPayload):
        absolute_video_target_path = os.path.join(args.video_dir, payload.video_name)
        start_time = time.time()
        
        if len(payload.history) > 1:
            flattened_text = "Here is the conversation history context for this execution sequence:\n"
            for msg in payload.history[:-1]:
                label = "User Question" if msg.role == "user" else "Your Previous Response"
                flattened_text += f"[{label}]: {msg.content}\n"
            flattened_text += f"\n[New Follow-up Question to Answer]: {payload.history[-1].content}"
        else:
            flattened_text = payload.history[0].content
    
        aaf_payload = {
            "model": TARGET_MODEL,
            "stream": True,
            "messages": [{
                "role": "user",
                "content": [
                    {"type": "text", "text": flattened_text},
                    {"type": "video_url", "video_url": {"url": absolute_video_target_path}}
                ]
            }]
        }
    
        print("\n" + "═"*70)
        print(f"[{get_timestamp()}] [CONVERSATIONAL INFERENCE DISPATCH]")
        print(f"  Model ID      : {TARGET_MODEL}")
        print(f"  Target Path   : {absolute_video_target_path}")
        print(f"  Turn Count    : {len(payload.history)} turns processed in session state.")
        
        if args.verbose:
            print("─"*70)
            print("[RAW OUTGOING JSON PAYLOAD SENT TO AAF SERVER]:")
            print(json.dumps(aaf_payload, indent=2))
            
        print("═"*70 + "\n")
    
        def raw_socket_generator():
            target_endpoint = f"{args.aaf_server}/v1/chat/completions"
            data_bytes = json.dumps(aaf_payload).encode('utf-8')
            
            req = urllib.request.Request(
                target_endpoint,
                data=data_bytes,
                headers={
                    "Content-Type": "application/json",
                    "Accept": "application/json",
                    "User-Agent": "AAFConnector/1.0"
                },
                method="POST"
            )
            
            first_token_received = False
            
            try:
                with urllib.request.urlopen(req, timeout=300.0) as response:
                    while True:
                        line_bytes = response.readline()
                        if not line_bytes:
                            break  
                        
                        line_str = line_bytes.decode('utf-8', errors='ignore')
                        trimmed = line_str.strip()
                        
                        if trimmed:
                            yield f"{trimmed}\n".encode('utf-8')
                            
                            if trimmed.startswith('data: '):
                                data_content = trimmed[5:].strip()
                                if data_content == "[DONE]":
                                    continue
                                    
                                try:
                                    parsed = json.loads(data_content)
                                    token = parsed["choices"][0]["delta"].get("content", "")
                                    if token:
                                        if not first_token_received:
                                            ttft_duration = time.time() - start_time
                                            print(f"[{get_timestamp()}] [TTFT / DECODE PHASE]: {ttft_duration:.2f}s.")
                                            print(f"[{get_timestamp()}] [STREAMING TEXT TOKENS]: ", end="")
                                            first_token_received = True
                                        
                                        print(token, end="", flush=True)
                                except Exception:
                                    pass
                            
            except urllib.error.HTTPError as http_err:
                err_body = http_err.read().decode('utf-8', errors='ignore')
                # json.dumps escapes quotes and newlines so the browser can always parse the event
                err_event = json.dumps({"error": f"AAF Server Engine error {http_err.code}: {err_body}"})
                yield f"data: {err_event}\n\n".encode('utf-8')
            except Exception as e:
                err_event = json.dumps({"error": f"Direct socket pipeline fault: {e}"})
                yield f"data: {err_event}\n\n".encode('utf-8')
            finally:
                duration = time.time() - start_time
                print("\n" + "═"*70)
                print(f"[{get_timestamp()}] [INFERENCE COMPLETED] Turn Runtime: {duration:.2f}s")
                print("═"*70 + "\n")
    
        return StreamingResponse(raw_socket_generator(), media_type="text/event-stream")
    
    # ═══════════════════════════════════════════════════════════════
    # User Interface (HTML Layer)
    # ═══════════════════════════════════════════════════════════════
    @app.get("/", response_class=HTMLResponse)
    async def serve_index():
        # Enforcing a raw python string (r"") so Python never converts or drops backslashes
        html_content = r"""
        <!DOCTYPE html>
        <html lang="en">
        <head>
            <meta charset="UTF-8">
            <title>VLM Edge Studio Analyzer</title>
            <script src="https://cdn.tailwindcss.com"></script>
            <style>
                .skeleton-pulse {
                    background: linear-gradient(-90deg, #1e293b 0%, #334155 50%, #1e293b 100%);
                    background-size: 400% 400%;
                    animation: pulse 1.5s ease-in-out infinite;
                }
                @keyframes pulse {
                    0% { background-position: 100% 50%; }
                    100% { background-position: 0% 50%; }
                }
            </style>
        </head>
        <body class="bg-gray-900 text-gray-100 min-h-screen p-6">
            <div class="max-w-6xl mx-auto space-y-6">
                <header class="border-b border-gray-800 pb-4 flex justify-between items-center">
                    <div>
                        <h1 class="text-2xl font-bold tracking-wide text-indigo-400">VLM Edge Platform Interface</h1>
                        <p id="metricsPanel" class="text-xs text-gray-400 mt-1 font-mono">Metrics: Waiting for pipeline activity...</p>
                    </div>
                    <div class="flex items-center space-x-3">
                        <span class="text-xs font-mono bg-gray-800 border border-gray-700 rounded px-2.5 py-1 text-indigo-300">Target Profile: __MODEL_NAME_PLACEHOLDER__</span>
                        <button id="clearChatBtn" class="bg-red-900/40 hover:bg-red-800 border border-red-700 text-red-200 text-xs py-1.5 px-3 rounded transition-colors">Clear Chat History</button>
                    </div>
                </header>
    
                <div class="grid grid-cols-1 lg:grid-cols-3 gap-6">
                    <div class="lg:col-span-2 space-y-4">
                        <div class="flex items-center space-x-4">
                            <label class="font-medium text-sm">Select Stream Source:</label>
                            <select id="videoSelect" class="flex-1 bg-gray-800 border border-gray-700 rounded p-2 focus:outline-none focus:border-indigo-500"></select>
                        </div>
                        <div class="bg-black rounded-lg overflow-hidden aspect-video relative flex items-center justify-center border border-gray-800">
                            <video id="videoPlayer" controls class="w-full h-full hidden"></video>
                            <div id="videoPlaceholder" class="text-gray-500 text-sm">No Active Video Stream Sample Loaded</div>
                        </div>
                    </div>
    
                    <div class="flex flex-col h-[480px]">
                        <div class="bg-gray-800 border border-gray-700 rounded-lg p-4 flex-1 flex flex-col min-h-0 relative overflow-hidden">
                            <div class="flex justify-between items-center mb-3 flex-none">
                                <h2 class="text-sm font-semibold tracking-wider text-gray-400 uppercase">Conversational History Log</h2>
                                <div id="busySpinner" class="hidden h-4 w-4 animate-spin rounded-full border-2 border-indigo-500 border-t-transparent"></div>
                            </div>
                            <div id="chatHistoryLog" class="flex-1 space-y-4 text-sm overflow-y-auto bg-gray-900 p-3 rounded border border-gray-750 font-mono min-h-0">
                                <div class="text-gray-500 text-xs italic">System initialized. Awaiting prompt loop...</div>
                            </div>
                        </div>
                        
                        <div class="space-y-2 mt-4 flex-none">
                            <textarea id="promptInput" rows="2" class="w-full bg-gray-800 border border-gray-700 rounded-lg p-3 text-sm focus:outline-none focus:border-indigo-500 resize-none placeholder-gray-500" placeholder="Ask a follow-up question..."></textarea>
                            <button id="submitBtn" class="w-full bg-indigo-600 hover:bg-indigo-500 disabled:bg-gray-700 disabled:cursor-not-allowed text-white font-medium py-2.5 px-4 rounded-lg transition-colors flex items-center justify-center space-x-2">
                                <span id="btnText">Execute Analysis Prompt</span>
                            </button>
                        </div>
                    </div>
                </div>
            </div>
    
            <script>
                const videoSelect = document.getElementById('videoSelect');
                const videoPlayer = document.getElementById('videoPlayer');
                const videoPlaceholder = document.getElementById('videoPlaceholder');
                const promptInput = document.getElementById('promptInput');
                const submitBtn = document.getElementById('submitBtn');
                const btnText = document.getElementById('btnText');
                const chatHistoryLog = document.getElementById('chatHistoryLog');
                const metricsPanel = document.getElementById('metricsPanel');
                const busySpinner = document.getElementById('busySpinner');
                const clearChatBtn = document.getElementById('clearChatBtn');
    
                let chatHistoryBuffer = [];
    
                async function initializeApp() {
                    try {
                        const videoRes = await fetch('/api/videos');
                        const videos = await videoRes.json();
                        videos.forEach(v => videoSelect.add(new Option(v, v)));
                        
                        if(videos.length > 0) handleVideoChange(videos[0]);
                    } catch (e) {
                        chatHistoryLog.innerHTML = `<div class="text-red-400">Initialization Fault: ${e.message}</div>`;
                    }
                }
    
                function appendMessageBlock(role, text, isSkeleton = false) {
                    const block = document.createElement('div');
                    block.className = `p-2.5 rounded border ${role === 'user' ? 'bg-indigo-950/40 border-indigo-900/60 ml-6' : 'bg-gray-800/60 border-gray-700/50 mr-6'} ${isSkeleton ? 'skeleton-pulse min-h-[40px]' : ''}`;
                    
                    const senderLabel = document.createElement('div');
                    senderLabel.className = `text-[10px] font-bold uppercase tracking-wider mb-1 ${role === 'user' ? 'text-indigo-400' : 'text-gray-400'}`;
                    senderLabel.textContent = role === 'user' ? '● User Prompt' : '● Model Response';
                    
                    const contentText = document.createElement('div');
                    contentText.className = "whitespace-pre-wrap leading-relaxed break-words text-sm font-mono text-gray-100";
                    if (!isSkeleton) contentText.textContent = text;
                    
                    block.appendChild(senderLabel);
                    block.appendChild(contentText);
                    chatHistoryLog.appendChild(block);
                    chatHistoryLog.scrollTop = chatHistoryLog.scrollHeight;
                    return contentText;
                }
    
                async function updateMetrics(clientLatencySec) {
                    try {
                        const res = await fetch('/api/metrics');
                        const root = await res.json();
                        const metrics = Object.values(root)[0];
                        if (metrics) {
                            const tps = metrics.llm_average_token_per_second?.toFixed(1) || "0.0";
                            const ttft = metrics.llm_first_infer_duration?.toFixed(2) || "0.00";
                            const tokens = metrics.generated_token_num || 0;
                            metricsPanel.textContent = `Metrics: ${tps} tok/s • TTFT: ${ttft}s • ${tokens} tokens • Latency: ${clientLatencySec.toFixed(2)}s`;
                        }
                    } catch (e) {
                        console.error(e);
                    }
                }
    
                function handleVideoChange(filename) {
                    resetChatHistory();
                    if(!filename) {
                        videoPlayer.classList.add('hidden');
                        videoPlaceholder.classList.remove('hidden');
                        return;
                    }
                    videoPlaceholder.classList.add('hidden');
                    videoPlayer.classList.remove('hidden');
                    videoPlayer.src = `/stream/videos/${encodeURIComponent(filename)}`;
                    videoPlayer.load();
                }
    
                function resetChatHistory() {
                    chatHistoryBuffer = [];
                    chatHistoryLog.innerHTML = `<div class="text-gray-500 text-xs italic">Conversation wiped. Ready for prompt input...</div>`;
                    promptInput.value = "what is happening in this video?";
                }
    
                videoSelect.addEventListener('change', (e) => handleVideoChange(e.target.value));
                clearChatBtn.addEventListener('click', resetChatHistory);
    
                submitBtn.addEventListener('click', async () => {
                    const prompt = promptInput.value.trim();
                    const video_name = videoSelect.value;
                    
                    if (!prompt || !video_name) return;
    
                    const clientStartTime = performance.now();
    
                    appendMessageBlock('user', prompt);
                    chatHistoryBuffer.push({ role: 'user', content: prompt });
                    
                    promptInput.value = "";
                    submitBtn.disabled = true;
                    btnText.textContent = "Processing Inference...";
                    busySpinner.classList.remove('hidden');
                    
                    const liveResponseNode = appendMessageBlock('assistant', "Connecting...", true);
                    
                    try {
                        const response = await fetch('/api/analyze', {
                            method: 'POST',
                            headers: { 'Content-Type': 'application/json' },
                            body: JSON.stringify({ video_name, history: chatHistoryBuffer })
                        });
    
                        if (!response.ok) throw new Error("Server engine pipeline connection fault.");
    
                        liveResponseNode.parentElement.classList.remove('skeleton-pulse');
                        liveResponseNode.textContent = "";
    
                        const reader = response.body.getReader();
                        const decoder = new TextDecoder();
                        let buffer = "";
                        let fullModelResponse = "";
    
                        while (true) {
                            const { value, done } = await reader.read();
                            if (done) break;
                            
                            buffer += decoder.decode(value, { stream: true });
                            const lines = buffer.split('\n');
                            buffer = lines.pop(); 
                            
                            for (const line of lines) {
                                const trimmed = line.trim();
                                
                                if (!trimmed || !trimmed.startsWith('data: ')) continue;
                                
                                const dataStr = trimmed.slice(5).trim();
                                if (dataStr === '[DONE]') continue;
                                
                                try {
                                    const json = JSON.parse(dataStr);
                                    if(json.error) {
                                        liveResponseNode.textContent += `\n[AAF Error]: ${json.error}`;
                                        continue;
                                    }
                                    
                                    const contentToken = json.choices?.[0]?.delta?.content || "";
                                    if (contentToken) {
                                        fullModelResponse += contentToken;
                                        liveResponseNode.textContent = fullModelResponse;
                                        chatHistoryLog.scrollTop = chatHistoryLog.scrollHeight;
                                    }
                                } catch (e) { /* ignore event lines that are not valid JSON */ }
                            }
                        }
                        
                        chatHistoryBuffer.push({ role: 'assistant', content: fullModelResponse });
    
                        const clientLatencySec = (performance.now() - clientStartTime) / 1000;
                        setTimeout(() => updateMetrics(clientLatencySec), 500);
    
                    } catch (err) {
                        liveResponseNode.parentElement.classList.remove('skeleton-pulse');
                        liveResponseNode.textContent = `\n[Pipeline Runtime Exception]: ${err.message}`;
                    } finally {
                        submitBtn.disabled = false;
                        btnText.textContent = "Execute Analysis Prompt";
                        busySpinner.classList.add('hidden');
                    }
                });
    
                initializeApp();
            </script>
        </body>
        </html>
        """
        return HTMLResponse(content=html_content.replace("__MODEL_NAME_PLACEHOLDER__", TARGET_MODEL))
    
    if __name__ == "__main__":
        uvicorn.run(app, host=args.host, port=args.port)
    EOF
    
  • create a Python virtual environment (it is always a good idea to keep Python dependencies isolated) and install the Python modules we need:
    # create a venv (.venv)
    uv venv
    # install our script's dependencies
    uv pip install httpx uvicorn fastapi
    
  • run the app, giving it the host interface and port to listen on, the URL of the AAF server, and the directory of the videos:
    # run the app
    uv run vlm.py --host 0.0.0.0 --port 8080 --video-dir /usr/share/vlm-edge-studio/assets/videos --aaf-server http://127.0.0.1:8000
    
    • Note that the AAF server must be able to access the video, so if it is running on a different host, adjust the video URL that is submitted for analysis accordingly
  • open a browser to port 8080 on the host (e.g. http://<host>:8080), select a video and submit a query
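As a rough illustration of the note above about running the AAF server on a different machine: the video URL handed to the AAF server has to use an address that machine can reach, not 127.0.0.1. The helper below is a hypothetical sketch. The name `build_video_url` and the example address are assumptions and are not part of vlm.py. It reuses the `/stream/videos/` route that the web page uses for playback and percent-encodes the filename.

```python
from urllib.parse import quote


def build_video_url(public_base: str, filename: str) -> str:
    """Return a video URL that a remote AAF server could fetch.

    public_base is an assumed, externally reachable base URL of the
    web app (for example "http://192.168.1.10:8080"), not 127.0.0.1.
    """
    # Percent-encode the filename so spaces and slashes stay inside one path segment.
    return public_base.rstrip("/") + "/stream/videos/" + quote(filename, safe="")


if __name__ == "__main__":
    print(build_video_url("http://192.168.1.10:8080", "my clip.mp4"))
```

Whether a rewrite like this is needed depends on how the two machines reach each other. With both services on the same board, the loopback address is sufficient.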

[Screenshot: VLM web app analyzing what is happening in a video]
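The web page in vlm.py reads the streamed reply as server-sent event lines. It buffers partial chunks, splits on newlines, keeps the trailing incomplete line, skips lines that do not start with `data: `, ignores `[DONE]`, and takes `choices[0].delta.content` from each JSON payload. The same parsing can be sketched in Python, which may be handy for exercising the stream without a browser. The function name `iter_tokens` is hypothetical, and the payload shape is assumed from the JavaScript above; it is not taken from the AAF documentation.

```python
import codecs
import json
from typing import Iterable, Iterator


def iter_tokens(chunks: Iterable[bytes]) -> Iterator[str]:
    """Yield content tokens from a stream of SSE byte chunks."""
    # An incremental decoder copes with multi-byte characters split across chunks.
    decoder = codecs.getincrementaldecoder("utf-8")()
    buffer = ""
    for chunk in chunks:
        buffer += decoder.decode(chunk)
        # Everything before the last newline is complete; keep the remainder buffered.
        *lines, buffer = buffer.split("\n")
        for line in lines:
            line = line.strip()
            if not line.startswith("data: "):
                continue
            data = line[len("data: "):].strip()
            if data == "[DONE]":
                continue
            try:
                payload = json.loads(data)
            except json.JSONDecodeError:
                continue  # skip lines that are not valid JSON
            if not isinstance(payload, dict):
                continue
            if payload.get("error"):
                # Surface server-side errors inline, as the web page does.
                yield f"\n[AAF Error]: {payload['error']}"
                continue
            choices = payload.get("choices") or [{}]
            token = (choices[0].get("delta") or {}).get("content") or ""
            if token:
                yield token
```

For example, passing the byte chunks of a streaming HTTP response (such as those produced by httpx's `Response.iter_bytes()`) to `iter_tokens` and joining the results gives the full model reply.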

Troubleshooting

Please note that software support should be routed through NXP, which produces the Ara240 DNPU chip and software SDK:

https://community.nxp.com
