hatenob

プログラムって分からないことだらけ

Javaで適当にSocketプログラム

ひょんなことから、Javaのソケットの振る舞いをちゃんと確認しておこうと思いまして、サーバソケットプログラムを書いてみました。

ただ書くのも何なので、マルチスレッドで処理できるようにしてみました。
で、ただマルチスレッドも何なので、着信スレッドと処理スレッドを分けてみました。
Tomcatがこんな感じのことをやってるので、それの真似です。
設定値は手抜きで、保持するクラスを作ってそれを持ちまわっています。

まずは接続を待ち受けるListenerの処理から。

public class Listener {
	private ExecutorService acceptorPool;
	private Configuration conf;

	public Listener() {
		conf = new Configuration();
		acceptorPool = Executors.newFixedThreadPool(conf.getAcceptorCount());
	}

	public void listen() {
		try {
			int acceptorCount = conf.getAcceptorCount();

			ServerSocket serverSocket = new ServerSocket(conf.getPort(),
					conf.getBacklog());

			System.out.println("listening on " + conf.getPort());

			for (int i = 0; i < acceptorCount; i++) {
				acceptorPool.submit(new Acceptor(serverSocket, conf));
			}

		} catch (IOException e) {
			e.printStackTrace();
		}
	}
}

指定ポートをバインドするサーバソケットを1つ作って、それを複数のAcceptorスレッドに渡してます。

ServerSocketをnewしたら待ち受け状態になります。
でもこの時点では着信処理(Acceptorの処理)はまだ実行されていないので、クライアントの要求は、ServerSocket作成時に指定したバックログ数分だけキューイングされ、バックログ数を越えた接続は拒否されます。

「listening on ...」のところにブレークポイントを張って止めた状態で、バックログ数を越える接続を張ろうとすると確認できます。
処理を再開すると、受け付けた接続分はダーッと処理されます。

続いてAcceptorの処理です。

class Acceptor implements Callable<Void> {
	private ServerSocket serverSocket;
	private Configuration conf;
	private static ExecutorService workerPool;

	Acceptor(ServerSocket serverSocket, Configuration conf) {
		this.serverSocket = serverSocket;
		this.conf = conf;

		if (workerPool == null) {
			workerPool = Executors.newFixedThreadPool(conf.getWorkerCount());
		}
	}

	@Override
	public Void call() {
		String threadName = Thread.currentThread().getName();

		if (serverSocket == null || serverSocket.isClosed()) {
			System.err.println("ServerSocket is null or already closed.");
			return null;
		}

		Socket socket = null;
		try {
			System.out.println("accepted by " + threadName);
			while ((socket = serverSocket.accept()) != null) {

				Worker worker = new Worker(socket, conf);
				workerPool.submit(worker);

			}

		} catch (IOException e) {
			e.printStackTrace();
			try {
				if (socket != null && !socket.isClosed()) {
					socket.close();
				}
			} catch (IOException e1) {
				e1.printStackTrace();
			}
		}
		return null;
	}
}

ServerSocket#acceptで着信し、Workerに処理をお願いしています。あとはwhileでひたすらWorker用のスレッドプールに空きがある限り、着信→処理を繰り返します。
ちなみにFixedThreadPoolを使っているので、空きがないときはキューイングされて、空いたら処理をします。

Workerの処理です。

class Worker implements Callable<Void> {
	private byte[] readBytes;
	private Socket socket;

	Worker(Socket socket, Configuration conf) {
		this.socket = socket;
		readBytes = new byte[conf.getWorkerMaxBytes()];
	}

	@Override
	public Void call() throws Exception {
		String threadName = Thread.currentThread().getName();

		InputStream in = socket.getInputStream();
		OutputStream out = socket.getOutputStream();

		int readBytesCount = 0;

		while ((readBytesCount = in.read(readBytes)) != -1) {
			for (int i = 0; i < readBytesCount; i++) {
				System.out.printf("%c", readBytes[i]);
				out.write(readBytes);
			}
		}

		socket.close();

		return null;
	}
}

ひたすら入力ストリームからデータを読み込んで、それを出力ストリームに渡しているだけです。
読めなくなったらソケットを閉じて終了です。

ちなみに、これを実行した場合、止めるのはCtrl-Cで強制終了しかありまへん。

ExecutorServiceをshutdownしてなかったり、ServerSocketをcloseしてなかったりしますが、とりあえずいっか、という感じです。
本当はどこで止めたり閉じたりすべきなのか、実は分かってなかったりします。

ソースコードはこちら。
chanko/chanko-socket at master · nobrooklyn/chanko · GitHub