|
Moodle
2.2.1
http://www.collinsharper.com
|
00001 <?php 00030 require_once dirname(__FILE__) . '/Exception.php'; 00031 00033 require_once dirname(__FILE__) . '/XMLObj.php'; 00034 00036 require_once dirname(__FILE__) . '/Log.php'; 00037 00049 class XMPPHP_XMLStream { 00053 protected $socket; 00057 protected $parser; 00061 protected $buffer; 00065 protected $xml_depth = 0; 00069 protected $host; 00073 protected $port; 00077 protected $stream_start = '<stream>'; 00081 protected $stream_end = '</stream>'; 00085 protected $disconnected = false; 00089 protected $sent_disconnect = false; 00093 protected $ns_map = array(); 00097 protected $current_ns = array(); 00101 protected $xmlobj = null; 00105 protected $nshandlers = array(); 00109 protected $xpathhandlers = array(); 00113 protected $idhandlers = array(); 00117 protected $eventhandlers = array(); 00121 protected $lastid = 0; 00125 protected $default_ns; 00129 protected $until = ''; 00133 protected $until_count = ''; 00137 protected $until_happened = false; 00141 protected $until_payload = array(); 00145 protected $log; 00149 protected $reconnect = true; 00153 protected $been_reset = false; 00157 protected $is_server; 00161 protected $last_send = 0; 00165 protected $use_ssl = false; 00169 protected $reconnectTimeout = 30; 00170 00180 public function __construct($host = null, $port = null, $printlog = false, $loglevel = null, $is_server = false) { 00181 $this->reconnect = !$is_server; 00182 $this->is_server = $is_server; 00183 $this->host = $host; 00184 $this->port = $port; 00185 $this->setupParser(); 00186 $this->log = new XMPPHP_Log($printlog, $loglevel); 00187 } 00188 00193 public function __destruct() { 00194 if(!$this->disconnected && $this->socket) { 00195 $this->disconnect(); 00196 } 00197 } 00198 00204 public function getLog() { 00205 return $this->log; 00206 } 00207 00213 public function getId() { 00214 $this->lastid++; 00215 return $this->lastid; 00216 } 00217 00223 public function useSSL($use=true) { 00224 $this->use_ssl = $use; 00225 } 00226 00234 public function addIdHandler($id, $pointer, $obj = null) { 00235 $this->idhandlers[$id] = array($pointer, $obj); 00236 } 00237 00247 public function addHandler($name, $ns, $pointer, $obj = null, $depth = 1) { 00248 #TODO deprication warning 00249 $this->nshandlers[] = array($name,$ns,$pointer,$obj, $depth); 00250 } 00251 00259 public function addXPathHandler($xpath, $pointer, $obj = null) { 00260 if (preg_match_all("/\(?{[^\}]+}\)?(\/?)[^\/]+/", $xpath, $regs)) { 00261 $ns_tags = $regs[0]; 00262 } else { 00263 $ns_tags = array($xpath); 00264 } 00265 foreach($ns_tags as $ns_tag) { 00266 list($l, $r) = explode("}", $ns_tag); 00267 if ($r != null) { 00268 $xpart = array(substr($l, 1), $r); 00269 } else { 00270 $xpart = array(null, $l); 00271 } 00272 $xpath_array[] = $xpart; 00273 } 00274 $this->xpathhandlers[] = array($xpath_array, $pointer, $obj); 00275 } 00276 00284 public function addEventHandler($name, $pointer, $obj) { 00285 $this->eventhandlers[] = array($name, $pointer, $obj); 00286 } 00287 00295 public function connect($timeout = 30, $persistent = false, $sendinit = true) { 00296 $this->sent_disconnect = false; 00297 $starttime = time(); 00298 00299 do { 00300 $this->disconnected = false; 00301 $this->sent_disconnect = false; 00302 if($persistent) { 00303 $conflag = STREAM_CLIENT_CONNECT | STREAM_CLIENT_PERSISTENT; 00304 } else { 00305 $conflag = STREAM_CLIENT_CONNECT; 00306 } 00307 $conntype = 'tcp'; 00308 if($this->use_ssl) $conntype = 'ssl'; 00309 $this->log->log("Connecting to $conntype://{$this->host}:{$this->port}"); 00310 try { 00311 $this->socket = @stream_socket_client("$conntype://{$this->host}:{$this->port}", $errno, $errstr, $timeout, $conflag); 00312 } catch (Exception $e) { 00313 throw new XMPPHP_Exception($e->getMessage()); 00314 } 00315 if(!$this->socket) { 00316 $this->log->log("Could not connect.", XMPPHP_Log::LEVEL_ERROR); 00317 $this->disconnected = true; 00318 # Take it easy for a few seconds 00319 sleep(min($timeout, 5)); 00320 } 00321 } while (!$this->socket && (time() - $starttime) < $timeout); 00322 00323 if ($this->socket) { 00324 stream_set_blocking($this->socket, 1); 00325 if($sendinit) $this->send($this->stream_start); 00326 } else { 00327 throw new XMPPHP_Exception("Could not connect before timeout."); 00328 } 00329 } 00330 00334 public function doReconnect() { 00335 if(!$this->is_server) { 00336 $this->log->log("Reconnecting ($this->reconnectTimeout)...", XMPPHP_Log::LEVEL_WARNING); 00337 $this->connect($this->reconnectTimeout, false, false); 00338 $this->reset(); 00339 $this->event('reconnect'); 00340 } 00341 } 00342 00343 public function setReconnectTimeout($timeout) { 00344 $this->reconnectTimeout = $timeout; 00345 } 00346 00350 public function disconnect() { 00351 $this->log->log("Disconnecting...", XMPPHP_Log::LEVEL_VERBOSE); 00352 if(false == (bool) $this->socket) { 00353 return; 00354 } 00355 $this->reconnect = false; 00356 $this->send($this->stream_end); 00357 $this->sent_disconnect = true; 00358 $this->processUntil('end_stream', 5); 00359 $this->disconnected = true; 00360 } 00361 00367 public function isDisconnected() { 00368 return $this->disconnected; 00369 } 00370 00378 private function __process($maximum=5) { 00379 00380 $remaining = $maximum; 00381 00382 do { 00383 $starttime = (microtime(true) * 1000000); 00384 $read = array($this->socket); 00385 $write = array(); 00386 $except = array(); 00387 if (is_null($maximum)) { 00388 $secs = NULL; 00389 $usecs = NULL; 00390 } else if ($maximum == 0) { 00391 $secs = 0; 00392 $usecs = 0; 00393 } else { 00394 $usecs = $remaining % 1000000; 00395 $secs = floor(($remaining - $usecs) / 1000000); 00396 } 00397 $updated = @stream_select($read, $write, $except, $secs, $usecs); 00398 if ($updated === false) { 00399 $this->log->log("Error on stream_select()", XMPPHP_Log::LEVEL_VERBOSE); 00400 if ($this->reconnect) { 00401 $this->doReconnect(); 00402 } else { 00403 fclose($this->socket); 00404 $this->socket = NULL; 00405 return false; 00406 } 00407 } else if ($updated > 0) { 00408 # XXX: Is this big enough? 00409 $buff = @fread($this->socket, 4096); 00410 if(!$buff) { 00411 if($this->reconnect) { 00412 $this->doReconnect(); 00413 } else { 00414 fclose($this->socket); 00415 $this->socket = NULL; 00416 return false; 00417 } 00418 } 00419 $this->log->log("RECV: $buff", XMPPHP_Log::LEVEL_VERBOSE); 00420 xml_parse($this->parser, $buff, false); 00421 } else { 00422 # $updated == 0 means no changes during timeout. 00423 } 00424 $endtime = (microtime(true)*1000000); 00425 $time_past = $endtime - $starttime; 00426 $remaining = $remaining - $time_past; 00427 } while (is_null($maximum) || $remaining > 0); 00428 return true; 00429 } 00430 00436 public function process() { 00437 $this->__process(NULL); 00438 } 00439 00446 public function processTime($timeout=NULL) { 00447 if (is_null($timeout)) { 00448 return $this->__process(NULL); 00449 } else { 00450 return $this->__process($timeout * 1000000); 00451 } 00452 } 00453 00461 public function processUntil($event, $timeout=-1) { 00462 $start = time(); 00463 if(!is_array($event)) $event = array($event); 00464 $this->until[] = $event; 00465 end($this->until); 00466 $event_key = key($this->until); 00467 reset($this->until); 00468 $this->until_count[$event_key] = 0; 00469 $updated = ''; 00470 while(!$this->disconnected and $this->until_count[$event_key] < 1 and (time() - $start < $timeout or $timeout == -1)) { 00471 $this->__process(); 00472 } 00473 if(array_key_exists($event_key, $this->until_payload)) { 00474 $payload = $this->until_payload[$event_key]; 00475 unset($this->until_payload[$event_key]); 00476 unset($this->until_count[$event_key]); 00477 unset($this->until[$event_key]); 00478 } else { 00479 $payload = array(); 00480 } 00481 return $payload; 00482 } 00483 00487 public function Xapply_socket($socket) { 00488 $this->socket = $socket; 00489 } 00490 00499 public function startXML($parser, $name, $attr) { 00500 if($this->been_reset) { 00501 $this->been_reset = false; 00502 $this->xml_depth = 0; 00503 } 00504 $this->xml_depth++; 00505 if(array_key_exists('XMLNS', $attr)) { 00506 $this->current_ns[$this->xml_depth] = $attr['XMLNS']; 00507 } else { 00508 $this->current_ns[$this->xml_depth] = $this->current_ns[$this->xml_depth - 1]; 00509 if(!$this->current_ns[$this->xml_depth]) $this->current_ns[$this->xml_depth] = $this->default_ns; 00510 } 00511 $ns = $this->current_ns[$this->xml_depth]; 00512 foreach($attr as $key => $value) { 00513 if(strstr($key, ":")) { 00514 $key = explode(':', $key); 00515 $key = $key[1]; 00516 $this->ns_map[$key] = $value; 00517 } 00518 } 00519 if(!strstr($name, ":") === false) 00520 { 00521 $name = explode(':', $name); 00522 $ns = $this->ns_map[$name[0]]; 00523 $name = $name[1]; 00524 } 00525 $obj = new XMPPHP_XMLObj($name, $ns, $attr); 00526 if($this->xml_depth > 1) { 00527 $this->xmlobj[$this->xml_depth - 1]->subs[] = $obj; 00528 } 00529 $this->xmlobj[$this->xml_depth] = $obj; 00530 } 00531 00540 public function endXML($parser, $name) { 00541 #$this->log->log("Ending $name", XMPPHP_Log::LEVEL_DEBUG); 00542 #print "$name\n"; 00543 if($this->been_reset) { 00544 $this->been_reset = false; 00545 $this->xml_depth = 0; 00546 } 00547 $this->xml_depth--; 00548 if($this->xml_depth == 1) { 00549 #clean-up old objects 00550 #$found = false; #FIXME This didn't appear to be in use --Gar 00551 foreach($this->xpathhandlers as $handler) { 00552 if (is_array($this->xmlobj) && array_key_exists(2, $this->xmlobj)) { 00553 $searchxml = $this->xmlobj[2]; 00554 $nstag = array_shift($handler[0]); 00555 if (($nstag[0] == null or $searchxml->ns == $nstag[0]) and ($nstag[1] == "*" or $nstag[1] == $searchxml->name)) { 00556 foreach($handler[0] as $nstag) { 00557 if ($searchxml !== null and $searchxml->hasSub($nstag[1], $ns=$nstag[0])) { 00558 $searchxml = $searchxml->sub($nstag[1], $ns=$nstag[0]); 00559 } else { 00560 $searchxml = null; 00561 break; 00562 } 00563 } 00564 if ($searchxml !== null) { 00565 if($handler[2] === null) $handler[2] = $this; 00566 $this->log->log("Calling {$handler[1]}", XMPPHP_Log::LEVEL_DEBUG); 00567 $handler[2]->$handler[1]($this->xmlobj[2]); 00568 } 00569 } 00570 } 00571 } 00572 foreach($this->nshandlers as $handler) { 00573 if($handler[4] != 1 and array_key_exists(2, $this->xmlobj) and $this->xmlobj[2]->hasSub($handler[0])) { 00574 $searchxml = $this->xmlobj[2]->sub($handler[0]); 00575 } elseif(is_array($this->xmlobj) and array_key_exists(2, $this->xmlobj)) { 00576 $searchxml = $this->xmlobj[2]; 00577 } 00578 if($searchxml !== null and $searchxml->name == $handler[0] and ($searchxml->ns == $handler[1] or (!$handler[1] and $searchxml->ns == $this->default_ns))) { 00579 if($handler[3] === null) $handler[3] = $this; 00580 $this->log->log("Calling {$handler[2]}", XMPPHP_Log::LEVEL_DEBUG); 00581 $handler[3]->$handler[2]($this->xmlobj[2]); 00582 } 00583 } 00584 foreach($this->idhandlers as $id => $handler) { 00585 if(array_key_exists('id', $this->xmlobj[2]->attrs) and $this->xmlobj[2]->attrs['id'] == $id) { 00586 if($handler[1] === null) $handler[1] = $this; 00587 $handler[1]->$handler[0]($this->xmlobj[2]); 00588 #id handlers are only used once 00589 unset($this->idhandlers[$id]); 00590 break; 00591 } 00592 } 00593 if(is_array($this->xmlobj)) { 00594 $this->xmlobj = array_slice($this->xmlobj, 0, 1); 00595 if(isset($this->xmlobj[0]) && $this->xmlobj[0] instanceof XMPPHP_XMLObj) { 00596 $this->xmlobj[0]->subs = null; 00597 } 00598 } 00599 unset($this->xmlobj[2]); 00600 } 00601 if($this->xml_depth == 0 and !$this->been_reset) { 00602 if(!$this->disconnected) { 00603 if(!$this->sent_disconnect) { 00604 $this->send($this->stream_end); 00605 } 00606 $this->disconnected = true; 00607 $this->sent_disconnect = true; 00608 fclose($this->socket); 00609 if($this->reconnect) { 00610 $this->doReconnect(); 00611 } 00612 } 00613 $this->event('end_stream'); 00614 } 00615 } 00616 00624 public function charXML($parser, $data) { 00625 if(array_key_exists($this->xml_depth, $this->xmlobj)) { 00626 $this->xmlobj[$this->xml_depth]->data .= $data; 00627 } 00628 } 00629 00636 public function event($name, $payload = null) { 00637 $this->log->log("EVENT: $name", XMPPHP_Log::LEVEL_DEBUG); 00638 foreach($this->eventhandlers as $handler) { 00639 if($name == $handler[0]) { 00640 if($handler[2] === null) { 00641 $handler[2] = $this; 00642 } 00643 $handler[2]->$handler[1]($payload); 00644 } 00645 } 00646 foreach($this->until as $key => $until) { 00647 if(is_array($until)) { 00648 if(in_array($name, $until)) { 00649 $this->until_payload[$key][] = array($name, $payload); 00650 if(!isset($this->until_count[$key])) { 00651 $this->until_count[$key] = 0; 00652 } 00653 $this->until_count[$key] += 1; 00654 #$this->until[$key] = false; 00655 } 00656 } 00657 } 00658 } 00659 00663 public function read() { 00664 $buff = @fread($this->socket, 1024); 00665 if(!$buff) { 00666 if($this->reconnect) { 00667 $this->doReconnect(); 00668 } else { 00669 fclose($this->socket); 00670 return false; 00671 } 00672 } 00673 $this->log->log("RECV: $buff", XMPPHP_Log::LEVEL_VERBOSE); 00674 xml_parse($this->parser, $buff, false); 00675 } 00676 00682 public function send($msg, $timeout=NULL) { 00683 00684 if (is_null($timeout)) { 00685 $secs = NULL; 00686 $usecs = NULL; 00687 } else if ($timeout == 0) { 00688 $secs = 0; 00689 $usecs = 0; 00690 } else { 00691 $maximum = $timeout * 1000000; 00692 $usecs = $maximum % 1000000; 00693 $secs = floor(($maximum - $usecs) / 1000000); 00694 } 00695 00696 $read = array(); 00697 $write = array($this->socket); 00698 $except = array(); 00699 00700 $select = @stream_select($read, $write, $except, $secs, $usecs); 00701 00702 if($select === False) { 00703 $this->log->log("ERROR sending message; reconnecting."); 00704 $this->doReconnect(); 00705 # TODO: retry send here 00706 return false; 00707 } elseif ($select > 0) { 00708 $this->log->log("Socket is ready; send it.", XMPPHP_Log::LEVEL_VERBOSE); 00709 } else { 00710 $this->log->log("Socket is not ready; break.", XMPPHP_Log::LEVEL_ERROR); 00711 return false; 00712 } 00713 00714 $sentbytes = @fwrite($this->socket, $msg); 00715 $this->log->log("SENT: " . mb_substr($msg, 0, $sentbytes, '8bit'), XMPPHP_Log::LEVEL_VERBOSE); 00716 if($sentbytes === FALSE) { 00717 $this->log->log("ERROR sending message; reconnecting.", XMPPHP_Log::LEVEL_ERROR); 00718 $this->doReconnect(); 00719 return false; 00720 } 00721 $this->log->log("Successfully sent $sentbytes bytes.", XMPPHP_Log::LEVEL_VERBOSE); 00722 return $sentbytes; 00723 } 00724 00725 public function time() { 00726 list($usec, $sec) = explode(" ", microtime()); 00727 return (float)$sec + (float)$usec; 00728 } 00729 00733 public function reset() { 00734 $this->xml_depth = 0; 00735 unset($this->xmlobj); 00736 $this->xmlobj = array(); 00737 $this->setupParser(); 00738 if(!$this->is_server) { 00739 $this->send($this->stream_start); 00740 } 00741 $this->been_reset = true; 00742 } 00743 00747 public function setupParser() { 00748 $this->parser = xml_parser_create('UTF-8'); 00749 xml_parser_set_option($this->parser, XML_OPTION_SKIP_WHITE, 1); 00750 xml_parser_set_option($this->parser, XML_OPTION_TARGET_ENCODING, 'UTF-8'); 00751 xml_set_object($this->parser, $this); 00752 xml_set_element_handler($this->parser, 'startXML', 'endXML'); 00753 xml_set_character_data_handler($this->parser, 'charXML'); 00754 } 00755 00756 public function readyToProcess() { 00757 $read = array($this->socket); 00758 $write = array(); 00759 $except = array(); 00760 $updated = @stream_select($read, $write, $except, 0); 00761 return (($updated !== false) && ($updated > 0)); 00762 } 00763 }