Java IO 1

InputStream源码(2)

类实现关系

img

抽象类解读

b283d7953048cced732f0b45b346fa23

1
2
// JDK9新增:读取 InputStream 中的全部字节并写入到指定的 OutputStream 中
public long transferTo(OutputStream out)

源码实现

1.inputStream

梳理部分InputStream及其实现类的源码分析。

源码解读-jdk8

https://gitee.com/laomaodu/myinput-stream/blob/master/src/main/java/MyInputStream.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Objects;

public abstract class MyInputStream implements Closeable {
// MAX_SKIP_BUFFER_SIZE 用于确定跳过时使用的最大缓冲区大小
private static final int MAX_SKIP_BUFFER_SIZE = 2048;
// 默认的buffer size
private static final int DEFAULT_BUFFER_SIZE = 8192;
// JDK11中增加了一个nullInputStream,即空模式实现,以便可以直接调用而不用判空(可以看如下的补充说明)
/**
* 从输入流中读取下一个字节的数据。值作为范围为 0 到 255 的 int 返回。
* 如果由于到达流的末尾而没有可用的字节,则返回 -1。
* 此方法将阻塞,直到有输入数据可用、检测到流的末尾或引发异常。
*
* <p> 子类必须提供此方法的实现。
*
* @return 返回下一个字节数据,如果到达流末尾则返回 -1。
* @exception IOException 如果发生 I/O 错误。
* 读取下一个字符数据,没有返回-1
*/
public abstract int read() throws IOException;
/**
* 从输入流中读取若干字节并将其存储到缓冲区数组 b 中。
* 返回读取的字节数。如果 b 的长度为 0,则不读取任何字节并返回 0。
* 如果没有可用的字节,因为流到达文件的末尾,则返回 -1。
* 否则,将至少读取一个字节并存储到 b 中。
*将读取到的数据放在 byte 数组中,该方法实际上调用read(byte b[], int off, int len)方法
* @param b 将数据读取到其中的缓冲区。
* @return 返回读取到缓冲区的总字节数,如果到达流末尾则返回 -1。
* @exception IOException 如果由于任何原因第一个字节无法读取,流已关闭,或发生其他 I/O 错误。
* @exception NullPointerException 如果 b 为 null。
*/
public int read(byte b[]) throws IOException {
return read(b, 0, b.length);
}
/**
* 从输入流中读取最多 len 个字节的数据到字节数组中。
* 此方法将阻塞,直到有输入数据可用、检测到文件结束或引发异常。
*
* @param b 将数据读取到其中的缓冲区。
* @param off 在数组 b 中开始写入数据的偏移量。
* @param len 要读取的最大字节数。
* @return 返回读取到缓冲区的总字节数,如果到达流末尾则返回 -1。
* @exception IOException 如果由于任何原因第一个字节无法读取,流已关闭,或发生其他 I/O 错误。
* @exception NullPointerException 如果 b 为 null。
* @exception IndexOutOfBoundsException 如果 off 或 len 为负数,或 len 大于 b.length - off。
*/
public int read(byte b[], int off, int len) throws IOException {
// 检查边界
//Objects.checkFromIndexSize(off, len, b.length);
//读取最大字节数为0返回
if (len == 0) {
return 0;
}

// 读取下一个字节
int c = read();//
/*
这里的设计可以提前判断流状态,提前返回,提高效率
*/
if (c == -1) { // 读到stream末尾,则返回读取的字节数量为-1
return -1;
}
///off 作为偏移量参数,指定了数据在数组 b 中的起始存储位置
b[off] = (byte)c;// 将读取到的字节转换为byte类型,存入指定的数组位置b[off]

// i用来记录取了多少个字节
int i = 1;
try {
// 循环读取
for (; i < len ; i++) {
c = read();
if (c == -1) {// 读到stream末尾,则break
break;
}
b[off + i] = (byte)c;
}
} catch (IOException ee) {
}
// 返回读取到的字节个数
return i;
}
/**
* 跳过并丢弃此输入流中的 n 个字节。
* 该方法可能跳过更少的字节(甚至为 0),负数返回 0。返回实际跳过的字节数。
*
* @param n 要跳过的字节数。
* @return 实际跳过的字节数。
* @exception IOException 如果流不支持跳过操作,或发生其他 I/O 错误。
*/
public long skip(long n) throws IOException {
// 剩余要跳过的字节数
long remaining = n;

int nr;
// 如果请求跳过的字节数小于等于0,则直接返回0
if (n <= 0) {
return 0;
}
// 设置缓冲区大小,取最小值
// 使得最大不超过MAX_SKIP_BUFFER_SIZE,避免过大消耗内存
int size = (int)Math.min(MAX_SKIP_BUFFER_SIZE, remaining);
// 用于跳过字节的缓冲区
byte[] skipBuffer = new byte[size];
// 循环读取,直到跳过指定字节数或到达流末尾
while (remaining > 0) {
// 从流中读取最多size或remaining字节,读入skipBuffer,偏移量从0开始
nr = read(skipBuffer, 0, (int)Math.min(size, remaining));
// 如果读取结果为负,表示流已结束,跳出循环
if (nr < 0) {
break;
}
// 更新剩余字节数
remaining = remaining - nr;
}

// 返回实际跳过的字节数(即总字节数n减去剩余字节数remaining)
/*
假设 n = 15,MAX_SKIP_BUFFER_SIZE = 8,并且输入流中共有 10 个字节可供读取。

第一次循环:remaining = 15,缓冲区大小 size = 8。调用 read(skipBuffer, 0, 8),假设读取 8 个字节,remaining 变为 15 - 8 = 7。
第二次循环:remaining = 7,缓冲区大小 size = 7。调用 read(skipBuffer, 0, 7),假设读取了 2 个字节(流中只剩 2 个字节),remaining 变为 7 - 2 = 5。
第三次循环:remaining = 5,调用 read(skipBuffer, 0, 5),此时返回 -1(流末尾),循环终止。

也就是read读到了文件末尾了,你remaining都没读到n的大小-就有了实际跳过数
*/
return n - remaining;
}
/**
* 返回在不阻塞的情况下可以从此输入流中读取的字节数的估计值。
*
* @return 可读取(或跳过)字节数的估计值,当到达输入流末尾时为 0。
* @exception IOException 如果发生 I/O 错误。
*/
public int available() throws IOException {
return 0;
}
/**
* 关闭此输入流并释放与流关联的任何系统资源。
*
* @exception IOException 如果发生 I/O 错误。
*/
public void close() throws IOException {}
/**
* 标记输入流中的当前位置。后续调用 reset 方法将重新定位此流。
* mark 方法不进行任何操作。
*
* @param readlimit 在标记位置失效之前可以读取的最大字节数。
*/
public synchronized void mark(int readlimit) {}

/**
* 将流重新定位到最后一次调用 mark 方法时的位置。
* 此方法对 InputStream 类无效,仅抛出 IOException。
* 标记读取位置,下次还可以从这里开始读取,使用前要看当前流是否支持,可以使用 markSupport() 方法判断
* @exception IOException 如果未标记或标记已失效。
*/

// 重置读取位置为上次 mark 标记的位置
public synchronized void reset() throws IOException {
throw new IOException("mark/reset not supported");
}
// 判断当前流是否支持标记流,和上面两个方法配套使用。默认是false,由子类方法重写
/**
* 测试此输入流是否支持 mark 和 reset 方法。
*
* @return 如果此流实例支持 mark 和 reset 方法则返回 true;否则返回 false。
*/
public boolean markSupported() {
return false;
}




}

2.jdk9更新

1.具体更新

InputStream

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// JDK9新增:读取 InputStream 中的所有剩余字节,调用readNBytes(Integer.MAX_VALUE)方法
public byte[] readAllBytes()
// JDK11更新:读取 InputStream 中的剩余字节的指定上限大小的字节内容;此方法会一直阻塞,直到读取了请求的字节数、检测到流结束或引发异常为止。此方法不会关闭输入流。
public byte[] readNBytes(int len)

// JDK9新增:从输入流读取请求的字节数并保存在byte数组中; 此方法会一直阻塞,直到读取了请求的字节数、检测到流结束或引发异常为止。此方法不会关闭输入流。
public int readNBytes(byte[] b, int off, int len)

// JDK9新增:读取 InputStream 中的全部字节并写入到指定的 OutputStream 中
public long transferTo(OutputStream out)


/**
* A constant holding the maximum value an {@code int} can
* have, 2<sup>31</sup>-1.
*/
@Native public static final int MAX_VALUE = 0x7fffffff;

2.空模式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
// JDK11中增加了一个nullInputStream,即空模式实现,以便可以直接调用而不用判空(可以看如下的补充说明)
public static InputStream nullInputStream() {
return new InputStream() {
private volatile boolean closed;

private void ensureOpen() throws IOException {
if (closed) {
throw new IOException("Stream closed");
}
}

@Override
public int available () throws IOException {
ensureOpen();
return 0;
}

@Override
public int read() throws IOException {
ensureOpen();
return -1;
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
Objects.checkFromIndexSize(off, len, b.length);
if (len == 0) {
return 0;
}
ensureOpen();
return -1;
}

@Override
public byte[] readAllBytes() throws IOException {
ensureOpen();
return new byte[0];
}

@Override
public int readNBytes(byte[] b, int off, int len)
throws IOException {
Objects.checkFromIndexSize(off, len, b.length);
ensureOpen();
return 0;
}

@Override
public byte[] readNBytes(int len) throws IOException {
if (len < 0) {
throw new IllegalArgumentException("len < 0");
}
ensureOpen();
return new byte[0];
}

@Override
public long skip(long n) throws IOException {
ensureOpen();
return 0L;
}

@Override
public long transferTo(OutputStream out) throws IOException {
Objects.requireNonNull(out);
ensureOpen();
return 0L;
}

@Override
public void close() throws IOException {
closed = true;
}
};
}

3.读取所有字节
1
2
3
4
// 分配的最大数组大小。
// 由于一些VM在数组中保留一些头字,所以尝试分配较大的阵列可能会导致OutOfMemoryError(请求的阵列大小超过VM限制)
private static final int MAX_BUFFER_SIZE = Integer.MAX_VALUE - 8;

1
2
3
4
5
// JDK9新增:读取 InputStream 中的所有剩余字节,调用readNBytes(Integer.MAX_VALUE)方法
public byte[] readAllBytes() throws IOException {
return readNBytes(Integer.MAX_VALUE);
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// JDK11更新:读取 InputStream 中的剩余字节的指定上限大小的字节内容;此方法会一直阻塞,直到读取了请求的字节数、检测到流结束或引发异常为止。此方法不会关闭输入流。
public byte[] readNBytes(int len) throws IOException {
// 边界检查
if (len < 0) {
throw new IllegalArgumentException("len < 0");
}

List<byte[]> bufs = null; // 缓存每次读取到的内容放到bufs,最后组装成result
byte[] result = null; // 最后读取到的内容
int total = 0;
int remaining = len; // 剩余字节长度
int n;
do {
byte[] buf = new byte[Math.min(remaining, DEFAULT_BUFFER_SIZE)];
int nread = 0;

// 读取到结束为止,读取大小n可能大于或小于缓冲区大小
while ((n = read(buf, nread,
Math.min(buf.length - nread, remaining))) > 0) {
nread += n;
remaining -= n;
}

if (nread > 0) {
if (MAX_BUFFER_SIZE - total < nread) {
throw new OutOfMemoryError("Required array size too large");
}
total += nread;
if (result == null) {
result = buf;
} else {
if (bufs == null) {
bufs = new ArrayList<>();
bufs.add(result);
}
bufs.add(buf);
}
}
// 如果读不到内容(返回-1)或者没有剩余的字节,则跳出循环
} while (n >= 0 && remaining > 0);

if (bufs == null) {
if (result == null) {
return new byte[0];
}
return result.length == total ?
result : Arrays.copyOf(result, total);
}

// 组装最后的result
result = new byte[total];
int offset = 0;
remaining = total;
for (byte[] b : bufs) {
int count = Math.min(b.length, remaining);
System.arraycopy(b, 0, result, offset, count);
offset += count;
remaining -= count;
}

return result;
}

3.阻塞读取字节保存数组
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// JDK9新增:从输入流读取请求的字节数并保存在byte数组中; 此方法会一直阻塞,直到读取了请求的字节数、检测到流结束或引发异常为止。此方法不会关闭输入流。
public int readNBytes(byte[] b, int off, int len) throws IOException {
Objects.checkFromIndexSize(off, len, b.length);

int n = 0;
while (n < len) {
int count = read(b, off + n, len - n);
if (count < 0)
break;
n += count;
}
return n;
}

4.inputstream到outputstream
1
2
3
4
5
6
7
8
9
10
11
12
public long transferTo(OutputStream out) throws IOException {
Objects.requireNonNull(out, "out");
long transferred = 0;
byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
int read;
while ((read = this.read(buffer, 0, DEFAULT_BUFFER_SIZE)) >= 0) {
out.write(buffer, 0, read);
transferred += read;
}
return transferred;
}

5.总结

image-20241026110518661

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class TestInputStream {
private InputStream inputStream;
private static final String CONTENT = "Hello World";
@Before
public void setUp() throws Exception {
this.inputStream =
TestInputStream.class.getResourceAsStream("/input.txt");
}
@Test
public void testReadAllBytes() throws Exception {
final String content = new String(this.inputStream.readAllBytes());
//assertEquals 方法验证读取的内容是否与 CONTENT 相等。
assertEquals(CONTENT, content);
}
@Test
public void testReadNBytes() throws Exception {
final byte[] data = new byte[5];
this.inputStream.readNBytes(data, 0, 5);
assertEquals("Hello", new String(data));
}
@Test
public void testTransferTo() throws Exception {
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
this.inputStream.transferTo(outputStream);
assertEquals(CONTENT, outputStream.toString());
}
}

通过三种方法验证 InputStream 的读取功能,确保能够正确读取数据并处理流操作。每个测试方法都使用 assertEquals 进行断言,确保期望值与实际值一致。

6.read readnbytes探究

在java中,InputStream类有方法read(byte[], int, int)readNBytes(byte[], int, int)。看起来这两个方法具有完全相同的功能,所以我想知道它们之间有什么区别。

  • read()表示它尝试读取*“最多len字节…但可能读取较少的数字。此方法会阻塞,直到输入数据可用、检测到文件末尾或引发异常。”*
  • readNBytes()“阻塞直到len读取输入数据的字节,检测到流的末尾,或者抛出异常。”

尽管JDK 的实现可能会为InputStream这两种方法提供相同的结果,但记录的差异意味着从它继承的其他类可能会表现不同。

例如,给定流'12345<end>'read(s,0,10)允许返回'123',而readNbytes()更有可能继续寻找流的末尾并提供整个内容。

举个例子:如果文本内容是12345<end>, read(s,0,10)是允许返回123的, 而readNbytes(s,0,10)会一直(while循环)查找直到stream尾为止,并返回12345.


7.空对空模式

例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class MyParser implements Parser {
// 创建一个空对象,代表“无操作”
private static Action NO_ACTION = new Action() {
public void doSomething() { /* do nothing */ }
};

// 根据用户输入查找动作
public Action findAction(String userInput) {
// ...
if ( /* we can't find any actions */ ) {
return NO_ACTION; // 如果找不到任何动作,返回空对象
}
}
}

然后便可以始终可以这么调用,而不用再判断空了

1
ParserFactory.getParser().findAction(someInput).doSomething();

在这里,调用者不需要检查返回的 Action 是否为 null。无论如何,调用 doSomething 方法都不会引发 NullPointerException。如果找到了有效的动作,则执行相应的操作;如果没有找到动作,则 doSomething 什么都不做。

3.输入输出缓冲区

如同管道一样

在 Java 的输入输出流实现中,缓冲区用于在读取和写入数据时提供临时存储,以提高数据传输的效率。skip 方法的设计通过缓冲区实现对输入流字节的跳过,但并不会将这些字节存储在主程序的实际变量中。

输入缓冲区:这是指从外部数据源(如文件或网络)读取数据时使用的临时存储区域。读取数据时,输入流会将数据批量加载到缓冲区中,而不是逐字节加载,优化了读取速度。

输出缓冲区:在输出数据(如写入文件或网络)时,数据首先会被存储到缓冲区,然后批量写入目标输出。

skip 方法中,skipBuffer 充当了一个 临时输入缓冲区,用于存放读取到的字节。这些字节不是直接存储到主程序的内存,而是立即舍弃,从而实现“跳过”的效果。

为什么需要缓冲区

提高效率skip 方法不能直接跳过任意数量的字节,尤其是流类型不支持随意定位的情况下。比如在网络流中,不支持直接定位到某个字节位置,因此必须通过逐个读取字节的方式跳过。

批量读取减少 I/O 操作skipBuffer 的存在使得方法可以批量读取指定字节数,而不是逐字节读取。相比逐字节读取,这种方式减少了 I/O 操作的次数,提升了性能。

4.FilterInputStream

FilterInputStream 源码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class FilterInputStream extends InputStream {

// 被装饰的inputStream
protected volatile InputStream in;

// 构造函数,注入被装饰的inputStream
protected FilterInputStream(InputStream in) {
this.in = in;
}

// 本质是调用被装饰的inputStream的方法
public int read() throws IOException {
return in.read();
}
public int read(byte b[]) throws IOException {
return read(b, 0, b.length);
}
public int read(byte b[], int off, int len) throws IOException {
return in.read(b, off, len);
}
public long skip(long n) throws IOException {
return in.skip(n);
}
public int available() throws IOException {
return in.available();
}
public void close() throws IOException {
in.close();
}
public synchronized void mark(int readlimit) {
in.mark(readlimit);
}
public synchronized void reset() throws IOException {
in.reset();
}
public boolean markSupported() {
return in.markSupported();
}
}

5.ByteArrayInputStream

ByteArrayInputStream源码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
public class ByteArrayInputStream extends InputStream {

// 内部保存的byte 数组
protected byte buf[];

// 读取下一个字节的数组下标,byte[pos]就是read获取的下个字节
protected int pos;

// mark的数组下标位置
protected int mark = 0;

// 保存的有效byte的个数
protected int count;

// 构造方法
public ByteArrayInputStream(byte buf[]) {
this.buf = buf;
this.pos = 0;
this.count = buf.length;
}

// 构造方法,带offset的
public ByteArrayInputStream(byte buf[], int offset, int length) {
this.buf = buf;
this.pos = offset;
this.count = Math.min(offset + length, buf.length);
this.mark = offset;
}

// 从流中读取下一个字节,没有读取到返回 -1
public synchronized int read() {
return (pos < count) ? (buf[pos++] & 0xff) : -1;
}

// 从第 off 位置读取<b>最多(实际可能小于)</b> len 长度字节的数据放到 byte 数组中,流是以 -1 来判断是否读取结束的
public synchronized int read(byte b[], int off, int len) {
// 边界检查
if (b == null) {
throw new NullPointerException();
} else if (off < 0 || len < 0 || len > b.length - off) {
throw new IndexOutOfBoundsException();
}

if (pos >= count) {
return -1;
}

int avail = count - pos;
if (len > avail) {
len = avail;
}
if (len <= 0) {
return 0;
}

// 从buf拷贝到byte 数组b中
System.arraycopy(buf, pos, b, off, len);
pos += len;
return len;
}

// 跳过指定个数的字节不读取
public synchronized long skip(long n) {
long k = count - pos;
if (n < k) {
k = n < 0 ? 0 : n;
}

pos += k;
return k;
}

// 还有稍稍byte在buffer中未读取,即总的count 减去 当前byte位置
public synchronized int available() {
return count - pos;
}

// 支持mark所以返回true
public boolean markSupported() {
return true;
}

// 在流中当前位置mark, readAheadLimit参数未使用
public void mark(int readAheadLimit) {
mark = pos;
}

// 重置流,即回到mark的位置
public synchronized void reset() {
pos = mark;
}

// 关闭ByteArrayInputStream不会产生任何动作
public void close() throws IOException {

}
}

6.BufferedInputStream

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
public class BufferedInputStream extends FilterInputStream {

// 默认的buffer大小
private static int DEFAULT_BUFFER_SIZE = 8192;

// 分配的最大数组大小。
// 由于一些VM在数组中保留一些头字,所以尝试分配较大的阵列可能会导致OutOfMemoryError(请求的阵列大小超过VM限制)
private static int MAX_BUFFER_SIZE = Integer.MAX_VALUE - 8;

// 内部保存在byte 数组中
protected volatile byte buf[];

// 关闭流的方法可能是异步的,所以使用原子AtomicReferenceFieldUpdater提供CAS无锁方式(可以解决CAS的ABA问题)来保证
private static final AtomicReferenceFieldUpdater<BufferedInputStream, byte[]> bufUpdater =
AtomicReferenceFieldUpdater.newUpdater(BufferedInputStream.class, byte[].class, "buf");

// 有效byte的大小
protected int count;

// 当前位置
protected int pos;

// 最后一次,调用mark方法,标记的位置
protected int markpos = -1;

/**
* 该变量惟一入口就是mark(int readLimit),好比调用方法mark(1024),那么后面读取的数据若是
* 超过了1024字节,那么这次mark就为无效标记,子类能够选择抛弃该mark标记,从头开始。不过具体实现
* 跟具体的子类有关,在BufferedInputStream中,会抛弃mark标记,从新将markpos赋值为-1
*/
protected int marklimit;

// 获取被装饰的stream
private InputStream getInIfOpen() throws IOException {
InputStream input = in;
if (input == null)
throw new IOException("Stream closed");
return input;
}

// 获取实际内部的buffer数组
private byte[] getBufIfOpen() throws IOException {
byte[] buffer = buf;
if (buffer == null)
throw new IOException("Stream closed");
return buffer;
}

// 构造函数,buffer是8kb
public BufferedInputStream(InputStream in) {
this(in, DEFAULT_BUFFER_SIZE);
}

// 构造函数,指定buffer大小
public BufferedInputStream(InputStream in, int size) {
super(in);
if (size <= 0) {
throw new IllegalArgumentException("Buffer size <= 0");
}
buf = new byte[size];
}

/**
* 用更多的数据填充缓冲区,考虑到shuffling和其他处理标记的技巧,
* 假设它是由同步方法调用的。该方法还假设所有数据已经被读入,因此pos >count。
*/
private void fill() throws IOException {
// 得到内部缓冲区buffer
byte[] buffer = getBufIfOpen();
// 没有mark的情况下, pos为0
if (markpos < 0)
pos = 0; /* no mark: throw away the buffer */
// pos >= buffer.length buffer已经被读取完了
else if (pos >= buffer.length) /* no room left in buffer */
// markpos > 0 有标记,标记处在缓存中间
if (markpos > 0) { /* can throw away early part of the buffer */
// 把buffer中,markpos到pos的部分移动到0-sz处,pos设置为sz,markpos为0
int sz = pos - markpos;
System.arraycopy(buffer, markpos, buffer, 0, sz);
pos = sz;
markpos = 0;
// markpos已经为0了,marklimit比buffer.length小,再读取buffer已经没有地方了
} else if (buffer.length >= marklimit) {
// 清空缓存,清空标记,markpos为-1,pos为0
markpos = -1; /* buffer got too big, invalidate mark */
pos = 0; /* drop buffer contents */
// markpos已经为0了,marklimit比buffer.length大,而buffer.length已经最大了,不能扩容
} else if (buffer.length >= MAX_BUFFER_SIZE) {
throw new OutOfMemoryError("Required array size too large");
// markpos已经为0了,marklimit比buffer.length大
} else { /* grow buffer */
// 建立一个长度为min(2*pos,marklimit,MAX_BUFFER_SIZE),的缓存数组,然后把原来0-pos移动到新数组的0-pos处
int nsz = (pos <= MAX_BUFFER_SIZE - pos) ?
pos * 2 : MAX_BUFFER_SIZE;
if (nsz > marklimit)
nsz = marklimit;
byte nbuf[] = new byte[nsz];
System.arraycopy(buffer, 0, nbuf, 0, pos);
// 用bufUpdater替换buffer
if (!bufUpdater.compareAndSet(this, buffer, nbuf)) {
// Can't replace buf if there was an async close.
// Note: This would need to be changed if fill()
// is ever made accessible to multiple threads.
// But for now, the only way CAS can fail is via close.
// assert buf == null;
throw new IOException("Stream closed");
}
buffer = nbuf;
}
// 当前读取上限count为pos
count = pos;
// 从内部的输入流,读取pos到buffer.length部分,读取的字节数加到count
int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
if (n > 0)
count = n + pos;
}

// 读取byte
public synchronized int read() throws IOException {
// 说明当前buf[]数组大小不够了,须要fill()
if (pos >= count) {
fill();
// 说明没有读取到任何数据
if (pos >= count)
return -1;
}
return getBufIfOpen()[pos++] & 0xff;
}

/**
* Read characters into a portion of an array, reading from the underlying
* stream at most once if necessary.
*/
private int read1(byte[] b, int off, int len) throws IOException {
int avail = count - pos;
if (avail <= 0) {
// 当写入指定数组b的长度大小超过BufferedInputStream中核心缓存数组buf[]的大小而且 markpos < 0,那么就直接从数据流中读取数据给b数组,而不经过buf[]缓存数组,避免buf[]数组急剧增大
if (len >= getBufIfOpen().length && markpos < 0) {
return getInIfOpen().read(b, off, len);
}
fill();
avail = count - pos;
if (avail <= 0) return -1;
}
int cnt = (avail < len) ? avail : len;
System.arraycopy(getBufIfOpen(), pos, b, off, cnt);
pos += cnt;
return cnt;
}

// 读取到byte数组b中
public synchronized int read(byte b[], int off, int len)
throws IOException
{
getBufIfOpen(); // Check for closed stream
if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return 0;
}

int n = 0;
for (;;) {
int nread = read1(b, off + n, len - n);
if (nread <= 0)
return (n == 0) ? nread : n;
n += nread;
if (n >= len)
return n;
// if not closed but no bytes available, return
InputStream input = in;
if (input != null && input.available() <= 0)
return n;
}
}

// 跳过n个
public synchronized long skip(long n) throws IOException {
getBufIfOpen(); // Check for closed stream
if (n <= 0) {
return 0;
}
long avail = count - pos;

if (avail <= 0) {
// If no mark position set then don't keep in buffer
if (markpos <0)
return getInIfOpen().skip(n);

// Fill in buffer to save bytes for reset
fill();
avail = count - pos;
if (avail <= 0)
return 0;
}

long skipped = (avail < n) ? avail : n;
pos += skipped;
return skipped;
}

// buf[]数组剩余字节数+输入流中剩余字节数
public synchronized int available() throws IOException {
int n = count - pos;
int avail = getInIfOpen().available();
return n > (Integer.MAX_VALUE - avail)
? Integer.MAX_VALUE
: n + avail;
}


// 标记位置,marklimit只有在这里才可以被赋值,readlimit表示mark()方法执行后,最多可以从流中读取的数据
// 若是超过该字节大小,那么在fill()的时候,就会认为此mark()标记无效,从新将 markpos = -1,pos = 0
public synchronized void mark(int readlimit) {
marklimit = readlimit;
markpos = pos;
}

// 重置位置
public synchronized void reset() throws IOException {
getBufIfOpen(); // 如果已经close, 则直接报错
if (markpos < 0)
throw new IOException("Resetting to invalid mark");
pos = markpos;
}

// 支持mark, 所以返回true
public boolean markSupported() {
return true;
}

// 通过AtomicReferenceFieldUpdater的CAS无锁方式close
public void close() throws IOException {
byte[] buffer;
while ( (buffer = buf) != null) {
if (bufUpdater.compareAndSet(this, buffer, null)) {
InputStream input = in;
in = null;
if (input != null)
input.close();
return;
}
// Else retry in case a new buf was CASed in fill()
}
}
}

7.ps

5 6源码没细看,先去过java基础,后续再回来研究喜喜

OutputStream(3)

OutputStream是输出字节流,具体的实现类层次结构如下:

img

抽象类

OutputStream 类重要方法设计如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 写入一个字节,可以看到这里的参数是一个 int 类型,对应上面的读方法,int 类型的 32 位,只有低 8 位才写入,高 24 位将舍弃。
public abstract void write(int b)

// 将数组中的所有字节写入,实际调用的是write(byte b[], int off, int len)方法。
public void write(byte b[])

// 将 byte 数组从 off 位置开始,len 长度的字节写入
public void write(byte b[], int off, int len)

// 强制刷新,将缓冲中的数据写入; 默认是空实现,供子类覆盖
public void flush()

// 关闭输出流,流被关闭后就不能再输出数据了; 默认是空实现,供子类覆盖
public void close()

源码实现

梳理部分OutputStream及其实现类的源码分析。

OutputStream抽象类源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public abstract class OutputStream implements Closeable, Flushable {

// JDK11中增加了一个nullOutputStream,即空模式实现,以便可以直接调用而不用判空(可以看如下的补充说明)
public static OutputStream nullOutputStream() {
return new OutputStream() {
private volatile boolean closed;

private void ensureOpen() throws IOException {
if (closed) {
throw new IOException("Stream closed");
}
}

@Override
public void write(int b) throws IOException {
ensureOpen();
}

@Override
public void write(byte b[], int off, int len) throws IOException {
Objects.checkFromIndexSize(off, len, b.length);
ensureOpen();
}

@Override
public void close() {
closed = true;
}
};
}

// 写入一个字节,可以看到这里的参数是一个 int 类型,对应上面的读方法,int 类型的 32 位,只有低 8 位才写入,高 24 位将舍弃。
public abstract void write(int b) throws IOException;

// 将数组中的所有字节写入,实际调用的是write(byte b[], int off, int len)方法
public void write(byte b[]) throws IOException {
write(b, 0, b.length);
}

// 将 byte 数组从 off 位置开始,len 长度的字节写入
public void write(byte b[], int off, int len) throws IOException {
// 检查边界合理性
Objects.checkFromIndexSize(off, len, b.length);
// len == 0 的情况已经在如下的for循环中隐式处理了
for (int i = 0 ; i < len ; i++) {
write(b[off + i]);
}
}

// 强制刷新,将缓冲中的数据写入; 默认是空实现,供子类覆盖
public void flush() throws IOException {
}

// 关闭输出流,流被关闭后就不能再输出数据了; 默认是空实现,供子类覆盖
public void close() throws IOException {
}

}
}

ps:不读了,嘻嘻。就这丫


Java IO 1
http://example.com/2024/10/26/java/javaIo/javaio2/
作者
John Doe
发布于
2024年10月26日
许可协议