Wir bekommen das Video von der Kamera via DVRIP mit PHP

In einem früheren Artikel habe ich versprochen, ein Skript zu zeigen, das das Video aus der Kamera zieht, und obwohl seitdem eine gewisse Zeit vergangen ist, müssen die Versprechen erfüllt werden. Also mache ich es.

Es ist einfach so passiert, dass mit Asynchronität in der Welt des Server-Web alles zusammenhängt, aber nicht PHP.

Nun, weil dieses aussterbende Modell Speicherverluste aufweist und es in PHP in der Tat nichts von der Stange gibt, außer stream_select () und stream_set_blocking () .

Irgendwo dort, in PECL, gibt es eine Art libuv , das im Prinzip nur ein Wrapper für die ursprünglichen Funktionen der ursprünglichen Bibliothek ist. Wenn Sie es also so verwenden, wie es ist, ergeben sich einige Herausforderungen. Wie auch immer, wer wird das bei klarem Verstand tun?

Aber wenn wir aufhören, in der Welt von PHP4 zu leben, und ein wenig zu modernen Realitäten zurückkehren, werden wir sehen, dass sich die Dinge in den letzten Jahren etwas verändert haben. Wir haben so interessante Tools wie ReactPHP und AmPHP , deren Komponenten die Funktionalität von Node.j gut abdecken, und das Vorhandensein von Generatoren ermöglicht es uns, asynchronen Code in einem praktischen Stil wie async / await zu schreiben und all diese endlosen Rückrufe in Rückrufen und Kilometerketten zu vermeiden. ) .then (). then () .

Aus diesem Grund gibt es meiner Meinung nach praktisch keine Aufgaben aus der Welt von Node.js, die PHP nicht lösen konnte. Aber wenn es noch welche gibt, dann hängt alles nur vom Vorhandensein einiger separater Bibliotheken ab und nicht vom Mangel an Möglichkeiten an sich.
Es mag Leute überraschen, zu erfahren, dass die PHP-Standardbibliothek bereits alles enthält, was wir zum Schreiben ereignisgesteuerter und nicht blockierender Anwendungen benötigen. Wir stoßen in diesem Bereich nur dann an die Grenzen der nativen PHP-Funktionalität, wenn wir Tausende von Dateideskriptoren gleichzeitig nach IO-Aktivitäten fragen. Selbst in diesem Fall liegt der Fehler jedoch nicht bei PHP, sondern beim zugrunde liegenden Aufruf select () des Systems, der bei zunehmender Last eine lineare Verschlechterung der Leistung aufweist.

amphp.org/amp/event-loop

Natürlich ist die Frage, ob Sie all dies in der Produktion nutzen können, noch nicht vollständig geklärt. Wenn Sie jedoch keine offensichtlichen Fehler machen, die Ihnen eine andere asynchrone Laufzeitumgebung nicht verzeihen würde, ist alles in Ordnung.

DVRIP


Wir werden das angeblich von den Chinesen erfundene Protokoll betrachten, das zur Kommunikation von Software mit Überwachungskameras verwendet wird.

Es funktioniert alles über TCP, DVRIP (manchmal Sofia) genannt, und als ich es brauchte, fand ich nur zwei mehr oder weniger vernünftige Bibliotheken, von denen eine beschrieb, dass im Allgemeinen nur eine Verbindung mit der Kamera hergestellt und ein paar Nachrichten ausgetauscht wurden, aber Paket-Header-Beschreibung, die mir sehr geholfen hat.

Das zweite wurde ein wenig später gefunden, als sich mein Syndrom verschlimmerte, und es funktionierte ein wenig anders als ich es gerne hätte.

Im Allgemeinen habe ich mit einem Schnüffler und einer Beschreibung des Paket-Headers ein bestimmtes Skript skizziert, das sich irgendwo auf meinem Server dreht, und in diesem Artikel wird ein bestimmter Proof of Concept vorgestellt.

Das Protokoll selbst ist nicht komplex und das Paket sieht folgendermaßen aus:

  • Header von 20 Bytes, der in PHP als bezeichnet werden kann

    unpack('Chead/Cversion/x2/IsessionId/Isequence/x2/SmsgId/Ilen', $buffer) 
  • Der Header wird von einer langen Nachricht gefolgt und kann entweder json, ergänzt durch zwei Bytes \ x0a \ x00, oder Binärdaten enthalten.

Wir werden nur json senden, aber wir werden beide empfangen.

Die Antwort (wenn es sich um ein Paket mit json handelt) enthält normalerweise ein Ret- Feld, das den Erfolg unserer Anfrage anzeigt. Antworten, die einen Ret- Code ungleich 100 oder 515 enthalten , werden absichtlich als fehlerhaft eingestuft.

Lassen Sie uns mit diesem Grundwissen eine Verbindung zur Kamera herstellen und verstehen.
Wir brauchen zwei Bibliotheken - amphp / socket , die fast alles abrufen , was wir brauchen, und evenement / evenement , die in packagist verfügbar sind und keine Erweiterungen erfordern.

Die Arbeit mit TCP in Amp sieht ungefähr so ​​aus:

 Loop::run(function () { $socket = yield connect("tcp://{$addr}:{$port}"); yield $socket->write('Hello'); while ($chunk = yield $socket->read()) { echo $chunk; } $socket->close(); }); 

DVRIP läuft normalerweise auf Port 34567, in unserem Fall ist es also ungefähr so:

 $socket = yield connect('tcp://192.168.0.200:34567'); 

Als nächstes melden wir uns an, indem wir unseren Benutzernamen und unser Passwort an die Kamera senden. Diese werden wie folgt berechnet:

 function sofiaHash(string $password) : string { $md5 = md5($password, true); return implode('', array_map(function ($i) use ($md5) { $c = (ord($md5[2 * $i]) + ord($md5[2 * $i + 1])) % 62; $c += $c > 9 ? ($c > 35 ? 61 : 55) : 48; return chr($c); }, range(0, 7))); } 

Bei erfolgreicher Autorisierung fordern wir ein Video an, und wenn die Kamera es uns geben kann, beginnen wir mit der Übertragung.

Jeder Pakettyp hat eine eigene msgId, damit wir verstehen, auf welche Anfrage wir die Antwort erhalten haben. Im Allgemeinen kein staubiger Job.

Beschreiben wir der Einfachheit halber zunächst die Klasse für unser Paket:

 class Packet { public const SUCCESS_CODES = [100, 515]; public const MESSAGE_CODES = [ 'packet.request.login' => 1000, 'packet.request.keepAlive' => 1006, 'packet.request.claim' => 1413, 'packet.response.login' => 1001, 'packet.response.keepAlive' => 1007, 'packet.response.claim' => 1414, 'packet.videoControl' => 1410, 'packet.binary' => 1412, ]; protected int $head = 255; protected int $version = 0; protected int $sessionId; protected int $sequence; protected int $msgId; protected ?string $rawData; protected ?array $data; public function __construct(int $msgId, ?array $data = null, int $sequence = 0, int $sessionId = 0) { $this->msgId = $msgId; $this->data = $data; $this->sequence = $sequence; $this->sessionId = $sessionId; } public function __toString() : string { if ($this->data !== null) { $this->rawData = json_encode($this->data, JSON_THROW_ON_ERROR) . "\x0a\x00"; } return pack('CCx2IIx2SI', $this->head, $this->version, $this->sessionId, $this->sequence, $this->msgId, strlen($this->rawData)) . $this->rawData; } public function getData() : ?array { return $this->data; } public function getRawData() : ?string { return $this->rawData; } public function getSession() : int { return $this->sessionId; } public function getSequence() : int { return $this->sequence; } public function getMessageType() : string { $types = array_flip(static::MESSAGE_CODES); if (!isset($types[$this->msgId])) { throw new \Exception('Unknown message type: ' . $this->msgId); } return $types[$this->msgId]; } } 

und versuchen Sie, eine Autorisierungsanfrage zu senden

 yield $socket->write((string) new Packet(Packet::MESSAGE_CODES['packet.request.login'], [ 'EncryptType' => 'MD5', 'LoginType' => 'DVRIP-Web', 'PassWord' => sofiaHash('admin'), 'UserName' => 'admin', ])); 

Ja, großartig. Wir haben eine Verbindung hergestellt und eine Nachricht gesendet, aber wir wissen nicht einmal, ob die Kamera korrekt war und haben zumindest irgendwie darauf reagiert. Wir müssen die Antwort irgendwie lesen (machen wir es asynchron), bestimmte Pakete darin auswählen und sie dekodieren.

Um die Logik der Verarbeitung von Paketen, die als Antwort eingehen , irgendwie zu trennen, entschied ich mich für die Verwendung der Abendbibliothek und skizzierte darauf basierend eine Klasse, die bereits dekodierte Pakete auswirft

 class Reader extends EventEmitter { protected InputStream $socket; public function __construct(InputStream $socket) { $this->socket = $socket; } public function start() : \Generator { $buffer = ''; while (!$this->socket->isClosed() && $result = yield Packet::read($this->socket, $buffer)) { [$packet, $chunk] = $result; $buffer = $chunk; $chunk = null; $this->emit('packet', [$packet]); $this->emit($packet->getMessageType(), [$packet]); } $buffer = null; } } 

Außerdem wurden unserer Packet-Klasse einige statische Methoden hinzugefügt, mit denen Pakete aus dem Datenstrom extrahiert und dekodiert werden

 public static function read(InputStream $socket, string $buffer = '') : Promise { return call(function () use ($socket, $buffer) { $header = null; do { if ($header === null && strlen($buffer) > 20) { $header = unpack('Chead/Cversion/x2/IsessionId/Isequence/x2/SmsgId/Ilen', substr($buffer, 0, 20)); $buffer = substr($buffer, 20); } if ($header !== null && strlen($buffer) >= (int) $header['len']) { return [ static::fromRaw($header, (int) $header['len'] > 0 ? substr($buffer, 0, (int) $header['len']) : null), substr($buffer, (int) $header['len']) ]; } $buffer .= yield $socket->read(); } while (!$socket->isClosed()); }); } protected static function fromRaw(array $header, ?string $rawData) : self { $packet = new static($header['msgId'], null, $header['sequence'], $header['sessionId']); $packet->rawData = $rawData; $packet->data = (int) $packet->msgId === static::MESSAGE_CODES['packet.binary'] || $rawData === null ? null : json_decode(substr($rawData, 0, -2), true, 512, JSON_THROW_ON_ERROR); return $packet; } 

Jetzt können wir durch die Verwendung von Evenement unsere Logik in der Form beschreiben

 $reader->on($packetType, $handler); 

Und so bekam ich diese Kette, in der ich eine weitere Keep Alive- und SIGINT / SIGTERM-Verarbeitung hinzufügte:

 $reader = new Reader($socket = yield connect('tcp://10.0.5.100:49152')); $reader->on('packet.response.login', asyncCoroutine(function (Packet $packet) use ($socket) { if (empty($packet->getData()['Ret']) || !in_array($packet->getData()['Ret'], Packet::SUCCESS_CODES)) { throw new \Exception('Wrong login data'); } yield $socket->write((string) new Packet(Packet::MESSAGE_CODES['packet.request.claim'], [ 'Name' => 'OPMonitor', 'OPMonitor' => ['Action' => 'Claim', 'Parameter' => ['Channel' => 0, 'CombinMode' => 'NONE', 'StreamType' => 'Main', 'TransMode' => 'TCP']], ], 1, $packet->getSession())); })); $reader->once('packet.response.login', function (Packet $packet) use ($socket) { Loop::unreference(Loop::repeat($packet->getData()['AliveInterval'] * 1000, function ($watcherId) use ($socket, $packet) { if ($socket->getResource() === null) { return Loop::cancel($watcherId); } yield $socket->write((string) new Packet(Packet::MESSAGE_CODES['packet.request.keepAlive'], [ 'Name' => 'KeepAlive', 'SessionID' => $packet->getData()['SessionID'] ], 0, $packet->getSession())); })); }); $reader->on('packet.response.claim', asyncCoroutine(function (Packet $packet) use ($socket) { yield $socket->write((string) new Packet(Packet::MESSAGE_CODES['packet.videoControl'], [ 'Name' => 'OPMonitor', 'OPMonitor' => ['Action' => 'Start', 'Parameter' => ['Channel' => 0, 'CombinMode' => 'NONE', 'StreamType' => 'Main', 'TransMode' => 'TCP']], ], $packet->getSequence() + 1, $packet->getSession())); })); $emitter = new Emitter(); $reader->on('packet.binary', function ($packet) use ($emitter) { $emitter->emit($packet->getRawData()); }); $reader->once('packet.binary', function (Packet $packet) use ($socket) { $signalHandler = function () use ($socket, $packet) { yield $socket->write((string) new Packet(Packet::MESSAGE_CODES['packet.videoControl'], [ 'Name' => 'OPMonitor', 'OPMonitor' => ['Action' => 'Stop', 'Parameter' => ['Channel' => 0, 'CombinMode' => 'NONE', 'StreamType' => 'Main', 'TransMode' => 'TCP']], ], 0, $packet->getSession())); $socket->close(); }; Loop::unreference(Loop::onSignal(defined('SIGINT') ? SIGINT : 2, $signalHandler, 'SIGINT')); Loop::unreference(Loop::onSignal(defined('SIGTERM') ? SIGTERM : 15, $signalHandler, 'SIGTERM')); }); asyncCall([$reader, 'start']); yield $socket->write((string) new Packet(Packet::MESSAGE_CODES['packet.request.login'], [ 'EncryptType' => 'MD5', 'LoginType' => 'DVRIP-Web', 'PassWord' => sofiaHash('123qwea'), 'UserName' => 'admin', ])); yield pipe(new IteratorStream($emitter->iterate()), getStdout()); 

Es versteht sich von selbst, dass wir anstelle von stdout den Videostream umleiten können, wo immer wir wollen.

Letztes Drehbuch
 <?php ini_set('display_errors', 'stderr'); use Amp\Emitter; use Amp\Loop; use Amp\Promise; use Amp\ByteStream\InputStream; use Amp\ByteStream\IteratorStream; use Evenement\EventEmitter; use function Amp\call; use function Amp\asyncCall; use function Amp\asyncCoroutine; use function Amp\ByteStream\getStderr; use function Amp\ByteStream\getStdout; use function Amp\ByteStream\pipe; use function Amp\Socket\connect; require 'vendor/autoload.php'; function sofiaHash(string $password) : string { $md5 = md5($password, true); return implode('', array_map(function ($i) use ($md5) { $c = (ord($md5[2 * $i]) + ord($md5[2 * $i + 1])) % 62; $c += $c > 9 ? ($c > 35 ? 61 : 55) : 48; return chr($c); }, range(0, 7))); } class Reader extends EventEmitter { protected InputStream $socket; public function __construct(InputStream $socket) { $this->socket = $socket; } public function start() : \Generator { $buffer = ''; while (!$this->socket->isClosed() && $result = yield Packet::read($this->socket, $buffer)) { [$packet, $chunk] = $result; $buffer = $chunk; $chunk = null; $this->emit('packet', [$packet]); $this->emit($packet->getMessageType(), [$packet]); } $buffer = null; } } class Packet { public const SUCCESS_CODES = [100, 515]; public const MESSAGE_CODES = [ 'packet.request.login' => 1000, 'packet.request.keepAlive' => 1006, 'packet.request.claim' => 1413, 'packet.response.login' => 1001, 'packet.response.keepAlive' => 1007, 'packet.response.claim' => 1414, 'packet.videoControl' => 1410, 'packet.binary' => 1412, ]; public static function read(InputStream $socket, string $buffer = '') : Promise { return call(function () use ($socket, $buffer) { $header = null; do { if ($header === null && strlen($buffer) > 20) { $header = unpack('Chead/Cversion/x2/IsessionId/Isequence/x2/SmsgId/Ilen', substr($buffer, 0, 20)); $buffer = substr($buffer, 20); } if ($header !== null && strlen($buffer) >= (int) $header['len']) { return [ static::fromRaw($header, (int) $header['len'] > 0 ? substr($buffer, 0, (int) $header['len']) : null), substr($buffer, (int) $header['len']) ]; } $buffer .= yield $socket->read(); } while (!$socket->isClosed()); }); } protected static function fromRaw(array $header, ?string $rawData) : self { $packet = new static($header['msgId'], null, $header['sequence'], $header['sessionId']); $packet->rawData = $rawData; $packet->data = (int) $packet->msgId === static::MESSAGE_CODES['packet.binary'] || $rawData === null ? null : json_decode(substr($rawData, 0, -2), true, 512, JSON_THROW_ON_ERROR); return $packet; } protected int $head = 255; protected int $version = 0; protected int $sessionId; protected int $sequence; protected int $msgId; protected ?string $rawData; protected ?array $data; public function __construct(int $msgId, ?array $data = null, int $sequence = 0, int $sessionId = 0) { $this->msgId = $msgId; $this->data = $data; $this->sequence = $sequence; $this->sessionId = $sessionId; } public function __toString() : string { if ($this->data !== null) { $this->rawData = json_encode($this->data, JSON_THROW_ON_ERROR) . "\x0a\x00"; } return pack('CCx2IIx2SI', $this->head, $this->version, $this->sessionId, $this->sequence, $this->msgId, strlen($this->rawData)) . $this->rawData; } public function getData() : ?array { return $this->data; } public function getRawData() : ?string { return $this->rawData; } public function getSession() : int { return $this->sessionId; } public function getSequence() : int { return $this->sequence; } public function getMessageType() : string { $types = array_flip(static::MESSAGE_CODES); if (!isset($types[$this->msgId])) { throw new \Exception('Unknown message type: ' . $this->msgId); } return $types[$this->msgId]; } } Loop::run(function () : \Generator { $reader = new Reader($socket = yield connect('tcp://10.0.5.100:49152')); $reader->on('packet.response.login', asyncCoroutine(function (Packet $packet) use ($socket) { if (empty($packet->getData()['Ret']) || !in_array($packet->getData()['Ret'], Packet::SUCCESS_CODES)) { throw new \Exception('Wrong login data'); } yield $socket->write((string) new Packet(Packet::MESSAGE_CODES['packet.request.claim'], [ 'Name' => 'OPMonitor', 'OPMonitor' => ['Action' => 'Claim', 'Parameter' => ['Channel' => 0, 'CombinMode' => 'NONE', 'StreamType' => 'Main', 'TransMode' => 'TCP']], ], 1, $packet->getSession())); })); $reader->once('packet.response.login', function (Packet $packet) use ($socket) { Loop::unreference(Loop::repeat($packet->getData()['AliveInterval'] * 1000, function ($watcherId) use ($socket, $packet) { if ($socket->getResource() === null) { return Loop::cancel($watcherId); } yield $socket->write((string) new Packet(Packet::MESSAGE_CODES['packet.request.keepAlive'], [ 'Name' => 'KeepAlive', 'SessionID' => $packet->getData()['SessionID'] ], 0, $packet->getSession())); })); }); $reader->on('packet.response.claim', asyncCoroutine(function (Packet $packet) use ($socket) { yield $socket->write((string) new Packet(Packet::MESSAGE_CODES['packet.videoControl'], [ 'Name' => 'OPMonitor', 'OPMonitor' => ['Action' => 'Start', 'Parameter' => ['Channel' => 0, 'CombinMode' => 'NONE', 'StreamType' => 'Main', 'TransMode' => 'TCP']], ], $packet->getSequence() + 1, $packet->getSession())); })); $emitter = new Emitter(); $reader->on('packet.binary', function ($packet) use ($emitter) { $emitter->emit($packet->getRawData()); }); $reader->once('packet.binary', function (Packet $packet) use ($socket) { $signalHandler = function () use ($socket, $packet) { yield $socket->write((string) new Packet(Packet::MESSAGE_CODES['packet.videoControl'], [ 'Name' => 'OPMonitor', 'OPMonitor' => ['Action' => 'Stop', 'Parameter' => ['Channel' => 0, 'CombinMode' => 'NONE', 'StreamType' => 'Main', 'TransMode' => 'TCP']], ], 0, $packet->getSession())); $socket->close(); }; Loop::unreference(Loop::onSignal(defined('SIGINT') ? SIGINT : 2, $signalHandler, 'SIGINT')); Loop::unreference(Loop::onSignal(defined('SIGTERM') ? SIGTERM : 15, $signalHandler, 'SIGTERM')); }); asyncCall([$reader, 'start']); yield $socket->write((string) new Packet(Packet::MESSAGE_CODES['packet.request.login'], [ 'EncryptType' => 'MD5', 'LoginType' => 'DVRIP-Web', 'PassWord' => sofiaHash('123qwea'), 'UserName' => 'admin', ])); yield pipe(new IteratorStream($emitter->iterate()), getStdout()); }); 

Jetzt können wir versuchen, auf diese Weise ein Video aufzunehmen

 $ php recorder.php > output.h264 

Wir haben einen SIGINT-Handler hinzugefügt. Wenn Sie also müde werden, drücken Sie einfach Strg + C und das Skript stoppt die Videoübertragung und schließt die Verbindung normal.

Und wir können zum Beispiel den Stream nach ffmpeg umleiten und on the fly transcodieren.

 $ php recorder.php | ffmpeg -nostdin -y -hide_banner -loglevel verbose -f h264 -i pipe:0 -pix_fmt yuv420p -c copy -movflags +frag_keyframe+empty_moov+default_base_moof -reset_timestamps 1 -f mpegts output.mpegts 

Und Sie können aus der Form verzweifelte Ketten machen

 $ php recorder.php | ffmpeg ... | curl -d @- ... 

Starten Sie all dies mit einer Art Skript, das jede Stunde neu gestartet wird, sodass wir reibungslose Teile mit einer Dauer von 1 Stunde erhalten, und zwar im Handumdrehen, wo immer wir sie benötigen.

Warum?


Ich bin geneigt zu glauben, dass dieser Artikel die Frage "Warum?" Es gibt fertige Lösungen für die Registrierung von Kameras, Kameras können im Allgemeinen eine Menge Dinge selbst tun, wie sie in den Kommentaren zum vorherigen Artikel zu Recht beanstandet haben, und im Allgemeinen könnte es viel einfacher gemacht werden.

Ich hatte nur Freizeit, die ich irgendwo machen musste, den Wunsch, meinen eigenen Weg zu gehen, und das unermüdliche Verlangen nach Experimenten.

Und es funktioniert.

Natürlich erfordert das im Artikel vorgestellte Skript einige Änderungen, und Sie sollten es nicht in dieser Form verwenden - ich wiederhole, dies ist ein Proof of Concept. Im Allgemeinen rate ich Ihnen dringend, kniehohe hausgemachte Produkte zu verwenden, um die Sicherheit Ihres Zuhauses zu gewährleisten.
In meinem Fall ist Zuverlässigkeit nicht so wichtig, daher funktioniert diese Option. Über dieses Skript dreht sich ein Skript-Beobachter, der den Stream zu ffmpeg überträgt, stundenweise Stücke aufzeichnet und auf die VK hochlädt. Bei einer Unterbrechung oder einigen unvorhergesehenen Ereignissen wird das Skript neu gestartet.

Im Allgemeinen hätte dies alles vermieden werden können, indem einfach das Video über RTSP über TCP abgerufen wurde, wie ich am Ende des vorherigen Artikels angedeutet hatte, aber ffmpeg hielt gerne zu einem zufälligen Zeitpunkt inne und reagierte nur auf SIGKILL.

Ich begann zu vermuten, dass all dies darauf zurückzuführen war, dass die übergeordnete Skriptüberwachung von der Krone ausgeführt wurde und eine niedrigere Priorität erhielt, die Erhöhung der Priorität jedoch nur die Häufigkeit des Problems verringerte.

Im Falle des obigen Skripts funktioniert alles viel stabiler und hat noch keine Beschwerden hervorgerufen.

Source: https://habr.com/ru/post/de484518/


All Articles