في
مقال سابق ، وعدت بعرض برنامج نصي يسحب الفيديو من الكاميرا ، وعلى الرغم من مرور فترة زمنية معينة منذ ذلك الحين ، يجب الوفاء بالوعود. لذلك أنا أفعل ذلك.
لقد حدث أن عدم التزامن في عالم خادم الويب يرتبط بكل شيء ، ولكن ليس PHP.
حسنًا ، لأنه ، كما تعلمون ، هذا النموذج المحتضر ، يتسرب من الذاكرة ، وفي الحقيقة لا يوجد شيء خارج المربع في PHP باستثناء
stream_select () و
stream_set_blocking () .
في مكان ما هناك ، على PECL ، هناك نوع من
libuv ، والذي هو من حيث المبدأ مجرد غلاف للوظائف الأصلية للمكتبة الأصلية ، لذا فإن استخدامه كما يوفر لك بعض التحديات. على أي حال ، من في عقولهم الصحيحة سيفعل ذلك؟
ولكن إذا توقفنا عن العيش في عالم PHP4 ونعود إلى الواقع الحديث قليلاً ، فسنرى أن الأمور قد تغيرت إلى حد ما في السنوات الأخيرة. لدينا أدوات مثيرة للاهتمام مثل
ReactPHP و
AmPHP ، التي تغطي مكوناتها وظائف Node.js بشكل جيد ، ووجود المولدات يتيح لنا كتابة التعليمات البرمجية غير المتزامنة بأسلوب مناسب ، مثل
async / await ، وتجنب كل عمليات الاسترجاعات التي لا نهاية لها في عمليات الاسترجاعات وسلاسل الكيلومترات
. ثم ( ) .ثم (). ثم () .
لهذا السبب الآن ، يبدو لي ، أنه لا يوجد عملياً أي مهام من عالم Node.js لم يستطع PHP حلها. ولكن إذا كان لا يزال هناك بعض ، فإن كل شيء يعتمد فقط على وجود بعض المكتبات المنفصلة ، وليس على نقص الفرص على هذا النحو.
قد يفاجئ الناس لمعرفة أن المكتبة القياسية لـ PHP لديها بالفعل كل ما نحتاجه لكتابة التطبيقات التي تعتمد على الأحداث وغير المحظورة. لا نصل إلى حدود وظائف PHP الأصلية في هذا المجال إلا عندما نطلب منها استطلاع الآلاف من واصفات الملفات لنشاط IO في نفس الوقت. حتى في هذه الحالة ، على الرغم من أن الخطأ ليس مع PHP ولكن استدعاء النظام الأساسي select () هو خطي في تدهور الأداء مع زيادة التحميل.
amphp.org/amp/event-loop
بالطبع ، لم يتم إغلاق مسألة استخدام كل هذا في الإنتاج بالكامل ، ولكن إذا لم تفعل أشياء خاطئة بشكل واضح ، لن تسامحك أي بيئة تشغيل غير متزامنة أخرى بها ، فسيكون كل شيء على ما يرام.
DVRIP
سننظر في البروتوكول الذي يفترض أن الصينيين اخترعوه ، والذي يستخدم لتوصيل البرامج بكاميرات المراقبة.
كل هذا يعمل على رأس TCP ، يسمى DVRIP (في بعض الأحيان صوفيا) ، وفي الوقت الذي احتاجت إليه ، وجدت مكتبتين عاقلتين أو أكثر ، وصفت واحدة منها بشكل عام الاتصال فقط بالكاميرا وتبادل عدة رسائل ، ولكن وصف رأس الحزمة التي ساعدتني كثيرا.
تم العثور على الثانية في وقت لاحق ، في وقت تفاقم متلازمة بلدي لم يخترع هنا ، وكان يعمل بشكل مختلف قليلا عن ما أود.
بشكل عام ، مُسلحًا بمتشمم ووصف لرأس الحزمة ، لقد قمت برسم فترة قصيرة من البرنامج النصي الذي يدور في مكان ما على الخادم الخاص بي في مكان ما ، وسيتم تقديمه في دليل معيّن على المفهوم في هذه المقالة.
البروتوكول نفسه ليس معقدًا وتبدو الحزمة كما يلي:
سننقل json فقط ، لكننا سنستقبل كليهما.
عادةً ما تحتوي الإجابة (إذا كانت حزمة مع json) على حقل Ret يشير إلى نجاح طلبنا. الردود التي تحتوي على كود
Ret التي لا تساوي
100 أو
515 سوف تعتبر خاطئة بشكل متعمد.
مسلحين بهذه المعرفة الأساسية ، دعنا نتواصل مع الكاميرا ونفهمها.
نحتاج إلى مكتبتين -
amphp / socket ، والتي ستقوم
بسحب كل ما نحتاج إليه تقريبًا ، و
evenement / evenement ، وهي متوفرة في packagist ولا تتطلب أي امتدادات.
يبدو العمل مع TCP في Amp شيئًا مثل هذا:
Loop::run(function () { $socket = yield connect("tcp://{$addr}:{$port}"); yield $socket->write('Hello'); while ($chunk = yield $socket->read()) { echo $chunk; } $socket->close(); });
عادةً ما يتم تشغيل DVRIP على المنفذ 34567 ، لذلك في حالتنا سيكون الأمر مثل هذا:
$socket = yield connect('tcp://192.168.0.200:34567');
بعد ذلك ، نقوم بتسجيل الدخول عن طريق إرسال اسم المستخدم وكلمة مرور تجزئة إلى الكاميرا ، والتي يتم حسابها على النحو التالي:
function sofiaHash(string $password) : string { $md5 = md5($password, true); return implode('', array_map(function ($i) use ($md5) { $c = (ord($md5[2 * $i]) + ord($md5[2 * $i + 1])) % 62; $c += $c > 9 ? ($c > 35 ? 61 : 55) : 48; return chr($c); }, range(0, 7))); }
في حالة الحصول على ترخيص ناجح ، نطلب مقطع فيديو ، وإذا تمكنت الكاميرا من إعطائه لنا ، فإننا نبدأ في إرساله.
كل نوع من الحزمة له msgId خاص به ، حتى نتمكن من فهم ما طلبنا من الإجابة. بشكل عام ، وليس وظيفة المتربة.
للراحة ، دعونا أولاً صف الفصل الدراسي الخاص بنا:
class Packet { public const SUCCESS_CODES = [100, 515]; public const MESSAGE_CODES = [ 'packet.request.login' => 1000, 'packet.request.keepAlive' => 1006, 'packet.request.claim' => 1413, 'packet.response.login' => 1001, 'packet.response.keepAlive' => 1007, 'packet.response.claim' => 1414, 'packet.videoControl' => 1410, 'packet.binary' => 1412, ]; protected int $head = 255; protected int $version = 0; protected int $sessionId; protected int $sequence; protected int $msgId; protected ?string $rawData; protected ?array $data; public function __construct(int $msgId, ?array $data = null, int $sequence = 0, int $sessionId = 0) { $this->msgId = $msgId; $this->data = $data; $this->sequence = $sequence; $this->sessionId = $sessionId; } public function __toString() : string { if ($this->data !== null) { $this->rawData = json_encode($this->data, JSON_THROW_ON_ERROR) . "\x0a\x00"; } return pack('CCx2IIx2SI', $this->head, $this->version, $this->sessionId, $this->sequence, $this->msgId, strlen($this->rawData)) . $this->rawData; } public function getData() : ?array { return $this->data; } public function getRawData() : ?string { return $this->rawData; } public function getSession() : int { return $this->sessionId; } public function getSequence() : int { return $this->sequence; } public function getMessageType() : string { $types = array_flip(static::MESSAGE_CODES); if (!isset($types[$this->msgId])) { throw new \Exception('Unknown message type: ' . $this->msgId); } return $types[$this->msgId]; } }
وحاول إرسال طلب تفويض
yield $socket->write((string) new Packet(Packet::MESSAGE_CODES['packet.request.login'], [ 'EncryptType' => 'MD5', 'LoginType' => 'DVRIP-Web', 'PassWord' => sofiaHash('admin'), 'UserName' => 'admin', ]));
نعم عظيم. لقد وصلنا وأرسلنا رسالة ، لكننا لا نعرف حتى إذا كانت الكاميرا صحيحة وتفاعلت معها بطريقة أو بأخرى على الأقل. نحتاج إلى قراءة الإجابة بطريقة أو بأخرى (دعونا نفعل ذلك بشكل غير متزامن) ، وتحديد حزم معينة فيه وفك تشفيرها.
من أجل فصل منطق معالجة حزم البيانات الواردة ردًا على ذلك ، قررت استخدام مكتبة
المساء وأرسم فئة بناءً على ذلك ستطرد الحزم التي تم فك شفرتها بالفعل
class Reader extends EventEmitter { protected InputStream $socket; public function __construct(InputStream $socket) { $this->socket = $socket; } public function start() : \Generator { $buffer = ''; while (!$this->socket->isClosed() && $result = yield Packet::read($this->socket, $buffer)) { [$packet, $chunk] = $result; $buffer = $chunk; $chunk = null; $this->emit('packet', [$packet]); $this->emit($packet->getMessageType(), [$packet]); } $buffer = null; } }
وأضف أيضًا طريقتين ثابتتين إلى فئة الحزم لدينا ، والتي في الواقع ستقوم باستخراج الحزم وفك تشفيرها من دفق البيانات
public static function read(InputStream $socket, string $buffer = '') : Promise { return call(function () use ($socket, $buffer) { $header = null; do { if ($header === null && strlen($buffer) > 20) { $header = unpack('Chead/Cversion/x2/IsessionId/Isequence/x2/SmsgId/Ilen', substr($buffer, 0, 20)); $buffer = substr($buffer, 20); } if ($header !== null && strlen($buffer) >= (int) $header['len']) { return [ static::fromRaw($header, (int) $header['len'] > 0 ? substr($buffer, 0, (int) $header['len']) : null), substr($buffer, (int) $header['len']) ]; } $buffer .= yield $socket->read(); } while (!$socket->isClosed()); }); } protected static function fromRaw(array $header, ?string $rawData) : self { $packet = new static($header['msgId'], null, $header['sequence'], $header['sessionId']); $packet->rawData = $rawData; $packet->data = (int) $packet->msgId === static::MESSAGE_CODES['packet.binary'] || $rawData === null ? null : json_decode(substr($rawData, 0, -2), true, 512, JSON_THROW_ON_ERROR); return $packet; }
الآن ، من خلال استخدام
Evenement ، يمكننا وصف منطقنا في النموذج
$reader->on($packetType, $handler);
وهكذا حصلت على هذه السلسلة ، حيث أضفت معالجة أخرى لـ Keep Alive و SIGINT / SIGTERM:
$reader = new Reader($socket = yield connect('tcp://10.0.5.100:49152')); $reader->on('packet.response.login', asyncCoroutine(function (Packet $packet) use ($socket) { if (empty($packet->getData()['Ret']) || !in_array($packet->getData()['Ret'], Packet::SUCCESS_CODES)) { throw new \Exception('Wrong login data'); } yield $socket->write((string) new Packet(Packet::MESSAGE_CODES['packet.request.claim'], [ 'Name' => 'OPMonitor', 'OPMonitor' => ['Action' => 'Claim', 'Parameter' => ['Channel' => 0, 'CombinMode' => 'NONE', 'StreamType' => 'Main', 'TransMode' => 'TCP']], ], 1, $packet->getSession())); })); $reader->once('packet.response.login', function (Packet $packet) use ($socket) { Loop::unreference(Loop::repeat($packet->getData()['AliveInterval'] * 1000, function ($watcherId) use ($socket, $packet) { if ($socket->getResource() === null) { return Loop::cancel($watcherId); } yield $socket->write((string) new Packet(Packet::MESSAGE_CODES['packet.request.keepAlive'], [ 'Name' => 'KeepAlive', 'SessionID' => $packet->getData()['SessionID'] ], 0, $packet->getSession())); })); }); $reader->on('packet.response.claim', asyncCoroutine(function (Packet $packet) use ($socket) { yield $socket->write((string) new Packet(Packet::MESSAGE_CODES['packet.videoControl'], [ 'Name' => 'OPMonitor', 'OPMonitor' => ['Action' => 'Start', 'Parameter' => ['Channel' => 0, 'CombinMode' => 'NONE', 'StreamType' => 'Main', 'TransMode' => 'TCP']], ], $packet->getSequence() + 1, $packet->getSession())); })); $emitter = new Emitter(); $reader->on('packet.binary', function ($packet) use ($emitter) { $emitter->emit($packet->getRawData()); }); $reader->once('packet.binary', function (Packet $packet) use ($socket) { $signalHandler = function () use ($socket, $packet) { yield $socket->write((string) new Packet(Packet::MESSAGE_CODES['packet.videoControl'], [ 'Name' => 'OPMonitor', 'OPMonitor' => ['Action' => 'Stop', 'Parameter' => ['Channel' => 0, 'CombinMode' => 'NONE', 'StreamType' => 'Main', 'TransMode' => 'TCP']], ], 0, $packet->getSession())); $socket->close(); }; Loop::unreference(Loop::onSignal(defined('SIGINT') ? SIGINT : 2, $signalHandler, 'SIGINT')); Loop::unreference(Loop::onSignal(defined('SIGTERM') ? SIGTERM : 15, $signalHandler, 'SIGTERM')); }); asyncCall([$reader, 'start']); yield $socket->write((string) new Packet(Packet::MESSAGE_CODES['packet.request.login'], [ 'EncryptType' => 'MD5', 'LoginType' => 'DVRIP-Web', 'PassWord' => sofiaHash('123qwea'), 'UserName' => 'admin', ])); yield pipe(new IteratorStream($emitter->iterate()), getStdout());
غني عن القول أنه بدلاً من stdout ، يمكننا إعادة توجيه دفق الفيديو أينما نريد.
النصي النهائي <?php ini_set('display_errors', 'stderr'); use Amp\Emitter; use Amp\Loop; use Amp\Promise; use Amp\ByteStream\InputStream; use Amp\ByteStream\IteratorStream; use Evenement\EventEmitter; use function Amp\call; use function Amp\asyncCall; use function Amp\asyncCoroutine; use function Amp\ByteStream\getStderr; use function Amp\ByteStream\getStdout; use function Amp\ByteStream\pipe; use function Amp\Socket\connect; require 'vendor/autoload.php'; function sofiaHash(string $password) : string { $md5 = md5($password, true); return implode('', array_map(function ($i) use ($md5) { $c = (ord($md5[2 * $i]) + ord($md5[2 * $i + 1])) % 62; $c += $c > 9 ? ($c > 35 ? 61 : 55) : 48; return chr($c); }, range(0, 7))); } class Reader extends EventEmitter { protected InputStream $socket; public function __construct(InputStream $socket) { $this->socket = $socket; } public function start() : \Generator { $buffer = ''; while (!$this->socket->isClosed() && $result = yield Packet::read($this->socket, $buffer)) { [$packet, $chunk] = $result; $buffer = $chunk; $chunk = null; $this->emit('packet', [$packet]); $this->emit($packet->getMessageType(), [$packet]); } $buffer = null; } } class Packet { public const SUCCESS_CODES = [100, 515]; public const MESSAGE_CODES = [ 'packet.request.login' => 1000, 'packet.request.keepAlive' => 1006, 'packet.request.claim' => 1413, 'packet.response.login' => 1001, 'packet.response.keepAlive' => 1007, 'packet.response.claim' => 1414, 'packet.videoControl' => 1410, 'packet.binary' => 1412, ]; public static function read(InputStream $socket, string $buffer = '') : Promise { return call(function () use ($socket, $buffer) { $header = null; do { if ($header === null && strlen($buffer) > 20) { $header = unpack('Chead/Cversion/x2/IsessionId/Isequence/x2/SmsgId/Ilen', substr($buffer, 0, 20)); $buffer = substr($buffer, 20); } if ($header !== null && strlen($buffer) >= (int) $header['len']) { return [ static::fromRaw($header, (int) $header['len'] > 0 ? substr($buffer, 0, (int) $header['len']) : null), substr($buffer, (int) $header['len']) ]; } $buffer .= yield $socket->read(); } while (!$socket->isClosed()); }); } protected static function fromRaw(array $header, ?string $rawData) : self { $packet = new static($header['msgId'], null, $header['sequence'], $header['sessionId']); $packet->rawData = $rawData; $packet->data = (int) $packet->msgId === static::MESSAGE_CODES['packet.binary'] || $rawData === null ? null : json_decode(substr($rawData, 0, -2), true, 512, JSON_THROW_ON_ERROR); return $packet; } protected int $head = 255; protected int $version = 0; protected int $sessionId; protected int $sequence; protected int $msgId; protected ?string $rawData; protected ?array $data; public function __construct(int $msgId, ?array $data = null, int $sequence = 0, int $sessionId = 0) { $this->msgId = $msgId; $this->data = $data; $this->sequence = $sequence; $this->sessionId = $sessionId; } public function __toString() : string { if ($this->data !== null) { $this->rawData = json_encode($this->data, JSON_THROW_ON_ERROR) . "\x0a\x00"; } return pack('CCx2IIx2SI', $this->head, $this->version, $this->sessionId, $this->sequence, $this->msgId, strlen($this->rawData)) . $this->rawData; } public function getData() : ?array { return $this->data; } public function getRawData() : ?string { return $this->rawData; } public function getSession() : int { return $this->sessionId; } public function getSequence() : int { return $this->sequence; } public function getMessageType() : string { $types = array_flip(static::MESSAGE_CODES); if (!isset($types[$this->msgId])) { throw new \Exception('Unknown message type: ' . $this->msgId); } return $types[$this->msgId]; } } Loop::run(function () : \Generator { $reader = new Reader($socket = yield connect('tcp://10.0.5.100:49152')); $reader->on('packet.response.login', asyncCoroutine(function (Packet $packet) use ($socket) { if (empty($packet->getData()['Ret']) || !in_array($packet->getData()['Ret'], Packet::SUCCESS_CODES)) { throw new \Exception('Wrong login data'); } yield $socket->write((string) new Packet(Packet::MESSAGE_CODES['packet.request.claim'], [ 'Name' => 'OPMonitor', 'OPMonitor' => ['Action' => 'Claim', 'Parameter' => ['Channel' => 0, 'CombinMode' => 'NONE', 'StreamType' => 'Main', 'TransMode' => 'TCP']], ], 1, $packet->getSession())); })); $reader->once('packet.response.login', function (Packet $packet) use ($socket) { Loop::unreference(Loop::repeat($packet->getData()['AliveInterval'] * 1000, function ($watcherId) use ($socket, $packet) { if ($socket->getResource() === null) { return Loop::cancel($watcherId); } yield $socket->write((string) new Packet(Packet::MESSAGE_CODES['packet.request.keepAlive'], [ 'Name' => 'KeepAlive', 'SessionID' => $packet->getData()['SessionID'] ], 0, $packet->getSession())); })); }); $reader->on('packet.response.claim', asyncCoroutine(function (Packet $packet) use ($socket) { yield $socket->write((string) new Packet(Packet::MESSAGE_CODES['packet.videoControl'], [ 'Name' => 'OPMonitor', 'OPMonitor' => ['Action' => 'Start', 'Parameter' => ['Channel' => 0, 'CombinMode' => 'NONE', 'StreamType' => 'Main', 'TransMode' => 'TCP']], ], $packet->getSequence() + 1, $packet->getSession())); })); $emitter = new Emitter(); $reader->on('packet.binary', function ($packet) use ($emitter) { $emitter->emit($packet->getRawData()); }); $reader->once('packet.binary', function (Packet $packet) use ($socket) { $signalHandler = function () use ($socket, $packet) { yield $socket->write((string) new Packet(Packet::MESSAGE_CODES['packet.videoControl'], [ 'Name' => 'OPMonitor', 'OPMonitor' => ['Action' => 'Stop', 'Parameter' => ['Channel' => 0, 'CombinMode' => 'NONE', 'StreamType' => 'Main', 'TransMode' => 'TCP']], ], 0, $packet->getSession())); $socket->close(); }; Loop::unreference(Loop::onSignal(defined('SIGINT') ? SIGINT : 2, $signalHandler, 'SIGINT')); Loop::unreference(Loop::onSignal(defined('SIGTERM') ? SIGTERM : 15, $signalHandler, 'SIGTERM')); }); asyncCall([$reader, 'start']); yield $socket->write((string) new Packet(Packet::MESSAGE_CODES['packet.request.login'], [ 'EncryptType' => 'MD5', 'LoginType' => 'DVRIP-Web', 'PassWord' => sofiaHash('123qwea'), 'UserName' => 'admin', ])); yield pipe(new IteratorStream($emitter->iterate()), getStdout()); });
الآن يمكننا محاولة تسجيل مقطع فيديو بهذه الطريقة
$ php recorder.php > output.h264
أضفنا معالج SIGINT ، لذلك عندما تتعب ، فقط اضغط على Ctrl + C وسيقوم البرنامج النصي بإيقاف نقل الفيديو وإغلاق الاتصال بشكل طبيعي.
ويمكننا ، على سبيل المثال ، إعادة توجيه الدفق إلى ffmpeg ورمز الشفرة أثناء الطيران.
$ php recorder.php | ffmpeg -nostdin -y -hide_banner -loglevel verbose -f h264 -i pipe:0 -pix_fmt yuv420p -c copy -movflags +frag_keyframe+empty_moov+default_base_moof -reset_timestamps 1 -f mpegts output.mpegts
ويمكنك عمل سلاسل محمومة من النموذج
$ php recorder.php | ffmpeg ... | curl -d @- ...
إطلاق كل هذا بنوع من البرامج النصية التي ستستأنف كل ساعة ، حتى نحصل على قطع ناعمة تدوم ساعة واحدة ، أثناء التنقل ، أينما نحتاج.
لماذا؟
أنا أميل إلى الاعتقاد بأن هذا المقال سوف يثير السؤال "لماذا؟" هناك حلول جاهزة للتسجيل من الكاميرات ، والكاميرات بشكل عام يمكنها القيام بالكثير من الأشياء بنفسها ، لأنها اعترضت بحق في التعليقات على المقال السابق ، ويمكن عمومًا جعلها أسهل بكثير.
إنه فقط كان لدي وقت فراغ اضطررت إلى القيام به في مكان ما ، والرغبة في شق طريقي الخاص ، وشغف لا يطاق للتجارب.
وهو يعمل.
بالطبع ، يتطلب النص المعروض في المقالة بعض التعديلات ، ويجب ألا تستخدمه في هذا النموذج - وأكرر ، هذا دليل على المفهوم. بشكل عام ، أحذرك بشدة من استخدام منتجات منزلية الصنع عالية الركبة لضمان سلامة منزلك.
في حالتي ، لا تعد الموثوقية أمرًا بالغ الأهمية ، لذلك سيعمل هذا الخيار. يدور مراقب البرنامج النصي فوق هذا البرنامج النصي ، الذي ينقل الدفق إلى ffmpeg ، ويسجل القطع بالساعة ويحملها إلى VK ، ويعيد تشغيل البرنامج النصي في حالة انقطاع الاتصال أو بعض الأحداث غير المتوقعة.
بشكل عام ، كان من الممكن تجنب كل هذا بمجرد سحب الفيديو عبر RTSP عبر TCP ، كما اقترحت في نهاية المقالة السابقة ، لكن ffmpeg أحب أن يتوقف بغباء في لحظة عشوائية في الوقت المناسب والبدء في الاستجابة فقط لـ SIGKILL.
بدأت أشك في أن كل هذا كان بسبب مراقبة النص الأصل من قبل التاج وحصلت على أولوية أقل ، ولكن زيادة الأولوية قللت فقط من وتيرة المشكلة.
في حالة النص أعلاه ، كل شيء يعمل بشكل أكثر استقرارًا ولم يرفع أي شكاوى بعد.