ESB文档库 ESB文档库
00 概述
01 产品安装指南
02 快速入门指南
03 ESB Studio使用指南
04 企业服务总线使用指南
05 高级配置指南
06 接口服务说明
07 升级&数据迁移指南
08 FAQ
  • 组件开发
  • 新建ESB项目
  • 新建公共Module
  • 新建业务Module
  • 创建自定义Transport
  • 创建自定义Endpoint
  • 创建自定义业务服务
  • 创建自定义代理服务
  • 第三方jar包配置
  • 测试运行
  • 其他示例代码

# 组件开发

  1. 自定义协议扩展框架

    ESB提供了一套扩展框架,专门用于实现定制的Transport或Endpoint。其核心是提供了两个基础抽象类供定制化实现扩展,它们是:

    com.primeton.esb.adapter.framework.component.AbstractTransportComponent 用于Transport扩展,定制的Transport需直接或间接地继承此类,并重写其中的几个抽象方法。

    com.primeton.esb.adapter.framework.component.AbstractEndpointComponent 用于Endpoint扩展,定制的Endpoint需直接或间接地继承此类,并重写其中的几个抽象方法。

    说明:为保证在ESB工程中能找到该实现类,建议直接在ESB Studio中的ESB工程中创建并实现类,而不要用其他开发工具。(实际上,由于ESB Studio是基于Eclipse开发的,

    其创建并实现Java类的方法与Eclipse是基本一样的)

    1.1 Transport实现要点

    在transport实现类中,需要重写的几个重要方法如下:

    void init()
         transport初始化时被调用一次。通常在此应读取相关配置参数,初始化运行环境。
    void start()
         transport启动时被调用。此后应开始处理客户端的服务请求。
    void stop()
         transport停止时被调用。此后应停止处理客户端的服务请求。
    void handleReply(INormalizedMessage replyMsg)
         当ESB向transport返回应答消息时被调用,其中replyMsg参数代表应答消息。此时应根据应答消息的内容向客户端返回应答报文,并结束本次服务请求。
    void handleError(INormalizedMessage errMsg)
         当ESB向transport返回错误消息时被调用。其中errMsg参数代表错误消息。此时应根据错误消息的内容向客户端返回错误报文,并结束本次服务请求。
    void onChange(IComponentConfig cfg)
         当transport配置参数发生变化时(如系统管理员在console中改变了参数值并执行热更新)被调用,其中cfg参数代表更新后的配置。此时应根据新的配置内容调整transport的行为。
         此外,transport实现中还需调用如下的方法:
    void handleRequest(INormalizedMessage requestMsg)
         将请求消息发送到请求通道。
    

    需要注意的是,为保证ESB系统的性能,通常transport应具备多线程并发处理能力,且ESB的异步处理机制决定了请求和应答会在不同的线程中处理,具体说就是:transport通过调用handleRequest将请求消息发给ESB,而在应答消息返回时,ESB会(通常在另一个线程中)回调handleReply()或handleError()。 因此,如果需要请求和应答处理之间保持特定的状态信息,transport实现需自行实现一套上下文的挂起及恢复机制(某些通信框架本身提供类似机制,可直接利用,如Jetty和JavaNIO)。此外,ESB为每个服务请求生成唯一的ContinuationID标识符字串(可在handleRequest()后从系统头中取得),可以此作为上下文挂起及恢复的依据。

    1.2 Endpoint实现要点

    在Endpoint实现类中,需要重写的几个重要方法如下:

    void init()
         endpoint初始化时被调用一次。通常在此应读取相关配置参数,初始化运行环境。
    void start()
         endpoint启动时被调用。此后应开始处理来自ESB的服务请求。
    void stop()
         endpoint停止时被调用。此后应停止处理来自ESB的服务请求。
    void onChange(IComponentConfig cfg)
         当endpoint配置参数发生变化时(如系统管理员在console中改变了参数值并执行热更新)被调用,其中cfg参数代表更新后的配置。此时应根据新的配置内容调整endpoint的行为。
    void handleRequest(INormalizedMessage requestMsg)
         当ESB调用外部服务(即代理服务或中介服务调用业务服务)时被调用。其中requestMsg代表服务请求消息,此时应根据请求消息的内容执行对目标服务器的访问,并根据访问结果生成应答消息或错误消息,进而调用handleReply或handleError返回给ESB。
         此外,endpoint实现中可以调用如下方法:
    void handleReply(INormalizedMessage replyMsg)
         将应答消息发送到应答通道。
    void handleError(INormalizedMessage errMsg)
         将错误消息发送到错误通道。
    

    需要注意的是,为保证ESB系统的性能,通常endpoint应具备多线程并发处理能力,且在请求和应答处理间应采用异步机制,具体说就是:endpoint在处理handleRequest()期间,不宜使线程阻塞于IO访问,而宜采用异步IO的处理方式,尤其在向服务器发送请求之后等待应答的这段时间。 因此,handleReply()或handleError()的调用通常会与handleRequest()的调用处于不同的线程,如果此期间需要保持特定的状态信息,endpoint需自行实现一套上下文的挂起及恢复机制(某些通信框架本身提供类似机制,可直接利用,如apache-httpclient和JavaNIO)。 当然,如果采用简单的同步IO处理,则不需要这些复杂的机制,功能上也可以实现,只是对ESB的整体性能不利。

    无论是否采用异步IO处理,向ESB返回的消息(应答消息或错误消息)的系统头中都必须包含如下字段,且内容必须与请求消息中的一致:

    键值常量 键值字串 值类型 说明
    CONTINUATION_ID $tip.continuationId String 服务请求ID。
    MEDIA_RETURN_XPATH_NAME $tip.media.returnXpath String 中介服务返回值变量名。在由中介服务调用业务服务时需要,否则为空。
    MEDIA_UUID_NAME $tip.media.uuid String 中介服务上下文ID。在由中介服务调用业务服务时需要,否则为空。
    REPLY_CHANNEL $tip.reply.channel String 应答通道名。
  2. TCP协议扩展示例

    2.1 场景概述

    本示例基于一个模拟服务端程序及一个模拟客户端程序,分别模拟实施场景中的服务提供者和服务使用者。二者间基于TCP协议通信,其中:

    模拟服务端在8888端口监听,当收到客户端连接请求后即建立并处理连接。针对每一个连接,先接收请求报文,再返回应答报文,最后关闭连接。

    模拟客户端则主动连接服务器,发送请求报文,接收应答报文,最后关闭连接。

    无论是请求报文还是应答报文,均遵从如下格式:

    字段 字节长度 说明
    内容长度 4 整数,网络(大端)字节序。表示后续报文内容的长度。
    内容 由内容长度决定 报文内容,UTF-8编码。

    本示例模拟在一个TCP协议环境下由ESB实现穿透服务的过程,即把模拟服务端提供的(基于TCP协议接口的)服务集成到ESB上,并重新发布(同样是基于TCP协议接口的)服务。而模拟客户端不再直接调用模拟服务端,而是调用ESB提供的TCP服务,且除了IP地址和端口相应变化外,其余功能实现不需改变。

    2.2 ESB的配置及扩展实现

    为实现示例场景,需要配置一个ESB穿透服务,具体步骤如下:

  • 配置一个TCP协议的Endpoint,连接模拟服务端。
  • 配置一个TCP协议的业务服务,基于TCP Endpoint调用模拟服务端的服务。
  • 配置一个中介服务。在中介服务流程中调用TCP业务服务,并在调用业务服务前后输出请求和应答报文内容。
  • 配置一个TCP协议的Transport,接收并处理模拟客户端的连接。
  • 配置一个TCP协议的代理服务,基于TCP Transport处理客户端的服务请求,并调用中介服务处理该请求

以上配置中所用到的TCP协议的transport和endpoint在ESB产品的默认安装包中是不存在的,需要做定制化的扩展实现。其中:

TCP协议的transport需要模仿模拟服务端的工作方式,即以短连接方式处理来自模拟客户端的连接请求。即针对每个连接,先接收请求报文,生成请求消息发送到请求通道;之后在应答消息从应答通道返回(或错误消息从错误通道返回)时,向客户端返回应答报文,并关闭连接。

TCP协议的endpoint需要模仿模拟客户端的工作方式,即针对每个来自ESB的请求消息,建立到模拟服务端的连接,根据请求消息的内容发送请求报文,接收应答报文,再根据应答报文内容生成应答消息并发送到应答通道。如此期间发生错误,则生成错误消息并发送到错误通道。

2.3 补充说明

本示例的目的在于说明ESB协议接入的扩展机制,而非TCP协议本身,故在TCP协议层面的处理比较简单。实际应用中基于TCP协议的通信机制可能更复杂(如长连接等),但相关内容的讨论不属于本文的范围。

此外,在TCP处理层面,出于简单考虑,没有使用JavaNIO的机制,而是采用传统的Socket机制,因此在endpoint实现中未能做到真正的异步处理,在实际应用中,这种处理方式可能会影响ESB的整体性能。

以下将创建一个简单的自定义协议服务,从而熟悉自定义协议服务的开发过程。

# 新建ESB项目

打开ESB Studio,进入ESB开发平台。点击ESB Studio工具条上的【文件->新建->项目】,在弹出的对话框中选择"ESB->ESB项目",如下图所示:

点击【下一步】,填写项目名称"custdemo",

点击【下一步】,显示项目的引用库信息。点击【完成】,在"资源管理器"视图中显示出custdemo项目。custdemo项目下默认没有创建模块。

# 新建公共Module

首先我们需要在custdemo项目中创建公共Module,选中custdemo项目,在右键菜单中选择"创建->公共Module",在弹出的对话框中设置Module名称, 如下图所示:

点击【完成】,新建的公共Module目录结构,如下图所示:

该Module用来配置HTTP服务、JMS服务、Web Service及自定义协议服务需要的公共信息:

  • ransport:为代理服务实现通信接入。
  • ndpoint:为业务服务实现通信接出。
  • ariable:可创建变量、变量分类等。
  • Java:可创建标准Java类。

# 新建业务Module

在ESB项目中开发各种服务,需要另外创建业务Module。选中custdemo项目,在右键菜单中选择"创建->业务Module",在弹出的对话框中设置Module名称, 如下图所示:

点击【完成】,在项目中生成业务Module:com.primeton.esb.custdemo.restaurant,其中包含代理服务、中介服务、业务服务、Variable及Java。

一个ESB项目可以有多个业务Module。

  • 代理服务:需关联到一个业务服务(穿透)或中介服务,即由这个业务服务或中介服务完成所代理的服务请求。直接关联业务服务时,协议类型需匹配。
  • 中介服务:由中介流描述的处理逻辑,实现代理服务的业务处理。中介流中可包含一个或多个业务服务,以及相关的处理逻辑、数据转换等。中介服务中可以对JAVA方法、子中介服务、WS业务服务、JMS业务服务、HTTP业务服务、CUST业务服务、TCP业务服务、UDP业务服务、FTP业务服务、EJB业务服务、EMAIL业务服务等进行编排。
  • 业务服务:是连接中介服务与Endpoint的数据处理层,根据不同的协议分为HTTP、JMS、WS、EJB、EMAIL、FTP、TCP、UDP、CUST等。
  • Variable:可创建变量、变量分类等。
  • Java:可创建标准Java类。

# 创建自定义Transport

在公共Module中选中"Transport",点右键,选择"创建->CUST类型Transport",在弹出的对话框中,配置相关参数,如下图所示:

点击【完成】,在资源管理器中产生"transport_cust.tansport_custx",并在中间编辑器中显示出具体的信息。配置如下图所示,填写必要信息并保存 。

  • 协议类型:所要开发的协议的名称,根据实际情况填写。这里我们以TCP为协议类型。
  • 扩展实现: Transport端组件实现类,负责接收请求并调用Transport发送请求。
  • 扩展属性:自定义组件的所有相关属性配置,扩展属性的属性值支持变量配置。本例则设置一个port属性,表示TCP监听端口,值为9999。
  • 响应线程池配置信息:为提升ESB消息处理能力,在消息接收处配置了线程池机制,是性能调优的核心配置之一。本教程案例统一采用全局配置,有关这部分的介绍可以参见产品联机帮助《ESB开发指南-使用指南-运行参数配置-线程池配置》。

输入协议类型为TCP,点击"扩展实现*"链接,打开类创建窗口,填写类名称TCPTransport,此类必须继承com.primeton.esb.adapter.framework.component.AbstractTransportComponent类,如下图所示:

点击【完成】后,自动转到TCP Transport类源代码界面进行开发。源码详见TCPTransport.java。

package com.primeton.esb.custdemo.common;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.primeton.esb.adapter.framework.component.AbstractTransportComponent;
import com.primeton.esb.adapter.framework.config.IComponentConfig;
import com.primeton.esb.adapter.framework.exception.ESBAdapterException;
import com.primeton.esb.adapter.framework.messaging.INormalizedMessage;
import com.primeton.esb.adapter.framework.messaging.NormalizedMessageBuilder;
import com.primeton.esb.model.Constants;

public class TCPTransport extends AbstractTransportComponent {

   private int port;
   private ServerSocket serverSock;
   private Thread listenerThread;
   private ConcurrentHashMap<String, TCPTransport_Context> ctxMap = new ConcurrentHashMap<String, TCPTransport_Context>();

   public void handleError(INormalizedMessage errMsg) throws ESBAdapterException {
      // TODO 自动生成方法存根
      Socket sock = null;
      try {
         String cid = (String) errMsg.getSystemHeader(Constants.CONTINUATION_ID);
         TCPTransport_Context ctx = ctxMap.remove(cid);
         if (ctx == null)
            throw new Exception("resume context fail. cid='" + cid + "'");
         String err = (String) errMsg.getMessagePayload();
         sock = ctx.sock;
         DataOutputStream out = new DataOutputStream(sock.getOutputStream());
         TCPUtil.writeString(out, err);
      } catch (Exception e) {
         e.printStackTrace();
         throw new ESBAdapterException("handle error-reply error", e);
      } finally {
         if (sock != null) {
            try {
               sock.close(); //确保关闭了连接
            } catch (Exception e1) {
               e1.printStackTrace();
            }
         }
      }
   }

   public void handleReply(INormalizedMessage replyMsg) throws ESBAdapterException {
      // TODO 自动生成方法存根
      Socket sock = null;
      try {
         //从应答消息中取回cid
         String cid = (String) replyMsg.getSystemHeader(Constants.CONTINUATION_ID);
         //恢复上下文
         TCPTransport_Context ctx = ctxMap.remove(cid);
         if (ctx == null)
            throw new Exception("resume context fail. cid='" + cid + "'");

         //取得应答消息文本。(另外可从系统头、协议头中取得其他应答信息)
         String reply = (String) replyMsg.getMessagePayload();

         //构造应答消息并返回客户端(注意:此处用到了上下文中保存的socket连接及请求消息)
         sock = ctx.sock;
         DataOutputStream out = new DataOutputStream(sock.getOutputStream());
         TCPUtil.writeString(out, reply);
      } catch (Exception e) {
         e.printStackTrace();
         throw new ESBAdapterException("handle reply error", e);
      } finally {
         if (sock != null) {
            try {
               sock.close(); //确保关闭了连接
            } catch (Exception e1) {
               e1.printStackTrace();
            }
         }
      }
   }

   public void init() throws ESBAdapterException {
      // TODO 自动生成方法存根
      //从配置的扩展属性中取得配置的端口号。
      String portStr = this.config.getExtProperty("port");
      this.port = Integer.parseInt(portStr);
   }

   public void onChange(IComponentConfig arg0) throws ESBAdapterException {
      // TODO 自动生成方法存根
      //配置发生变化时的处理。(用于支持热更新)
   }

   public void start() throws ESBAdapterException {
      // TODO 自动生成方法存根
      if (this.serverSock != null)
         throw new ESBAdapterException("tcp server is in running.");

      try {
         this.serverSock = new ServerSocket(this.port);
      } catch (Exception e) {
         throw new ESBAdapterException("open listener at port " + this.port + " error.", e);
      }

      TCPTransport_ListenerRunnable listener = new TCPTransport_ListenerRunnable(serverSock);
      this.listenerThread = new Thread(listener);
      this.listenerThread.start();

      System.out.println("TCP SERVER: init listener at port " + this.port);
   }

   public void stop() throws ESBAdapterException {
      // TODO 自动生成方法存根
      if (this.listenerThread != null) {
         this.listenerThread.interrupt();
      }
      this.listenerThread = null;

      if (this.serverSock != null) {
         try {
            this.serverSock.close();
         } catch (Exception e) {
            e.printStackTrace();
         }
      }
      this.serverSock = null;
   }

   //=================================================
   class TCPTransport_ListenerRunnable implements Runnable {
      private ServerSocket serverSock;

      public TCPTransport_ListenerRunnable(ServerSocket serverSock) {
         this.serverSock = serverSock;
      }

      public void run() {
         System.out.println("TCP SERVER: listener start !");
         Socket clientSock = null;
         while (true) {
            //accept客户端连接
            try {
               clientSock = this.serverSock.accept();
            } catch (Exception e) {
               //accept出错,则停止整个TCP服务器,关闭服务端连接。
               System.err.println("TCP SERVER: accept socket error.");
               e.printStackTrace();
               try {
                  this.serverSock.close();
               } catch (Exception e1) {
                  e1.printStackTrace();
               }
               break;
            }
            //启动worker线程,处理客户端连接。
            try {
               TCPTransport_WorkerRunnable workerRunner = new TCPTransport_WorkerRunnable(clientSock);
               Thread workerThread = new Thread(workerRunner);
               workerThread.start();
            } catch (Exception e) {
               //启动workder线程出错,只关闭该请求的连接,不影响TCP服务器继续运行。
               System.err.println("TCP SERVER: start worker error.");
               e.printStackTrace();
               try {
                  clientSock.close();
               } catch (Exception e1) {
                  e1.printStackTrace();
               }
            }
         }
      }
   }

   class TCPTransport_WorkerRunnable implements Runnable {
      private Socket sock;

      public TCPTransport_WorkerRunnable(Socket sock) {
         this.sock = sock;
      }

      public void run() {
         String cid = null;
         try {

            //读取客户端请求数据
            DataInputStream in = new DataInputStream(this.sock.getInputStream());

            String request = TCPUtil.readString(in);
            System.err.println("TCP SERVER: req=" + request);

            //构造esb请求消息
            NormalizedMessageBuilder<String> msgBuilder = NormalizedMessageBuilder.withPayload(request);
            msgBuilder.setRouteName("default"); //设置代理服务的路由ID(用于路由到指定的代理服务)

            //设置消息头(此处用一个空的map做示例,可以根据需要改用其他类型)
            Map<String, String> messageHeaders = new HashMap<String, String>();
            msgBuilder.setMessageHeaders(messageHeaders);

            //设置协议头(此处用一个空的map做示例,可以根据需要改用其他类型)
            Map<String, String> protocolHeaders = new HashMap<String, String>();
            msgBuilder.setProtocolHeaders(protocolHeaders);

            INormalizedMessage msg = msgBuilder.build();

            //将请求消息发送到request-channel
            handleRequest(msg);

            //从请求消息中取回cid(由底层框架生成,唯一标识一次服务请求)
            cid = (String) msg.getSystemHeader(Constants.CONTINUATION_ID);

            //用上下文保存当前状态
            TCPTransport_Context ctx = new TCPTransport_Context();
            ctx.cid = cid;
            ctx.sock = this.sock;
            ctx.request = request;

            //将上下文挂起,等待异步回调处理
            ctxMap.put(ctx.cid, ctx);

            System.err.println("TCP SERVER: end request. cid=" + ctx.cid);


         } catch (Exception e) {
            System.err.println("TCP SERVER: execute worker error.");
            e.printStackTrace();
            try {
               this.sock.close();
            } catch (Exception e1) {
               e1.printStackTrace();
            }
            if (cid != null) {
               ctxMap.remove(cid); //出错时,确保删除了上下文,避免内存泄露。
            }
         }
      }
   }

   class TCPTransport_Context {
      String cid;
      Socket sock;
      String request;
   }
}

TCPTransport主要实现方法介绍

方法 说明
init() 组件初始化,ESB启动时会调用此方法
start() 组件启动,ESB启动时会调用此方法
stop() 组件停止,当通过Console卸载时会调用此方法
handleReply(INormalizedMessage arg0) 处理消息响应,当发出请求后,对于ESB 响应消息进行处理
handleError(INormalizedMessage arg0) 处理消息响应,当发出请求后,对于接收的ESB 错误响应消息进行处理
onChange(IComponentConfig arg0) 热更新,当通过Console修改配置参数时,会调用次方法

NormalizeMessage(规格化消息)为组件框架与ESBServer交互的消息,通过NormalizedMessageBuilder类来构建,创建方式如下:

NormalizedMessageBuilder<String> msgBuilder =
NormalizedMessageBuilder.withPayload("message");
//此值必须与代理服务扩展属性中的routeName值保持一致
msgBuilder.setRouteName("default");
// 添加系统头
msgBuilder.addSystemHeader("", "");
// 设置消息头
Map<String, String> messageHeaders = new HashMap<String, String>();
msgBuilder.setMessageHeaders(messageHeaders);
// 设置协议头
Map<String, String> protocolHeaders = new HashMap<String, String>();
msgBuilder.setProtocolHeaders(protocolHeaders);
INormalizedMessage nm = msgBuilder.build();
//调用transport发送请求
handleRequest(nm);
构建NormalizedMessage消息时routeName值必须与扩展属性配置中的routeName属性值保持一致,否则Transport发送请求时找不到对应的代理服务出现异常。NormalizedMessageBuilder msgBuilder = NormalizedMessageBuilder.withPayload("message");msgBuilder.setRouteName("default");

NormalizedMessageBuilder主要方法介绍:

方法 说明
withPayload(T payload) 创建NormalizedMessageBuilder,并设置NormalizedMessage消息体
addSystemHeader(String headerName, Object headerValue) ESB内部系统消息头
setProtocolHeaders(Object headerValue) 协议头,根据实际设置
setMessageHeaders(Object headerValue) 消息头,根据实际设置
setMessagePayload(T messagePayload) 设置NormalizedMessage的消息体
setRouteName(String routeName) 标识消息由Transport路由到哪个代理服务,此值必须与代理服务扩展属性中的routeName值保持一致

在Transport面板点击+按钮,可进行扩展属性的添加,属性标识和属性值为必填项,扩展属性配置如下图所示:

ESB Server启动后,扩展属性值会被加载到com.primeton.esb.adapter.framework.config.IComponentConfig类中,获取方式如下所示:

public void init() throws ESBAdapterException {
    //从配置的扩展属性中取得配置的端口号。
    String portStr = this.config.getExtProperty("port");
    ...
}

配置完成后的Transport如下图所示:

# 创建自定义Endpoint

在公共Module中展开"Endpoint",选中其下面的"CUST",点右键,选择"创建->CUST类型Endpoint",在弹出的对话框中,配置相关参数,如下图所示:

点击【完成】,在资源管理器中产生"endpoint_cust.endpoint_custx",并在中间编辑器中显示出具体的信息。配置如下图所示,填写配置信息并保存。

CUST Endpoint配置:

  • 协议类型:所要开发的协议的名称,根据实际情况填写。这里我们以TCP为协议类型。
  • 扩展实现:Endpoint端组件实现,主要由Endpoint端调用处理请求返回响应结果。
  • 扩展属性:自定义组件的所有相关属性配置。本例则设置两个属性:host属性,表示endpoint所连接的目标主机地址;port属性,表示目标主机的TCP监听端口。本例中两个值分别为"127.0.0.1"和"8888"。
  • 请求线程池配置信息:为提升ESB消息处理能力,在消息接收处配置了线程池机制,是性能调优的核心配置之一。本教程案例统一采用全局配置,有关这部分的介绍可以参见产品联机帮助《ESB开发指南-使用指南-运行参数配置-线程池配置》。

点击"扩展实现*"按钮,打开类创建窗口,填写类名称TCPEndpoint,此类必须继承com.primeton.esb.adapter.framework.component.AbstractEndpointComponent类,如下图所示:

点击【完成】后,自动转到TCP Endpoint类源代码界面进行开发。源码详见TCPEndpoint.java。

package com.primeton.esb.custdemo.common;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;

import com.primeton.esb.adapter.framework.component.AbstractEndpointComponent;
import com.primeton.esb.adapter.framework.config.IComponentConfig;
import com.primeton.esb.adapter.framework.exception.ESBAdapterException;
import com.primeton.esb.adapter.framework.messaging.INormalizedMessage;
import com.primeton.esb.adapter.framework.messaging.impl.NormalizedMessage;
import com.primeton.esb.model.Constants;

public class TCPEndpoint extends AbstractEndpointComponent {

   private String host;
   private int port;

   public void handleRequest(INormalizedMessage requestMsg)
           throws ESBAdapterException {
      // TODO 自动生成方法存根

      //取得请求消息文本。(另外可从系统头、协议头中取得其他请求信息)
      String request = (String) requestMsg.getMessagePayload();

      //建立到目标主机的短连接
      SocketAddress addr = new InetSocketAddress(this.host, this.port);
      Socket sock = new Socket();
      try {
         sock.connect(addr);
      } catch (Exception e) {
         throw new ESBAdapterException("connect to server " + this.host + ":" + this.port + "' error", e);
      }
      System.out.println("TCPEndpoint: connect to server " + this.host + ":" + this.port);

      //向主机发送请求报文,并取得应答报文。(同步处理)
      String reply = null;
      try {
         DataOutputStream out = new DataOutputStream(sock.getOutputStream());
         TCPUtil.writeString(out, request);

         DataInputStream in = new DataInputStream(sock.getInputStream());
         reply = TCPUtil.readString(in);

      } catch (Exception e) {
         throw new ESBAdapterException("handle request error", e);
      } finally {
         try {
            sock.close();
         } catch (Exception e) {
            e.printStackTrace();
         }
      }

      for (String key : requestMsg.getSystemHeaders().keySet()) {
         Object value = requestMsg.getSystemHeaders().get(key);
         System.out.println("----->sys-header: " + key + "=" + value);
      }
		/*
		----->sys-header: $tip.continuationId=0d2e06c1-2e6d-11e3-8280-24fd52ce693f
		----->sys-header: $history=[{timestamp=1381053165884, componentName=com.primeton.esb.cust_prj.common.transport.transport_cust.outgoing.request.channel, componentType=channel}, {timestamp=1381053165884, componentName=com.primeton.esb.cust_prj.cust_demo.ps.tcp.proxyService_tcp.default.incoming.request.channel, componentType=channel}, {timestamp=1381053165884, componentName=com.primeton.esb.cust_prj.cust_demo.ps.tcp.proxyService_tcp.default.outgoing.request.channel, componentType=channel}, {timestamp=1381053165884, componentName=com.primeton.esb.cust_prj.cust_demo.ms.tcp.mediaService.incoming.request.channel, componentType=channel}, {timestamp=1381053165962, componentName=com.primeton.esb.cust_prj.cust_demo.ms.tcp.mediaService.outgoing.request.channel, componentType=channel}, {timestamp=1381053165962, componentName=com.primeton.esb.cust_prj.cust_demo.bs.tcp.bizService_tcp.default.incoming.request.channel, componentType=channel}, {timestamp=1381053165962, componentName=com.primeton.esb.cust_prj.cust_demo.bs.tcp.bizService_tcp.default.outgoing.request.channel, componentType=channel}, {timestamp=1381053165962, componentName=com.primeton.esb.cust_prj.common.endpoint.endpoint_tcp.incoming.request.channel, componentType=channel}]
		----->sys-header: $tip.media.uuid=9446a716-3706-4206-a6c0-b3520fe37f66
		----->sys-header: $id=0d3c5ea1-2e6d-11e3-8280-24fd52ce693f
		----->sys-header: $tip.reply.channel=com.primeton.esb.cust_prj.cust_demo.bs.tcp.bizService_tcp.default.incoming.reply.channel
		----->sys-header: $tip.error.channel=com.primeton.esb.cust_prj.cust_demo.bs.tcp.bizService_tcp.default.incoming.error.channel
		----->sys-header: $timestamp=1381053165884
		----->sys-header: $tip.media.returnXpath=outMessage
		----->sys-header: $tip.request.channel=com.primeton.esb.cust_prj.common.endpoint.endpoint_tcp.incoming.request.channel
		----->sys-header: $tip_message_invoke_type=3
		----
		--->const Constants.CONTINUATION_ID='$tip.continuationId'
		--->const Constants.REPLY_CHANNEL='$tip.reply.channel'
		--->const Constants.MEDIA_UUID_NAME='$tip.media.uuid'
		--->const Constants.MEDIA_RETURN_XPATH_NAME='$tip.media.returnXpath'
		*/

      String cid = (String) requestMsg.getSystemHeader(Constants.CONTINUATION_ID);
      //String msgId = (String)requestMsg.getSystemHeader(MessageHeaders.ID);
      String replyCh = (String) requestMsg.getSystemHeader(Constants.REPLY_CHANNEL);
      //Integer invokeType = (Integer)requestMsg.getSystemHeader("$tip_message_invoke_type"); 


      String uuid = (String) requestMsg.getSystemHeader("$tip.media.uuid");  //RouterHandler.java:50
      String xpath = (String) requestMsg.getSystemHeader("$tip.media.returnXpath");  //RouterHandler.java:50

      //Integer invokeType = (Integer)requestMsg.getSystemHeader("$tip_message_invoke_type"); 

      //返回应答消息。
      NormalizedMessage replyMsg = new NormalizedMessage();

      //replyMsg.getSystemHeaders().putAll(requestMsg.getSystemHeaders());
      replyMsg.getSystemHeaders().put(Constants.CONTINUATION_ID, cid);
      replyMsg.getSystemHeaders().put(Constants.REPLY_CHANNEL, replyCh);  //RouterHandler.java:50
      //replyMsg.getSystemHeaders().put("$tip.media.uuid", uuid); //MediationService.java:158
      replyMsg.getSystemHeaders().put(Constants.MEDIA_UUID_NAME, uuid); //MediationService.java:158
      //replyMsg.getSystemHeaders().put("$tip.media.returnXpath", xpath);
      replyMsg.getSystemHeaders().put(Constants.MEDIA_RETURN_XPATH_NAME, xpath);

      System.out.println("--->const Constants.CONTINUATION_ID='" + Constants.CONTINUATION_ID + "'");
      System.out.println("--->const Constants.REPLY_CHANNEL='" + Constants.REPLY_CHANNEL + "'");
      System.out.println("--->const Constants.MEDIA_UUID_NAME='" + Constants.MEDIA_UUID_NAME + "'");
      System.out.println("--->const Constants.MEDIA_RETURN_XPATH_NAME='" + Constants.MEDIA_RETURN_XPATH_NAME + "'");


      //BaseDataContextImpl.java:290

      //replyMsg.getSystemHeaders().put(MessageHeaders.ID, msgId);
      //replyMsg.getSystemHeaders().put("$tip_message_invoke_type", invokeType);

      replyMsg.setMessagePayload(reply);
      this.handleReply(replyMsg);
   }

   public void init() throws ESBAdapterException {
      // TODO 自动生成方法存根
      //从配置的扩展属性中取得配置的端口号。
      this.host = this.config.getExtProperty("host");
      String portStr = this.config.getExtProperty("port");
      this.port = Integer.parseInt(portStr);
   }

   public void onChange(IComponentConfig arg0) throws ESBAdapterException {
      // TODO 自动生成方法存根
      //配置发生变化时的处理。(用于支持热更新)
   }

   public void start() throws ESBAdapterException {
      // TODO 自动生成方法存根
   }

   public void stop() throws ESBAdapterException {
      // TODO 自动生成方法存根
   }

   //===============================================
   class TCPEndpoint_ReplyReader {

   }
}

TCP Endpoint主要实现方法介绍:

方法 说明
init() 组件初始化,ESB启动时会调用此方法
start() 组件启动,ESB启动时会调用此方法
stop() 组件停止,当通过Console卸载时会调用此方法
handleRequest(INormalizedMessage arg0) 处理请求,Endpoint接收到请求后会调用到此方法
onChange(IComponentConfig arg0) 热更新,当通过Console修改配置参数时,会调用此方法

在Endpoint面板点击+按钮,可进行扩展属性的添加,属性标识和属性值为必填项,扩展属性如下图所示:

配置完成后的Endpoint如下图所示:

# 创建自定义业务服务

在业务Module(com.primeton.esb.custdemo.restaurant)中点击"业务服务",在右键菜单中选择"创建->CUST业务服务",在弹出的对话框中,输入类别cust,类别名称cust,如下图所示:

  • 类别:用于cust业务服务的业务化分类,便于业务服务的管理、查找。可以使用已经存在的类型,也可以填写新的类型。
  • 文件名称:存储业务服务配置信息的文件名称。该文件名称要求本module中唯一,不推荐长文件名。
  • 显示名称:业务服务的显示名称。显示名称可以为中文,推荐具有业务含义的显示名,便于管理、查找。

点击【完成】,在资源管理器中产生"bizService_cust.outbound_custx",并在中间编辑器中显示出具体的信息。点击"Endpoint*"后的【选择...】,选择刚创建的CUST类型的Endpoint:com.primeton.esb.custdemo.common.endpoint.endpoint_cust,如下图所示:

  • 协议类型:所要开发的协议的名称,根据实际情况填写。这里我们以TCP为协议类型。
  • 扩展属性:自定义组件的相关属性配置,支持变量
  • 请求线程池配置信息:为提升ESB消息处理能力,在消息接收处配置了线程池机制,是性能调优的核心配置之一。本教程案例统一采用全局配置,有关这部分的介绍可以参见产品联机帮助《ESB开发指南-使用指南-运行参数配置-线程池配置》。

# 创建自定义代理服务

在业务Module(com.primeton.esb.custdemo.restaurant)中点击"代理服务",在右键菜单中选择"创建->CUST代理服务",在弹出的对话框中,输入类别cust,类别名称cust,设置文件名和显示名称,如下图所示:

点击【完成】,在资源管理器中产生"proxyService_cust.inbound_custx",并在中间编辑器中显示出具体的信息。"Transport"关联公共Module中的CUST类型Transport, 点击"Transport*"后的【选择...】,选择刚创建的CUST类型的Transport:com.primeton.esb.custdemo.common.transport.transport_cust, "关联服务类型"选择"业务服务", 点击"服务名称"对应的【选择...】,在弹出的对话框中选择CUST业务服务"com.primeton.esb.custdemo.restaurant.bs.cust.bizService_cust.default",填写相关信息并保存,如下图所示:

  • 协议类型:所要开发的协议的名称,根据实际情况填写。这里我们以TCP为协议类型。
  • 扩展属性:自定义组件的所有相关属性配置,支持变量。
  • 线程池配置信息:为提升ESB消息处理能力,在消息接收处配置了线程池机制,是性能调优的核心配置之一。本教程案例统一采用全局配置,有关这部分的介绍可以参见产品联机帮助《ESB开发指南-使用指南-运行参数配置-线程池配置》。

扩展属性配置中的routeName属性值必须要填写,构建NormalizedMessage消息时routeName值要保持一致,否则Transport发送请求时找不到对应的代理服务出现异常。

NormalizedMessageBuilder<String> msgBuilder = NormalizedMessageBuilder.withPayload("message");
msgBuilder.setRouteName("default");

配置如下图所示:

# 第三方jar包配置

在studio下开发扩展类实现,如果用到相应的第三方jar包,需要将相应jar包配置到classpath中。在custdemo项目打开右键菜单,选择属性, 在打开的配置面板中点击左边导航栏中"【Java构建路径】"按钮,然后选中"【库】"选项卡,点击"【添加外部JAR(X)】",将第三方jar包配置到classpath。

# 测试运行

在开发环境中安装的ESB Studio内置了ESB Server,服务开发完后可以自动以文件形式部署到ESB Server中,直接可以测试运行。 运行时需要将第三方拷贝至esb安装目录\server\libs下,选择server1(custdemo)运行。如下图所示:

启动成功后,运行结果下图所示:

通过TCPTestClient向9999端口发送消息,经过ESB转发后,通过TCPTestServer可以监听到8888端口的消息,如下图所示:

# 其他示例代码

其他可能用到的源码

package com.primeton.esb.custdemo.common;

import java.io.DataInputStream;
import java.io.DataOutputStream;

public class TCPUtil {
   private static final int MAX_DATALEN = 8192;
   private static final String DEFAULT_ENCODING = "UTF-8";

   public static byte[] readData(DataInputStream in) throws Exception {
      int len = in.readInt();
      if (len < 0 || len > MAX_DATALEN)
         throw new Exception("read data len error: " + len);
      byte[] buf = new byte[len];
      in.read(buf);
      return buf;
   }

   public static void writeData(DataOutputStream out, byte[] buf) throws Exception {
      if (buf.length > MAX_DATALEN)
         throw new Exception("write data len error: " + buf.length);
      out.writeInt(buf.length);
      out.write(buf);
   }

   public static String readString(DataInputStream in, String encoding) throws Exception {
      byte[] buf = readData(in);
      String str = new String(buf, encoding);
      return str;
   }

   public static void writeString(DataOutputStream out, String str, String encoding) throws Exception {
      byte[] buf = str.getBytes(encoding);
      writeData(out, buf);
   }

   public static String readString(DataInputStream in) throws Exception {
      return readString(in, DEFAULT_ENCODING);
   }

   public static void writeString(DataOutputStream out, String str) throws Exception {
      writeString(out, str, DEFAULT_ENCODING);
   }
}
package com.primeton.esb.custdemo.common;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.net.ServerSocket;
import java.net.Socket;

public class TCPTestServer {
   public static void main(String[] args) throws Exception {

      int port = 8888;

      ServerSocket sock = new ServerSocket(port);
      System.err.println("TEST SERVER: start listen ...");
      try {
         while (true) {
            Socket clientSock = sock.accept();
            startWorker(clientSock);
         }
      } catch (Exception e) {
         e.printStackTrace();
      }

   }

   private static void startWorker(final Socket clientSock) {
      Runnable runner = new Runnable() {
         public void run() {
            try {
               while (true) {
                  DataInputStream in = new DataInputStream(clientSock.getInputStream());
                  DataOutputStream out = new DataOutputStream(clientSock.getOutputStream());

                  String request = TCPUtil.readString(in);
                  String reply = "reply for '" + request + "'";
                  TCPUtil.writeString(out, reply);

                  System.out.println("recv&send request='" + request + "', reply='" + reply + "'");
               }
            } catch (java.io.EOFException e) {
               System.out.println("client closed. ");
            } catch (Exception e) {
               e.printStackTrace();
            } finally {
               try {
                  clientSock.close();
               } catch (Exception e) {
                  e.printStackTrace();
               }
            }
         }
      };
      Thread t = new Thread(runner);
      t.start();
   }
}
package com.primeton.esb.custdemo.common;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;

public class TCPTestClient {

   private static String host = "localhost";
   //private static int port = 8888;
   private static int port = 9999;

   public static void main(String[] args) throws Exception {
      int threadNum = 3, queryNum = 5;
      //int threadNum=1, queryNum=1;
      for (int i = 0; i < threadNum; i++) {
         startQueryThread(host, port, queryNum, i);
      }
   }

   private static void startQueryThread(final String host, final int port,
                                        final int queryNum, final int threadIndex) {
      Runnable query = new Runnable() {
         public void run() {
            for (int i = 0; i < queryNum; i++) {
               try {
                  doQuery(host, port, threadIndex, i);
               } catch (Exception e) {
                  e.printStackTrace();
                  break;
               }
            }
         }
      };
      Thread t = new Thread(query);
      t.start();
   }

   private static void doQuery(String host, int port, int threadIndex, int queryIndex) throws Exception {

      SocketAddress addr = new InetSocketAddress(host, port);
      Socket sock = new Socket();
      try {
         sock.connect(addr);
      } catch (Exception e) {
         throw new Exception("connect to server error.", e);
      }

      try {
         DataOutputStream out = new DataOutputStream(sock.getOutputStream());
         DataInputStream in = new DataInputStream(sock.getInputStream());

         String request = "request at (" + threadIndex + "," + queryIndex + ")";

         System.out.println("send request: " + request);
         TCPUtil.writeString(out, request);
         String reply = TCPUtil.readString(in);
         System.out.println("recv reply: " + reply);
      } catch (Exception e) {
         throw new Exception("connect to server error.", e);
      } finally {
         sock.close();
      }
   }

}

← 3.5.14 中介服务快速发布数据库表服务示例 3.6.2 消息构建 →