Skip to content

Refactor ConnectionSettings #85

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions example/example_socket.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ Future main() async {
path: '/var/lib/mysql/mysql.sock',
user: 'root',
db: 'testdb',
),
isUnixSocket: true,
)
);

// Create a table
Expand Down
32 changes: 12 additions & 20 deletions lib/src/buffered_socket.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@ library buffered_socket;
import 'dart:io';
import 'dart:async';
import 'package:logging/logging.dart';
import 'package:mysql1/mysql1.dart';
import 'buffer.dart';

typedef ErrorHandler = Function(Object err);
typedef DoneHandler = Function();
typedef DataReadyHandler = Function();
typedef ClosedHandler = Function();

typedef SocketFactory = Function(String host, int port, Duration timeout,
{bool isUnixSocket});
typedef SocketFactory = Function(ConnectionSettings conn);

class BufferedSocket {
final Logger log;
Expand Down Expand Up @@ -61,35 +61,27 @@ class BufferedSocket {
}
}

static Future<RawSocket> defaultSocketFactory(
String host, int port, Duration timeout,
{bool isUnixSocket = false}) {
if (isUnixSocket) {
static Future<RawSocket> defaultSocketFactory(ConnectionSettings conn) async {
if (conn.isUnixSocket) {
return RawSocket.connect(
InternetAddress(host, type: InternetAddressType.unix), port,
timeout: timeout);
} else {
return RawSocket.connect(host, port, timeout: timeout);
InternetAddress(conn.host, type: InternetAddressType.unix), conn.port,
timeout: conn.timeout);
}
final socket = await RawSocket.connect(conn.host, conn.port, timeout: conn.timeout);
socket.setOption(SocketOption.tcpNoDelay, true);
return socket;
}

static Future<BufferedSocket> connect(
String host,
int port,
Duration timeout, {
ConnectionSettings conn,
{
DataReadyHandler? onDataReady,
DoneHandler? onDone,
ErrorHandler? onError,
ClosedHandler? onClosed,
SocketFactory socketFactory = defaultSocketFactory,
bool isUnixSocket = false,
}) async {
RawSocket socket;
socket =
await socketFactory(host, port, timeout, isUnixSocket: isUnixSocket);
if (!isUnixSocket) {
socket.setOption(SocketOption.tcpNoDelay, true);
}
final socket = await socketFactory(conn);
return BufferedSocket._(socket, onDataReady, onDone, onError, onClosed);
}

Expand Down
105 changes: 50 additions & 55 deletions lib/src/single_connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -29,63 +29,64 @@ import 'results/row.dart';
final Logger _log = Logger('MySqlConnection');

class ConnectionSettings {
String host;
int port;
String? user;
String? password;
String? db;
bool useCompression;
bool useSSL;
int maxPacketSize;
int characterSet;
final bool isUnixSocket;
final String host;
final int port;
final String? user;
final String? password;
final String? db;
final int maxPacketSize;
final int characterSet;

/// The timeout for connecting to the database and for all database operations.
Duration timeout;

ConnectionSettings(
{this.host = 'localhost',
this.port = 3306,
this.user,
this.password,
this.db,
this.useCompression = false,
this.useSSL = false,
this.maxPacketSize = 16 * 1024 * 1024,
this.timeout = const Duration(seconds: 30),
this.characterSet = CharacterSet.UTF8MB4});

factory ConnectionSettings.socket(
{required String path,
String? user,
String? password,
String? db,
bool useCompression = false,
bool useSSL = false,
int maxPacketSize = 16 * 1024 * 1024,
Duration timeout = const Duration(seconds: 30),
int characterSet = CharacterSet.UTF8MB4}) =>
ConnectionSettings(
host: path,
user: user,
password: password,
db: db,
useCompression: useCompression,
useSSL: useSSL,
maxPacketSize: maxPacketSize,
timeout: timeout,
characterSet: characterSet);
final Duration timeout;

ConnectionSettings._(
this.host,
this.port,
this.user,
this.password,
this.db,
this.maxPacketSize,
this.timeout,
this.characterSet,
this.isUnixSocket
);

factory ConnectionSettings({
host = 'localhost',
port = 3306,
String? user,
String? password,
String? db,
int maxPacketSize = 16 * 1024 * 1024,
Duration timeout = const Duration(seconds: 30),
int characterSet = CharacterSet.UTF8MB4}) =>
ConnectionSettings._(host, port, user, password, db, maxPacketSize, timeout, characterSet, false);

/// Connection settings for a unix socket
factory ConnectionSettings.socket({
required String path,
String? user,
String? password,
String? db,
bool useCompression = false,
bool useSSL = false,
int maxPacketSize = 16 * 1024 * 1024,
Duration timeout = const Duration(seconds: 30),
int characterSet = CharacterSet.UTF8MB4}) =>
ConnectionSettings._(path, 3306, user, password, db, maxPacketSize, timeout, characterSet, true);

ConnectionSettings.copy(ConnectionSettings o)
: host = o.host,
port = o.port,
user = o.user,
password = o.password,
db = o.db,
useCompression = o.useCompression,
useSSL = o.useSSL,
maxPacketSize = o.maxPacketSize,
timeout = o.timeout,
characterSet = o.characterSet;
characterSet = o.characterSet,
isUnixSocket = o.isUnixSocket;
}

/// Represents a connection to the database. Use [connect] to open a connection. You
Expand Down Expand Up @@ -126,18 +127,13 @@ class MySqlConnection {
/// socket.
/// A [TimeoutException] is thrown if there is a timeout in the handshake with the
/// server.
static Future<MySqlConnection> connect(ConnectionSettings c,
{bool isUnixSocket = false}) async {
assert(!c.useSSL); // Not implemented
assert(!c.useCompression);

static Future<MySqlConnection> connect(ConnectionSettings c) async {
ReqRespConnection? conn;
late Completer handshakeCompleter;

_log.fine('opening connection to ${c.host}:${c.port}/${c.db}');

var socket = await BufferedSocket.connect(c.host, c.port, c.timeout,
isUnixSocket: isUnixSocket, onDataReady: () {
var socket = await BufferedSocket.connect(c, onDataReady: () {
conn?._readPacket();
}, onDone: () {
_log.fine('done');
Expand All @@ -156,8 +152,7 @@ class MySqlConnection {
}
});

Handler handler = HandshakeHandler(c.user, c.password, c.maxPacketSize,
c.characterSet, c.db, c.useCompression, c.useSSL);
Handler handler = HandshakeHandler(c.user, c.password, c.maxPacketSize, c.characterSet, c.db);
handshakeCompleter = Completer<void>();
conn =
ReqRespConnection(socket, handler, handshakeCompleter, c.maxPacketSize);
Expand Down
29 changes: 19 additions & 10 deletions test/test_infrastructure.dart
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,33 @@ late MySqlConnection _conn;

void initializeTest([String? tableName, String? createSql, String? insertSql]) {
var options = OptionsFile('connection.options');
var user = options.getString('user');
var password = options.getString('password', null);
var port = options.getInt('port', 3306)!;
var host = options.getString('host', 'localhost')!;

final noDb = ConnectionSettings(
user: user,
password: password,
port: port,
host: host,
);

var s = ConnectionSettings(
user: options.getString('user'),
password: options.getString('password', null),
port: options.getInt('port', 3306)!,
final withDb = ConnectionSettings(
user: user,
password: password,
port: port,
host: host,
db: options.getString('db'),
host: options.getString('host', 'localhost')!,
);

setUp(() async {
// Ensure db exists
var checkSettings = ConnectionSettings.copy(s);
checkSettings.db = null;
final c = await MySqlConnection.connect(checkSettings);
await c.query('CREATE DATABASE IF NOT EXISTS ${s.db} CHARACTER SET utf8');
final c = await MySqlConnection.connect(noDb);
await c.query('CREATE DATABASE IF NOT EXISTS ${withDb.db} CHARACTER SET utf8');
await c.close();

_conn = await MySqlConnection.connect(s);
_conn = await MySqlConnection.connect(withDb);

if (tableName != null) {
await setup(_conn, tableName, createSql, insertSql);
Expand Down
19 changes: 10 additions & 9 deletions test/unit/buffered_socket_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import 'dart:async';
import 'dart:io';

import 'package:mocktail/mocktail.dart';
import 'package:mysql1/mysql1.dart';
import 'package:pedantic/pedantic.dart';
import 'package:test/test.dart';

Expand All @@ -22,7 +23,7 @@ void main() {

setUp(() {
var streamController = StreamController<RawSocketEvent>();
factory = (host, port, timeout, {bool isUnixSocket = false}) {
factory = (conn) {
rawSocket = MockSocket(streamController);
return Future.value(rawSocket);
};
Expand All @@ -33,7 +34,7 @@ void main() {

var socket;
var thesocket = await BufferedSocket.connect(
'localhost', 100, const Duration(seconds: 5), onDataReady: () async {
ConnectionSettings(port: 100, timeout: const Duration(seconds: 5)), onDataReady: () async {
var buffer = Buffer(4);
await socket.readBuffer(buffer);
expect(buffer.list, equals([1, 2, 3, 4]));
Expand All @@ -49,7 +50,7 @@ void main() {

var socket;
var thesocket = await BufferedSocket.connect(
'localhost', 100, const Duration(seconds: 5), onDataReady: () async {
ConnectionSettings(port: 100, timeout: Duration(seconds: 5)), onDataReady: () async {
var buffer = Buffer(4);
socket.readBuffer(buffer).then((_) {
expect(buffer.list, equals([1, 2, 3, 4]));
Expand All @@ -65,7 +66,7 @@ void main() {
test('can read data which is not yet available', () async {
var c = Completer();
var socket = await BufferedSocket.connect(
'localhost', 100, const Duration(seconds: 5),
ConnectionSettings(port: 100, timeout: Duration(seconds: 5)),
onDataReady: () {},
onDone: () {},
onError: (e) {},
Expand All @@ -83,7 +84,7 @@ void main() {
() async {
var c = Completer();
var socket = await BufferedSocket.connect(
'localhost', 100, const Duration(seconds: 30),
ConnectionSettings(port: 100, timeout: Duration(seconds: 30)),
onDataReady: () {},
onDone: () {},
onError: (e) {},
Expand All @@ -100,7 +101,7 @@ void main() {

test('cannot read data when already reading', () async {
var socket = await BufferedSocket.connect(
'localhost', 100, const Duration(seconds: 5),
ConnectionSettings(port: 100, timeout: Duration(seconds: 5)),
onDataReady: () {},
onDone: () {},
onError: (e) {},
Expand All @@ -116,7 +117,7 @@ void main() {

test('should write buffer', () async {
var socket = await BufferedSocket.connect(
'localhost', 100, const Duration(seconds: 5),
ConnectionSettings(port: 100, timeout: Duration(seconds: 5)),
onDataReady: () {},
onDone: () {},
onError: (e) {},
Expand All @@ -135,7 +136,7 @@ void main() {

test('should write part of buffer', () async {
var socket = await BufferedSocket.connect(
'localhost', 100, const Duration(seconds: 5),
ConnectionSettings(port: 100, timeout: Duration(seconds: 5)),
onDataReady: () {},
onDone: () {},
onError: (e) {},
Expand All @@ -152,7 +153,7 @@ void main() {
var onClosed = () {
closed = true;
};
await BufferedSocket.connect('localhost', 100, const Duration(seconds: 5),
await BufferedSocket.connect(ConnectionSettings(port: 100, timeout: Duration(seconds: 5)),
onDataReady: () {},
onDone: () {},
onError: (e) {},
Expand Down