Javaで適当にSocketプログラム
ひょんなことから、Javaのソケットの振る舞いをちゃんと確認しておこうと思いまして、サーバソケットプログラムを書いてみました。
ただ書くのも何なので、マルチスレッドで処理できるようにしてみました。
で、ただマルチスレッドも何なので、着信スレッドと処理スレッドを分けてみました。
Tomcatがこんな感じのことをやってるので、それの真似です。
設定値は手抜きで、保持するクラスを作ってそれを持ちまわっています。
まずは接続を待ち受けるListenerの処理から。
public class Listener { private ExecutorService acceptorPool; private Configuration conf; public Listener() { conf = new Configuration(); acceptorPool = Executors.newFixedThreadPool(conf.getAcceptorCount()); } public void listen() { try { int acceptorCount = conf.getAcceptorCount(); ServerSocket serverSocket = new ServerSocket(conf.getPort(), conf.getBacklog()); System.out.println("listening on " + conf.getPort()); for (int i = 0; i < acceptorCount; i++) { acceptorPool.submit(new Acceptor(serverSocket, conf)); } } catch (IOException e) { e.printStackTrace(); } } }
指定ポートをバインドするサーバソケットを1つ作って、それを複数のAcceptorスレッドに渡してます。
ServerSocketをnewしたら待ち受け状態になります。
でもこの時点では着信処理(Acceptorの処理)はまだ実行されていないので、クライアントの要求は、ServerSocket作成時に指定したバックログ数分だけキューイングされ、バックログ数を越えた接続は拒否されます。
「listening on ...」のところにブレークポイントを張って止めた状態で、バックログ数を越える接続を張ろうとすると確認できます。
処理を再開すると、受け付けた接続分はダーッと処理されます。
続いてAcceptorの処理です。
class Acceptor implements Callable<Void> { private ServerSocket serverSocket; private Configuration conf; private static ExecutorService workerPool; Acceptor(ServerSocket serverSocket, Configuration conf) { this.serverSocket = serverSocket; this.conf = conf; if (workerPool == null) { workerPool = Executors.newFixedThreadPool(conf.getWorkerCount()); } } @Override public Void call() { String threadName = Thread.currentThread().getName(); if (serverSocket == null || serverSocket.isClosed()) { System.err.println("ServerSocket is null or already closed."); return null; } Socket socket = null; try { System.out.println("accepted by " + threadName); while ((socket = serverSocket.accept()) != null) { Worker worker = new Worker(socket, conf); workerPool.submit(worker); } } catch (IOException e) { e.printStackTrace(); try { if (socket != null && !socket.isClosed()) { socket.close(); } } catch (IOException e1) { e1.printStackTrace(); } } return null; } }
ServerSocket#acceptで着信し、Workerに処理をお願いしています。あとはwhileでひたすらWorker用のスレッドプールに空きがある限り、着信→処理を繰り返します。
ちなみにFixedThreadPoolを使っているので、空きがないときはキューイングされて、空いたら処理をします。
Workerの処理です。
class Worker implements Callable<Void> { private byte[] readBytes; private Socket socket; Worker(Socket socket, Configuration conf) { this.socket = socket; readBytes = new byte[conf.getWorkerMaxBytes()]; } @Override public Void call() throws Exception { String threadName = Thread.currentThread().getName(); InputStream in = socket.getInputStream(); OutputStream out = socket.getOutputStream(); int readBytesCount = 0; while ((readBytesCount = in.read(readBytes)) != -1) { for (int i = 0; i < readBytesCount; i++) { System.out.printf("%c", readBytes[i]); out.write(readBytes); } } socket.close(); return null; } }
ひたすら入力ストリームからデータを読み込んで、それを出力ストリームに渡しているだけです。
読めなくなったらソケットを閉じて終了です。
ちなみに、これを実行した場合、止めるのはCtrl-Cで強制終了しかありまへん。
ExecutorServiceをshutdownしてなかったり、ServerSocketをcloseしてなかったりしますが、とりあえずいっか、という感じです。
本当はどこで止めたり閉じたりすべきなのか、実は分かってなかったりします。
ソースコードはこちら。
chanko/chanko-socket at master · nobrooklyn/chanko · GitHub