在
上一篇文章中,我曾承诺要展示一个从摄像机提取视频的脚本,尽管自那时以来已经经过了一定的时间,但仍需要兑现承诺。 所以我正在做。
碰巧的是,在服务器Web世界中,异步与所有事物相关联,而PHP与之无关。
好吧,因为,您知道,这种垂死的模型,内存泄漏以及PHP中除了
stream_select()和
stream_set_blocking()之外确实没有其他
可用的东西 。
在PECL上的某个地方,存在某种
libuv ,它原则上只是原始库原始功能的包装,因此按原样使用它会给您带来一些挑战。 无论如何,谁会在他们的正确思维中做到这一点?
但是,如果我们不再生活在PHP4的世界中,而又回到现代现实中,那么我们会发现近年来情况有所改变。 我们有
ReactPHP和
AmPHP这样有趣的工具,
它们的组件很好地覆盖了Node.js的功能,并且生成器的存在使我们能够以便利的方式编写异步代码,例如
async / await ,避免了回调和公里链中所有这些无尽的回调。
)。然后() 。
然后() 。
因此,在我看来,这就是为什么在Node.js领域中几乎没有PHP无法解决的任务。 但是,如果仍然存在,那么一切都仅取决于是否存在一些单独的库,而不是缺少这样的机会。
得知PHP标准库已经具备编写事件驱动和非阻塞应用程序所需的一切,可能会让人们感到惊讶。 当我们要求它同时轮询数以千计的文件描述符以进行IO活动时,我们才达到此区域中本机PHP功能的极限。 即使在这种情况下,故障也不在于PHP,而是底层系统的select()调用,其性能随着负载的增加而线性下降。
amphp.org/amp/event-loop
当然,在生产环境中使用所有这些的问题尚未完全解决,但是如果您没有做错任何明显的错误事情(任何其他异步运行时环境都不会原谅您),那么一切都会很好。
录像机
我们将考虑该协议,该协议据说是中国人发明的,用于与监控摄像头进行软件通信。
所有这些都可以在称为DVRIP(有时是索非亚)的TCP上运行,到我需要它时,我发现仅两个或多个理智的库,其中一个通常描述为仅连接到相机并交换了一些消息,但是具有包标题说明对我有很大帮助。
第二个发现是在不久之后发现的,当时我的综合症没有在这里发明,这与我想要的有点不同。
通常,带着嗅探器和程序包头的描述,我简要地绘制了一个脚本,该脚本正在服务器上某个位置旋转,并将在本文中以某种概念验证的形式呈现。
协议本身并不复杂,程序包如下所示:
我们将只发送json,但同时接收两者。
答案(如果它是带有json的软件包)通常包含一个
Ret字段,指示我们的请求成功。 包含不等于
100或
515的
Ret码的响应将被视为故意错误。
有了这些基本知识,让我们连接到相机并进行了解。
我们需要两个库
-amphp / socket ,几乎可以
提取我们需要的所有东西,而
evenement / evenement则可以在packagist中使用,不需要任何扩展。
在Amp中使用TCP看起来像这样:
Loop::run(function () { $socket = yield connect("tcp://{$addr}:{$port}"); yield $socket->write('Hello'); while ($chunk = yield $socket->read()) { echo $chunk; } $socket->close(); });
DVRIP通常在端口34567上运行,因此在我们的情况下,它将是这样的:
$socket = yield connect('tcp://192.168.0.200:34567');
接下来,我们通过将用户名和密码哈希发送到摄像机来登录,该哈希计算如下:
function sofiaHash(string $password) : string { $md5 = md5($password, true); return implode('', array_map(function ($i) use ($md5) { $c = (ord($md5[2 * $i]) + ord($md5[2 * $i + 1])) % 62; $c += $c > 9 ? ($c > 35 ? 61 : 55) : 48; return chr($c); }, range(0, 7))); }
如果授权成功,我们会请求一个视频,如果摄像机可以将其提供给我们,那么我们将开始传输它。
每种类型的程序包都有其自己的msgId,因此我们可以了解收到答案的请求。 通常,这不是一个尘土飞扬的工作。
为了方便起见,让我们首先描述我们的包的类:
class Packet { public const SUCCESS_CODES = [100, 515]; public const MESSAGE_CODES = [ 'packet.request.login' => 1000, 'packet.request.keepAlive' => 1006, 'packet.request.claim' => 1413, 'packet.response.login' => 1001, 'packet.response.keepAlive' => 1007, 'packet.response.claim' => 1414, 'packet.videoControl' => 1410, 'packet.binary' => 1412, ]; protected int $head = 255; protected int $version = 0; protected int $sessionId; protected int $sequence; protected int $msgId; protected ?string $rawData; protected ?array $data; public function __construct(int $msgId, ?array $data = null, int $sequence = 0, int $sessionId = 0) { $this->msgId = $msgId; $this->data = $data; $this->sequence = $sequence; $this->sessionId = $sessionId; } public function __toString() : string { if ($this->data !== null) { $this->rawData = json_encode($this->data, JSON_THROW_ON_ERROR) . "\x0a\x00"; } return pack('CCx2IIx2SI', $this->head, $this->version, $this->sessionId, $this->sequence, $this->msgId, strlen($this->rawData)) . $this->rawData; } public function getData() : ?array { return $this->data; } public function getRawData() : ?string { return $this->rawData; } public function getSession() : int { return $this->sessionId; } public function getSequence() : int { return $this->sequence; } public function getMessageType() : string { $types = array_flip(static::MESSAGE_CODES); if (!isset($types[$this->msgId])) { throw new \Exception('Unknown message type: ' . $this->msgId); } return $types[$this->msgId]; } }
并尝试发送授权请求
yield $socket->write((string) new Packet(Packet::MESSAGE_CODES['packet.request.login'], [ 'EncryptType' => 'MD5', 'LoginType' => 'DVRIP-Web', 'PassWord' => sofiaHash('admin'), 'UserName' => 'admin', ]));
是的,太好了 我们已建立联系并发送了一条消息,但我们甚至不知道相机是否正确,并且至少对此有所反应。 我们需要以某种方式读取答案(让我们异步执行),选择其中的特定数据包并对其进行解码。
为了以某种方式分离处理作为响应的数据包的逻辑,我决定使用
偶数库并草绘一个基于该库的类,该类将丢弃已解码的数据包
class Reader extends EventEmitter { protected InputStream $socket; public function __construct(InputStream $socket) { $this->socket = $socket; } public function start() : \Generator { $buffer = ''; while (!$this->socket->isClosed() && $result = yield Packet::read($this->socket, $buffer)) { [$packet, $chunk] = $result; $buffer = $chunk; $chunk = null; $this->emit('packet', [$packet]); $this->emit($packet->getMessageType(), [$packet]); } $buffer = null; } }
并且还向Packet类添加了一些静态方法,实际上,这些方法将从数据流中提取和解码数据包
public static function read(InputStream $socket, string $buffer = '') : Promise { return call(function () use ($socket, $buffer) { $header = null; do { if ($header === null && strlen($buffer) > 20) { $header = unpack('Chead/Cversion/x2/IsessionId/Isequence/x2/SmsgId/Ilen', substr($buffer, 0, 20)); $buffer = substr($buffer, 20); } if ($header !== null && strlen($buffer) >= (int) $header['len']) { return [ static::fromRaw($header, (int) $header['len'] > 0 ? substr($buffer, 0, (int) $header['len']) : null), substr($buffer, (int) $header['len']) ]; } $buffer .= yield $socket->read(); } while (!$socket->isClosed()); }); } protected static function fromRaw(array $header, ?string $rawData) : self { $packet = new static($header['msgId'], null, $header['sequence'], $header['sessionId']); $packet->rawData = $rawData; $packet->data = (int) $packet->msgId === static::MESSAGE_CODES['packet.binary'] || $rawData === null ? null : json_decode(substr($rawData, 0, -2), true, 512, JSON_THROW_ON_ERROR); return $packet; }
现在,通过使用
Evenement ,我们可以以以下形式描述我们的逻辑
$reader->on($packetType, $handler);
所以我得到了这个链,在其中添加了另一个Keep Alive和SIGINT / SIGTERM处理:
$reader = new Reader($socket = yield connect('tcp://10.0.5.100:49152')); $reader->on('packet.response.login', asyncCoroutine(function (Packet $packet) use ($socket) { if (empty($packet->getData()['Ret']) || !in_array($packet->getData()['Ret'], Packet::SUCCESS_CODES)) { throw new \Exception('Wrong login data'); } yield $socket->write((string) new Packet(Packet::MESSAGE_CODES['packet.request.claim'], [ 'Name' => 'OPMonitor', 'OPMonitor' => ['Action' => 'Claim', 'Parameter' => ['Channel' => 0, 'CombinMode' => 'NONE', 'StreamType' => 'Main', 'TransMode' => 'TCP']], ], 1, $packet->getSession())); })); $reader->once('packet.response.login', function (Packet $packet) use ($socket) { Loop::unreference(Loop::repeat($packet->getData()['AliveInterval'] * 1000, function ($watcherId) use ($socket, $packet) { if ($socket->getResource() === null) { return Loop::cancel($watcherId); } yield $socket->write((string) new Packet(Packet::MESSAGE_CODES['packet.request.keepAlive'], [ 'Name' => 'KeepAlive', 'SessionID' => $packet->getData()['SessionID'] ], 0, $packet->getSession())); })); }); $reader->on('packet.response.claim', asyncCoroutine(function (Packet $packet) use ($socket) { yield $socket->write((string) new Packet(Packet::MESSAGE_CODES['packet.videoControl'], [ 'Name' => 'OPMonitor', 'OPMonitor' => ['Action' => 'Start', 'Parameter' => ['Channel' => 0, 'CombinMode' => 'NONE', 'StreamType' => 'Main', 'TransMode' => 'TCP']], ], $packet->getSequence() + 1, $packet->getSession())); })); $emitter = new Emitter(); $reader->on('packet.binary', function ($packet) use ($emitter) { $emitter->emit($packet->getRawData()); }); $reader->once('packet.binary', function (Packet $packet) use ($socket) { $signalHandler = function () use ($socket, $packet) { yield $socket->write((string) new Packet(Packet::MESSAGE_CODES['packet.videoControl'], [ 'Name' => 'OPMonitor', 'OPMonitor' => ['Action' => 'Stop', 'Parameter' => ['Channel' => 0, 'CombinMode' => 'NONE', 'StreamType' => 'Main', 'TransMode' => 'TCP']], ], 0, $packet->getSession())); $socket->close(); }; Loop::unreference(Loop::onSignal(defined('SIGINT') ? SIGINT : 2, $signalHandler, 'SIGINT')); Loop::unreference(Loop::onSignal(defined('SIGTERM') ? SIGTERM : 15, $signalHandler, 'SIGTERM')); }); asyncCall([$reader, 'start']); yield $socket->write((string) new Packet(Packet::MESSAGE_CODES['packet.request.login'], [ 'EncryptType' => 'MD5', 'LoginType' => 'DVRIP-Web', 'PassWord' => sofiaHash('123qwea'), 'UserName' => 'admin', ])); yield pipe(new IteratorStream($emitter->iterate()), getStdout());
不用说,我们可以将视频流重定向到所需的位置,而不是stdout。
最终脚本 <?php ini_set('display_errors', 'stderr'); use Amp\Emitter; use Amp\Loop; use Amp\Promise; use Amp\ByteStream\InputStream; use Amp\ByteStream\IteratorStream; use Evenement\EventEmitter; use function Amp\call; use function Amp\asyncCall; use function Amp\asyncCoroutine; use function Amp\ByteStream\getStderr; use function Amp\ByteStream\getStdout; use function Amp\ByteStream\pipe; use function Amp\Socket\connect; require 'vendor/autoload.php'; function sofiaHash(string $password) : string { $md5 = md5($password, true); return implode('', array_map(function ($i) use ($md5) { $c = (ord($md5[2 * $i]) + ord($md5[2 * $i + 1])) % 62; $c += $c > 9 ? ($c > 35 ? 61 : 55) : 48; return chr($c); }, range(0, 7))); } class Reader extends EventEmitter { protected InputStream $socket; public function __construct(InputStream $socket) { $this->socket = $socket; } public function start() : \Generator { $buffer = ''; while (!$this->socket->isClosed() && $result = yield Packet::read($this->socket, $buffer)) { [$packet, $chunk] = $result; $buffer = $chunk; $chunk = null; $this->emit('packet', [$packet]); $this->emit($packet->getMessageType(), [$packet]); } $buffer = null; } } class Packet { public const SUCCESS_CODES = [100, 515]; public const MESSAGE_CODES = [ 'packet.request.login' => 1000, 'packet.request.keepAlive' => 1006, 'packet.request.claim' => 1413, 'packet.response.login' => 1001, 'packet.response.keepAlive' => 1007, 'packet.response.claim' => 1414, 'packet.videoControl' => 1410, 'packet.binary' => 1412, ]; public static function read(InputStream $socket, string $buffer = '') : Promise { return call(function () use ($socket, $buffer) { $header = null; do { if ($header === null && strlen($buffer) > 20) { $header = unpack('Chead/Cversion/x2/IsessionId/Isequence/x2/SmsgId/Ilen', substr($buffer, 0, 20)); $buffer = substr($buffer, 20); } if ($header !== null && strlen($buffer) >= (int) $header['len']) { return [ static::fromRaw($header, (int) $header['len'] > 0 ? substr($buffer, 0, (int) $header['len']) : null), substr($buffer, (int) $header['len']) ]; } $buffer .= yield $socket->read(); } while (!$socket->isClosed()); }); } protected static function fromRaw(array $header, ?string $rawData) : self { $packet = new static($header['msgId'], null, $header['sequence'], $header['sessionId']); $packet->rawData = $rawData; $packet->data = (int) $packet->msgId === static::MESSAGE_CODES['packet.binary'] || $rawData === null ? null : json_decode(substr($rawData, 0, -2), true, 512, JSON_THROW_ON_ERROR); return $packet; } protected int $head = 255; protected int $version = 0; protected int $sessionId; protected int $sequence; protected int $msgId; protected ?string $rawData; protected ?array $data; public function __construct(int $msgId, ?array $data = null, int $sequence = 0, int $sessionId = 0) { $this->msgId = $msgId; $this->data = $data; $this->sequence = $sequence; $this->sessionId = $sessionId; } public function __toString() : string { if ($this->data !== null) { $this->rawData = json_encode($this->data, JSON_THROW_ON_ERROR) . "\x0a\x00"; } return pack('CCx2IIx2SI', $this->head, $this->version, $this->sessionId, $this->sequence, $this->msgId, strlen($this->rawData)) . $this->rawData; } public function getData() : ?array { return $this->data; } public function getRawData() : ?string { return $this->rawData; } public function getSession() : int { return $this->sessionId; } public function getSequence() : int { return $this->sequence; } public function getMessageType() : string { $types = array_flip(static::MESSAGE_CODES); if (!isset($types[$this->msgId])) { throw new \Exception('Unknown message type: ' . $this->msgId); } return $types[$this->msgId]; } } Loop::run(function () : \Generator { $reader = new Reader($socket = yield connect('tcp://10.0.5.100:49152')); $reader->on('packet.response.login', asyncCoroutine(function (Packet $packet) use ($socket) { if (empty($packet->getData()['Ret']) || !in_array($packet->getData()['Ret'], Packet::SUCCESS_CODES)) { throw new \Exception('Wrong login data'); } yield $socket->write((string) new Packet(Packet::MESSAGE_CODES['packet.request.claim'], [ 'Name' => 'OPMonitor', 'OPMonitor' => ['Action' => 'Claim', 'Parameter' => ['Channel' => 0, 'CombinMode' => 'NONE', 'StreamType' => 'Main', 'TransMode' => 'TCP']], ], 1, $packet->getSession())); })); $reader->once('packet.response.login', function (Packet $packet) use ($socket) { Loop::unreference(Loop::repeat($packet->getData()['AliveInterval'] * 1000, function ($watcherId) use ($socket, $packet) { if ($socket->getResource() === null) { return Loop::cancel($watcherId); } yield $socket->write((string) new Packet(Packet::MESSAGE_CODES['packet.request.keepAlive'], [ 'Name' => 'KeepAlive', 'SessionID' => $packet->getData()['SessionID'] ], 0, $packet->getSession())); })); }); $reader->on('packet.response.claim', asyncCoroutine(function (Packet $packet) use ($socket) { yield $socket->write((string) new Packet(Packet::MESSAGE_CODES['packet.videoControl'], [ 'Name' => 'OPMonitor', 'OPMonitor' => ['Action' => 'Start', 'Parameter' => ['Channel' => 0, 'CombinMode' => 'NONE', 'StreamType' => 'Main', 'TransMode' => 'TCP']], ], $packet->getSequence() + 1, $packet->getSession())); })); $emitter = new Emitter(); $reader->on('packet.binary', function ($packet) use ($emitter) { $emitter->emit($packet->getRawData()); }); $reader->once('packet.binary', function (Packet $packet) use ($socket) { $signalHandler = function () use ($socket, $packet) { yield $socket->write((string) new Packet(Packet::MESSAGE_CODES['packet.videoControl'], [ 'Name' => 'OPMonitor', 'OPMonitor' => ['Action' => 'Stop', 'Parameter' => ['Channel' => 0, 'CombinMode' => 'NONE', 'StreamType' => 'Main', 'TransMode' => 'TCP']], ], 0, $packet->getSession())); $socket->close(); }; Loop::unreference(Loop::onSignal(defined('SIGINT') ? SIGINT : 2, $signalHandler, 'SIGINT')); Loop::unreference(Loop::onSignal(defined('SIGTERM') ? SIGTERM : 15, $signalHandler, 'SIGTERM')); }); asyncCall([$reader, 'start']); yield $socket->write((string) new Packet(Packet::MESSAGE_CODES['packet.request.login'], [ 'EncryptType' => 'MD5', 'LoginType' => 'DVRIP-Web', 'PassWord' => sofiaHash('123qwea'), 'UserName' => 'admin', ])); yield pipe(new IteratorStream($emitter->iterate()), getStdout()); });
现在我们可以尝试以这种方式录制一段视频
$ php recorder.php > output.h264
我们添加了一个SIGINT处理程序,因此当您感到疲倦时,只需按Ctrl + C,脚本将停止视频传输并正常关闭连接。
例如,我们可以将流重定向到ffmpeg并即时进行代码转换。
$ php recorder.php | ffmpeg -nostdin -y -hide_banner -loglevel verbose -f h264 -i pipe:0 -pix_fmt yuv420p -c copy -movflags +frag_keyframe+empty_moov+default_base_moof -reset_timestamps 1 -f mpegts output.mpegts
您可以将形式的疯狂链
$ php recorder.php | ffmpeg ... | curl -d @- ...
使用某种脚本(每个小时都会重新启动)来启动所有这些程序,因此无论何时何地,我们都可以随时随地获得持续1小时的流畅片段。
怎么了
我倾向于相信本文会引起“为什么?”的问题。 有现成的摄像机注册解决方案,通常,摄像机可以自己做很多事情,因为它们在上一篇文章的评论中正确地反对了,并且总体上可以简化很多。
只是我有空闲时间要在某个地方做,渴望以自己的方式进行,并且对实验充满热情。
而且有效。
当然,本文中介绍的脚本需要进行一些修改,并且您不应以这种形式使用它-我再说一遍,这是概念上的证明。 总的来说,我强烈警告您使用膝盖高的自制产品,以确保您的房屋安全。
就我而言,可靠性并不是那么关键,因此该选项将起作用。 脚本观察者正在对该脚本进行旋转,该脚本将流传输到ffmpeg,按小时记录片段并将其上载到VK,在断开连接或任何意外情况下重新启动脚本。
总的来说,所有这些都可以通过在RTSP over TCP上简单地拉出视频来避免,正如我在上一篇文章的结尾建议的那样,但是ffmpeg喜欢愚蠢地在一个随机的时间停顿,只开始响应SIGKILL。
我开始怀疑所有这一切是因为父级脚本监视表是由表冠运行的,并且其优先级较低,但是提高优先级只会降低问题发生的频率。
在上述脚本的情况下,一切工作都更加稳定,尚未引起任何投诉。