Skip to content

[WIP] PHPLIB-114: Implement GridFS specification #37

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
],
"require": {
"php": ">=5.4",
"ext-mongodb": "^1.0.0"
"ext-mongodb": "^1.0.0",
"ext-hash": "*"
},
"autoload": {
"psr-4": { "MongoDB\\": "src/" },
Expand Down
171 changes: 171 additions & 0 deletions src/GridFS/Bucket.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
<?php

namespace MongoDB\GridFS;

use MongoDB\Collection;
use MongoDB\Database;
use MongoDB\BSON\ObjectId;
use MongoDB\Driver\ReadPreference;
use MongoDB\Driver\WriteConcern;
use MongoDB\Exception\InvalidArgumentException;
use MongoDB\Exception\InvalidArgumentTypeException;
use MongoDB\Exception\RuntimeException;
use MongoDB\Exception\UnexpectedValueException;

/**
* Bucket abstracts the GridFS files and chunks collections.
*
* @api
*/
class Bucket
{
private $manager;
private $databaseName;
private $options;

private $filesCollection;
private $chunksCollection;

private $ensuredIndexes = false;

/**
* Constructs a GridFS bucket.
*
* Supported options:
*
* * bucketName (string): The bucket name, which will be used as a prefix
* for the files and chunks collections. Defaults to "fs".
*
* * chunkSizeBytes (integer): The chunk size in bytes. Defaults to
* 261120 (i.e. 255 KiB).
*
* * readPreference (MongoDB\Driver\ReadPreference): Read preference.
*
* * writeConcern (MongoDB\Driver\WriteConcern): Write concern.
*
* @param Manager $manager Manager instance from the driver
* @param string $databaseName Database name
* @param array $options Bucket options
* @throws InvalidArgumentException
*/
public function __construct(Manager $manager, $databaseName, array $options = [])
{
$options += [
'bucketName' => 'fs',
'chunkSizeBytes' => 261120,
];

if (isset($options['bucketName']) && ! is_string($options['bucketName'])) {
throw new InvalidArgumentTypeException('"bucketName" option', $options['bucketName'], 'string');
}

if (isset($options['chunkSizeBytes']) && ! is_integer($options['chunkSizeBytes'])) {
throw new InvalidArgumentTypeException('"chunkSizeBytes" option', $options['chunkSizeBytes'], 'integer');
}

if (isset($options['readPreference']) && ! $options['readPreference'] instanceof ReadPreference) {
throw new InvalidArgumentTypeException('"readPreference" option', $options['readPreference'], 'MongoDB\Driver\ReadPreference');
}

if (isset($options['writeConcern']) && ! $options['writeConcern'] instanceof WriteConcern) {
throw new InvalidArgumentTypeException('"writeConcern" option', $options['writeConcern'], 'MongoDB\Driver\WriteConcern');
}

$this->manager = $manager;
$this->databaseName = (string) $databaseName;
$this->options = $options;

$this->filesCollection = new Collection(
$manager,
sprintf('%s.%s.files', $this->databaseName, $options['bucketName']),
isset($options['writeConcern']) ? $options['writeConcern'] : null,
isset($options['readPreference']) ? $options['readPreference'] : null,
);

$this->chunksCollection = new Collection(
$manager,
sprintf('%s.%s.chunks', $this->databaseName, $options['bucketName']),
isset($options['writeConcern']) ? $options['writeConcern'] : null,
isset($options['readPreference']) ? $options['readPreference'] : null,
);
}

/**
* Opens a Stream for reading the contents of a file specified by ID.
*
* @param ObjectId $id
* @return Stream
*/
public function openDownloadStream(ObjectId $id)
{

}

/**
* Return the chunkSizeBytes option for this Bucket.
*
* @return integer
*/
public function getChunkSizeBytes()
{
return $this->options['chunkSizeBytes'];
}

/**
* Opens a Stream for writing the contents of a file.
*
* @param ObjectId $id
* @return Stream
*/
public function openUploadStream($filename, array $options = [])
{

}

private function ensureIndexes()
{
// Indexes should only be ensured once before the first write operation
if ($this->ensuredIndexes) {
return;
}

if ( ! $this->isFilesCollectionEmpty()) {
return;
}

$this->ensureFilesIndex();
$this->ensureChunksIndex();

$this->ensuredIndexes = true;
}

private function ensureChunksIndex()
{
foreach ($this->chunksCollection->listIndexes() as $index) {
if ($index->isUnique() && $index->getKey() === ['files_id' => 1, 'n' => 1]) {
return;
}
}

$this->chunksCollection->createIndex(['files_id' => 1, 'n' => 1], ['unique' => true]);
}

private function ensureFilesIndex()
{
foreach ($this->filesCollection->listIndexes() as $index) {
if ($index->getKey() === ['filename' => 1, 'uploadDate' => 1]) {
return;
}
}

$this->filesCollection->createIndex(['filename' => 1, 'uploadDate' => 1]);
}

private function isFilesCollectionEmpty()
{
return null === $this->filesCollection->findOne([], [
'readPreference' => new ReadPreference(ReadPreference::RP_PRIMARY),
'projection' => ['_id' => 1],
]);
}
}
180 changes: 180 additions & 0 deletions src/GridFS/StreamWrapper.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
<?php

namespace MongoDB\GridFS;

use MongoDB\Driver\Server;
use MongoDB\Exception\InvalidArgumentException;
use MongoDB\Exception\InvalidArgumentTypeException;
use MongoDB\Exception\RuntimeException;
use MongoDB\Exception\UnexpectedValueException;

/**
* Stream wrapper for reading and writing a GridFS file.
*
* @internal
* @see MongoDB\GridFS\Bucket::openUploadStream()
*/
class StreamWrapper
{
public $context;

private $bucket;
private $filename;
private $options;

private $protocol = 'gridfs';

private $at;
private $hashContext;

/**
* Constructs a writable upload stream.
*
* Supported options:
*
* * chunkSizeBytes (integer): The number of bytes per chunk of this file.
* Defaults to the chunkSizeBytes of the Bucket.
*
* * metadata (document): User data for the "metadata" field of the files
* collection document.
*
* The following options are deprecated:
*
* * aliases (string[]): An array of aliases (i.e. filenames). Applications
* wishing to store aliases should add an aliases field to the metadata
* document instead.
*
* * contentType (string): A valid MIME type. Applications wishing to store
* a contentType should add a contentType field to the metadata document
* instead.
*
* @param Bucket $bucket Database name
* @param string $filename Filename
* @param array $options Upload options
* @throws InvalidArgumentException
*/
public function __construct(Bucket $bucket, $filename, array $options = [])
{
$options += [
'chunkSizeBytes' => $bucket->getChunkSizeBytes(),
];

if (isset($options['chunkSizeBytes']) && ! is_integer($options['chunkSizeBytes'])) {
throw new InvalidArgumentTypeException('"chunkSizeBytes" option', $options['chunkSizeBytes'], 'integer');
}

if (isset($options['metadata']) && ! is_array($options['metadata']) && ! is_object($options['metadata'])) {
throw new InvalidArgumentTypeException('"metadata" option', $options['metadata'], 'array or object');
}

if (isset($options['aliases'])) {
if ( ! is_array($options['aliases'])) {
throw new InvalidArgumentTypeException('"aliases" option', $options['aliases'], 'array or object');
}

$expectedIndex = 0;

foreach ($options['aliases'] as $i => $alias) {
if ($i !== $expectedIndex) {
throw new InvalidArgumentException(sprintf('"aliases" option is not a list (unexpected index: "%s")', $i));
}

if ( ! is_string($alias)) {
throw new InvalidArgumentTypeException(sprintf('$options["aliases"][%d]', $i), $alias, 'string');
}

$expectedIndex += 1;
}
}

if (isset($options['contentType']) && ! is_string($options['contentType'])) {
throw new InvalidArgumentTypeException('"contentType" option', $options['contentType'], 'string');
}

$this->bucket = $bucket;
$this->filename = (string) $filename;
$this->options = $options;
$this->hashContext = hash_init('md5');
}

public function stream_write($data)
{
hash_update($this->hashContext, $data);

//fopen('php://memory', )
}

/**
* Register the GridFS stream wrapper.
*
* @param Manager $manager Manager instance from the driver
* @param string $protocol Protocol to register
*/
public static function register(Manager $manager, $protocol = 'gridfs')
{
if (in_array($protocol, stream_get_wrappers())) {
stream_wrapper_unregister($protocol);
}

// Set the client passed in as the default stream context client
stream_wrapper_register($protocol, get_called_class(), STREAM_IS_URL);
$default = stream_context_get_options(stream_context_get_default());
$default[$protocol]['manager'] = $manager;
stream_context_set_default($default);
}

public function stream_open($path, $mode, $options, &$openedPath)
{
$this->initProtocol($path);
$this->params = $this->getDatabase($path);
$this->mode = rtrim($mode, 'bt');

if ($errors = $this->validate($path, $this->mode)) {
return $this->triggerError($errors);
}

return $this->boolCall(function() use ($path) {
switch ($this->mode) {
case 'r': return $this->openReadStream($path);
case 'a': return $this->openAppendStream($path);
default: return $this->openWriteStream($path);
}
});
}

private function validate($path, $mode)
{
$errors = [];

if (!in_array($mode, ['r', 'w', 'a', 'x'])) {
$errors[] = "Mode not supported: {$mode}. "
. "Use one 'r', 'w', 'a', or 'x'.";
}

return $errors;
}

/**
* Trigger one or more errors
*
* @param string|array $errors Errors to trigger
* @param mixed $flags If set to STREAM_URL_STAT_QUIET, then no
* error or exception occurs
*
* @return bool Returns false
* @throws \RuntimeException if throw_errors is true
*/
private function triggerError($errors, $flags = null)
{
// This is triggered with things like file_exists()
if ($flags & STREAM_URL_STAT_QUIET) {
return $flags & STREAM_URL_STAT_LINK
// This is triggered for things like is_link()
? $this->formatUrlStat(false)
: false;
}
// This is triggered when doing things like lstat() or stat()
trigger_error(implode("\n", (array) $errors), E_USER_WARNING);
return false;
}
}
Loading