1: <?php
2:
3: namespace PHPFastCGI\FastCGIDaemon\Connection;
4:
5: use PHPFastCGI\FastCGIDaemon\ConnectionHandler\ConnectionHandlerFactoryInterface;
6: use PHPFastCGI\FastCGIDaemon\ConnectionHandler\ConnectionHandlerInterface;
7:
8: 9: 10: 11:
/**
 * Connection pool built on PHP stream sockets.
 *
 * Watches a listening server socket together with every accepted client
 * socket using stream_select(), accepts new clients as they arrive and
 * notifies the connection handler that belongs to a client socket whenever
 * that socket becomes readable.
 */
class StreamSocketConnectionPool implements ConnectionPoolInterface
{
    /**
     * Listening socket new connections are accepted from.
     *
     * @var resource
     */
    private $serverSocket;

    /**
     * Accepted client sockets, keyed by the spl_object_hash() of their
     * connection object.
     *
     * @var resource[]
     */
    private $clientSockets;

    /**
     * Open connections, keyed identically to $clientSockets.
     *
     * @var StreamSocketConnection[]
     */
    private $connections;

    /**
     * Handlers created by the handler factory, keyed identically to
     * $clientSockets.
     *
     * @var ConnectionHandlerInterface[]
     */
    private $connectionHandlers;

    /**
     * Constructor. Switches the given socket to non-blocking mode.
     *
     * @param resource $socket Stream socket to accept connections from
     */
    public function __construct($socket)
    {
        stream_set_blocking($socket, 0);

        $this->serverSocket       = $socket;
        $this->clientSockets      = [];
        $this->connections        = [];
        $this->connectionHandlers = [];
    }

    /**
     * Runs a single pass of the pool: waits up to $timeoutLoop seconds for
     * activity, accepts a pending connection if the server socket is ready,
     * calls ready() on the handler of every readable client socket and
     * finally drops the connections that report themselves as closed.
     *
     * @param ConnectionHandlerFactoryInterface $connectionHandlerFactory Creates the handler for each accepted connection
     * @param int|float                         $timeoutLoop              Maximum time to wait, in seconds (may be fractional)
     *
     * @throws \RuntimeException When stream_select() fails for any reason
     *                           other than an interrupted system call
     */
    public function operate(ConnectionHandlerFactoryInterface $connectionHandlerFactory, $timeoutLoop)
    {
        // stream_select() takes the timeout as whole seconds plus microseconds.
        $timeoutLoopSeconds      = (int) floor($timeoutLoop);
        $timeoutLoopMicroseconds = (int) (($timeoutLoop - $timeoutLoopSeconds) * 1000000);

        // The server socket is watched under the reserved key 'pool'.
        $read = array_merge(['pool' => $this->serverSocket], $this->clientSockets);

        foreach ($this->selectReadable($read, $timeoutLoopSeconds, $timeoutLoopMicroseconds) as $id) {
            if ('pool' === $id) {
                $this->acceptConnection($connectionHandlerFactory);
            } else {
                $this->connectionHandlers[$id]->ready();
            }
        }

        $this->removeConnections();
    }

    /**
     * Shuts the pool down: asks every handler to shut down, keeps servicing
     * the remaining connections until all of them have closed and then
     * closes the server socket.
     *
     * @throws \RuntimeException When stream_select() fails for any reason
     *                           other than an interrupted system call
     */
    public function shutdown()
    {
        foreach ($this->connectionHandlers as $connectionHandler) {
            $connectionHandler->shutdown();
        }

        $this->removeConnections();

        while (count($this->connections) > 0) {
            // Only sockets actually reported readable are serviced. A select
            // interrupted by a signal yields no identifiers and is retried on
            // the next iteration.
            foreach ($this->selectReadable($this->clientSockets, 1) as $id) {
                $this->connectionHandlers[$id]->ready();
            }

            $this->removeConnections();
        }

        fclose($this->serverSocket);
    }

    /**
     * Waits until at least one of the given sockets is readable or the
     * timeout expires.
     *
     * @param resource[] $sockets      Sockets to watch, keyed by identifier
     * @param int        $seconds      Whole seconds to wait
     * @param int        $microseconds Additional microseconds to wait
     *
     * @return array Keys of $sockets that are readable; empty on timeout or
     *               when the call was interrupted by a signal
     *
     * @throws \RuntimeException When stream_select() fails for any reason
     *                           other than an interrupted system call
     */
    private function selectReadable(array $sockets, $seconds, $microseconds = 0)
    {
        $write = $except = [];

        if (false === @stream_select($sockets, $write, $except, $seconds, $microseconds)) {
            // error_get_last() returns null when no error was recorded.
            $error   = error_get_last();
            $message = isset($error['message']) ? $error['message'] : 'unknown error';

            if (false === stripos($message, 'interrupted system call')) {
                throw new \RuntimeException('stream_select failed: '.$message);
            }

            // On failure stream_select() leaves $sockets unmodified, so the
            // array must not be mistaken for the set of readable sockets.
            return [];
        }

        return array_keys($sockets);
    }

    /**
     * Accepts a pending connection from the server socket, if there is one,
     * and registers the socket, its connection object and a freshly created
     * handler under a shared identifier.
     *
     * @param ConnectionHandlerFactoryInterface $connectionHandlerFactory Creates the handler for the accepted connection
     */
    private function acceptConnection(ConnectionHandlerFactoryInterface $connectionHandlerFactory)
    {
        // A zero timeout makes the accept return immediately instead of
        // waiting for default_socket_timeout seconds when the connection that
        // woke the select is no longer pending.
        $clientSocket = @stream_socket_accept($this->serverSocket, 0);

        if (false === $clientSocket) {
            return;
        }

        stream_set_blocking($clientSocket, 0);

        $connection = new StreamSocketConnection($clientSocket);
        $handler    = $connectionHandlerFactory->createConnectionHandler($connection);

        $id = spl_object_hash($connection);

        $this->clientSockets[$id]      = $clientSocket;
        $this->connections[$id]        = $connection;
        $this->connectionHandlers[$id] = $handler;
    }

    /**
     * Forgets the socket, connection and handler of every connection that
     * reports itself as closed.
     */
    private function removeConnections()
    {
        foreach ($this->connections as $id => $connection) {
            if ($connection->isClosed()) {
                unset(
                    $this->clientSockets[$id],
                    $this->connections[$id],
                    $this->connectionHandlers[$id]
                );
            }
        }
    }
}
145: