A few months ago I wrote a post about extracting h264 data from a v380 camera and streaming it over RTSP using rtsp-simple-server. Recently I coupled that with an Asus Tinker Board and balena cloud to give the v380 camera the ability to detect a person in the frame.
To implement this I tried (as much as I could) to use code that is already available and not write anything from scratch. So for the person detection, I used the code from pyimagesearch. To deploy this code, I used balena's debian-python:3.7-build container image. To stitch everything together and pass the output from the object detection module to the client, I used tornado.
I'm yet to post the source code for this demo on github; I'll do that soon.
How does it work?
There are two parts: the client that runs on a computer/PC, and the server that runs on the tinkerboard (you can use other boards too; balena supports a wide range of devices).
- We have a python3.7 container running on the tinkerboard which has opencv installed on it.
- We have an http server to handle the websocket connections.
- We have a program that reads the RTSP stream, applies the person detection algorithm, and gets the coordinates for the person in the given frame.
- We send these detections over the websocket to the client.
- The client, on the other hand, opens a new RTSP connection and reads the stream.
- It also connects to the web server running on the tinkerboard. Over the websocket it receives the detections (coordinates of the bounding boxes).
- The client then draws those bounding boxes over the video feed coming from the RTSP server and displays it.
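To make the contract between the two parts concrete, the websocket carries a small JSON payload: a list of [startX, startY, endX, endY] pixel coordinates, one entry per detected person. This mirrors what the detection service later in this post sends; the numbers below are made up for illustration:

```python
import json

# One detection message: a list of bounding boxes, each given as
# [startX, startY, endX, endY] pixel coordinates in the frame.
msg = json.dumps({"detections": [[120, 80, 310, 460]]})

# The client parses it back into a list of boxes to draw.
boxes = json.loads(msg)["detections"]
print(boxes)  # [[120, 80, 310, 460]]
```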
How to implement it?
Create an application and add devices to it on balena cloud
- Login to balena.io
- Create a new application
- Add devices to the application
- Download and write the balena os image to an SD card. In this step you can also configure the wifi network credentials.
- Power on the device. Soon after the device powers on, it will connect to your network (ethernet/wifi, depending on what you have configured) and you will be able to see its status as "connected" on the balena dashboard.
The detailed documentation for the above process is available at balena.io/docs.
Just a quick note: you can ssh into both the host os running on the device and a specific container where your application is running. To do that:
```bash
# to login to an application container
$ balena ssh <device id> <container>

# to login to the host os on the device
$ balena ssh <device id>
```
The CODE
For this demo, I've used tornado instead of flask. The main reason for using tornado is that it uses non-blocking network I/O, so it is better suited for serving data over websockets.
Web server:
```python
#!python3
import tornado.ioloop
import tornado.web
import tornado.websocket

clients = []


# Handle http response for '/'
class MainHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("This is a websocket-server for person detection demo")


# Handle socket connections from the device.
# Receive the messages (bounding boxes) from the device
# and pass them on to the clients for rendering.
class InputWebSocket(tornado.websocket.WebSocketHandler):
    def open(self):
        print("WebSocket opened")

    # Forward the incoming message to all of the clients.
    def on_message(self, message):
        for i in clients:
            i.write_message(message)

    def on_close(self):
        print("WebSocket closed")


# Handle incoming connections from the clients
class OutWebSocket(tornado.websocket.WebSocketHandler):
    def open(self):
        # Add the incoming connection to the client list.
        print("OutWebSocket opened")
        clients.append(self)

    def on_message(self, message):
        pass

    def on_close(self):
        print("WebSocket closed")
        clients.remove(self)


def make_app():
    return tornado.web.Application(
        [
            (r"/", MainHandler),
            (r"/in", InputWebSocket),
            (r"/out", OutWebSocket),
        ]
    )


if __name__ == "__main__":
    app = make_app()
    app.listen(80)
    tornado.ioloop.IOLoop.current().start()
```
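If you want to sanity-check the relay before wiring up the real services, a quick way (assuming websocket-client is installed, and with a hypothetical device address below) is to push a fake detection into /in and read it back from /out:

```python
import websocket

DEVICE_IP = "192.168.1.50"  # hypothetical address of the device

# connect a fake "client" to /out and a fake "detector" to /in
out = websocket.WebSocket()
out.connect("ws://{}:80/out".format(DEVICE_IP))

inp = websocket.WebSocket()
inp.connect("ws://{}:80/in".format(DEVICE_IP))

# anything sent to /in should be relayed to every /out connection
inp.send('{"detections": [[10, 10, 100, 100]]}')
print(out.recv())  # {"detections": [[10, 10, 100, 100]]}
```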
Dockerfile.template:
Dockerfile.template is a special Dockerfile that can be used for many device types. Here is the documentation for that.

```dockerfile
FROM balenalib/%%BALENA_MACHINE_NAME%%-debian-python:3.7-build

# Set the working directory to /usr/src/app
WORKDIR /usr/src/app

# Install dependencies for opencv
RUN apt-get update && \
    apt-get install -yq \
    python3 \
    python3-dev \
    python3-pip \
    python3-setuptools \
    gstreamer-1.0 \
    v4l-utils \
    libopus-dev \
    libvpx-dev \
    libsrtp2-dev \
    libopencv-dev \
    libatlas3-base \
    libatlas-base-dev \
    libjasper-dev \
    libavformat-dev \
    libswscale-dev \
    libqtgui4 \
    libqt4-test \
    libavdevice-dev \
    libavfilter-dev \
    libavcodec-dev \
    libhdf5-dev \
    libhdf5-serial-dev && \
    apt-get clean && rm -rf /var/lib/apt/lists/*

# Install python libraries
RUN curl -s https://bootstrap.pypa.io/get-pip.py | python3
RUN pip3 install --upgrade pip
RUN pip3 install --upgrade urllib3
RUN pip3 install --upgrade setuptools wheel
RUN pip3 install numpy opencv-python --index-url https://www.piwheels.org/simple

COPY requirements.txt requirements.txt

# Install requirements for the application
RUN pip3 install -r requirements.txt

# Copy the source code to the working directory
COPY . ./

ENV UDEV=1

# start-services.sh will run when the container starts up on the device
CMD ["bash", "-c", "./start-services.sh"]
```
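As a rough sketch, a requirements.txt covering the Python imports used in this post (numpy and opencv-python are already installed from piwheels in the Dockerfile above) could look like:

```
tornado
websocket-client
imutils
```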
Script to start the services:

```bash
#!/bin/sh
bash -c 'python3 -u src/detection.py -u "<Video stream URL>"' &
python3 -u src/main.py
```
Service for person detection:
```python
#!python3
from imutils.video import VideoStream
import websocket
import cv2
import sys
import json
import numpy as np
import argparse

# enable OpenCV's optimized code paths (useOptimized() only queries the flag)
cv2.setUseOptimized(True)

model = "MobileNetSSD_deploy.caffemodel"
proto = "MobileNetSSD_deploy.prototxt.txt"

# initialize the list of class labels MobileNet SSD was trained to
# detect, then generate a set of bounding box colors for each class
CLASSES = [
    "background", "aeroplane", "bicycle", "bird", "boat",
    "bottle", "bus", "car", "cat", "chair", "cow", "diningtable",
    "dog", "horse", "motorbike", "person", "pottedplant", "sheep",
    "sofa", "train", "tvmonitor",
]
COLORS = np.random.uniform(0, 255, size=(len(CLASSES), 3))

# construct the argument parser and parse the arguments
ap = argparse.ArgumentParser()
ap.add_argument("-u", "--url", required=True, help="URL for the video")
ap.add_argument(
    "-c",
    "--confidence",
    type=float,
    default=0.2,
    help="minimum probability to filter weak detections",
)
args = vars(ap.parse_args())

# load our serialized model from disk
print("[INFO] loading model...")
net = cv2.dnn.readNetFromCaffe(proto, model)

ws = []


def init_connection():
    try:
        ws.append(websocket.WebSocket())
        ws[0].connect("ws://127.0.0.1:80/in")
    except Exception as e:
        print(e)
        sys.exit(1)


def detect(image):
    (h, w) = image.shape[:2]
    blob = cv2.dnn.blobFromImage(
        cv2.resize(image, (300, 300)), 0.007843, (300, 300), 127.5
    )

    # pass the blob through the network and obtain the detections and
    # predictions
    print("[INFO] computing object detections...")
    net.setInput(blob)
    detections = net.forward()
    boxes = []

    # loop over the detections
    for i in np.arange(0, detections.shape[2]):
        # extract the confidence (i.e., probability) associated with the
        # prediction
        confidence = detections[0, 0, i, 2]

        # filter out weak detections by ensuring the `confidence` is
        # greater than the minimum confidence
        if confidence > args["confidence"]:
            # extract the index of the class label from the `detections`
            idx = int(detections[0, 0, i, 1])

            # we only care about people in this demo
            if CLASSES[idx] != "person":
                continue

            # compute the (x, y)-coordinates of the bounding box for
            # the object
            box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
            (startX, startY, endX, endY) = box.astype("int")
            boxes.append([int(startX), int(startY), int(endX), int(endY)])

            # display the prediction
            label = "{}: {:.2f}%".format(CLASSES[idx], confidence * 100)
            print("[INFO] {}".format(label))
            cv2.rectangle(image, (startX, startY), (endX, endY), COLORS[idx], 2)
            y = startY - 15 if startY - 15 > 15 else startY + 15
            cv2.putText(
                image, label, (startX, y),
                cv2.FONT_HERSHEY_SIMPLEX, 0.5, COLORS[idx], 2,
            )

    # send the detections over the websocket
    data = json.dumps({"detections": boxes})
    print(data)
    ws[0].send(data)


def start(url):
    cap = VideoStream(url).start()
    print("device init-success")
    while True:
        try:
            frame = cap.read()
            if type(frame) is np.ndarray:
                detect(frame)
        except KeyboardInterrupt:
            break
    cv2.destroyAllWindows()


if __name__ == "__main__":
    init_connection()
    start(args["url"])
```
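To make the coordinate math in detect() concrete: the network returns box corners normalized to [0, 1], which are scaled back to pixel coordinates by multiplying with the frame's width and height. A worked example with a hypothetical 640x480 frame:

```python
import numpy as np

(h, w) = (480, 640)  # hypothetical frame size

# normalized corners (x1, y1, x2, y2) as returned by the network
norm_box = np.array([0.25, 0.10, 0.75, 0.90])

# scale back to pixel coordinates, as done in detect()
pixel_box = (norm_box * np.array([w, h, w, h])).astype("int")
print(pixel_box)  # [160  48 480 432]
```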
The client:
This is the client that will run on a PC. The purpose of this tool is to:
- receive the RTSP stream
- connect to the websocket server running on the device
- get the detections
- draw bounding boxes for the given detections on the frame
- display the frame
```python
#!python3
import cv2
import websocket
import json
import sys

try:
    import thread
except ImportError:
    import _thread as thread

current_detections = {"list": []}
status_ = {"received": False}


def read_cam(url):
    if url:
        vc = cv2.VideoCapture(url)
        while True:
            # read the frame from the rtsp stream
            status, frame = vc.read()
            if status:
                if status_["received"] and current_detections["list"]:
                    # Draw the bounding boxes. The server sends corner
                    # coordinates, i.e. [startX, startY, endX, endY].
                    for (x1, y1, x2, y2) in current_detections["list"][0]:
                        cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 255), 2)
                # Display the frame
                cv2.imshow("img", frame)
                if cv2.waitKey(1) & 0xFF == ord("q"):
                    sys.exit(0)


# Receive the data from the websocket
def on_message(ws, data):
    if data:
        print("data received")
        dict_op = json.loads(data)
        if len(current_detections["list"]):
            current_detections["list"].pop(0)
        if "detections" in dict_op:
            current_detections["list"].append(dict_op["detections"])
            status_["received"] = True


def on_error(ws, error):
    print(error)


def on_close(ws):
    print("### closed ###")


def on_open(ws):
    pass


if __name__ == "__main__":
    rtsp_url = sys.argv[1]
    ws_url = sys.argv[2]

    # start reading the frames from the rtsp stream in a separate thread
    thread.start_new_thread(read_cam, (rtsp_url,))

    # create a websocket connection
    websocket.enableTrace(True)
    ws = websocket.WebSocketApp(
        ws_url,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )
    ws.on_open = on_open
    ws.run_forever()
```
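Assuming the client above is saved as client.py (a hypothetical file name; it isn't part of the on-device tree below), it takes the RTSP URL and the websocket URL of the /out endpoint as positional arguments. The RTSP port and stream path depend on your rtsp-simple-server setup, for example:

```bash
$ python3 client.py "rtsp://<device ip>:8554/stream" "ws://<device ip>:80/out"
```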
The directory structure for this project looks like:

```
balena-edge-v380-person-detection/
├── Dockerfile.template
├── MobileNetSSD_deploy.caffemodel
├── MobileNetSSD_deploy.prototxt.txt
├── README.md
├── requirements.txt
├── src
│   ├── detection.py
│   └── main.py
└── start-services.sh

1 directory, 8 files
```
Push the code to the devices
Once this is in place, push the code to the devices with:

```bash
$ balena push <application_name>
```

Once this is done, you will have a person detection service running for the v380 camera.
NOTE: All of the code mentioned in this post so far is taken from various sources on the internet and stitched together. It needs refactoring 😅 The purpose of this exercise was to hack on the tinkerboard and try out balena os and balena cloud to get a quick and dirty solution running.
Key takeaways from this post.
balena is:
- Easy to set up on multiple devices.
- It supports a wide variety of devices and has container images for lots of different tools.
- Basic connectivity issues are taken care of.
- The things that are usually a pain to implement yourself come as boilerplate.
- With a single click/command you can deploy your application to multiple devices.