Skip to content

feat: 增加并行式断点续传上传文件功能 #54

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,15 @@ $file = fopen('/local/path/file', 'r');
$client->write('/save/path', $file);
```

#### 使用并行式断点续传上传文件

```
$serviceConfig->setUploadType('BLOCK_PARALLEL');
$client = new Upyun($serviceConfig);
$file = fopen('/local/path/file', 'r');
$client->write('/save/path', $file);
```

#### 上传图片并转换格式为 `png`,详见[上传作图](http://docs.upyun.com/cloud/image/#_2)

```
Expand Down
24 changes: 24 additions & 0 deletions src/Upyun/Api/Rest.php
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,28 @@ public function withHeaders($headers)
}
return $this;
}

public function toRequest()
{
$url = $this->endpoint . $this->storagePath;
$body = null;
if ($this->file && $this->method === 'PUT') {
$body = $this->file;
}

$request = new Psr7\Request(
$this->method,
Util::encodeURI($url),
$this->headers,
$body
);
$authHeader = Signature::getHeaderSign($this->config,
$this->method,
$request->getUri()->getPath()
);
foreach ($authHeader as $head => $value) {
$request = $request->withHeader($head, $value);
}
return $request;
}
}
17 changes: 16 additions & 1 deletion src/Upyun/Config.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class Config
public $useSsl;

/**
* @var string 上传使用的接口类型,可以设置为 `REST`:使用 rest api 上传,`AUTO` 根据文件大小自动判断,`BLOCK` 使用断点续传
* @var string 上传使用的接口类型,可以设置为 `REST`:使用 rest api 上传,`AUTO` 根据文件大小自动判断,`BLOCK` 使用串行式断点续传,`BLOCK_PARALLEL` 使用并行式断点续传
* 当上传小文件时,不推荐使用断点续传;上传时如果设置了异步预处理`withAsyncProcess=true`,将会使用表单 api 上传
*/
public $uploadType = 'AUTO';
Expand All @@ -42,6 +42,11 @@ class Config
*/
public $sizeBoundary = 31457280;

/**
* @var int 并行式断点续传的并发数
*/
public $concurrency = 5;

/**
* @var int request timeout seconds
*/
Expand Down Expand Up @@ -144,4 +149,14 @@ public function getProtocol()
{
return $this->useSsl ? 'https://' : 'http://';
}

public function setUploadType($uploadType)
{
$this->uploadType = $uploadType;
}

public function setConcurrency($concurrency)
{
$this->concurrency = $concurrency;
}
}
86 changes: 84 additions & 2 deletions src/Upyun/Uploader.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
use Upyun\Api\Rest;
use Upyun\Api\Form;
use GuzzleHttp\Psr7;
use GuzzleHttp\Pool;
use GuzzleHttp\Client;

class Uploader
{
Expand Down Expand Up @@ -37,13 +39,15 @@ public function upload($path, $file, $params, $withAsyncProcess)
->withHeaders($params)
->withFile($stream)
->send();
} elseif ($this->config->uploadType === 'BLOCK_PARALLEL') {
return $this->concurrentPointUpload($path, $stream, $params);
} else {
return $this->pointUpload($path, $stream, $params);
}
}

/**
* 断点续传
* 串行式断点续传
* @param $path
* @param $stream
* @param $params
Expand Down Expand Up @@ -108,7 +112,8 @@ private function pointUpload($path, $stream, $params)

private function needUseBlock($fileSize)
{
if ($this->config->uploadType === 'BLOCK') {
if ($this->config->uploadType === 'BLOCK' ||
$this->config->uploadType === 'BLOCK_PARALLEL') {
return true;
} elseif ($this->config->uploadType === 'AUTO' &&
$fileSize >= $this->config->sizeBoundary) {
Expand All @@ -117,4 +122,81 @@ private function needUseBlock($fileSize)
return false;
}
}

/**
* 并行式断点续传
* @param $path
* @param $stream
* @param $params
*
* @return mixed|\Psr\Http\Message\ResponseInterface
* @throws \Exception
*/
private function concurrentPointUpload($path, $stream, $params)
{
$req = new Rest($this->config);

$headers = array();
if (is_array($params)) {
foreach ($params as $key => $val) {
$headers['X-Upyun-Meta-' . $key] = $val;
}
}
$res = $req->request('PUT', $path)
->withHeaders(array_merge(array(
'X-Upyun-Multi-Disorder' => 'true',
'X-Upyun-Multi-Stage' => 'initiate',
'X-Upyun-Multi-Type' => Psr7\mimetype_from_filename($path),
'X-Upyun-Multi-Length' => $stream->getSize(),
), $headers))
->send();
if ($res->getStatusCode() !== 204) {
throw new \Exception('init request failed when poinit upload!');
}

$init = Util::getHeaderParams($res->getHeaders());
$uuid = $init['x-upyun-multi-uuid'];
$requests = function ($req, $path, $stream, $uuid) {
$blockSize = 1024 * 1024;
$total = ceil($stream->getSize() / $blockSize);
for ($i = 0; $i < $total; $i++) {
$fileBlock = $stream->read($blockSize);
yield $req->request('PUT', $path)
->withHeaders(array(
'X-Upyun-Multi-Stage' => 'upload',
'X-Upyun-Multi-Uuid' => $uuid,
'X-Upyun-Part-Id' => $i
))
->withFile(Psr7\stream_for($fileBlock))
->toRequest();
}
};
$client = new Client([
'timeout' => $this->config->timeout,
]);
$pool = new Pool($client, $requests($req, $path, $stream, $uuid), [
'concurrency' => $this->config->concurrency,
'fulfilled' => function ($res) {
if ($res->getStatusCode() !== 204) {
throw new \Exception('upload request failed when poinit upload!');
}
},
'rejected' => function () {
throw new \Exception('upload request failed when poinit upload!');
},
]);
$promise = $pool->promise();
$promise->wait();

$res = $req->request('PUT', $path)
->withHeaders(array(
'X-Upyun-Multi-Uuid' => $uuid,
'X-Upyun-Multi-Stage' => 'complete'
))
->send();
if ($res->getStatusCode() != 204 && $res->getStatusCode() != 201) {
throw new \Exception('end request failed when poinit upload!');
}
return $res;
}
}
13 changes: 13 additions & 0 deletions tests/UpyunTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ public function testReadFile()
public function testDeleteFile()
{
self::$upyun->write('test-delete.txt', 'test file content 3');
sleep(5);
self::$upyun->delete('test-delete.txt');
try {
self::$upyun->read('test-delete.txt');
Expand Down Expand Up @@ -269,4 +270,16 @@ public function testSnapshot()
$result = self::$upyun->snapshot('/php-sdk-sample.mp4', '/snapshot.jpg', '00:00:01', '720x480', 'jpg');
$this->assertTrue($result['status_code'] === 200);
}

public function testParallelUpload()
{
$config = new Config(BUCKET, USER_NAME, PWD);
$config->setUploadType('BLOCK_PARALLEL');
$upyun = new Upyun($config);
$filename = 'test_parallel.jpeg';
$upyun->write($filename, fopen(__DIR__ . '/assets/sample.jpeg', 'rb'));

$size = getUpyunFileSize($filename);
$this->assertEquals($size, PIC_SIZE);
}
}