封装Thrift相关操作类

最近研究了一段时间Thrift,对Thrift的server端和client端的操作代码稍微做了封装,这样编写后台服务器的时候,只需专心写业务逻辑,不用关心thrift内部的服务器代码,因为这些代码都是通用的。

如果你不知道thrift是什么东东,我之前有写一个简单示例:thrift入门hello示例

我同时写了几个语言版本的服务器,有php、python和java,并作了相应测试。client端用的php。

作为测试,我以thrift的IDL格式写了一个简单的服务接口 mongotest.thrift:

// Put the generated code for every target language in the "mongotest"
// namespace/package. The java line is required because JavaServer.java
// does `import mongotest.*;`.
namespace java mongotest
namespace php mongotest
namespace py mongotest

// Minimal test service exposing a single status call.
service MongoTest {
    // Returns a status string (per the article: the mongodb connection state).
    string getServerStatus();
}

这个服务功能就是查看mongodb数据库的连接状态,这样就可以测试这种开发方式连接数据库的一些行为。

下面贴出封装的server类,并给出生成一个服务端实例的代码。

java版server,ThriftServer.java

import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TServer.Args;
import org.apache.thrift.server.TSimpleServer;
import org.apache.thrift.server.TThreadedSelectorServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TSSLTransportFactory;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerTransport;
import org.apache.thrift.transport.TTransportFactory;
import org.apache.thrift.transport.TSSLTransportFactory.TSSLTransportParameters;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.protocol.TCompactProtocol;

/**
 * Reusable wrapper that serves a Thrift {@code TProcessor}, so that service
 * code only has to supply the processor.
 *
 * <p>Three server flavours are available: a thread-pool server
 * ({@link #threadPool}), a threaded-selector server ({@link #threadSelector})
 * and an SSL thread-pool server ({@link #secure}). {@link #startServer()}
 * launches the thread-pool flavour on a new thread.
 */
public class ThriftServer {
  /** Port used by the plain servers when none is given. */
  public static final int DEFAULT_PORT = 9090;

  /** Port used by the SSL server started by {@link #secure}. */
  public static final int DEFAULT_SECURE_PORT = 9093;

  /** Upper bound on worker threads for the thread-pool based servers. */
  private static final int MAX_WORKER_THREADS = 50;

  /** Processor that handles incoming calls. */
  public org.apache.thrift.TProcessor processor;

  /** Port the plain (non-SSL) servers listen on. */
  private final int port;

  /**
   * Creates a server wrapper listening on {@link #DEFAULT_PORT}.
   *
   * @param processor processor that handles incoming calls
   */
  public ThriftServer(org.apache.thrift.TProcessor processor){
    this(processor, DEFAULT_PORT);
  }

  /**
   * Creates a server wrapper listening on the given port.
   *
   * @param processor processor that handles incoming calls
   * @param port TCP port for the plain (non-SSL) servers, 0-65535
   * @throws IllegalArgumentException if {@code port} is outside 0-65535
   */
  public ThriftServer(org.apache.thrift.TProcessor processor, int port){
    if (port < 0 || port > 65535) {
      throw new IllegalArgumentException("port out of range: " + port);
    }
    this.processor = processor;
    this.port = port;
  }

  /**
   * Starts the thread-pool server on a new thread and returns immediately.
   *
   * <p>Either {@link #threadPool} or {@link #threadSelector} may be used
   * here; {@link #secure} can be launched on its own thread the same way
   * to serve over SSL.
   */
  public void startServer() {
    Runnable threadPool = new Runnable() {
      public void run() {
        threadPool(processor);
      }
    };
    new Thread(threadPool).start();
  }

  /**
   * Serves {@code processor} with a {@code TThreadPoolServer} on the
   * configured port. Any exception is printed to stderr, not rethrown.
   *
   * @param processor processor that handles incoming calls
   */
  public void threadPool(org.apache.thrift.TProcessor processor) {
    try {
      TServerTransport serverTransport = new TServerSocket(port);
      servePooled(serverTransport, processor, "Starting the server...");
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  /**
   * Serves {@code processor} with a {@code TThreadedSelectorServer} on the
   * configured port, using framed transport and the compact protocol.
   * Any exception is printed to stderr, not rethrown.
   *
   * @param processor processor that handles incoming calls
   */
  public void threadSelector(org.apache.thrift.TProcessor processor) {
    try {
      TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(port);
      // Non-blocking I/O needs TFramedTransport, which reads data in buffered frames.
      TTransportFactory transportFactory = new TFramedTransport.Factory();
      // Use the compact binary protocol.
      TProtocolFactory proFactory = new TCompactProtocol.Factory();
      TServer server = new TThreadedSelectorServer(
              new TThreadedSelectorServer.Args(serverTransport)
              .protocolFactory(proFactory)
              .transportFactory(transportFactory)
              .processor(processor)
              );

      System.out.println("Starting the server...");
      server.serve();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  /**
   * Serves {@code processor} over SSL with a {@code TThreadPoolServer} on
   * {@link #DEFAULT_SECURE_PORT}. Any exception is printed to stderr, not
   * rethrown.
   *
   * @param processor processor that handles incoming calls
   */
  public void secure(org.apache.thrift.TProcessor processor) {
    try {
      /*
       * Use TSSLTransportParameters to setup the required SSL parameters. In this example
       * we are setting the keystore and the keystore password. Other things like algorithms,
       * cipher suites, client auth etc can be set.
       */
      TSSLTransportParameters params = new TSSLTransportParameters();
      // The Keystore contains the private key
      params.setKeyStore("./lib/java/test/.keystore", "thrift", null, null);

      /*
       * Use any of the TSSLTransportFactory to get a server transport with the appropriate
       * SSL configuration. You can use the default settings if properties are set in the command line.
       * Ex: -Djavax.net.ssl.keyStore=.keystore and -Djavax.net.ssl.keyStorePassword=thrift
       *
       * Note: You need not explicitly call open(). The underlying server socket is bound on return
       * from the factory class.
       */
      TServerTransport serverTransport =
          TSSLTransportFactory.getServerSocket(DEFAULT_SECURE_PORT, 0, null, params);
      servePooled(serverTransport, processor, "Starting the secure server...");
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  /** Builds a thread-pool server on the given transport, prints the banner and serves. */
  private void servePooled(TServerTransport serverTransport,
                           org.apache.thrift.TProcessor processor,
                           String banner) {
    TThreadPoolServer.Args args = new TThreadPoolServer.Args(serverTransport).processor(processor);
    args.maxWorkerThreads = MAX_WORKER_THREADS;
    TServer server = new TThreadPoolServer(args);

    System.out.println(banner);
    server.serve();
  }
}

有了上面封装的类后,要写一个基于thrift相应服务的socket后台,就相当简单了,下面是服务器调用的例子 JavaServer.java:

import mongotest.*;

/** Entry point that serves the MongoTest service through ThriftServer. */
public class JavaServer {

    public static void main(String[] args){
        // Concrete implementation of the interface declared in mongotest.thrift.
        MongoTestHandler serviceImpl = new MongoTestHandler();
        // Processor class generated automatically by the thrift compiler.
        MongoTest.Processor dispatcher = new MongoTest.Processor(serviceImpl);
        // Server parameters such as the port could also be wrapped in here.
        new ThriftServer(dispatcher).startServer();
    }

}

python版server,ThriftServer.py:

#!/usr/bin/env python
#encoding=utf-8

import sys
sys.path.append('./gen-py')

from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from thrift.server import TServer

class ThriftServer(object):
    """Reusable wrapper that serves a Thrift processor over a TCP socket.

    Args:
        processor: Thrift processor that handles incoming calls.
        port: TCP port to listen on (default 9090).
    """

    def __init__(self, processor, port=9090):
        self.processor = processor
        self.port = port

    def startServer(self):
        """Build a thread-pool server for the processor and serve on self.port.

        Uses a buffered transport and the binary protocol. Prints a banner
        before calling serve() and 'done.' after it returns.
        """
        processor = self.processor
        transport = TSocket.TServerSocket(port=self.port)
        tfactory = TTransport.TBufferedTransportFactory()
        pfactory = TBinaryProtocol.TBinaryProtocolFactory()

        # Alternatives that take the same four arguments:
        #   TServer.TSimpleServer    (single threaded)
        #   TServer.TThreadedServer  (multithreaded)
        #   TServer.TForkingServer
        server = TServer.TThreadPoolServer(processor, transport, tfactory, pfactory)
        server.daemon = True  # enable ctrl+c to exit the server
        server.setNumThreads(100)

        # print() with a single string argument behaves the same on
        # Python 2 and Python 3.
        print('Starting the server...')
        server.serve()
        print('done.')

python版调用示例:

#!/usr/bin/env python
#encoding=utf-8

import sys
sys.path.append('./gen-py')
from ThriftServer import ThriftServer
from MongoTestHandler import MongoTestHandler
from mongotest import MongoTest

# Wrap the MongoTest handler in its processor and serve it on port 9090.
mongo_processor = MongoTest.Processor(MongoTestHandler())
ThriftServer(mongo_processor, 9090).startServer()

php版server,ThriftServer.php:

<?php

// Report every error and notice.
error_reporting(E_ALL);

// The class loader itself has to be required by hand before it can be used.
require_once __DIR__ . '/lib/Thrift/ClassLoader/ThriftClassLoader.php';

use Thrift\ClassLoader\ThriftClassLoader;

// Path of the gen-php directory next to this file (not used further in this file).
$GEN_DIR = realpath(__DIR__) . '/gen-php';

// Register the Thrift namespace against ./lib and activate the loader.
$loader = new ThriftClassLoader();
$loader->registerNamespace('Thrift', __DIR__ . '/lib');
$loader->register();
use Thrift\Protocol\TBinaryProtocol;
use Thrift\Transport\TPhpStream;
use Thrift\Transport\TBufferedTransport;
use Thrift\Server\TServerSocket;
use Thrift\Server\TForkingServer;
use Thrift\Factory\TTransportFactory;
use Thrift\Factory\TBinaryProtocolFactory;

/**
 * Reusable wrapper that serves a Thrift processor with a TForkingServer,
 * using the binary protocol.
 */
class ThriftServer {

    private $processor;
    private $server_ip;
    private $server_port;

    /**
     * @param object $processor   Thrift processor that handles incoming calls
     * @param string $server_ip   host/address to listen on
     * @param int    $server_port TCP port to listen on
     */
    public function __construct($processor, $server_ip="localhost", $server_port=9090){
        $this->processor = $processor;
        $this->server_ip = $server_ip;
        $this->server_port = $server_port;
    }

    /**
     * Creates the server socket, builds the forking server and serves.
     *
     * If the server socket cannot be created, the actual error is printed
     * and the script exits with status 1.
     */
    public function startServer(){
        $processor = $this->processor;
        try {
            $transport = new TServerSocket($this->server_ip, $this->server_port);
        } catch (Exception $e) {
            // Report the real cause; not every failure is "port in use".
            echo 'Failed to create server socket on '
                . $this->server_ip . ':' . $this->server_port
                . ': ' . $e->getMessage() . PHP_EOL;
            exit(1);
        }

        // The same factory instances are shared by the input and output sides.
        $outputTransportFactory = $inputTransportFactory = new TTransportFactory($transport);
        $outputProtocolFactory = $inputProtocolFactory = new TBinaryProtocolFactory();

        $server = new TForkingServer(
            $processor,
            $transport,
            $inputTransportFactory,
            $outputTransportFactory,
            $inputProtocolFactory,
            $outputProtocolFactory
        );

        // NOTE(review): presumably only meaningful under a web SAPI - confirm
        // whether this header is needed for a socket server.
        header('Content-Type: application/x-thrift');
        print 'Starting the server...';
        $server->serve();
    }

}

php调用示例,PhpServer.php:

<?php

include_once __DIR__.'/gen-php/mongotest/MongoTest.php';
include_once __DIR__.'/MongoTestHandler.php';
include_once __DIR__.'/ThriftServer.php';
error_reporting(E_ALL);

// Wrap the handler in the MongoTest processor and serve it with
// ThriftServer's default address and port.
$mongoHandler = new MongoTestHandler();
$server = new ThriftServer(new \mongotest\MongoTestProcessor($mongoHandler));
$server->startServer();

对于客户端,只做了php一个版本,其它语言也很容易,下面是php版client封装类,ThriftClient.php:

<?php

// Report every error and notice.
error_reporting(E_ALL);

// The class loader itself has to be required by hand before it can be used.
require_once __DIR__ . '/lib/Thrift/ClassLoader/ThriftClassLoader.php';

use Thrift\ClassLoader\ThriftClassLoader;

// Path of the gen-php directory next to this file (not used further in this file).
$GEN_DIR = realpath(__DIR__) . '/gen-php';

// Register the Thrift namespace against ./lib and activate the loader.
$loader = new ThriftClassLoader();
$loader->registerNamespace('Thrift', __DIR__ . '/lib');
$loader->register();

use Thrift\Protocol\TBinaryProtocol;
use Thrift\Transport\TSocket;
use Thrift\Transport\THttpClient;
use Thrift\Transport\TBufferedTransport;
use Thrift\Exception\TException;

/**
 * Reusable wrapper that opens a buffered, binary-protocol connection to a
 * Thrift server and hands out a generated service client bound to it.
 */
class ThriftClient {

    public $client;
    private $transport;
    private $protocol;
    private $client_class;
    private $server_ip;
    private $server_port;
    private $socket;

    /**
     * @param string $client_class name of the generated client class to instantiate
     * @param string $server_ip    server host/address
     * @param int    $server_port  server TCP port
     */
    function __construct($client_class, $server_ip='localhost', $server_port=9090){
        $this->client_class = $client_class;
        $this->server_ip = $server_ip;
        $this->server_port = $server_port;
    }

    /**
     * Closes the transport if one was opened.
     */
    function __destruct(){
        // getClient() may never have been called, or open() may have failed.
        if ($this->transport !== null && $this->transport->isOpen()) {
            $this->transport->close();
        }
    }

    /**
     * Returns a connected service client, opening the connection on first use.
     *
     * While the transport is still open, later calls return the same client
     * instead of opening another connection.
     *
     * @return object instance of the configured client class
     */
    public function getClient(){
        if ($this->client !== null && $this->transport !== null && $this->transport->isOpen()) {
            return $this->client;
        }

        // Use the configured class. An empty value falls back to the
        // MongoTest client, which was previously hard-coded here.
        $clientClass = $this->client_class
            ? ltrim($this->client_class, '\\')
            : 'mongotest\\MongoTestClient';

        $this->socket = new TSocket($this->server_ip, $this->server_port);
        $this->transport = new TBufferedTransport($this->socket, 1024, 1024);
        $this->protocol = new TBinaryProtocol($this->transport);
        $this->client = new $clientClass($this->protocol);
        $this->transport->open();
        return $this->client;
    }
}

?>

php版client调用示例,MongoTest-client-thrift.php:

<?php
include_once __DIR__.'/gen-php/mongotest/MongoTest.php';
include_once __DIR__.'/ThriftClient.php';

// Keep the wrapper in a variable: its destructor closes the transport.
$thrift = new ThriftClient('\mongotest\MongoTestClient', 'localhost', 9090);
$status = $thrift->getClient()->getServerStatus();   // call the service method
echo $status;

?>

相关代码打包(省略了thrift的库和通过thrift命令自动生成的代码,及一些jar依赖包,如mongodb包)