gcp_channel = $channel;
        $this->method = $method;
        $this->deserialize = $deserialize;
        $this->options = $options;
        $this->_affinity = null;
        if (isset($this->gcp_channel->affinity_conf['affinity_by_method'][$method])) {
            $this->_affinity = $this->gcp_channel->affinity_conf['affinity_by_method'][$method];
        }
    }

    /**
     * Pick a ChannelRef from the channel pool based on the request and
     * the affinity config.
     *
     * For BOUND and UNBIND commands the affinity key is extracted from the
     * request; otherwise the key stays null. The chosen ChannelRef has its
     * active stream count incremented; _rpcPostProcess() decrements it.
     *
     * @param mixed $argument Requests.
     *
     * @return ChannelRef
     */
    protected function _rpcPreProcess($argument)
    {
        $this->affinity_key = null;
        if ($this->_affinity) {
            $command = $this->_affinity['command'];
            if ($command == self::BOUND || $command == self::UNBIND) {
                $this->affinity_key = $this->getAffinityKeyFromProto($argument);
            }
        }
        $this->channel_ref = $this->gcp_channel->getChannelRef($this->affinity_key);
        $this->channel_ref->activeStreamRefIncr();

        return $this->channel_ref;
    }

    /**
     * Update ChannelRef when RPC finishes.
     *
     * For a BIND command the affinity key read from the response is bound to
     * the current ChannelRef, but only when the call succeeded. For an UNBIND
     * command the affinity key captured in _rpcPreProcess() is unbound.
     * The active stream count is always decremented, regardless of the
     * command or the call status.
     *
     * @param \stdClass $status   The status object, with integer $code, string
     *                            $details, and array $metadata members
     * @param mixed     $response Response.
     */
    protected function _rpcPostProcess($status, $response)
    {
        if ($this->_affinity) {
            $command = $this->_affinity['command'];
            if ($command == self::BIND) {
                // Skip binding on failure, but do NOT return early: the
                // stream count incremented in _rpcPreProcess() must still be
                // released below, otherwise it leaks on every failed BIND.
                if ($status->code == \Grpc\STATUS_OK) {
                    $affinity_key = $this->getAffinityKeyFromProto($response);
                    $this->gcp_channel->bind($this->channel_ref, $affinity_key);
                }
            } elseif ($command == self::UNBIND) {
                $this->gcp_channel->unbind($this->affinity_key);
            }
        }
        $this->channel_ref->activeStreamRefDecr();
    }

    /**
     * Get the affinity key based on the affinity config.
     *
     * The config's 'affinityKey' entry is a dot-separated path. Each segment
     * is turned into a getter name ('get' followed by the segment with its
     * first letter upper-cased) which is invoked on the result of the
     * previous step, starting from $proto.
     *
     * When no affinity config is set, a diagnostic line is printed and null
     * is returned.
     *
     * @param mixed $proto Objects may contain the affinity key.
     *
     * @return string Affinity key.
     */
    protected function getAffinityKeyFromProto($proto)
    {
        if ($this->_affinity) {
            $names = $this->_affinity['affinityKey'];
            $names_arr = explode(".", $names);
            foreach ($names_arr as $name) {
                $getAttrMethod = 'get' . ucfirst($name);
                $proto = call_user_func_array(array($proto, $getAttrMethod), array());
            }

            return $proto;
        }
        echo "Cannot find the field in the proto\n";

        return null;
    }

    /**
     * Creates the underlying call on first use, then forwards to it.
     *
     * @return mixed The metadata sent by the server
     */
    public function getMetadata()
    {
        if (!$this->has_real_call) {
            $this->createRealCall();
            $this->has_real_call = true;
        }

        return $this->real_call->getMetadata();
    }

    /**
     * Creates the underlying call on first use, then forwards to it.
     *
     * @return mixed The trailing metadata sent by the server
     */
    public function getTrailingMetadata()
    {
        if (!$this->has_real_call) {
            $this->createRealCall();
            $this->has_real_call = true;
        }

        return $this->real_call->getTrailingMetadata();
    }

    /**
     * Creates the underlying call on first use, then forwards to it.
     *
     * @return string The URI of the endpoint
     */
    public function getPeer()
    {
        if (!$this->has_real_call) {
            $this->createRealCall();
            $this->has_real_call = true;
        }

        return $this->real_call->getPeer();
    }

    /**
     * Cancels the call.
     *
     * Creates the underlying call first if it does not exist yet.
     */
    public function cancel()
    {
        if (!$this->has_real_call) {
            // Mark the call as created only after creation succeeded, in the
            // same order as the other accessors of this class.
            $this->createRealCall();
            $this->has_real_call = true;
        }
        $this->real_call->cancel();
    }

    /**
     * Serialize a message to the protobuf binary format.
     *
     * Forwards to the underlying call. This method does not create the
     * underlying call itself; presumably callers guarantee it exists.
     *
     * @param mixed $data The Protobuf message
     *
     * @return string The protobuf binary format
     */
    protected function _serializeMessage($data)
    {
        return $this->real_call->_serializeMessage($data);
    }

    /**
     * Deserialize a response value to an object.
     *
     * Forwards to the underlying call. This method does not create the
     * underlying call itself; presumably callers guarantee it exists.
     *
     * @param string $value The binary value to deserialize
     *
     * @return mixed The deserialized value
     */
    protected function _deserializeResponse($value)
    {
        return $this->real_call->_deserializeResponse($value);
    }

    /**
     * Set the CallCredentials for the underlying Call.
     *
     * NOTE(review): this is the only visible method that uses $this->call;
     * every other method here goes through $this->real_call. Confirm that a
     * `call` property is actually declared and initialised on this class.
     *
     * @param CallCredentials $call_credentials The CallCredentials object
     */
    public function setCallCredentials($call_credentials)
    {
        $this->call->setCredentials($call_credentials);
    }
}