From d2a14dec2e72b63316c7a24a23971f190c4da4e4 Mon Sep 17 00:00:00 2001 From: Cees-Jan Kiewiet Date: Thu, 7 Sep 2017 21:47:37 +0200 Subject: [PATCH 1/7] Limit Handlers Middleware --- src/Middleware/LimitHandlers.php | 65 ++++++++++++++++++++++++++ tests/Middleware/LimitHandlersTest.php | 32 +++++++++++++ 2 files changed, 97 insertions(+) create mode 100644 src/Middleware/LimitHandlers.php create mode 100644 tests/Middleware/LimitHandlersTest.php diff --git a/src/Middleware/LimitHandlers.php b/src/Middleware/LimitHandlers.php new file mode 100644 index 00000000..31d831cf --- /dev/null +++ b/src/Middleware/LimitHandlers.php @@ -0,0 +1,65 @@ +limit = $limit; + $this->queued = new SplQueue(); + } + + public function __invoke(ServerRequestInterface $request, $next) + { + $body = $request->getBody(); + if ($body instanceof ReadableStreamInterface) { + $body->pause(); + } + $deferred = new Deferred(); + $this->queued->enqueue($deferred); + + $this->processQueue(); + + return $deferred->promise()->then(function () use ($request, $next) { + $this->pending++; + $body = $request->getBody(); + if ($body instanceof ReadableStreamInterface) { + $body->resume(); + } + return $next($request); + })->then(function ($response) { + $this->pending--; + $this->processQueue(); + return $response; + }, function ($error) { + $this->pending--; + $this->processQueue(); + return $error; + }); + } + + private function processQueue() + { + if ($this->pending >= $this->limit) { + return; + } + + if ($this->queued->count() === 0) { + return; + } + + $this->queued->dequeue()->resolve(); + } +} diff --git a/tests/Middleware/LimitHandlersTest.php b/tests/Middleware/LimitHandlersTest.php new file mode 100644 index 00000000..c19da5fb --- /dev/null +++ b/tests/Middleware/LimitHandlersTest.php @@ -0,0 +1,32 @@ + Date: Sun, 10 Sep 2017 22:35:10 +0200 Subject: [PATCH 2/7] LimitHandlers tests --- tests/Middleware/LimitHandlersTest.php | 78 ++++++++++++++++++++++++-- 1 file changed, 73 insertions(+), 5 deletions(-) diff --git a/tests/Middleware/LimitHandlersTest.php b/tests/Middleware/LimitHandlersTest.php index c19da5fb..fd1c3288 100644 --- a/tests/Middleware/LimitHandlersTest.php +++ b/tests/Middleware/LimitHandlersTest.php @@ -3,30 +3,98 @@ namespace React\Tests\Http\Middleware; use React\Http\Middleware\LimitHandlers; +use React\Http\ServerRequest; use React\Promise\Deferred; use React\Tests\Http\TestCase; -use RingCentral\Psr7\Request; final class LimitHandlersTest extends TestCase { - public function testNonStreamingBody() + public function testLimitOneRequestConcurrently() { /** * The first request */ - $requestA = new Request('GET', 'https://example.com/'); + $requestA = new ServerRequest('GET', 'https://example.com/'); $deferredA = new Deferred(); $calledA = false; - $nextA = function () use (&$calledA) { + $nextA = function () use (&$calledA, $deferredA) { $calledA = true; + return $deferredA->promise(); }; /** * The second request */ - $requestB = new Request('GET', 'https://www.example.com/'); + $requestB = new ServerRequest('GET', 'https://www.example.com/'); + $deferredB = new Deferred(); + $calledB = false; + $nextB = function () use (&$calledB, $deferredB) { + $calledB = true; + return $deferredB->promise(); + }; + /** + * The third request + */ + $requestC = new ServerRequest('GET', 'https://www.example.com/'); + $calledC = false; + $nextC = function () use (&$calledC) { + $calledC = true; + }; + + /** + * The handler + * + */ $limitHandlers = new LimitHandlers(1); + $this->assertFalse($calledA); + $this->assertFalse($calledB); + $this->assertFalse($calledC); + + $limitHandlers($requestA, $nextA); + + $this->assertTrue($calledA); + $this->assertFalse($calledB); + $this->assertFalse($calledC); + + $limitHandlers($requestB, $nextB); + + $this->assertTrue($calledA); + $this->assertFalse($calledB); + $this->assertFalse($calledC); + + $limitHandlers($requestC, $nextC); + + $this->assertTrue($calledA); + $this->assertFalse($calledB); + $this->assertFalse($calledC); + + /** + * Ensure resolve frees up a slot + */ + $deferredA->resolve(); + + $this->assertTrue($calledA); + $this->assertTrue($calledB); + $this->assertFalse($calledC); + + /** + * Ensure reject also frees up a slot + */ + $deferredB->reject(); + + $this->assertTrue($calledA); + $this->assertTrue($calledB); + $this->assertTrue($calledC); + } + + public function testStreamPauseAndResume() + { + $body = $this->getMockBuilder('React\Http\HttpBodyStream')->disableOriginalConstructor()->getMock(); + $body->expects($this->once())->method('pause'); + $body->expects($this->once())->method('resume'); + $limitHandlers = new LimitHandlers(1); + $limitHandlers(new ServerRequest('GET', 'https://example.com/', [], $body), function () {}); } } From 1e6efd6eb9016727fbc957c70a0889f3c3ec19c9 Mon Sep 17 00:00:00 2001 From: Cees-Jan Kiewiet Date: Mon, 11 Sep 2017 19:11:58 +0200 Subject: [PATCH 3/7] PHP 5.3 -.- --- src/Middleware/LimitHandlers.php | 23 ++++++++++++++--------- tests/Middleware/LimitHandlersTest.php | 2 +- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/src/Middleware/LimitHandlers.php b/src/Middleware/LimitHandlers.php index 31d831cf..a1ca0930 100644 --- a/src/Middleware/LimitHandlers.php +++ b/src/Middleware/LimitHandlers.php @@ -32,25 +32,30 @@ public function __invoke(ServerRequestInterface $request, $next) $this->processQueue(); - return $deferred->promise()->then(function () use ($request, $next) { - $this->pending++; + $that = $this; + $pending = &$this->pending; + return $deferred->promise()->then(function () use ($request, $next, $that, &$pending) { + $pending++; $body = $request->getBody(); if ($body instanceof ReadableStreamInterface) { $body->resume(); } return $next($request); - })->then(function ($response) { - $this->pending--; - $this->processQueue(); + })->then(function ($response) use ($that, &$pending) { + $pending--; + $that->processQueue(); return $response; - }, function ($error) { - $this->pending--; - $this->processQueue(); + }, function ($error) use ($that, &$pending) { + $pending--; + $that->processQueue(); return $error; }); } - private function processQueue() + /** + * @internal + */ + public function processQueue() { if ($this->pending >= $this->limit) { return; diff --git a/tests/Middleware/LimitHandlersTest.php b/tests/Middleware/LimitHandlersTest.php index fd1c3288..cb53e4e4 100644 --- a/tests/Middleware/LimitHandlersTest.php +++ b/tests/Middleware/LimitHandlersTest.php @@ -95,6 +95,6 @@ public function testStreamPauseAndResume() $body->expects($this->once())->method('pause'); $body->expects($this->once())->method('resume'); $limitHandlers = new LimitHandlers(1); - $limitHandlers(new ServerRequest('GET', 'https://example.com/', [], $body), function () {}); + $limitHandlers(new ServerRequest('GET', 'https://example.com/', array(), $body), function () {}); } } From 71665504ab7225f2f648cb4b5fbcd2683eb147d5 Mon Sep 17 00:00:00 2001 From: Cees-Jan Kiewiet Date: Mon, 11 Sep 2017 22:15:32 +0200 Subject: [PATCH 4/7] Renamed LimitHandlers to LimitHandlersMiddleware --- .../{LimitHandlers.php => LimitHandlersMiddleware.php} | 2 +- ...itHandlersTest.php => LimitHandlersMiddlewareTest.php} | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) rename src/Middleware/{LimitHandlers.php => LimitHandlersMiddleware.php} (97%) rename tests/Middleware/{LimitHandlersTest.php => LimitHandlersMiddlewareTest.php} (92%) diff --git a/src/Middleware/LimitHandlers.php b/src/Middleware/LimitHandlersMiddleware.php similarity index 97% rename from src/Middleware/LimitHandlers.php rename to src/Middleware/LimitHandlersMiddleware.php index a1ca0930..e325ac35 100644 --- a/src/Middleware/LimitHandlers.php +++ b/src/Middleware/LimitHandlersMiddleware.php @@ -7,7 +7,7 @@ use React\Stream\ReadableStreamInterface; use SplQueue; -final class LimitHandlers +final class LimitHandlersMiddleware { const DEFAULT_LIMIT = 10; diff --git a/tests/Middleware/LimitHandlersTest.php b/tests/Middleware/LimitHandlersMiddlewareTest.php similarity index 92% rename from tests/Middleware/LimitHandlersTest.php rename to tests/Middleware/LimitHandlersMiddlewareTest.php index cb53e4e4..a259a70a 100644 --- a/tests/Middleware/LimitHandlersTest.php +++ b/tests/Middleware/LimitHandlersMiddlewareTest.php @@ -2,12 +2,12 @@ namespace React\Tests\Http\Middleware; -use React\Http\Middleware\LimitHandlers; +use React\Http\Middleware\LimitHandlersMiddleware; use React\Http\ServerRequest; use React\Promise\Deferred; use React\Tests\Http\TestCase; -final class LimitHandlersTest extends TestCase +final class LimitHandlersMiddlewareTest extends TestCase { public function testLimitOneRequestConcurrently() { @@ -46,7 +46,7 @@ public function testLimitOneRequestConcurrently() * The handler * */ - $limitHandlers = new LimitHandlers(1); + $limitHandlers = new LimitHandlersMiddleware(1); $this->assertFalse($calledA); $this->assertFalse($calledB); @@ -94,7 +94,7 @@ public function testStreamPauseAndResume() $body = $this->getMockBuilder('React\Http\HttpBodyStream')->disableOriginalConstructor()->getMock(); $body->expects($this->once())->method('pause'); $body->expects($this->once())->method('resume'); - $limitHandlers = new LimitHandlers(1); + $limitHandlers = new LimitHandlersMiddleware(1); $limitHandlers(new ServerRequest('GET', 'https://example.com/', array(), $body), function () {}); } } From cf92bcf15d6762d8d8fbc959fe5ea752193d03c3 Mon Sep 17 00:00:00 2001 From: Cees-Jan Kiewiet Date: Mon, 11 Sep 2017 22:21:17 +0200 Subject: [PATCH 5/7] Added a bit of docs about the limit argument of the constructor --- src/Middleware/LimitHandlersMiddleware.php | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/Middleware/LimitHandlersMiddleware.php b/src/Middleware/LimitHandlersMiddleware.php index e325ac35..e7209c21 100644 --- a/src/Middleware/LimitHandlersMiddleware.php +++ b/src/Middleware/LimitHandlersMiddleware.php @@ -15,6 +15,12 @@ final class LimitHandlersMiddleware private $pending = 0; private $queued; + /** + * @param int $limit Maximum amount of concurrent requests handled. + * + * For example when $limit is set to 10, 10 requests will flow to $next + * while more incoming requests have to wait until one is done. + */ public function __construct($limit = self::DEFAULT_LIMIT) { $this->limit = $limit; From 43736d6a7038d5700e7a1d473728c4802d53ef0d Mon Sep 17 00:00:00 2001 From: Cees-Jan Kiewiet Date: Mon, 11 Sep 2017 22:22:16 +0200 Subject: [PATCH 6/7] Removed unnecessary $that --- src/Middleware/LimitHandlersMiddleware.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Middleware/LimitHandlersMiddleware.php b/src/Middleware/LimitHandlersMiddleware.php index e7209c21..f8a3a253 100644 --- a/src/Middleware/LimitHandlersMiddleware.php +++ b/src/Middleware/LimitHandlersMiddleware.php @@ -40,7 +40,7 @@ public function __invoke(ServerRequestInterface $request, $next) $that = $this; $pending = &$this->pending; - return $deferred->promise()->then(function () use ($request, $next, $that, &$pending) { + return $deferred->promise()->then(function () use ($request, $next, &$pending) { $pending++; $body = $request->getBody(); if ($body instanceof ReadableStreamInterface) { From f7ffcb2ec9d7120149fb245b4f68a0a9ddfe658b Mon Sep 17 00:00:00 2001 From: Cees-Jan Kiewiet Date: Mon, 11 Sep 2017 23:10:11 +0200 Subject: [PATCH 7/7] LimitHandlersMiddleware functional test --- tests/FunctionalServerTest.php | 54 ++++++++++++++++++++++++++++++++-- 1 file changed, 52 insertions(+), 2 deletions(-) diff --git a/tests/FunctionalServerTest.php b/tests/FunctionalServerTest.php index a0b6f0b7..56b26e4c 100644 --- a/tests/FunctionalServerTest.php +++ b/tests/FunctionalServerTest.php @@ -2,6 +2,9 @@ namespace React\Tests\Http; +use Psr\Http\Message\ServerRequestInterface; +use React\Http\Middleware\LimitHandlersMiddleware; +use React\Http\Middleware\RequestBodyBufferMiddleware; use React\Http\MiddlewareRunner; use React\Socket\Server as Socket; use React\EventLoop\Factory; @@ -13,7 +16,7 @@ use React\Http\Response; use React\Socket\SecureServer; use React\Stream\ReadableStreamInterface; -use React\Promise\Promise; +use React\Promise; use React\Promise\PromiseInterface; use React\Promise\Stream; use React\Stream\ThroughStream; @@ -614,7 +617,7 @@ public function testConnectWithThroughStreamReturnedFromPromiseReturnsDataAsGive $stream->end(); }); - return new Promise(function ($resolve) use ($loop, $stream) { + return new Promise\Promise(function ($resolve) use ($loop, $stream) { $loop->addTimer(0.001, function () use ($resolve, $stream) { $resolve(new Response(200, array(), $stream)); }); @@ -676,6 +679,53 @@ public function testConnectWithClosedThroughStreamReturnsNoData() $socket->close(); } + + public function testLimitHandlersMiddlewareRequestStreamPausing() + { + $loop = Factory::create(); + $connector = new Connector($loop); + + $server = new Server(new MiddlewareRunner(array( + new LimitHandlersMiddleware(5), + new RequestBodyBufferMiddleware(16 * 1024 * 1024), // 16 MiB + function (ServerRequestInterface $request, $next) use ($loop) { + return new Promise\Promise(function ($resolve) use ($request, $loop, $next) { + $loop->addTimer(0.1, function () use ($request, $resolve, $next) { + $resolve($next($request)); + }); + }); + }, + function (ServerRequestInterface $request) { + return new Response(200, array(), (string)strlen((string)$request->getBody())); + } + ))); + + $socket = new Socket(0, $loop); + $server->listen($socket); + + $result = array(); + for ($i = 0; $i < 6; $i++) { + $result[] = $connector->connect($socket->getAddress())->then(function (ConnectionInterface $conn) { + $conn->write( + "GET / HTTP/1.0\r\nContent-Length: 1024\r\nHost: " . noScheme($conn->getRemoteAddress()) . "\r\n\r\n" . + str_repeat('a', 1024) . + "\r\n\r\n" + ); + + return Stream\buffer($conn); + }); + } + + $responses = Block\await(Promise\all($result), $loop, 1.0); + + foreach ($responses as $response) { + $this->assertContains("HTTP/1.0 200 OK", $response, $response); + $this->assertTrue(substr($response, -4) == 1024, $response); + } + + $socket->close(); + } + } function noScheme($uri)