代码拉取完成,页面将自动刷新
Others
binary
secretflow 0.8.0b1
centos7.5
3.8.13
按照生产模式配置两个节点并在两方都执行一段lr任务。执行日志提示
No available node types can fulfill resource request {'bob': 1.0, 'CPU': 1.0}. Add suitable node types to this cluster to resolve this issue
No available node types can fulfill resource request {'CPU': 1.0, 'alice': 1.0}. Add suitable node types to this cluster to resolve this issue
两个节点 alice:192.168.1.1:9010 和bob: 192.168.1.2:9010
1、启动每个节点ray(注意:生产模式下需要通过 --resources 为本方声明自定义资源标签,否则 Ray 调度器无法满足 {'alice':1.0}/{'bob':1.0} 这类资源请求)
ray start --head --node-ip-address="192.168.1.1" --port="9010" --resources='{"alice": 16}' --include-dashboard=False --disable-usage-stats
ray start --head --node-ip-address="192.168.1.2" --port="9010" --resources='{"bob": 16}' --include-dashboard=False --disable-usage-stats
2、sf.init初始化sf集群
alice:
# Alice-side SecretFlow cluster topology: cross-party gRPC endpoints for both
# participants, plus which party this process plays.
_parties = {
    'alice': {
        'address': '192.168.1.1:9011',
        'listen_addr': '0.0.0.0:9011',
    },
    'bob': {
        'address': '192.168.1.2:9011',
        'listen_addr': '0.0.0.0:9011',
    },
}
cluster_config = {'parties': _parties, 'self_party': 'alice'}
# Attach to the local Ray head started above and bring up SecretFlow.
sf.init(address='192.168.1.1:9010', cluster_config=cluster_config)
bob
# Bob-side SecretFlow cluster topology: identical party map as on alice's
# machine; only 'self_party' differs.
_parties = {
    'alice': {
        'address': '192.168.1.1:9011',
        'listen_addr': '0.0.0.0:9011',
    },
    'bob': {
        'address': '192.168.1.2:9011',
        'listen_addr': '0.0.0.0:9011',
    },
}
cluster_config = {'parties': _parties, 'self_party': 'bob'}
# Attach to bob's local Ray head and bring up SecretFlow.
sf.init(address='192.168.1.2:9010', cluster_config=cluster_config)
3、分别在alice和bob执行算法
# SPU device definition: a two-party semi-honest (SEMI2K) MPC cluster using
# each party's dedicated SPU port (9012).
spu_cluster_config = {
    'runtime_config': {
        'protocol': spu.spu_pb2.SEMI2K,
        # NOTE(review): some secretflow versions expect the enum
        # spu.spu_pb2.FM128 here rather than the string 'FM128' — confirm
        # against the installed 0.8.0b1 API.
        'field': 'FM128',
        # BUGFIX: original line read  'fxp_fraction_bits": "32"  — mismatched
        # quote styles (a SyntaxError) and a string where an integer bit
        # count is expected.
        'fxp_fraction_bits': 32,
    },
    'nodes': [
        {'id': 'alice', 'party': 'alice', 'address': '192.168.1.1:9012'},
        {'id': 'bob', 'party': 'bob', 'address': '192.168.1.2:9012'},
    ],
}
spu_link_desc = {
    'connect_retry_interval_ms': 30000,
    'recv_timeout_ms': 30 * 60 * 1000,  # 30 minutes
}
# Restart the SecretFlow runtime with an explicit CPU budget before creating
# devices.
sf.shutdown()
sf.init(address='192.168.1.1:9010', cluster_config=cluster_config, num_cpus=128)
# If running on bob, use instead:
# sf.init(address='192.168.1.2:9010', cluster_config=cluster_config, num_cpus=128)
alice = sf.PYU('alice')
bob = sf.PYU('bob')
spud = sf.SPU(cluster_def=spu_cluster_config, link_desc=spu_link_desc)
def create_conv_model():
    """Return a zero-argument Keras model builder for FLModel.

    FLModel expects a callable that constructs a fresh compiled model on each
    device, so the inner ``create_model`` is returned without being called.
    """

    def create_model():
        # BUGFIX: the original imported ``keras``/``layers`` but then
        # referenced the undefined name ``tf`` (NameError on the worker).
        # Import tensorflow explicitly so the builder is self-contained.
        import tensorflow as tf

        # A single sigmoid neuron over all features == logistic regression.
        # NOTE(review): ``v_data.shape[1]`` relies on the module-level
        # FedNdarray exposing a usable ``shape`` — confirm; passing the
        # feature count in explicitly would be more robust.
        model = tf.keras.Sequential(
            [
                tf.keras.layers.Dense(
                    1,
                    input_shape=(v_data.shape[1],),
                    activation=tf.nn.sigmoid,
                )
            ]
        )
        model.compile(
            optimizer='adam',
            # Logistic regression uses binary cross-entropy as its loss.
            loss='binary_crossentropy',
            metrics=[tf.keras.metrics.AUC()],
        )
        return model

    return create_model
def get_data(path, train):
    """Load a CSV and return either the feature block or the label column.

    Args:
        path: Path to a CSV file whose last column is the label.
        train: True -> return every column except the last (features, as a
            pandas DataFrame); False -> return the last column as a numpy
            array of shape (n_rows, 1).
    """
    import numpy as np
    # BUGFIX: ``pd`` was used but never imported in this scope; the unused
    # StandardScaler import and dead commented-out scaling code are removed.
    import pandas as pd

    data = pd.read_csv(path, encoding='utf-8')
    if train:
        print('正在get_data函数中处理训练数据....')
        print('data columns:', data.columns)
        # Features: every column except the last.
        return data.iloc[:, 0:-1]
    # Label: last column only, materialized as an ndarray.
    print('正在get_data函数中处理label数据....')
    return np.array(data.iloc[:, -1:])
print('加载v_data中.....')
# Horizontally partitioned federated feature matrix: each party loads rows
# from its own local CSV (features = every column but the last).
_alice_csv = r'/root/dataset/检测/augmented_sensor_45_data1.csv'
_bob_csv = r'/root/dataset/检测/augmented_sensor_45_data2.csv'
v_data = FedNdarray(
    partitions={
        alice: alice(get_data)(path=_alice_csv, train=True),
        bob: bob(get_data)(path=_bob_csv, train=True),
    },
    partition_way=PartitionWay.HORIZONTAL,
)
print('加载v_data完毕.....')
print('加载label_data 中.....')
# Matching horizontally partitioned label vector from the same files.
label_data = FedNdarray(
    partitions={
        alice: alice(get_data)(path=_alice_csv, train=False),
        bob: bob(get_data)(path=_bob_csv, train=False),
    },
    partition_way=PartitionWay.HORIZONTAL,
)
print('加载label_data 完毕.....')
device_list = [alice, bob]
# Secure aggregation across both parties, coordinated by alice.
secure_aggregator = SecureAggregator(alice, device_list)
# NOTE(review): spu_aggregator is constructed but never used below; kept only
# for parity with the original script.
spu_aggregator = SPUAggregator(spud)
# BUGFIX: the original passed the undefined name ``model``; FLModel needs the
# model *builder* produced by create_conv_model().
fed_model = FLModel(
    server=alice,
    device_list=device_list,
    model=create_conv_model(),
    aggregator=secure_aggregator,
    strategy="fed_avg_w",
    backend="tensorflow",
)
print('开始训练中.....')
history = fed_model.fit(
    v_data,
    label_data,
    epochs=1,
    sampler_method="batch",
    batch_size=1023,
    aggregate_freq=1000,
)
#问题
按照上述步骤执行,日志会提示: No available node types can fulfill resource request {'bob':1.0,'CPU':1.0}. Add suitable node types to this cluster to resolve this issue
No available node types can fulfill resource request {'CPU':1.0,'alice':1.0}. Add suitable node types to this cluster to resolve this issue
原因:ray start 时未声明 alice/bob 自定义资源标签,调度器找不到能满足 {'alice':1.0}/{'bob':1.0} 的节点;在 ray start 命令中加入 --resources='{"alice": 16}'(bob 节点为 --resources='{"bob": 16}')即可解决。