server端代码:
#!/usr/bin/python
import time,os, socket,sys,ConfigParser,threading
msgContainer = {}; # message container list
clientThreads = {};
def loadConf():
cnfPsr = ConfigParser.ConfigParser();
content = cnfPsr.read("socket_srv.conf");
#sections = cnfPsr.sections();#return a list, like:['sec1','sec2']
#options = cnfPsr.options('client_ip');#return a list, like:['publish', 'subscribe']
items = cnfPsr.items('client_ip');
conf = {};
for i in items:
conf[i[0]] = i[1].split(',');
items = cnfPsr.items('local_set')
for i in items:
conf[i[0]] = i[1];
return conf;
def report(msg):
log(msg);
def log(msg):
f = open('log.txt', 'ab');
tm = time.strftime('%Y-%m-%d %H:%M:%S');
f.write(tm+'\t'+msg+os.linesep);
f.close();
def worker(conn, address):
global conf;
threadName = threading.currentThread().getName();
print threadName+' was launched!';
try:
bs = int(conf['buffsize']);
que = msgContainer[threadName];
if address[0] in conf['publish']:
recv = conn.recv(bs);
if recv:
for k in msgContainer:
if k!=threadName:
msgContainer[k].append(recv);
print 'Publish send a command:', recv;
conn.send('ok');
else:
conn.send('nil');
conn.close();
print msgContainer;
else:
while 1:
if len(que) == 0:
conn.send('wait');
recv = conn.recv(bs);
if recv==None: #disconnection
conn.close();
print 'client was closed';
break;
else:
msg = que.pop(0);
conn.send(msg);
recv = conn.recv(bs);
if 'ok' == recv:
continue;
elif None == recv:
conn.close();
print 'client was closed';
break;
else:
que = [msg] + que; #when client is not success, restore the pop element
except:
conn.close();
clientThreads.pop(threadName);#delete dict key with value
msgContainer.pop(threadName);
def run():
try:
global conf;
conf = loadConf();
sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM);
sk.bind((conf['srv'], int(conf['port']) )); #max connections is 5
sk.listen( int(conf['max_conn']));
print 'server is runing....';
log('server is runing....');
while 1:
conn, address = sk.accept();#address is a tuple, like ('127.0.0.1', 4433)
t = threading.Thread(target=worker, args=(conn, address));#the 3rd parameter is custorm thread name
print 'From '+address[0]+' connected and worked with '+t.getName();
t.start();
clientThreads[t.getName()] = t;
msgContainer[t.getName()] = []; #msg list for this thread
except KeyboardInterrupt:
print 'server is interrupt.';
#sk.shutdown(2);
sk.close();
sys.exit();
except Exception,e:
print e;
report('server is shutdown!error:' + str(e) );
#sk.shutdown(2);#2 means closing the read channel and write channel
sk.close();
sys.exit();
def main():
run()
if __name__ == '__main__':
main()
客户端代码:
#!/usr/bin/python
import socket,time,os,ConfigParser
def loadConf():
confFile = os.path.split(os.path.realpath(__file__))[0] + os.path.sep + 'socket_cli.conf';
cnfPsr = ConfigParser.ConfigParser();
content = cnfPsr.read(confFile);
items = cnfPsr.items('server');
conf = {};
for i in items:
conf[i[0]] = i[1];
return conf;
def run(callback):
conf = loadConf();
sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM);
sk.settimeout(int(conf['timeout']));#set timeout, socket with non-blocking state
sk.connect( (conf['ip'], int(conf['port'])) );
sk.settimeout(None);#set timeout none, socket is blocking.
while True:
recv = sk.recv(int(conf['buffsize']));
if 'wait' == recv:
sk.send('ok');
elif None == recv:
sk.close();
break;
else:
callback(recv);
sk.send('ok');
sk.send('hello host');
print(recv);
sk.close();
def callback(msg):
if msg:
print msg;
def log(msg):
f = open('log.txt', 'ab');
tm = time.strftime('%Y-%m-%d %H:%M:%S');
f.write(tm+'\t'+msg+os.linesep);
f.close();
def main():
run(callback);
if __name__ == '__main__':
main()