我之前在这里介绍过 Julia 这个语言,提到了它的许多好的(和不好的)特性,不过有一点没有提到的是它方便的并行和分布式计算的能力,结合优质的数值计算能力,其实让它非常方便用于做分布式数据处理——比如 distributed optimization、learning 之类的任务。虽然 Julia 这些年一直在稳步发展并且每个版本都会 break 一些东西,让人需要不断地维护和修改代码有点心累,同时 community 里的第三方库也还不够强大,不过最近在做一点点 distributed optimization 相关的东西中体会到它在这方面的好处,在这里简单分享一下。一方面因为 Julia 的文档虽然比较全,但是似乎还是比较难找到一个完整的例子。本文相关的完整代码会放在这里

在 Julia 里分布式编程主要是通过 Remote Procedure Call (RPC) 的方式来完成。但是 RPC 并不需要用户显式地启动一个 server 来监听调用,然后做 message passing 之类的传输结果,而是有更加 high level 的 API 可以直接实现“在某个节点上执行某段代码”这类的语义。这里提到的 API 如果在将来改变或者改名之类的,主要可以参考 Julia 文档的 Parallel Computing 一章查看最新的接口。

要进行分布式编程,首先要做的是启动计算节点。在 Julia 里非常简单地可以通过 julia -p 2 来启动两个 worker 进程,利用 nprocs() 函数可以看到当前有 3 个进程,myid() 会返回每个进程的 id (和系统里的 pid 并不一样,而是从 1 开始的数字),其中主进程的 id 总是 1,然后其余的是 worker。在这个例子中,nworkers() 会返回 2,表示一个主进程两个 worker 进程。默认情况下(不加 -p 参数启动 Julia 的时候)只有一个进程,nprocs()nworkers() 都返回 1,这时主进程自己同时也是 worker。除了在启动的时候用 -p 参数之外,还可以在启动之后通过调用 addprocs(...) 函数来添加 worker 进程。

当然 -p 只是启动单机多进程,要实现真正的分布式,可以通过给定一个 hostname 列表的方式,Julia 会尝试通过 SSH 启动远程节点进程,或者如果你的集群已经有一个集群或者调度管理器的话,可以通过实现 Julia 里的 ClusterManager 接口来进行对接,在专用集群里启动任务。方便的是单机多进程和真正的分布式对于在 Julia 的 high level API 来说大部分情况下并没有区别。除非实现具体的计算中需要专门考虑多机通讯比本地更慢这些方面的问题,否则可以直接在本机写代码和小范围调试,再 deploy 到集群上去。

启动 worker 进程之后,就可以进行 RPC 来调度任务了。Julia 里 RPC 的接口非常简单,最基本的一个函数是 remotecall(func, wid, args...),其中 func 是要调用的函数,wid 是对应的 worker 的 ID,而 args... 则是需要传给函数的参数,参数会被自动传输到对应的 worker 那里。remotecall 函数是立即范围的,它返回的结果是一个 Future,这相当于对于 RPC 执行结果的一个 handle,对于这个 handle 调用 fetch 会拿到对应的结果,如果对应的 RPC 计算还没有完成,则 block 等待完成,同时 fetch 还会自动把结果传输到当前调用 fetch 的这个节点上来,当然如果结果已经在同一个节点上了,fetch 则不需要额外的数据传输开销。下面是一个例子:

data_ref = remotecall(rand, 2, (20, 30))
ret = remotecall_fetch(x -> norm(fetch(x)), 2, data)

在 worker 2 上调用了 rand 生成一个随机矩阵,注意这里参数传输的代价只是 (20, 30) 这个 tuple,并且 data_ref 只是作为一个 handle 返回到当前进程,这个 handle 直接被传回 worker 2 上计算其 norm,而在 worker 2 上调用 fetch 的时候会发现生成出来的矩阵已经在该节点上了,所以最终结果并没有太大的数据传输代价。这里的 remotecall_fetch 有点类似于嵌套调用 fetch(remotecall(...)),但是由于它自己本身是一个 primitive,所以更加高效,也不会出现隐藏的 race condition 问题。除了这些 primitive 之外,还有一些更 high level 的 API 和宏,比如 @spawn@spawnat 可以很方便地在 worker 上执行任意代码,例如:

julia> ref = @spawnat 2 begin
         x = rand(2, 2)
         norm(x)
       end
Future(2, 1, 12, Nullable{Any}())

julia> fetch(ref)
1.2147872352524431

Julia 还提供一些更加 high level 的函数,诸如 pmap 之类的可以很自动地把计算分配到当前可用的 worker 上执行,并收集结果之类的。但是如果我们需要实现非常具体和细致的算法 scheduling——哪个 worker 执行那一段代码等等,则需要直接使用比较 low level 一点的 API。

假设我们现在要实现一个 ASGD,也就是 Asynchronized Stochastic Gradient Descent,算法的基本思想是每一个 worker 有自己的一份数据,算法执行的时候每个 worker 并行地从 parameter server 里拿到当前最新的参数,然后在本地数据的一个 mini-batch 上计算 gradient,再把 gradient 传回 parameter server,后者再根据传回的 gradient 更新参数。之所以是 asynchronized 是因为每个 worker 是并行并且独立地工作的,因此会有许多可能的不一致的情况,比如当一个 worker 拿到参数,计算了 gradient 并传回去的时候,parameter 上最新的参数已经由其他 worker 传回的结果更新过了,因此参数和梯度的对应关系出现了不一致。在一些给定的条件下,一定程度的不一致是可以允许的,并且能够证明最终收敛性,因为 SGD 本身就是 noisy 的,并且优化算法只要保证前进的方向和 descent 当方向有足够的 correlation,而不一定非常完全 align 起来。不过这些并不是本文要讨论的主要问题。

总而言之,要保证分布式和单进程计算的 SGD 结果一样,我们需要在每个 mini-batch 的计算中等待所有 worker 算完,把结果 gradient 求一个平均值,再更新参数,然后进行下一个 mini-batch 的计算。但是这样会导致很大的等待延迟,特别是如果其中有一个 worker 计算速度或者网速很慢的话,就会直接拖慢其他所有节点。ASGD 则选择直接无视同步从而获得更好的 scalability,并且在实际中通常收敛得也很好(有时候可能需要使用更小的 step size)。假设我么想要实现 ASGD,大致可以写成这个样子:

  1. function asgd_step(w :: Vector{Float64})
  2. grad = compute_grad(w)
  3. return grad
  4. end
  5. for wid in worker_ids
  6. grad_ref = remotecall(asgd_step, wid, w)
  7. # should we do this here?
  8. w -= step_size * fetch(grad_ref)
  9. end

看起来没有什么问题,但是实际上并不能实现我们想要的效果:问题在于第 9 行的 fetch 会 block 等待结果算完并传回当前进程(也就是主进程),所以这个 for 循环会在第一个迭代就 block 住,直到第一个 worker 算完之后才会继续第二个迭代,并且再次 block,最终结果就是每个 worker 依次执行,并没有任何并行可言。

为了解决这个问题,我们不能够调用 fetch 来 block 住主进程的执行,但是我们逻辑上我们又必须要等到每个 worker 算完之后拿到结果,然后再用那个结果来更新参数。如何将异步程序逻辑串起来这几乎是所有异步编程中都会碰到的问题。有两种比较常见的解决方案,一种是使用回调函数,在 Javascript 里这是非常常见的做法,并且有很方便的 then 函数可以把一个一个的回调函数串起来,例如下面这个没有什么意义的例子:

  1. var p = new Promise(function(resolve, reject) {
  2. // a promise that will be fulfilled after
  3. // sleeping for 1 second
  4. setTimeout( () => { resolve(1); }, 1000);
  5. });
  6. // chain callbacks that will be executed
  7. // upon the fulfillment of the promise above
  8. p.then(function(value) {
  9. // heavy computation of gradient
  10. grad = value + 1;
  11. return grad;
  12. }).then(function(grad) {
  13. // update parameter
  14. console.log('grad = ', grad);
  15. });

这里的做法就是把整个程序的逻辑分成一个一个的 block,分别包装在一些函数里,然后通过把一堆回调函数按顺序串起来的方式来实现异步操作。另一种解决办法是用不同的调度线程,这样我们在调用 fetch 进行 block 等待的时候只是 block 了那个 worker 对应的调度线程,而主线程则可以继续运行,大致可以用下面的伪代码来表示:

  1. # pseudo code for thread-based scheduling
  2. threads = []
  3. for wid in worker_ids
  4. thread = new_thread() do
  5. # now we are blocking the worker-scheduler
  6. # thread, instead of the main thread
  7. grad = remotecall_fetch(asgd_step, wid, w)
  8. w -= step_size * grad
  9. end
  10. push!(threads, thread)
  11. end
  12. # now all the jobs are scheduled, block
  13. # the main thread to wait until all scheduler
  14. # threads finish
  15. wait_all(threads)

由于每个 worker 有自己的 scheduler thread,所以我们可以直接调用 remotecall_fetch 来 block 掉它的 thread,但是由于主线程还在继续执行,所以这里并不会导致其他 worker 也被 block 住。使用一个独立的线程对应一个远端的 worker ——其本身可能是一个独立的进程或者甚至是网络上另一台机器——的好处是在每个线程里可以自由地 block 等待,代码风格基本上就变成了简单的同步执行逻辑,而不需要切成一块一块地用回调函数串联起来。

当然,多线程也有它自己的问题,就是线程之间现在有了同步和 racing 等各种问题,比如上面的伪代码中第 7 行更新 w 的那一步,通常就需要通过加锁来保证不会有多个线程同时写入 w 导致严重的数据不一致情况等等。并且 Julia 自己的多线程支持其实还在比较早期的开发阶段。

幸运的是在 Julia 里有更简单有效的解决方案,比较类似于多线程,但是这里的执行单元是一个一个的 coroutine,或者在早期有时会被称为“纤程 (Fiber)”——以区别于“线程 (Thread)”。coroutine 非常像线程,但是不同的是 coroutine 的 context switch 比较轻量级,并且最重要的是所有 coroutine 全部都在当前的一个线程中执行,每一个时刻只会有一个 coroutine 在运行,因此不会像使用线程那样出现 racing 或者需要加锁的情况。由于本质上只有一个执行单元在执行,因此 coroutine 并不像线程那样能起到加速的作用,其主要功能在于提供方便的调度功能,特别是在需要等待外部资源,比如 IO,或者远程的计算节点的时候能够方便地切换调度。

在 Julia 里有很多方式可以创建 coroutine,我们这里使用 @async 宏来方便地在一个 coroutine 里直接一个代码 block。和之前的远程 RPC 一样的,该代码 block 立即返回,其内部的代码实际在什么时候被执行是未知的,当我们通过 @async 把所有的工作 coroutine 都调度好了之后,可以通过一个 @sync 调用来 block 住当前进程,直到所有 coroutine 都执行完毕为止。下面是一个简单的例子:

@sync begin
  for i = 1:3
    @async begin
      sleep(rand(1:5))
      println(i)
    end
  end
end

在 Julia 中执行这段代码,会按照随机顺序打印出 1, 2, 3 这三个数字。这里我们创建了 3 个 coroutine,每个 coroutine 在执行的时候碰到 sleep 函数就会放弃自己的执行权,导致另一个 coroutine 被调度。在 coroutine 内部可以有很多这样的 blocking point,比如是 IO 读写的等待,或者是调用 fetch 来等待 Future 的结果甚至是显式地调用 yield 来交还执行权等等,效果上这有点类似于自动地在这些 blocking point 把代码逻辑切割成了一个一个的小回调函数来执行。

弄清了这个基本概念之后,我们可以写一个如下的基本的工具函数,来将一个任务分配到所有 worker 上执行,并等待所有 worker 执行完毕,拿到最终结果(其实和系统自带的 pmap 很像),只额外再加了一点点异常处理的代码。这段代码可以在我们提供的样例 project 中的 src/worker.jl 找到。

  1. """
  2. invoke_on_workers(f, workers, args...)
  3. Invoke `f` on each workers in parallel, with `f(worker_ref, args...)`.
  4. This function blocks until all workers finishes, and return a list
  5. of results from each worker.
  6. """
  7. function invoke_on_workers(f :: Function, workers, args...)
  8. rets = Array{Any}(length(workers))
  9. @sync begin
  10. for (i, (pid, worker_ref)) in enumerate(workers)
  11. @async begin
  12. rets[i] = remotecall_fetch(f, pid, worker_ref, args...)
  13. end
  14. end
  15. end
  16. # propagate remote exceptions
  17. for obj in rets
  18. if obj isa RemoteException
  19. throw(obj)
  20. end
  21. end
  22. return rets
  23. end

有了这个 building block,我们就可以把分布式算法的基本框架写出来(这里我们用了 Julia 的 “do block” 来方便地构建匿名函数传入 invoke_on_workers 的第一个参数):

  1. # create workers
  2. workers = spawn_workers(...)
  3. # load data
  4. invoke_on_workers(workers) do w_ref
  5. # the code here are executed on remote
  6. # worker node
  7. worker = fetch(w_ref)
  8. worker.dataset = load_data(worker.data_partition)
  9. return true
  10. end
  11. # accumulate data statistics
  12. stats_all = invoke_on_workers(workers) do w_ref
  13. worker = fetch(w_ref)
  14. stats = Dict(:n_tr => num_samples(worker.dataset),
  15. ...)
  16. return stats
  17. end
  18. n_tr_total = sum(stats[:n_tr] for stats in stats_all)
  19. # prepare for running
  20. hp = HyperParameter(step_size=args["step-size"], ...)
  21. invoke_on_workers(workers, hp) do w_ref, hp
  22. worker = fetch(w_ref)
  23. worker.hp = hp
  24. worker.tr_idx = random_tr_smp_order()
  25. ...
  26. return true
  27. end
  28. # start training
  29. for epoch = 1:args["n-epoch"]
  30. # schedule algorithm on workers
  31. ...
  32. end

这里的 worker 对象是我们自己定义的一个 struct,它并不是一个进程或者工作节点,而是我们定义的用于方便存储所有工作节点本地需要维护的状态和临时变量的一个容器。在实际的代码中它长这个样子:

  1. """
  2. A `Worker` object holds the necessary local states for a learner worker:
  3. - dset_tr: the local partition of the training dataset
  4. - dset_tt: the local partition of the test dataset
  5. - hp: hyper parameters specific to each algorithm (e.g. batch size)
  6. - lp: local parameters specific to each algorithm (e.g. algorithms in the
  7. dual might store dual variables locally)
  8. """
  9. mutable struct Worker{HPType, LPType}
  10. dset_tr :: Dataset # training set
  11. dset_tt :: Dataset # test / validation set
  12. hp :: HPType # hyper parameters
  13. lp :: LPType # worker local parameters
  14. Worker{HPType, LPType}() where {HPType, LPType} = new()
  15. end

可以看到为了要让实际的算法跑起来,除了调度计算之外,还有许多准备工作需要做,比如让每个 worker 把自己的数据加载进来,计算一些统计量,方便做一些全局的 normalization 等等,并且中央 scheduler 还需要把一些 hyper parameter,比如 batch size 之类的告诉每个 worker。准备工作做完之后就可以开始计算了。

理论上我们也可以用 invoke_on_workers 这个辅助函数来调度 ASGD 的计算逻辑。但是 ASGD 这个算法在每个 worker 上其实有一个内层循环(关于 mini-batch 的迭代),其中内层循环的每一次迭代都要同中央进程交换信息——获取最新的 w,并返回计算的 gradient。如果你尝试去写这个算法,你会发现似乎 worker 并不能很方便地主动和中央进程通信。到目前为止的通信模式都是中央进程通过 RPC 调用的方式将信息以函数参数发送到 worker 上,然后 worker 通过函数返回值把应答返回给中央进程。但是 worker 并不能在计算的过程中主动请求中央进程传一下目前最新的 w。

有这样的限制的原因是分布式编程逻辑处理起来比较复杂,如果 worker 和 scheduler 都是独立自主的节点,同时有自己的主线逻辑,以及在监听外部节点发过来的请求的话,整个逻辑线就会变得复杂起来。通常我们会把主要逻辑放在某一方,而让另一方以纯监听事件触发任务的被动方式执行。我们这里选择的是将主线逻辑放在 scheduler 这一方,另一种比较常见的做法是将主线逻辑放在 worker 上,而中央进程此时退化成一个 parameter server,它只实现简单的 pull 和 push 事件(或者说 RPC 接口),当 worker 请求 pull 最新参数的时候它发送 w,当 worker 调用 push 回传 gradient 的时候它通过传回的 gradient 来更新 w,其他时间则处于等待状态。然后每个 worker 自己处理自己的逻辑,类似于这样子:

  1. # on worker
  2. data = load_data(my_data_partition)
  3. for epoch in n_epochs
  4. for batch in n_batches
  5. w = pull(param_server)
  6. grad = compute_grad(w)
  7. push(param_server, grad)
  8. end
  9. end

看起来似乎很简单。这种 parameter server 的模式在每个 worker 能够比较独立自主地工作的时候就会比较方便,反过来如果中央 server 需要做一些调度,就会比较麻烦。比如我们希望每个 worker 把数据加载完之后算一下均值和方差,然后把所有 worker 上的统计量合并一下,最后再统一 normalize 一下数据。这个时候每个 worker 就没法很独立地执行,因为必须等所有其他的 worker 都把数据加载完毕,计算完统计量之后才能进入下一步,而这个 sync point 需要 somehow 通过与 parameter server 的通信来实现,就变得稍微有点麻烦。

总之对于调度逻辑比较多的情况下,似乎把主线逻辑放在中央节点会让程序写起来更自然一点。回到刚才的 ASGD 的问题,虽然 worker 节点只能被动地与中央节点通信,但是这并不是非常大的一个问题。一个比较 naive 的解决办法就是把 ASGD 的内层循环写开来,大概像这个样子:

  1. for epoch = 1:n_epoch
  2. for batch = 1:n_batch
  3. grad_all = invoke_on_workers(worker, w) do w_ref, w
  4. worker = fetch(w_ref)
  5. grad = compute_gradient(w)
  6. return grad
  7. end
  8. # now on central node
  9. w += -step_size * mean(grad_all)
  10. end
  11. end

当然这个其实并不是 ASGD,而是 Synchronized SGD,因为我们等待所有 worker 把 gradient 算出来之后求了 average,然后统一加到 w 上。这是由于我们之前写的 invoke_on_workers 这个辅助函数的逻辑导致的,正确的做法可以通过写开来:

  1. function asgd_step(w_ref :: Future, w :: Vector{Float64})
  2. worker = fetch(w_ref)
  3. ...
  4. return compute_gradient(w)
  5. end
  6. for epoch = 1:n_epoch
  7. for batch = 1:n_batch
  8. @sync begin
  9. for (wid, w_ref) in workers
  10. @async begin
  11. grad = remotecall_fetch(asgd_step, wid, w_ref, w)
  12. w += -step_size * grad
  13. end
  14. end
  15. end
  16. end
  17. end

这个看上去似乎是正确的,因为现在我们让每个 worker 的 coroutine 单独去更新 w 了。不过实际上还是没有达到我们想要的效果,因为我们的 @sync block 被放在了 mini-batch 循环的内部,所以每个 mini-batch 结束 worker 都会停下来等其他所有 worker 执行完,还是没有达到 async 的效果。幸运的是 Julia 的 coroutine 和 RPC 的接口非常 flexible,要实现这里想要的逻辑,我们只要把 mini-batch 和 @sync block 交换一下顺序即可。下面是我们示例代码里 ASGD.jl 中的实际 code:

  1. function run_epoch(workers, params :: Parameters)
  2. @sync begin
  3. for (pid, worker_ref) in workers
  4. @async begin
  5. while true
  6. Δw = remotecall_fetch(compute_delta, pid, worker_ref, params.w)
  7. if Δw === nothing
  8. # this worker finishes its epoch
  9. break
  10. end
  11. params.w .+= Δw
  12. end
  13. end
  14. end
  15. end
  16. end
  17. """
  18. compute_delta(worker_ref, w)
  19. To be run on remote workers. Compute the updates for the weights. We have
  20. applied step sizes locally on each worker.
  21. """
  22. function compute_delta(worker_ref :: Future, w :: Vector{Float64})
  23. worker = try_fetch(worker_ref)
  24. idx1 = (worker.lp.i_batch-1) * worker.hp.batch_size + 1
  25. if idx1 > length(worker.lp.tr_idx)
  26. return nothing # end of epoch
  27. end
  28. idx2 = min(worker.lp.i_batch * worker.hp.batch_size,
  29. length(worker.lp.tr_idx))
  30. data_idx = worker.lp.tr_idx[idx1:idx2]
  31. X = worker.dset_tr.data[:, data_idx]
  32. y = worker.dset_tr.labels[data_idx]
  33. pred = X' * w
  34. ∇w = (X * hinge_gradient(pred, y)) / size(X, 2)
  35. ∇w += worker.hp.λ * w # gradient from the regularizer
  36. Δw = -worker.lp.ηₜ * ∇w
  37. worker.lp.i_batch += 1
  38. return Δw
  39. end

这里的 @sync@async 的组合可能一开始有一点 confusing,概念明白之后就会非常清晰。由于 coroutine 的缘故,第 11 行的参数更新不需要任何加锁之类的操作。每个 coroutine 通过一个 while 循环来连续地调度 worker 进行计算,最外层的 @sync block 保证在一个 epoch 结束之后等待所有 worker 停下来。在 ASGD 中这个同步操作其实也可以去掉,不过如果是更加复杂的优化算法,比如如果有 variance reduction 相关的操作之类的,需要在这个时刻停下来做一些 maintenance 操作等等。

总结一下:我们主要使用 Julia 的 RPC 功能来实现多机并行计算,同时利用 coroutine 的功能来实现算法的具体调度。中央调度节点同时作为 scheduler 和 parameter server 的角色存在。我把完整的示例代码放在 DistLearn.jl 里,除了这里提到的 ASGD 算法之外,还提供了一个 COCOA 的实现,这是一个 dual 算法,在每个节点本地需要维护和 training samples 数目同样多的 dual variable。当然这个代码库并不是试图提供一个完整的 Julia 的分布式学习的算法实现,而主要是作为一个如何在 Julia 里实现自己的分布式系统的示例。