-
Notifications
You must be signed in to change notification settings - Fork 0
/
inetAIS.php
237 lines (224 loc) · 16.1 KB
/
inetAIS.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
<?php
/*
https://meri.digitraffic.fi/api/ais/v1/locations?latitude=60.1688&longitude=24.939&radius=30
version 0.2.7
Если в конфиге указать переменную $gpsdPROXYhost, то демон будет пытаться
отдать данные gpsdPROXY, кроме обслуживания указанного порта.
*/
chdir(__DIR__); // задаем директорию выполнение скрипта
require_once("fCommon.php");
require_once("fAIS.php");
require_once("fgpsdPROXY.php");
require_once("params.php");
$getTPVtmeout = round(0.75*$getDataTimeout); // получать координаты подвижной точки не чаще, чем сек.
$inSocket=null; $gpsdPROXYsocket=null;
if($inetAIShost){ // Входящее соединение для клиентов
$inSocket = stream_socket_server("tcp://$inetAIShost:$inetAISport",$errno,$errstr);
if($inSocket) echo "Inbound connections are expected on tcp://$inetAIShost:$inetAISport\n";
};
if(isset($gpsdPROXYhost)) { // Соединение с gpsdPROXY для передачи данных
$gpsdPROXYsocket = gpsdPROXYconnect($gpsdPROXYhost,$gpsdPROXYport); // fgpsdPROXY.php
if($gpsdPROXYsocket) echo "The gpsdPROXY feeder open.\n";
}
if(!$inSocket and $gpsdPROXYsocket) exit("Imposible to create inbound socket or gpsdPROXY feeder: $errstr\n");
$instrumentsData = array('AIS'=>array()); // собственно собираемые / кешируемые данные
$getDataTimeout = min(min($gpsdProxyTimeouts['AIS']),$getDataTimeout);
echo "Gets data from AIS source every $getDataTimeout sec.\n";
echo "Sends AIS TPV every {$AISintervals['TPV']} sec, and other info every {$AISintervals['metainfo']} sec.\n";
echo "\n";
$rotateBeam = array("|","/","-","\\");
$rBi = 0;
$inboundConnects = array(); // потоки входящих соединений, array
$externalProcesses = array(); // индексный массив внешних процессов, вида: array("key or id" => array("process"=>resource,"pipes"=>array(),'inString'=>""))
$outPipes = array(); // массив исходящих потоков
$errPipes = array(); // массив ошибочных потоков
$mesNMEA = array(); // массив сообщений AIS для отправки клиентам, воспринимается как очередь
$lastGetFromSource = 0;
$lastGetTPV = 0;
$countrecievedMMSI = 0;
do{
//$microtime = microtime(true);
$timeout = min($getDataTimeout,$getTPVtmeout);
$inPipes = $inboundConnects; // будем слушать уже открытые потоки
if($inSocket) $inPipes[] = $inSocket; // будем слушать входной сокет
foreach($externalProcesses as $process){ // для каждого запущенного внешнего процесса
$inPipes[] = $process["pipes"][1]; // будем слушать поток его стандартного вывода (чисто для памяти: там лежат переменные, которые являются ссылками на ресурсы)
$inPipes[] = $process["pipes"][2]; // будем слушать поток его stderr, потому что внешние процессы у нас -- скрипты php, и stderr собственно скрипта выводится в stdout php
};
$errPipes = $inboundConnects; // проверять будем только клиентские потоки, потому что см. выше
if(isset($gpsdPROXYhost)) { // Перезапустим соединение с gpsdPROXY, если оно обломилось.
//echo "\ngpsdPROXYsocket:".$gpsdPROXYsocket.", gettype()=".gettype($gpsdPROXYsocket)."\n";
if(!$gpsdPROXYsocket) { // если с сокетом что, то в sendAIStogpsdPROXY ему присвоится null, и всё будет работать и в PHP8
$gpsdPROXYsocket = gpsdPROXYconnect($gpsdPROXYhost,$gpsdPROXYport);
if($gpsdPROXYsocket) echo "The gpsdPROXY feeder reopen. \n";
else echo "The gpsdPROXY feeder false to reopen. \n";
};
};
// Показ сообщения
if($inboundConnects or $gpsdPROXYsocket) {
echo($rotateBeam[$rBi]); // вращающаяся палка
echo " Connected ";
if($inSocket) {
echo (count($inboundConnects))." clients";
if($gpsdPROXYsocket) echo " and ";
else echo ". ";
};
if($gpsdPROXYsocket) echo "from gpsdPROXY. ";
echo "Changed targets ";
if(@count($recievedMMSI)) $countrecievedMMSI = count($recievedMMSI); // таким образом, в $countrecievedMMSI количество последних когда-то изменённых целей, а не факт, что за последний оборот ничего не произошло
echo "$countrecievedMMSI";
if(@$AISinterestPoints['self']) echo " pos:".round($AISinterestPoints['self']['latitude'],3).",".round($AISinterestPoints['self']['longitude'],3)." ";
else echo " ";
echo "\r";
$rBi++;
if($rBi>=count($rotateBeam)) $rBi = 0;
}
else {
// если у нас нет ни одного сокета (нет клиентов, и нет входящего соединения)
// то stream_select не будет ждать ни при каком значении timeout. Что баг.
// А в PHP8 -- это вообще Fatal error. Казлы.
// Поэтому при отсутствии входящих соединений и соединения с gpsdPROXY
// используем sleep, чтобы оно, во-первых, в основном стояло, а во-вторых, время от времени
// пыталось соединиться с gpsdPROXY.
if(!isset($gpsdPROXYhost)) $timeout = null; // ждать вечно, если не нужно общаться с gpsdPROXY
echo "No inbound connections, waiting on $inetAIShost:$inetAISport \r";
};
//$timeout = $getDataTimeout; // для целей тестирования
//echo "\ntimeout=$timeout; inPipes:"; print_r($inPipes); echo "outPipes:"; print_r($outPipes);
if($inPipes or $outPipes or $errPipes){ // это концептуально неправильно, но будет работать в PHP8. Правильный код с проверкой $nStreams===null в PHP8 работать не будет
$nStreams = @stream_select($inPipes,$outPipes,$errPipes,$timeout);
}
else sleep($getDataTimeout); // нет ни входящих, ни сокета для подключения, ни gpsdPROXY
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Проблемы. С нашей стороны?
if($errPipes) {
// всё так просто, потому что мы следим только за клиентскими потоками, не за потоками
// внешних процессов
foreach($errPipes as $streem){
unset($inboundConnects[array_search($streem, $inboundConnects)]); // удалим проблемный поток из списка действующих
if(is_resource($streem)) fclose($streem);
unset($streem);
echo "There is a problem with the client streem, the streem is closed and deleted \n";
};
};
// Чтение
$recievedMMSI = array(); // массив изменившихся целей вида array($mmsi)
if($inPipes) {
//echo "\n inPipes:";var_dump($inPipes);echo "\n";
//echo "\n externalProcesses:";var_dump($externalProcesses);echo "\n";
//echo "\n inboundConnects:";var_dump($inboundConnects);echo " \n
$toDie = array();
foreach($inPipes as $pipe){
//echo "\ninPipes pipe="; var_dump($pipe);echo " \n";
if($pipe === $inSocket){ //echo "во входной сокет кто-то постучался \n";
$inboundConnects[] = stream_socket_accept($inSocket, -1);
continue; // к следующему потоку
};
if(($procID = isExternalPipe($pipe,2))!==false){ // поток из stderr внешнего процесса
$externalProcesses[$procID]['inString'] .= stream_get_contents($pipe);
if(feof($pipe)) {
echo "The process error {$externalProcesses[$procID]['inString']} on external process $procID \n";
//echo "Проблема с внешним процессом $procID \n";
$toDie[] = $procID;
};
continue; // к следующему потоку
};
if(($procID = isExternalPipe($pipe,1))!==false){ // поток из stduot внешнего процесса
//echo "внешний процесс $procID что-то вернул \n";
@$externalProcesses[$procID]['inString'] .= trim(stream_get_contents($pipe));
if(feof($pipe)) {
//if($procID==='getTPVprocess') {echo "getTPVprocess data:";print_r($externalProcesses[$procID]);echo ";\n";}
$extData = unserialize($externalProcesses[$procID]['inString']);
//$extData = json_decode($externalProcesses[$procID]['inString'],true);
//echo "\nextData=";print_r($extData);echo ";\n";
//if($procID==='getTPVprocess') {echo "getTPVprocess extData=";print_r($extData);echo ";\n";}
$toDie[] = $procID; // в любом случае этот процесс надо убить
if(!is_array($extData)){
if($externalProcesses[$procID]['inString'] == 'N;') {} // оно возвращает это, если в указанной области нет целей AIS
//elseif($externalProcesses[$procID]['inString'] == 'no any Signal K resources found') {} // не удалось получить координаты
else echo "The problem '{$externalProcesses[$procID]['inString']}' with external process $procID \n";
continue; // к следующему потоку
}
elseif($extData['error']){
echo "The error '{$extData['error']}' on external process $procID \n";
continue; // к следующему потоку
};
// updInstrumentsData понимает как набор с координатами, так и набор с метаинформацией
if($extData) { // непустой массив
if($extData['class']=='TPV'){ // это координаты
//echo "Координаты:";print_r($extData);
$AISinterestPoints['self'] = array('latitude'=>$extData['lat'],'longitude'=>$extData['lon'],'radius'=>$movingPOIradius);
}
else { // это информация AIS
$recievedMMSI = array_unique(array_merge($recievedMMSI,updInstrumentsData($extData))); // плоский массив
//echo "имеется целей AIS в instrumentsData ".count($instrumentsData['AIS'])."\n";
list($noMetaData,$deletedMMSI) = chkFreshOfData(); // Проверим актуальность данных и получим список тех, для кого нет полной информации. При этом в $recievedMMSI могли бы остаться mmsi удалённых в этом процессе объектов
//echo "осталось свежих целей AIS в instrumentsData ".count($instrumentsData['AIS'])."\n";
$recievedMMSI = array_diff($recievedMMSI,$deletedMMSI); // теперь в $recievedMMSI mmsi изменённых целей AIS, оставшихся в $instrumentsData
};
};
$extData = '';
};
continue; // к следующему потоку
};
echo "Other streems with inbound data: \n";
// вообще-то, там ничего не должно приходить, но telnet, например, присылает сообщение о закрытии соединения
$res = fgets($pipe,2048); // обязательно надо читать, иначе stream_socket_accept сразу будет возвращать поток, в котором что-то есть
echo "res=$res;\n";
if($res === false) closeClient($pipe); // клиент отвалился
};
// Поскольку каждый внешний процесс имеет несколько потоков, нужно прочесть все потоки
// прежде чем убивать внешний процесс и его потоки.
// После прочтения всех потоков убиваем те внешние процессы, на которые указали
$toDie = array_unique($toDie);
array_walk($toDie,'closeProcess');
$externalProcesses = array_merge($externalProcesses); // перенумеруем процессы с начала, чтобы их номера не увеличивались бесконечно
};
// Выполнение
// Поскольку для реализации отдачи в gpsdPROXY оно оборачивается и при отсутствии кому отдавать,
// организуем запросы наружу только при наличии получателя внутри
if($inboundConnects or $gpsdPROXYsocket){ // есть клиенты или соединение с gpsdPROXY
if((time()-$lastGetFromSource)>=$getDataTimeout) { // спрашивать данные у источника не чаще указанного, а не каждый оборот
$lastGetFromSource = time();
// Получение координат целей AIS
foreach($AISinterestPoints as $label => $poi){ // опросим все точки
//echo "Get AIS targets for $label point \n"; print_r($poi);
openProcess("$phpCLIexec getAISdata.php",serialize($poi));
};
// Получение метаданных
if($noMetaData and !is_resource($externalProcesses['getMetaDataProcess']['process'])){ // не запущен процесс получения метаданных
//echo "Has ".count($noMetaData)." AIS targets without full metadata \n";
//echo "noMetaData=";print_r($noMetaData);
exec('pkill getMetaData.php'); // убъём, если такой процесс запущен. Нормально он должен был успеть.
openProcess("$phpCLIexec getMetaData.php",serialize($noMetaData),'getMetaDataProcess');
$noMetaData = null;
};
};
// Получение координат подвижной точки (собственных, ага)
// Их нужно получать с отдельным интервалом, потому что интервал $getDataTimeout
// может быть большим, и свои координаты всегда будут не в той точке
if($netAISgpsdHost and (time()-$lastGetTPV)>=$getTPVtmeout) { // спрашивать координаты не чаще указанного, а не каждый оборот
$lastGetTPV = time();
if(!is_resource(@$externalProcesses['getTPVprocess']['process'])){ // не запущен процесс получения метаданных
//echo "Запускаем процесс получения координат \n";
exec('pkill getAISdata.php'); // убъём, если такой процесс запущен. Нормально он должен был успеть.
openProcess("$phpCLIexec getTPV.php",'','getTPVprocess');
};
};
};
// Запись
//file_put_contents('instrumentsData.json',json_encode($instrumentsData,JSON_PRETTY_PRINT));
if($inetAIShost) { //есть (должно быть) входящее соединение для клиентов
if($inboundConnects and $recievedMMSI){ // есть клиенты и есть, что передавать
$mesNMEA = array_merge($mesNMEA,getAISData(array_intersect(array_keys($instrumentsData["AIS"]),$recievedMMSI)));
//echo "$mesNMEA\n";
};
sendAIS(); // Отправляет одно первое сообщение AIS из массива $mesNMEA в каждый из потоков в массиве $сonnects
};
if($gpsdPROXYsocket and $recievedMMSI) { // отсылать gpsdPROXY
//echo "\n отсылаем в GPSDPROXY\n"; // синхронно
sendAIStogpsdPROXY();
};
//if((microtime(true)-$microtime)>0.3) echo "Ждали ".(microtime(true)-$microtime)." sec. ";
}while(true);
?>