Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Providing a mock Stream that can be used for testing zio-nio functionality w/o requiring files #357

Open
searler opened this issue May 23, 2021 · 0 comments

Comments

@searler
Copy link

searler commented May 23, 2021

I am looking at zio-nio and zio-json interactions and would like to avoid the need to populate data files.

I created a mock Channel that is populated with a String.

Would this functionality be generally useful?
If so, where it best be placed?
I will create a pull request based on the responses.

import zio.{Chunk, Queue, ZIO}

import java.nio.channels.Channel

/**
 *
 * Mock a Channel containing a specified string.
 *
 * End of data is indicated by Chunk.empty
 *
 * @param queue
 */
class StringChannel(queue: Queue[Chunk[Byte]]) extends zio.nio.core.channels.Channel {
  override protected val channel: Channel = null // not referenced

  final def readChunk(capacity: Int): ZIO[Any, Nothing, Chunk[Byte]] = queue.take
}

object StringChannel {
  /**
   * Stream of individual Chunks for each byte in string.
   *
   * @param string
   * @return
   */
  def individual(string: String) = for {
    q <- Queue.unbounded[Chunk[Byte]]
    _ <- q.offerAll(string.getBytes().map(Chunk.single))
    _ <- q.offer(Chunk.empty)
  } yield new StringChannel(q)

  /**
   * Single Chunk containing entire string
   *
   * @param string
   * @return
   */
  def single(string: String) = for {
    q <- Queue.unbounded[Chunk[Byte]]
    _ <- q.offer(Chunk.fromArray(string.getBytes()))
    _ <- q.offer(Chunk.empty)
  } yield new StringChannel(q)
}

Usage

import zio.blocking.Blocking
import zio.console.Console
import zio.nio.core.charset.Charset
import zio.stream.ZStream
import zio.{App, ExitCode, URIO, ZIO, console}

object StringChannelDump extends App {

  val lines =
    """{"curvature":0.5}
      |{"curvature":1.5}
      |""".stripMargin

  override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] = {

    val program = for {
      channel <- StringChannel.single(lines)
      _ <- dump(channel)
    } yield ()

    program.exitCode
  }

  private def dump(chan: StringChannel): ZIO[Console with Blocking, Exception, Unit] = {
    val inStream: ZStream[Blocking, Exception, Byte] = ZStream.repeatEffectChunkOption {
      chan.readChunk(1000).asSomeError.flatMap { chunk =>
        if (chunk.isEmpty) ZIO.fail(None) else ZIO.succeed(chunk)
      }
    }
    
    val charStream: ZStream[Blocking, Exception, Char] =
      inStream.transduce(Charset.Standard.utf8.newDecoder.transducer())

    charStream.foreachChunk(chars => console.putStr(chars.mkString))
  }

}
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant