I am computing the Hessian matrix over a large dataset, and I am trying to run these computations in parallel across multiple CPUs. My current setup is a single node with 10 CPUs.
To understand distributed TensorFlow better, I wrote a small abstraction of my code. Below is the error:
2017-07-23 16:16:17.281414: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:316] Started server with target: grpc://localhost:2225
Process Process-3:
Traceback (most recent call last):
File "/home/skay/anaconda2/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/home/skay/anaconda2/lib/python2.7/multiprocessing/process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "/home/skay/.PyCharmCE2017.1/config/scratches/scratch_6.py", line 32, in cifar10
serv = tf.train.Server(cluster, job_name= params.job_name,task_index=params.task_index)
File "/home/skay/anaconda2/lib/python2.7/site-packages/tensorflow/python/training/server_lib.py", line 145, in __init__
self._server_def.SerializeToString(), status)
File "/home/skay/anaconda2/lib/python2.7/contextlib.py", line 24, in __exit__
self.gen.next()
File "/home/skay/anaconda2/lib/python2.7/site-packages/tensorflow/python/framework/errors_impl.py", line 466, in raise_exception_on_not_ok_status
    pywrap_tensorflow.TF_GetCode(status))
UnknownError: Could not start gRPC server

I receive this error every time I run the code. However, it keeps producing output from one of the two worker processes I set up, as shown below:
2017-07-23 16:27:48.605617: I tensorflow/core/distributed_runtime/master_session.cc:999] Start master session fe9fd6a338e2c9a7 with config:
2017-07-23 16:27:48.607126: I tensorflow/core/distributed_runtime/master_session.cc:999] Start master session 3560417f98b00dea with config:
[ 1. 2. 3. 4. 5. 6. 7. 8. 9. 10.]
Process-3
[ 1. 2. 3. 4. 5. 6. 7. 8. 9. 10.]
Process-3
[ 1. 2. 3. 4. 5. 6. 7. 8. 9. 10.]
Process-3

At this point, it just keeps waiting for the next one:
ERROR:tensorflow:==================================
Object was never used (type <class 'tensorflow.python.framework.ops.Operation'>):
<tf.Operation 'worker_0/init' type=NoOp>
If you want to mark it as used call its "mark_used()" method.
It was originally created here:
['File "/home/skay/.PyCharmCE2017.1/config/scratches/scratch_6.py", line 83, in <module>\n proc.start()', 'File "/home/skay/anaconda2/lib/python2.7/multiprocessing/process.py", line 130, in start\n self._popen = Popen(self)', 'File "/home/skay/anaconda2/lib/python2.7/multiprocessing/forking.py", line 126, in __init__\n code = process_obj._bootstrap()', 'File "/home/skay/anaconda2/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap\n self.run()', 'File "/home/skay/anaconda2/lib/python2.7/multiprocessing/process.py", line 114, in run\n self._target(*self._args, **self._kwargs)', 'File "/home/skay/.PyCharmCE2017.1/config/scratches/scratch_6.py", line 49, in cifar10\n init_op=tf.initialize_all_variables(),logdir=\'/tmp/mydir\')', 'File "/home/skay/anaconda2/lib/python2.7/site-packages/tensorflow/python/util/tf_should_use.py", line 170, in wrapped\n return _add_should_use_warning(fn(*args, **kwargs))', 'File "/home/skay/anaconda2/lib/python2.7/site-packages/tensorflow/python/util/tf_should_use.py", line 139, in _add_should_use_warning\n wrapped = TFShouldUseWarningWrapper(x)', 'File "/home/skay/anaconda2/lib/python2.7/site-packages/tensorflow/python/util/tf_should_use.py", line 96, in __init__\n stack = [s.strip() for s in traceback.format_stack()]']
==================================
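(Side note on the warning above: the init op it names comes from the tf.initialize_all_variables() call at line 49 of my script. That function is deprecated in TF 1.x; a minimal sketch of the non-deprecated equivalent:)

import tensorflow as tf  # TF 1.x

# Deprecated: init_op = tf.initialize_all_variables()
init_op = tf.global_variables_initializer()  # same op under its current name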
2017-07-23 16:28:28.646871: I tensorflow/core/distributed_runtime/master.cc:209] CreateSession still waiting for response from worker: /job:worker/replica:0/task:0
2017-07-23 16:28:38.647276: I tensorflow/core/distributed_runtime/master.cc:209] CreateSession still waiting for response from worker: /job:worker/replica:0/task:0
2017-07-23 16:28:48.647526: I tensorflow/core/distributed_runtime/master.cc:209] CreateSession still waiting for response from worker: /job:worker/replica:0/task:0

I have two questions here:
1. Why do I always get {"worker": 0}, {"worker": 0} (the same entry twice)?
2. Can I use a multiprocessing queue to share a dictionary between two sessions running in two different processes with TensorFlow?
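(To illustrate what I mean by sharing, here is a minimal sketch independent of the TensorFlow code below. The worker_task helper and the per-task keys are made up for illustration; keying by job name AND task index, e.g. "worker_0" and "worker_1", keeps the entries from overwriting each other.)

# Minimal sketch: share a manager-backed dict and queue across processes.
import multiprocessing

def worker_task(job_name, task_index, shared_dict, result_q):
    # Key by job name and task index so worker 0 and worker 1 do not collide.
    shared_dict["%s_%d" % (job_name, task_index)] = task_index
    result_q.put(dict(shared_dict))  # put a snapshot, not the live proxy

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    shared_dict = manager.dict()
    result_q = manager.Queue()
    procs = [multiprocessing.Process(target=worker_task,
                                     args=('worker', i, shared_dict, result_q))
             for i in range(2)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    while not result_q.empty():
        print(result_q.get())  # each entry keeps its own key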
Below is my code:
# build a python multiprocess.py
import multiprocessing
import time
import tensorflow as tf
from tensorflow.contrib.training import HParams
import os
import psutil
import numpy as np
from tensorflow.python.client import device_lib
from resnet import *
import Queue

cluster_spec = {"ps": ["localhost:2226"],
                "worker": ["localhost:2227",
                           "localhost:2228"]}
cluster = tf.train.ClusterSpec(cluster_spec)
im_Test = np.linspace(1, 10, 10)


def model_fun(input):
    print multiprocessing.current_process().name
    return input


def cifar10(device, return_dict, result_t):
    params = HParams(cluster=cluster,
                     job_name=device[0],
                     task_index=device[1])
    serv = tf.train.Server(cluster, job_name=params.job_name,
                           task_index=params.task_index)
    input_img = []
    true_lab = []
    if params.job_name == "ps":
        # the ps server blocks here and waits for the workers
        serv.join()
    elif params.job_name == "worker":
        with tf.device(tf.train.replica_device_setter(
                worker_device="/job:worker/replica:0/task:%d" % params.task_index,
                cluster=cluster)):
            # with tf.Graph().as_default(), tf.device('/cpu:%d' % params.task_index):
            # with tf.container('%s %d' % ('batchname', params.task_index)) as scope:
            input_img = tf.placeholder(dtype=tf.float32, shape=[10, ])
            with tf.name_scope('%s_%d' % (params.job_name, params.task_index)) as scope:
                hess_op = model_fun(input_img)
                global_step = tf.contrib.framework.get_or_create_global_step()
        sv = tf.train.Supervisor(is_chief=(params.task_index == 0),
                                 global_step=global_step,
                                 init_op=tf.initialize_all_variables(),
                                 logdir='/tmp/mydir')
        with sv.prepare_or_wait_for_session(serv.target) as sess:
            step = 0
            while not sv.should_stop():
                hess = sess.run(hess_op, feed_dict={input_img: im_Test})
                print(np.array(hess))
                print multiprocessing.current_process().name
                step += 1
                if step == 3:
                    return_dict[params.job_name] = params.task_index
                    result_t.put(return_dict)
                    break
            sv.stop()
            sess.close()
    return


if __name__ == '__main__':
    logger = multiprocessing.log_to_stderr()
    manager = multiprocessing.Manager()
    result = manager.Queue()
    return_dict = manager.dict()
    processes = []
    devices = [['ps', 0],
               ['worker', 0],
               ['worker', 1]]
    start_time = time.time()  # start the clock once, before launching
    for i in devices:
        proc = multiprocessing.Process(target=cifar10, args=(i, return_dict, result))
        processes.append(proc)
        proc.start()
    for p in processes:
        p.join()
    # print return_dict.values()
    kill = []
    while not result.empty():
        kill.append(result.get())
    print kill
    print("time taken = %d" % (time.time() - start_time))  # was start_time - time.time()

Posted on 2018-01-17 03:41:46
In my case, I found that the ps task raised this error when I submitted a TensorFlow job in YARN cluster mode, and the worker kept waiting for a response.

The ps error is as follows:
2018-01-17 11:08:46,366 INFO (MainThread-7305) Starting TensorFlow ps:0 on cluster node 0 on background process
2018-01-17 11:08:56,085 INFO (MainThread-7395) 0: ======== ps:0 ========
2018-01-17 11:08:56,086 INFO (MainThread-7395) 0: Cluster spec: {'ps': ['172.5.16.30:33088'], 'worker': ['172.5.16.22:41428', '172.16.5.30:33595']}
2018-01-17 11:08:56,086 INFO (MainThread-7395) 0: Using CPU
2018-01-17 11:08:56.087452: I tensorflow/core/platform/cpu_feature_guard.cc:137] Your CPU supports instructions that this TensorFlow binary was not compiled to use: SSE4.1 SSE4.2 AVX AVX2 FMA
E0117 11:08:56.088501182 7395 ev_epoll1_linux.c:1051 [server_chttp2.c:38] {"created":"@1516158536.088783549","description":"No address added out of total resolved","file":"external/grpc/src/core/ext/transport/chttp2/server/chttp2_server.c","file_line":245,"referenced_errors":[{"created":"@1516158536.088779164","description":"Failed to add any wildcard listeners","file":"external/grpc/src/core/lib/iomgr/tcp_server_posix.c","file_line":338,"referenced_errors":[{"created":"@1516158536.088771177","description":"Unable to configure socket","fd":12,"file":"external/grpc/src/core/lib/iomgr/tcp_server_utils_posix_common.c","file_line":200,"referenced_errors":[{"created":"@1516158536.088767669","description":"OS Error","errno":98,"file":"external/grpc/src/core/lib/iomgr/tcp_server_utils_posix_common.c","file_line":173,"os_error":"Address already in use","syscall":"bind"}]},{"created":"@1516158536.088778651","description":"Unable to configure socket","fd":12,"file":"external/grpc/src/core/lib/iomgr/tcp_server_utils_posix_common.c","file_line":200,"referenced_errors":[{"created":"@1516158536.088776541","description":"OS Error","errno":98,"file":"external/grpc/src/core/lib/iomgr/tcp_server_utils_posix_common.c","file_line":173,"os_error":"Address already in use","syscall":"bind"}]}]}
Process Process-2:
Traceback (most recent call last):
  File "/data/yarn/nm/usercache/hdfs/appcache/application_1515984940590_0270/container_e13_1515984940590_0270_01_000002/Python/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/data/yarn/nm/usercache/hdfs/appcache/application_1515984940590_0270/container_e13_1515984940590_0270_01_000002/Python/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/data/yarn/nm/usercache/hdfs/appcache/application_1515984940590_0270/container_e13_1515984940590_0270_01_000001/tfspark.zip/tensorflowonspark/TFSparkNode.py", line 269, in wrapper_fn
  File "/data/yarn/nm/usercache/hdfs/appcache/application_1515984940590_0270/container_e13_1515984940590_0270_01_000002/pyfiles/mnist_dist.py", line 38, in map_fun
    cluster, server = ctx.start_cluster_server(1, args.rdma)
  File "...", line 56, in start_cluster_server
    return TFNode.start_cluster_server(self, num_gpus,
  File "/data/yarn/nm/usercache/hdfs/appcache/application_1515984940590_0270/container_e13_1515984940590_0270_01_000002/tfspark.zip/tensorflowonspark/TFNode.py", line 110, in start_cluster_server
    server = tf.train.Server(cluster, ctx.job_name,
  File "/data/yarn/nm/usercache/hdfs/appcache/application_1515984940590_0270/container_e13_1515984940590_0270_01_000002/Python/lib/python2.7/site-packages/tensorflow/python/training/server_lib.py", line 145, in __init__
    self._server_def.SerializeToString(), status)
  File "/data/yarn/nm/usercache/hdfs/appcache/application_1515984940590_0270/container_e13_1515984940590_0270_01_000002/Python/lib/python2.7/site-packages/tensorflow/python/framework/errors_impl.py", line 473, in __exit__
    c_api.TF_GetCode(self.status.status))
UnknownError: Could not start gRPC server
worker:1 log:

2018-01-17 11:09:14.614244: I tensorflow/core/distributed_runtime/master.cc:221] CreateSession still waiting for response from worker: /job:ps/replica:0/task:0
Then I checked the ports on the ps server, and indeed the port was already in use.
So resubmitting the job solved the problem.
However, if you receive this error every time you run the code, you should check more of the logs to find the cause.
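A quick way to check whether the ps port is already taken is a plain bind test. This is a minimal sketch (the port_in_use helper is hypothetical, and 2226 is just the ps port from the question's cluster spec):

# Minimal sketch: detect "Address already in use" (errno 98) before starting
# a tf.train.Server on that port.
import errno
import socket

def port_in_use(host, port):
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.bind((host, port))
        return False  # bind succeeded, so the port was free
    except socket.error as e:
        return e.errno == errno.EADDRINUSE
    finally:
        s.close()

print(port_in_use('localhost', 2226))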
https://stackoverflow.com/questions/45269790