From 834dca319eb01515cc8eec1adfe804710130b985 Mon Sep 17 00:00:00 2001 From: ck Date: Wed, 24 Apr 2019 14:53:04 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=A2=9E=E5=8A=A0=E5=B9=B6=E8=A1=8C?= =?UTF-8?q?=E5=BC=8F=E6=96=AD=E7=82=B9=E7=BB=AD=E4=BC=A0=E4=B8=8A=E4=BC=A0?= =?UTF-8?q?=E6=96=87=E4=BB=B6=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 9 +++++ src/Upyun/Api/Rest.php | 24 ++++++++++++ src/Upyun/Config.php | 17 ++++++++- src/Upyun/Uploader.php | 86 +++++++++++++++++++++++++++++++++++++++++- tests/UpyunTest.php | 13 +++++++ 5 files changed, 146 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 12f49b7..8eda96a 100644 --- a/README.md +++ b/README.md @@ -95,6 +95,15 @@ $file = fopen('/local/path/file', 'r'); $client->write('/save/path', $file); ``` +#### 使用并行式断点续传上传文件 + +``` +$serviceConfig->setUploadType('BLOCK_PARALLEL'); +$client = new Upyun($serviceConfig); +$file = fopen('/local/path/file', 'r'); +$client->write('/save/path', $file); +``` + #### 上传图片并转换格式为 `png`,详见[上传作图](http://docs.upyun.com/cloud/image/#_2) ``` diff --git a/src/Upyun/Api/Rest.php b/src/Upyun/Api/Rest.php index f63d775..be4b0a0 100644 --- a/src/Upyun/Api/Rest.php +++ b/src/Upyun/Api/Rest.php @@ -105,4 +105,28 @@ public function withHeaders($headers) } return $this; } + + public function toRequest() + { + $url = $this->endpoint . $this->storagePath; + $body = null; + if ($this->file && $this->method === 'PUT') { + $body = $this->file; + } + + $request = new Psr7\Request( + $this->method, + Util::encodeURI($url), + $this->headers, + $body + ); + $authHeader = Signature::getHeaderSign($this->config, + $this->method, + $request->getUri()->getPath() + ); + foreach ($authHeader as $head => $value) { + $request = $request->withHeader($head, $value); + } + return $request; + } } diff --git a/src/Upyun/Config.php b/src/Upyun/Config.php index b32b809..1b99b6c 100644 --- a/src/Upyun/Config.php +++ b/src/Upyun/Config.php @@ -32,7 +32,7 @@ class Config public $useSsl; /** - * @var string 上传使用的接口类型,可以设置为 `REST`:使用 rest api 上传,`AUTO` 根据文件大小自动判断,`BLOCK` 使用断点续传 + * @var string 上传使用的接口类型,可以设置为 `REST`:使用 rest api 上传,`AUTO` 根据文件大小自动判断,`BLOCK` 使用串行式断点续传,`BLOCK_PARALLEL` 使用并行式断点续传 * 当上传小文件时,不推荐使用断点续传;上传时如果设置了异步预处理`withAsyncProcess=true`,将会使用表单 api 上传 */ public $uploadType = 'AUTO'; @@ -42,6 +42,11 @@ class Config */ public $sizeBoundary = 31457280; + /** + * @var int 并行式断点续传的并发数 + */ + public $concurrency = 5; + /** * @var int request timeout seconds */ @@ -144,4 +149,14 @@ public function getProtocol() { return $this->useSsl ? 'https://' : 'http://'; } + + public function setUploadType($uploadType) + { + $this->uploadType = $uploadType; + } + + public function setConcurrency($concurrency) + { + $this->concurrency = $concurrency; + } } diff --git a/src/Upyun/Uploader.php b/src/Upyun/Uploader.php index 862644e..95e6f2f 100644 --- a/src/Upyun/Uploader.php +++ b/src/Upyun/Uploader.php @@ -4,6 +4,8 @@ use Upyun\Api\Rest; use Upyun\Api\Form; use GuzzleHttp\Psr7; +use GuzzleHttp\Pool; +use GuzzleHttp\Client; class Uploader { @@ -37,13 +39,15 @@ public function upload($path, $file, $params, $withAsyncProcess) ->withHeaders($params) ->withFile($stream) ->send(); + } elseif ($this->config->uploadType === 'BLOCK_PARALLEL') { + return $this->concurrentPointUpload($path, $stream, $params); } else { return $this->pointUpload($path, $stream, $params); } } /** - * 断点续传 + * 串行式断点续传 * @param $path * @param $stream * @param $params @@ -108,7 +112,8 @@ private function pointUpload($path, $stream, $params) private function needUseBlock($fileSize) { - if ($this->config->uploadType === 'BLOCK') { + if ($this->config->uploadType === 'BLOCK' || + $this->config->uploadType === 'BLOCK_PARALLEL') { return true; } elseif ($this->config->uploadType === 'AUTO' && $fileSize >= $this->config->sizeBoundary) { @@ -117,4 +122,81 @@ private function needUseBlock($fileSize) return false; } } + + /** + * 并行式断点续传 + * @param $path + * @param $stream + * @param $params + * + * @return mixed|\Psr\Http\Message\ResponseInterface + * @throws \Exception + */ + private function concurrentPointUpload($path, $stream, $params) + { + $req = new Rest($this->config); + + $headers = array(); + if (is_array($params)) { + foreach ($params as $key => $val) { + $headers['X-Upyun-Meta-' . $key] = $val; + } + } + $res = $req->request('PUT', $path) + ->withHeaders(array_merge(array( + 'X-Upyun-Multi-Disorder' => 'true', + 'X-Upyun-Multi-Stage' => 'initiate', + 'X-Upyun-Multi-Type' => Psr7\mimetype_from_filename($path), + 'X-Upyun-Multi-Length' => $stream->getSize(), + ), $headers)) + ->send(); + if ($res->getStatusCode() !== 204) { + throw new \Exception('init request failed when poinit upload!'); + } + + $init = Util::getHeaderParams($res->getHeaders()); + $uuid = $init['x-upyun-multi-uuid']; + $requests = function ($req, $path, $stream, $uuid) { + $blockSize = 1024 * 1024; + $total = ceil($stream->getSize() / $blockSize); + for ($i = 0; $i < $total; $i++) { + $fileBlock = $stream->read($blockSize); + yield $req->request('PUT', $path) + ->withHeaders(array( + 'X-Upyun-Multi-Stage' => 'upload', + 'X-Upyun-Multi-Uuid' => $uuid, + 'X-Upyun-Part-Id' => $i + )) + ->withFile(Psr7\stream_for($fileBlock)) + ->toRequest(); + } + }; + $client = new Client([ + 'timeout' => $this->config->timeout, + ]); + $pool = new Pool($client, $requests($req, $path, $stream, $uuid), [ + 'concurrency' => $this->config->concurrency, + 'fulfilled' => function ($res) { + if ($res->getStatusCode() !== 204) { + throw new \Exception('upload request failed when poinit upload!'); + } + }, + 'rejected' => function () { + throw new \Exception('upload request failed when poinit upload!'); + }, + ]); + $promise = $pool->promise(); + $promise->wait(); + + $res = $req->request('PUT', $path) + ->withHeaders(array( + 'X-Upyun-Multi-Uuid' => $uuid, + 'X-Upyun-Multi-Stage' => 'complete' + )) + ->send(); + if ($res->getStatusCode() != 204 && $res->getStatusCode() != 201) { + throw new \Exception('end request failed when poinit upload!'); + } + return $res; + } } diff --git a/tests/UpyunTest.php b/tests/UpyunTest.php index b672125..c49a861 100644 --- a/tests/UpyunTest.php +++ b/tests/UpyunTest.php @@ -111,6 +111,7 @@ public function testReadFile() public function testDeleteFile() { self::$upyun->write('test-delete.txt', 'test file content 3'); + sleep(5); self::$upyun->delete('test-delete.txt'); try { self::$upyun->read('test-delete.txt'); @@ -269,4 +270,16 @@ public function testSnapshot() $result = self::$upyun->snapshot('/php-sdk-sample.mp4', '/snapshot.jpg', '00:00:01', '720x480', 'jpg'); $this->assertTrue($result['status_code'] === 200); } + + public function testParallelUpload() + { + $config = new Config(BUCKET, USER_NAME, PWD); + $config->setUploadType('BLOCK_PARALLEL'); + $upyun = new Upyun($config); + $filename = 'test_parallel.jpeg'; + $upyun->write($filename, fopen(__DIR__ . '/assets/sample.jpeg', 'rb')); + + $size = getUpyunFileSize($filename); + $this->assertEquals($size, PIC_SIZE); + } }