1. Background
In the previous article, 【树莓派】ffmpeg + nginx 推 rtmp 视频流实现远程监控 (pushing an RTMP video stream with ffmpeg + nginx for remote monitoring), I described a simple remote-monitoring solution.
Before implementing that solution, I had actually tried another, more low-level approach, modeled on what mjpg-streamer provides.
Since mjpg-streamer is installed on the Raspberry Pi itself, its video forwarding only works on the local network.
To monitor remotely, you need a forwarding server with a public IP address, and that server has to provide mjpg-streamer-like video forwarding.
My basic approach is shown in the figure below:
With this design, the following problems need to be solved:
- The Raspberry Pi forwards the video stream to the Alibaba Cloud server
- The cloud server receives the video stream
- The cloud server decodes the video stream into images
- The cloud server accepts HTTP requests from clients
- The cloud server wraps the decoded images into response messages and returns them to the clients
- The client displays the images
In addition, the actual implementation has to consider the following:
- How to support multiple clients viewing the images at the same time? (multi-threading)
- How to send the images continuously enough to look like live video? (a long-lived HTTP connection)
- How does the cloud server know when there is a new image to send? (a simple producer/consumer model)
Sending video from the Raspberry Pi can be done with the raspivid command that ships with the system.
Client requests and image display can be handled with an ordinary HTML tag.
So the main work lies in the cloud server's receiving and forwarding capability.
My implementation is a Spring Boot project. The sections below go through the solutions to the problems above, for reference.
2. The Raspberry Pi sends the video stream
Apart from enabling the Raspberry Pi's camera, nothing extra is needed to send the video stream.
Once the cloud server is running, just execute the following command:
raspivid -w 640 -h 480 -t 0 -o tcp://<cloud server IP>:<port>
3. The cloud server accepts the Raspberry Pi connection
The key code for accepting the Raspberry Pi connection:
```java
ServerSocket cameraServerSocket = new ServerSocket(port);
Socket cameraSocket = cameraServerSocket.accept();
```
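In the full implementation this accept call sits in a loop on its own thread, so that more than one camera can connect. A minimal sketch of that listening loop, handing each connection to the CameraHandler worker shown in the appendix (the class name CameraAcceptLoop is illustrative, not the project's actual class):

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

// Keeps accepting Raspberry Pi connections and decodes each one on its own thread
public class CameraAcceptLoop implements Runnable {
    private final ServerSocket cameraServerSocket;

    CameraAcceptLoop(int port) throws IOException {
        cameraServerSocket = new ServerSocket(port);
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Socket cameraSocket = cameraServerSocket.accept();   // blocks until a camera connects
                new Thread(new CameraHandler(cameraSocket)).start(); // decode this camera's stream
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
```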
4. The cloud server decodes the video stream into images
The TCP stream sent by the Raspberry Pi's raspivid command carries H264-encoded video.
The server can grab video frames from it in a loop and convert each frame into an image. The key code:
```java
InputStream cameraStream = cameraSocket.getInputStream();

FFmpegFrameGrabber frameGrabber = new FFmpegFrameGrabber(cameraStream);
frameGrabber.setFrameRate(30);
frameGrabber.setFormat("h264");
frameGrabber.setVideoBitrate(15);
frameGrabber.setVideoOption("preset", "ultrafast");
frameGrabber.setNumBuffers(25000000);
frameGrabber.start(); // the grabber must be started before grabbing frames

Frame frame = frameGrabber.grab();
Java2DFrameConverter converter = new Java2DFrameConverter();
while (frame != null) {
    BufferedImage bufferedImage = converter.convert(frame);
    // ... hand bufferedImage over to the sending side ...
    frame = frameGrabber.grab();
}
```
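Before the decoded frames can be written into an HTTP response they still need to be encoded as JPEG bytes. A minimal sketch using ImageIO, which is also how the VideoHandler in the appendix does it (the class and method names JpegCodec/toJpegBytes are illustrative):

```java
import javax.imageio.ImageIO;
import java.awt.image.BufferedImage;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

final class JpegCodec {
    // Encode one decoded frame as JPEG bytes, ready to be written as a multipart part
    static byte[] toJpegBytes(BufferedImage bufferedImage) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        ImageIO.write(bufferedImage, "jpg", out);
        return out.toByteArray();
    }
}
```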
The Maven configuration for the packages used above:

```xml
<dependency>
    <groupId>org.bytedeco</groupId>
    <artifactId>javacv</artifactId>
    <!-- version matching the 1.4.3 presets below -->
    <version>1.4.3</version>
</dependency>
<dependency>
    <groupId>org.bytedeco.javacpp-presets</groupId>
    <artifactId>ffmpeg-platform</artifactId>
    <version>4.0.2-1.4.3</version>
</dependency>
```
5. The cloud server accepts client requests
Similar to accepting the Raspberry Pi connection, client requests can be accepted like this:
```java
ServerSocket videoServerSocket = new ServerSocket(videoPort);
Socket videoSocket = videoServerSocket.accept();
```
6. The cloud server assembles the response
First, get the output stream of the client connection. Because images will be sent in a loop, the response is a long-lived HTTP connection, so a boundary parameter has to be declared in the response headers.
The overall idea is to send the response headers first, and then send an image whenever a new one has been produced:
```java
// Wrap the client connection: read its HTTP request, then respond on its output stream
InputStreamReader streamReader = new InputStreamReader(socket.getInputStream());
BufferedReader bufferedReader = new BufferedReader(streamReader);
// ... read and discard the request lines here (see VideoHandler in the appendix) ...

videoStream = socket.getOutputStream();
sendVideoHeader(videoStream);
```
Sending an image:
```java
synchronized (cameraHandler.getLock()) {
    BufferedImage bufferedImage = cameraHandler.getBufferedImage();
    if (null == bufferedImage) {
        // no frame decoded yet: wait until the decoder produces one
        cameraHandler.getLock().wait();
        bufferedImage = cameraHandler.getBufferedImage();
    }
    assert bufferedImage != null;
    // sendImage() expects JPEG bytes, so the BufferedImage is converted first
    sendImage(convertBufferedImageToByte(bufferedImage));
}
```
The two methods used above:
```java
private void sendVideoHeader(OutputStream videoStream) {
    try {
        DataOutputStream videoDataStream = new DataOutputStream(videoStream);
        videoDataStream.write(("HTTP/1.0 200 OK\r\n"
                + "Server: walle\r\n"
                + "Connection: close\r\n"
                + "Max-Age: 0\r\n"
                + "Expires: 0\r\n"
                + "Cache-Control: no-cache, private\r\n"
                + "Pragma: no-cache\r\n"
                + "Content-Type: multipart/x-mixed-replace; "
                + "boundary=--BoundaryString\r\n\r\n").getBytes());
        logger.info("Write video response header success");
    } catch (IOException e) {
        logger.info("Write video response header failed!");
        logger.info(e.getMessage());
    }
}

private void sendImage(byte[] image) throws IOException {
    DataOutputStream videoDataStream = new DataOutputStream(videoStream);
    videoDataStream.write(("--BoundaryString" + "\r\n").getBytes());
    videoDataStream.write(("Content-Type: image/jpg" + "\r\n").getBytes());
    videoDataStream.write(("Content-Length: " + image.length + "\r\n\r\n").getBytes());
    videoDataStream.write(image);
    videoDataStream.write(("\r\n").getBytes());
}
```
Note: the separator between the response headers and the response body is "\r\n\r\n".
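Put together, what the browser receives over this single long-lived connection looks roughly like this (an illustrative trace based on the code above, with made-up content lengths, not captured output):

```
HTTP/1.0 200 OK
Server: walle
Content-Type: multipart/x-mixed-replace; boundary=--BoundaryString

--BoundaryString
Content-Type: image/jpg
Content-Length: 24815

<JPEG bytes of frame 1>

--BoundaryString
Content-Type: image/jpg
Content-Length: 24903

<JPEG bytes of frame 2>

... one part per frame, for as long as the connection stays open
```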
7. The client displays the images
The client can display the images with a plain <img> tag, for example:
<div id="video-jwplayer_wrapper" style="position: relative; display: block; width: 960px; height: 540px;"><img style="-webkit-user-select: none;" src="http://<云服务器IP>:<客户端Port>" width="640" height="480">
8. Supporting multiple clients
To support multiple cameras and multiple connected clients I used multiple threads, but without strict control over the threads' life cycles.
See the code listings at the end for details.
9. Coordinating decoding and image sending
When a frame of video has been decoded, how does the sender know it should send an image?
And once an image has been sent, what should happen next?
To answer these two questions, a thread lock is used to coordinate decoding and image sending.
The rough idea: after sending an image, the sending thread waits on the lock;
when the decoder finishes decoding the next frame, it wakes the sending thread up.
The details are in the code listings at the end.
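The mechanism is plain Java wait()/notifyAll() on a shared lock object. A minimal, self-contained sketch of the pattern (the class and method names here are illustrative; the real project keeps the lock and the latest image inside CameraHandler):

```java
import java.awt.image.BufferedImage;

// Coordinates the decoding thread (producer) with the client sending threads (consumers)
final class FrameExchange {
    private final Object lock = new Object();
    private BufferedImage latestImage;

    // Producer side: called by the decoding loop for every grabbed frame
    void publishFrame(BufferedImage image) {
        synchronized (lock) {
            latestImage = image;
            lock.notifyAll(); // wake every client thread waiting for a new frame
        }
    }

    // Consumer side: each client's sending thread blocks here until the next frame arrives
    BufferedImage awaitFrame() throws InterruptedException {
        synchronized (lock) {
            lock.wait();
            return latestImage;
        }
    }
}
```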
10. Sending images continuously
To get a video effect the images have to keep updating, so they have to be sent continuously.
The solution was already mentioned above when assembling the response: establish a long-lived HTTP connection.
The boundary field is declared once in the initial response headers, and after that images can be sent one after another.
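In other words, each client's sending thread is just a loop that waits for the next frame and writes one more multipart part on the same connection. Roughly (building on the illustrative FrameExchange and JpegCodec sketches above, plus the sendImage() method from section 6):

```java
// Per-client send loop: the connection is never closed; each iteration writes
// one more "--BoundaryString" part carrying the newest frame.
while (!Thread.currentThread().isInterrupted()) {
    try {
        BufferedImage image = frameExchange.awaitFrame(); // block until the decoder publishes a frame
        sendImage(JpegCodec.toJpegBytes(image));
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();               // loop is being shut down
    } catch (IOException e) {
        break;                                            // client disconnected, stop sending
    }
}
```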
11. Appendix: the code
The code is organized into the classes below; the entry point is WebCamera:
WebCamera:
```java
package com.amwalle.walle.raspi.camera;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class WebCamera {
    private static final Logger logger = LoggerFactory.getLogger(WebCamera.class);

    // Ports the cloud server listens on: one for the Raspberry Pi stream, one for browser
    // clients (the values here are examples; the originals are not shown in the post)
    private final int cameraPort = 8001;
    private final int videoPort = 8002;

    // Start the two listener threads: Camera accepts the Raspberry Pi stream,
    // Video accepts browser clients
    public void forwardCameraVideo() throws IOException {
        Camera camera = new Camera(cameraPort);
        new Thread(camera).start();

        Video video = new Video(videoPort);
        new Thread(video).start();
    }
}
```
Camera:
```java
package com.amwalle.walle.raspi.camera;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Listens for Raspberry Pi connections and starts one CameraHandler thread per camera
public class Camera implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(Camera.class);

    private ServerSocket cameraServerSocket;
    private static List<CameraHandler> cameraList;

    Camera(int port) throws IOException {
        cameraServerSocket = new ServerSocket(port);
        cameraList = new ArrayList<>();
    }

    public static CameraHandler getCameraById(String id) {
        for (CameraHandler cameraHandler : cameraList) {
            if (id.equals(cameraHandler.getCameraId())) {
                return cameraHandler;
            }
        }
        return null;
    }

    public static CameraHandler getCameraByName(String name) {
        for (CameraHandler cameraHandler : cameraList) {
            if (name.equals(cameraHandler.getCameraName())) {
                return cameraHandler;
            }
        }
        return null;
    }

    static CameraHandler getCameraByIndex(int index) {
        if (index >= cameraList.size() || index < 0) {
            return null;
        }
        return cameraList.get(index);
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                logger.info("Camera server socket is listening...");
                Socket cameraSocket = cameraServerSocket.accept();
                logger.info("New camera accepted, now there are " + (cameraList.size() + 1) + " camera(s)!");

                CameraHandler cameraHandler = new CameraHandler(cameraSocket);
                cameraList.add(cameraHandler);
                new Thread(cameraHandler).start();
            } catch (IOException e) {
                logger.info("Camera server socket failed to accept!");
                logger.info(Arrays.toString(e.getStackTrace()));
            }
        }
    }

    public static int getCameraCount() {
        return cameraList.size();
    }
}
```
Video:
```java
package com.amwalle.walle.raspi.camera;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Listens for browser clients and starts one VideoHandler thread per client
public class Video implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(Video.class);

    private ServerSocket videoServerSocket;
    private static List<OutputStream> videoList;

    Video(int port) throws IOException {
        videoServerSocket = new ServerSocket(port);
        videoList = new ArrayList<>();
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                logger.info("Video server socket is listening...");
                Socket videoSocket = videoServerSocket.accept();
                logger.info("A new web video connected");

                VideoHandler videoHandler = new VideoHandler(videoSocket);
                new Thread(videoHandler).start();
            } catch (IOException e) {
                logger.info("Video server socket failed to accept!");
                logger.info(Arrays.toString(e.getStackTrace()));
            }
        }
    }
}
```
CameraHandler:
```java
package com.amwalle.walle.raspi.camera;

import org.bytedeco.javacv.FFmpegFrameGrabber;
import org.bytedeco.javacv.Frame;
import org.bytedeco.javacv.Java2DFrameConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.awt.image.BufferedImage;
import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;

// Decodes one camera's H264 stream and publishes the latest frame to the sending threads
public class CameraHandler implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(CameraHandler.class);

    private final String CAMERA_ID;
    private final String CAMERA_NAME;
    private final String LOCK = "LOCK";

    private Socket cameraSocket;
    private static BufferedImage bufferedImage;

    CameraHandler(Socket socket) throws IOException {
        this.cameraSocket = socket;
        // The original id/name initialization is cut off in the post; the remote address
        // is used here as a stand-in
        this.CAMERA_ID = socket.getInetAddress().getHostAddress();
        this.CAMERA_NAME = this.CAMERA_ID;
    }

    public String getCameraId() {
        return CAMERA_ID;
    }

    public String getCameraName() {
        return CAMERA_NAME;
    }

    Object getLock() {
        return LOCK;
    }

    @Override
    public void run() {
        try {
            InputStream cameraStream = cameraSocket.getInputStream();

            FFmpegFrameGrabber frameGrabber = new FFmpegFrameGrabber(cameraStream);
            frameGrabber.setFrameRate(30);
            frameGrabber.setFormat("h264");
            frameGrabber.setVideoBitrate(15);
            frameGrabber.setVideoOption("preset", "ultrafast");
            frameGrabber.setNumBuffers(25000000);
            frameGrabber.start();

            Frame frame = frameGrabber.grab();
            Java2DFrameConverter converter = new Java2DFrameConverter();
            while (frame != null) {
                BufferedImage bufferedImage = converter.convert(frame);
                setBufferedImage(bufferedImage);
                frame = frameGrabber.grab();
            }
        } catch (IOException e) {
            logger.info("Video handle error, exit ...");
            logger.info(e.getMessage());
        }
    }

    // Publish the newest frame and wake up every waiting sending thread
    // (body reconstructed; the original listing is truncated here)
    private void setBufferedImage(BufferedImage image) {
        synchronized (LOCK) {
            bufferedImage = image;
            LOCK.notifyAll();
        }
    }

    BufferedImage getBufferedImage() {
        return bufferedImage;
    }
}
```
VideoHandler:
```java
package com.amwalle.walle.raspi.camera;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.imageio.ImageIO;
import java.awt.AlphaComposite;
import java.awt.Color;
import java.awt.Font;
import java.awt.Graphics2D;
import java.awt.image.BufferedImage;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.Socket;
import java.text.SimpleDateFormat;
import java.util.Date;

// Serves one browser client: sends the multipart header once, then one JPEG part per frame
public class VideoHandler implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(VideoHandler.class);

    private CameraHandler cameraHandler;
    private OutputStream videoStream;

    VideoHandler(Socket socket) throws IOException {
        // Read (and discard) the client's HTTP request, up to the empty line
        InputStreamReader streamReader = new InputStreamReader(socket.getInputStream());
        BufferedReader bufferedReader = new BufferedReader(streamReader);
        String line = bufferedReader.readLine();
        while (null != line && !line.equals("")) {
            line = bufferedReader.readLine();
        }

        videoStream = socket.getOutputStream();
        sendVideoHeader(videoStream);
    }

    @Override
    public void run() {
        // Only the first camera is used here
        cameraHandler = Camera.getCameraByIndex(0);
        if (null == cameraHandler) {
            Thread.currentThread().interrupt();
        }

        while (!Thread.currentThread().isInterrupted()) {
            try {
                synchronized (cameraHandler.getLock()) {
                    BufferedImage bufferedImage = cameraHandler.getBufferedImage();
                    if (null == bufferedImage) {
                        cameraHandler.getLock().wait();
                        bufferedImage = cameraHandler.getBufferedImage();
                    }
                    assert bufferedImage != null;
                    sendImage(addWaterMark(bufferedImage));
                    // Wait until the decoder publishes the next frame
                    cameraHandler.getLock().wait();
                }
            } catch (InterruptedException | IOException e) {
                logger.info(e.getMessage());
                Thread.currentThread().interrupt();
            }
        }
    }

    private void sendVideoHeader(OutputStream videoStream) {
        try {
            DataOutputStream videoDataStream = new DataOutputStream(videoStream);
            videoDataStream.write(("HTTP/1.0 200 OK\r\n"
                    + "Server: walle\r\n"
                    + "Connection: close\r\n"
                    + "Max-Age: 0\r\n"
                    + "Expires: 0\r\n"
                    + "Cache-Control: no-cache, private\r\n"
                    + "Pragma: no-cache\r\n"
                    + "Content-Type: multipart/x-mixed-replace; "
                    + "boundary=--BoundaryString\r\n\r\n").getBytes());
            logger.info("Write video response header success");
        } catch (IOException e) {
            logger.info("Write video response header failed!");
            logger.info(e.getMessage());
        }
    }

    private void sendImage(byte[] image) throws IOException {
        DataOutputStream videoDataStream = new DataOutputStream(videoStream);
        videoDataStream.write(("--BoundaryString" + "\r\n").getBytes());
        videoDataStream.write(("Content-Type: image/jpg" + "\r\n").getBytes());
        videoDataStream.write(("Content-Length: " + image.length + "\r\n\r\n").getBytes());
        videoDataStream.write(image);
        videoDataStream.write(("\r\n").getBytes());
    }

    private byte[] convertBufferedImageToByte(BufferedImage bufferedImage) throws IOException {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        ImageIO.write(bufferedImage, "jpg", byteArrayOutputStream);
        byteArrayOutputStream.flush();
        return byteArrayOutputStream.toByteArray();
    }

    // Draw a timestamp watermark onto the frame, then encode it as JPEG bytes
    private byte[] addWaterMark(BufferedImage bufferedImage) throws IOException {
        Graphics2D graphics = bufferedImage.createGraphics();
        graphics.setFont(new Font("Arial", Font.ITALIC, 14));
        graphics.setColor(Color.white);
        graphics.setComposite(AlphaComposite.getInstance(AlphaComposite.SRC_ATOP, 1.0f));
        String time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        graphics.drawString(time, 50, 20);
        graphics.setComposite(AlphaComposite.getInstance(AlphaComposite.SRC_OVER));

        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        ImageIO.write(bufferedImage, "jpg", byteArrayOutputStream);
        byteArrayOutputStream.flush();
        byteArrayOutputStream.close();
        return byteArrayOutputStream.toByteArray();
    }
}
```
The application entry point:
```java
package com.amwalle.walle;

import com.amwalle.walle.raspi.camera.WebCamera;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import java.io.IOException;

@SpringBootApplication
public class WalleApplication {

    public static void main(String[] args) throws IOException {
        SpringApplication.run(WalleApplication.class, args);

        // Start the camera/video forwarding once the Spring context is up
        WebCamera webCamera = new WebCamera();
        webCamera.forwardCameraVideo();
    }
}
```
To use it: start the Spring Boot project first, then have the Raspberry Pi start sending the video stream, and finally open the sample HTML file in a browser.