[med-svn] [python-bz2file] 01/02: Imported Upstream version 0.98

Michael Crusoe misterc-guest at moszumanska.debian.org
Thu May 28 02:48:30 UTC 2015


This is an automated email from the git hooks/post-receive script.

misterc-guest pushed a commit to branch master
in repository python-bz2file.

commit f8610188fc2c01b03192544bd5740b5c5e5ab5b4
Author: Michael R. Crusoe <mcrusoe at msu.edu>
Date:   Wed May 27 21:43:01 2015 -0400

    Imported Upstream version 0.98
---
 PKG-INFO        |  85 +++++++
 README.rst      |  59 +++++
 bz2file.py      | 499 ++++++++++++++++++++++++++++++++++++++
 setup.py        |  34 +++
 test_bz2file.py | 732 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 5 files changed, 1409 insertions(+)

diff --git a/PKG-INFO b/PKG-INFO
new file mode 100644
index 0000000..90b8d79
--- /dev/null
+++ b/PKG-INFO
@@ -0,0 +1,85 @@
+Metadata-Version: 1.1
+Name: bz2file
+Version: 0.98
+Summary: Read and write bzip2-compressed files.
+Home-page: https://github.com/nvawda/bz2file
+Author: Nadeem Vawda
+Author-email: nadeem.vawda at gmail.com
+License: Apache License, Version 2.0
+Description: Bz2file is a Python library for reading and writing bzip2-compressed files.
+        
+        It contains a drop-in replacement for the file interface in the standard
+        library's ``bz2`` module, including features from the latest development
+        version of CPython that are not available in older releases.
+        
+        Bz2file is compatible with CPython 2.6, 2.7, and 3.0 through 3.4, as well as
+        PyPy 2.0.
+        
+        
+        Features
+        --------
+        
+        - Supports multi-stream files.
+        
+        - Can read from or write to any file-like object.
+        
+        - Can open files in either text or binary mode.
+        
+        - Added methods: ``peek()``, ``read1()``, ``readinto()``, ``fileno()``,
+          ``readable()``, ``writable()``, ``seekable()``.
+        
+        
+        Installation
+        ------------
+        
+        To install bz2file, run: ::
+        
+           $ pip install bz2file
+        
+        
+        Documentation
+        -------------
+        
+        The ``open()`` function and ``BZ2File`` class in this module provide the same
+        features and interface as the ones in the standard library's ``bz2`` module in
+        the current development version of CPython, `documented here
+        <http://docs.python.org/dev/library/bz2.html>`_.
+        
+        
+        Version History
+        ---------------
+        
+        0.98: 19 January 2014
+        
+        - Added support for the 'x' family of modes.
+        - Ignore non-bz2 data at the end of a file, rather than raising an exception.
+        - Tests now pass on PyPy.
+        
+        0.95: 08 October 2012
+        
+        - Added the ``open()`` function.
+        - Improved performance when reading in small chunks.
+        - Removed the ``fileobj`` argument to ``BZ2File()``. To wrap an existing file
+          object, pass it as the first argument (``filename``).
+        
+        0.9: 04 February 2012
+        
+        - Initial release.
+        
+Platform: UNKNOWN
+Classifier: Development Status :: 4 - Beta
+Classifier: Intended Audience :: Developers
+Classifier: License :: OSI Approved :: Apache Software License
+Classifier: Operating System :: OS Independent
+Classifier: Programming Language :: Python
+Classifier: Programming Language :: Python :: 2
+Classifier: Programming Language :: Python :: 2.6
+Classifier: Programming Language :: Python :: 2.7
+Classifier: Programming Language :: Python :: 3
+Classifier: Programming Language :: Python :: 3.0
+Classifier: Programming Language :: Python :: 3.1
+Classifier: Programming Language :: Python :: 3.2
+Classifier: Programming Language :: Python :: 3.3
+Classifier: Programming Language :: Python :: 3.4
+Classifier: Topic :: Software Development :: Libraries :: Python Modules
+Classifier: Topic :: System :: Archiving :: Compression
diff --git a/README.rst b/README.rst
new file mode 100644
index 0000000..2e848ef
--- /dev/null
+++ b/README.rst
@@ -0,0 +1,59 @@
+Bz2file is a Python library for reading and writing bzip2-compressed files.
+
+It contains a drop-in replacement for the file interface in the standard
+library's ``bz2`` module, including features from the latest development
+version of CPython that are not available in older releases.
+
+Bz2file is compatible with CPython 2.6, 2.7, and 3.0 through 3.4, as well as
+PyPy 2.0.
+
+
+Features
+--------
+
+- Supports multi-stream files.
+
+- Can read from or write to any file-like object.
+
+- Can open files in either text or binary mode.
+
+- Added methods: ``peek()``, ``read1()``, ``readinto()``, ``fileno()``,
+  ``readable()``, ``writable()``, ``seekable()``.
+
+
+Installation
+------------
+
+To install bz2file, run: ::
+
+   $ pip install bz2file
+
+
+Documentation
+-------------
+
+The ``open()`` function and ``BZ2File`` class in this module provide the same
+features and interface as the ones in the standard library's ``bz2`` module in
+the current development version of CPython, `documented here
+<http://docs.python.org/dev/library/bz2.html>`_.
+
+
+Version History
+---------------
+
+0.98: 19 January 2014
+
+- Added support for the 'x' family of modes.
+- Ignore non-bz2 data at the end of a file, rather than raising an exception.
+- Tests now pass on PyPy.
+
+0.95: 08 October 2012
+
+- Added the ``open()`` function.
+- Improved performance when reading in small chunks.
+- Removed the ``fileobj`` argument to ``BZ2File()``. To wrap an existing file
+  object, pass it as the first argument (``filename``).
+
+0.9: 04 February 2012
+
+- Initial release.
diff --git a/bz2file.py b/bz2file.py
new file mode 100644
index 0000000..1d91195
--- /dev/null
+++ b/bz2file.py
@@ -0,0 +1,499 @@
+"""Module for reading and writing bzip2-compressed files.
+
+This module contains a backport of Python 3.4's bz2.open() function and
+BZ2File class, adapted to work with earlier versions of Python.
+"""
+
+__all__ = ["BZ2File", "open"]
+
+__author__ = "Nadeem Vawda <nadeem.vawda at gmail.com>"
+
+import io
+import sys
+import warnings
+
+try:
+    from threading import RLock
+except ImportError:
+    from dummy_threading import RLock
+
+from bz2 import BZ2Compressor, BZ2Decompressor
+
+
+_MODE_CLOSED   = 0
+_MODE_READ     = 1
+_MODE_READ_EOF = 2
+_MODE_WRITE    = 3
+
+_BUFFER_SIZE = 8192
+
+_STR_TYPES = (str, unicode) if (str is bytes) else (str, bytes)
+
+# The 'x' mode for open() was introduced in Python 3.3.
+_HAS_OPEN_X_MODE = sys.version_info[:2] >= (3, 3)
+
+_builtin_open = open
+
+
+class BZ2File(io.BufferedIOBase):
+
+    """A file object providing transparent bzip2 (de)compression.
+
+    A BZ2File can act as a wrapper for an existing file object, or refer
+    directly to a named file on disk.
+
+    Note that BZ2File provides a *binary* file interface - data read is
+    returned as bytes, and data to be written should be given as bytes.
+    """
+
+    def __init__(self, filename, mode="r", buffering=None, compresslevel=9):
+        """Open a bzip2-compressed file.
+
+        If filename is a str, bytes or unicode object, it gives the name
+        of the file to be opened. Otherwise, it should be a file object,
+        which will be used to read or write the compressed data.
+
+        mode can be 'r' for reading (default), 'w' for (over)writing,
+        'x' for creating exclusively, or 'a' for appending. These can
+        equivalently be given as 'rb', 'wb', 'xb', and 'ab'.
+
+        buffering is ignored. Its use is deprecated.
+
+        If mode is 'w', 'x' or 'a', compresslevel can be a number between 1
+        and 9 specifying the level of compression: 1 produces the least
+        compression, and 9 (default) produces the most compression.
+
+        If mode is 'r', the input file may be the concatenation of
+        multiple compressed streams.
+        """
+        # This lock must be recursive, so that BufferedIOBase's
+        # readline(), readlines() and writelines() don't deadlock.
+        self._lock = RLock()
+        self._fp = None
+        self._closefp = False
+        self._mode = _MODE_CLOSED
+        self._pos = 0
+        self._size = -1
+
+        if buffering is not None:
+            warnings.warn("Use of 'buffering' argument is deprecated",
+                          DeprecationWarning)
+
+        if not (1 <= compresslevel <= 9):
+            raise ValueError("compresslevel must be between 1 and 9")
+
+        if mode in ("", "r", "rb"):
+            mode = "rb"
+            mode_code = _MODE_READ
+            self._decompressor = BZ2Decompressor()
+            self._buffer = b""
+            self._buffer_offset = 0
+        elif mode in ("w", "wb"):
+            mode = "wb"
+            mode_code = _MODE_WRITE
+            self._compressor = BZ2Compressor(compresslevel)
+        elif mode in ("x", "xb") and _HAS_OPEN_X_MODE:
+            mode = "xb"
+            mode_code = _MODE_WRITE
+            self._compressor = BZ2Compressor(compresslevel)
+        elif mode in ("a", "ab"):
+            mode = "ab"
+            mode_code = _MODE_WRITE
+            self._compressor = BZ2Compressor(compresslevel)
+        else:
+            raise ValueError("Invalid mode: %r" % (mode,))
+
+        if isinstance(filename, _STR_TYPES):
+            self._fp = _builtin_open(filename, mode)
+            self._closefp = True
+            self._mode = mode_code
+        elif hasattr(filename, "read") or hasattr(filename, "write"):
+            self._fp = filename
+            self._mode = mode_code
+        else:
+            raise TypeError("filename must be a %s or %s object, or a file" %
+                            (_STR_TYPES[0].__name__, _STR_TYPES[1].__name__))
+
+    def close(self):
+        """Flush and close the file.
+
+        May be called more than once without error. Once the file is
+        closed, any other operation on it will raise a ValueError.
+        """
+        with self._lock:
+            if self._mode == _MODE_CLOSED:
+                return
+            try:
+                if self._mode in (_MODE_READ, _MODE_READ_EOF):
+                    self._decompressor = None
+                elif self._mode == _MODE_WRITE:
+                    self._fp.write(self._compressor.flush())
+                    self._compressor = None
+            finally:
+                try:
+                    if self._closefp:
+                        self._fp.close()
+                finally:
+                    self._fp = None
+                    self._closefp = False
+                    self._mode = _MODE_CLOSED
+                    self._buffer = b""
+                    self._buffer_offset = 0
+
+    @property
+    def closed(self):
+        """True if this file is closed."""
+        return self._mode == _MODE_CLOSED
+
+    def fileno(self):
+        """Return the file descriptor for the underlying file."""
+        self._check_not_closed()
+        return self._fp.fileno()
+
+    def seekable(self):
+        """Return whether the file supports seeking."""
+        return self.readable() and (self._fp.seekable()
+                                    if hasattr(self._fp, "seekable")
+                                    else hasattr(self._fp, "seek"))
+
+    def readable(self):
+        """Return whether the file was opened for reading."""
+        self._check_not_closed()
+        return self._mode in (_MODE_READ, _MODE_READ_EOF)
+
+    def writable(self):
+        """Return whether the file was opened for writing."""
+        self._check_not_closed()
+        return self._mode == _MODE_WRITE
+
+    # Mode-checking helper functions.
+
+    def _check_not_closed(self):
+        if self.closed:
+            raise ValueError("I/O operation on closed file")
+
+    def _check_can_read(self):
+        if self._mode not in (_MODE_READ, _MODE_READ_EOF):
+            self._check_not_closed()
+            raise io.UnsupportedOperation("File not open for reading")
+
+    def _check_can_write(self):
+        if self._mode != _MODE_WRITE:
+            self._check_not_closed()
+            raise io.UnsupportedOperation("File not open for writing")
+
+    def _check_can_seek(self):
+        if self._mode not in (_MODE_READ, _MODE_READ_EOF):
+            self._check_not_closed()
+            raise io.UnsupportedOperation("Seeking is only supported "
+                                          "on files open for reading")
+        if hasattr(self._fp, "seekable") and not self._fp.seekable():
+            raise io.UnsupportedOperation("The underlying file object "
+                                          "does not support seeking")
+
+    # Fill the readahead buffer if it is empty. Returns False on EOF.
+    def _fill_buffer(self):
+        if self._mode == _MODE_READ_EOF:
+            return False
+        # Depending on the input data, our call to the decompressor may not
+        # return any data. In this case, try again after reading another block.
+        while self._buffer_offset == len(self._buffer):
+            rawblock = (self._decompressor.unused_data or
+                        self._fp.read(_BUFFER_SIZE))
+
+            if not rawblock:
+                try:
+                    self._decompressor.decompress(b"")
+                except EOFError:
+                    # End-of-stream marker and end of file. We're good.
+                    self._mode = _MODE_READ_EOF
+                    self._size = self._pos
+                    return False
+                else:
+                    # Problem - we were expecting more compressed data.
+                    raise EOFError("Compressed file ended before the "
+                                   "end-of-stream marker was reached")
+
+            try:
+                self._buffer = self._decompressor.decompress(rawblock)
+            except EOFError:
+                # Continue to next stream.
+                self._decompressor = BZ2Decompressor()
+                try:
+                    self._buffer = self._decompressor.decompress(rawblock)
+                except IOError:
+                    # Trailing data isn't a valid bzip2 stream. We're done here.
+                    self._mode = _MODE_READ_EOF
+                    self._size = self._pos
+                    return False
+            self._buffer_offset = 0
+        return True
+
+    # Read data until EOF.
+    # If return_data is false, consume the data without returning it.
+    def _read_all(self, return_data=True):
+        # The loop assumes that _buffer_offset is 0. Ensure that this is true.
+        self._buffer = self._buffer[self._buffer_offset:]
+        self._buffer_offset = 0
+
+        blocks = []
+        while self._fill_buffer():
+            if return_data:
+                blocks.append(self._buffer)
+            self._pos += len(self._buffer)
+            self._buffer = b""
+        if return_data:
+            return b"".join(blocks)
+
+    # Read a block of up to n bytes.
+    # If return_data is false, consume the data without returning it.
+    def _read_block(self, n, return_data=True):
+        # If we have enough data buffered, return immediately.
+        end = self._buffer_offset + n
+        if end <= len(self._buffer):
+            data = self._buffer[self._buffer_offset : end]
+            self._buffer_offset = end
+            self._pos += len(data)
+            return data if return_data else None
+
+        # The loop assumes that _buffer_offset is 0. Ensure that this is true.
+        self._buffer = self._buffer[self._buffer_offset:]
+        self._buffer_offset = 0
+
+        blocks = []
+        while n > 0 and self._fill_buffer():
+            if n < len(self._buffer):
+                data = self._buffer[:n]
+                self._buffer_offset = n
+            else:
+                data = self._buffer
+                self._buffer = b""
+            if return_data:
+                blocks.append(data)
+            self._pos += len(data)
+            n -= len(data)
+        if return_data:
+            return b"".join(blocks)
+
+    def peek(self, n=0):
+        """Return buffered data without advancing the file position.
+
+        Always returns at least one byte of data, unless at EOF.
+        The exact number of bytes returned is unspecified.
+        """
+        with self._lock:
+            self._check_can_read()
+            if not self._fill_buffer():
+                return b""
+            return self._buffer[self._buffer_offset:]
+
+    def read(self, size=-1):
+        """Read up to size uncompressed bytes from the file.
+
+        If size is negative or omitted, read until EOF is reached.
+        Returns b'' if the file is already at EOF.
+        """
+        if size is None:
+            raise TypeError()
+        with self._lock:
+            self._check_can_read()
+            if size == 0:
+                return b""
+            elif size < 0:
+                return self._read_all()
+            else:
+                return self._read_block(size)
+
+    def read1(self, size=-1):
+        """Read up to size uncompressed bytes, while trying to avoid
+        making multiple reads from the underlying stream.
+
+        Returns b'' if the file is at EOF.
+        """
+        # Usually, read1() calls _fp.read() at most once. However, sometimes
+        # this does not give enough data for the decompressor to make progress.
+        # In this case we make multiple reads, to avoid returning b"".
+        with self._lock:
+            self._check_can_read()
+            if (size == 0 or
+                # Only call _fill_buffer() if the buffer is actually empty.
+                # This gives a significant speedup if *size* is small.
+                (self._buffer_offset == len(self._buffer) and not self._fill_buffer())):
+                return b""
+            if size > 0:
+                data = self._buffer[self._buffer_offset :
+                                    self._buffer_offset + size]
+                self._buffer_offset += len(data)
+            else:
+                data = self._buffer[self._buffer_offset:]
+                self._buffer = b""
+                self._buffer_offset = 0
+            self._pos += len(data)
+            return data
+
+    def readinto(self, b):
+        """Read up to len(b) bytes into b.
+
+        Returns the number of bytes read (0 for EOF).
+        """
+        with self._lock:
+            return io.BufferedIOBase.readinto(self, b)
+
+    def readline(self, size=-1):
+        """Read a line of uncompressed bytes from the file.
+
+        The terminating newline (if present) is retained. If size is
+        non-negative, no more than size bytes will be read (in which
+        case the line may be incomplete). Returns b'' if already at EOF.
+        """
+        if not isinstance(size, int):
+            if not hasattr(size, "__index__"):
+                raise TypeError("Integer argument expected")
+            size = size.__index__()
+        with self._lock:
+            self._check_can_read()
+            # Shortcut for the common case - the whole line is in the buffer.
+            if size < 0:
+                end = self._buffer.find(b"\n", self._buffer_offset) + 1
+                if end > 0:
+                    line = self._buffer[self._buffer_offset : end]
+                    self._buffer_offset = end
+                    self._pos += len(line)
+                    return line
+            return io.BufferedIOBase.readline(self, size)
+
+    def readlines(self, size=-1):
+        """Read a list of lines of uncompressed bytes from the file.
+
+        size can be specified to control the number of lines read: no
+        further lines will be read once the total size of the lines read
+        so far equals or exceeds size.
+        """
+        if not isinstance(size, int):
+            if not hasattr(size, "__index__"):
+                raise TypeError("Integer argument expected")
+            size = size.__index__()
+        with self._lock:
+            return io.BufferedIOBase.readlines(self, size)
+
+    def write(self, data):
+        """Write a byte string to the file.
+
+        Returns the number of uncompressed bytes written, which is
+        always len(data). Note that due to buffering, the file on disk
+        may not reflect the data written until close() is called.
+        """
+        with self._lock:
+            self._check_can_write()
+            compressed = self._compressor.compress(data)
+            self._fp.write(compressed)
+            self._pos += len(data)
+            return len(data)
+
+    def writelines(self, seq):
+        """Write a sequence of byte strings to the file.
+
+        Returns the number of uncompressed bytes written.
+        seq can be any iterable yielding byte strings.
+
+        Line separators are not added between the written byte strings.
+        """
+        with self._lock:
+            return io.BufferedIOBase.writelines(self, seq)
+
+    # Rewind the file to the beginning of the data stream.
+    def _rewind(self):
+        self._fp.seek(0, 0)
+        self._mode = _MODE_READ
+        self._pos = 0
+        self._decompressor = BZ2Decompressor()
+        self._buffer = b""
+        self._buffer_offset = 0
+
+    def seek(self, offset, whence=0):
+        """Change the file position.
+
+        The new position is specified by offset, relative to the
+        position indicated by whence. Values for whence are:
+
+            0: start of stream (default); offset must not be negative
+            1: current stream position
+            2: end of stream; offset must not be positive
+
+        Returns the new file position.
+
+        Note that seeking is emulated, so depending on the parameters,
+        this operation may be extremely slow.
+        """
+        with self._lock:
+            self._check_can_seek()
+
+            # Recalculate offset as an absolute file position.
+            if whence == 0:
+                pass
+            elif whence == 1:
+                offset = self._pos + offset
+            elif whence == 2:
+                # Seeking relative to EOF - we need to know the file's size.
+                if self._size < 0:
+                    self._read_all(return_data=False)
+                offset = self._size + offset
+            else:
+                raise ValueError("Invalid value for whence: %s" % (whence,))
+
+            # Make it so that offset is the number of bytes to skip forward.
+            if offset < self._pos:
+                self._rewind()
+            else:
+                offset -= self._pos
+
+            # Read and discard data until we reach the desired position.
+            self._read_block(offset, return_data=False)
+
+            return self._pos
+
+    def tell(self):
+        """Return the current file position."""
+        with self._lock:
+            self._check_not_closed()
+            return self._pos
+
+
+def open(filename, mode="rb", compresslevel=9,
+         encoding=None, errors=None, newline=None):
+    """Open a bzip2-compressed file in binary or text mode.
+
+    The filename argument can be an actual filename (a str, bytes or unicode
+    object), or an existing file object to read from or write to.
+
+    The mode argument can be "r", "rb", "w", "wb", "x", "xb", "a" or
+    "ab" for binary mode, or "rt", "wt", "xt" or "at" for text mode.
+    The default mode is "rb", and the default compresslevel is 9.
+
+    For binary mode, this function is equivalent to the BZ2File
+    constructor: BZ2File(filename, mode, compresslevel). In this case,
+    the encoding, errors and newline arguments must not be provided.
+
+    For text mode, a BZ2File object is created, and wrapped in an
+    io.TextIOWrapper instance with the specified encoding, error
+    handling behavior, and line ending(s).
+
+    """
+    if "t" in mode:
+        if "b" in mode:
+            raise ValueError("Invalid mode: %r" % (mode,))
+    else:
+        if encoding is not None:
+            raise ValueError("Argument 'encoding' not supported in binary mode")
+        if errors is not None:
+            raise ValueError("Argument 'errors' not supported in binary mode")
+        if newline is not None:
+            raise ValueError("Argument 'newline' not supported in binary mode")
+
+    bz_mode = mode.replace("t", "")
+    binary_file = BZ2File(filename, bz_mode, compresslevel=compresslevel)
+
+    if "t" in mode:
+        return io.TextIOWrapper(binary_file, encoding, errors, newline)
+    else:
+        return binary_file
diff --git a/setup.py b/setup.py
new file mode 100644
index 0000000..d31f317
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,34 @@
+from distutils.core import setup
+
+with open("README.rst") as f:
+    readme = f.read()
+
+setup(
+    name="bz2file",
+    version="0.98",
+    description="Read and write bzip2-compressed files.",
+    long_description=readme,
+    author="Nadeem Vawda",
+    author_email="nadeem.vawda at gmail.com",
+    url="https://github.com/nvawda/bz2file",
+    py_modules=["bz2file"],
+    license="Apache License, Version 2.0",
+    classifiers=[
+        "Development Status :: 4 - Beta",
+        "Intended Audience :: Developers",
+        "License :: OSI Approved :: Apache Software License",
+        "Operating System :: OS Independent",
+        "Programming Language :: Python",
+        "Programming Language :: Python :: 2",
+        "Programming Language :: Python :: 2.6",
+        "Programming Language :: Python :: 2.7",
+        "Programming Language :: Python :: 3",
+        "Programming Language :: Python :: 3.0",
+        "Programming Language :: Python :: 3.1",
+        "Programming Language :: Python :: 3.2",
+        "Programming Language :: Python :: 3.3",
+        "Programming Language :: Python :: 3.4",
+        "Topic :: Software Development :: Libraries :: Python Modules",
+        "Topic :: System :: Archiving :: Compression",
+    ],
+)
diff --git a/test_bz2file.py b/test_bz2file.py
new file mode 100644
index 0000000..7ad58ff
--- /dev/null
+++ b/test_bz2file.py
@@ -0,0 +1,732 @@
+import unittest
+import bz2
+import bz2file
+from bz2file import BZ2File
+from io import BytesIO
+import os
+import platform
+
+try:
+    import threading
+except ImportError:
+    threading = None
+
+try:
+    from test import support
+except ImportError:
+    from test import test_support as support
+
+
+class BaseTest(unittest.TestCase):
+    "Base for other testcases."
+
+    TEXT_LINES = [
+        b'root:x:0:0:root:/root:/bin/bash\n',
+        b'bin:x:1:1:bin:/bin:\n',
+        b'daemon:x:2:2:daemon:/sbin:\n',
+        b'adm:x:3:4:adm:/var/adm:\n',
+        b'lp:x:4:7:lp:/var/spool/lpd:\n',
+        b'sync:x:5:0:sync:/sbin:/bin/sync\n',
+        b'shutdown:x:6:0:shutdown:/sbin:/sbin/shutdown\n',
+        b'halt:x:7:0:halt:/sbin:/sbin/halt\n',
+        b'mail:x:8:12:mail:/var/spool/mail:\n',
+        b'news:x:9:13:news:/var/spool/news:\n',
+        b'uucp:x:10:14:uucp:/var/spool/uucp:\n',
+        b'operator:x:11:0:operator:/root:\n',
+        b'games:x:12:100:games:/usr/games:\n',
+        b'gopher:x:13:30:gopher:/usr/lib/gopher-data:\n',
+        b'ftp:x:14:50:FTP User:/var/ftp:/bin/bash\n',
+        b'nobody:x:65534:65534:Nobody:/home:\n',
+        b'postfix:x:100:101:postfix:/var/spool/postfix:\n',
+        b'niemeyer:x:500:500::/home/niemeyer:/bin/bash\n',
+        b'postgres:x:101:102:PostgreSQL Server:/var/lib/pgsql:/bin/bash\n',
+        b'mysql:x:102:103:MySQL server:/var/lib/mysql:/bin/bash\n',
+        b'www:x:103:104::/var/www:/bin/false\n',
+        ]
+    TEXT = b''.join(TEXT_LINES)
+    DATA = b'BZh91AY&SY.\xc8N\x18\x00\x01>_\x80\x00\x10@\x02\xff\xf0\x01\x07n\x00?\xe7\xff\xe00\x01\x99\xaa\x00\xc0\x03F\x86\x8c#&\x83F\x9a\x03\x06\xa6\xd0\xa6\x93M\x0fQ\xa7\xa8\x06\x804hh\x12$\x11\xa4i4\xf14S\xd2<Q\xb5\x0fH\xd3\xd4\xdd\xd5\x87\xbb\xf8\x94\r\x8f\xafI\x12\xe1\xc9\xf8/E\x00pu\x89\x12]\xc9\xbbDL\nQ\x0e\t1\x12\xdf\xa0\xc0\x97\xac2O9\x89\x13\x94\x0e\x1c7\x0ed\x95I\x0c\xaaJ\xa4\x18L\x10\x05#\x9c\xaf\xba\xbc/\x97\x8a#C\xc8\xe1\x8cW\xf9\xe2\xd0\xd6M\xa7\x8bXa<e\x84t\xcbL\xb3\xa7 [...]
+    BAD_DATA = b'this is not a valid bzip2 file'
+
+    def setUp(self):
+        self.filename = support.TESTFN
+
+    def tearDown(self):
+        if os.path.isfile(self.filename):
+            os.unlink(self.filename)
+
+    # In some of the tests, we need to verify the contents of multi-stream
+    # files, but bz2.decompress() could not handle this case prior to 3.3.
+    def decompress(self, data):
+        results = []
+        while True:
+            decomp = bz2.BZ2Decompressor()
+            results.append(decomp.decompress(data))
+            try:
+                decomp.decompress(b"")
+            except EOFError:
+                if not decomp.unused_data:
+                    return b"".join(results)
+                data = decomp.unused_data
+            else:
+                raise ValueError("Compressed data ended before the "
+                                 "end-of-stream marker was reached")
+
+
+class BZ2FileTest(BaseTest):
+    "Test the BZ2File class."
+
+    def createTempFile(self, streams=1, suffix=b""):
+        with open(self.filename, "wb") as f:
+            f.write(self.DATA * streams)
+            f.write(suffix)
+
+    def testBadArgs(self):
+        self.assertRaises(TypeError, BZ2File, 123.456)
+        self.assertRaises(ValueError, BZ2File, "/dev/null", "z")
+        self.assertRaises(ValueError, BZ2File, "/dev/null", "rx")
+        self.assertRaises(ValueError, BZ2File, "/dev/null", "rbt")
+        self.assertRaises(ValueError, BZ2File, "/dev/null", compresslevel=0)
+        self.assertRaises(ValueError, BZ2File, "/dev/null", compresslevel=10)
+
+    def testRead(self):
+        self.createTempFile()
+        with BZ2File(self.filename) as bz2f:
+            self.assertRaises(TypeError, bz2f.read, None)
+            self.assertEqual(bz2f.read(), self.TEXT)
+
+    def testReadBadFile(self):
+        self.createTempFile(streams=0, suffix=self.BAD_DATA)
+        with BZ2File(self.filename) as bz2f:
+            self.assertRaises(IOError, bz2f.read)
+
+    def testReadMultiStream(self):
+        self.createTempFile(streams=5)
+        with BZ2File(self.filename) as bz2f:
+            self.assertRaises(TypeError, bz2f.read, None)
+            self.assertEqual(bz2f.read(), self.TEXT * 5)
+
+    def testReadMonkeyMultiStream(self):
+        # Test BZ2File.read() on a multi-stream archive where a stream
+        # boundary coincides with the end of the raw read buffer.
+        buffer_size = bz2file._BUFFER_SIZE
+        bz2file._BUFFER_SIZE = len(self.DATA)
+        try:
+            self.createTempFile(streams=5)
+            with BZ2File(self.filename) as bz2f:
+                self.assertRaises(TypeError, bz2f.read, None)
+                self.assertEqual(bz2f.read(), self.TEXT * 5)
+        finally:
+            bz2file._BUFFER_SIZE = buffer_size
+
+    def testReadTrailingJunk(self):
+        self.createTempFile(suffix=self.BAD_DATA)
+        with BZ2File(self.filename) as bz2f:
+            self.assertEqual(bz2f.read(), self.TEXT)
+
+    def testReadMultiStreamTrailingJunk(self):
+        self.createTempFile(streams=5, suffix=self.BAD_DATA)
+        with BZ2File(self.filename) as bz2f:
+            self.assertEqual(bz2f.read(), self.TEXT * 5)
+
+    def testRead0(self):
+        self.createTempFile()
+        with BZ2File(self.filename) as bz2f:
+            self.assertRaises(TypeError, bz2f.read, None)
+            self.assertEqual(bz2f.read(0), b"")
+
+    def testReadChunk10(self):
+        self.createTempFile()
+        with BZ2File(self.filename) as bz2f:
+            text = b''
+            while True:
+                str = bz2f.read(10)
+                if not str:
+                    break
+                text += str
+            self.assertEqual(text, self.TEXT)
+
+    def testReadChunk10MultiStream(self):
+        self.createTempFile(streams=5)
+        with BZ2File(self.filename) as bz2f:
+            text = b''
+            while True:
+                str = bz2f.read(10)
+                if not str:
+                    break
+                text += str
+            self.assertEqual(text, self.TEXT * 5)
+
+    def testRead100(self):
+        self.createTempFile()
+        with BZ2File(self.filename) as bz2f:
+            self.assertEqual(bz2f.read(100), self.TEXT[:100])
+
+    def testPeek(self):
+        self.createTempFile()
+        with BZ2File(self.filename) as bz2f:
+            pdata = bz2f.peek()
+            self.assertNotEqual(len(pdata), 0)
+            self.assertTrue(self.TEXT.startswith(pdata))
+            self.assertEqual(bz2f.read(), self.TEXT)
+
+    def testReadInto(self):
+        self.createTempFile()
+        with BZ2File(self.filename) as bz2f:
+            n = 128
+            b = bytearray(n)
+            self.assertEqual(bz2f.readinto(b), n)
+            self.assertEqual(b, self.TEXT[:n])
+            n = len(self.TEXT) - n
+            b = bytearray(len(self.TEXT))
+            self.assertEqual(bz2f.readinto(b), n)
+            self.assertEqual(b[:n], self.TEXT[-n:])
+
+    def testReadLine(self):
+        self.createTempFile()
+        with BZ2File(self.filename) as bz2f:
+            self.assertRaises(TypeError, bz2f.readline, None)
+            for line in self.TEXT_LINES:
+                self.assertEqual(bz2f.readline(), line)
+
+    def testReadLineMultiStream(self):
+        self.createTempFile(streams=5)
+        with BZ2File(self.filename) as bz2f:
+            self.assertRaises(TypeError, bz2f.readline, None)
+            for line in self.TEXT_LINES * 5:
+                self.assertEqual(bz2f.readline(), line)
+
+    def testReadLines(self):
+        self.createTempFile()
+        with BZ2File(self.filename) as bz2f:
+            self.assertRaises(TypeError, bz2f.readlines, None)
+            self.assertEqual(bz2f.readlines(), self.TEXT_LINES)
+
+    def testReadLinesMultiStream(self):
+        self.createTempFile(streams=5)
+        with BZ2File(self.filename) as bz2f:
+            self.assertRaises(TypeError, bz2f.readlines, None)
+            self.assertEqual(bz2f.readlines(), self.TEXT_LINES * 5)
+
+    def testIterator(self):
+        self.createTempFile()
+        with BZ2File(self.filename) as bz2f:
+            self.assertEqual(list(iter(bz2f)), self.TEXT_LINES)
+
+    def testIteratorMultiStream(self):
+        self.createTempFile(streams=5)
+        with BZ2File(self.filename) as bz2f:
+            self.assertEqual(list(iter(bz2f)), self.TEXT_LINES * 5)
+
+    def testClosedIteratorDeadlock(self):
+        # Issue #3309: Iteration on a closed BZ2File should release the lock.
+        self.createTempFile()
+        bz2f = BZ2File(self.filename)
+        bz2f.close()
+        self.assertRaises(ValueError, next, bz2f)
+        # This call will deadlock if the above call failed to release the lock.
+        self.assertRaises(ValueError, bz2f.readlines)
+
+    def testWrite(self):
+        with BZ2File(self.filename, "w") as bz2f:
+            self.assertRaises(TypeError, bz2f.write)
+            bz2f.write(self.TEXT)
+        with open(self.filename, 'rb') as f:
+            self.assertEqual(self.decompress(f.read()), self.TEXT)
+
+    def testWriteChunks10(self):
+        with BZ2File(self.filename, "w") as bz2f:
+            n = 0
+            while True:
+                str = self.TEXT[n*10:(n+1)*10]
+                if not str:
+                    break
+                bz2f.write(str)
+                n += 1
+        with open(self.filename, 'rb') as f:
+            self.assertEqual(self.decompress(f.read()), self.TEXT)
+
+    def testWriteNonDefaultCompressLevel(self):
+        expected = bz2.compress(self.TEXT, compresslevel=5)
+        with BZ2File(self.filename, "w", compresslevel=5) as bz2f:
+            bz2f.write(self.TEXT)
+        with open(self.filename, "rb") as f:
+            self.assertEqual(f.read(), expected)
+
+    def testWriteLines(self):
+        with BZ2File(self.filename, "w") as bz2f:
+            self.assertRaises(TypeError, bz2f.writelines)
+            bz2f.writelines(self.TEXT_LINES)
+        # Issue #1535500: Calling writelines() on a closed BZ2File
+        # should raise an exception.
+        self.assertRaises(ValueError, bz2f.writelines, ["a"])
+        with open(self.filename, 'rb') as f:
+            self.assertEqual(self.decompress(f.read()), self.TEXT)
+
+    def testWriteMethodsOnReadOnlyFile(self):
+        with BZ2File(self.filename, "w") as bz2f:
+            bz2f.write(b"abc")
+
+        with BZ2File(self.filename, "r") as bz2f:
+            self.assertRaises(IOError, bz2f.write, b"a")
+            self.assertRaises(IOError, bz2f.writelines, [b"a"])
+
+    def testAppend(self):
+        with BZ2File(self.filename, "w") as bz2f:
+            self.assertRaises(TypeError, bz2f.write)
+            bz2f.write(self.TEXT)
+        with BZ2File(self.filename, "a") as bz2f:
+            self.assertRaises(TypeError, bz2f.write)
+            bz2f.write(self.TEXT)
+        with open(self.filename, 'rb') as f:
+            self.assertEqual(self.decompress(f.read()), self.TEXT * 2)
+
+    def testSeekForward(self):
+        self.createTempFile()
+        with BZ2File(self.filename) as bz2f:
+            self.assertRaises(TypeError, bz2f.seek)
+            bz2f.seek(150)
+            self.assertEqual(bz2f.read(), self.TEXT[150:])
+
+    def testSeekForwardAcrossStreams(self):
+        self.createTempFile(streams=2)
+        with BZ2File(self.filename) as bz2f:
+            self.assertRaises(TypeError, bz2f.seek)
+            bz2f.seek(len(self.TEXT) + 150)
+            self.assertEqual(bz2f.read(), self.TEXT[150:])
+
+    def testSeekBackwards(self):
+        self.createTempFile()
+        with BZ2File(self.filename) as bz2f:
+            bz2f.read(500)
+            bz2f.seek(-150, 1)
+            self.assertEqual(bz2f.read(), self.TEXT[500-150:])
+
+    def testSeekBackwardsAcrossStreams(self):
+        self.createTempFile(streams=2)
+        with BZ2File(self.filename) as bz2f:
+            readto = len(self.TEXT) + 100
+            while readto > 0:
+                readto -= len(bz2f.read(readto))
+            bz2f.seek(-150, 1)
+            self.assertEqual(bz2f.read(), self.TEXT[100-150:] + self.TEXT)
+
+    def testSeekBackwardsFromEnd(self):
+        self.createTempFile()
+        with BZ2File(self.filename) as bz2f:
+            bz2f.seek(-150, 2)
+            self.assertEqual(bz2f.read(), self.TEXT[len(self.TEXT)-150:])
+
+    def testSeekBackwardsFromEndAcrossStreams(self):
+        self.createTempFile(streams=2)
+        with BZ2File(self.filename) as bz2f:
+            bz2f.seek(-1000, 2)
+            self.assertEqual(bz2f.read(), (self.TEXT * 2)[-1000:])
+
+    def testSeekPostEnd(self):
+        self.createTempFile()
+        with BZ2File(self.filename) as bz2f:
+            bz2f.seek(150000)
+            self.assertEqual(bz2f.tell(), len(self.TEXT))
+            self.assertEqual(bz2f.read(), b"")
+
+    def testSeekPostEndMultiStream(self):
+        self.createTempFile(streams=5)
+        with BZ2File(self.filename) as bz2f:
+            bz2f.seek(150000)
+            self.assertEqual(bz2f.tell(), len(self.TEXT) * 5)
+            self.assertEqual(bz2f.read(), b"")
+
+    def testSeekPostEndTwice(self):
+        self.createTempFile()
+        with BZ2File(self.filename) as bz2f:
+            bz2f.seek(150000)
+            bz2f.seek(150000)
+            self.assertEqual(bz2f.tell(), len(self.TEXT))
+            self.assertEqual(bz2f.read(), b"")
+
+    def testSeekPostEndTwiceMultiStream(self):
+        self.createTempFile(streams=5)
+        with BZ2File(self.filename) as bz2f:
+            bz2f.seek(150000)
+            bz2f.seek(150000)
+            self.assertEqual(bz2f.tell(), len(self.TEXT) * 5)
+            self.assertEqual(bz2f.read(), b"")
+
+    def testSeekPreStart(self):
+        self.createTempFile()
+        with BZ2File(self.filename) as bz2f:
+            bz2f.seek(-150)
+            self.assertEqual(bz2f.tell(), 0)
+            self.assertEqual(bz2f.read(), self.TEXT)
+
+    def testSeekPreStartMultiStream(self):
+        self.createTempFile(streams=2)
+        with BZ2File(self.filename) as bz2f:
+            bz2f.seek(-150)
+            self.assertEqual(bz2f.tell(), 0)
+            self.assertEqual(bz2f.read(), self.TEXT * 2)
+
+    def testFileno(self):
+        self.createTempFile()
+        with open(self.filename, 'rb') as rawf:
+            bz2f = BZ2File(rawf)
+            try:
+                self.assertEqual(bz2f.fileno(), rawf.fileno())
+            finally:
+                bz2f.close()
+        self.assertRaises(ValueError, bz2f.fileno)
+
+    def testSeekable(self):
+        bz2f = BZ2File(BytesIO(self.DATA))
+        try:
+            self.assertTrue(bz2f.seekable())
+            bz2f.read()
+            self.assertTrue(bz2f.seekable())
+        finally:
+            bz2f.close()
+        self.assertRaises(ValueError, bz2f.seekable)
+
+        bz2f = BZ2File(BytesIO(), "w")
+        try:
+            self.assertFalse(bz2f.seekable())
+        finally:
+            bz2f.close()
+        self.assertRaises(ValueError, bz2f.seekable)
+
+        src = BytesIO(self.DATA)
+        src.seekable = lambda: False
+        bz2f = BZ2File(src)
+        try:
+            self.assertFalse(bz2f.seekable())
+        finally:
+            bz2f.close()
+        self.assertRaises(ValueError, bz2f.seekable)
+
+    def testReadable(self):
+        bz2f = BZ2File(BytesIO(self.DATA))
+        try:
+            self.assertTrue(bz2f.readable())
+            bz2f.read()
+            self.assertTrue(bz2f.readable())
+        finally:
+            bz2f.close()
+        self.assertRaises(ValueError, bz2f.readable)
+
+        bz2f = BZ2File(BytesIO(), "w")
+        try:
+            self.assertFalse(bz2f.readable())
+        finally:
+            bz2f.close()
+        self.assertRaises(ValueError, bz2f.readable)
+
+    def testWritable(self):
+        bz2f = BZ2File(BytesIO(self.DATA))
+        try:
+            self.assertFalse(bz2f.writable())
+            bz2f.read()
+            self.assertFalse(bz2f.writable())
+        finally:
+            bz2f.close()
+        self.assertRaises(ValueError, bz2f.writable)
+
+        bz2f = BZ2File(BytesIO(), "w")
+        try:
+            self.assertTrue(bz2f.writable())
+        finally:
+            bz2f.close()
+        self.assertRaises(ValueError, bz2f.writable)
+
+    def testOpenDel(self):
+        if platform.python_implementation() != "CPython":
+            self.skipTest("Test depends on CPython refcounting semantics")
+        self.createTempFile()
+        for i in range(10000):
+            o = BZ2File(self.filename)
+            del o
+
+    def testOpenNonexistent(self):
+        self.assertRaises(IOError, BZ2File, "/non/existent")
+
+    def testReadlinesNoNewline(self):
+        # Issue #1191043: readlines() fails on a file containing no newline.
+        data = b'BZh91AY&SY\xd9b\x89]\x00\x00\x00\x03\x80\x04\x00\x02\x00\x0c\x00 \x00!\x9ah3M\x13<]\xc9\x14\xe1BCe\x8a%t'
+        with open(self.filename, "wb") as f:
+            f.write(data)
+        with BZ2File(self.filename) as bz2f:
+            lines = bz2f.readlines()
+        self.assertEqual(lines, [b'Test'])
+        with BZ2File(self.filename) as bz2f:
+            xlines = list(bz2f.readlines())
+        self.assertEqual(xlines, [b'Test'])
+
+    def testContextProtocol(self):
+        f = None
+        with BZ2File(self.filename, "wb") as f:
+            f.write(b"xxx")
+        f = BZ2File(self.filename, "rb")
+        f.close()
+        try:
+            with f:
+                pass
+        except ValueError:
+            pass
+        else:
+            self.fail("__enter__ on a closed file didn't raise an exception")
+        try:
+            with BZ2File(self.filename, "wb") as f:
+                1/0
+        except ZeroDivisionError:
+            pass
+        else:
+            self.fail("1/0 didn't raise an exception")
+
+    def testThreading(self):
+        if not threading:
+            return
+        # Issue #7205: Using a BZ2File from several threads shouldn't deadlock.
+        data = b"1" * 2**20
+        nthreads = 10
+        with BZ2File(self.filename, 'wb') as f:
+            def comp():
+                for i in range(5):
+                    f.write(data)
+            threads = [threading.Thread(target=comp) for i in range(nthreads)]
+            for t in threads:
+                t.start()
+            for t in threads:
+                t.join()
+
+    def testWithoutThreading(self):
+        if not hasattr(support, "import_fresh_module"):
+            return
+        module = support.import_fresh_module("bz2file", blocked=("threading",))
+        with module.BZ2File(self.filename, "wb") as f:
+            f.write(b"abc")
+        with module.BZ2File(self.filename, "rb") as f:
+            self.assertEqual(f.read(), b"abc")
+
+    def testMixedIterationAndReads(self):
+        self.createTempFile()
+        linelen = len(self.TEXT_LINES[0])
+        halflen = linelen // 2
+        with BZ2File(self.filename) as bz2f:
+            bz2f.read(halflen)
+            self.assertEqual(next(bz2f), self.TEXT_LINES[0][halflen:])
+            self.assertEqual(bz2f.read(), self.TEXT[linelen:])
+        with BZ2File(self.filename) as bz2f:
+            bz2f.readline()
+            self.assertEqual(next(bz2f), self.TEXT_LINES[1])
+            self.assertEqual(bz2f.readline(), self.TEXT_LINES[2])
+        with BZ2File(self.filename) as bz2f:
+            bz2f.readlines()
+            self.assertRaises(StopIteration, next, bz2f)
+            self.assertEqual(bz2f.readlines(), [])
+
+    def testMultiStreamOrdering(self):
+        # Test the ordering of streams when reading a multi-stream archive.
+        data1 = b"foo" * 1000
+        data2 = b"bar" * 1000
+        with BZ2File(self.filename, "w") as bz2f:
+            bz2f.write(data1)
+        with BZ2File(self.filename, "a") as bz2f:
+            bz2f.write(data2)
+        with BZ2File(self.filename) as bz2f:
+            self.assertEqual(bz2f.read(), data1 + data2)
+
+    def testOpenBytesFilename(self):
+        str_filename = self.filename
+        try:
+            bytes_filename = str_filename.encode("ascii")
+        except UnicodeEncodeError:
+            self.skipTest("Temporary file name needs to be ASCII")
+        with BZ2File(bytes_filename, "wb") as f:
+            f.write(self.DATA)
+        with BZ2File(bytes_filename, "rb") as f:
+            self.assertEqual(f.read(), self.DATA)
+        # Sanity check that we are actually operating on the right file.
+        with BZ2File(str_filename, "rb") as f:
+            self.assertEqual(f.read(), self.DATA)
+
+
+    # Tests for a BZ2File wrapping another file object:
+
+    def testReadBytesIO(self):
+        with BytesIO(self.DATA) as bio:
+            with BZ2File(bio) as bz2f:
+                self.assertRaises(TypeError, bz2f.read, None)
+                self.assertEqual(bz2f.read(), self.TEXT)
+            self.assertFalse(bio.closed)
+
+    def testPeekBytesIO(self):
+        with BytesIO(self.DATA) as bio:
+            with BZ2File(bio) as bz2f:
+                pdata = bz2f.peek()
+                self.assertNotEqual(len(pdata), 0)
+                self.assertTrue(self.TEXT.startswith(pdata))
+                self.assertEqual(bz2f.read(), self.TEXT)
+
+    def testWriteBytesIO(self):
+        with BytesIO() as bio:
+            with BZ2File(bio, "w") as bz2f:
+                self.assertRaises(TypeError, bz2f.write)
+                bz2f.write(self.TEXT)
+            self.assertEqual(self.decompress(bio.getvalue()), self.TEXT)
+            self.assertFalse(bio.closed)
+
+    def testSeekForwardBytesIO(self):
+        with BytesIO(self.DATA) as bio:
+            with BZ2File(bio) as bz2f:
+                self.assertRaises(TypeError, bz2f.seek)
+                bz2f.seek(150)
+                self.assertEqual(bz2f.read(), self.TEXT[150:])
+
+    def testSeekBackwardsBytesIO(self):
+        with BytesIO(self.DATA) as bio:
+            with BZ2File(bio) as bz2f:
+                bz2f.read(500)
+                bz2f.seek(-150, 1)
+                self.assertEqual(bz2f.read(), self.TEXT[500-150:])
+
+    def test_read_truncated(self):
+        # Drop the eos_magic field (6 bytes) and CRC (4 bytes).
+        truncated = self.DATA[:-10]
+        with BZ2File(BytesIO(truncated)) as f:
+            self.assertRaises(EOFError, f.read)
+        with BZ2File(BytesIO(truncated)) as f:
+            self.assertEqual(f.read(len(self.TEXT)), self.TEXT)
+            self.assertRaises(EOFError, f.read, 1)
+        # Incomplete 4-byte file header, and block header of at least 146 bits.
+        for i in range(22):
+            with BZ2File(BytesIO(truncated[:i])) as f:
+                self.assertRaises(EOFError, f.read, 1)
+
+
+class OpenTest(BaseTest):
+    "Test the open function."
+
+    def open(self, *args, **kwargs):
+        return bz2file.open(*args, **kwargs)
+
+    def test_binary_modes(self):
+        modes = ["wb", "xb"] if bz2file._HAS_OPEN_X_MODE else ["wb"]
+        for mode in modes:
+            if mode == "xb":
+                support.unlink(self.filename)
+            with self.open(self.filename, mode) as f:
+                f.write(self.TEXT)
+            with open(self.filename, "rb") as f:
+                file_data = self.decompress(f.read())
+                self.assertEqual(file_data, self.TEXT)
+            with self.open(self.filename, "rb") as f:
+                self.assertEqual(f.read(), self.TEXT)
+            with self.open(self.filename, "ab") as f:
+                f.write(self.TEXT)
+            with open(self.filename, "rb") as f:
+                file_data = self.decompress(f.read())
+                self.assertEqual(file_data, self.TEXT * 2)
+
+    def test_implicit_binary_modes(self):
+        # Test implicit binary modes (no "b" or "t" in mode string).
+        modes = ["w", "x"] if bz2file._HAS_OPEN_X_MODE else ["w"]
+        for mode in modes:
+            if mode == "x":
+                support.unlink(self.filename)
+            with self.open(self.filename, mode) as f:
+                f.write(self.TEXT)
+            with open(self.filename, "rb") as f:
+                file_data = self.decompress(f.read())
+                self.assertEqual(file_data, self.TEXT)
+            with self.open(self.filename, "r") as f:
+                self.assertEqual(f.read(), self.TEXT)
+            with self.open(self.filename, "a") as f:
+                f.write(self.TEXT)
+            with open(self.filename, "rb") as f:
+                file_data = self.decompress(f.read())
+                self.assertEqual(file_data, self.TEXT * 2)
+
+    def test_text_modes(self):
+        text = self.TEXT.decode("ascii")
+        text_native_eol = text.replace("\n", os.linesep)
+        modes = ["wt", "xt"] if bz2file._HAS_OPEN_X_MODE else ["wt"]
+        for mode in modes:
+            if mode == "xt":
+                support.unlink(self.filename)
+                if not bz2file._HAS_OPEN_X_MODE:
+                    continue
+            with self.open(self.filename, mode) as f:
+                f.write(text)
+            with open(self.filename, "rb") as f:
+                file_data = self.decompress(f.read()).decode("ascii")
+                self.assertEqual(file_data, text_native_eol)
+            with self.open(self.filename, "rt") as f:
+                self.assertEqual(f.read(), text)
+            with self.open(self.filename, "at") as f:
+                f.write(text)
+            with open(self.filename, "rb") as f:
+                file_data = self.decompress(f.read()).decode("ascii")
+                self.assertEqual(file_data, text_native_eol * 2)
+
+    def test_x_mode(self):
+        if not bz2file._HAS_OPEN_X_MODE:
+            return
+        for mode in ("x", "xb", "xt"):
+            support.unlink(self.filename)
+            with self.open(self.filename, mode) as f:
+                pass
+            with self.assertRaises(FileExistsError):
+                with self.open(self.filename, mode) as f:
+                    pass
+
+    def test_fileobj(self):
+        with self.open(BytesIO(self.DATA), "r") as f:
+            self.assertEqual(f.read(), self.TEXT)
+        with self.open(BytesIO(self.DATA), "rb") as f:
+            self.assertEqual(f.read(), self.TEXT)
+        text = self.TEXT.decode("ascii")
+        with self.open(BytesIO(self.DATA), "rt") as f:
+            self.assertEqual(f.read(), text)
+
+    def test_bad_params(self):
+        # Test invalid parameter combinations.
+        self.assertRaises(ValueError,
+                          self.open, self.filename, "wbt")
+        self.assertRaises(ValueError,
+                          self.open, self.filename, "xbt")
+        self.assertRaises(ValueError,
+                          self.open, self.filename, "rb", encoding="utf-8")
+        self.assertRaises(ValueError,
+                          self.open, self.filename, "rb", errors="ignore")
+        self.assertRaises(ValueError,
+                          self.open, self.filename, "rb", newline="\n")
+
+    def test_encoding(self):
+        # Test non-default encoding.
+        text = self.TEXT.decode("ascii")
+        text_native_eol = text.replace("\n", os.linesep)
+        with self.open(self.filename, "wt", encoding="utf-16-le") as f:
+            f.write(text)
+        with open(self.filename, "rb") as f:
+            file_data = self.decompress(f.read()).decode("utf-16-le")
+            self.assertEqual(file_data, text_native_eol)
+        with self.open(self.filename, "rt", encoding="utf-16-le") as f:
+            self.assertEqual(f.read(), text)
+
+    def test_encoding_error_handler(self):
+        # Test with non-default encoding error handler.
+        with self.open(self.filename, "wb") as f:
+            f.write(b"foo\xffbar")
+        with self.open(self.filename, "rt", encoding="ascii", errors="ignore") \
+                as f:
+            self.assertEqual(f.read(), "foobar")
+
+    def test_newline(self):
+        # Test with explicit newline (universal newline mode disabled).
+        text = self.TEXT.decode("ascii")
+        with self.open(self.filename, "wt", newline="\n") as f:
+            f.write(text)
+        with self.open(self.filename, "rt", newline="\r") as f:
+            self.assertEqual(f.readlines(), [text])
+
+
+if __name__ == '__main__':
+    unittest.main()

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/debian-med/python-bz2file.git



More information about the debian-med-commit mailing list