Skip to content

Commit b4819fd

Browse files
authored
Merge pull request #54 from ckaqq/master
feat: 增加并行式断点续传上传文件功能
2 parents 239a6a8 + 834dca3 commit b4819fd

File tree

5 files changed

+146
-3
lines changed

5 files changed

+146
-3
lines changed

README.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,15 @@ $file = fopen('/local/path/file', 'r');
9595
$client->write('/save/path', $file);
9696
```
9797

98+
#### 使用并行式断点续传上传文件
99+
100+
```
101+
$serviceConfig->setUploadType('BLOCK_PARALLEL');
102+
$client = new Upyun($serviceConfig);
103+
$file = fopen('/local/path/file', 'r');
104+
$client->write('/save/path', $file);
105+
```
106+
98107
#### 上传图片并转换格式为 `png`,详见[上传作图](http://docs.upyun.com/cloud/image/#_2)
99108

100109
```

src/Upyun/Api/Rest.php

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,4 +105,28 @@ public function withHeaders($headers)
105105
}
106106
return $this;
107107
}
108+
109+
public function toRequest()
110+
{
111+
$url = $this->endpoint . $this->storagePath;
112+
$body = null;
113+
if ($this->file && $this->method === 'PUT') {
114+
$body = $this->file;
115+
}
116+
117+
$request = new Psr7\Request(
118+
$this->method,
119+
Util::encodeURI($url),
120+
$this->headers,
121+
$body
122+
);
123+
$authHeader = Signature::getHeaderSign($this->config,
124+
$this->method,
125+
$request->getUri()->getPath()
126+
);
127+
foreach ($authHeader as $head => $value) {
128+
$request = $request->withHeader($head, $value);
129+
}
130+
return $request;
131+
}
108132
}

src/Upyun/Config.php

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ class Config
3232
public $useSsl;
3333

3434
/**
35-
* @var string 上传使用的接口类型,可以设置为 `REST`:使用 rest api 上传,`AUTO` 根据文件大小自动判断,`BLOCK` 使用断点续传
35+
* @var string 上传使用的接口类型,可以设置为 `REST`:使用 rest api 上传,`AUTO` 根据文件大小自动判断,`BLOCK` 使用串行式断点续传,`BLOCK_PARALLEL` 使用并行式断点续传
3636
* 当上传小文件时,不推荐使用断点续传;上传时如果设置了异步预处理`withAsyncProcess=true`,将会使用表单 api 上传
3737
*/
3838
public $uploadType = 'AUTO';
@@ -42,6 +42,11 @@ class Config
4242
*/
4343
public $sizeBoundary = 31457280;
4444

45+
/**
46+
* @var int 并行式断点续传的并发数
47+
*/
48+
public $concurrency = 5;
49+
4550
/**
4651
* @var int request timeout seconds
4752
*/
@@ -144,4 +149,14 @@ public function getProtocol()
144149
{
145150
return $this->useSsl ? 'https://' : 'http://';
146151
}
152+
153+
public function setUploadType($uploadType)
154+
{
155+
$this->uploadType = $uploadType;
156+
}
157+
158+
public function setConcurrency($concurrency)
159+
{
160+
$this->concurrency = $concurrency;
161+
}
147162
}

src/Upyun/Uploader.php

Lines changed: 84 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
use Upyun\Api\Rest;
55
use Upyun\Api\Form;
66
use GuzzleHttp\Psr7;
7+
use GuzzleHttp\Pool;
8+
use GuzzleHttp\Client;
79

810
class Uploader
911
{
@@ -37,13 +39,15 @@ public function upload($path, $file, $params, $withAsyncProcess)
3739
->withHeaders($params)
3840
->withFile($stream)
3941
->send();
42+
} elseif ($this->config->uploadType === 'BLOCK_PARALLEL') {
43+
return $this->concurrentPointUpload($path, $stream, $params);
4044
} else {
4145
return $this->pointUpload($path, $stream, $params);
4246
}
4347
}
4448

4549
/**
46-
* 断点续传
50+
* 串行式断点续传
4751
* @param $path
4852
* @param $stream
4953
* @param $params
@@ -108,7 +112,8 @@ private function pointUpload($path, $stream, $params)
108112

109113
private function needUseBlock($fileSize)
110114
{
111-
if ($this->config->uploadType === 'BLOCK') {
115+
if ($this->config->uploadType === 'BLOCK' ||
116+
$this->config->uploadType === 'BLOCK_PARALLEL') {
112117
return true;
113118
} elseif ($this->config->uploadType === 'AUTO' &&
114119
$fileSize >= $this->config->sizeBoundary) {
@@ -117,4 +122,81 @@ private function needUseBlock($fileSize)
117122
return false;
118123
}
119124
}
125+
126+
/**
127+
* 并行式断点续传
128+
* @param $path
129+
* @param $stream
130+
* @param $params
131+
*
132+
* @return mixed|\Psr\Http\Message\ResponseInterface
133+
* @throws \Exception
134+
*/
135+
private function concurrentPointUpload($path, $stream, $params)
136+
{
137+
$req = new Rest($this->config);
138+
139+
$headers = array();
140+
if (is_array($params)) {
141+
foreach ($params as $key => $val) {
142+
$headers['X-Upyun-Meta-' . $key] = $val;
143+
}
144+
}
145+
$res = $req->request('PUT', $path)
146+
->withHeaders(array_merge(array(
147+
'X-Upyun-Multi-Disorder' => 'true',
148+
'X-Upyun-Multi-Stage' => 'initiate',
149+
'X-Upyun-Multi-Type' => Psr7\mimetype_from_filename($path),
150+
'X-Upyun-Multi-Length' => $stream->getSize(),
151+
), $headers))
152+
->send();
153+
if ($res->getStatusCode() !== 204) {
154+
throw new \Exception('init request failed when poinit upload!');
155+
}
156+
157+
$init = Util::getHeaderParams($res->getHeaders());
158+
$uuid = $init['x-upyun-multi-uuid'];
159+
$requests = function ($req, $path, $stream, $uuid) {
160+
$blockSize = 1024 * 1024;
161+
$total = ceil($stream->getSize() / $blockSize);
162+
for ($i = 0; $i < $total; $i++) {
163+
$fileBlock = $stream->read($blockSize);
164+
yield $req->request('PUT', $path)
165+
->withHeaders(array(
166+
'X-Upyun-Multi-Stage' => 'upload',
167+
'X-Upyun-Multi-Uuid' => $uuid,
168+
'X-Upyun-Part-Id' => $i
169+
))
170+
->withFile(Psr7\stream_for($fileBlock))
171+
->toRequest();
172+
}
173+
};
174+
$client = new Client([
175+
'timeout' => $this->config->timeout,
176+
]);
177+
$pool = new Pool($client, $requests($req, $path, $stream, $uuid), [
178+
'concurrency' => $this->config->concurrency,
179+
'fulfilled' => function ($res) {
180+
if ($res->getStatusCode() !== 204) {
181+
throw new \Exception('upload request failed when poinit upload!');
182+
}
183+
},
184+
'rejected' => function () {
185+
throw new \Exception('upload request failed when poinit upload!');
186+
},
187+
]);
188+
$promise = $pool->promise();
189+
$promise->wait();
190+
191+
$res = $req->request('PUT', $path)
192+
->withHeaders(array(
193+
'X-Upyun-Multi-Uuid' => $uuid,
194+
'X-Upyun-Multi-Stage' => 'complete'
195+
))
196+
->send();
197+
if ($res->getStatusCode() != 204 && $res->getStatusCode() != 201) {
198+
throw new \Exception('end request failed when poinit upload!');
199+
}
200+
return $res;
201+
}
120202
}

tests/UpyunTest.php

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ public function testReadFile()
111111
public function testDeleteFile()
112112
{
113113
self::$upyun->write('test-delete.txt', 'test file content 3');
114+
sleep(5);
114115
self::$upyun->delete('test-delete.txt');
115116
try {
116117
self::$upyun->read('test-delete.txt');
@@ -269,4 +270,16 @@ public function testSnapshot()
269270
$result = self::$upyun->snapshot('/php-sdk-sample.mp4', '/snapshot.jpg', '00:00:01', '720x480', 'jpg');
270271
$this->assertTrue($result['status_code'] === 200);
271272
}
273+
274+
public function testParallelUpload()
275+
{
276+
$config = new Config(BUCKET, USER_NAME, PWD);
277+
$config->setUploadType('BLOCK_PARALLEL');
278+
$upyun = new Upyun($config);
279+
$filename = 'test_parallel.jpeg';
280+
$upyun->write($filename, fopen(__DIR__ . '/assets/sample.jpeg', 'rb'));
281+
282+
$size = getUpyunFileSize($filename);
283+
$this->assertEquals($size, PIC_SIZE);
284+
}
272285
}

0 commit comments

Comments
 (0)