MultipartUpload.php 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. <?php
  2. namespace Qcloud\Cos;
  3. use GuzzleHttp\Pool;
  /**
   * Uploads a stream as a multipart upload.
   *
   * The body is read sequentially in chunks of the calculated part size, every
   * chunk is sent as an `uploadPart` request through a Guzzle request pool, and
   * the collected part list is finally passed to `completeMultipartUpload`.
   *
   * Options read by this class: Bucket, Key, PartSize, Concurrency, Progress,
   * ContentMD5, Retry and (for resuming only) UploadId. All options except
   * PartSize are also forwarded unchanged to `createMultipartUpload`.
   */
  class MultipartUpload
  {
      /** Smallest part size this class will use: 1 MiB. */
      const MIN_PART_SIZE = 1048576;
      /** Largest part size this class will use: 5 GiB. */
      const MAX_PART_SIZE = 5368709120;
      /** Part size used when the caller does not pass a `PartSize` option: 5 MiB. */
      const DEFAULT_PART_SIZE = 5242880;
      /** Number of parts the body size is divided by when sizing parts. */
      const MAX_PARTS = 10000;

      /** @var object Client exposing the multipart commands and `httpClient`. */
      private $client;
      /** @var array Caller options with `PartSize` removed. */
      private $options;
      /** @var int Number of bytes requested from the body for each part. */
      private $partSize;
      /** @var array Uploaded parts as `['PartNumber' => ..., 'ETag' => ...]`, keyed by part number while uploading. */
      private $parts;
      /** @var object Body stream; must provide getSize(), eof() and read(). */
      private $body;
      /** @var callable Progress callback invoked as ($totalSize, $uploadedSize). */
      private $progress;
      /** @var int|null Value returned by the body's getSize(). */
      private $totalSize;
      /** @var int Bytes acknowledged by fulfilled part requests so far. */
      private $uploadedSize;
      /** @var int Maximum number of part requests in flight at once. */
      private $concurrency;
      /** @var array Maps the pool's zero-based request index to that request's PartNumber and PartSize. */
      private $partNumberList;
      /** @var mixed `ContentMD5` value added to each part request when truthy. */
      private $needMd5;
      /** @var int Value of the `Retry` option; stored but not read anywhere in this class. */
      private $retry;

      /**
       * @param object $client  Client used to issue the multipart commands.
       * @param object $body    Stream to upload.
       * @param array  $options Upload options; see the class description.
       */
      public function __construct($client, $body, $options = array())
      {
          // A missing PartSize used to raise an "undefined index" warning and
          // yield null; fall back to the declared default instead.
          $minPartSize = isset($options['PartSize']) ? $options['PartSize'] : self::DEFAULT_PART_SIZE;
          unset($options['PartSize']);

          $this->body = $body;
          $this->client = $client;
          $this->options = $options;
          $this->partSize = $this->calculatePartSize($minPartSize);
          $this->concurrency = isset($options['Concurrency']) ? $options['Concurrency'] : 10;
          $this->progress = isset($options['Progress'])
              ? $options['Progress']
              : function ($totalSize, $uploadedSize) {
              };
          $this->parts = [];
          $this->partNumberList = [];
          $this->uploadedSize = 0;
          $this->totalSize = $this->body->getSize();
          $this->needMd5 = isset($options['ContentMD5']) ? $options['ContentMD5'] : true;
          $this->retry = isset($options['Retry']) ? $options['Retry'] : 3;
      }

      /**
       * Starts a new multipart upload, uploads every part and completes it.
       *
       * @return mixed Result of the client's `completeMultipartUpload` call.
       */
      public function performUploading()
      {
          $uploadId = $this->initiateMultipartUpload();
          $this->uploadParts($uploadId);

          return $this->completeUpload($uploadId);
      }

      /**
       * Uploads all parts that are not yet recorded in the part list.
       *
       * Blocks until the request pool has settled. A rejected request is
       * reported on standard output and its reason is re-thrown.
       *
       * @param string $uploadId Identifier of the multipart upload.
       */
      public function uploadParts($uploadId)
      {
          // Pool indexes restart at zero on every call, so the index map must too.
          $this->partNumberList = [];

          $pool = new Pool($this->client->httpClient, $this->createPartRequests($uploadId), [
              'concurrency' => $this->concurrency,
              'fulfilled' => function ($response, $index) {
                  $this->recordUploadedPart($response, $index);
              },
              'rejected' => function ($reason, $index) {
                  // Report the real part number; the raw pool index differs from
                  // it by one and by however many parts were skipped on resume.
                  $partNumber = isset($this->partNumberList[$index])
                      ? $this->partNumberList[$index]['PartNumber']
                      : $index;
                  printf("part [%d] upload failed, reason: %s\n", $partNumber, $reason);
                  throw $reason;
              },
          ]);

          $pool->promise()->wait();
      }

      /**
       * Continues the upload identified by the `UploadId` option.
       *
       * Parts returned by the client's `ListParts` call are recorded as done, so
       * only the remaining parts are sent before the upload is completed.
       *
       * @return mixed Result of the client's `completeMultipartUpload` call.
       */
      public function resumeUploading()
      {
          $uploadId = $this->options['UploadId'];
          $rt = $this->client->ListParts([
              'UploadId' => $uploadId,
              'Bucket' => $this->options['Bucket'],
              'Key' => $this->options['Key'],
          ]);

          // A listing without a Parts entry means nothing was uploaded yet;
          // count() on the missing value would fail.
          if (!empty($rt['Parts'])) {
              foreach ($rt['Parts'] as $part) {
                  $this->parts[$part['PartNumber']] = [
                      'PartNumber' => $part['PartNumber'],
                      'ETag' => $part['ETag'],
                  ];
              }
          }

          $this->uploadParts($uploadId);

          return $this->completeUpload($uploadId);
      }

      /**
       * Derives the part size: the body size divided by MAX_PARTS (rounded up),
       * raised to the requested minimum and clamped to the MIN/MAX constants.
       *
       * @param int $minPartSize Smallest part size the caller asked for.
       *
       * @return int
       */
      private function calculatePartSize($minPartSize)
      {
          $partSize = intval(ceil($this->body->getSize() / self::MAX_PARTS));
          $partSize = max($minPartSize, $partSize);
          $partSize = min($partSize, self::MAX_PART_SIZE);

          return max($partSize, self::MIN_PART_SIZE);
      }

      /**
       * Creates the multipart upload from the stored options.
       *
       * @return string The `UploadId` entry of the client's response.
       */
      private function initiateMultipartUpload()
      {
          $result = $this->client->createMultipartUpload($this->options);

          return $result['UploadId'];
      }

      /**
       * Generator that reads the body part by part and yields one upload
       * request for every part that is not already in the part list.
       *
       * @param string $uploadId Identifier of the multipart upload.
       *
       * @return \Generator
       */
      private function createPartRequests($uploadId)
      {
          $requestIndex = 0;

          for ($partNumber = 1; !$this->body->eof(); $partNumber++) {
              $chunk = $this->body->read($this->partSize);

              // Compare against the empty string explicitly: empty() is also
              // true for the chunk "0", which would silently drop a final part
              // consisting of that single byte.
              if (!is_string($chunk) || $chunk === '') {
                  break;
              }

              // The chunk still had to be read above so the stream position
              // advances past parts that were uploaded earlier.
              if (isset($this->parts[$partNumber])) {
                  continue;
              }

              // Registered before yielding so the pool callbacks can translate
              // their request index back to this part.
              $this->partNumberList[$requestIndex] = [
                  'PartNumber' => $partNumber,
                  'PartSize' => strlen($chunk),
              ];
              $requestIndex++;

              $params = [
                  'Bucket' => $this->options['Bucket'],
                  'Key' => $this->options['Key'],
                  'UploadId' => $uploadId,
                  'PartNumber' => $partNumber,
                  'Body' => $chunk,
              ];
              if ($this->needMd5) {
                  $params['ContentMD5'] = $this->needMd5;
              }

              $command = $this->client->getCommand('uploadPart', $params);

              yield $this->client->commandToRequestTransformer($command);
          }
      }

      /**
       * Stores the ETag of a successfully uploaded part and reports progress.
       *
       * @param object $response Response of the part request.
       * @param int    $index    Zero-based index of the request within the pool.
       *
       * @throws \RuntimeException When the response carries no ETag header.
       */
      private function recordUploadedPart($response, $index)
      {
          $partNumber = $this->partNumberList[$index]['PartNumber'];
          $partSize = $this->partNumberList[$index]['PartSize'];

          $etag = $this->findETag($response);
          if ($etag === null) {
              throw new \RuntimeException(
                  sprintf('part [%d] upload response has no ETag header', $partNumber)
              );
          }

          $this->parts[$partNumber] = ['PartNumber' => $partNumber, 'ETag' => $etag];
          $this->uploadedSize += $partSize;
          call_user_func($this->progress, $this->totalSize, $this->uploadedSize);
      }

      /**
       * Looks up the ETag header regardless of how its name is capitalised, so
       * neither "etag" nor "ETag" nor any other spelling leaves it undefined.
       *
       * @param object $response Response exposing getHeaders().
       *
       * @return string|null First ETag value, or null when there is none.
       */
      private function findETag($response)
      {
          foreach ($response->getHeaders() as $name => $values) {
              if (strcasecmp((string) $name, 'ETag') === 0 && isset($values[0])) {
                  return $values[0];
              }
          }

          return null;
      }

      /**
       * Orders the collected parts by part number, re-indexes them as a list
       * and completes the upload. Also works when no parts were collected.
       *
       * @param string $uploadId Identifier of the multipart upload.
       *
       * @return mixed Result of the client's `completeMultipartUpload` call.
       */
      private function completeUpload($uploadId)
      {
          usort($this->parts, function ($left, $right) {
              return (int) $left['PartNumber'] - (int) $right['PartNumber'];
          });

          return $this->client->completeMultipartUpload([
              'Bucket' => $this->options['Bucket'],
              'Key' => $this->options['Key'],
              'UploadId' => $uploadId,
              'Parts' => $this->parts,
          ]);
      }
  }