iOS 多线程的使用与总结

2019-04-03

Grand Central Dispatch(GCD) 是用来管理并发操作的底层API。

  1. 允许将一个程序切分为多个单一任务,然后提交到工作队列中并发或者串行地执行
  2. 为多核的并行运算提出了解决方案,自动合理的利用CPU内核(比如双核,四核)
  3. 自动的管理线程的生命周期(创建线程、调度任务、销毁线程),完全不需要我们管理,只需要告诉它任务是什么就行
  4. 配合Block,使得使用起来更加方便灵活

相对于线程和锁来说,GCD 提供了一个更加易用的模板,从而避免发生并发问题。

在了解GCD之前,我们需要了解一些跟并发和线程的概念。

Concurrency 并发

iOS应用中包含一个或者多个线程,这些线程由系统的调度器彼此独立的管理。每个线程可以被并发的执行,但是如何实现并发却是由系统决定的。

Concurrency_vs_Parallelism.png

单核设备通过 时间切片(time-slicing) 的方式来实现并发,它们运行在一个线程上,当切换其它线程时就执行上下文切换。这种切换一般进行得非常快,让用户感觉像是同时在运行多个线程一样。

而在多核设备上,可以通过并行的方式同时执行多个线程。

GCD 建立在线程之上,它负责管理共享线程池。使用 GCD,我们可以添加code block或者工作项(work items)来调度队列,GCD 决定执行它们的线程。

所以,虽然你可以使用 GCD 的接口来编写并发代码,但是 GCD 才是真正决定是否使用并行实现的人。并行要求并发,但是并发并不一定能够保证并行。

更深一层地说,并发设计其实是结构的设计。如果你带着 GCD 的思维去编写代码,你就得小心地设计代码的结构,暴露的接口要考虑可以同时执行和不可以同时执行的代码。

根本上来说,并发(concurrency)是关于结构而并行(parallelism)是关于执行。

Queues 队列

GCD 通过DispatchQueue类来操作调度队列。当我们提交工作项单元到队列中,GCD将以FIFO顺序执行它们,它会保证第一个任务是第一个启动的。

调度队列本身是线程安全的,因此你可以同时从多个线程访问它们。当了解调度队列如何为代码提供线程安全时,你就可以认识到GCD的好处是显而易见的。关键是要选择正确类型的调度队列和正确的调度函数,将你的工作提交到队列。

队列可以是并行(serial)或者串行(concurrent)的。
DispatchQueue的默认初始化方法创建的是一个同步队列,如果要创建并行队列,需要在attributes中声明concurrent。第一个参数表示队列的标签,这个标签在调试器中是可见的。

1
2
3
4
5
// 同步队列
let serialQueue = DispatchQueue(label: "queuename")

// 并发队列
let concurrentQueue = DispatchQueue(label: "queuename", attributes: .concurrent)

串行可以保证在任何给定时间只运行一个任务,由 GCD 来控制执行的时间点,你也无法知道一个任务结束和下一个任务开始之间的时间量。

Serial-Queue-Swift

而并发队列允许多个任务同时运行。队列保证它们以添加的顺序来执行,但是它们会以任何可能的顺序完成。

Concurrent-Queue-Swift

何时开始任务完全取决于 GCD。如果一个任务的执行时间与另一个任务的执行时间重叠,则由 GCD 决定是否应该在不同的核心上运行,如果是单核设备,则执行上下文切换以运行不同的任务。

下面通过代码来看下串行和并发的区别:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
let serialDispatchQueue = DispatchQueue(label: "custom_queue")

serialDispatchQueue.async {
print("serialDispatchQueue async 1 is begin.")
sleep(3)
print("serialDispatchQueue async 1 is end")
}

serialDispatchQueue.async {
print("serialDispatchQueue async 2 is begin.")
sleep(3)
print("serialDispatchQueue async 2 is end")
}

serialDispatchQueue.async {
print("serialDispatchQueue async 3 is begin.")
sleep(3)
print("serialDispatchQueue async 3 is end")
}
print("🤠Welcome to the main thread from serialDispatchQueue")

// output:
// serialDispatchQueue async 1 is begin.
// 🤠Welcome to the main thread from serialDispatchQueue
// serialDispatchQueue async 1 is end
// serialDispatchQueue async 2 is begin.
// serialDispatchQueue async 2 is end
// serialDispatchQueue async 3 is begin.
// serialDispatchQueue async 3 is end

关于并发:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
let concurrentDispatchQueue = DispatchQueue(label: "custom_queue", attributes: .concurrent)

concurrentDispatchQueue.async {
print("concurrentDispatchQueue async 1 is begin.")
sleep(3)
print("concurrentDispatchQueue async 1 is over.")
}

concurrentDispatchQueue.async {
print("concurrentDispatchQueue async 2 is begin.")
sleep(3)
print("concurrentDispatchQueue async 2 is over.")
}

print("🤠Welcome to the main thread from concurrentDispatchQueue")

// output:
// concurrentDispatchQueue async 1 is begin.
// concurrentDispatchQueue async 2 is begin.
// concurrentDispatchQueue async 3 is begin.
// 🤠Welcome to the main thread from concurrentDispatchQueue
// concurrentDispatchQueue async 1 is over.
// concurrentDispatchQueue async 2 is over.
// concurrentDispatchQueue async 3 is over.

Synchronous vs. Asynchronous 同步与异步

使用GCD,我们可以同步或异步分配任务。

任务完成后,同步函数会将控制权返回给调用者。 您可以通过调用DispatchQueue.sync(execute :)来同步调度工作单元。

异步函数立即返回,命令任务开始但不等待它完成。 因此,异步函数不会阻塞当前执行线程继续执行下一个函数。 您可以通过调用DispatchQueue.async(execute :)来异步调度工作单元。

GCD 提供了三种主要类型的队列:

  • Main queue 主队列,在主线程上运行,是一个串行队列
  • Global queues 全局队列,整个系统共享的并发队列。全局队列有四种不同的优先级:high,default,lowbackgroundbackground优先级最低,在任何I/O活动中受到限制,以最大限度地减少对系统地负面影响。
  • Custom queue 自定义队列,由开发创建的队列,可以是串行或者并发的队列。而在这个队列上的请求最终会在全局队列中的一个上执行。

而将任务发送到全局并发队列时,不直接指定优先级,而是指定服务质量(QoS)属性。QoS用来标志任务的重要性,并指导GCD赋予任务优先级。

gcd_using_a_quality_of_service

1
2
3
4
5
6
queue.async(qos: .background) {
print("Maintenance work")
}
queue.async(qos: .userInitiated) {
print(“Button tapped”)
}

下面介绍一下在创建队列时,可以设置的一些更丰富的属性。创建队列的完整方法如下:

1
convenience init(label: String, qos: DispatchQoS = default, attributes: DispatchQueue.Attributes = default, autoreleaseFrequency: DispatchQueue.AutoreleaseFrequency = default, target: DispatchQueue? = default)

label 表示队列的标识符,在debug的时候可以通过label来分辨。

QoS

队列在执行上是有优先级的,更高的优先级可以享受更多的计算资源,从高到低包括:

  • userInteractive 用户交互,表示需要立即完成的任务,用以提供良好的用户体验。将其用于UI更新,事件处理和需要低延迟地小型工作负载。在执行您的应用程序期间,此类中完成的工作总量应该很小。这应该在主线程上运行。
  • userInitiated 用户活动 用户从UI启动这些异步任务。当用户等待立即结果以及继续用户交互所需地任务时使用它们。它们在高优先级全局队列中执行。
  • utility 表示需要长时间运行的任务,伴有用户可见进度指示器。经常会用来做计算,I/O,网络,持续的数据填充等任务。这个任务节能。
  • background 这表示用户不需要直接了解的任务。用于预取,维护和其他不需要用户交互且时间不敏感的任务。这将被映射到后台优先级全局队列。

Attributes

包含两个属性:

  • concurrent 标识队列为并发队列
  • initiallyInactive 标识队列中的任务需要手动触发(为添加该标识时,向队列中添加任务会自动运行),触发时通过queue.activate()方法

AutoreleaseFrequency

这个属性表示 autorelease pool的自动释放频率,autorelease pool 管理着任务对象的内存周期。

包含三个属性:

  • inherit 继承目标队列的该属性
  • workItem 跟随每个任务的执行周期进行自动创建和释放
  • never 不会自动创建 autorelease pool,需要手动管理

一般任务采用 .workItem 就够了,特殊任务如在任务内部大量重复创建对象的操作可选择 .never属性手动创建 autorelease pool。

Target

这个属性设置的是一个队列的目标队列,即实际将该队列的任务放入指定队列中运行。目标队列最终约束了队列优先级等属性。

在程序中手动创建的队列,其实最后都指向系统自带的 主队列 或 全局并发队列。

你也许会问,为什么不直接将任务添加至系统队列中,而是自定义队列,因为这样的好处是可以将任务进行分组管理。如单独阻塞队列中的任务,而不是阻塞系统队列中的全部任务。如果阻塞了目标队列,所有指向它的原队列也将被阻塞。

在 Swift 3 及之后,对目标队列的设置进行了约束,只有两种情况可以显式地设置目标队列(原因参考):

初始化方法中,指定目标队列。
初始化方法中,attributes 设定为 initiallyInactive,然后在队列执行 activate() 之前可以指定目标队列。
在其他地方都不能再改变目标队列。

推迟时间后执行

有时候你并不需要立即将任务加入队列中运行,而是需要等待一段时间后再进入队列中,这时候可以使用 asyncAfter 方法。
可以用静态方法now获得当前时间,然后再通过加上一个DispatchTimeInterval枚举来获得一个需要延迟的时间。

1
2
3
4
let delay = DispatchTime.now() + DispatchTimeInterval.seconds(60)
DispatchQueue.main.asyncAfter(deadline: delay) {
// 延迟执行
}

这里也可以直接加上一个秒数。

1
let three = DispatchTime.now() + 3.0

因为DispatchTime中自定义了+号。

1
public func +(time: DispatchTime, seconds: Double) -> DispatchTime

同步锁

如果一段代码所在的进程中有多个线程在同时运行,那么这些线程就有可能会同时运行这段代码。假如多个线程每次运行结果和单线程运行的结果是一样的,而且其他的变量的值也和预期的是一样的,就是线程安全的。
由于可读写的全局变量及静态变量(在 Objective-C 中还包括属性和实例变量)可以在不同线程修改,所以这两者也通常是引起线程安全问题的所在。

注意,pthread_mutex_t,pthread_rwlock_t 和 OSSpinLock 是值类型,而不是引用类型。这意味着如果你用 = 进行赋值操作,实际上会复制一个副本。这会造成严重的后果,因为这些类型无法复制!如果你不小心复制了它们中的任意一个,这个副本无法使用,如果使用可能会直接崩溃。这些类型的 pthread 函数会假定它们的内存地址与初始化时一样,因此如果将它们移动到其他地方就可能会出问题。OSSpinLock 不会崩溃,但复制操作会生成一个完全独立的锁,这不是你想要的。

如果使用这些类型,就必须注意不要去复制它们,无论是显式的使用 = 操作符还是隐式地操作。
例如,将它们嵌入到结构中或在闭包中捕获它们。

另外,由于锁本质上是可变对象,需要用 var 来声明它们。

其他锁都是是引用类型,它们可以随意传递,并且可以用 let 声明。

在 Swift 中可以使用 Darwin 中传统的锁的方式。
但是 pthread 类型很难在 swift 中使用。它们在被定义为不透明的结构体中包含了一堆存储变量。所以,很不建议在 Swift 中使用这一类型的锁。

gcd_traditional_c_locks

如果想要使用传统类型的锁,我们可以使用Foundation.Lock, 因为不同于传统的基于C语言锁的结构,它是一个类,因此不会产生之前提到的那个问题。

gcd_correct_use_of_traditional_locks

同时我们推荐使用调度队列来进行同步

gcd_use_gcd_for_synchronization

如何使用调度队列来实现同步:

gcd_use_explicit_synchronization

同时,可以使用预设条件来确保代码运行在特定的队列中运行。

gcd_preconditions

DispatchGroup

当有多个任务需要处理时,你可以异步的一个个处理这些任务, 你还可以将它们组合在一起然后等待这个组合工作的结束。

DispatchGroup相当于一系列任务的松散集合,它可以来自相同或者不同队列,扮演者组织者的角色。它可以通知外部队列,组内的任务是否都已完成。或者阻塞当前的线程,直到组内的任务都完成。所有适合组内执行的任务都可以使用任务组,且任务组更适合集合异步任务(如果都是同步任务,直接使用串行队列即可)。

gcd_chaining_vs_grouping_work

通过创建的DispatchGroup对象,当你向调度器提交工作的时候,可以向异步调用增加这个group作为一个可选的参数。你可以向这个group添加更多的工作以及在不同的队列中使用同一个group将它们联系起来。

每次向group提交工作,它都会增加需要完成的项目的数量。最后当你提交了所有的工作,可以让group在所有工作被完成时在指定的队列上通知你。

gcd_grouping_work_together

有两种方式加入任务组:

  • 添加任务时指定任务组
1
2
3
4
5
let group = DispatchGroup()
let queue = DispatchQueue.global()
queue.async(group: group) {
// do something
}
  • 使用 group.enter(), group.leave() 配对方法,标识任务加入任务组。
    1
    2
    3
    4
    5
    6
    let group = DispatchGroup()
    group.enter()
    queue.async() {
    // do something
    group.leave()
    }

两种加入方式在对任务处理的特性上是没有区别的,只是便利之处不同。如果任务所在的队列是自己创建或系统队列,那么直接使用第一种方式直接加入即可。如果任务是由系统或第三方的API创建的,由于无法获取对应的队列,只能使用第二种方式将任务加入组内,例如将 URLSession 的 addDataTask 方法加入任务组中。

1
2
3
4
5
6
7
8
9
10
11
extension URLSession {
func addDataTask(to group: DispatchGroup,
with request: URLRequest,
completionHandler: @escaping (Data?, URLResponse?, Error?) -> Void) -> URLSessionDataTask {
group.enter() // 进入任务组
return dataTask(with: request) { (data, response, error) in
completionHandler(data, response, error)
group.leave() // 离开任务组
}
}
}

我们也可以等待任务组中的任务全部完成后,可以统一对外发送通知,有两种方式:

  • group.notify() 方法,它可以在所有任务完成后通知指定队列并执行一个指定任务,这个通知的操作是异步的(意味着通知后续的代码不需要等待任务,可以继续执行):

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    let group = DispatchGroup()

    let queueBook = DispatchQueue(label: "book")
    queueBook.async(group: group) {
    // do something 1
    }
    let queueVideo = DispatchQueue(label: "video")
    queueVideo.async(group: group) {
    // do something 2
    }

    group.notify(queue: DispatchQueue.main) {
    print("all task done")
    }

    print("do something else.")

    // 执行结果
    // do something else.
    // do something 1(任务 1、2 完成顺序不固定)
    // do something 2
    // all task done
  • group.wait() 方法, 它会在所有任务完成后再执行当前线程中后续的代码,因此这个操作是有阻塞作用。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    let group = DispatchGroup()

    let queueBook = DispatchQueue(label: "book")
    queueBook.async(group: group) {
    // do something 1
    }
    let queueVideo = DispatchQueue(label: "video")
    queueVideo.async(group: group) {
    // do something 2
    }

    group.wait()

    print("do something else.")

    // 执行结果
    // do something 1(任务 1、2 完成顺序不固定)
    // do something 2
    // do something else.

wait 方法中还可以指定具体的时间,它表示将等待不超过这个时间,如果任务组在指定时间之内完成则立即恢复当前线程,否则将等到时间结束时再恢复当前线程。

  • 方式1,使用 DispatchTime,它表示一个时间间隔,精确到纳秒(1/1000,000,000 秒):

    1
    2
    let waitTime = DispatchTime.now() + 2.0 // 表示从当前时间开始后 2 秒,数字字面量也可以改为使用 TimeInterval 类型变量
    group.wait(timeout: waitTime)
  • 方式2,使用 DispatchWallTime,它表示当前的绝对时间戳,精确到微秒(1/1000,000 秒),通常使用字面量即可设置延时时间,也可以使用 timespec 结构体来设置一个精确的时间戳,具体参见附录章节的《时间相关的结构体说明 - DispatchWallTime》:

    1
    2
    // 使用字面量设置
    var wallTime = DispatchWallTime.now() + 2.0 // 表示从当前时间开始后 2 秒,数字字面量也可以改为使用 TimeInterval 类型变量

Semaphore

DispatchSemaphore 是传统计数信号量的封装,用来控制资源被多任务访问的情况。
简单来说就是控制访问资源的数量,比如系统有两个资源可以被利用,同时有三个线程要访问,只能允许两个线程访问,第三个应当等待资源被释放后再访问。例如,控制同一时间写文件的任务数量、控制端口访问数量、控制下载任务数量等。

信号量的使用非常的简单:

  • 首先创建一个初始数量的信号对象
  • 使用 wait 方法让信号量减 1,再安排任务。如果此时信号量仍大于或等于 0,则任务可执行,如果信号量小于 0,则任务需要等待其他地方释放信号。
  • 任务完成后,使用 signal 方法增加一个信号量。
  • 等待信号有两种方式:永久等待、可超时的等待。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
let queue = DispatchQueue(
label: "com.sinkingsoul.DispatchQueueTest.concurrentQueue",
attributes: .concurrent)
let semaphore = DispatchSemaphore(value: 2) // 设置数量为 2 的信号量

semaphore.wait()
queue.async {
print("Task 1 start")
sleep(2)
print("Task 1 finish")
semaphore.signal()
}

semaphore.wait()
queue.async {
print("Task 2 start")
sleep(2)
print("Task 2 finish")
semaphore.signal()
}

semaphore.wait()
queue.async {
print("Task 3 start")
sleep(2)
print("Task 3 finish")
semaphore.signal()
}

// 运行结果:
// Task 1 start
// Task 2 start
// Task 1 finish
// Task 2 finish
// Task 3 start
// Task 3 finish

运行结果中可以看到任务 3 在前两个任务完成后才开始运行。

DispatchWorkItem

DispatchQueue执行操作除了直接传递一个闭包外,还可以传入一个DispatchWorkItem。 DispatchWorkItem 的初始化方法可以配置 QoS 和 DispatchWorkItemFlags, 但是这两个参数都有默认参数,所以也可以只传入一个闭包。

DispatchWorkItemFlags 枚举中 assignCurrentContext 表示 QoS 根据创建时的 context 决定,这个 workItem 会从负责执行该任务的调度队列或者线程继承其QoS设置。

gcd_dispatch_work_item

另外,DispatchWorkItem 也有 wait 方法,调用会等待这个 workItem 执行完。

gcd_waiting_for_work_items

DispatchWorkItemFlags 枚举中还有其他选项,如 barrier, detached, enforceQos, inheritQos, noQos。

.barrier: 如果DispatchWorkItem被提交到.concurrent并发队列,那么这个DispatchWorkItem中的操作会具有独占性(防止此DispatchWorkItem中的block内的操作与其他操作同时执行)。执行该任务时,它会先等待队列中已有的任务全部执行完成,然后它再执行,在它之后加入的任务也必须等栅栏任务执行完成后才能执行。

gcd_dispatch_work_item_flags_barrier

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
let queue = DispatchQueue(label: "BarrierWorkItem", attributes: .concurrent)
queue.async {
sleep(4)
print("async task 1 finish.")
}
let task = DispatchWorkItem(flags: .barrier) {
sleep(2)
print("barrier task finish.")
}
queue.async(execute: task)
queue.async {
sleep(1)
print("async task 2 finish.")
}

//async task 1 finish.
//barrier task finish.
//async task 2 finish.
  • .detached 表明DispatchWorkItem会无视当前执行上下文的参数。
  • .noQos 不指定QoS,由调用线程或队列来指定。
  • .inheritQos 表明DispatchWorkItem会采用队列的QoS class,而不是当前的。
  • .enforceQos 表明DispatchWorkItem会采用当前的QoS class,而不是队列的。

DispatchSource

Dispatch Source是GCD中的一个基本类型,从字面意思可称为调度源,它的作用是当有一些特定的较底层的系统事件发生时,调度源会捕捉到这些事件,然后可以做其他的逻辑处理,调度源有多种类型,分别监听对应类型的系统事件。

可监听的对象的具体类型:

  • Timer Dispatch Source:定时调度源。
  • Signal Dispatch Source:监听UNIX信号调度源,比如监听代表挂起指令的SIGSTOP信号。
  • Descriptor Dispatch Source:监听文件相关操作和Socket相关操作的调度源。
  • Process Dispatch Source:监听进程相关状态的调度源。
  • Mach port Dispatch Source:监听Mach相关事件的调度源。
  • Custom Dispatch Source:监听自定义事件的调度源。

下面以文件监听为例看下使用方法,下面例子中监听了一个指定目录下文件的写入事件,创建监听主要有几个步骤:

  • 通过 makeFileSystemObjectSource 方法创建 source
  • 通过 setEventHandler 设定事件处理程序,setCancelHandler 设定取消监听的处理。
  • 执行 resume() 方法开始接收事件
1
2
3
4
5
6
7
8
9
10
11
let queue = DispatchQueue.global()
let filePath = "..."
let fileURL = URL(fileURLWithPath: filePath)
let fd = open(fileURL.path, O_EVTONLY)

let source = DispatchSource.makeFileSystemObjectSource(fileDescriptor: fd, eventMask: .write, queue: queue)
source.setEventHandler(handler: closure)
source.setCancelHandler {
close(fd)
}
source.resume()

gcd_dispatch_source_cancellation

DispatchSourceTimer 的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func printTime(withComment comment: String){
let date = Date()
let formatter = DateFormatter()

formatter.dateFormat = "yyyy-MM-dd HH:mm:ss"

print(comment + ": " + formatter.string(from: date))
}

let timer = DispatchSource.makeTimerSource()
timer.schedule(deadline: .now() + .seconds(10),
repeating: .seconds(5),
leeway: .seconds(5))
timer.setEventHandler {
printTime(withComment: "hello world")
}
timer.activate()
printTime(withComment: "3")

// 运行结果:
// 3: 2019-04-25 16:43:53
// hello world: 2019-04-25 16:44:03
// hello world: 2019-04-25 16:44:08
// hello world: 2019-04-25 16:44:13
// hello world: 2019-04-25 16:44:18
// hello world: 2019-04-25 16:44:23

表明DispatchWorkItem会采用当前的QoS

DispatchIO 对象提供一个操作文件描述符的通道。简单讲就是你可以利用多线程异步高效的读写文件。

发起读写操作一般步奏如下:

  • 创建 DispatchIO 对象,或者说创建一个通道,并设置结束处理闭包。
  • 调用 read/write 方法。
  • 调用 close 方法关闭通道。
  • 在 clone 方法后系统将自动调用结束处理闭包。

初始化方法

一般使用两种方式初始化: 文件描述符,或者文件路径

文件描述符方式

文件描述符使用 open 方法创建: open(_ path: UnsafePointer<CChar>, _ oflag: Int32, _ mode: mode_t) -> Int32, 第一个参数是 UnsafePointer 类型的路径,oflag 、mode 指文件的操作权限,一个是系统 API 级的,一个是文件系统级的,可选项如下:

oflag:

Flag 备注 功能
O_RDONLY 以只读方式打开文件 此三种读写类型只能有一种
O_WRONLY 以只写方式打开文件 此三种读写类型只能有一种
O_RDWR 以读和写的方式打开文件 此三种读写类型只能有一种
O_CREAT 打开文件,如果文件不存在则创建文件 创建文件时会使用Mode参数与Umask配合设置文件权限
O_EXCL 如果已经置O_CREAT且文件存在,则强制open()失败 可以用来检测多个进程之间创建文件的原子操作
O_TRUNC 将文件的长度截为0 无论打开方式是RD,WR,RDWR,只要打开就会把文件清空
O_APPEND 强制write()从文件尾开始不care当前文件偏移量所处位置,只会在文件末尾开始添加 如果不使用的话,只会在文件偏移量处开始覆盖原有内容写文件

mode:包含 User、Group、Other 三个组对应的权限掩码。

User Group Other 说明
S_IRWXU S_IRWXG S_IRWXO 可读、可写、可执行
S_IRUSR S_IRGRP S_IROTH 可读
S_IWUSR S_IWGR S_IWOTH 可写
S_IXUSR S_IXGRP S_IXOTH 可执行

创建的通道有两种类型:

连续数据流:DispatchIO.StreamType.stream,这个方式是对文件从头到尾完整操作的。
随机片段数据:DispatchIO.StreamType.random,这个方式是在文件的任意一个位置(偏移量)开始操作的。

1
2
3
4
5
6
7
let filePath: NSString = "test.zip"
// 创建一个可读写的文件描述符
let fileDescriptor = open(filePath.utf8String!, (O_RDWR | O_CREAT | O_APPEND), (S_IRWXU | S_IRWXG))
let queue = DispatchQueue(label: "com.sinkingsoul.DispatchQueueTest.serialQueue")
let cleanupHandler: (Int32) -> Void = { errorNumber in
}
let io = DispatchIO(type: .stream, fileDescriptor: fileDescriptor, queue: queue, cleanupHandler: cleanupHandler)

文件路径方式

1
let io = DispatchIO(type: .stream, path: filePath.utf8String!, oflag: (O_RDWR | O_CREAT | O_APPEND), mode: (S_IRWXU | S_IRWXG), queue: queue, cleanupHandler: cleanupHandler)

数据块大小阀值
DispatchIO 支持多线程操作的原因之一就是它将文件拆分为数据块进行并行操作,你可以设置数据块大小的上下限,系统会采取合适的大小,使用这两个方法即可:setLimit(highWater: Int)、setLimit(lowWater: Int),单位是 byte。

1
io.setLimit(highWater: 1024*1024)

数据块如果设置小一点(如 1M),则可以节省 App 的内存,如果内存足够则可以大一点换取更快速度。在进行读写操作时,有一个性能问题需要注意,如果同时读写的话一般分两个通道,且读到一个数据块就立即写到另一个数据块中,那么写通道的数据块上限不要小于读通道的,否则会造成内存大量积压无法及时释放。

读操作
方法示例:

1
2
3
4
5
6
7
8
9
10
11
12
ioRead.read(offset: 0, length: Int.max, queue: ioReadQueue) { doneReading, data, error in
if (error > 0) {
print("读取发生错误了,错误码:\(error)")
return
}
if (data != nil) {
// 使用数据
}
if (doneReading) {
ioRead.close()
}
}

offset 指定读取的偏移量,如果通道是 stream 类型,值不起作用,写为 0 即可,将从文件开头读起;如果是 random 类型,则指相对于创建通道时文件的起始位置的偏移量。

length 指定读取的长度,如果是读取文件全部内容,设置 Int.max 即可,否则设置一个小于文件大小的值(单位是 byte)。

每读取到一个数据块都会调用你设置的处理闭包,系统会提供三个入参给你:结束标志、本次读取到的数据块、错误码:

在所有数据读取完成后,会额外再调用一个闭包,通过结束标志告诉你操作结束了,此时 data 大小是 0,错误码也是 0。
如果读取中间发生了错误,则会停止读取,结束标志会被设置为 true,并返回相应的错误码,错误码表参考稍后的【关闭通道】小节:
写操作
方法示例:

1
2
3
4
5
6
7
8
9
10
ioWrite.write(offset: 0, data: data!, queue: ioWriteQueue) { doneWriting, data, error in
if (error > 0) {
print("写入发生错误了,错误码:\(error)")
return
}
if doneWriting {
//...
ioWrite.close()
}
}

写操作与读操作的唯一区别是:每当写完一个数据块时,回调闭包返回的 data 是剩余的全部数据。同时注意如果是 stream 类型,将接着文件的末尾写数据。

关闭通道
当读写正常完成,或者你需要中途结束操作时,需要调用 close 方法,这个方法带一个 DispatchIO.CloseFlags 类型参数,如果不指定将默认值为 DispatchIO.CloseFlags.stop

这个方法传入 stop 标志时将会停止所有未完成的读写操作,影响范围是所有 I/O channel,其他 DispatchIO 对象进行中的读写操作将会收到一个 ECANCELED 错误码,rawValue 值是 89,这个错误码是 POSIXError 结构的一个属性,而 POSIXError 又是 NSError 中预定义的一个错误域。

因此如果要在不同 DispatchIO 对象中并行读取操作互不影响, close 方法标志可以设置一个空值:DispatchIO.CloseFlags()。如果设置了 stop 标志,则要做好不同 IO 之间的隔离,通过任务组的enter、leave、wait 方法可以做到较好的隔离。

1
2
ioWrite.close() // 停止标志
ioWrite.close(flags: DispatchIO.CloseFlags()) // 空标志