|
Moodle
2.2.1
http://www.collinsharper.com
|
<?php
/**
 * @see Zend_Service_Amazon_Abstract
 */
require_once 'Zend/Service/Amazon/Abstract.php';

/**
 * @see Zend_Crypt_Hmac
 */
require_once 'Zend/Crypt/Hmac.php';

/**
 * Client for the Amazon Simple Queue Service (SQS) query API.
 *
 * Every public method builds a parameter list, signs it with AWS Signature
 * Version 2 (HMAC-SHA256) and issues an HTTP GET through the HTTP client
 * returned by getHttpClient(). Responses are parsed into SimpleXMLElement
 * objects.
 *
 * NOTE(review): _getAccessKey(), _getSecretKey() and getHttpClient() are not
 * defined in this class; presumably they are inherited from
 * Zend_Service_Amazon_Abstract - confirm against the parent class.
 */
class Zend_Service_Amazon_Sqs extends Zend_Service_Amazon_Abstract
{
    /**
     * Visibility timeout (DefaultVisibilityTimeout) sent by create() when the
     * caller does not supply one.
     */
    const CREATE_TIMEOUT_DEFAULT = 30;

    /**
     * Host used for the ListQueues/CreateQueue requests and as the host line
     * of the string-to-sign.
     *
     * @var string
     */
    protected $_sqsEndpoint = 'queue.amazonaws.com';

    /**
     * Value sent as the "Version" request parameter.
     *
     * @var string
     */
    protected $_sqsApiVersion = '2009-02-01';

    /**
     * Value sent as the "SignatureVersion" request parameter.
     *
     * @var string
     */
    protected $_sqsSignatureVersion = '2';

    /**
     * Value sent as the "SignatureMethod" request parameter.
     *
     * @var string
     */
    protected $_sqsSignatureMethod = 'HmacSHA256';

    /**
     * Constructor; forwards all arguments unchanged to the parent constructor.
     *
     * @param string|null $accessKey
     * @param string|null $secretKey
     * @param string|null $region
     */
    public function __construct($accessKey = null, $secretKey = null, $region = null)
    {
        parent::__construct($accessKey, $secretKey, $region);
    }

    /**
     * Create a new queue.
     *
     * When the service answers "QueueDeletedRecently" the method sleeps for
     * 60 seconds and tries again.
     *
     * NOTE(review): the number of such retries is not bounded; the method
     * keeps retrying for as long as the service keeps returning
     * "QueueDeletedRecently".
     *
     * @param  string       $queue_name Name of the queue (sent as QueueName)
     * @param  integer|null $timeout    Default visibility timeout; when null,
     *                                  self::CREATE_TIMEOUT_DEFAULT is used
     * @return string|boolean The queue URL, or false when the service reports
     *                        that the queue name already exists
     * @throws Zend_Service_Amazon_Sqs_Exception On any other error code
     */
    public function create($queue_name, $timeout = null)
    {
        $params = array();
        $params['QueueName'] = $queue_name;
        $timeout = ($timeout === null) ? self::CREATE_TIMEOUT_DEFAULT : (int) $timeout;
        $params['DefaultVisibilityTimeout'] = $timeout;

        while (true) {
            $result = $this->_makeRequest(null, 'CreateQueue', $params);

            if ($result->CreateQueueResult->QueueUrl !== null) {
                return (string) $result->CreateQueueResult->QueueUrl;
            }

            $code = (string) $result->Error->Code;

            if ($code === 'AWS.SimpleQueueService.QueueNameExists') {
                return false;
            }

            if ($code !== 'AWS.SimpleQueueService.QueueDeletedRecently') {
                require_once 'Zend/Service/Amazon/Sqs/Exception.php';
                throw new Zend_Service_Amazon_Sqs_Exception($code);
            }

            // Must sleep for 60 seconds, then try re-creating the queue
            sleep(60);
        }
    }

    /**
     * Delete a queue.
     *
     * @param  string $queue_url URL of the queue to delete
     * @return boolean Always true when no error is reported
     * @throws Zend_Service_Amazon_Sqs_Exception When the response carries an error code
     */
    public function delete($queue_url)
    {
        $result = $this->_makeRequest($queue_url, 'DeleteQueue');

        if ($result->Error->Code !== null) {
            require_once 'Zend/Service/Amazon/Sqs/Exception.php';
            throw new Zend_Service_Amazon_Sqs_Exception((string) $result->Error->Code);
        }

        return true;
    }

    /**
     * List the queue URLs returned by the ListQueues action.
     *
     * @return array List of queue URLs as strings
     * @throws Zend_Service_Amazon_Sqs_Exception When the response has no ListQueuesResult/QueueUrl node
     */
    public function getQueues()
    {
        $result = $this->_makeRequest(null, 'ListQueues');

        if ($result->ListQueuesResult->QueueUrl === null) {
            require_once 'Zend/Service/Amazon/Sqs/Exception.php';
            throw new Zend_Service_Amazon_Sqs_Exception((string) $result->Error->Code);
        }

        $queues = array();
        foreach ($result->ListQueuesResult->QueueUrl as $queue_url) {
            $queues[] = (string) $queue_url;
        }

        return $queues;
    }

    /**
     * Return the queue's "ApproximateNumberOfMessages" attribute as an integer.
     *
     * @param  string $queue_url Queue URL
     * @return integer
     * @throws Zend_Service_Amazon_Sqs_Exception Propagated from getAttribute()
     */
    public function count($queue_url)
    {
        return (int) $this->getAttribute($queue_url, 'ApproximateNumberOfMessages');
    }

    /**
     * Send a message to a queue.
     *
     * The message is urlencode()d before it is placed in MessageBody, and the
     * MD5 checksum compared against the service's MD5OfMessageBody is
     * computed over that encoded form. receive() applies urldecode() to the
     * bodies it returns.
     *
     * @param  string $queue_url Queue URL
     * @param  string $message   Message body
     * @return string The MessageId reported by the service
     * @throws Zend_Service_Amazon_Sqs_Exception When no MessageId is returned
     *                                           or the checksum does not match
     */
    public function send($queue_url, $message)
    {
        $params = array();
        $params['MessageBody'] = urlencode($message);

        $checksum = md5($params['MessageBody']);

        $result = $this->_makeRequest($queue_url, 'SendMessage', $params);

        if ($result->SendMessageResult->MessageId === null) {
            require_once 'Zend/Service/Amazon/Sqs/Exception.php';
            throw new Zend_Service_Amazon_Sqs_Exception((string) $result->Error->Code);
        }

        if ((string) $result->SendMessageResult->MD5OfMessageBody != $checksum) {
            require_once 'Zend/Service/Amazon/Sqs/Exception.php';
            throw new Zend_Service_Amazon_Sqs_Exception('MD5 of body does not match message sent');
        }

        return (string) $result->SendMessageResult->MessageId;
    }

    /**
     * Receive messages from a queue.
     *
     * @param  string       $queue_url    Queue URL
     * @param  integer|null $max_messages Sent as MaxNumberOfMessages when not null
     * @param  integer|null $timeout      Sent as VisibilityTimeout when not null
     * @return array List of arrays with the keys 'message_id', 'handle',
     *               'md5' and 'body' (body is urldecode()d)
     * @throws Zend_Service_Amazon_Sqs_Exception When the response has no
     *                                           ReceiveMessageResult/Message node
     */
    public function receive($queue_url, $max_messages = null, $timeout = null)
    {
        $params = array();

        // If not set, the visibility timeout on the queue is used
        if ($timeout !== null) {
            $params['VisibilityTimeout'] = (int) $timeout;
        }

        // SQS will default to only returning one message
        if ($max_messages !== null) {
            $params['MaxNumberOfMessages'] = (int) $max_messages;
        }

        $result = $this->_makeRequest($queue_url, 'ReceiveMessage', $params);

        if ($result->ReceiveMessageResult->Message === null) {
            require_once 'Zend/Service/Amazon/Sqs/Exception.php';
            throw new Zend_Service_Amazon_Sqs_Exception((string) $result->Error->Code);
        }

        $data = array();
        foreach ($result->ReceiveMessageResult->Message as $message) {
            $data[] = array(
                'message_id' => (string) $message->MessageId,
                'handle'     => (string) $message->ReceiptHandle,
                'md5'        => (string) $message->MD5OfBody,
                'body'       => urldecode((string) $message->Body),
            );
        }

        return $data;
    }

    /**
     * Delete a message from a queue.
     *
     * Unlike the other methods, an error response is reported by returning
     * false rather than by throwing.
     *
     * @param  string $queue_url Queue URL
     * @param  string $handle    Receipt handle (the 'handle' value from receive())
     * @return boolean False when the response carries an error code, true otherwise
     */
    public function deleteMessage($queue_url, $handle)
    {
        $params = array();
        $params['ReceiptHandle'] = (string) $handle;

        $result = $this->_makeRequest($queue_url, 'DeleteMessage', $params);

        if ($result->Error->Code !== null) {
            return false;
        }

        // Will always return true unless ReceiptHandle is malformed
        return true;
    }

    /**
     * Fetch one or all attributes of a queue.
     *
     * @param  string $queue_url Queue URL
     * @param  string $attribute Sent as AttributeName; defaults to 'All'
     * @return string|array The attribute value as a string when the response
     *                      holds at most one Attribute node, otherwise an
     *                      array mapping attribute name to value
     * @throws Zend_Service_Amazon_Sqs_Exception When the response has no
     *                                           GetQueueAttributesResult/Attribute node
     */
    public function getAttribute($queue_url, $attribute = 'All')
    {
        $params = array();
        $params['AttributeName'] = $attribute;

        $result = $this->_makeRequest($queue_url, 'GetQueueAttributes', $params);

        if ($result->GetQueueAttributesResult->Attribute === null) {
            require_once 'Zend/Service/Amazon/Sqs/Exception.php';
            throw new Zend_Service_Amazon_Sqs_Exception((string) $result->Error->Code);
        }

        if (count($result->GetQueueAttributesResult->Attribute) > 1) {
            $attr_result = array();
            // A distinct loop variable keeps the $attribute parameter intact.
            foreach ($result->GetQueueAttributesResult->Attribute as $attr) {
                $attr_result[(string) $attr->Name] = (string) $attr->Value;
            }
            return $attr_result;
        }

        return (string) $result->GetQueueAttributesResult->Attribute->Value;
    }

    /**
     * Sign and send a request, retrying on 5xx responses.
     *
     * ListQueues and CreateQueue are sent to http://<endpoint>; every other
     * action is sent to the queue URL itself. A 5xx status is retried while
     * the retry counter is <= 5, with a delay of n * n / 4 seconds before the
     * next attempt (n being the 1-based retry number).
     *
     * @param  string|null $queue_url Queue URL, or null for endpoint-level actions
     * @param  string      $action    SQS action name (sent as "Action")
     * @param  array       $params    Action-specific parameters
     * @return SimpleXMLElement Parsed body of the last response received
     */
    private function _makeRequest($queue_url, $action, $params = array())
    {
        $params['Action'] = $action;
        $params = $this->addRequiredParameters($queue_url, $params);

        if ($queue_url === null) {
            $queue_url = '/';
        }

        $client = self::getHttpClient();

        switch ($action) {
            case 'ListQueues':
            case 'CreateQueue':
                $client->setUri('http://' . $this->_sqsEndpoint);
                break;
            default:
                $client->setUri($queue_url);
                break;
        }

        $retry_count = 0;

        do {
            $retry = false;

            $client->resetParameters();
            $client->setParameterGet($params);

            $response = $client->request('GET');

            $response_code = $response->getStatus();

            // Some 5xx errors are expected, so retry automatically
            if ($response_code >= 500 && $response_code < 600 && $retry_count <= 5) {
                $retry = true;
                $retry_count++;

                // Back off for n^2 / 4 seconds. sleep() only accepts whole
                // seconds (a float argument is truncated, which made the
                // first retry fire with no delay at all), so the fractional
                // remainder is handed to usleep().
                $delay         = $retry_count * $retry_count / 4;
                $whole_seconds = (int) floor($delay);
                $micro_seconds = (int) round(($delay - $whole_seconds) * 1000000);

                if ($whole_seconds > 0) {
                    sleep($whole_seconds);
                }
                if ($micro_seconds > 0) {
                    usleep($micro_seconds);
                }
            }
        } while ($retry);

        unset($client);

        return new SimpleXMLElement($response->getBody());
    }

    /**
     * Add the parameters every request needs, including the signature.
     *
     * Adds AWSAccessKeyId, SignatureVersion, Timestamp, Version and
     * SignatureMethod, then computes Signature over the resulting set.
     * The Timestamp is the current time plus 10 seconds, formatted in GMT.
     *
     * @param  string|null $queue_url  Queue URL (used for the signed path)
     * @param  array       $parameters Action parameters
     * @return array The parameters with the required entries added
     */
    protected function addRequiredParameters($queue_url, array $parameters)
    {
        $parameters['AWSAccessKeyId']   = $this->_getAccessKey();
        $parameters['SignatureVersion'] = $this->_sqsSignatureVersion;
        $parameters['Timestamp']        = gmdate('Y-m-d\TH:i:s\Z', time() + 10);
        $parameters['Version']          = $this->_sqsApiVersion;
        $parameters['SignatureMethod']  = $this->_sqsSignatureMethod;
        $parameters['Signature']        = $this->_signParameters($queue_url, $parameters);

        return $parameters;
    }

    /**
     * Compute the request signature.
     *
     * The string-to-sign is:
     *
     *   "GET\n" . <endpoint host> . "\n" . <path of queue URL, or "/"> . "\n"
     *   . <parameters sorted by key with strcmp, joined as key=value with "&">
     *
     * Any existing 'Signature' entry is excluded. Values are percent-encoded
     * per RFC 3986: rawurlencode() encodes a space as "%20", whereas
     * urlencode() would emit "+" and break the signature for values that
     * contain spaces. The '%7E' replacement keeps "~" literal on PHP versions
     * older than 5.3, where rawurlencode() still encoded it.
     *
     * @param  string|null $queue_url  Queue URL, or null to sign the path "/"
     * @param  array       $paramaters Request parameters to sign
     * @return string Base64-encoded HMAC-SHA256 of the string-to-sign
     */
    protected function _signParameters($queue_url, array $paramaters)
    {
        $data  = "GET\n";
        $data .= $this->_sqsEndpoint . "\n";
        if ($queue_url !== null) {
            $data .= parse_url($queue_url, PHP_URL_PATH);
        } else {
            $data .= '/';
        }
        $data .= "\n";

        uksort($paramaters, 'strcmp');
        unset($paramaters['Signature']);

        $arrData = array();
        foreach ($paramaters as $key => $value) {
            $arrData[] = $key . '=' . str_replace('%7E', '~', rawurlencode($value));
        }

        $data .= implode('&', $arrData);

        $hmac = Zend_Crypt_Hmac::compute($this->_getSecretKey(), 'SHA256', $data, Zend_Crypt_Hmac::BINARY);

        return base64_encode($hmac);
    }
}